central/docs/PRODUCER-INTEGRATION.md
Matt Johnson 6afe80ded3 docs(2-I): producer integration spec — docs/PRODUCER-INTEGRATION.md
The producer-side contract for adapter authors, mirroring PR H's consumer
spec. Self-contained — readers should not need to grep the codebase to
understand what a new SourceAdapter subclass must implement.

Bakes in the Phase 2 design principle ("Central takes it all and gives it
all. It's up to the pipe to do with it what it will.") so future authors
reject enrichment / silent-drop / opinionated-translation proposals on
sight. The previously-proposed Phase 3 NWIS metadata-enrichment ticket is
called out by name as an example of what gets rejected.

12-section outline locked with PM: design principle, quick start (clone
swpc_kindex), SourceAdapter base class, settings, subject namespace,
dedup keys, StreamEntry registry, removal/fall-off, anti-patterns,
preview hook, acceptance gate.

Sibling test (tests/test_producer_doc.py) mirrors test_consumer_doc.py
discipline:
  - bidirectional == between SourceAdapter API and §4 method coverage
  - preview_for_settings contract verbatim against live docstring
  - top-level domain enumeration vs central.streams.STREAMS prefixes
  - §8 STREAMS snippet vs central.streams.STREAMS
  - anti-patterns adapter-name examples vs discover_adapters()

No hardcoded stream / adapter / domain lists anywhere in the test —
every expected value derives from central.streams,
central.adapter_discovery, or central.adapter at runtime.

Honest about the pre-existing `:` vs `|` dedup-key separator
inconsistency (swpc_alerts and swpc_protons use `|`; everyone else
uses `:`). Recommends `:` for new adapters without forcing a rename PR
on the SWPC pair (separators are persisted in cursors.db rows).

Acceptance bars:
  (a) grep -rn 'subject_for_event\|_ADAPTER_REGISTRY' src tests → empty
  (b) bidirectional override-method coverage asserted in test
  (c) tests/test_producer_doc.py → 6/6 pass
  (d) full pytest suite → 469 pass (was 463 pre-PR; +6 new)
  (e) doc length: 823 lines (within 500–1200 envelope)
  (f) code fences balanced; JSON/Python blocks parse

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 21:17:48 +00:00


Central — Producer Integration

How to add a new SourceAdapter subclass to Central. This is the contract for adapter authors: what to implement, what NOT to implement, and how a new feed plugs into the existing stream / subject / dedup machinery without restating anything from docs/CONSUMER-INTEGRATION.md.

The companion document is CONSUMER-INTEGRATION.md — the contract for downstream consumers. Producer-side semantics are in scope here; consumer-side semantics (wire format, dedup recommendations, fall-off handling at the subscriber) are intentionally not restated. Cross-references point into that doc.

Table of contents

  1. What this doc is for
  2. The design principle
  3. Quick start: cloning an existing adapter
  4. The SourceAdapter base class
  5. Settings and configuration
  6. Subject namespace conventions
  7. Dedup key construction
  8. The StreamEntry registry
  9. Removal / fall-off patterns
  10. Anti-patterns — what NOT to do
  11. Settings preview hook
  12. Acceptance gate for a new adapter

1. What this doc is for

Audience: an engineer (or a code-gen agent acting as one) writing a new SourceAdapter subclass in src/central/adapters/.

Covered: the SourceAdapter contract, the StreamEntry registry, subject namespace conventions, dedup-key construction, the settings preview hook, the mechanical checklist a new-adapter PR must satisfy.

NOT covered: end-to-end walkthroughs of one specific adapter (read the source for the closest existing adapter — src/central/adapters/swpc_kindex.py is the smallest), upstream feed API documentation (lives upstream), or consumer-side concerns (live in CONSUMER-INTEGRATION.md).


2. The design principle

"Central takes it all and gives it all. It's up to the pipe to do with it what it will." — Matt, 2026-05-19

Adapter authors translate that single sentence into a small number of concrete rules:

  • Preserve every upstream field. Anything the upstream returns lives in Event.data verbatim. Adapters do not silently drop fields, even ones that look redundant or low-value today.
  • No enrichment. Adapters do not reverse-geocode, do not call out to upstream metadata endpoints during normal poll() flow, do not consult a second source to "fill in" a missing field. If a downstream consumer wants enrichment, that is consumer-side work.
  • No opinionated translation. Adapters do not coerce units, do not rename fields to match a Central-wide vocabulary, do not collapse upstream enumerations into Central's preferred labels.
  • The only adapter-side transforms are mechanical. Specifically: subject-token normalization (camelCase → snake_case, agency-prefix splitting, whitespace → underscore, lowercase) and dedup-key construction. Both are deterministic functions of upstream identifiers. Nothing else.

This rules out a whole category of plausible-sounding work that prior reviews have already rejected. For instance, "enrich NWIS site rows with USGS monitoring-locations metadata during poll()" was proposed for Phase 3 and killed on this principle. The producer adds the field-preserving pipe; the pipe ends at JetStream publish; everything richer is a downstream concern.

See §10 for the enforced list of anti-patterns. Future authors should reject the same proposals on the same grounds.


3. Quick start: cloning an existing adapter

The cheapest way to start a new adapter is to copy the smallest existing one and edit. As of this writing the smallest is swpc_kindex (one fixed subject, no region filter, no preview hook).

File layout for a one-file adapter:

src/central/adapters/<your_adapter>.py     # the SourceAdapter subclass
tests/test_<your_adapter>.py               # unit tests

When the upstream API needs shared utilities (e.g. WFIGS shares helpers across wfigs_incidents and wfigs_perimeters), factor them into a sibling <family>_common.py module — see wfigs_common.py and swpc_common.py for the existing precedent.

Minimum overrides:

from collections.abc import AsyncIterator
from pathlib import Path

import aiohttp
from pydantic import BaseModel

from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo


class ExampleSettings(BaseModel):
    # Adapter-tunable settings live here (Pydantic v2 model). Set
    # model_config = ConfigDict(extra="forbid") for strictness; by default
    # Pydantic silently ignores unknown fields.
    pass


class ExampleAdapter(SourceAdapter):
    name = "example"
    display_name = "Example Source"
    description = "One-line operator-facing description."
    settings_schema = ExampleSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None
    default_cadence_s = 600

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None

    async def startup(self) -> None:
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
        )

    async def shutdown(self) -> None:
        if self._session:
            await self._session.close()
            self._session = None

    async def apply_config(self, new_config: AdapterConfig) -> None:
        # Re-derive any cached settings from new_config.settings.
        pass

    def subject_for(self, event: Event) -> str:
        return "central.<domain>.example"  # placeholder: substitute a real domain (see §6)

    async def poll(self) -> AsyncIterator[Event]:
        # Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db
        # connection initialized in startup() — see swpc_kindex for the
        # established `published_ids` table shape.
        if False:
            yield  # type: ignore[unreachable]
        return

Auto-discovery picks the adapter up the moment the module imports cleanly. No registry edit is required — central.adapter_discovery.discover_adapters() walks central.adapters.* at import time and registers every concrete SourceAdapter subclass keyed by its name class-attribute.
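The registration idea can be pictured roughly as follows. This is a sketch only, not the code in central.adapter_discovery: BaseAdapter, collect_adapters, and the two toy subclasses are hypothetical stand-ins, and the sketch inspects `__subclasses__()` instead of walking a package.

```python
import inspect
from abc import ABC, abstractmethod


class BaseAdapter(ABC):
    """Hypothetical stand-in for SourceAdapter (not the real base class)."""

    name: str

    @abstractmethod
    def subject_for(self, event: dict) -> str: ...


class KindexLike(BaseAdapter):
    name = "kindex_like"

    def subject_for(self, event: dict) -> str:
        return "central.space.kindex"


class StillAbstract(BaseAdapter):
    # No subject_for() override, so this class stays abstract and is skipped.
    name = "still_abstract"


def collect_adapters(base: type) -> dict[str, type]:
    """Register every concrete subclass of `base`, keyed by its `name`."""
    found: dict[str, type] = {}
    pending = list(base.__subclasses__())
    while pending:
        cls = pending.pop()
        pending.extend(cls.__subclasses__())
        if inspect.isabstract(cls):
            continue
        if cls.name in found:
            raise ValueError(f"duplicate adapter name: {cls.name}")
        found[cls.name] = cls
    return found


registry = collect_adapters(BaseAdapter)
```

The duplicate-name check reflects the uniqueness requirement on `name`; whether the real discovery code raises or logs on a collision is not stated in this doc.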


4. The SourceAdapter base class

src/central/adapter.py is the source of truth (SoT). The contract is reproduced here verbatim where the docstrings already capture it; this section adds the supervisor-side context that the source file does not narrate.

4.1 Class attributes

Every subclass must define:

| Attribute | Type | Purpose |
|---|---|---|
| name | str | Short identifier, e.g. "nws". Becomes the adapter key in Event.adapter and the discovery-registry key. Must be unique across central.adapters.*. |
| display_name | str | Human-readable name shown in the GUI. |
| description | str | Short description shown on the adapter detail page. |
| settings_schema | type[BaseModel] | Pydantic v2 model class for adapter settings. See §5. |
| default_cadence_s | int | Default poll interval in seconds. Operators may override via config.adapters; AdapterConfig.cadence_s has a Pydantic ge=10 floor. |

Optional class attributes (default to None / not-in-wizard):

| Attribute | Type | Purpose |
|---|---|---|
| requires_api_key | str \| None | Key alias if an API key is required, else None. |
| api_key_field | str \| None | Names the settings_schema field that holds the api_key alias reference. The GUI renders this as a select populated from config.api_keys; the wizard validates it against the staged api_keys state. |
| wizard_order | int \| None | Position in the setup wizard. None excludes the adapter from the wizard. |

4.2 Required methods

Every subclass must implement these three abstract methods.

async def poll(self) -> AsyncIterator[Event]

Poll the source for new events. Yields Event objects for each new/updated event found.

The supervisor drives the loop; poll() is a one-shot async generator the supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT loop internally — control of cadence and concurrency lives in the supervisor.

The supervisor wraps each yielded Event in a CloudEvents envelope and publishes it. The adapter is responsible only for yielding Event objects; all envelope construction, NATS publish, and metadata heartbeat emission live outside the adapter.
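As a rough picture of that division of labour, the sketch below drains a one-shot async generator once per tick. FakeAdapter, run_one_tick, and the dict-shaped events are hypothetical; the real loop in src/central/supervisor.py is not reproduced here.

```python
import asyncio
from collections.abc import AsyncIterator


class FakeAdapter:
    """Hypothetical adapter: poll() yields what it found, then ends."""

    def __init__(self, batches: list[list[dict]]) -> None:
        self._batches = batches

    async def poll(self) -> AsyncIterator[dict]:
        # One-shot: no `while True`, no sleep. The caller owns the cadence.
        batch = self._batches.pop(0) if self._batches else []
        for event in batch:
            yield event


async def run_one_tick(adapter: FakeAdapter) -> list[dict]:
    """Drain a single poll() call, as a supervisor would per cadence tick."""
    return [event async for event in adapter.poll()]


async def main() -> list[int]:
    adapter = FakeAdapter([[{"id": "a"}, {"id": "b"}], []])
    counts = []
    for _ in range(2):  # two ticks; a real loop would wait cadence_s between them
        counts.append(len(await run_one_tick(adapter)))
    return counts


tick_counts = asyncio.run(main())
```

The generator finishing is the signal that the tick is over, which is why an internal loop inside poll() would starve the caller.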

async def apply_config(self, new_config: AdapterConfig) -> None

Apply new configuration to the adapter. Called by supervisor when config changes via hot-reload. The adapter should extract relevant settings from new_config.settings and update its internal state.

new_config.settings is a dict[str, Any]; re-validate it against settings_schema if you intend to read structured fields. Most adapters cache a couple of fields (e.g. region, parameter_codes) on self during __init__ and refresh them here. Do NOT tear down self._session or self._db here — that is shutdown()'s responsibility.

def subject_for(self, event: Event) -> str

Compute the NATS subject for an event. Each adapter knows its own subject hierarchy. The supervisor calls this to determine where to publish each event.

The result must match the subject filter of exactly one StreamEntry in central.streams.STREAMS. See §6 for the naming rules and §8 for the stream side.
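One way to sanity-check that rule in a unit test is a small NATS-style matcher. The sketch below is an illustration, not code from the repository: subject_matches and owning_filters are hypothetical names, and the filter list is copied from the STREAMS snippet in §8 purely to keep the example self-contained (a real test should derive it from central.streams.STREAMS).

```python
def subject_matches(subject_filter: str, subject: str) -> bool:
    """NATS-style match: `*` is one token, a trailing `>` is one or more."""
    f_tokens = subject_filter.split(".")
    s_tokens = subject.split(".")
    for i, f_tok in enumerate(f_tokens):
        if f_tok == ">":
            # `>` must be last and needs at least one remaining token.
            return i == len(f_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if f_tok != "*" and f_tok != s_tokens[i]:
            return False
    return len(s_tokens) == len(f_tokens)


# Copied from the STREAMS list in §8 for illustration only.
FILTERS = [
    "central.wx.>", "central.fire.>", "central.quake.>", "central.space.>",
    "central.disaster.>", "central.hydro.>", "central.meta.>",
]


def owning_filters(subject: str) -> list[str]:
    """Return every filter that would capture `subject`."""
    return [f for f in FILTERS if subject_matches(f, subject)]
```

A subject_for() result is acceptable when owning_filters() returns exactly one entry; an empty list means the event would be published to a subject no stream captures.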

4.3 Optional lifecycle hooks

async def startup(self) -> None — Optional. Called once before the first poll(). Open the HTTP session, open the sqlite3 cursor DB, create any required tables. The default is a no-op.

async def shutdown(self) -> None — Optional. Called once at graceful shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is a no-op.

async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None — Optional. The settings-page preview hook. The default returns None (no preview). See §11 for the contract.

4.4 Constructor signature

The supervisor instantiates adapters with this exact keyword-argument call (src/central/supervisor.py _create_adapter):

cls(
    config=config,                    # AdapterConfig — name, enabled, cadence_s, settings, …
    config_store=self._config_store,  # ConfigStore — staged-vs-live config access
    cursor_db_path=CURSOR_DB_PATH,    # Path — /var/lib/central/cursors.db
)

Adapters take exactly these three keyword arguments. Adding new constructor parameters silently breaks supervisor instantiation; if you need additional state, derive it from config.settings or read it lazily during startup().


5. Settings and configuration

5.1 The settings schema

settings_schema is a Pydantic v2 BaseModel subclass. It is the SoT for operator-tunable fields and the basis for GUI form rendering, wizard validation, and migration seed defaults. Three rules:

  • Default values are SoT. The settings model's defaults are the canonical starting state. Migrations seed config.adapters.settings from the same model; tests build fixtures by instantiating the model with no args. Do NOT duplicate default lists in migration JSON, fixtures, or test code — derive them from the model class.
  • Use RegionConfig for bounding boxes. Defined in central.config_models.RegionConfig. Validates north/south/east/west, rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not reinvent it.
  • Optional region. Some adapters strongly recommend a region (NWIS, WFIGS) but accept None so an operator can test before configuring. Log a WARN at startup() if a region is recommended but unset; do not refuse to start.

5.2 What gets stored where

| State | Location | Adapter access |
|---|---|---|
| Adapter config (cadence, settings, enabled flag, paused state) | DB table config.adapters, fronted by ConfigStore | Passed in __init__ as config: AdapterConfig; refreshed via apply_config() |
| API keys | DB table config.api_keys, fronted by ConfigStore | Read via self._config_store.get_api_key(alias); never persisted on the adapter |
| Dedup cursor / observed sets | sqlite3 at cursor_db_path (/var/lib/central/cursors.db) | Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the adapter column |
| Stream retention / max_bytes | DB table config.streams; managed by supervisor | Adapters do not read or write stream config |

AdapterConfig.settings is a dict[str, Any] — re-validate against settings_schema before consuming structured fields. Most adapters do this once in __init__ and again in apply_config().

5.3 preview_for_settings boundary

preview_for_settings() is the one place that does NOT have access to self._config_store or cursor_db_path semantics. See §11. The framework may instantiate adapters with a stub config store solely to call this method, and the GUI process never calls startup(). Do not assume the adapter's HTTP session, cursor DB, or any other startup-time state exists when preview runs.


6. Subject namespace conventions

6.1 The shape

Every subject starts with central. and decomposes as:

central.<domain>.<subtype>[.<dimensions>...]

  • <domain> is one of wx, fire, quake, space, disaster, hydro, meta (the current set — see §8 for adding one). Operators MUST be able to subscribe to all of one domain with central.<domain>.>.
  • <subtype> is adapter-driven and identifies the event category within the domain (e.g. hotspot, alert, incident, kindex, proton_flux).
  • <dimensions> are zero or more refinement tokens — geographic, temporal, upstream identifier — that the consumer would plausibly want to filter on.

Token rules:

  • All lowercase. No uppercase, no mixed-case.
  • snake_case tokens. Multi-word tokens use underscores (proton_flux, sea_lake_ice), not hyphens.
  • No empty tokens. Fall back to a literal unknown token when a dimension is genuinely absent (NWS does this for alerts lacking a primary region).
  • No .removed injected mid-token. Removal subjects always read <subtype>.removed.<region> — see §9.

6.2 Where the canonical subjects live

docs/CONSUMER-INTEGRATION.md §4 is the SoT for the concrete subject patterns that exist today. Adapter authors should mirror new subjects there as part of the same PR; the consumer test suite asserts every discovered adapter has a per-adapter subsection.

6.3 Subject-token decomposition belongs in the adapter

When an upstream identifier is composite — agency-prefix + bare site number, state + county, satellite + confidence — the adapter is responsible for splitting it into tokens. Two conventions:

  • One helper function per decomposition, one place. NWIS' helper sits at module scope in nwis.py:

    def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]:
        """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no)."""
        ...
    

    Both subject_for() and Event.category construction call through it. WFIGS does the same with subject_suffix() in wfigs_common.py. Never inline the decomposition in two places — they will drift.

  • Round-trip via Event.category. Adapters set Event.category to a Central-relative form (hydro.<param>.<agency>.<bare_site_no>, fire.incident.<state>.<county>); subject_for() then prepends central. and may rearrange tokens. The category lives in the wire payload (see CONSUMER-INTEGRATION.md §5b); the subject is computed from it at publish time.
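The two conventions combine as in the sketch below. Everything in it is hypothetical: the function names, the "discharge" parameter token, and above all the assumed id format `<AGENCY>-<site_no>`; it is not the body of the helper in nwis.py.

```python
def split_location_id(monitoring_location_id: str) -> tuple[str, str]:
    """Split '<AGENCY>-<site_no>' into lowercase (agency, bare_site_no).

    Assumption: one agency prefix sits before the first hyphen. Ids that do
    not fit fall back to the 'unknown' agency token.
    """
    agency, sep, site_no = monitoring_location_id.partition("-")
    if not sep or not agency or not site_no:
        return ("unknown", monitoring_location_id.lower() or "unknown")
    return (agency.lower(), site_no.lower())


def category_for(param: str, monitoring_location_id: str) -> str:
    """Central-relative category: hydro.<param>.<agency>.<bare_site_no>."""
    agency, site_no = split_location_id(monitoring_location_id)
    return f"hydro.{param}.{agency}.{site_no}"


def subject_from_category(category: str) -> str:
    # The subject is derived from the category, so the decomposition
    # happens in exactly one place.
    return f"central.{category}"
```

Because the subject is computed from the category, a change to the decomposition shows up in both places at once.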

6.4 Adding a new top-level domain

Top-level domains are stream-mapped 1:1 (see §8). Adding central.<new_domain>.> requires:

  1. One new StreamEntry in src/central/streams.py.
  2. One new migration row that seeds config.streams with retention and max_bytes.
  3. Update docs/CONSUMER-INTEGRATION.md §3 and §4 to advertise the new stream / subject pattern.

supervisor.STREAM_SUBJECTS, archive.STREAMS, and gui.DASHBOARD_STREAMS all derive from STREAMS at import time; no code edits in those modules are required.


7. Dedup key construction

7.1 What "dedup key" means here

Adapters maintain a per-adapter sqlite3 table at cursor_db_path (typically published_ids keyed by (adapter, event_id)) to suppress re-yielding the same event across successive poll() runs. The string the adapter writes to published_ids.event_id is the adapter-side dedup key.

Separately, every yielded Event.id flows into the CloudEvents envelope's id field (also set as the Nats-Msg-Id header on publish). Most adapters use the same string for both — adapter-side dedup key and Event.id are identical — which keeps consumer-side dedup (§9 of CONSUMER-INTEGRATION.md) trivial.

A few adapters intentionally diverge: eonet stores a richer composite adapter-side (id + timeline date) while keeping Event.id stable, because consumers usually want timeline updates to overwrite the same id. The Event.id is always the consumer contract; adapter-side composites are an implementation detail.

7.2 Two shapes

Single-token. Upstream provides a stable, globally-unique identifier:

  • usgs_quake — USGS feature id (ci10240102)
  • gdacs — RSS guid (WF1028708)
  • wfigs_incidents / wfigs_perimeters — IRWIN UUID
  • inciweb — numeric incident id
  • swpc_kindex — bin timestamp (single fixed subject, time is the natural key)
  • nws — alert URL
  • eonet — EONET_NNNNN (consumer-facing); composite tracked separately adapter-side as described in §7.1

Use the upstream id directly. No prefix, no separators.

Composite. Upstream id is not by itself sufficient — needs a parameter, a timestamp, or a tombstone marker to dedup correctly:

| Adapter | Shape |
|---|---|
| firms | <satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp> |
| nwis | nwis:<monitoring_location_id>:<parameter_code>:<time_iso> |
| wfigs_incidents / wfigs_perimeters removals | <IrwinID>:removed:<iso_now> |
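A composite key of the firms shape could be built as sketched here. The function name and the sample values are hypothetical, and whether the real adapter rounds or truncates to three decimals is not stated in this doc; the sketch rounds.

```python
SEP = ":"


def firms_style_key(
    satellite: str, acq_date: str, acq_time: str, lat: float, lon: float
) -> str:
    """Join identifying fields, formatting coordinates to 3 decimal places."""
    parts = [satellite, acq_date, acq_time, f"{lat:.3f}", f"{lon:.3f}"]
    return SEP.join(parts)


key = firms_style_key("N20", "2026-05-19", "0412", 45.12345, -113.45678)
same_key = firms_style_key("N20", "2026-05-19", "0412", 45.12345, -113.45678)
```

The key is a pure function of upstream identifiers, so the same observation produces the same key on every poll.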

7.3 Separator convention

Use : as the dedup-key separator for new adapters.

There is a pre-existing wrinkle: two SWPC adapters use | instead.

| Adapter | Separator | Shape |
|---|---|---|
| swpc_alerts | \| | <product_id>\|<issue_timestamp> |
| swpc_protons | \| | <time_tag>\|<energy> |

This is the historical reality at the time of writing — not a recommendation. Do not propagate | to new adapters; do not refactor the SWPC pair to : without an independent rename PR (the strings are persisted in /var/lib/central/cursors.db rows).

7.4 Implementation

Every existing adapter follows the same pattern:

import sqlite3

async def startup(self) -> None:
    ...
    # One shared cursor DB file; rows are namespaced by the adapter column.
    self._db = sqlite3.connect(self._cursor_db_path)
    self._db.execute("""
        CREATE TABLE IF NOT EXISTS published_ids (
            adapter TEXT NOT NULL,
            event_id TEXT NOT NULL,
            first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (adapter, event_id)
        )
    """)
    # Index on last_seen supports the periodic sweep of old ids.
    self._db.execute("""
        CREATE INDEX IF NOT EXISTS published_ids_last_seen
        ON published_ids (last_seen)
    """)
    self._db.commit()

On top of that table, each adapter defines is_published(event_id) -> bool, mark_published(event_id) -> None, and a periodic sweep_old_ids() (typical retention: 7 to 30 days, tuned per adapter). Copy from swpc_kindex.py, which is the canonical reference shape.
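For orientation, the three helpers might look like the sketch below. It is not copied from swpc_kindex.py: the DedupStore wrapper is hypothetical (real adapters keep these as methods on self), the SQL is one plausible way to use the table shown above, and the upsert syntax assumes SQLite 3.24 or newer.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS published_ids (
    adapter TEXT NOT NULL,
    event_id TEXT NOT NULL,
    first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (adapter, event_id)
)
"""


class DedupStore:
    """Hypothetical wrapper around the published_ids table."""

    def __init__(self, db: sqlite3.Connection, adapter: str) -> None:
        self._db = db
        self._adapter = adapter
        self._db.execute(SCHEMA)

    def is_published(self, event_id: str) -> bool:
        row = self._db.execute(
            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
            (self._adapter, event_id),
        ).fetchone()
        return row is not None

    def mark_published(self, event_id: str) -> None:
        # Insert on first sight; otherwise only bump last_seen.
        self._db.execute(
            "INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) "
            "ON CONFLICT (adapter, event_id) "
            "DO UPDATE SET last_seen = CURRENT_TIMESTAMP",
            (self._adapter, event_id),
        )
        self._db.commit()

    def sweep_old_ids(self, max_age_days: int) -> int:
        """Delete this adapter's rows not seen for max_age_days; return count."""
        cur = self._db.execute(
            "DELETE FROM published_ids "
            "WHERE adapter = ? AND last_seen < datetime('now', ?)",
            (self._adapter, f"-{max_age_days} days"),
        )
        self._db.commit()
        return cur.rowcount


store = DedupStore(sqlite3.connect(":memory:"), adapter="example")
```

Inside poll(), the usual order is: build the key, skip the record if is_published() is true, otherwise yield the Event and call mark_published().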

7.5 The CloudEvents id field

Event.id becomes the CloudEvents envelope id and the Nats-Msg-Id header. JetStream applies its own short dedup window keyed on Nats-Msg-Id, but consumers do their own dedup (CONSUMER-INTEGRATION.md §9) and should not rely on JetStream's window. Stability of Event.id across re-publishes is the producer's contract; the consumer side trusts it.


8. The StreamEntry registry

src/central/streams.py is the SoT for stream definitions. The whole file is short enough to quote:

@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


STREAMS: list[StreamEntry] = [
    StreamEntry("CENTRAL_WX",       "central.wx.>"),
    StreamEntry("CENTRAL_FIRE",     "central.fire.>"),
    StreamEntry("CENTRAL_QUAKE",    "central.quake.>"),
    StreamEntry("CENTRAL_SPACE",    "central.space.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_HYDRO",    "central.hydro.>"),
    StreamEntry("CENTRAL_META",     "central.meta.>", event_bearing=False),
]

8.1 What derives from STREAMS

Three import-time consumers read STREAMS and build their own structures:

| Derived constant | Module | Shape |
|---|---|---|
| STREAM_SUBJECTS | central.supervisor | dict[str, list[str]] — used to create JetStream streams |
| STREAMS | central.archive | list[tuple[str, str]] for event_bearing=True entries only |
| DASHBOARD_STREAMS | central.gui.routes | list[str] for dashboard=True entries |

tests/test_stream_registry.py asserts each derivation matches the registry — adding a stream automatically extends the test surface; no new test code is required.
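The derivations amount to three comprehensions over the registry. The sketch below uses a two-entry stand-in list and assumes the archive tuples are (name, subject_filter) pairs; the actual expressions in the three modules are not quoted in this doc.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


# Two-entry stand-in for central.streams.STREAMS.
STREAMS = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]

# Shapes follow the table above; the tuple layout is an assumption.
stream_subjects = {s.name: [s.subject_filter] for s in STREAMS}
archive_streams = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
dashboard_streams = [s.name for s in STREAMS if s.dashboard]
```

Since each structure is computed from the same list, appending one StreamEntry updates all three without further edits.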

8.2 Adding a stream

When a new adapter needs a new top-level domain:

  1. One line in src/central/streams.py. Append a StreamEntry(...) to the STREAMS list.
  2. One migration row. Seed config.streams with retention and max_bytes. Existing migrations in migrations/ are the template.

event_bearing=False is reserved for CENTRAL_META today; adding a second non-event-bearing stream silently excludes it from the archive. The registry test explicitly guards against this and requires a deliberate test update if the invariant changes.

8.3 The consistency gate

tests/test_stream_registry.py is the property-test gate. It asserts:

  • Stream names and subject filters are unique.
  • Every subject filter matches ^central\.[a-z][a-z_]*\.>$.
  • CENTRAL_META is the only non-event-bearing stream.
  • The supervisor / archive / GUI derivations all match the registry.

Mirror that discipline in any new test you write for adapter-side stream behavior: derive expected values from STREAMS rather than hand-typing them.
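The checks listed above are simple enough to sketch in a few lines. This is an illustration of the properties, not the contents of tests/test_stream_registry.py; registry_problems and the three-entry ENTRIES list are hypothetical.

```python
import re

FILTER_RE = re.compile(r"^central\.[a-z][a-z_]*\.>$")

# (name, subject_filter, event_bearing) triples standing in for STREAMS.
ENTRIES = [
    ("CENTRAL_FIRE", "central.fire.>", True),
    ("CENTRAL_HYDRO", "central.hydro.>", True),
    ("CENTRAL_META", "central.meta.>", False),
]


def registry_problems(entries: list[tuple[str, str, bool]]) -> list[str]:
    """Return a human-readable list of violations; empty means consistent."""
    problems = []
    names = [name for name, _, _ in entries]
    filters = [flt for _, flt, _ in entries]
    if len(set(names)) != len(names):
        problems.append("duplicate stream name")
    if len(set(filters)) != len(filters):
        problems.append("duplicate subject filter")
    problems += [f"bad filter: {flt}" for flt in filters if not FILTER_RE.match(flt)]
    non_event = [name for name, _, bearing in entries if not bearing]
    if non_event != ["CENTRAL_META"]:
        problems.append(f"unexpected non-event-bearing streams: {non_event}")
    return problems
```

Returning a list of problems rather than asserting on the first one makes a failing run report every violation at once.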


9. Removal / fall-off patterns

When an upstream record disappears, an adapter has two options:

9.1 Explicit removal subject

Emit an Event whose subject is <subtype>.removed.<region> and whose category ends in .removed. This is the right choice when:

  • Upstream signals retirement explicitly (gdacs's iscurrent: false).
  • The upstream is a "current state" feed and absence is meaningful (wfigs_incidents, wfigs_perimeters, eonet).

The subtype-before-removed token order is a hard convention — consumers filtering on central.<domain>.<subtype>.> receive both live and removal events with a single subscription. See CONSUMER-INTEGRATION.md §7a for the consumer side.

9.2 Absence-as-signal

No removal subject; consumers infer disappearance from a time gap or an upstream-specific marker (expires field, supersession text). This is the right choice when:

  • The upstream is a stream of point-in-time observations (firms hotspot pixels, swpc_kindex 3-hour bins, swpc_protons cadence-driven samples).
  • Upstream signals expiry inline (nws expires field, nws messageType: Cancel).
  • The upstream is durable / append-only (usgs_quake catalog, inciweb RSS).

See CONSUMER-INTEGRATION.md §7b for the consumer-side enumeration.

9.3 Composite removal dedup key

When an adapter emits explicit removals, the same upstream id can fall off and re-appear multiple times over an adapter's lifetime. The dedup key for a removal event therefore needs a removal-detection timestamp:

Event(
    id=f"{irwin_id}:removed:{now.isoformat()}",
    category="fire.incident.removed",
    # ...
)

The :removed:<iso_now> suffix is the cross-adapter convention. Keep it.
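A small helper makes the "same id, different removal" property explicit. removal_key is a hypothetical name, and normalizing the timestamp to UTC is an assumption made here so that keys do not depend on the host's timezone.

```python
from datetime import datetime, timezone


def removal_key(upstream_id: str, detected_at: datetime) -> str:
    """Build '<id>:removed:<iso_now>' from a timezone-aware detection time."""
    if detected_at.tzinfo is None:
        raise ValueError("detected_at must be timezone-aware")
    stamp = detected_at.astimezone(timezone.utc).isoformat()
    return f"{upstream_id}:removed:{stamp}"


first = removal_key("abc-123", datetime(2026, 5, 19, 21, 0, tzinfo=timezone.utc))
second = removal_key("abc-123", datetime(2026, 5, 20, 9, 30, tzinfo=timezone.utc))
```

Two fall-offs of the same upstream id yield two distinct keys, so the second removal is not suppressed by the dedup table.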

9.4 Removal payload schema

Removal events ship an empty geo block and a sparse data payload. The exact shape (id field name, reason values, last_observed_at, adapter-specific context fields) is documented in CONSUMER-INTEGRATION.md §7c; adapter authors should mirror it. It is deliberately not restated here.


10. Anti-patterns — what NOT to do

These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR.

10.1 Enrichment during poll()

No calls to upstream metadata endpoints, no reverse-geocode, no consultation of a second source to fill in fields the primary feed omitted. The "NWIS enrichment" Phase 3 proposal — joining live measurements against the monitoring-locations metadata endpoint during poll() — was rejected on the §2 principle. Future proposals along the same lines get the same answer.

If enrichment is genuinely necessary, the right shape is a separate adapter (or a downstream consumer) — not an if metadata_missing: await fetch_metadata() branch buried in an adapter's poll().

10.2 Silent field dropping

Event.data carries the upstream payload verbatim. Do not omit a field because it "looks redundant" or "isn't useful right now." Operators debugging a downstream consumer often need every field the upstream sent; dropping fields makes that impossible after the fact.

The two acceptable transforms — token normalization (case, separators) and dedup-key construction — are not field-drops. They produce new fields derived from the upstream payload; the upstream payload itself is preserved.

10.3 Magic-number asserts in tests

Adapter tests that pin event counts to literal integers (assert len(events) == 47) break the next time the upstream's fixture is refreshed and contribute zero diagnostic value. Either assert structural invariants (every yielded event has a non-empty id; subjects match the adapter's subject_for() output; etc.) or assert against a count computed from the input fixture (assert len(events) == len(fixture.features)).
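A test in the preferred shape might look like this sketch. FIXTURE and parse_events are hypothetical stand-ins; a real test would load a captured upstream response and run the adapter's own parsing path.

```python
# Hypothetical fixture and parser, for illustration only.
FIXTURE = {"features": [{"id": "ci1"}, {"id": "ci2"}, {"id": "ci3"}]}


def parse_events(fixture: dict) -> list[dict]:
    return [{"id": feat["id"], "data": feat} for feat in fixture["features"]]


def check_invariants(events: list[dict], fixture: dict) -> None:
    # Count is derived from the input, never typed as a literal.
    assert len(events) == len(fixture["features"])
    # Structural invariants that survive a fixture refresh.
    assert all(event["id"] for event in events)
    assert len({event["id"] for event in events}) == len(events)
    # Upstream payload is carried verbatim (see §10.2).
    assert all(event["data"] in fixture["features"] for event in events)


events = parse_events(FIXTURE)
check_invariants(events, FIXTURE)
```

Refreshing the fixture with more or fewer features leaves every assertion valid, which is the point of deriving the expected count.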

10.4 Hardcoded stream / adapter lists in tests

Tests that enumerate ["CENTRAL_WX", "CENTRAL_FIRE", ...] or ["firms", "inciweb", ...] as literals drift the moment a stream or adapter is added or removed. Derive from the SoT:

from central.streams import STREAMS
from central.adapter_discovery import discover_adapters

stream_names = {s.name for s in STREAMS}
adapter_names = set(discover_adapters().keys())

tests/test_stream_registry.py and tests/test_consumer_doc.py are the in-tree templates. The sibling test for this doc (tests/test_producer_doc.py) follows the same rule.

10.5 Persistent state outside cursor_db_path

Adapters MUST NOT write to disk outside the cursor DB at cursor_db_path. No JSON files in /var/lib/central/<adapter>/, no state stashed in /tmp. Multi-process invariants (the supervisor may restart an adapter; the GUI may instantiate it independently to call preview_for_settings()) only hold for the cursor DB and config.adapters.

10.6 Blocking calls in async paths

poll(), apply_config(), startup(), shutdown(), and preview_for_settings() are async. Use aiohttp, not requests; use asyncio.to_thread() to push CPU-bound or genuinely blocking work (shapely geometry operations, large JSON parsing) off the event loop. The supervisor runs every adapter on a single event loop — one blocked adapter stalls every other adapter.
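The effect of asyncio.to_thread() can be seen in a short experiment. In this sketch parse_large_payload stands in for any blocking call and heartbeat stands in for another adapter on the same loop; both names are hypothetical.

```python
import asyncio
import json
import time


def parse_large_payload(raw: str) -> dict:
    """Stand-in for blocking work (big JSON parse, geometry operations)."""
    time.sleep(0.05)  # simulate work that would otherwise stall the loop
    return json.loads(raw)


async def heartbeat(ticks: list[float]) -> None:
    # Represents another adapter sharing the same event loop.
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple[dict, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread; the loop keeps serving `beat`.
    parsed = await asyncio.to_thread(parse_large_payload, '{"features": []}')
    await beat
    return parsed, len(ticks)


parsed, tick_count = asyncio.run(main())
```

Calling parse_large_payload() directly inside main() would return the same result but freeze heartbeat() for the duration of the call.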

10.7 Reaching into framework internals from preview_for_settings

preview_for_settings() is a pure function of settings. Do NOT access self._config_store, self._cursor_db_path, or any state established in startup(). The framework may instantiate the adapter with a stub config store solely to call this method. See §11.

10.8 Hardcoded subject strings in tests

A test that asserts subject == "central.fire.incident.mt.flathead" for a specific fixture is fine, as long as subject came from the adapter's subject_for(event). A test that rebuilds the subject by hand (formatting the pattern as a string inside the test) instead of calling subject_for(event) is the wrong shape. Round-trip through the adapter so the test breaks the day someone shifts a token.


11. Settings preview hook

preview_for_settings() is the optional GUI-side hook that surfaces a settings-driven preview on the adapter edit page. NWIS uses it to show the monitoring-locations inside the configured bbox before the operator hits save; most adapters opt out by inheriting the base class's return None.

11.1 The contract (verbatim from central.adapter.SourceAdapter)

Optional. Override to surface a settings-driven preview on the edit page.

Return list[dict] (framework renders as a generic table; columns come from the first dict's keys, in insertion order). Return None to skip preview. Raise to surface an error banner — framework catches at the route boundary.

Contract:

  • Preview is a pure function of settings. Do NOT access self._config_store or cursor_db state — the framework may instantiate adapters with a stub config_store solely to call this method.
  • Network preview implementations must open their own short-lived aiohttp session (the adapter's polling session may not exist; the GUI process never calls startup()).
  • Return None when preview is not meaningful (e.g., required settings like region are unset). Return [] explicitly if the query ran and matched zero rows — the framework renders that distinctly from None.

11.2 The three return states

| Return | Framework renders |
|---|---|
| None | Nothing — no preview block on the edit page |
| [] | "Preview (0 rows)" legend, no table |
| list[dict] with rows | Generic table; columns from the first dict's keys in insertion order; legend "Preview (N rows)" |

tests/test_preview_hook.py is the test surface for the partial; mirror its discipline for any adapter-specific preview test.

11.3 Generic rendering — no per-adapter Jinja

The framework is duck-typed on list[dict]. Column headers are the first dict's keys. There is no per-adapter branch in the template, and no adapter-name conditionals are accepted on review. If an adapter's preview needs a column the framework does not surface, the adapter rewrites the dict shape, not the template.

11.4 Implementation pattern

NWIS is the canonical example. Skeleton:

async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
    if settings.region is None:
        return None
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=15),
    ) as session:
        async with session.get(url, headers=...) as resp:
            resp.raise_for_status()
            page = await resp.json()
    return [
        {"site_id": ..., "name": ..., "site_type": ..., "state": ...}
        for feat in page.get("features") or []
    ]

Key points:

  • Fresh aiohttp.ClientSession per call — self._session may not exist.
  • Short timeout (NWIS uses 15s) — the edit page must render quickly.
  • Small row cap (NWIS uses 50) — the preview is for orientation, not bulk inspection.
  • Raise on HTTP / parse failure — the framework catches at the route boundary and renders an error banner.

12. Acceptance gate for a new adapter

A new-adapter PR is accepted when every item below holds. The list is mechanical; reviewers and the PR author should both run through it before requesting / granting merge.

  • Upstream curl evidence captured. A real HTTP transcript (curl command + redacted response body) is pinned somewhere in the PR description or a fixture file. Documents what the adapter was written against.
  • SourceAdapter subclass conforms to §4. Class attributes set, three abstract methods implemented, constructor signature matches the supervisor call exactly.
  • tests/test_<adapter>.py exists. Unit tests for the adapter; no magic-number asserts (§10.3); no hardcoded stream / adapter lists (§10.4).
  • Dry-run on CT104 passes. The supervisor instantiates the adapter without errors; one full poll() cycle yields events without raising.
  • Dedup keys stable across multiple ticks. Two successive poll() runs against a frozen upstream fixture yield zero new events on the second run, because every key from the first run is already in the dedup table.
  • No hardcoded values that should be derived. Subject prefixes, stream names, default parameter lists, etc., all derive from the registry / settings model / discovery surface.
  • CONSUMER-INTEGRATION.md updated. New per-adapter subsection in §6, new subject row in §4, new stream row in §3 if a new domain was introduced. tests/test_consumer_doc.py passes.
  • PRODUCER-INTEGRATION.md mentions the adapter only if it introduces a new convention. This doc is contract-shaped, not catalog-shaped — most new adapters need no edit here.
  • Full pytest suite green.