From 6afe80ded332e76c3b93534eea901c6704acdb6c Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Tue, 19 May 2026 21:17:48 +0000 Subject: [PATCH 1/2] =?UTF-8?q?docs(2-I):=20producer=20integration=20spec?= =?UTF-8?q?=20=E2=80=94=20docs/PRODUCER-INTEGRATION.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The producer-side contract for adapter authors, mirroring PR H's consumer spec. Self-contained — readers should not need to grep the codebase to understand what a new SourceAdapter subclass must implement. Bakes in the Phase 2 design principle ("Central takes it all and gives it all. It's up to the pipe to do with it what it will.") so future authors reject enrichment / silent-drop / opinionated-translation proposals on sight. The previously-proposed Phase 3 NWIS metadata-enrichment ticket is called out by name as an example of what gets rejected. 12-section outline locked with PM: design principle, quick start (clone swpc_kindex), SourceAdapter base class, settings, subject namespace, dedup keys, StreamEntry registry, removal/fall-off, anti-patterns, preview hook, acceptance gate. Sibling test (tests/test_producer_doc.py) mirrors test_consumer_doc.py discipline: - bidirectional == between SourceAdapter API and §4 method coverage - preview_for_settings contract verbatim against live docstring - top-level domain enumeration vs central.streams.STREAMS prefixes - §8 STREAMS snippet vs central.streams.STREAMS - anti-patterns adapter-name examples vs discover_adapters() No hardcoded stream / adapter / domain lists anywhere in the test — every expected value derives from central.streams, central.adapter_discovery, or central.adapter at runtime. Honest about the pre-existing `:` vs `|` dedup-key separator inconsistency (swpc_alerts and swpc_protons use `|`; everyone else uses `:`). Recommends `:` for new adapters without forcing a rename PR on the SWPC pair (separators are persisted in cursors.db rows). Acceptance bars: (a) grep -rn 'subject_for_event\|_ADAPTER_REGISTRY' src tests → empty (b) bidirectional override-method coverage asserted in test (c) tests/test_producer_doc.py → 6/6 pass (d) full pytest suite → 469 pass (was 463 pre-PR; +6 new) (e) doc length: 823 lines (within 500–1200 envelope) (f) code fences balanced; JSON/Python blocks parse Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/PRODUCER-INTEGRATION.md | 823 +++++++++++++++++++++++++++++++++++ tests/test_producer_doc.py | 215 +++++++++ 2 files changed, 1038 insertions(+) create mode 100644 docs/PRODUCER-INTEGRATION.md create mode 100644 tests/test_producer_doc.py diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md new file mode 100644 index 0000000..02ce1bf --- /dev/null +++ b/docs/PRODUCER-INTEGRATION.md @@ -0,0 +1,823 @@ +# Central — Producer Integration + +How to add a new `SourceAdapter` subclass to Central. This is the contract for +adapter authors: what to implement, what NOT to implement, and how a new feed +plugs into the existing stream / subject / dedup machinery without restating +anything from `docs/CONSUMER-INTEGRATION.md`. + +The companion document is [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md) — the contract +for downstream consumers. Producer-side semantics are in scope here; consumer-side +semantics (wire format, dedup recommendations, fall-off handling at the subscriber) +are intentionally not restated. Cross-references point into that doc. + +## Table of contents + +1. [What this doc is for](#1-what-this-doc-is-for) +2. [The design principle](#2-the-design-principle) +3. [Quick start: cloning an existing adapter](#3-quick-start-cloning-an-existing-adapter) +4. [The SourceAdapter base class](#4-the-sourceadapter-base-class) +5. [Settings and configuration](#5-settings-and-configuration) +6. [Subject namespace conventions](#6-subject-namespace-conventions) +7. [Dedup key construction](#7-dedup-key-construction) +8. [The StreamEntry registry](#8-the-streamentry-registry) +9. [Removal / fall-off patterns](#9-removal--fall-off-patterns) +10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do) +11. [Settings preview hook](#11-settings-preview-hook) +12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter) + +--- + +## 1. What this doc is for + +**Audience:** an engineer (or a code-gen agent acting as one) writing a new +`SourceAdapter` subclass in `src/central/adapters/`. + +**Covered:** the SourceAdapter contract, the StreamEntry registry, subject +namespace conventions, dedup-key construction, the settings preview hook, the +mechanical checklist a new-adapter PR must satisfy. + +**NOT covered:** end-to-end walkthroughs of one specific adapter (read the source +for the closest existing adapter — `src/central/adapters/swpc_kindex.py` is the +smallest), upstream feed API documentation (lives upstream), or consumer-side +concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)). + +--- + +## 2. The design principle + +> "Central takes it all and gives it all. It's up to the pipe to do with it +> what it will." +> — Matt, 2026-05-19 + +Adapter authors translate that single sentence into a small number of concrete +rules: + +- **Preserve every upstream field.** Anything the upstream returns lives in + `Event.data` verbatim. Adapters do not silently drop fields, even ones that + look redundant or low-value today. +- **No enrichment.** Adapters do not reverse-geocode, do not call out to + upstream metadata endpoints during normal `poll()` flow, do not consult a + second source to "fill in" a missing field. If a downstream consumer wants + enrichment, that is consumer-side work. +- **No opinionated translation.** Adapters do not coerce units, do not rename + fields to match a Central-wide vocabulary, do not collapse upstream + enumerations into Central's preferred labels. +- **The only adapter-side transforms are mechanical.** Specifically: + subject-token normalization (camelCase → snake_case, agency-prefix splitting, + whitespace → underscore, lowercase) and dedup-key construction. Both are + deterministic functions of upstream identifiers. Nothing else. + +This rules out a whole category of plausible-sounding work that prior reviews +have already rejected. For instance, "enrich NWIS site rows with USGS +monitoring-locations metadata during `poll()`" was proposed for Phase 3 and +killed on this principle. The producer adds the field-preserving pipe; the pipe +ends at JetStream publish; everything richer is a downstream concern. + +See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of +anti-patterns. Future authors should reject the same proposals on the same +grounds. + +--- + +## 3. Quick start: cloning an existing adapter + +The cheapest way to start a new adapter is to copy the smallest existing one +and edit. As of this writing the smallest is `swpc_kindex` (one fixed subject, +no region filter, no preview hook). + +**File layout for a one-file adapter:** + +``` +src/central/adapters/.py # the SourceAdapter subclass +tests/test_.py # unit tests +``` + +When the upstream API needs shared utilities (e.g. WFIGS shares helpers across +`wfigs_incidents` and `wfigs_perimeters`), factor them into a sibling +`_common.py` module — see `wfigs_common.py` and `swpc_common.py` for +the existing precedent. + +**Minimum overrides:** + +```python +from collections.abc import AsyncIterator +from pathlib import Path + +import aiohttp +from pydantic import BaseModel + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + + +class ExampleSettings(BaseModel): + # Adapter-tunable settings live here. Pydantic v2 model. Forbid extras + # if you want strictness; otherwise extra-field silence is the default. + pass + + +class ExampleAdapter(SourceAdapter): + name = "example" + display_name = "Example Source" + description = "One-line operator-facing description." + settings_schema = ExampleSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + # Re-derive any cached settings from new_config.settings. + pass + + def subject_for(self, event: Event) -> str: + return f"central..example" + + async def poll(self) -> AsyncIterator[Event]: + # Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db + # connection initialized in startup() — see swpc_kindex for the + # established `published_ids` table shape. + if False: + yield # type: ignore[unreachable] + return +``` + +Auto-discovery picks the adapter up the moment the module imports cleanly. No +registry edit is required — `central.adapter_discovery.discover_adapters()` +walks `central.adapters.*` at import time and registers every concrete +`SourceAdapter` subclass keyed by its `name` class-attribute. + +--- + +## 4. The SourceAdapter base class + +`src/central/adapter.py` is the SoT. The contract is reproduced here verbatim +where the docstrings already capture it; this section adds the supervisor-side +context that the source file does not narrate. + +### 4.1 Class attributes + +Every subclass must define: + +| Attribute | Type | Purpose | +|---|---|---| +| `name` | `str` | Short identifier, e.g. `"nws"`. Becomes the `adapter` key in `Event.adapter` and the discovery-registry key. Must be unique across `central.adapters.*`. | +| `display_name` | `str` | Human-readable name shown in the GUI. | +| `description` | `str` | Short description shown on the adapter detail page. | +| `settings_schema` | `type[BaseModel]` | Pydantic v2 model class for adapter settings. See [§5](#5-settings-and-configuration). | +| `default_cadence_s` | `int` | Default poll interval in seconds. Operators may override via `config.adapters`; `AdapterConfig.cadence_s` has a Pydantic `ge=10` floor. | + +Optional class attributes (default to `None` / not-in-wizard): + +| Attribute | Type | Purpose | +|---|---|---| +| `requires_api_key` | `str \| None` | Key alias if an API key is required, else `None`. | +| `api_key_field` | `str \| None` | Names the `settings_schema` field that holds the api_key alias reference. The GUI renders this as a select populated from `config.api_keys`; the wizard validates it against the staged `api_keys` state. | +| `wizard_order` | `int \| None` | Position in the setup wizard. `None` excludes the adapter from the wizard. | + +### 4.2 Required methods + +Every subclass must implement these three abstract methods. + +**`async def poll(self) -> AsyncIterator[Event]`** + +> Poll the source for new events. Yields `Event` objects for each new/updated +> event found. + +The supervisor drives the loop; `poll()` is a one-shot async generator the +supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT +loop internally — control of cadence and concurrency lives in the supervisor. + +The supervisor wraps each yielded `Event` in a CloudEvents envelope and +publishes it. The adapter is responsible only for yielding `Event` objects; +all envelope construction, NATS publish, and metadata heartbeat emission live +outside the adapter. + +**`async def apply_config(self, new_config: AdapterConfig) -> None`** + +> Apply new configuration to the adapter. Called by supervisor when config +> changes via hot-reload. The adapter should extract relevant settings from +> `new_config.settings` and update its internal state. + +`new_config.settings` is a `dict[str, Any]`; re-validate it against +`settings_schema` if you intend to read structured fields. Most adapters cache +a couple of fields (e.g. `region`, `parameter_codes`) on `self` during +`__init__` and refresh them here. Do NOT tear down `self._session` or +`self._db` here — that is `shutdown()`'s responsibility. + +**`def subject_for(self, event: Event) -> str`** + +> Compute the NATS subject for an event. Each adapter knows its own subject +> hierarchy. The supervisor calls this to determine where to publish each +> event. + +The result must match the subject filter of exactly one `StreamEntry` in +`central.streams.STREAMS`. See [§6](#6-subject-namespace-conventions) for the +naming rules and [§8](#8-the-streamentry-registry) for the stream side. + +### 4.3 Optional lifecycle hooks + +**`async def startup(self) -> None`** — Optional. Called once before the first +`poll()`. Open the HTTP session, open the sqlite3 cursor DB, create any +required tables. The default is a no-op. + +**`async def shutdown(self) -> None`** — Optional. Called once at graceful +shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is +a no-op. + +**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** — +Optional. The settings-page preview hook. The default returns `None` (no +preview). See [§11](#11-settings-preview-hook) for the contract. + +### 4.4 Constructor signature + +The supervisor instantiates adapters with this exact keyword-argument call +(`src/central/supervisor.py` `_create_adapter`): + +```python +cls( + config=config, # AdapterConfig — name, enabled, cadence_s, settings, … + config_store=self._config_store, # ConfigStore — staged-vs-live config access + cursor_db_path=CURSOR_DB_PATH, # Path — /var/lib/central/cursors.db +) +``` + +Adapters take exactly these three keyword arguments. Adding new constructor +parameters silently breaks supervisor instantiation; if you need additional +state, derive it from `config.settings` or read it lazily during `startup()`. + +--- + +## 5. Settings and configuration + +### 5.1 The settings schema + +`settings_schema` is a Pydantic v2 `BaseModel` subclass. It is the SoT for +operator-tunable fields and the basis for GUI form rendering, wizard +validation, and migration seed defaults. Three rules: + +- **Default values are SoT.** The settings model's defaults are the canonical + starting state. Migrations seed `config.adapters.settings` from the same + model; tests build fixtures by instantiating the model with no args. Do NOT + duplicate default lists in migration JSON, fixtures, or test code — derive + them from the model class. +- **Use `RegionConfig` for bounding boxes.** Defined in + `central.config_models.RegionConfig`. Validates north/south/east/west, + rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not + reinvent it. +- **Optional region.** Some adapters strongly recommend a region (NWIS, WFIGS) + but accept `None` so an operator can test before configuring. Log a WARN at + `startup()` if a region is recommended but unset; do not refuse to start. + +### 5.2 What gets stored where + +| State | Location | Adapter access | +|---|---|---| +| Adapter config (cadence, settings, enabled flag, paused state) | DB table `config.adapters`, fronted by `ConfigStore` | Passed in `__init__` as `config: AdapterConfig`; refreshed via `apply_config()` | +| API keys | DB table `config.api_keys`, fronted by `ConfigStore` | Read via `self._config_store.get_api_key(alias)`; never persisted on the adapter | +| Dedup cursor / observed sets | sqlite3 at `cursor_db_path` (`/var/lib/central/cursors.db`) | Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the `adapter` column | +| Stream retention / max_bytes | DB table `config.streams`; managed by supervisor | Adapters do not read or write stream config | + +`AdapterConfig.settings` is a `dict[str, Any]` — re-validate against +`settings_schema` before consuming structured fields. Most adapters do this +once in `__init__` and again in `apply_config()`. + +### 5.3 preview_for_settings boundary + +`preview_for_settings()` is the one place that does NOT have access to +`self._config_store` or `cursor_db_path` semantics. See +[§11](#11-settings-preview-hook). The framework may instantiate adapters +with a stub config store solely to call this method, and the GUI process never +calls `startup()`. Do not assume the adapter's HTTP session, cursor DB, or any +other startup-time state exists when preview runs. + +--- + +## 6. Subject namespace conventions + +### 6.1 The shape + +Every subject starts with `central.` and decomposes as: + +``` +central..[....] +``` + +- `` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`, + `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding + one). Operators MUST be able to subscribe to all of one domain with + `central..>`. +- `` is adapter-driven and identifies the event category within the + domain (e.g. `hotspot`, `alert`, `incident`, `kindex`, `proton_flux`). +- `` are zero or more refinement tokens — geographic, temporal, + upstream identifier — that the consumer would plausibly want to filter on. + +Token rules: + +- **All lowercase.** No uppercase, no mixed-case. +- **`snake_case` tokens.** Multi-word tokens use underscores + (`proton_flux`, `sea_lake_ice`), not hyphens. +- **No empty tokens.** Fall back to a literal `unknown` token when a dimension + is genuinely absent (NWS does this for alerts lacking a primary region). +- **No `.removed` injected mid-token.** Removal subjects always read + `.removed.` — see [§9](#9-removal--fall-off-patterns). + +### 6.2 Where the canonical subjects live + +`docs/CONSUMER-INTEGRATION.md` §4 is the SoT for the concrete subject patterns +that exist today. Adapter authors should mirror new subjects there as part of +the same PR; the consumer test suite asserts every discovered adapter has a +per-adapter subsection. + +### 6.3 Subject-token decomposition belongs in the adapter + +When an upstream identifier is composite — agency-prefix + bare site number, +state + county, satellite + confidence — the adapter is responsible for +splitting it into tokens. Two conventions: + +- **One helper function per decomposition, one place.** NWIS' helper sits at + module scope in `nwis.py`: + + ```python + def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]: + """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no).""" + ... + ``` + + Both `subject_for()` and `Event.category` construction call through it. WFIGS + does the same with `subject_suffix()` in `wfigs_common.py`. Never inline the + decomposition in two places — they will drift. + +- **Round-trip via `Event.category`.** Adapters set `Event.category` to a + Central-relative form (`hydro...`, + `fire.incident..`); `subject_for()` then prepends `central.` + and may rearrange tokens. The category lives in the wire payload (see + [`CONSUMER-INTEGRATION.md` §5b](./CONSUMER-INTEGRATION.md#5b-inner-event-payload)); + the subject is computed from it at publish time. + +### 6.4 Adding a new top-level domain + +Top-level domains are stream-mapped 1:1 (see [§8](#8-the-streamentry-registry)). +Adding `central..>` requires: + +1. One new `StreamEntry` in `src/central/streams.py`. +2. One new migration row that seeds `config.streams` with retention and + `max_bytes`. +3. Update `docs/CONSUMER-INTEGRATION.md` §3 and §4 to advertise the new + stream / subject pattern. + +`supervisor.STREAM_SUBJECTS`, `archive.STREAMS`, and `gui.DASHBOARD_STREAMS` +all derive from `STREAMS` at import time; no code edits in those modules are +required. + +--- + +## 7. Dedup key construction + +### 7.1 What "dedup key" means here + +Adapters maintain a per-adapter sqlite3 table at `cursor_db_path` (typically +`published_ids` keyed by `(adapter, event_id)`) to suppress re-yielding the +same event across successive `poll()` runs. The string the adapter writes to +`published_ids.event_id` is the **adapter-side dedup key**. + +Separately, every yielded `Event.id` flows into the CloudEvents envelope's +`id` field (also set as the `Nats-Msg-Id` header on publish). Most adapters +use the same string for both — adapter-side dedup key and `Event.id` are +identical — which keeps consumer-side dedup ([§9 of CONSUMER-INTEGRATION.md](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide)) +trivial. + +A few adapters intentionally diverge: `eonet` stores a richer composite +adapter-side (id + timeline date) while keeping `Event.id` stable, because +consumers usually want timeline updates to overwrite the same id. The +`Event.id` is always the consumer contract; adapter-side composites are an +implementation detail. + +### 7.2 Two shapes + +**Single-token.** Upstream provides a stable, globally-unique identifier: + +- `usgs_quake` — USGS feature id (`ci10240102`) +- `gdacs` — RSS guid (`WF1028708`) +- `wfigs_incidents` / `wfigs_perimeters` — IRWIN UUID +- `inciweb` — numeric incident id +- `swpc_kindex` — bin timestamp (single fixed subject, time is the natural key) +- `nws` — alert URL +- `eonet` — `EONET_NNNNN` (consumer-facing); composite tracked separately + adapter-side as described in §7.1 + +Use the upstream id directly. No prefix, no separators. + +**Composite.** Upstream id is not by itself sufficient — needs a parameter, a +timestamp, or a tombstone marker to dedup correctly: + +| Adapter | Shape | +|---|---| +| `firms` | `::::` | +| `nwis` | `nwis:::` | +| `wfigs_incidents` / `wfigs_perimeters` removals | `:removed:` | + +### 7.3 Separator convention + +Use `:` as the dedup-key separator for new adapters. + +There is a pre-existing wrinkle: two SWPC adapters use `|` instead. + +| Adapter | Separator | Shape | +|---|---|---| +| `swpc_alerts` | `\|` | `\|` | +| `swpc_protons` | `\|` | `\|` | + +This is the historical reality at the time of writing — not a recommendation. +Do not propagate `|` to new adapters; do not refactor the SWPC pair to `:` +without an independent rename PR (the strings are persisted in +`/var/lib/central/cursors.db` rows). + +### 7.4 Implementation + +Every existing adapter follows the same pattern: + +```python +async def startup(self) -> None: + ... + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() +``` + +Then `is_published(event_id) -> bool`, `mark_published(event_id) -> None`, and +a periodic `sweep_old_ids()` (typical retention: 7 to 30 days, tuned per +adapter). Copy from `swpc_kindex.py` — it is the canonical reference shape. + +### 7.5 The CloudEvents `id` field + +`Event.id` becomes the CloudEvents envelope `id` and the `Nats-Msg-Id` header. +JetStream applies its own short dedup window keyed on `Nats-Msg-Id`, but +consumers do their own dedup ([CONSUMER-INTEGRATION.md §9](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide)) +and should not rely on JetStream's window. Stability of `Event.id` across +re-publishes is the producer's contract; the consumer side trusts it. + +--- + +## 8. The StreamEntry registry + +`src/central/streams.py` is the SoT for stream definitions. The whole file is +short enough to quote: + +```python +@dataclass(frozen=True) +class StreamEntry: + name: str + subject_filter: str + event_bearing: bool = True + dashboard: bool = True + + +STREAMS: list[StreamEntry] = [ + StreamEntry("CENTRAL_WX", "central.wx.>"), + StreamEntry("CENTRAL_FIRE", "central.fire.>"), + StreamEntry("CENTRAL_QUAKE", "central.quake.>"), + StreamEntry("CENTRAL_SPACE", "central.space.>"), + StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), + StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), + StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), +] +``` + +### 8.1 What derives from STREAMS + +Three import-time consumers read `STREAMS` and build their own structures: + +| Derived constant | Module | Shape | +|---|---|---| +| `STREAM_SUBJECTS` | `central.supervisor` | `dict[str, list[str]]` — used to create JetStream streams | +| `STREAMS` | `central.archive` | `list[tuple[str, str]]` for `event_bearing=True` entries only | +| `DASHBOARD_STREAMS` | `central.gui.routes` | `list[str]` for `dashboard=True` entries | + +`tests/test_stream_registry.py` asserts each derivation matches the registry — +adding a stream automatically extends the test surface; no new test code is +required. + +### 8.2 Adding a stream + +When a new adapter needs a new top-level domain: + +1. **One line in `src/central/streams.py`.** Append a `StreamEntry(...)` to + the `STREAMS` list. +2. **One migration row.** Seed `config.streams` with retention and + `max_bytes`. Existing migrations in `migrations/` are the template. + +`event_bearing=False` is reserved for `CENTRAL_META` today; adding a second +non-event-bearing stream silently excludes it from the archive. The registry +test explicitly guards against this and requires a deliberate test update if +the invariant changes. + +### 8.3 The consistency gate + +`tests/test_stream_registry.py` is the property-test gate. It asserts: + +- Stream names and subject filters are unique. +- Every subject filter matches `^central\.[a-z][a-z_]*\.>$`. +- `CENTRAL_META` is the only non-event-bearing stream. +- The supervisor / archive / GUI derivations all match the registry. + +Mirror that discipline in any new test you write for adapter-side stream +behavior: derive expected values from `STREAMS` rather than hand-typing them. + +--- + +## 9. Removal / fall-off patterns + +When an upstream record disappears, an adapter has two options: + +### 9.1 Explicit removal subject + +Emit an `Event` whose subject is `.removed.` and whose +`category` ends in `.removed`. This is the right choice when: + +- Upstream signals retirement explicitly (`gdacs`'s `iscurrent: false`). +- The upstream is a "current state" feed and absence is meaningful + (`wfigs_incidents`, `wfigs_perimeters`, `eonet`). + +The subtype-before-removed token order is a hard convention — consumers +filtering on `central...>` receive both live and removal +events with a single subscription. See +[`CONSUMER-INTEGRATION.md` §7a](./CONSUMER-INTEGRATION.md#7a-explicit-removal-subjects-consumer-must-subscribe-to-handle-cleanly) +for the consumer side. + +### 9.2 Absence-as-signal + +No removal subject; consumers infer disappearance from a time gap or an +upstream-specific marker (`expires` field, supersession text). This is the +right choice when: + +- The upstream is a stream of point-in-time observations (`firms` hotspot + pixels, `swpc_kindex` 3-hour bins, `swpc_protons` cadence-driven samples). +- Upstream signals expiry inline (`nws` `expires` field, `nws` + `messageType: Cancel`). +- The upstream is durable / append-only (`usgs_quake` catalog, `inciweb` RSS). + +See [`CONSUMER-INTEGRATION.md` §7b](./CONSUMER-INTEGRATION.md#7b-absence-as-signal-no-removal-subject--consumer-infers-from-time-gap) +for the consumer-side enumeration. + +### 9.3 Composite removal dedup key + +When an adapter emits explicit removals, the same upstream id can fall off +and re-appear multiple times over an adapter's lifetime. The dedup key for a +removal event therefore needs a removal-detection timestamp: + +```python +Event( + id=f"{irwin_id}:removed:{now.isoformat()}", + category="fire.incident.removed", + # ... +) +``` + +The `:removed:` suffix is the cross-adapter convention. Keep it. + +### 9.4 Removal payload schema + +Removal events ship an empty `geo` block and a sparse `data` payload. The +exact shape (id field name, reason values, last_observed_at, adapter-specific +context fields) is documented in +[`CONSUMER-INTEGRATION.md` §7c](./CONSUMER-INTEGRATION.md#7c-removal-event-payload-schema-cross-cutting); +adapter authors should mirror it. Do not restate it here. + +--- + +## 10. Anti-patterns — what NOT to do + +These are the patterns prior reviews have explicitly rejected. Reject them +again on sight in a new-adapter PR. + +### 10.1 Enrichment during `poll()` + +No calls to upstream metadata endpoints, no reverse-geocode, no consultation +of a second source to fill in fields the primary feed omitted. The "NWIS +enrichment" Phase 3 proposal — joining live measurements against the +monitoring-locations metadata endpoint during `poll()` — was rejected on the +[§2](#2-the-design-principle) principle. Future proposals along the same +lines get the same answer. + +If enrichment is genuinely necessary, the right shape is a separate adapter +(or a downstream consumer) — not an `if metadata_missing: await +fetch_metadata()` branch buried in an adapter's `poll()`. + +### 10.2 Silent field dropping + +`Event.data` carries the upstream payload verbatim. Do not omit a field +because it "looks redundant" or "isn't useful right now." Operators +debugging a downstream consumer often need every field the upstream sent; +dropping fields makes that impossible after the fact. + +The two acceptable transforms — token normalization (case, separators) and +dedup-key construction — are not field-drops. They produce new fields +derived from the upstream payload; the upstream payload itself is preserved. + +### 10.3 Magic-number asserts in tests + +Adapter tests that pin event counts to literal integers (`assert +len(events) == 47`) break the next time the upstream's fixture is +refreshed and contribute zero diagnostic value. Either assert structural +invariants (every yielded event has a non-empty `id`; subjects match the +adapter's `subject_for()` output; etc.) or assert against a count computed +from the input fixture (`assert len(events) == len(fixture.features)`). + +### 10.4 Hardcoded stream / adapter lists in tests + +Tests that enumerate `["CENTRAL_WX", "CENTRAL_FIRE", ...]` or `["firms", +"inciweb", ...]` as literals drift the moment a stream or adapter is added +or removed. Derive from the SoT: + +```python +from central.streams import STREAMS +from central.adapter_discovery import discover_adapters + +stream_names = {s.name for s in STREAMS} +adapter_names = set(discover_adapters().keys()) +``` + +`tests/test_stream_registry.py` and `tests/test_consumer_doc.py` are the +in-tree templates. The sibling test for this doc +(`tests/test_producer_doc.py`) follows the same rule. + +### 10.5 Persistent state outside `cursor_db_path` + +Adapters MUST NOT write to disk outside the cursor DB at +`cursor_db_path`. No JSON files in `/var/lib/central//`, no +state stashed in `/tmp`. Multi-process invariants (the supervisor may +restart an adapter; the GUI may instantiate it independently to call +`preview_for_settings()`) only hold for the cursor DB and `config.adapters`. + +### 10.6 Blocking calls in async paths + +`poll()`, `apply_config()`, `startup()`, `shutdown()`, and +`preview_for_settings()` are async. Use `aiohttp`, not `requests`; use +`asyncio.to_thread()` to push CPU-bound or genuinely blocking work +(`shapely` geometry operations, large JSON parsing) off the event loop. +The supervisor runs every adapter on a single event loop — one blocked +adapter stalls every other adapter. + +### 10.7 Reaching into framework internals from `preview_for_settings` + +`preview_for_settings()` is a pure function of `settings`. Do NOT access +`self._config_store`, `self._cursor_db_path`, or any state established +in `startup()`. The framework may instantiate the adapter with a stub +config store solely to call this method. See [§11](#11-settings-preview-hook). + +### 10.8 Hardcoded subject strings in tests + +A test that asserts `subject == "central.fire.incident.mt.flathead"` for a +specific fixture is fine; a test that hardcodes the subject pattern as a +string and re-derives it instead of calling the adapter's +`subject_for(event)` is the wrong shape. Round-trip through the adapter so +the test breaks the day someone shifts a token. + +--- + +## 11. Settings preview hook + +`preview_for_settings()` is the optional GUI-side hook that surfaces a +settings-driven preview on the adapter edit page. NWIS uses it to show the +monitoring-locations inside the configured bbox before the operator hits +save; most adapters opt out by inheriting the base class's `return None`. + +### 11.1 The contract (verbatim from `central.adapter.SourceAdapter`) + +> Optional. Override to surface a settings-driven preview on the edit page. +> +> Return `list[dict]` (framework renders as a generic table; columns come from +> the first dict's keys, in insertion order). Return `None` to skip preview. +> Raise to surface an error banner — framework catches at the route boundary. +> +> Contract: +> - Preview is a pure function of `settings`. Do NOT access +> `self._config_store` or `cursor_db` state — the framework may instantiate +> adapters with a stub config_store solely to call this method. +> - Network preview implementations must open their own short-lived +> `aiohttp` session (the adapter's polling session may not exist; the GUI +> process never calls `startup()`). +> - Return `None` when preview is not meaningful (e.g., required settings +> like region are unset). Return `[]` explicitly if the query ran and +> matched zero rows — the framework renders that distinctly from `None`. + +### 11.2 The three return states + +| Return | Framework renders | +|---|---| +| `None` | Nothing — no preview block on the edit page | +| `[]` | `"Preview (0 rows)"` legend, no table | +| `list[dict]` with rows | Generic table; columns from the first dict's keys in insertion order; legend `"Preview (N rows)"` | + +`tests/test_preview_hook.py` is the test surface for the partial; mirror its +discipline for any adapter-specific preview test. + +### 11.3 Generic rendering — no per-adapter Jinja + +The framework is duck-typed on `list[dict]`. Column headers are the first +dict's keys. There is no per-adapter branch in the template, and no +adapter-name conditionals are accepted on review. If an adapter's preview +needs a column the framework does not surface, the adapter rewrites the dict +shape, not the template. + +### 11.4 Implementation pattern + +NWIS is the canonical example. Skeleton: + +```python +async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None: + if settings.region is None: + return None + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=15), + ) as session: + async with session.get(url, headers=...) as resp: + resp.raise_for_status() + page = await resp.json() + return [ + {"site_id": ..., "name": ..., "site_type": ..., "state": ...} + for feat in page.get("features") or [] + ] +``` + +Key points: + +- Fresh `aiohttp.ClientSession` per call — `self._session` may not exist. +- Short timeout (NWIS uses 15s) — the edit page must render quickly. +- Small row cap (NWIS uses 50) — the preview is for orientation, not bulk + inspection. +- Raise on HTTP / parse failure — the framework catches at the route boundary + and renders an error banner. + +--- + +## 12. Acceptance gate for a new adapter + +A new-adapter PR is accepted when every item below holds. The list is +mechanical; reviewers and the PR author should both run through it before +requesting / granting merge. + +- [ ] **Upstream curl evidence captured.** A real HTTP transcript (curl + command + redacted response body) is pinned somewhere in the PR + description or a fixture file. Documents what the adapter was written + against. +- [ ] **`SourceAdapter` subclass conforms to [§4](#4-the-sourceadapter-base-class).** + Class attributes set, three abstract methods implemented, constructor + signature matches the supervisor call exactly. +- [ ] **`tests/test_.py` exists.** Unit tests for the adapter; no + magic-number asserts ([§10.3](#103-magic-number-asserts-in-tests)); no + hardcoded stream / adapter lists + ([§10.4](#104-hardcoded-stream--adapter-lists-in-tests)). +- [ ] **Dry-run on CT104 passes.** The supervisor instantiates the adapter + without errors; one full `poll()` cycle yields events without raising. +- [ ] **Dedup keys stable across multiple ticks.** Two successive `poll()` + runs against a frozen upstream fixture yield the same event-count delta + as expected (zero new events on the second run if upstream is + unchanged). +- [ ] **No hardcoded values that should be derived.** Subject prefixes, + stream names, default parameter lists, etc., all derive from the + registry / settings model / discovery surface. +- [ ] **`CONSUMER-INTEGRATION.md` updated.** New per-adapter subsection in + §6, new subject row in §4, new stream row in §3 if a new domain was + introduced. `tests/test_consumer_doc.py` passes. +- [ ] **`PRODUCER-INTEGRATION.md` mentions the adapter only if it + introduces a new convention.** This doc is contract-shaped, not + catalog-shaped — most new adapters need no edit here. +- [ ] **Full pytest suite green.** + +--- diff --git a/tests/test_producer_doc.py b/tests/test_producer_doc.py new file mode 100644 index 0000000..7cf2767 --- /dev/null +++ b/tests/test_producer_doc.py @@ -0,0 +1,215 @@ +"""Consistency tests for docs/PRODUCER-INTEGRATION.md. + +The doc is the producer-side contract — what an adapter author implements and +the conventions Central enforces around it. These tests catch drift between +the doc and the live code: + + - Every overridable SourceAdapter method documented in §4 must exist on + central.adapter.SourceAdapter — and vice versa. + - The preview_for_settings contract quoted in §11.1 must come from the + actual SourceAdapter.preview_for_settings docstring. + - The set of top-level domain tokens documented in §6.1 must equal the set + derived from central.streams.STREAMS subject_filter prefixes. + - The verbatim STREAMS snippet quoted in §8 must match the live registry. + +Per the doc's own §10.4, NO hardcoded stream / adapter list literals: every +expected value derives from central.streams, central.adapter, or +central.adapter_discovery at runtime. +""" + +import inspect +import re +from pathlib import Path + +from central.adapter import SourceAdapter +from central.adapter_discovery import discover_adapters +from central.streams import STREAMS + +DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md" + + +def _doc_text() -> str: + assert DOC_PATH.is_file(), f"missing: {DOC_PATH}" + return DOC_PATH.read_text() + + +def _documented_override_methods(doc: str) -> set[str]: + """Extract method names documented under '## 4. The SourceAdapter base class'. + + Looks for the '**`async def (...)`**' / '**`def (...)`**' + method headings inside §4. + """ + section_re = re.compile( + r"^## 4\. The SourceAdapter base class\s*\n(.*?)(?=^## )", + re.DOTALL | re.MULTILINE, + ) + m = section_re.search(doc) + assert m, "doc missing '## 4. The SourceAdapter base class' section" + section = m.group(1) + heading_re = re.compile(r"\*\*`(?:async\s+)?def\s+(\w+)\s*\(", re.MULTILINE) + return set(heading_re.findall(section)) + + +def _sourceadapter_overridable_methods() -> set[str]: + """Methods on SourceAdapter that an adapter author is expected to implement + or may override. Excludes Python internals (dunder), the constructor, and + private helpers. + """ + methods: set[str] = set() + for name, member in inspect.getmembers(SourceAdapter): + if name.startswith("_"): + continue + if not (inspect.isfunction(member) or inspect.iscoroutinefunction(member)): + continue + methods.add(name) + return methods + + +def _streams_domains() -> set[str]: + """Top-level tokens derived from STREAMS subject filters + (central..>). + """ + domain_re = re.compile(r"^central\.([a-z_]+)\.>$") + out: set[str] = set() + for s in STREAMS: + m = domain_re.match(s.subject_filter) + assert m, f"unexpected subject filter shape: {s.subject_filter!r}" + out.add(m.group(1)) + return out + + +def _documented_domains(doc: str) -> set[str]: + """Domain tokens enumerated in §6.1 as backtick literals (`wx`, `fire`, …).""" + section_re = re.compile( + r"`` is one of ([^.]+)\.", + re.DOTALL, + ) + m = section_re.search(doc) + assert m, "doc missing the '`` is one of ...' enumeration in §6.1" + enum_text = m.group(1) + return set(re.findall(r"`([a-z_]+)`", enum_text)) + + +def test_doc_exists(): + assert DOC_PATH.is_file(), f"doc missing: {DOC_PATH}" + + +def test_documented_methods_match_sourceadapter_api(): + """Every override-able SourceAdapter method must appear in the §4 contract, + and the doc may not advertise methods that don't exist.""" + doc_methods = _documented_override_methods(_doc_text()) + code_methods = _sourceadapter_overridable_methods() + assert doc_methods == code_methods, ( + f"override-method drift: " + f"doc-only={doc_methods - code_methods}, " + f"code-only={code_methods - doc_methods}" + ) + + +def test_preview_hook_contract_matches_docstring(): + """The contract block quoted in §11.1 must come from the live + SourceAdapter.preview_for_settings docstring. + + Normalizes both sides by collapsing whitespace and stripping the doc's + Markdown blockquote prefix (`> `). + """ + doc = _doc_text() + section_re = re.compile( + r"^### 11\.1[^\n]*\n(.*?)(?=^### |^## )", + re.DOTALL | re.MULTILINE, + ) + m = section_re.search(doc) + assert m, "doc missing '### 11.1' subsection" + blockquote = "\n".join( + line[2:] if line.startswith("> ") else line.lstrip(">").lstrip() + for line in m.group(1).splitlines() + if line.lstrip().startswith(">") + ) + docstring = inspect.getdoc(SourceAdapter.preview_for_settings) or "" + + def norm(s: str) -> str: + # Strip markdown backticks; collapse whitespace. + return re.sub(r"\s+", " ", s.replace("`", "")).strip() + + norm_block = norm(blockquote) + norm_doc = norm(docstring) + # Bidirectional: every non-empty sentence of the docstring must appear in + # the doc's blockquote, and the blockquote must not introduce new sentences + # the docstring lacks. + sentences = lambda s: {x.strip() for x in re.split(r"(?<=[.:])\s+", s) if x.strip()} + doc_sents = sentences(norm_block) + code_sents = sentences(norm_doc) + assert doc_sents == code_sents, ( + f"preview_for_settings contract drift: " + f"doc-only={doc_sents - code_sents}, " + f"code-only={code_sents - doc_sents}" + ) + + +def test_top_level_domains_match_streams_registry(): + """The §6.1 domain enumeration must equal the domain tokens derived from + central.streams.STREAMS — bidirectional, no hardcoded list.""" + doc_domains = _documented_domains(_doc_text()) + code_domains = _streams_domains() + assert doc_domains == code_domains, ( + f"domain-token drift: " + f"doc-only={doc_domains - code_domains}, " + f"code-only={code_domains - doc_domains}" + ) + + +def test_streams_snippet_quotes_live_registry(): + """The §8 verbatim STREAMS snippet must agree with central.streams.STREAMS + on (name, subject_filter, event_bearing). + """ + doc = _doc_text() + section_re = re.compile( + r"^## 8\. The StreamEntry registry\s*\n(.*?)(?=^## )", + re.DOTALL | re.MULTILINE, + ) + m = section_re.search(doc) + assert m, "doc missing '## 8. The StreamEntry registry' section" + section = m.group(1) + # Each documented entry: StreamEntry("NAME", "central.x.>"[, event_bearing=False]) + entry_re = re.compile( + r'StreamEntry\(\s*"([A-Z_]+)"\s*,\s*"(central\.[a-z_]+\.>)"' + r'(?:\s*,\s*event_bearing\s*=\s*(False|True))?\s*\)', + ) + doc_rows: set[tuple[str, str, bool]] = set() + for name, subj, eb in entry_re.findall(section): + event_bearing = (eb != "False") # default True if unspecified + doc_rows.add((name, subj, event_bearing)) + code_rows = {(s.name, s.subject_filter, s.event_bearing) for s in STREAMS} + assert doc_rows == code_rows, ( + f"STREAMS snippet drift: " + f"doc-only={doc_rows - code_rows}, code-only={code_rows - doc_rows}" + ) + + +def test_no_orphan_adapter_references_in_anti_patterns(): + """Anti-patterns section names two real adapter modules as examples + (firms, inciweb in §10.4). Those names must still resolve via + central.adapter_discovery — protects against a silent rename leaving + dead example references in the doc. + """ + doc = _doc_text() + section_re = re.compile( + r"^## 10\. Anti-patterns.*?\n(.*?)(?=^## )", + re.DOTALL | re.MULTILINE, + ) + m = section_re.search(doc) + assert m, "doc missing '## 10. Anti-patterns' section" + section = m.group(1) + quoted = set(re.findall(r'"([a-z][a-z_]*)"', section)) + # Whitelist Python-syntax tokens that incidentally appear in the section; + # everything else in this set is asserted to be a real adapter name. + syntax_tokens = { + "CENTRAL_WX", "CENTRAL_FIRE", # stream names quoted as examples + } + candidate_adapter_names = quoted - {t.lower() for t in syntax_tokens} + known_adapters = set(discover_adapters().keys()) + orphans = {n for n in candidate_adapter_names if n not in known_adapters} + assert not orphans, ( + f"anti-patterns section references unknown adapter names: {orphans} " + f"(known adapters: {sorted(known_adapters)})" + ) From d8024f6f4fd1a42063031ca0fe3a4119050a73c4 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Tue, 19 May 2026 21:48:44 +0000 Subject: [PATCH 2/2] =?UTF-8?q?tests(2-I):=20derive=20syntax=5Ftokens=20wh?= =?UTF-8?q?itelist=20from=20STREAMS=20per=20=C2=A710.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_producer_doc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_producer_doc.py b/tests/test_producer_doc.py index 7cf2767..aef32e7 100644 --- a/tests/test_producer_doc.py +++ b/tests/test_producer_doc.py @@ -203,9 +203,9 @@ def test_no_orphan_adapter_references_in_anti_patterns(): quoted = set(re.findall(r'"([a-z][a-z_]*)"', section)) # Whitelist Python-syntax tokens that incidentally appear in the section; # everything else in this set is asserted to be a real adapter name. - syntax_tokens = { - "CENTRAL_WX", "CENTRAL_FIRE", # stream names quoted as examples - } + # Derived from STREAMS per §10.4 — stream names appear quoted as examples + # and would otherwise look like orphan adapter references. + syntax_tokens = {s.name for s in STREAMS} candidate_adapter_names = quoted - {t.lower() for t in syntax_tokens} known_adapters = set(discover_adapters().keys()) orphans = {n for n in candidate_adapter_names if n not in known_adapters}