Central — Producer Integration
How to add a new SourceAdapter subclass to Central. This is the contract for
adapter authors: what to implement, what NOT to implement, and how a new feed
plugs into the existing stream / subject / dedup machinery without restating
anything from docs/CONSUMER-INTEGRATION.md.
The companion document is CONSUMER-INTEGRATION.md — the contract
for downstream consumers. Producer-side semantics are in scope here; consumer-side
semantics (wire format, dedup recommendations, fall-off handling at the subscriber)
are intentionally not restated. Cross-references point into that doc.
Table of contents
- What this doc is for
- The design principle
- Quick start: cloning an existing adapter
- The SourceAdapter base class
- Settings and configuration
- Subject namespace conventions
- Dedup key construction
- The StreamEntry registry
- Removal / fall-off patterns
- Anti-patterns — what NOT to do
- Settings preview hook
- Acceptance gate for a new adapter
- Enrichment contract
1. What this doc is for
Audience: an engineer (or a code-gen agent acting as one) writing a new
SourceAdapter subclass in src/central/adapters/.
Covered: the SourceAdapter contract, the StreamEntry registry, subject namespace conventions, dedup-key construction, the settings preview hook, the mechanical checklist a new-adapter PR must satisfy.
NOT covered: end-to-end walkthroughs of one specific adapter (read the source
for the closest existing adapter — src/central/adapters/swpc_kindex.py is the
smallest), upstream feed API documentation (lives upstream), or consumer-side
concerns (live in CONSUMER-INTEGRATION.md).
2. The design principle
"Central takes it all and gives it all. It's up to the pipe to do with it what it will." — Matt, 2026-05-19
The correct reading of that sentence: Central is the consumer's only data plane. A downstream consumer sees exactly what's on the wire and nothing more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot reverse-geocode a coordinate on its own. So whatever Central does NOT put on the wire is, for every consumer, simply missing. "Gives it all" therefore means give the consumer everything a reasonable consumer needs to act on the event — not "give the upstream payload only."
Adapter authors translate that into a small number of concrete rules:
- Preserve every upstream field. Anything the upstream returns lives in Event.data verbatim. Adapters do not silently drop fields, even ones that look redundant or low-value today (see §10.2).
- Enrich, deliberately and centrally. Location, timezone, elevation, landclass and similar context that consumers reliably need should be resolved once, by Central, and attached, not left for twelve consumers to each re-derive (most of them can't). Enrichment runs through the framework (§13): an adapter declares enrichment_locations and the supervisor attaches results under Event.data["_enriched"].
- Namespace enrichment for provenance. Central-derived fields live under _enriched.<enricher_name>; everything else in data is upstream verbatim. A consumer can always tell which is which.
- Fail gracefully to null, never to an exception. Enrichment that can't resolve a field returns null for it (a stable, documented field set), and a total enrichment failure returns an all-null bundle. A geocoder outage must never drop or corrupt the underlying event.
- No opinionated translation of the upstream payload. Enrichment adds namespaced fields; it does not rewrite upstream ones. Adapters still do not coerce units, rename upstream fields, or collapse upstream enumerations inside data. The only in-place adapter transforms remain mechanical: subject-token normalization (camelCase → snake_case, agency-prefix splitting, whitespace → underscore, lowercase) and dedup-key construction.
This reframes a Phase 2 rule. The earlier draft of this doc said "no enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was rejected on those grounds. That reasoning is now inverted: consumers have no practical way to do that work, so Central does it. The constraint that survives is where and how — through the framework, namespaced, cached, failing to null — not whether. See §10.1 for the remaining anti-pattern (enrichment done outside the framework) and §13 for the full contract.
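The namespacing and fail-to-null rules in this section can be sketched as follows. This is an illustration under assumed names, not Central's API: attach_enrichment, ILLUSTRATIVE_FIELDS, and the resolve callable are hypothetical, and the field tuple is a small made-up subset. The real contract lives in §13.

```python
from typing import Any, Callable, Mapping

# Illustrative subset only; the real field set is defined by the framework (§13).
ILLUSTRATIVE_FIELDS = ("timezone", "elevation_m", "landclass")


def attach_enrichment(
    data: Mapping[str, Any],
    enricher_name: str,
    resolve: Callable[[], Mapping[str, Any]],
) -> dict:
    """Return a copy of `data` with results under data["_enriched"][enricher_name]."""
    try:
        raw = resolve()
    except Exception:
        raw = {}  # total failure becomes an all-null bundle, never an exception
    # Stable field set: unresolved fields become None, unexpected keys are dropped.
    bundle = {field: raw.get(field) for field in ILLUSTRATIVE_FIELDS}
    out = dict(data)  # upstream fields are copied through untouched
    enriched = dict(out.get("_enriched") or {})
    enriched[enricher_name] = bundle
    out["_enriched"] = enriched
    return out
```

The input mapping is never mutated, so a failed lookup cannot corrupt the upstream payload, and a consumer can separate upstream from Central-derived fields by checking for the _enriched key alone.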
3. Quick start: cloning an existing adapter
The cheapest way to start a new adapter is to copy the smallest existing one
and edit. As of this writing the smallest is swpc_kindex (one fixed subject,
no region filter, no preview hook).
File layout for a one-file adapter:
src/central/adapters/<your_adapter>.py # the SourceAdapter subclass
tests/test_<your_adapter>.py # unit tests
When the upstream API needs shared utilities (e.g. WFIGS shares helpers across
wfigs_incidents and wfigs_perimeters), factor them into a sibling
<family>_common.py module — see wfigs_common.py and swpc_common.py for
the existing precedent.
Minimum overrides:
from collections.abc import AsyncIterator
from pathlib import Path

import aiohttp
from pydantic import BaseModel

from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo


class ExampleSettings(BaseModel):
    # Adapter-tunable settings live here. Pydantic v2 model. Forbid extras
    # if you want strictness; otherwise extra-field silence is the default.
    pass


class ExampleAdapter(SourceAdapter):
    name = "example"
    display_name = "Example Source"
    description = "One-line operator-facing description."
    settings_schema = ExampleSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None
    default_cadence_s = 600

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None

    async def startup(self) -> None:
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
        )

    async def shutdown(self) -> None:
        if self._session:
            await self._session.close()
            self._session = None

    async def apply_config(self, new_config: AdapterConfig) -> None:
        # Re-derive any cached settings from new_config.settings.
        pass

    def subject_for(self, event: Event) -> str:
        return f"central.<domain>.example"

    async def poll(self) -> AsyncIterator[Event]:
        # Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db
        # connection initialized in startup() — see swpc_kindex for the
        # established `published_ids` table shape.
        if False:
            yield  # type: ignore[unreachable]
        return
Auto-discovery picks the adapter up the moment the module imports cleanly. No
registry edit is required — central.adapter_discovery.discover_adapters()
walks central.adapters.* at import time and registers every concrete
SourceAdapter subclass keyed by its name class-attribute.
4. The SourceAdapter base class
src/central/adapter.py is the source of truth (SoT). The contract is reproduced
here verbatim where the docstrings already capture it; this section adds the
supervisor-side context that the source file does not narrate.
4.1 Class attributes
Every subclass must define:
| Attribute | Type | Purpose |
|---|---|---|
| name | str | Short identifier, e.g. "nws". Becomes the adapter key in Event.adapter and the discovery-registry key. Must be unique across central.adapters.*. |
| display_name | str | Human-readable name shown in the GUI. |
| description | str | Short description shown on the adapter detail page. |
| settings_schema | type[BaseModel] | Pydantic v2 model class for adapter settings. See §5. |
| default_cadence_s | int | Default poll interval in seconds. Operators may override via config.adapters; AdapterConfig.cadence_s has a Pydantic ge=10 floor. |
Optional class attributes (default to None / not-in-wizard):
| Attribute | Type | Purpose |
|---|---|---|
| requires_api_key | str \| None | Key alias if an API key is required, else None. |
| api_key_field | str \| None | Names the settings_schema field that holds the api_key alias reference. The GUI renders this as a select populated from config.api_keys; the wizard validates it against the staged api_keys state. |
| wizard_order | int \| None | Position in the setup wizard. None excludes the adapter from the wizard. |
4.2 Required methods
Every subclass must implement these three abstract methods.
async def poll(self) -> AsyncIterator[Event]
Poll the source for new events. Yields Event objects for each new/updated event found.
The supervisor drives the loop; poll() is a one-shot async generator the
supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT
loop internally — control of cadence and concurrency lives in the supervisor.
The supervisor wraps each yielded Event in a CloudEvents envelope and
publishes it. The adapter is responsible only for yielding Event objects;
all envelope construction, NATS publish, and metadata heartbeat emission live
outside the adapter.
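A minimal sketch of that division of labour, assuming a toy adapter and a toy supervisor loop. TickAdapter and run_ticks are hypothetical names, and the real supervisor also builds envelopes and publishes, which is omitted here.

```python
import asyncio
from collections.abc import AsyncIterator


class TickAdapter:
    """Hypothetical adapter: each poll() call is one pass, with no inner loop."""

    def __init__(self) -> None:
        self._pending = [["a", "b"], ["c"]]  # stand-in for two upstream fetches

    async def poll(self) -> AsyncIterator[str]:
        batch = self._pending.pop(0) if self._pending else []
        for item in batch:
            yield item


async def run_ticks(adapter: TickAdapter, ticks: int, cadence_s: float) -> list:
    """Supervisor-style loop: one poll() per tick, drained, then discarded."""
    seen = []
    for _ in range(ticks):
        seen.append([event async for event in adapter.poll()])
        await asyncio.sleep(cadence_s)  # cadence lives here, not in the adapter
    return seen
```

Running asyncio.run(run_ticks(TickAdapter(), 3, 0)) drains three separate generators; the third yields nothing because the toy upstream is exhausted.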
async def apply_config(self, new_config: AdapterConfig) -> None
Apply new configuration to the adapter. Called by supervisor when config changes via hot-reload. The adapter should extract relevant settings from new_config.settings and update its internal state.
new_config.settings is a dict[str, Any]; re-validate it against
settings_schema if you intend to read structured fields. Most adapters cache
a couple of fields (e.g. region, parameter_codes) on self during
__init__ and refresh them here. Do NOT tear down self._session or
self._db here — that is shutdown()'s responsibility.
def subject_for(self, event: Event) -> str
Compute the NATS subject for an event. Each adapter knows its own subject hierarchy. The supervisor calls this to determine where to publish each event.
The result must match the subject filter of exactly one StreamEntry in
central.streams.STREAMS. See §6 for the
naming rules and §8 for the stream side.
4.3 Optional lifecycle hooks
async def startup(self) -> None — Optional. Called once before the first
poll(). Open the HTTP session, open the sqlite3 cursor DB, create any
required tables. The default is a no-op.
async def shutdown(self) -> None — Optional. Called once at graceful
shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is
a no-op.
async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None —
Optional. The settings-page preview hook. The default returns None (no
preview). See §11 for the contract.
4.4 Constructor signature
The supervisor instantiates adapters with this exact keyword-argument call
(src/central/supervisor.py _create_adapter):
cls(
    config=config,                    # AdapterConfig: name, enabled, cadence_s, settings, …
    config_store=self._config_store,  # ConfigStore: staged-vs-live config access
    cursor_db_path=CURSOR_DB_PATH,    # Path: /var/lib/central/cursors.db
)
Adapters take exactly these three keyword arguments. Adding new constructor
parameters silently breaks supervisor instantiation; if you need additional
state, derive it from config.settings or read it lazily during startup().
5. Settings and configuration
5.1 The settings schema
settings_schema is a Pydantic v2 BaseModel subclass. It is the SoT for
operator-tunable fields and the basis for GUI form rendering, wizard
validation, and migration seed defaults. Three rules:
- Default values are SoT. The settings model's defaults are the canonical starting state. Migrations seed config.adapters.settings from the same model; tests build fixtures by instantiating the model with no args. Do NOT duplicate default lists in migration JSON, fixtures, or test code; derive them from the model class.
- Use RegionConfig for bounding boxes. Defined in central.config_models.RegionConfig. Validates north/south/east/west, rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not reinvent it.
- Optional region. Some adapters strongly recommend a region (NWIS, WFIGS) but accept None so an operator can test before configuring. Log a WARN at startup() if a region is recommended but unset; do not refuse to start.
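To make the bounding-box checks concrete, here is a stdlib-only sketch of the kind of validation described for RegionConfig. BBox is a hypothetical stand-in, not the real class: the field names, the error messages, and the rejection of zero-height boxes are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BBox:
    """Hypothetical stand-in for RegionConfig; field names are assumed."""

    north: float
    south: float
    east: float
    west: float

    def __post_init__(self) -> None:
        if not (-90 <= self.south < self.north <= 90):
            raise ValueError("south must be below north, both within [-90, 90]")
        if not (-180 <= self.west <= 180 and -180 <= self.east <= 180):
            raise ValueError("longitudes must be within [-180, 180]")
        if self.west == self.east:
            raise ValueError("zero-width bbox")
        if self.west > self.east:
            # west > east only makes sense if the box wraps past +/-180.
            raise ValueError("bbox crosses the antimeridian")
```

In an adapter, reuse the real RegionConfig rather than anything like this; the sketch only shows what the three rejections mean in coordinates.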
5.2 What gets stored where
| State | Location | Adapter access |
|---|---|---|
| Adapter config (cadence, settings, enabled flag, paused state) | DB table config.adapters, fronted by ConfigStore | Passed in __init__ as config: AdapterConfig; refreshed via apply_config() |
| API keys | DB table config.api_keys, fronted by ConfigStore | Read via self._config_store.get_api_key(alias); never persisted on the adapter |
| Dedup cursor / observed sets | sqlite3 at cursor_db_path (/var/lib/central/cursors.db) | Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the adapter column |
| Stream retention / max_bytes | DB table config.streams; managed by supervisor | Adapters do not read or write stream config |
AdapterConfig.settings is a dict[str, Any] — re-validate against
settings_schema before consuming structured fields. Most adapters do this
once in __init__ and again in apply_config().
5.3 preview_for_settings boundary
preview_for_settings() is the one place that does NOT have access to
self._config_store or cursor_db_path semantics. See
§11. The framework may instantiate adapters
with a stub config store solely to call this method, and the GUI process never
calls startup(). Do not assume the adapter's HTTP session, cursor DB, or any
other startup-time state exists when preview runs.
6. Subject namespace conventions
6.1 The shape
Every subject starts with central. and decomposes as:
central.<domain>.<subtype>[.<dimensions>...]
- <domain> is one of wx, fire, quake, space, disaster, hydro, meta (the current set; see §8 for adding one). Operators MUST be able to subscribe to all of one domain with central.<domain>.>.
- <subtype> is adapter-driven and identifies the event category within the domain (e.g. hotspot, alert, incident, kindex, proton_flux).
- <dimensions> are zero or more refinement tokens (geographic, temporal, upstream identifier) that the consumer would plausibly want to filter on.
Token rules:
- All lowercase. No uppercase, no mixed-case.
- snake_case tokens. Multi-word tokens use underscores (proton_flux, sea_lake_ice), not hyphens.
- No empty tokens. Fall back to a literal unknown token when a dimension is genuinely absent (NWS does this for alerts lacking a primary region).
- No .removed injected mid-token. Removal subjects always read <subtype>.removed.<region>; see §9.
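The token rules above can be sketched as a small normalizer. normalize_token and build_subject are hypothetical helper names, not functions from the codebase, and the camelCase split is deliberately simple (it does not try to break up runs of capitals such as acronyms).

```python
import re
from typing import Iterable, Optional

_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")
_NON_TOKEN = re.compile(r"[^a-z0-9_]+")


def normalize_token(raw: Optional[str]) -> str:
    """Lowercase snake_case subject token; 'unknown' when nothing usable remains."""
    text = _CAMEL_BOUNDARY.sub("_", (raw or "").strip())
    # Dots, wildcards, hyphens and whitespace all collapse to an underscore.
    text = _NON_TOKEN.sub("_", text.lower()).strip("_")
    return text or "unknown"


def build_subject(domain: str, subtype: str, dimensions: Iterable[Optional[str]] = ()) -> str:
    """Join normalized tokens under the central. prefix."""
    tokens = [normalize_token(t) for t in (domain, subtype, *dimensions)]
    return ".".join(["central", *tokens])
```

For example, build_subject("fire", "incident", ["MT", "Flathead"]) gives central.fire.incident.mt.flathead, and an empty dimension becomes the literal unknown token instead of an empty one.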
6.2 Where the canonical subjects live
docs/CONSUMER-INTEGRATION.md §4 is the SoT for the concrete subject patterns
that exist today. Adapter authors should mirror new subjects there as part of
the same PR; the consumer test suite asserts every discovered adapter has a
per-adapter subsection.
6.3 Subject-token decomposition belongs in the adapter
When an upstream identifier is composite — agency-prefix + bare site number, state + county, satellite + confidence — the adapter is responsible for splitting it into tokens. Two conventions:
- One helper function per decomposition, one place. NWIS' helper sits at module scope in nwis.py:

      def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]:
          """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no)."""
          ...

  Both subject_for() and Event.category construction call through it. WFIGS does the same with subject_suffix() in wfigs_common.py. Never inline the decomposition in two places; they will drift.
- Round-trip via Event.category. Adapters set Event.category to a Central-relative form (hydro.<param>.<agency>.<bare_site_no>, fire.incident.<state>.<county>); subject_for() then prepends central. and may rearrange tokens. The category lives in the wire payload (see CONSUMER-INTEGRATION.md §5b); the subject is computed from it at publish time.
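A sketch of the two conventions working together, under stated assumptions: ids are taken to look like AGENCY-site (for example USGS-01646500), the fallback to unknown for an unprefixed id is a guess at reasonable behaviour, and all three function names are illustrative, not the bodies in nwis.py.

```python
def subject_tokens_for_id(monitoring_location_id: str) -> tuple:
    """Split an agency-prefixed id (assumed 'AGENCY-site') into (agency, bare_site_no)."""
    agency, sep, site_no = monitoring_location_id.partition("-")
    if not sep or not agency or not site_no:
        # Assumed fallback: no recognizable prefix, so the agency token is 'unknown'.
        return ("unknown", monitoring_location_id.lower() or "unknown")
    return (agency.lower(), site_no.lower())


def category_for(param: str, monitoring_location_id: str) -> str:
    """Central-relative category; the single caller of the decomposition helper."""
    agency, site_no = subject_tokens_for_id(monitoring_location_id)
    return f"hydro.{param}.{agency}.{site_no}"


def subject_from_category(category: str) -> str:
    """Publish-time subject: this sketch only prepends the prefix, no rearranging."""
    return f"central.{category}"
```

Because the subject is derived from the category, and the category from the one helper, a change to the split shows up in both places at once.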
6.4 Adding a new top-level domain
Top-level domains are stream-mapped 1:1 (see §8).
Adding central.<new_domain>.> requires:
- One new StreamEntry in src/central/streams.py.
- One new migration row that seeds config.streams with retention and max_bytes.
- Update docs/CONSUMER-INTEGRATION.md §3 and §4 to advertise the new stream / subject pattern.
supervisor.STREAM_SUBJECTS, archive.STREAMS, and gui.DASHBOARD_STREAMS
all derive from STREAMS at import time; no code edits in those modules are
required.
7. Dedup key construction
7.1 What "dedup key" means here
Adapters maintain a per-adapter sqlite3 table at cursor_db_path (typically
published_ids keyed by (adapter, event_id)) to suppress re-yielding the
same event across successive poll() runs. The string the adapter writes to
published_ids.event_id is the adapter-side dedup key.
Separately, every yielded Event.id flows into the CloudEvents envelope's
id field (also set as the Nats-Msg-Id header on publish). Most adapters
use the same string for both — adapter-side dedup key and Event.id are
identical — which keeps consumer-side dedup (§9 of CONSUMER-INTEGRATION.md)
trivial.
A few adapters intentionally diverge: eonet stores a richer composite
adapter-side (id + timeline date) while keeping Event.id stable, because
consumers usually want timeline updates to overwrite the same id. The
Event.id is always the consumer contract; adapter-side composites are an
implementation detail.
7.2 Two shapes
Single-token. Upstream provides a stable, globally-unique identifier:
- usgs_quake: USGS feature id (ci10240102)
- gdacs: RSS guid (WF1028708)
- wfigs_incidents / wfigs_perimeters: IRWIN UUID
- inciweb: numeric incident id
- swpc_kindex: bin timestamp (single fixed subject, time is the natural key)
- nws: alert URL
- eonet: EONET_NNNNN (consumer-facing); composite tracked separately adapter-side as described in §7.1
Use the upstream id directly. No prefix, no separators.
Composite. Upstream id is not by itself sufficient — needs a parameter, a timestamp, or a tombstone marker to dedup correctly:
| Adapter | Shape |
|---|---|
| firms | <satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp> |
| nwis | nwis:<monitoring_location_id>:<parameter_code>:<time_iso> |
| wfigs_incidents / wfigs_perimeters removals | <IrwinID>:removed:<iso_now> |
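The composite shapes in the table can be built with plain string formatting. The sketch below assumes that 3dp means rounding to three decimal places via format(); whether the real firms adapter rounds or truncates is not stated here, and both function names are illustrative.

```python
def firms_dedup_key(satellite: str, acq_date: str, acq_time: str, lat: float, lon: float) -> str:
    """<satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp>, with 3dp taken as rounding."""
    return f"{satellite}:{acq_date}:{acq_time}:{lat:.3f}:{lon:.3f}"


def nwis_dedup_key(monitoring_location_id: str, parameter_code: str, time_iso: str) -> str:
    """nwis:<monitoring_location_id>:<parameter_code>:<time_iso>."""
    return f"nwis:{monitoring_location_id}:{parameter_code}:{time_iso}"
```

Note that an ISO timestamp already contains colons, so these keys are best treated as opaque strings for equality checks and not split back into parts.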
7.3 Separator convention
Use : as the dedup-key separator for new adapters.
There is a pre-existing wrinkle: two SWPC adapters use | instead.
| Adapter | Separator | Shape |
|---|---|---|
| swpc_alerts | \| | <product_id>\|<issue_timestamp> |
| swpc_protons | \| | <time_tag>\|<energy> |
This is the historical reality at the time of writing — not a recommendation.
Do not propagate | to new adapters; do not refactor the SWPC pair to :
without an independent rename PR (the strings are persisted in
/var/lib/central/cursors.db rows).
7.4 Implementation
Every existing adapter follows the same pattern:
async def startup(self) -> None:
    ...
    self._db = sqlite3.connect(self._cursor_db_path)
    self._db.execute("""
        CREATE TABLE IF NOT EXISTS published_ids (
            adapter TEXT NOT NULL,
            event_id TEXT NOT NULL,
            first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (adapter, event_id)
        )
    """)
    self._db.execute("""
        CREATE INDEX IF NOT EXISTS published_ids_last_seen
        ON published_ids (last_seen)
    """)
    self._db.commit()
Then is_published(event_id) -> bool, mark_published(event_id) -> None, and
a periodic sweep_old_ids() (typical retention: 7 to 30 days, tuned per
adapter). Copy from swpc_kindex.py — it is the canonical reference shape.
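For orientation, the three helpers might look roughly like this against the published_ids table. This is a sketch written as free functions over an in-memory database, not the code in swpc_kindex.py: the ADAPTER constant, the function signatures, and the refresh-last_seen-on-repeat behaviour are assumptions.

```python
import sqlite3

ADAPTER = "example"  # hypothetical adapter name


def open_cursor_db(path: str) -> sqlite3.Connection:
    db = sqlite3.connect(path)
    db.execute(
        """
        CREATE TABLE IF NOT EXISTS published_ids (
            adapter TEXT NOT NULL,
            event_id TEXT NOT NULL,
            first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (adapter, event_id)
        )
        """
    )
    db.commit()
    return db


def is_published(db: sqlite3.Connection, event_id: str) -> bool:
    row = db.execute(
        "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
        (ADAPTER, event_id),
    ).fetchone()
    return row is not None


def mark_published(db: sqlite3.Connection, event_id: str) -> None:
    # Insert on first sighting, then refresh last_seen; first_seen is left alone.
    db.execute(
        "INSERT OR IGNORE INTO published_ids (adapter, event_id) VALUES (?, ?)",
        (ADAPTER, event_id),
    )
    db.execute(
        "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP "
        "WHERE adapter = ? AND event_id = ?",
        (ADAPTER, event_id),
    )
    db.commit()


def sweep_old_ids(db: sqlite3.Connection, retention_days: int) -> int:
    """Delete this adapter's rows not seen within the retention window."""
    cur = db.execute(
        "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', ?)",
        (ADAPTER, f"-{retention_days} days"),
    )
    db.commit()
    return cur.rowcount
```

Scoping every statement by the adapter column matters because the cursor file is shared; a sweep must only remove the calling adapter's rows.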
7.5 The CloudEvents id field
Event.id becomes the CloudEvents envelope id and the Nats-Msg-Id header.
JetStream applies its own short dedup window keyed on Nats-Msg-Id, but
consumers do their own dedup (CONSUMER-INTEGRATION.md §9)
and should not rely on JetStream's window. Stability of Event.id across
re-publishes is the producer's contract; the consumer side trusts it.
8. The StreamEntry registry
src/central/streams.py is the SoT for stream definitions. The whole file is
short enough to quote:
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


STREAMS: list[StreamEntry] = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_FIRE", "central.fire.>"),
    StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
    StreamEntry("CENTRAL_SPACE", "central.space.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
8.1 What derives from STREAMS
Three import-time consumers read STREAMS and build their own structures:
| Derived constant | Module | Shape |
|---|---|---|
| STREAM_SUBJECTS | central.supervisor | dict[str, list[str]], used to create JetStream streams |
| STREAMS | central.archive | list[tuple[str, str]] for event_bearing=True entries only |
| DASHBOARD_STREAMS | central.gui.routes | list[str] for dashboard=True entries |
tests/test_stream_registry.py asserts each derivation matches the registry —
adding a stream automatically extends the test surface; no new test code is
required.
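The derivations are small enough to sketch in a few comprehensions. The block below redefines a two-entry registry locally so it runs on its own; the exact contents of each derived structure (for instance, that the archive tuples are name plus subject filter) are assumptions based on the shapes in the table, and ARCHIVE_STREAMS is an illustrative name for what central.archive calls STREAMS.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


# Two entries are enough to show the derivations.
STREAMS = [
    StreamEntry("CENTRAL_FIRE", "central.fire.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]

# Supervisor side: stream name -> list of subject filters.
STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}
# Archive side: event-bearing streams only.
ARCHIVE_STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
# GUI side: names of streams flagged for the dashboard.
DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]
```

Tests written in the same style, comparing each derived value against a comprehension over STREAMS, pick up a new stream without any edit.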
8.2 Adding a stream
When a new adapter needs a new top-level domain:
- One line in src/central/streams.py. Append a StreamEntry(...) to the STREAMS list.
- One migration row. Seed config.streams with retention and max_bytes. Existing migrations in migrations/ are the template.
event_bearing=False is reserved for CENTRAL_META today; adding a second
non-event-bearing stream silently excludes it from the archive. The registry
test explicitly guards against this and requires a deliberate test update if
the invariant changes.
8.3 The consistency gate
tests/test_stream_registry.py is the property-test gate. It asserts:
- Stream names and subject filters are unique.
- Every subject filter matches ^central\.[a-z][a-z_]*\.>$.
- CENTRAL_META is the only non-event-bearing stream.
- The supervisor / archive / GUI derivations all match the registry.
Mirror that discipline in any new test you write for adapter-side stream
behavior: derive expected values from STREAMS rather than hand-typing them.
9. Removal / fall-off patterns
When an upstream record disappears, an adapter has two options:
9.1 Explicit removal subject
Emit an Event whose subject is <subtype>.removed.<region> and whose
category ends in .removed. This is the right choice when:
- Upstream signals retirement explicitly (gdacs's iscurrent: false).
- The upstream is a "current state" feed and absence is meaningful (wfigs_incidents, wfigs_perimeters, eonet).
The subtype-before-removed token order is a hard convention — consumers
filtering on central.<domain>.<subtype>.> receive both live and removal
events with a single subscription. See
CONSUMER-INTEGRATION.md §7a
for the consumer side.
9.2 Absence-as-signal
No removal subject; consumers infer disappearance from a time gap or an
upstream-specific marker (expires field, supersession text). This is the
right choice when:
- The upstream is a stream of point-in-time observations (firms hotspot pixels, swpc_kindex 3-hour bins, swpc_protons cadence-driven samples).
- Upstream signals expiry inline (nws expires field, nws messageType: Cancel).
- The upstream is durable / append-only (usgs_quake catalog, inciweb RSS).
See CONSUMER-INTEGRATION.md §7b
for the consumer-side enumeration.
9.3 Composite removal dedup key
When an adapter emits explicit removals, the same upstream id can fall off and re-appear multiple times over an adapter's lifetime. The dedup key for a removal event therefore needs a removal-detection timestamp:
Event(
    id=f"{irwin_id}:removed:{now.isoformat()}",
    category="fire.incident.removed",
    # ...
)
The :removed:<iso_now> suffix is the cross-adapter convention. Keep it.
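A small sketch of why the timestamp is part of the key. removal_event_id is a hypothetical helper; requiring a timezone-aware datetime and normalizing to UTC are assumptions of this sketch, not documented adapter behaviour.

```python
from datetime import datetime, timezone


def removal_event_id(upstream_id: str, detected_at: datetime) -> str:
    """Build <upstream_id>:removed:<iso_now> from the removal-detection time."""
    if detected_at.tzinfo is None:
        raise ValueError("detected_at must be timezone-aware")
    stamp = detected_at.astimezone(timezone.utc).isoformat()
    return f"{upstream_id}:removed:{stamp}"
```

Two fall-offs of the same upstream id, detected at different times, produce two different keys, so the second removal is not suppressed by the first one's row in the dedup table.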
9.4 Removal payload schema
Removal events ship an empty geo block and a sparse data payload. The
exact shape (id field name, reason values, last_observed_at, adapter-specific
context fields) is documented in
CONSUMER-INTEGRATION.md §7c;
adapter authors should mirror it. Do not restate it here.
10. Anti-patterns — what NOT to do
These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR.
10.1 Enrichment outside the framework
Enrichment itself is expected, not forbidden — see
§2 and §13. Any adapter
with location data should opt in by declaring enrichment_locations on the
adapter class; the supervisor then runs the registered enrichers and attaches
results under Event.data["_enriched"].
The anti-pattern is doing enrichment the wrong way — outside the framework:
- An if missing: await fetch_metadata() branch buried in an adapter's poll(). This bypasses the cache (so every poll re-hits the geocoder), skips the _enriched namespacing (so consumers can't tell upstream from Central-derived), and gives up the never-raise/all-null safety net (so a geocoder hiccup can take down the poll).
- Writing enriched fields directly into the top level of Event.data instead of under _enriched. That destroys provenance: a consumer can no longer tell which fields came from the upstream feed and which Central added.
- Standing up a parallel enrichment path (a second HTTP client, a private cache) inside one adapter instead of registering a backend with the framework.
The rule of thumb: declare enrichment_locations, let the supervisor do the
work. If the framework can't express what you need, extend the framework
(§13) — don't route around it inside an adapter.
10.2 Silent field dropping
Event.data carries the upstream payload verbatim. Do not omit a field
because it "looks redundant" or "isn't useful right now." Operators
debugging a downstream consumer often need every field the upstream sent;
dropping fields makes that impossible after the fact.
The two acceptable transforms — token normalization (case, separators) and dedup-key construction — are not field-drops. They produce new fields derived from the upstream payload; the upstream payload itself is preserved.
10.3 Magic-number asserts in tests
Adapter tests that pin event counts to literal integers (assert len(events) == 47) break the next time the upstream's fixture is
refreshed and contribute zero diagnostic value. Either assert structural
invariants (every yielded event has a non-empty id; subjects match the
adapter's subject_for() output; etc.) or assert against a count computed
from the input fixture (assert len(events) == len(fixture.features)).
10.4 Hardcoded stream / adapter lists in tests
Tests that enumerate ["CENTRAL_WX", "CENTRAL_FIRE", ...] or ["firms", "inciweb", ...] as literals drift the moment a stream or adapter is added
or removed. Derive from the SoT:
from central.streams import STREAMS
from central.adapter_discovery import discover_adapters
stream_names = {s.name for s in STREAMS}
adapter_names = set(discover_adapters().keys())
tests/test_stream_registry.py and tests/test_consumer_doc.py are the
in-tree templates. The sibling test for this doc
(tests/test_producer_doc.py) follows the same rule.
10.5 Persistent state outside cursor_db_path
Adapters MUST NOT write to disk outside the cursor DB at
cursor_db_path. No JSON files in /var/lib/central/<adapter>/, no
state stashed in /tmp. Multi-process invariants (the supervisor may
restart an adapter; the GUI may instantiate it independently to call
preview_for_settings()) only hold for the cursor DB and config.adapters.
10.6 Blocking calls in async paths
poll(), apply_config(), startup(), shutdown(), and
preview_for_settings() are async. Use aiohttp, not requests; use
asyncio.to_thread() to push CPU-bound or genuinely blocking work
(shapely geometry operations, large JSON parsing) off the event loop.
The supervisor runs every adapter on a single event loop — one blocked
adapter stalls every other adapter.
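A minimal sketch of the asyncio.to_thread() pattern, assuming a stand-in blocking function. parse_large_payload, heartbeat, and main are hypothetical names; the sleep simulates work that would otherwise hold the loop.

```python
import asyncio
import json
import time


def parse_large_payload(raw: bytes) -> dict:
    """Stand-in for blocking work (large JSON parse, geometry operations)."""
    time.sleep(0.05)  # simulates the blocking part
    return json.loads(raw)


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Stand-in for every other adapter sharing the loop."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple:
    ticks, stop = [], asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # The blocking call runs in a worker thread; the loop keeps ticking meanwhile.
    parsed = await asyncio.to_thread(parse_large_payload, b'{"features": []}')
    stop.set()
    await beat
    return parsed, len(ticks)
```

Calling parse_large_payload directly inside main would freeze the heartbeat for the full duration, which is what a blocked adapter does to its neighbours.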
10.7 Reaching into framework internals from preview_for_settings
preview_for_settings() is a pure function of settings. Do NOT access
self._config_store, self._cursor_db_path, or any state established
in startup(). The framework may instantiate the adapter with a stub
config store solely to call this method. See §11.
10.8 Hardcoded subject strings in tests
A test that asserts subject == "central.fire.incident.mt.flathead" for a
specific fixture is fine; a test that hardcodes the subject pattern as a
string and re-derives it instead of calling the adapter's
subject_for(event) is the wrong shape. Round-trip through the adapter so
the test breaks the day someone shifts a token.
11. Settings preview hook
preview_for_settings() is the optional GUI-side hook that surfaces a
settings-driven preview on the adapter edit page. NWIS uses it to show the
monitoring-locations inside the configured bbox before the operator hits
save; most adapters opt out by inheriting the base class's return None.
11.1 The contract (verbatim from central.adapter.SourceAdapter)
Optional. Override to surface a settings-driven preview on the edit page.
Return list[dict] (framework renders as a generic table; columns come from the first dict's keys, in insertion order). Return None to skip preview. Raise to surface an error banner — framework catches at the route boundary.

Contract:
- Preview is a pure function of settings. Do NOT access self._config_store or cursor_db state — the framework may instantiate adapters with a stub config_store solely to call this method.
- Network preview implementations must open their own short-lived aiohttp session (the adapter's polling session may not exist; the GUI process never calls startup()).
- Return None when preview is not meaningful (e.g., required settings like region are unset). Return [] explicitly if the query ran and matched zero rows — the framework renders that distinctly from None.
11.2 The three return states
| Return | Framework renders |
|---|---|
| None | Nothing (no preview block on the edit page) |
| [] | "Preview (0 rows)" legend, no table |
| list[dict] with rows | Generic table; columns from the first dict's keys in insertion order; legend "Preview (N rows)" |
tests/test_preview_hook.py is the test surface for the partial; mirror its
discipline for any adapter-specific preview test.
11.3 Generic rendering — no per-adapter Jinja
The framework is duck-typed on list[dict]. Column headers are the first
dict's keys. There is no per-adapter branch in the template, and no
adapter-name conditionals are accepted on review. If an adapter's preview
needs a column the framework does not surface, the adapter rewrites the dict
shape, not the template.
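The duck-typed rendering and the three return states can be sketched as a plain function. This is an illustration only: render_preview and the dict it returns are hypothetical, not the framework's actual template code.

```python
from typing import Any, Optional


def render_preview(result: Optional[list[dict[str, Any]]]) -> Optional[dict[str, Any]]:
    if result is None:
        return None  # no preview block at all
    if not result:
        return {"legend": "Preview (0 rows)", "columns": [], "rows": []}
    # Column headers come from the first dict's keys, in insertion order.
    columns = list(result[0].keys())
    rows = [[row.get(col) for col in columns] for row in result]
    return {"legend": f"Preview ({len(rows)} rows)", "columns": columns, "rows": rows}
```

Because columns are taken from the first dict only, an adapter that wants a different column set or order changes the dicts it returns; nothing in the renderer is adapter-specific.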
11.4 Implementation pattern
NWIS is the canonical example. Skeleton:
    async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
        # No region configured: a preview is not meaningful.
        if settings.region is None:
            return None
        # Fresh short-lived session; self._session may not exist in the GUI process.
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=15),
        ) as session:
            async with session.get(url, headers=...) as resp:
                # Raise on HTTP failure; the framework renders an error banner.
                resp.raise_for_status()
                page = await resp.json()
        return [
            {"site_id": ..., "name": ..., "site_type": ..., "state": ...}
            for feat in page.get("features") or []
        ]
Key points:
- Fresh aiohttp.ClientSession per call: self._session may not exist.
- Short timeout (NWIS uses 15s): the edit page must render quickly.
- Small row cap (NWIS uses 50): the preview is for orientation, not bulk inspection.
- Raise on HTTP / parse failure: the framework catches at the route boundary and renders an error banner.
12. Acceptance gate for a new adapter
A new-adapter PR is accepted when every item below holds. The list is mechanical; reviewers and the PR author should both run through it before requesting / granting merge.
- Upstream curl evidence captured. A real HTTP transcript (curl command + redacted response body) is pinned somewhere in the PR description or a fixture file. Documents what the adapter was written against.
- SourceAdapter subclass conforms to §4. Class attributes set, three abstract methods implemented, constructor signature matches the supervisor call exactly.
- tests/test_<adapter>.py exists. Unit tests for the adapter; no magic-number asserts (§10.3); no hardcoded stream / adapter lists (§10.4).
- Dry-run on CT104 passes. The supervisor instantiates the adapter without errors; one full poll() cycle yields events without raising.
- Dedup keys stable across multiple ticks. Two successive poll() runs against a frozen upstream fixture yield the same event-count delta as expected (zero new events on the second run if upstream is unchanged).
- No hardcoded values that should be derived. Subject prefixes, stream names, default parameter lists, etc., all derive from the registry / settings model / discovery surface.
- CONSUMER-INTEGRATION.md updated. New per-adapter subsection in §6, new subject row in §4, new stream row in §3 if a new domain was introduced.
- tests/test_consumer_doc.py passes.
- PRODUCER-INTEGRATION.md mentions the adapter only if it introduces a new convention. This doc is contract-shaped, not catalog-shaped; most new adapters need no edit here.
- Full pytest suite green.
13. Enrichment contract
Enrichment is how Central adds consumer-needed context (location names,
timezone, elevation, landclass, …) that the upstream feed doesn't carry and a
downstream consumer can't look up itself. It runs in the supervisor, after
dedup and before the CloudEvents wrap, for any adapter that opts in. Results are
namespaced under Event.data["_enriched"] so provenance stays explicit:
everything under _enriched is Central-derived; everything else in data is
upstream verbatim.
13.1 Opting an adapter in
Declare enrichment_locations on the adapter class — a list of
(lat_field, lon_field) tuples naming top-level keys in Event.data:
    class FIRMSAdapter(SourceAdapter):
        enrichment_locations = [("latitude", "longitude")]
Empty (the default on SourceAdapter) means "no enrichment, publish as-is."
The supervisor uses the first tuple that resolves to a non-null coordinate pair,
runs each registered enricher over {"lat": …, "lon": …}, and attaches the
results. No adapter code calls enrichers directly.
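The resolution rule can be sketched as follows. The function names (resolve_location, attach_enrichment) are hypothetical, and the behavior when no tuple resolves (publish the event unchanged) is an assumption for the sketch, not something this section specifies.

```python
import asyncio
from typing import Any, Optional


def resolve_location(
    data: dict[str, Any],
    enrichment_locations: list[tuple[str, str]],
) -> Optional[dict[str, float]]:
    """Return the first (lat, lon) pair whose fields are both non-null."""
    for lat_field, lon_field in enrichment_locations:
        lat, lon = data.get(lat_field), data.get(lon_field)
        if lat is not None and lon is not None:
            return {"lat": float(lat), "lon": float(lon)}
    return None


async def attach_enrichment(
    data: dict[str, Any],
    enrichment_locations: list[tuple[str, str]],
    enrichers: list[Any],
) -> dict[str, Any]:
    """Attach each enricher's bundle under data["_enriched"][enricher.name]."""
    location = resolve_location(data, enrichment_locations)
    if location is None:
        return data  # assumption: nothing to enrich, publish as-is
    data["_enriched"] = {e.name: await e.enrich(location) for e in enrichers}
    return data
```

With one enricher named "geocoder", the result lands at data["_enriched"]["geocoder"], leaving every upstream key in data untouched.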
13.2 The Enricher Protocol
An enricher is any object satisfying this Protocol (central.enrichment.base):
- name: str. Short identifier, used as the key under Event.data["_enriched"].
- async def enrich(self, location: dict[str, float]) -> dict[str, Any]. Given {"lat": float, "lon": float}, return a flat dict of enrichment fields. Fields it can't resolve are present with value None (not omitted). Must never raise: implementations handle their own failures and return an all-null bundle on total failure.
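A sketch of what satisfying this Protocol can look like. The Protocol class below is re-declared locally for illustration (the real one lives in central.enrichment.base and may differ in detail), and HemisphereEnricher is a made-up enricher, not part of Central.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Enricher(Protocol):
    name: str

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]: ...


class HemisphereEnricher:
    """Hypothetical enricher used only to illustrate the contract."""

    name = "hemisphere"
    FIELDS = ("ns", "ew")

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        try:
            lat, lon = location["lat"], location["lon"]
            return {"ns": "N" if lat >= 0 else "S", "ew": "E" if lon >= 0 else "W"}
        except Exception:
            # Never raise: on total failure every field is present and None.
            return {field: None for field in self.FIELDS}
```

Note that the failure path returns the full field set with None values rather than an empty dict, so consumers can rely on the keys being present.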
13.3 GeocoderEnricher and the GeocoderBackend Protocol
GeocoderEnricher (central.enrichment.geocoder, name = "geocoder") is the
only enricher today. It owns the cache and the canonical field normalization;
the actual reverse-geocode is delegated to a pluggable backend satisfying the
GeocoderBackend Protocol:
- async def reverse(self, lat: float, lon: float) -> dict[str, Any]. Return the canonical geocoder fields (see §13.5); fields the backend can't resolve are returned as None. Must never raise.
Backends shipped: NaviBackend (composed Navi /api/reverse/<lat>/<lon>
endpoint — name/address + timezone + landclass + elevation in one call),
PhotonBackend (raw Photon, name/address only), NominatimBackend (OSM
Nominatim, name/address only, with a configurable rate limit + User-Agent),
and NoOpBackend (all-null — the default until an operator configures a real
backend).
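The never-raise, all-null-on-failure shape shared by the backends can be sketched like this. SketchBackend and its injected fetch callable are hypothetical; the real backends make their own HTTP calls and do their own field mapping. The FIELDS tuple is copied from the table in §13.5 for the sketch; real code would use GEOCODER_FIELDS.

```python
import asyncio
from typing import Any, Awaitable, Callable

FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def all_null() -> dict[str, Any]:
    return {field: None for field in FIELDS}


class SketchBackend:
    def __init__(self, fetch: Callable[[float, float], Awaitable[dict[str, Any]]]) -> None:
        self._fetch = fetch  # hypothetical: stands in for the upstream HTTP call

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            raw = await self._fetch(lat, lon)
            # Keep only canonical fields; missing ones become None, extras are dropped.
            return {field: raw.get(field) for field in FIELDS}
        except Exception:
            # Network error, timeout, malformed body: all-null, never raise.
            return all_null()
```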
13.4 Cache + failure semantics
GeocoderEnricher is backed by a sqlite cache (central.enrichment.cache,
/var/lib/central/enrichment_cache.db):
- Key: (enricher_name, lat_rounded, lon_rounded), coordinates rounded to 4 decimal places (~11 m). TTL is per-enricher, default 24h.
- Cache hit → return cached bundle, no backend call.
- Cache miss → call backend, cache the normalized result (even an all-null bundle — so known-empty coordinates aren't re-hammered), return it.
- Backend raises (a violation of the never-raise contract, or an infrastructure error the backend chose to surface) → return an all-null bundle and do not cache it, so the next event for that coordinate retries.
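These four rules can be sketched in a few lines. This is an illustration under stated assumptions, not the code in central.enrichment.cache: the class name, table schema, in-memory database, and two-field FIELDS tuple are all invented for brevity, and sqlite is called synchronously only to keep the sketch short.

```python
import asyncio
import json
import sqlite3
import time
from typing import Any

FIELDS = ("name", "timezone")  # shortened for the sketch


class CachedGeocoder:
    name = "geocoder"

    def __init__(self, backend: Any, ttl_s: float = 86400.0) -> None:
        self._backend = backend
        self._ttl_s = ttl_s
        self._db = sqlite3.connect(":memory:")
        self._db.execute(
            "CREATE TABLE cache (enricher TEXT, lat REAL, lon REAL, "
            "bundle TEXT, stored_at REAL, PRIMARY KEY (enricher, lat, lon))"
        )

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        # Key: enricher name plus coordinates rounded to 4 decimal places.
        key = (self.name, round(location["lat"], 4), round(location["lon"], 4))
        row = self._db.execute(
            "SELECT bundle, stored_at FROM cache WHERE enricher=? AND lat=? AND lon=?",
            key,
        ).fetchone()
        if row and time.time() - row[1] < self._ttl_s:
            return json.loads(row[0])  # cache hit: no backend call
        try:
            raw = await self._backend.reverse(location["lat"], location["lon"])
        except Exception:
            # Backend raised: all-null, deliberately NOT cached so the next event retries.
            return {field: None for field in FIELDS}
        bundle = {field: raw.get(field) for field in FIELDS}
        # Cache the normalized result, even when every field is None.
        self._db.execute(
            "INSERT OR REPLACE INTO cache VALUES (?, ?, ?, ?, ?)",
            (*key, json.dumps(bundle), time.time()),
        )
        return bundle
```

Two coordinates that agree to 4 decimal places share one cache row, so a cluster of nearby detections costs a single backend call.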
Enrichment config (EnrichmentConfig: enricher_class, backend_class,
backend_settings, cache_ttl_s) is read once at supervisor startup. Changing
the enricher set is a restart, not a hot-reload.
13.5 Per-field coverage
The canonical geocoder bundle is exactly nine fields. They mirror
central.enrichment.geocoder.GEOCODER_FIELDS (the single source of truth — this
table must match it):
| Field | US events | Non-US events today | Non-US after Photon planet expansion |
|---|---|---|---|
| name | populated (wilderness sparsity gaps) | null | populated |
| city | populated (wilderness sparsity gaps) | null | populated |
| county | populated (wilderness sparsity gaps) | null | populated |
| state | populated (wilderness sparsity gaps) | null | populated |
| country | populated (wilderness sparsity gaps) | null | populated |
| postal_code | populated (wilderness sparsity gaps) | null | populated |
| timezone | populated | populated (tz_world is planet-scale) | populated |
| landclass | populated where PAD-US covers | null (PAD-US is US-only) | null |
| elevation_m | populated | populated (planet-DEM) | populated |
Net for v0.5.0: US events get a rich bundle; non-US events get timezone +
elevation_m and the rest null. Photon planet expansion is queued on the Navi
side with no firm ETA; when it lands, NaviBackend picks it up automatically
with zero Central code changes.
13.6 Known wrinkle — landclass antimeridian false-positive
landclass is derived from a PostGIS ST_Intersects against PAD-US polygons.
Points near 51–53°N outside the US can spuriously match the Aleutian "Rat
Islands" PAD-US polygon (false matching across the antimeridian), yielding a
non-null landclass that doesn't actually apply. This is a Navi-side bug being
worked separately; until it's fixed, treat a non-null landclass on a clearly
non-US point as suspect. Documented for consumers in
CONSUMER-INTEGRATION.md.