The producer-side contract for adapter authors, mirroring PR H's consumer
spec. Self-contained — readers should not need to grep the codebase to
understand what a new SourceAdapter subclass must implement.
Bakes in the Phase 2 design principle ("Central takes it all and gives it
all. It's up to the pipe to do with it what it will.") so future authors
reject enrichment / silent-drop / opinionated-translation proposals on
sight. The previously-proposed Phase 3 NWIS metadata-enrichment ticket is
called out by name as an example of what gets rejected.
12-section outline locked with PM: design principle, quick start (clone
swpc_kindex), SourceAdapter base class, settings, subject namespace,
dedup keys, StreamEntry registry, removal/fall-off, anti-patterns,
preview hook, acceptance gate.
Sibling test (tests/test_producer_doc.py) mirrors test_consumer_doc.py
discipline:
- bidirectional == between SourceAdapter API and §4 method coverage
- preview_for_settings contract verbatim against live docstring
- top-level domain enumeration vs central.streams.STREAMS prefixes
- §8 STREAMS snippet vs central.streams.STREAMS
- anti-patterns adapter-name examples vs discover_adapters()
No hardcoded stream / adapter / domain lists anywhere in the test —
every expected value derives from central.streams,
central.adapter_discovery, or central.adapter at runtime.
Honest about the pre-existing `:` vs `|` dedup-key separator
inconsistency (swpc_alerts and swpc_protons use `|`; everyone else
uses `:`). Recommends `:` for new adapters without forcing a rename PR
on the SWPC pair (separators are persisted in cursors.db rows).
Acceptance bars:
(a) grep -rn 'subject_for_event\|_ADAPTER_REGISTRY' src tests → empty
(b) bidirectional override-method coverage asserted in test
(c) tests/test_producer_doc.py → 6/6 pass
(d) full pytest suite → 469 pass (was 463 pre-PR; +6 new)
(e) doc length: 823 lines (within 500–1200 envelope)
(f) code fences balanced; JSON/Python blocks parse
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
34 KiB
Central — Producer Integration
How to add a new SourceAdapter subclass to Central. This is the contract for
adapter authors: what to implement, what NOT to implement, and how a new feed
plugs into the existing stream / subject / dedup machinery without restating
anything from docs/CONSUMER-INTEGRATION.md.
The companion document is CONSUMER-INTEGRATION.md — the contract
for downstream consumers. Producer-side semantics are in scope here; consumer-side
semantics (wire format, dedup recommendations, fall-off handling at the subscriber)
are intentionally not restated. Cross-references point into that doc.
Table of contents
- What this doc is for
- The design principle
- Quick start: cloning an existing adapter
- The SourceAdapter base class
- Settings and configuration
- Subject namespace conventions
- Dedup key construction
- The StreamEntry registry
- Removal / fall-off patterns
- Anti-patterns — what NOT to do
- Settings preview hook
- Acceptance gate for a new adapter
1. What this doc is for
Audience: an engineer (or a code-gen agent acting as one) writing a new
SourceAdapter subclass in src/central/adapters/.
Covered: the SourceAdapter contract, the StreamEntry registry, subject namespace conventions, dedup-key construction, the settings preview hook, the mechanical checklist a new-adapter PR must satisfy.
NOT covered: end-to-end walkthroughs of one specific adapter (read the source
for the closest existing adapter — src/central/adapters/swpc_kindex.py is the
smallest), upstream feed API documentation (lives upstream), or consumer-side
concerns (live in CONSUMER-INTEGRATION.md).
2. The design principle
"Central takes it all and gives it all. It's up to the pipe to do with it what it will." — Matt, 2026-05-19
Adapter authors translate that single sentence into a small number of concrete rules:
- Preserve every upstream field. Anything the upstream returns lives in
Event.dataverbatim. Adapters do not silently drop fields, even ones that look redundant or low-value today. - No enrichment. Adapters do not reverse-geocode, do not call out to
upstream metadata endpoints during normal
poll()flow, do not consult a second source to "fill in" a missing field. If a downstream consumer wants enrichment, that is consumer-side work. - No opinionated translation. Adapters do not coerce units, do not rename fields to match a Central-wide vocabulary, do not collapse upstream enumerations into Central's preferred labels.
- The only adapter-side transforms are mechanical. Specifically: subject-token normalization (camelCase → snake_case, agency-prefix splitting, whitespace → underscore, lowercase) and dedup-key construction. Both are deterministic functions of upstream identifiers. Nothing else.
This rules out a whole category of plausible-sounding work that prior reviews
have already rejected. For instance, "enrich NWIS site rows with USGS
monitoring-locations metadata during poll()" was proposed for Phase 3 and
killed on this principle. The producer adds the field-preserving pipe; the pipe
ends at JetStream publish; everything richer is a downstream concern.
See §10 for the enforced list of anti-patterns. Future authors should reject the same proposals on the same grounds.
3. Quick start: cloning an existing adapter
The cheapest way to start a new adapter is to copy the smallest existing one
and edit. As of this writing the smallest is swpc_kindex (one fixed subject,
no region filter, no preview hook).
File layout for a one-file adapter:
src/central/adapters/<your_adapter>.py # the SourceAdapter subclass
tests/test_<your_adapter>.py # unit tests
When the upstream API needs shared utilities (e.g. WFIGS shares helpers across
wfigs_incidents and wfigs_perimeters), factor them into a sibling
<family>_common.py module — see wfigs_common.py and swpc_common.py for
the existing precedent.
Minimum overrides:
from collections.abc import AsyncIterator
from pathlib import Path
import aiohttp
from pydantic import BaseModel
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
class ExampleSettings(BaseModel):
# Adapter-tunable settings live here. Pydantic v2 model. Forbid extras
# if you want strictness; otherwise extra-field silence is the default.
pass
class ExampleAdapter(SourceAdapter):
name = "example"
display_name = "Example Source"
description = "One-line operator-facing description."
settings_schema = ExampleSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 600
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
async def apply_config(self, new_config: AdapterConfig) -> None:
# Re-derive any cached settings from new_config.settings.
pass
def subject_for(self, event: Event) -> str:
return f"central.<domain>.example"
async def poll(self) -> AsyncIterator[Event]:
# Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db
# connection initialized in startup() — see swpc_kindex for the
# established `published_ids` table shape.
if False:
yield # type: ignore[unreachable]
return
Auto-discovery picks the adapter up the moment the module imports cleanly. No
registry edit is required — central.adapter_discovery.discover_adapters()
walks central.adapters.* at import time and registers every concrete
SourceAdapter subclass keyed by its name class-attribute.
4. The SourceAdapter base class
src/central/adapter.py is the SoT. The contract is reproduced here verbatim
where the docstrings already capture it; this section adds the supervisor-side
context that the source file does not narrate.
4.1 Class attributes
Every subclass must define:
| Attribute | Type | Purpose |
|---|---|---|
name |
str |
Short identifier, e.g. "nws". Becomes the adapter key in Event.adapter and the discovery-registry key. Must be unique across central.adapters.*. |
display_name |
str |
Human-readable name shown in the GUI. |
description |
str |
Short description shown on the adapter detail page. |
settings_schema |
type[BaseModel] |
Pydantic v2 model class for adapter settings. See §5. |
default_cadence_s |
int |
Default poll interval in seconds. Operators may override via config.adapters; AdapterConfig.cadence_s has a Pydantic ge=10 floor. |
Optional class attributes (default to None / not-in-wizard):
| Attribute | Type | Purpose |
|---|---|---|
requires_api_key |
str | None |
Key alias if an API key is required, else None. |
api_key_field |
str | None |
Names the settings_schema field that holds the api_key alias reference. The GUI renders this as a select populated from config.api_keys; the wizard validates it against the staged api_keys state. |
wizard_order |
int | None |
Position in the setup wizard. None excludes the adapter from the wizard. |
4.2 Required methods
Every subclass must implement these three abstract methods.
async def poll(self) -> AsyncIterator[Event]
Poll the source for new events. Yields
Eventobjects for each new/updated event found.
The supervisor drives the loop; poll() is a one-shot async generator the
supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT
loop internally — control of cadence and concurrency lives in the supervisor.
The supervisor wraps each yielded Event in a CloudEvents envelope and
publishes it. The adapter is responsible only for yielding Event objects;
all envelope construction, NATS publish, and metadata heartbeat emission live
outside the adapter.
async def apply_config(self, new_config: AdapterConfig) -> None
Apply new configuration to the adapter. Called by supervisor when config changes via hot-reload. The adapter should extract relevant settings from
new_config.settingsand update its internal state.
new_config.settings is a dict[str, Any]; re-validate it against
settings_schema if you intend to read structured fields. Most adapters cache
a couple of fields (e.g. region, parameter_codes) on self during
__init__ and refresh them here. Do NOT tear down self._session or
self._db here — that is shutdown()'s responsibility.
def subject_for(self, event: Event) -> str
Compute the NATS subject for an event. Each adapter knows its own subject hierarchy. The supervisor calls this to determine where to publish each event.
The result must match the subject filter of exactly one StreamEntry in
central.streams.STREAMS. See §6 for the
naming rules and §8 for the stream side.
4.3 Optional lifecycle hooks
async def startup(self) -> None — Optional. Called once before the first
poll(). Open the HTTP session, open the sqlite3 cursor DB, create any
required tables. The default is a no-op.
async def shutdown(self) -> None — Optional. Called once at graceful
shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is
a no-op.
async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None —
Optional. The settings-page preview hook. The default returns None (no
preview). See §11 for the contract.
4.4 Constructor signature
The supervisor instantiates adapters with this exact keyword-argument call
(src/central/supervisor.py _create_adapter):
cls(
config=config, # AdapterConfig — name, enabled, cadence_s, settings, …
config_store=self._config_store, # ConfigStore — staged-vs-live config access
cursor_db_path=CURSOR_DB_PATH, # Path — /var/lib/central/cursors.db
)
Adapters take exactly these three keyword arguments. Adding new constructor
parameters silently breaks supervisor instantiation; if you need additional
state, derive it from config.settings or read it lazily during startup().
5. Settings and configuration
5.1 The settings schema
settings_schema is a Pydantic v2 BaseModel subclass. It is the SoT for
operator-tunable fields and the basis for GUI form rendering, wizard
validation, and migration seed defaults. Three rules:
- Default values are SoT. The settings model's defaults are the canonical
starting state. Migrations seed
config.adapters.settingsfrom the same model; tests build fixtures by instantiating the model with no args. Do NOT duplicate default lists in migration JSON, fixtures, or test code — derive them from the model class. - Use
RegionConfigfor bounding boxes. Defined incentral.config_models.RegionConfig. Validates north/south/east/west, rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not reinvent it. - Optional region. Some adapters strongly recommend a region (NWIS, WFIGS)
but accept
Noneso an operator can test before configuring. Log a WARN atstartup()if a region is recommended but unset; do not refuse to start.
5.2 What gets stored where
| State | Location | Adapter access |
|---|---|---|
| Adapter config (cadence, settings, enabled flag, paused state) | DB table config.adapters, fronted by ConfigStore |
Passed in __init__ as config: AdapterConfig; refreshed via apply_config() |
| API keys | DB table config.api_keys, fronted by ConfigStore |
Read via self._config_store.get_api_key(alias); never persisted on the adapter |
| Dedup cursor / observed sets | sqlite3 at cursor_db_path (/var/lib/central/cursors.db) |
Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the adapter column |
| Stream retention / max_bytes | DB table config.streams; managed by supervisor |
Adapters do not read or write stream config |
AdapterConfig.settings is a dict[str, Any] — re-validate against
settings_schema before consuming structured fields. Most adapters do this
once in __init__ and again in apply_config().
5.3 preview_for_settings boundary
preview_for_settings() is the one place that does NOT have access to
self._config_store or cursor_db_path semantics. See
§11. The framework may instantiate adapters
with a stub config store solely to call this method, and the GUI process never
calls startup(). Do not assume the adapter's HTTP session, cursor DB, or any
other startup-time state exists when preview runs.
6. Subject namespace conventions
6.1 The shape
Every subject starts with central. and decomposes as:
central.<domain>.<subtype>[.<dimensions>...]
<domain>is one ofwx,fire,quake,space,disaster,hydro,meta(the current set — see §8 for adding one). Operators MUST be able to subscribe to all of one domain withcentral.<domain>.>.<subtype>is adapter-driven and identifies the event category within the domain (e.g.hotspot,alert,incident,kindex,proton_flux).<dimensions>are zero or more refinement tokens — geographic, temporal, upstream identifier — that the consumer would plausibly want to filter on.
Token rules:
- All lowercase. No uppercase, no mixed-case.
snake_casetokens. Multi-word tokens use underscores (proton_flux,sea_lake_ice), not hyphens.- No empty tokens. Fall back to a literal
unknowntoken when a dimension is genuinely absent (NWS does this for alerts lacking a primary region). - No
.removedinjected mid-token. Removal subjects always read<subtype>.removed.<region>— see §9.
6.2 Where the canonical subjects live
docs/CONSUMER-INTEGRATION.md §4 is the SoT for the concrete subject patterns
that exist today. Adapter authors should mirror new subjects there as part of
the same PR; the consumer test suite asserts every discovered adapter has a
per-adapter subsection.
6.3 Subject-token decomposition belongs in the adapter
When an upstream identifier is composite — agency-prefix + bare site number, state + county, satellite + confidence — the adapter is responsible for splitting it into tokens. Two conventions:
-
One helper function per decomposition, one place. NWIS' helper sits at module scope in
nwis.py:def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]: """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no).""" ...Both
subject_for()andEvent.categoryconstruction call through it. WFIGS does the same withsubject_suffix()inwfigs_common.py. Never inline the decomposition in two places — they will drift. -
Round-trip via
Event.category. Adapters setEvent.categoryto a Central-relative form (hydro.<param>.<agency>.<bare_site_no>,fire.incident.<state>.<county>);subject_for()then prependscentral.and may rearrange tokens. The category lives in the wire payload (seeCONSUMER-INTEGRATION.md§5b); the subject is computed from it at publish time.
6.4 Adding a new top-level domain
Top-level domains are stream-mapped 1:1 (see §8).
Adding central.<new_domain>.> requires:
- One new
StreamEntryinsrc/central/streams.py. - One new migration row that seeds
config.streamswith retention andmax_bytes. - Update
docs/CONSUMER-INTEGRATION.md§3 and §4 to advertise the new stream / subject pattern.
supervisor.STREAM_SUBJECTS, archive.STREAMS, and gui.DASHBOARD_STREAMS
all derive from STREAMS at import time; no code edits in those modules are
required.
7. Dedup key construction
7.1 What "dedup key" means here
Adapters maintain a per-adapter sqlite3 table at cursor_db_path (typically
published_ids keyed by (adapter, event_id)) to suppress re-yielding the
same event across successive poll() runs. The string the adapter writes to
published_ids.event_id is the adapter-side dedup key.
Separately, every yielded Event.id flows into the CloudEvents envelope's
id field (also set as the Nats-Msg-Id header on publish). Most adapters
use the same string for both — adapter-side dedup key and Event.id are
identical — which keeps consumer-side dedup (§9 of CONSUMER-INTEGRATION.md)
trivial.
A few adapters intentionally diverge: eonet stores a richer composite
adapter-side (id + timeline date) while keeping Event.id stable, because
consumers usually want timeline updates to overwrite the same id. The
Event.id is always the consumer contract; adapter-side composites are an
implementation detail.
7.2 Two shapes
Single-token. Upstream provides a stable, globally-unique identifier:
usgs_quake— USGS feature id (ci10240102)gdacs— RSS guid (WF1028708)wfigs_incidents/wfigs_perimeters— IRWIN UUIDinciweb— numeric incident idswpc_kindex— bin timestamp (single fixed subject, time is the natural key)nws— alert URLeonet—EONET_NNNNN(consumer-facing); composite tracked separately adapter-side as described in §7.1
Use the upstream id directly. No prefix, no separators.
Composite. Upstream id is not by itself sufficient — needs a parameter, a timestamp, or a tombstone marker to dedup correctly:
| Adapter | Shape |
|---|---|
firms |
<satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp> |
nwis |
nwis:<monitoring_location_id>:<parameter_code>:<time_iso> |
wfigs_incidents / wfigs_perimeters removals |
<IrwinID>:removed:<iso_now> |
7.3 Separator convention
Use : as the dedup-key separator for new adapters.
There is a pre-existing wrinkle: two SWPC adapters use | instead.
| Adapter | Separator | Shape |
|---|---|---|
swpc_alerts |
| |
<product_id>|<issue_timestamp> |
swpc_protons |
| |
<time_tag>|<energy> |
This is the historical reality at the time of writing — not a recommendation.
Do not propagate | to new adapters; do not refactor the SWPC pair to :
without an independent rename PR (the strings are persisted in
/var/lib/central/cursors.db rows).
7.4 Implementation
Every existing adapter follows the same pattern:
async def startup(self) -> None:
...
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
Then is_published(event_id) -> bool, mark_published(event_id) -> None, and
a periodic sweep_old_ids() (typical retention: 7 to 30 days, tuned per
adapter). Copy from swpc_kindex.py — it is the canonical reference shape.
7.5 The CloudEvents id field
Event.id becomes the CloudEvents envelope id and the Nats-Msg-Id header.
JetStream applies its own short dedup window keyed on Nats-Msg-Id, but
consumers do their own dedup (CONSUMER-INTEGRATION.md §9)
and should not rely on JetStream's window. Stability of Event.id across
re-publishes is the producer's contract; the consumer side trusts it.
8. The StreamEntry registry
src/central/streams.py is the SoT for stream definitions. The whole file is
short enough to quote:
@dataclass(frozen=True)
class StreamEntry:
name: str
subject_filter: str
event_bearing: bool = True
dashboard: bool = True
STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_WX", "central.wx.>"),
StreamEntry("CENTRAL_FIRE", "central.fire.>"),
StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
8.1 What derives from STREAMS
Three import-time consumers read STREAMS and build their own structures:
| Derived constant | Module | Shape |
|---|---|---|
STREAM_SUBJECTS |
central.supervisor |
dict[str, list[str]] — used to create JetStream streams |
STREAMS |
central.archive |
list[tuple[str, str]] for event_bearing=True entries only |
DASHBOARD_STREAMS |
central.gui.routes |
list[str] for dashboard=True entries |
tests/test_stream_registry.py asserts each derivation matches the registry —
adding a stream automatically extends the test surface; no new test code is
required.
8.2 Adding a stream
When a new adapter needs a new top-level domain:
- One line in
src/central/streams.py. Append aStreamEntry(...)to theSTREAMSlist. - One migration row. Seed
config.streamswith retention andmax_bytes. Existing migrations inmigrations/are the template.
event_bearing=False is reserved for CENTRAL_META today; adding a second
non-event-bearing stream silently excludes it from the archive. The registry
test explicitly guards against this and requires a deliberate test update if
the invariant changes.
8.3 The consistency gate
tests/test_stream_registry.py is the property-test gate. It asserts:
- Stream names and subject filters are unique.
- Every subject filter matches
^central\.[a-z][a-z_]*\.>$. CENTRAL_METAis the only non-event-bearing stream.- The supervisor / archive / GUI derivations all match the registry.
Mirror that discipline in any new test you write for adapter-side stream
behavior: derive expected values from STREAMS rather than hand-typing them.
9. Removal / fall-off patterns
When an upstream record disappears, an adapter has two options:
9.1 Explicit removal subject
Emit an Event whose subject is <subtype>.removed.<region> and whose
category ends in .removed. This is the right choice when:
- Upstream signals retirement explicitly (
gdacs'siscurrent: false). - The upstream is a "current state" feed and absence is meaningful
(
wfigs_incidents,wfigs_perimeters,eonet).
The subtype-before-removed token order is a hard convention — consumers
filtering on central.<domain>.<subtype>.> receive both live and removal
events with a single subscription. See
CONSUMER-INTEGRATION.md §7a
for the consumer side.
9.2 Absence-as-signal
No removal subject; consumers infer disappearance from a time gap or an
upstream-specific marker (expires field, supersession text). This is the
right choice when:
- The upstream is a stream of point-in-time observations (
firmshotspot pixels,swpc_kindex3-hour bins,swpc_protonscadence-driven samples). - Upstream signals expiry inline (
nwsexpiresfield,nwsmessageType: Cancel). - The upstream is durable / append-only (
usgs_quakecatalog,inciwebRSS).
See CONSUMER-INTEGRATION.md §7b
for the consumer-side enumeration.
9.3 Composite removal dedup key
When an adapter emits explicit removals, the same upstream id can fall off and re-appear multiple times over an adapter's lifetime. The dedup key for a removal event therefore needs a removal-detection timestamp:
Event(
id=f"{irwin_id}:removed:{now.isoformat()}",
category="fire.incident.removed",
# ...
)
The :removed:<iso_now> suffix is the cross-adapter convention. Keep it.
9.4 Removal payload schema
Removal events ship an empty geo block and a sparse data payload. The
exact shape (id field name, reason values, last_observed_at, adapter-specific
context fields) is documented in
CONSUMER-INTEGRATION.md §7c;
adapter authors should mirror it. Do not restate it here.
10. Anti-patterns — what NOT to do
These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR.
10.1 Enrichment during poll()
No calls to upstream metadata endpoints, no reverse-geocode, no consultation
of a second source to fill in fields the primary feed omitted. The "NWIS
enrichment" Phase 3 proposal — joining live measurements against the
monitoring-locations metadata endpoint during poll() — was rejected on the
§2 principle. Future proposals along the same
lines get the same answer.
If enrichment is genuinely necessary, the right shape is a separate adapter
(or a downstream consumer) — not an if metadata_missing: await fetch_metadata() branch buried in an adapter's poll().
10.2 Silent field dropping
Event.data carries the upstream payload verbatim. Do not omit a field
because it "looks redundant" or "isn't useful right now." Operators
debugging a downstream consumer often need every field the upstream sent;
dropping fields makes that impossible after the fact.
The two acceptable transforms — token normalization (case, separators) and dedup-key construction — are not field-drops. They produce new fields derived from the upstream payload; the upstream payload itself is preserved.
10.3 Magic-number asserts in tests
Adapter tests that pin event counts to literal integers (assert len(events) == 47) break the next time the upstream's fixture is
refreshed and contribute zero diagnostic value. Either assert structural
invariants (every yielded event has a non-empty id; subjects match the
adapter's subject_for() output; etc.) or assert against a count computed
from the input fixture (assert len(events) == len(fixture.features)).
10.4 Hardcoded stream / adapter lists in tests
Tests that enumerate ["CENTRAL_WX", "CENTRAL_FIRE", ...] or ["firms", "inciweb", ...] as literals drift the moment a stream or adapter is added
or removed. Derive from the SoT:
from central.streams import STREAMS
from central.adapter_discovery import discover_adapters
stream_names = {s.name for s in STREAMS}
adapter_names = set(discover_adapters().keys())
tests/test_stream_registry.py and tests/test_consumer_doc.py are the
in-tree templates. The sibling test for this doc
(tests/test_producer_doc.py) follows the same rule.
10.5 Persistent state outside cursor_db_path
Adapters MUST NOT write to disk outside the cursor DB at
cursor_db_path. No JSON files in /var/lib/central/<adapter>/, no
state stashed in /tmp. Multi-process invariants (the supervisor may
restart an adapter; the GUI may instantiate it independently to call
preview_for_settings()) only hold for the cursor DB and config.adapters.
10.6 Blocking calls in async paths
poll(), apply_config(), startup(), shutdown(), and
preview_for_settings() are async. Use aiohttp, not requests; use
asyncio.to_thread() to push CPU-bound or genuinely blocking work
(shapely geometry operations, large JSON parsing) off the event loop.
The supervisor runs every adapter on a single event loop — one blocked
adapter stalls every other adapter.
10.7 Reaching into framework internals from preview_for_settings
preview_for_settings() is a pure function of settings. Do NOT access
self._config_store, self._cursor_db_path, or any state established
in startup(). The framework may instantiate the adapter with a stub
config store solely to call this method. See §11.
10.8 Hardcoded subject strings in tests
A test that asserts subject == "central.fire.incident.mt.flathead" for a
specific fixture is fine; a test that hardcodes the subject pattern as a
string and re-derives it instead of calling the adapter's
subject_for(event) is the wrong shape. Round-trip through the adapter so
the test breaks the day someone shifts a token.
11. Settings preview hook
preview_for_settings() is the optional GUI-side hook that surfaces a
settings-driven preview on the adapter edit page. NWIS uses it to show the
monitoring-locations inside the configured bbox before the operator hits
save; most adapters opt out by inheriting the base class's return None.
11.1 The contract (verbatim from central.adapter.SourceAdapter)
Optional. Override to surface a settings-driven preview on the edit page.
Return
list[dict](framework renders as a generic table; columns come from the first dict's keys, in insertion order). ReturnNoneto skip preview. Raise to surface an error banner — framework catches at the route boundary.Contract:
- Preview is a pure function of
settings. Do NOT accessself._config_storeorcursor_dbstate — the framework may instantiate adapters with a stub config_store solely to call this method.- Network preview implementations must open their own short-lived
aiohttpsession (the adapter's polling session may not exist; the GUI process never callsstartup()).- Return
Nonewhen preview is not meaningful (e.g., required settings like region are unset). Return[]explicitly if the query ran and matched zero rows — the framework renders that distinctly fromNone.
11.2 The three return states
| Return | Framework renders |
|---|---|
None |
Nothing — no preview block on the edit page |
[] |
"Preview (0 rows)" legend, no table |
list[dict] with rows |
Generic table; columns from the first dict's keys in insertion order; legend "Preview (N rows)" |
tests/test_preview_hook.py is the test surface for the partial; mirror its
discipline for any adapter-specific preview test.
11.3 Generic rendering — no per-adapter Jinja
The framework is duck-typed on list[dict]. Column headers are the first
dict's keys. There is no per-adapter branch in the template, and no
adapter-name conditionals are accepted on review. If an adapter's preview
needs a column the framework does not surface, the adapter rewrites the dict
shape, not the template.
11.4 Implementation pattern
NWIS is the canonical example. Skeleton:
async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
if settings.region is None:
return None
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=15),
) as session:
async with session.get(url, headers=...) as resp:
resp.raise_for_status()
page = await resp.json()
return [
{"site_id": ..., "name": ..., "site_type": ..., "state": ...}
for feat in page.get("features") or []
]
Key points:
- Fresh
aiohttp.ClientSessionper call —self._sessionmay not exist. - Short timeout (NWIS uses 15s) — the edit page must render quickly.
- Small row cap (NWIS uses 50) — the preview is for orientation, not bulk inspection.
- Raise on HTTP / parse failure — the framework catches at the route boundary and renders an error banner.
12. Acceptance gate for a new adapter
A new-adapter PR is accepted when every item below holds. The list is mechanical; reviewers and the PR author should both run through it before requesting / granting merge.
- Upstream curl evidence captured. A real HTTP transcript (curl command + redacted response body) is pinned somewhere in the PR description or a fixture file. Documents what the adapter was written against.
SourceAdaptersubclass conforms to §4. Class attributes set, three abstract methods implemented, constructor signature matches the supervisor call exactly.tests/test_<adapter>.pyexists. Unit tests for the adapter; no magic-number asserts (§10.3); no hardcoded stream / adapter lists (§10.4).- Dry-run on CT104 passes. The supervisor instantiates the adapter
without errors; one full
poll()cycle yields events without raising. - Dedup keys stable across multiple ticks. Two successive
poll()runs against a frozen upstream fixture yield the same event-count delta as expected (zero new events on the second run if upstream is unchanged). - No hardcoded values that should be derived. Subject prefixes, stream names, default parameter lists, etc., all derive from the registry / settings model / discovery surface.
CONSUMER-INTEGRATION.mdupdated. New per-adapter subsection in §6, new subject row in §4, new stream row in §3 if a new domain was introduced.tests/test_consumer_doc.pypasses.PRODUCER-INTEGRATION.mdmentions the adapter only if it introduces a new convention. This doc is contract-shaped, not catalog-shaped — most new adapters need no edit here.- Full pytest suite green.