central/docs/PRODUCER-INTEGRATION.md

# Central — Producer Integration
How to add a new `SourceAdapter` subclass to Central. This is the contract for
adapter authors: what to implement, what NOT to implement, and how a new feed
plugs into the existing stream / subject / dedup machinery without restating
anything from `docs/CONSUMER-INTEGRATION.md`.
The companion document is [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md) — the contract
for downstream consumers. Producer-side semantics are in scope here; consumer-side
semantics (wire format, dedup recommendations, fall-off handling at the subscriber)
are intentionally not restated. Cross-references point into that doc.
## Table of contents
1. [What this doc is for](#1-what-this-doc-is-for)
2. [The design principle](#2-the-design-principle)
3. [Quick start: cloning an existing adapter](#3-quick-start-cloning-an-existing-adapter)
4. [The SourceAdapter base class](#4-the-sourceadapter-base-class)
5. [Settings and configuration](#5-settings-and-configuration)
6. [Subject namespace conventions](#6-subject-namespace-conventions)
7. [Dedup key construction](#7-dedup-key-construction)
8. [The StreamEntry registry](#8-the-streamentry-registry)
9. [Removal / fall-off patterns](#9-removal--fall-off-patterns)
10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do)
11. [Settings preview hook](#11-settings-preview-hook)
12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter)
---
## 1. What this doc is for
**Audience:** an engineer (or a code-gen agent acting as one) writing a new
`SourceAdapter` subclass in `src/central/adapters/`.
**Covered:** the SourceAdapter contract, the StreamEntry registry, subject
namespace conventions, dedup-key construction, the settings preview hook, the
mechanical checklist a new-adapter PR must satisfy.
**NOT covered:** end-to-end walkthroughs of one specific adapter (read the source
for the closest existing adapter — `src/central/adapters/swpc_kindex.py` is the
smallest), upstream feed API documentation (lives upstream), or consumer-side
concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)).
---
## 2. The design principle
> "Central takes it all and gives it all. It's up to the pipe to do with it
> what it will."
> — Matt, 2026-05-19
Adapter authors translate that single sentence into a small number of concrete
rules:
- **Preserve every upstream field.** Anything the upstream returns lives in
`Event.data` verbatim. Adapters do not silently drop fields, even ones that
look redundant or low-value today.
- **No enrichment.** Adapters do not reverse-geocode, do not call out to
upstream metadata endpoints during normal `poll()` flow, do not consult a
second source to "fill in" a missing field. If a downstream consumer wants
enrichment, that is consumer-side work.
- **No opinionated translation.** Adapters do not coerce units, do not rename
fields to match a Central-wide vocabulary, do not collapse upstream
enumerations into Central's preferred labels.
- **The only adapter-side transforms are mechanical.** Specifically:
subject-token normalization (camelCase → snake_case, agency-prefix splitting,
whitespace → underscore, lowercase) and dedup-key construction. Both are
deterministic functions of upstream identifiers. Nothing else.
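
As a rough illustration of what "mechanical" means for subject tokens, the sketch below normalizes a single upstream identifier. The function name `normalize_token` is hypothetical and does not come from the Central codebase; it covers only the camelCase, whitespace, and lowercase rules listed above.

```python
import re


def normalize_token(raw: str) -> str:
    """Normalize one upstream identifier into a subject token (hypothetical helper)."""
    # Insert an underscore at camelCase boundaries: "protonFlux" -> "proton_Flux".
    token = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", raw.strip())
    # Collapse each run of whitespace into a single underscore.
    token = re.sub(r"\s+", "_", token)
    # Lowercase last, so the camelCase boundaries are still visible above.
    return token.lower()
```

The function is deterministic and reads nothing but its argument, which is the property the rule above asks for.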
This rules out a whole category of plausible-sounding work that prior reviews
have already rejected. For instance, "enrich NWIS site rows with USGS
monitoring-locations metadata during `poll()`" was proposed for Phase 3 and
killed on this principle. The producer adds the field-preserving pipe; the pipe
ends at JetStream publish; everything richer is a downstream concern.
See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of
anti-patterns. Future authors should reject the same proposals on the same
grounds.
---
## 3. Quick start: cloning an existing adapter
The cheapest way to start a new adapter is to copy the smallest existing one
and edit. As of this writing the smallest is `swpc_kindex` (one fixed subject,
no region filter, no preview hook).
**File layout for a one-file adapter:**
```
src/central/adapters/<your_adapter>.py # the SourceAdapter subclass
tests/test_<your_adapter>.py # unit tests
```
When the upstream API needs shared utilities (e.g. WFIGS shares helpers across
`wfigs_incidents` and `wfigs_perimeters`), factor them into a sibling
`<family>_common.py` module — see `wfigs_common.py` and `swpc_common.py` for
the existing precedent.
**Minimum overrides:**
```python
from collections.abc import AsyncIterator
from pathlib import Path

import aiohttp
from pydantic import BaseModel

from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo


class ExampleSettings(BaseModel):
    # Adapter-tunable settings live here. Pydantic v2 model. Forbid extras
    # if you want strictness; otherwise Pydantic ignores extra fields by default.
    pass


class ExampleAdapter(SourceAdapter):
    name = "example"
    display_name = "Example Source"
    description = "One-line operator-facing description."
    settings_schema = ExampleSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None
    default_cadence_s = 600

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None

    async def startup(self) -> None:
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
        )

    async def shutdown(self) -> None:
        if self._session:
            await self._session.close()
            self._session = None

    async def apply_config(self, new_config: AdapterConfig) -> None:
        # Re-derive any cached settings from new_config.settings.
        pass

    def subject_for(self, event: Event) -> str:
        # Replace <domain> with the adapter's top-level domain token.
        return "central.<domain>.example"

    async def poll(self) -> AsyncIterator[Event]:
        # Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db
        # connection initialized in startup() — see swpc_kindex for the
        # established `published_ids` table shape.
        if False:
            yield  # type: ignore[unreachable]
        return
```
Auto-discovery picks the adapter up the moment the module imports cleanly. No
registry edit is required — `central.adapter_discovery.discover_adapters()`
walks `central.adapters.*` at import time and registers every concrete
`SourceAdapter` subclass keyed by its `name` class-attribute.
---
## 4. The SourceAdapter base class
`src/central/adapter.py` is the source of truth (SoT). The contract is
reproduced here verbatim where the docstrings already capture it; this section
adds the supervisor-side context that the source file does not narrate.
### 4.1 Class attributes
Every subclass must define:
| Attribute | Type | Purpose |
|---|---|---|
| `name` | `str` | Short identifier, e.g. `"nws"`. Becomes the `adapter` key in `Event.adapter` and the discovery-registry key. Must be unique across `central.adapters.*`. |
| `display_name` | `str` | Human-readable name shown in the GUI. |
| `description` | `str` | Short description shown on the adapter detail page. |
| `settings_schema` | `type[BaseModel]` | Pydantic v2 model class for adapter settings. See [§5](#5-settings-and-configuration). |
| `default_cadence_s` | `int` | Default poll interval in seconds. Operators may override via `config.adapters`; `AdapterConfig.cadence_s` has a Pydantic `ge=10` floor. |
Optional class attributes (default to `None` / not-in-wizard):
| Attribute | Type | Purpose |
|---|---|---|
| `requires_api_key` | `str \| None` | Key alias if an API key is required, else `None`. |
| `api_key_field` | `str \| None` | Names the `settings_schema` field that holds the api_key alias reference. The GUI renders this as a select populated from `config.api_keys`; the wizard validates it against the staged `api_keys` state. |
| `wizard_order` | `int \| None` | Position in the setup wizard. `None` excludes the adapter from the wizard. |
### 4.2 Required methods
Every subclass must implement these three abstract methods.
**`async def poll(self) -> AsyncIterator[Event]`**
> Poll the source for new events. Yields `Event` objects for each new/updated
> event found.
The supervisor drives the loop; `poll()` is a one-shot async generator the
supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT
loop internally — control of cadence and concurrency lives in the supervisor.
The supervisor wraps each yielded `Event` in a CloudEvents envelope and
publishes it. The adapter is responsible only for yielding `Event` objects;
all envelope construction, NATS publish, and metadata heartbeat emission live
outside the adapter.
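
The one-shot shape can be sketched with stand-ins. `FakeEvent`, `FakeAdapter`, and `run_one_tick` below are hypothetical names, not Central classes, and the in-memory set stands in for the sqlite3 cursor DB. The point is only that `poll()` makes a single pass and returns, while the caller owns the tick loop.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class FakeEvent:
    id: str


class FakeAdapter:
    """Stand-in for a SourceAdapter subclass (hypothetical, for illustration)."""

    def __init__(self, upstream_ids: list[str]) -> None:
        self._upstream_ids = upstream_ids
        self._seen: set[str] = set()  # stands in for the cursor DB

    async def poll(self) -> AsyncIterator[FakeEvent]:
        # One pass over the upstream snapshot, then return. No internal loop,
        # no sleeping: cadence belongs to the caller.
        for event_id in self._upstream_ids:
            if event_id not in self._seen:
                self._seen.add(event_id)
                yield FakeEvent(id=event_id)


async def run_one_tick(adapter: FakeAdapter) -> list[str]:
    # The caller drains the generator once per tick and then discards it.
    return [event.id async for event in adapter.poll()]


async def main() -> tuple[list[str], list[str]]:
    adapter = FakeAdapter(["a", "b"])
    first = await run_one_tick(adapter)
    second = await run_one_tick(adapter)  # unchanged upstream
    return first, second
```

Running `asyncio.run(main())` drains two ticks against the same upstream snapshot; the second tick yields nothing because every id was already seen.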
**`async def apply_config(self, new_config: AdapterConfig) -> None`**
> Apply new configuration to the adapter. Called by supervisor when config
> changes via hot-reload. The adapter should extract relevant settings from
> `new_config.settings` and update its internal state.
`new_config.settings` is a `dict[str, Any]`; re-validate it against
`settings_schema` if you intend to read structured fields. Most adapters cache
a couple of fields (e.g. `region`, `parameter_codes`) on `self` during
`__init__` and refresh them here. Do NOT tear down `self._session` or
`self._db` here — that is `shutdown()`'s responsibility.
**`def subject_for(self, event: Event) -> str`**
> Compute the NATS subject for an event. Each adapter knows its own subject
> hierarchy. The supervisor calls this to determine where to publish each
> event.
The result must match the subject filter of exactly one `StreamEntry` in
`central.streams.STREAMS`. See [§6](#6-subject-namespace-conventions) for the
naming rules and [§8](#8-the-streamentry-registry) for the stream side.
### 4.3 Optional lifecycle hooks
**`async def startup(self) -> None`** — Optional. Called once before the first
`poll()`. Open the HTTP session, open the sqlite3 cursor DB, create any
required tables. The default is a no-op.
**`async def shutdown(self) -> None`** — Optional. Called once at graceful
shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is
a no-op.
**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** —
Optional. The settings-page preview hook. The default returns `None` (no
preview). See [§11](#11-settings-preview-hook) for the contract.
### 4.4 Constructor signature
The supervisor instantiates adapters with this exact keyword-argument call
(`src/central/supervisor.py` `_create_adapter`):
```python
cls(
    config=config,                    # AdapterConfig — name, enabled, cadence_s, settings, …
    config_store=self._config_store,  # ConfigStore — staged-vs-live config access
    cursor_db_path=CURSOR_DB_PATH,    # Path — /var/lib/central/cursors.db
)
```
Adapters take exactly these three keyword arguments. Adding a required
constructor parameter breaks supervisor instantiation, because the supervisor
passes nothing else; if you need additional state, derive it from
`config.settings` or read it lazily during `startup()`.
---
## 5. Settings and configuration
### 5.1 The settings schema
`settings_schema` is a Pydantic v2 `BaseModel` subclass. It is the SoT for
operator-tunable fields and the basis for GUI form rendering, wizard
validation, and migration seed defaults. Three rules:
- **Default values are SoT.** The settings model's defaults are the canonical
starting state. Migrations seed `config.adapters.settings` from the same
model; tests build fixtures by instantiating the model with no args. Do NOT
duplicate default lists in migration JSON, fixtures, or test code — derive
them from the model class.
- **Use `RegionConfig` for bounding boxes.** Defined in
`central.config_models.RegionConfig`. Validates north/south/east/west,
rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not
reinvent it.
- **Optional region.** Some adapters strongly recommend a region (NWIS, WFIGS)
but accept `None` so an operator can test before configuring. Log a WARN at
`startup()` if a region is recommended but unset; do not refuse to start.
### 5.2 What gets stored where
| State | Location | Adapter access |
|---|---|---|
| Adapter config (cadence, settings, enabled flag, paused state) | DB table `config.adapters`, fronted by `ConfigStore` | Passed in `__init__` as `config: AdapterConfig`; refreshed via `apply_config()` |
| API keys | DB table `config.api_keys`, fronted by `ConfigStore` | Read via `self._config_store.get_api_key(alias)`; never persisted on the adapter |
| Dedup cursor / observed sets | sqlite3 at `cursor_db_path` (`/var/lib/central/cursors.db`) | Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the `adapter` column |
| Stream retention / max_bytes | DB table `config.streams`; managed by supervisor | Adapters do not read or write stream config |
`AdapterConfig.settings` is a `dict[str, Any]` — re-validate against
`settings_schema` before consuming structured fields. Most adapters do this
once in `__init__` and again in `apply_config()`.
### 5.3 preview_for_settings boundary
`preview_for_settings()` is the one method that must NOT rely on
`self._config_store` or the cursor DB at `cursor_db_path`. See
[§11](#11-settings-preview-hook). The framework may instantiate adapters
with a stub config store solely to call this method, and the GUI process never
calls `startup()`. Do not assume the adapter's HTTP session, cursor DB, or any
other startup-time state exists when preview runs.
---
## 6. Subject namespace conventions
### 6.1 The shape
Every subject starts with `central.` and decomposes as:
```
central.<domain>.<subtype>[.<dimensions>...]
```
- `<domain>` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`,
`meta` (the current set — see [§8](#8-the-streamentry-registry) for adding
one). Operators MUST be able to subscribe to all of one domain with
`central.<domain>.>`.
- `<subtype>` is adapter-driven and identifies the event category within the
domain (e.g. `hotspot`, `alert`, `incident`, `kindex`, `proton_flux`).
- `<dimensions>` are zero or more refinement tokens — geographic, temporal,
upstream identifier — that the consumer would plausibly want to filter on.
Token rules:
- **All lowercase.** No uppercase, no mixed-case.
- **`snake_case` tokens.** Multi-word tokens use underscores
(`proton_flux`, `sea_lake_ice`), not hyphens.
- **No empty tokens.** Fall back to a literal `unknown` token when a dimension
is genuinely absent (NWS does this for alerts lacking a primary region).
- **No `.removed` injected mid-token.** Removal subjects always read
`<subtype>.removed.<region>` — see [§9](#9-removal--fall-off-patterns).
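
The token rules above are easy to check mechanically. The sketch below is a hypothetical helper (`subject_problems` is not part of Central) and it assumes digits are allowed inside tokens, since numeric identifiers can appear as dimensions.

```python
import re

# Assumption: tokens are lowercase letters, digits, and underscores.
_TOKEN = re.compile(r"^[a-z0-9][a-z0-9_]*$")


def subject_problems(subject: str) -> list[str]:
    """Return a list of rule violations for a candidate subject (empty if clean)."""
    tokens = subject.split(".")
    problems: list[str] = []
    if tokens[0] != "central":
        problems.append("subject must start with 'central.'")
    if len(tokens) < 3:
        problems.append("need at least central.<domain>.<subtype>")
    for token in tokens:
        if token == "":
            problems.append("empty token")
        elif not _TOKEN.match(token):
            problems.append(f"bad token: {token!r}")
    return problems
```

A check like this fits naturally in an adapter's unit tests, run against the output of `subject_for()` for each fixture event.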
### 6.2 Where the canonical subjects live
`docs/CONSUMER-INTEGRATION.md` §4 is the SoT for the concrete subject patterns
that exist today. Adapter authors should mirror new subjects there as part of
the same PR; the consumer test suite asserts every discovered adapter has a
per-adapter subsection.
### 6.3 Subject-token decomposition belongs in the adapter
When an upstream identifier is composite — agency-prefix + bare site number,
state + county, satellite + confidence — the adapter is responsible for
splitting it into tokens. Two conventions:
- **One helper function per decomposition, one place.** NWIS' helper sits at
module scope in `nwis.py`:
```python
def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]:
    """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no)."""
    ...
```
Both `subject_for()` and `Event.category` construction call through it. WFIGS
does the same with `subject_suffix()` in `wfigs_common.py`. Never inline the
decomposition in two places — they will drift.
- **Round-trip via `Event.category`.** Adapters set `Event.category` to a
Central-relative form (`hydro.<param>.<agency>.<bare_site_no>`,
`fire.incident.<state>.<county>`); `subject_for()` then prepends `central.`
and may rearrange tokens. The category lives in the wire payload (see
[`CONSUMER-INTEGRATION.md` §5b](./CONSUMER-INTEGRATION.md#5b-inner-event-payload));
the subject is computed from it at publish time.
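
A minimal sketch of both conventions together, under stated assumptions: the function names are hypothetical, the `-` separator between agency and site number is an assumption about the upstream id format, and the subject is formed by prepending `central.` with no token rearrangement. This is not the body of the NWIS helper, which is elided above.

```python
def split_agency_id(location_id: str) -> tuple[str, str]:
    """Split 'AGENCY-SITE' into (agency, site); hypothetical helper."""
    agency, sep, site_no = location_id.strip().partition("-")
    if not sep:
        # No agency prefix present: treat the whole id as the site number.
        agency, site_no = "", agency
    # Fall back to the literal 'unknown' token rather than emit an empty one.
    return (agency.lower() or "unknown", site_no.lower() or "unknown")


def category_for(param: str, location_id: str) -> str:
    # The single call site for decomposition when building Event.category.
    agency, site_no = split_agency_id(location_id)
    return f"hydro.{param}.{agency}.{site_no}"


def subject_from_category(category: str) -> str:
    # Simplest round-trip: prepend the namespace root.
    return f"central.{category}"
```

Because the category and the subject both flow through `split_agency_id`, a change to the decomposition lands in one place.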
### 6.4 Adding a new top-level domain
Top-level domains are stream-mapped 1:1 (see [§8](#8-the-streamentry-registry)).
Adding `central.<new_domain>.>` requires:
1. One new `StreamEntry` in `src/central/streams.py`.
2. One new migration row that seeds `config.streams` with retention and
`max_bytes`.
3. Update `docs/CONSUMER-INTEGRATION.md` §3 and §4 to advertise the new
stream / subject pattern.
`supervisor.STREAM_SUBJECTS`, `archive.STREAMS`, and `gui.DASHBOARD_STREAMS`
all derive from `STREAMS` at import time; no code edits in those modules are
required.
---
## 7. Dedup key construction
### 7.1 What "dedup key" means here
Adapters maintain a per-adapter sqlite3 table at `cursor_db_path` (typically
`published_ids` keyed by `(adapter, event_id)`) to suppress re-yielding the
same event across successive `poll()` runs. The string the adapter writes to
`published_ids.event_id` is the **adapter-side dedup key**.
Separately, every yielded `Event.id` flows into the CloudEvents envelope's
`id` field (also set as the `Nats-Msg-Id` header on publish). Most adapters
use the same string for both — adapter-side dedup key and `Event.id` are
identical — which keeps consumer-side dedup ([§9 of CONSUMER-INTEGRATION.md](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide))
trivial.
A few adapters intentionally diverge: `eonet` stores a richer composite
adapter-side (id + timeline date) while keeping `Event.id` stable, because
consumers usually want timeline updates to overwrite the same id. The
`Event.id` is always the consumer contract; adapter-side composites are an
implementation detail.
### 7.2 Two shapes
**Single-token.** Upstream provides a stable, globally-unique identifier:
- `usgs_quake` — USGS feature id (`ci10240102`)
- `gdacs` — RSS guid (`WF1028708`)
- `wfigs_incidents` / `wfigs_perimeters` — IRWIN UUID
- `inciweb` — numeric incident id
- `swpc_kindex` — bin timestamp (single fixed subject, time is the natural key)
- `nws` — alert URL
- `eonet` — `EONET_NNNNN` (consumer-facing); composite tracked separately
adapter-side as described in §7.1
Use the upstream id directly. No prefix, no separators.
**Composite.** Upstream id is not by itself sufficient — needs a parameter, a
timestamp, or a tombstone marker to dedup correctly:
| Adapter | Shape |
|---|---|
| `firms` | `<satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp>` |
| `nwis` | `nwis:<monitoring_location_id>:<parameter_code>:<time_iso>` |
| `wfigs_incidents` / `wfigs_perimeters` removals | `<IrwinID>:removed:<iso_now>` |
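
As an illustration of the `firms` shape in the table, the sketch below builds a key from already-parsed fields. The function name and the input value formats are assumptions; only the field order, the `:` separator, and the three-decimal rounding come from the table.

```python
def firms_style_key(
    satellite: str,
    acq_date: str,
    acq_time: str,
    lat: float,
    lon: float,
) -> str:
    """Build '<satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp>' (hypothetical helper)."""
    # Fixed-precision formatting keeps the key stable when the upstream
    # repeats a pixel with float noise beyond the third decimal place.
    return ":".join([satellite, acq_date, acq_time, f"{lat:.3f}", f"{lon:.3f}"])
```

The key is a pure function of upstream identifiers, so two polls that see the same row produce the same string.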
### 7.3 Separator convention
Use `:` as the dedup-key separator for new adapters.
There is a pre-existing wrinkle: two SWPC adapters use `|` instead.
| Adapter | Separator | Shape |
|---|---|---|
| `swpc_alerts` | `\|` | `<product_id>\|<issue_timestamp>` |
| `swpc_protons` | `\|` | `<time_tag>\|<energy>` |
This is the historical reality at the time of writing — not a recommendation.
Do not propagate `|` to new adapters; do not refactor the SWPC pair to `:`
without an independent rename PR (the strings are persisted in
`/var/lib/central/cursors.db` rows).
### 7.4 Implementation
Every existing adapter follows the same pattern:
```python
async def startup(self) -> None:
    ...
    self._db = sqlite3.connect(self._cursor_db_path)
    self._db.execute("""
        CREATE TABLE IF NOT EXISTS published_ids (
            adapter TEXT NOT NULL,
            event_id TEXT NOT NULL,
            first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (adapter, event_id)
        )
    """)
    self._db.execute("""
        CREATE INDEX IF NOT EXISTS published_ids_last_seen
        ON published_ids (last_seen)
    """)
    self._db.commit()
```
Then `is_published(event_id) -> bool`, `mark_published(event_id) -> None`, and
a periodic `sweep_old_ids()` (typical retention: 7 to 30 days, tuned per
adapter). Copy from `swpc_kindex.py` — it is the canonical reference shape.
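
For orientation, here is one way the three helpers could look against the table above. This is a sketch, not the code in `swpc_kindex.py`: the `PublishedIds` wrapper class, the `SCHEMA` constant, and the `max_age_days` parameter are assumptions made to keep the example self-contained.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS published_ids (
    adapter TEXT NOT NULL,
    event_id TEXT NOT NULL,
    first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (adapter, event_id)
)
"""


class PublishedIds:
    """Per-adapter view over the shared published_ids table (hypothetical wrapper)."""

    def __init__(self, db: sqlite3.Connection, adapter: str) -> None:
        self._db = db
        self._adapter = adapter

    def is_published(self, event_id: str) -> bool:
        row = self._db.execute(
            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
            (self._adapter, event_id),
        ).fetchone()
        return row is not None

    def mark_published(self, event_id: str) -> None:
        # Insert on first sight; refresh last_seen on every later sight.
        self._db.execute(
            """
            INSERT INTO published_ids (adapter, event_id) VALUES (?, ?)
            ON CONFLICT (adapter, event_id)
            DO UPDATE SET last_seen = CURRENT_TIMESTAMP
            """,
            (self._adapter, event_id),
        )
        self._db.commit()

    def sweep_old_ids(self, max_age_days: int = 14) -> int:
        # Delete this adapter's rows not seen within the retention window.
        cur = self._db.execute(
            "DELETE FROM published_ids "
            "WHERE adapter = ? AND last_seen < datetime('now', ?)",
            (self._adapter, f"-{max_age_days} days"),
        )
        self._db.commit()
        return cur.rowcount
```

Every query filters on the `adapter` column, which is what keeps rows namespaced in the shared file.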
### 7.5 The CloudEvents `id` field
`Event.id` becomes the CloudEvents envelope `id` and the `Nats-Msg-Id` header.
JetStream applies its own short dedup window keyed on `Nats-Msg-Id`, but
consumers do their own dedup ([CONSUMER-INTEGRATION.md §9](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide))
and should not rely on JetStream's window. Stability of `Event.id` across
re-publishes is the producer's contract; the consumer side trusts it.
---
## 8. The StreamEntry registry
`src/central/streams.py` is the SoT for stream definitions. The whole file is
short enough to quote:
```python
@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


STREAMS: list[StreamEntry] = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_FIRE", "central.fire.>"),
    StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
    StreamEntry("CENTRAL_SPACE", "central.space.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
```
### 8.1 What derives from STREAMS
Three import-time consumers read `STREAMS` and build their own structures:
| Derived constant | Module | Shape |
|---|---|---|
| `STREAM_SUBJECTS` | `central.supervisor` | `dict[str, list[str]]` — used to create JetStream streams |
| `STREAMS` | `central.archive` | `list[tuple[str, str]]` for `event_bearing=True` entries only |
| `DASHBOARD_STREAMS` | `central.gui.routes` | `list[str]` for `dashboard=True` entries |
`tests/test_stream_registry.py` asserts each derivation matches the registry —
adding a stream automatically extends the test surface; no new test code is
required.
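
The three derivations can be pictured as comprehensions over the registry. The sketch below uses a two-entry local copy of the registry so it runs on its own. The element contents are assumptions: the table gives only the container types, so `(name, subject_filter)` tuples and stream-name strings are guesses at what each module stores.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


# Two-entry stand-in for central.streams.STREAMS.
STREAMS = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]

# dict[str, list[str]]: stream name -> subject filters.
STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}

# list[tuple[str, str]]: event-bearing streams only.
ARCHIVE_STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]

# list[str]: streams shown on the dashboard.
DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]
```

Because each structure is computed from `STREAMS`, appending an entry changes all three without touching the deriving modules.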
### 8.2 Adding a stream
When a new adapter needs a new top-level domain:
1. **One line in `src/central/streams.py`.** Append a `StreamEntry(...)` to
the `STREAMS` list.
2. **One migration row.** Seed `config.streams` with retention and
`max_bytes`. Existing migrations in `migrations/` are the template.
`event_bearing=False` is reserved for `CENTRAL_META` today; adding a second
non-event-bearing stream silently excludes it from the archive. The registry
test explicitly guards against this and requires a deliberate test update if
the invariant changes.
### 8.3 The consistency gate
`tests/test_stream_registry.py` is the property-test gate. It asserts:
- Stream names and subject filters are unique.
- Every subject filter matches `^central\.[a-z][a-z_]*\.>$`.
- `CENTRAL_META` is the only non-event-bearing stream.
- The supervisor / archive / GUI derivations all match the registry.
Mirror that discipline in any new test you write for adapter-side stream
behavior: derive expected values from `STREAMS` rather than hand-typing them.
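
The first three invariants can be expressed as one function over any registry-shaped list. This is a sketch of the idea, not the contents of `tests/test_stream_registry.py`; `registry_violations` is a hypothetical name, and it accepts any objects with `name`, `subject_filter`, and `event_bearing` attributes.

```python
import re
from types import SimpleNamespace

FILTER_RE = re.compile(r"^central\.[a-z][a-z_]*\.>$")


def registry_violations(streams) -> list[str]:
    """Return the invariants a registry-shaped list breaks (empty if none)."""
    violations: list[str] = []
    names = [s.name for s in streams]
    filters = [s.subject_filter for s in streams]
    if len(set(names)) != len(names):
        violations.append("duplicate stream name")
    if len(set(filters)) != len(filters):
        violations.append("duplicate subject filter")
    for s in streams:
        if not FILTER_RE.match(s.subject_filter):
            violations.append(f"bad subject filter: {s.subject_filter}")
    if [s.name for s in streams if not s.event_bearing] != ["CENTRAL_META"]:
        violations.append("CENTRAL_META must be the only non-event-bearing stream")
    return violations


def entry(name: str, subject_filter: str, event_bearing: bool = True) -> SimpleNamespace:
    # Lightweight stand-in for StreamEntry, used only to exercise the check.
    return SimpleNamespace(
        name=name, subject_filter=subject_filter, event_bearing=event_bearing
    )
```

In a real test the argument would be `central.streams.STREAMS`, so the expected values never need to be typed by hand.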
---
## 9. Removal / fall-off patterns
When an upstream record disappears, an adapter has two options:
### 9.1 Explicit removal subject
Emit an `Event` whose subject is `<subtype>.removed.<region>` and whose
`category` ends in `.removed`. This is the right choice when:
- Upstream signals retirement explicitly (`gdacs`'s `iscurrent: false`).
- The upstream is a "current state" feed and absence is meaningful
(`wfigs_incidents`, `wfigs_perimeters`, `eonet`).
The subtype-before-removed token order is a hard convention — consumers
filtering on `central.<domain>.<subtype>.>` receive both live and removal
events with a single subscription. See
[`CONSUMER-INTEGRATION.md` §7a](./CONSUMER-INTEGRATION.md#7a-explicit-removal-subjects-consumer-must-subscribe-to-handle-cleanly)
for the consumer side.
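
The reason the token order matters can be seen with a small subject matcher. The sketch below is a simplified reimplementation of NATS wildcard matching (`*` for exactly one token, `>` for one or more trailing tokens) written for illustration; it is not taken from a NATS client library, and the example subjects are illustrative.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Simplified NATS-style subject match: '*' = one token, '>' = one or more tail tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one
            # remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)
```

With `<subtype>` ahead of `removed`, the filter `central.fire.incident.>` covers both `central.fire.incident.mt.flathead` and `central.fire.incident.removed.mt`. If the order were flipped, the removal subject would fall outside that filter and consumers would need a second subscription.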
### 9.2 Absence-as-signal
No removal subject; consumers infer disappearance from a time gap or an
upstream-specific marker (`expires` field, supersession text). This is the
right choice when:
- The upstream is a stream of point-in-time observations (`firms` hotspot
pixels, `swpc_kindex` 3-hour bins, `swpc_protons` cadence-driven samples).
- Upstream signals expiry inline (`nws` `expires` field, `nws`
`messageType: Cancel`).
- The upstream is durable / append-only (`usgs_quake` catalog, `inciweb` RSS).
See [`CONSUMER-INTEGRATION.md` §7b](./CONSUMER-INTEGRATION.md#7b-absence-as-signal-no-removal-subject--consumer-infers-from-time-gap)
for the consumer-side enumeration.
### 9.3 Composite removal dedup key
When an adapter emits explicit removals, the same upstream id can fall off
and re-appear multiple times over an adapter's lifetime. The dedup key for a
removal event therefore needs a removal-detection timestamp:
```python
Event(
    id=f"{irwin_id}:removed:{now.isoformat()}",
    category="fire.incident.removed",
    # ...
)
```
The `:removed:<iso_now>` suffix is the cross-adapter convention. Keep it.
### 9.4 Removal payload schema
Removal events ship an empty `geo` block and a sparse `data` payload. The
exact shape (id field name, reason values, last_observed_at, adapter-specific
context fields) is documented in
[`CONSUMER-INTEGRATION.md` §7c](./CONSUMER-INTEGRATION.md#7c-removal-event-payload-schema-cross-cutting);
adapter authors should mirror it. Do not restate it here.
---
## 10. Anti-patterns — what NOT to do
These are the patterns prior reviews have explicitly rejected. Reject them
again on sight in a new-adapter PR.
### 10.1 Enrichment during `poll()`
No calls to upstream metadata endpoints, no reverse-geocode, no consultation
of a second source to fill in fields the primary feed omitted. The "NWIS
enrichment" Phase 3 proposal — joining live measurements against the
monitoring-locations metadata endpoint during `poll()` — was rejected on the
[§2](#2-the-design-principle) principle. Future proposals along the same
lines get the same answer.
If enrichment is genuinely necessary, the right shape is a separate adapter
(or a downstream consumer) — not an `if metadata_missing: await
fetch_metadata()` branch buried in an adapter's `poll()`.
### 10.2 Silent field dropping
`Event.data` carries the upstream payload verbatim. Do not omit a field
because it "looks redundant" or "isn't useful right now." Operators
debugging a downstream consumer often need every field the upstream sent;
dropping fields makes that impossible after the fact.
The two acceptable transforms — token normalization (case, separators) and
dedup-key construction — are not field-drops. They produce new fields
derived from the upstream payload; the upstream payload itself is preserved.
### 10.3 Magic-number asserts in tests
Adapter tests that pin event counts to literal integers (`assert
len(events) == 47`) break the next time the upstream's fixture is
refreshed and contribute zero diagnostic value. Either assert structural
invariants (every yielded event has a non-empty `id`; subjects match the
adapter's `subject_for()` output; etc.) or assert against a count computed
from the input fixture (`assert len(events) == len(fixture.features)`).
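
A small sketch of the fixture-derived form, assuming a GeoJSON-like fixture with a `features` list. The `parse_events` function is a hypothetical stand-in for whatever parsing an adapter does; the point is that the expected count comes from the fixture, not from a literal.

```python
import json

# Inline stand-in for a fixture file loaded from disk.
FIXTURE = json.loads('{"features": [{"id": "a1"}, {"id": "b2"}, {"id": "c3"}]}')


def parse_events(payload: dict) -> list[dict]:
    # Hypothetical parser: one event per upstream feature, payload kept verbatim.
    return [{"id": feat["id"], "data": feat} for feat in payload.get("features") or []]


def test_event_count_tracks_fixture() -> None:
    events = parse_events(FIXTURE)
    # Expected count is computed from the input, so a refreshed fixture
    # does not require editing the test.
    assert len(events) == len(FIXTURE["features"])
    # Structural invariant: every event carries a non-empty id.
    assert all(event["id"] for event in events)
```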
### 10.4 Hardcoded stream / adapter lists in tests
Tests that enumerate `["CENTRAL_WX", "CENTRAL_FIRE", ...]` or `["firms",
"inciweb", ...]` as literals drift the moment a stream or adapter is added
or removed. Derive from the SoT:
```python
from central.streams import STREAMS
from central.adapter_discovery import discover_adapters
stream_names = {s.name for s in STREAMS}
adapter_names = set(discover_adapters().keys())
```
`tests/test_stream_registry.py` and `tests/test_consumer_doc.py` are the
in-tree templates. The sibling test for this doc
(`tests/test_producer_doc.py`) follows the same rule.
### 10.5 Persistent state outside `cursor_db_path`
Adapters MUST NOT write to disk outside the cursor DB at
`cursor_db_path`. No JSON files in `/var/lib/central/<adapter>/`, no
state stashed in `/tmp`. Multi-process invariants (the supervisor may
restart an adapter; the GUI may instantiate it independently to call
`preview_for_settings()`) only hold for the cursor DB and `config.adapters`.
### 10.6 Blocking calls in async paths
`poll()`, `apply_config()`, `startup()`, `shutdown()`, and
`preview_for_settings()` are async. Use `aiohttp`, not `requests`; use
`asyncio.to_thread()` to push CPU-bound or genuinely blocking work
(`shapely` geometry operations, large JSON parsing) off the event loop.
The supervisor runs every adapter on a single event loop — one blocked
adapter stalls every other adapter.
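
A minimal sketch of the `asyncio.to_thread()` pattern, using JSON parsing as the blocking work. The function names are hypothetical; `asyncio.to_thread` itself is standard library in Python 3.9 and later.

```python
import asyncio
import json


def parse_large_payload(raw: bytes) -> dict:
    # Plain synchronous function: safe to run in a worker thread.
    return json.loads(raw)


async def parse_off_loop(raw: bytes) -> dict:
    # The event loop stays free to service other adapters while the
    # worker thread parses.
    return await asyncio.to_thread(parse_large_payload, raw)
```

The same shape applies to any synchronous call that cannot be replaced with an async equivalent.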
### 10.7 Reaching into framework internals from `preview_for_settings`
`preview_for_settings()` is a pure function of `settings`. Do NOT access
`self._config_store`, `self._cursor_db_path`, or any state established
in `startup()`. The framework may instantiate the adapter with a stub
config store solely to call this method. See [§11](#11-settings-preview-hook).
### 10.8 Hardcoded subject strings in tests
A test that asserts `subject == "central.fire.incident.mt.flathead"` for a
specific fixture is fine. A test that rebuilds the subject from a hardcoded
pattern string instead of calling the adapter's `subject_for(event)` is the
wrong shape. Round-trip through the adapter so the test breaks the day someone
shifts a token.
---
## 11. Settings preview hook
`preview_for_settings()` is the optional GUI-side hook that surfaces a
settings-driven preview on the adapter edit page. NWIS uses it to show the
monitoring-locations inside the configured bbox before the operator hits
save; most adapters opt out by inheriting the base class's `return None`.
### 11.1 The contract (verbatim from `central.adapter.SourceAdapter`)
> Optional. Override to surface a settings-driven preview on the edit page.
>
> Return `list[dict]` (framework renders as a generic table; columns come from
> the first dict's keys, in insertion order). Return `None` to skip preview.
> Raise to surface an error banner — framework catches at the route boundary.
>
> Contract:
> - Preview is a pure function of `settings`. Do NOT access
> `self._config_store` or `cursor_db` state — the framework may instantiate
> adapters with a stub config_store solely to call this method.
> - Network preview implementations must open their own short-lived
> `aiohttp` session (the adapter's polling session may not exist; the GUI
> process never calls `startup()`).
> - Return `None` when preview is not meaningful (e.g., required settings
> like region are unset). Return `[]` explicitly if the query ran and
> matched zero rows — the framework renders that distinctly from `None`.
### 11.2 The three return states
| Return | Framework renders |
|---|---|
| `None` | Nothing — no preview block on the edit page |
| `[]` | `"Preview (0 rows)"` legend, no table |
| `list[dict]` with rows | Generic table; columns from the first dict's keys in insertion order; legend `"Preview (N rows)"` |
`tests/test_preview_hook.py` is the test surface for the partial; mirror its
discipline for any adapter-specific preview test.
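
The three states can be modelled as a small pure function. This is a sketch of the behavior the table describes, not the framework's template code; `preview_summary` is a hypothetical name, and it returns the legend and column list rather than rendering HTML.

```python
from typing import Optional


def preview_summary(
    result: Optional[list[dict]],
) -> Optional[tuple[str, list[str]]]:
    """Map a preview_for_settings() return value to (legend, columns)."""
    if result is None:
        # No preview block at all.
        return None
    legend = f"Preview ({len(result)} rows)"
    # Columns come from the first dict's keys, in insertion order.
    columns = list(result[0].keys()) if result else []
    return legend, columns
```

Note that `[]` and `None` take different branches, which is why an adapter should return `[]` when a query ran and matched nothing.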
### 11.3 Generic rendering — no per-adapter Jinja
The framework is duck-typed on `list[dict]`. Column headers are the first
dict's keys. There is no per-adapter branch in the template, and no
adapter-name conditionals are accepted on review. If an adapter's preview
needs a column the framework does not surface, the adapter rewrites the dict
shape, not the template.
### 11.4 Implementation pattern
NWIS is the canonical example. Skeleton:
```python
async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
    if settings.region is None:
        return None
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=15),
    ) as session:
        async with session.get(url, headers=...) as resp:
            resp.raise_for_status()
            page = await resp.json()
    return [
        {"site_id": ..., "name": ..., "site_type": ..., "state": ...}
        for feat in page.get("features") or []
    ]
```
Key points:
- Fresh `aiohttp.ClientSession` per call — `self._session` may not exist.
- Short timeout (NWIS uses 15s) — the edit page must render quickly.
- Small row cap (NWIS uses 50) — the preview is for orientation, not bulk
inspection.
- Raise on HTTP / parse failure — the framework catches at the route boundary
and renders an error banner.
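
The skeleton does not show the row cap, so here is one way it could be applied. This is a sketch under assumptions: `preview_rows` and `PREVIEW_ROW_CAP` are hypothetical names, and the per-feature mapping is passed in as `to_row` because the real field mapping is elided above.

```python
from itertools import islice
from typing import Callable, Iterable, Optional

PREVIEW_ROW_CAP = 50  # the cap the text attributes to NWIS


def preview_rows(
    features: Optional[Iterable[dict]],
    to_row: Callable[[dict], dict],
    cap: int = PREVIEW_ROW_CAP,
) -> list[dict]:
    """Convert at most `cap` upstream features into preview rows."""
    # islice stops after `cap` items, so a large page is never fully converted.
    return [to_row(feat) for feat in islice(features or [], cap)]
```

An empty or missing feature list yields `[]`, which the framework renders as a zero-row preview rather than hiding the block.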
---
## 12. Acceptance gate for a new adapter
A new-adapter PR is accepted when every item below holds. The list is
mechanical; reviewers and the PR author should both run through it before
requesting / granting merge.
- [ ] **Upstream curl evidence captured.** A real HTTP transcript (curl
command + redacted response body) is pinned somewhere in the PR
description or a fixture file. Documents what the adapter was written
against.
- [ ] **`SourceAdapter` subclass conforms to [§4](#4-the-sourceadapter-base-class).**
Class attributes set, three abstract methods implemented, constructor
signature matches the supervisor call exactly.
- [ ] **`tests/test_<adapter>.py` exists.** Unit tests for the adapter; no
magic-number asserts ([§10.3](#103-magic-number-asserts-in-tests)); no
hardcoded stream / adapter lists
([§10.4](#104-hardcoded-stream--adapter-lists-in-tests)).
- [ ] **Dry-run on CT104 passes.** The supervisor instantiates the adapter
without errors; one full `poll()` cycle yields events without raising.
- [ ] **Dedup keys stable across multiple ticks.** Two successive `poll()`
runs against a frozen upstream fixture yield zero new events on the second
run, because the upstream is unchanged.
- [ ] **No hardcoded values that should be derived.** Subject prefixes,
stream names, default parameter lists, etc., all derive from the
registry / settings model / discovery surface.
- [ ] **`CONSUMER-INTEGRATION.md` updated.** New per-adapter subsection in
§6, new subject row in §4, new stream row in §3 if a new domain was
introduced. `tests/test_consumer_doc.py` passes.
- [ ] **`PRODUCER-INTEGRATION.md` mentions the adapter only if it
introduces a new convention.** This doc is contract-shaped, not
catalog-shaped — most new adapters need no edit here.
- [ ] **Full pytest suite green.**
---