Compare commits

...

11 commits

Author SHA1 Message Date
48bcb33096
Merge pull request #39 from zvx-echo6/feature/2-i-producer-docs
docs(2-I): producer integration spec — docs/PRODUCER-INTEGRATION.md
2026-05-19 15:53:17 -06:00
Matt Johnson
d8024f6f4f tests(2-I): derive syntax_tokens whitelist from STREAMS per §10.4
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 21:48:44 +00:00
Matt Johnson
6afe80ded3 docs(2-I): producer integration spec — docs/PRODUCER-INTEGRATION.md
The producer-side contract for adapter authors, mirroring PR H's consumer
spec. Self-contained — readers should not need to grep the codebase to
understand what a new SourceAdapter subclass must implement.

Bakes in the Phase 2 design principle ("Central takes it all and gives it
all. It's up to the pipe to do with it what it will.") so future authors
reject enrichment / silent-drop / opinionated-translation proposals on
sight. The previously-proposed Phase 3 NWIS metadata-enrichment ticket is
called out by name as an example of what gets rejected.

12-section outline locked with PM: design principle, quick start (clone
swpc_kindex), SourceAdapter base class, settings, subject namespace,
dedup keys, StreamEntry registry, removal/fall-off, anti-patterns,
preview hook, acceptance gate.

Sibling test (tests/test_producer_doc.py) mirrors test_consumer_doc.py
discipline:
  - bidirectional == between SourceAdapter API and §4 method coverage
  - preview_for_settings contract verbatim against live docstring
  - top-level domain enumeration vs central.streams.STREAMS prefixes
  - §8 STREAMS snippet vs central.streams.STREAMS
  - anti-patterns adapter-name examples vs discover_adapters()

No hardcoded stream / adapter / domain lists anywhere in the test —
every expected value derives from central.streams,
central.adapter_discovery, or central.adapter at runtime.

Honest about the pre-existing `:` vs `|` dedup-key separator
inconsistency (swpc_alerts and swpc_protons use `|`; everyone else
uses `:`). Recommends `:` for new adapters without forcing a rename PR
on the SWPC pair (separators are persisted in cursors.db rows).

Acceptance bars:
  (a) grep -rn 'subject_for_event\|_ADAPTER_REGISTRY' src tests → empty
  (b) bidirectional override-method coverage asserted in test
  (c) tests/test_producer_doc.py → 6/6 pass
  (d) full pytest suite → 469 pass (was 463 pre-PR; +6 new)
  (e) doc length: 823 lines (within 500–1200 envelope)
  (f) code fences balanced; JSON/Python blocks parse

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 21:17:48 +00:00
d92074b134
docs(2-H): consumer integration spec — docs/CONSUMER-INTEGRATION.md (#38)
Adds the consumer contract for Central's NATS event streams. Primary reader:
a Claude Code instance building MeshAI's ingestion layer. The doc IS the spec --
no "see source for details".

Opens with Matt's framing: "Central takes it all and gives it all. It's up to
the pipe to do with it what it will." Central is a faithful firehose --
adapters preserve every upstream field with no enrichment / formatting /
opinionated translation. The CloudEvents envelope adds routing + dedup support;
everything else is upstream-shaped. Where the doc lists upstream lookup
endpoints for ID-only fields, that is consumer-side convenience -- explicitly
NOT a recommendation that Central enrich.

Sections (11 total):
  1. Quick start (5-line nats-py subscribe-and-print)
  2. Connection details (URL / auth / JetStream context / stream discovery)
  3. Stream layout (7 streams, derived from streams.py registry)
  4. Subject namespace registry (Mermaid tree + full pattern table)
  5. Wire format (5a CloudEvents envelope; 5b inner Event payload)
     -- explicit callout that geo.centroid is [lon, lat] GeoJSON, NOT [lat, lon]
  6. Per-adapter reference (12 subsections, locked template)
  7. Fall-off / removal semantics (explicit subjects vs absence-as-signal)
  8. Consumer patterns (durable vs ephemeral, ack/nack/term, worked example)
  9. Dedup implementation guide (single-token vs composite-key adapters)
  10. Writing a new consumer checklist
  11. Troubleshooting

Doc length: 1878 lines (target was 600-1000 originally; revised to 1200-1800
once full-fidelity JSON examples + inciweb 3x narratives + wfigs_perimeters
polygon were folded in). Completeness wins per the design principle.

Every JSON example is verbatim from CT104. 11 examples sourced from
/tmp/nwis-build/evidence.txt (dumped via psql jsonb_pretty); the wfigs_perimeters
example is a freshly pulled smallest-active-polygon record so the doc captures
the live polygon shape without flooding the page with thousands of coordinate
pairs.

The doc is assembled by /tmp/nwis-build/build_doc.py which splices live JSON
blocks into a markdown template. The build script is local-only (not committed)
because the doc itself is the artifact; future updates regenerate by re-pulling
live evidence and re-running the assembler.

New test: tests/test_consumer_doc.py (5 tests). Parses the doc and asserts:
  - The "Stream layout" table matches central.streams.STREAMS exactly
    (stream names + subject filters).
  - The (name, subject_filter) pairs match the registry as pairs (catches
    swapped subject filters on existing streams).
  - Every adapter discovered via central.adapter_discovery.discover_adapters()
    has a per-adapter subsection -- and vice versa.
  - The subsection count equals the registry size (catches duplicates).

Verification:
  - 463/463 full suite green (was 458; +5 new consumer_doc tests).
  - Doc structure: 1 H1, 12 H2, 33 H3, 12 per-adapter sections, 1 mermaid block,
    12 JSON blocks (all parse).
  - All 12 adapters covered.
  - No regressions elsewhere.

Acceptance bars (a)-(e) verbatim:
  (a) grep "subject_for_event|_ADAPTER_REGISTRY" -> empty
  (b) all 12 adapters have per-adapter subsections
  (c) 5/5 consumer-doc tests pass
  (d) 463/463 full suite
  (e) doc length 1878 lines

markdownlint was not available on CT104; substituted an inline Python sanity
check confirming code-fence balance, JSON-block validity, and structural
integrity (12 H2 / 33 H3 / 1 mermaid).

Co-authored-by: zvx <zvx@central>
2026-05-19 14:33:51 -06:00
93b412fa22
Merge feature/2-g5-preview-hook (PR G.5: preview_for_settings framework hook)
feat(2-G.5): preview_for_settings framework hook + NWIS opt-in
2026-05-19 12:00:52 -06:00
zvx
570b121276 fix(2-G.5): preview_for_settings contract in adapter docstring + distinguish [] from None
Fixup 1 — Contract section appended to SourceAdapter.preview_for_settings's
docstring. Override authors read adapter.py, not routes.py, so the contract
(pure function of settings; open your own short-lived aiohttp session; None
vs [] semantics) belongs on the base method, not on the GUI stub class.

Fixup 2 — _adapter_preview.html distinguishes [] from None. Previously the
elif test was truthiness (`elif preview_rows`) which collapsed both into
"render nothing". Now uses `elif preview_rows is not none` and special-cases
the empty-list case inside: legend "Preview (0 rows)" with no table; None
still renders nothing at all. Lets adapters signal "query ran, matched zero"
distinctly from "preview not meaningful".

Tests +1:
- test_partial_renders_empty_list — [] yields "Preview (0 rows)" legend,
  no table, no headers. Distinct from the existing None case.

Acceptance:
- 27/27 targeted (preview_hook +1 new, nwis, stream_registry).
- 458/458 full suite.
- (b) framework GUI dir still has zero adapter-name branches.
2026-05-19 17:55:39 +00:00
zvx
ead6ef8ce1 feat(2-G.5): preview_for_settings framework hook + NWIS opt-in
Adds an optional async hook on SourceAdapter so any adapter can surface a
settings-driven preview on its /adapters/<name> edit page. The framework
renders the result generically as a table — no adapter-name branches in
GUI templates or route code.

Framework changes:
- src/central/adapter.py: new async preview_for_settings(self, settings)
  on the base class, default returns None. Adapters opt in by overriding;
  non-overriding adapters render unchanged.
- src/central/gui/routes.py: GET /adapters/{name} instantiates the adapter
  with a no-op _PreviewConfigStore stub and a /dev/null cursor path (GUI
  has no live ConfigStore), constructs settings_obj via the schema, and
  calls preview_for_settings inside a try/except. Result lands in template
  context as preview_rows / preview_error.
- src/central/gui/templates/_adapter_preview.html: new partial. Generic
  table with columns derived from the first dict's keys; error banner
  mirrors the existing last_error article style.
- src/central/gui/templates/adapters_edit.html: one-line include between
  the Region fieldset and Save/Cancel.

NWIS opt-in:
- New NWIS_MONITORING_LOCATIONS_URL constant and _PREVIEW_LIMIT cap of 50.
- preview_for_settings returns None when region is None, otherwise one-shot
  fetches monitoring-locations within the bbox via a fresh aiohttp session.
  Must work even when adapter is not started -- the GUI process never calls
  startup(). Returns list[dict] with the contract column order: site_id,
  name, site_type, state. Errors propagate so the framework can render the
  operator-visible banner.
- HTTP call factored into _fetch_preview_text so tests mock cleanly.

Tests (7 new):
- tests/test_preview_hook.py: default returns None; partial renders list
  with correct headers/rows/count; partial renders error banner; partial
  renders empty when both context values are None.
- tests/test_nwis.py adds TestNWISPreview: returns None without region,
  returns rows with correct column order, propagates HTTP errors.

Verification:
- 457/457 full suite green (was 450; +7 new tests).
- Live /adapters/nwis preview returns 50 rows with the contract keys
  against the current production Iowa bbox.
- /adapters/eonet preview_for_settings returns None via base default --
  proves framework is duck-typed, no NWIS-specific code in framework.
2026-05-19 17:34:35 +00:00
1f0e2a091e
Merge feature/2-g-nwis (PR G: USGS NWIS + CENTRAL_HYDRO)
feat(2-G): USGS NWIS adapter (OGC API) + CENTRAL_HYDRO stream
2026-05-19 10:56:43 -06:00
zvx
5d64a8f70d feat(2-G): USGS NWIS adapter (OGC API) + CENTRAL_HYDRO stream
NASA WaterData OGC API v0 (latest-continuous collection) — polls configured
parameter codes within an operator-set bbox and publishes on the new
CENTRAL_HYDRO stream.

- Subject: central.hydro.<parameter_code>.<agency>.<bare_site_no>
  (e.g. central.hydro.00060.usgs.05420500). The agency/site decomposition
  lives in a single _subject_tokens_for_id helper.
- Default parameter codes: 00060 (discharge), 00065 (gage height),
  00010 (water temperature). Operator-tunable; single SoT in
  _DEFAULT_PARAMETER_CODES — no parallel literals.
- Composite dedup: nwis:<monitoring_location_id>:<param>:<time_iso>.
  Prefix kept in dedup key for cross-agency uniqueness.
- Pagination: follows OGC 'rel=next' link until absent (cursor-based).
- Region bbox is REQUIRED in practice; adapter logs WARN at startup if
  region is None (does not refuse to start).
- New stream CENTRAL_HYDRO added to streams.py registry (one line).
  Retention mirrors CENTRAL_DISASTER (7 days, 1 GiB).
- No removal pattern in v1 — sites are static; missing data is the signal.

Upstream divergences from the original spec brief, caught by pre-build curl:
- Collection is 'latest-continuous', not 'instantaneous-values'.
- Site filter param is 'monitoring_location_id' (singular), not
  'monitoring_locations_id' (plural).
- Site identifier requires agency prefix in queries (USGS-NNNNN).
- feature.id is a per-record UUID, not stable; dedup uses joint key.

Ships disabled; operator enables via GUI after setting a bbox.
2026-05-19 16:50:21 +00:00
befdf7a38c
Merge feature/2-f-eonet (PR F: NASA EONET)
feat(2-F): NASA EONET disaster adapter
2026-05-19 09:54:23 -06:00
zvx
0b26bf902a feat(2-F): NASA EONET disaster adapter
Adds the NASA Earth Observatory Natural Event Tracker (EONET v3) adapter,
publishing on the existing CENTRAL_DISASTER stream under
central.disaster.eonet.<category>.global subjects.

- One Central event per EONET event id; geo = most-recent geometry point.
- Composite dedup key (eonet:<id>:<latest_geometry_date_iso>) — timeline
  advance re-publishes, idle re-poll suppresses.
- category_allowlist defaults to all 13 upstream categories; operator opts
  OUT per-category if GDACS overlap (wildfires/floods/severeStorms/volcanoes)
  produces unwanted dupes on gdacs.* subjects.
- camelCase upstream IDs (seaLakeIce, dustHaze, etc.) mapped to
  lower_snake_case subject components by a single _subject_category helper.
- Country resolves to literal 'global' (no reverse-geocode in v1).
- Fall-off: missing-from-feed event emits central.disaster.eonet.<cat>.removed.global,
  subtype before 'removed' per §8 canonical pattern.

Adapter ships disabled; operator enables via GUI.
2026-05-19 15:35:25 +00:00
18 changed files with 4986 additions and 1 deletions

1878
docs/CONSUMER-INTEGRATION.md Normal file

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,823 @@
# Central — Producer Integration
How to add a new `SourceAdapter` subclass to Central. This is the contract for
adapter authors: what to implement, what NOT to implement, and how a new feed
plugs into the existing stream / subject / dedup machinery without restating
anything from `docs/CONSUMER-INTEGRATION.md`.
The companion document is [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md) — the contract
for downstream consumers. Producer-side semantics are in scope here; consumer-side
semantics (wire format, dedup recommendations, fall-off handling at the subscriber)
are intentionally not restated. Cross-references point into that doc.
## Table of contents
1. [What this doc is for](#1-what-this-doc-is-for)
2. [The design principle](#2-the-design-principle)
3. [Quick start: cloning an existing adapter](#3-quick-start-cloning-an-existing-adapter)
4. [The SourceAdapter base class](#4-the-sourceadapter-base-class)
5. [Settings and configuration](#5-settings-and-configuration)
6. [Subject namespace conventions](#6-subject-namespace-conventions)
7. [Dedup key construction](#7-dedup-key-construction)
8. [The StreamEntry registry](#8-the-streamentry-registry)
9. [Removal / fall-off patterns](#9-removal--fall-off-patterns)
10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do)
11. [Settings preview hook](#11-settings-preview-hook)
12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter)
---
## 1. What this doc is for
**Audience:** an engineer (or a code-gen agent acting as one) writing a new
`SourceAdapter` subclass in `src/central/adapters/`.
**Covered:** the SourceAdapter contract, the StreamEntry registry, subject
namespace conventions, dedup-key construction, the settings preview hook, the
mechanical checklist a new-adapter PR must satisfy.
**NOT covered:** end-to-end walkthroughs of one specific adapter (read the source
for the closest existing adapter — `src/central/adapters/swpc_kindex.py` is the
smallest), upstream feed API documentation (lives upstream), or consumer-side
concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)).
---
## 2. The design principle
> "Central takes it all and gives it all. It's up to the pipe to do with it
> what it will."
> — Matt, 2026-05-19
Adapter authors translate that single sentence into a small number of concrete
rules:
- **Preserve every upstream field.** Anything the upstream returns lives in
`Event.data` verbatim. Adapters do not silently drop fields, even ones that
look redundant or low-value today.
- **No enrichment.** Adapters do not reverse-geocode, do not call out to
upstream metadata endpoints during normal `poll()` flow, do not consult a
second source to "fill in" a missing field. If a downstream consumer wants
enrichment, that is consumer-side work.
- **No opinionated translation.** Adapters do not coerce units, do not rename
fields to match a Central-wide vocabulary, do not collapse upstream
enumerations into Central's preferred labels.
- **The only adapter-side transforms are mechanical.** Specifically:
subject-token normalization (camelCase → snake_case, agency-prefix splitting,
whitespace → underscore, lowercase) and dedup-key construction. Both are
deterministic functions of upstream identifiers. Nothing else.
This rules out a whole category of plausible-sounding work that prior reviews
have already rejected. For instance, "enrich NWIS site rows with USGS
monitoring-locations metadata during `poll()`" was proposed for Phase 3 and
killed on this principle. The producer adds the field-preserving pipe; the pipe
ends at JetStream publish; everything richer is a downstream concern.
See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of
anti-patterns. Future authors should reject the same proposals on the same
grounds.
---
## 3. Quick start: cloning an existing adapter
The cheapest way to start a new adapter is to copy the smallest existing one
and edit. As of this writing the smallest is `swpc_kindex` (one fixed subject,
no region filter, no preview hook).
**File layout for a one-file adapter:**
```
src/central/adapters/<your_adapter>.py # the SourceAdapter subclass
tests/test_<your_adapter>.py # unit tests
```
When the upstream API needs shared utilities (e.g. WFIGS shares helpers across
`wfigs_incidents` and `wfigs_perimeters`), factor them into a sibling
`<family>_common.py` module — see `wfigs_common.py` and `swpc_common.py` for
the existing precedent.
**Minimum overrides:**
```python
from collections.abc import AsyncIterator
from pathlib import Path
import aiohttp
from pydantic import BaseModel
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
class ExampleSettings(BaseModel):
# Adapter-tunable settings live here. Pydantic v2 model. Forbid extras
# if you want strictness; otherwise extra-field silence is the default.
pass
class ExampleAdapter(SourceAdapter):
name = "example"
display_name = "Example Source"
description = "One-line operator-facing description."
settings_schema = ExampleSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 600
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
async def apply_config(self, new_config: AdapterConfig) -> None:
# Re-derive any cached settings from new_config.settings.
pass
def subject_for(self, event: Event) -> str:
return f"central.<domain>.example"
async def poll(self) -> AsyncIterator[Event]:
# Fetch, parse, yield Events. Dedup using a sqlite3 cursor_db
# connection initialized in startup() — see swpc_kindex for the
# established `published_ids` table shape.
if False:
yield # type: ignore[unreachable]
return
```
Auto-discovery picks the adapter up the moment the module imports cleanly. No
registry edit is required — `central.adapter_discovery.discover_adapters()`
walks `central.adapters.*` at import time and registers every concrete
`SourceAdapter` subclass keyed by its `name` class-attribute.
---
## 4. The SourceAdapter base class
`src/central/adapter.py` is the SoT. The contract is reproduced here verbatim
where the docstrings already capture it; this section adds the supervisor-side
context that the source file does not narrate.
### 4.1 Class attributes
Every subclass must define:
| Attribute | Type | Purpose |
|---|---|---|
| `name` | `str` | Short identifier, e.g. `"nws"`. Becomes the `adapter` key in `Event.adapter` and the discovery-registry key. Must be unique across `central.adapters.*`. |
| `display_name` | `str` | Human-readable name shown in the GUI. |
| `description` | `str` | Short description shown on the adapter detail page. |
| `settings_schema` | `type[BaseModel]` | Pydantic v2 model class for adapter settings. See [§5](#5-settings-and-configuration). |
| `default_cadence_s` | `int` | Default poll interval in seconds. Operators may override via `config.adapters`; `AdapterConfig.cadence_s` has a Pydantic `ge=10` floor. |
Optional class attributes (default to `None` / not-in-wizard):
| Attribute | Type | Purpose |
|---|---|---|
| `requires_api_key` | `str \| None` | Key alias if an API key is required, else `None`. |
| `api_key_field` | `str \| None` | Names the `settings_schema` field that holds the api_key alias reference. The GUI renders this as a select populated from `config.api_keys`; the wizard validates it against the staged `api_keys` state. |
| `wizard_order` | `int \| None` | Position in the setup wizard. `None` excludes the adapter from the wizard. |
### 4.2 Required methods
Every subclass must implement these three abstract methods.
**`async def poll(self) -> AsyncIterator[Event]`**
> Poll the source for new events. Yields `Event` objects for each new/updated
> event found.
The supervisor drives the loop; `poll()` is a one-shot async generator the
supervisor calls once per cadence tick, drains, and discards. Adapters MUST NOT
loop internally — control of cadence and concurrency lives in the supervisor.
The supervisor wraps each yielded `Event` in a CloudEvents envelope and
publishes it. The adapter is responsible only for yielding `Event` objects;
all envelope construction, NATS publish, and metadata heartbeat emission live
outside the adapter.
**`async def apply_config(self, new_config: AdapterConfig) -> None`**
> Apply new configuration to the adapter. Called by supervisor when config
> changes via hot-reload. The adapter should extract relevant settings from
> `new_config.settings` and update its internal state.
`new_config.settings` is a `dict[str, Any]`; re-validate it against
`settings_schema` if you intend to read structured fields. Most adapters cache
a couple of fields (e.g. `region`, `parameter_codes`) on `self` during
`__init__` and refresh them here. Do NOT tear down `self._session` or
`self._db` here — that is `shutdown()`'s responsibility.
**`def subject_for(self, event: Event) -> str`**
> Compute the NATS subject for an event. Each adapter knows its own subject
> hierarchy. The supervisor calls this to determine where to publish each
> event.
The result must match the subject filter of exactly one `StreamEntry` in
`central.streams.STREAMS`. See [§6](#6-subject-namespace-conventions) for the
naming rules and [§8](#8-the-streamentry-registry) for the stream side.
### 4.3 Optional lifecycle hooks
**`async def startup(self) -> None`** — Optional. Called once before the first
`poll()`. Open the HTTP session, open the sqlite3 cursor DB, create any
required tables. The default is a no-op.
**`async def shutdown(self) -> None`** — Optional. Called once at graceful
shutdown. Close the HTTP session, close the sqlite3 cursor DB. The default is
a no-op.
**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** —
Optional. The settings-page preview hook. The default returns `None` (no
preview). See [§11](#11-settings-preview-hook) for the contract.
### 4.4 Constructor signature
The supervisor instantiates adapters with this exact keyword-argument call
(`src/central/supervisor.py` `_create_adapter`):
```python
cls(
config=config, # AdapterConfig — name, enabled, cadence_s, settings, …
config_store=self._config_store, # ConfigStore — staged-vs-live config access
cursor_db_path=CURSOR_DB_PATH, # Path — /var/lib/central/cursors.db
)
```
Adapters take exactly these three keyword arguments. Adding new constructor
parameters silently breaks supervisor instantiation; if you need additional
state, derive it from `config.settings` or read it lazily during `startup()`.
---
## 5. Settings and configuration
### 5.1 The settings schema
`settings_schema` is a Pydantic v2 `BaseModel` subclass. It is the SoT for
operator-tunable fields and the basis for GUI form rendering, wizard
validation, and migration seed defaults. Three rules:
- **Default values are SoT.** The settings model's defaults are the canonical
starting state. Migrations seed `config.adapters.settings` from the same
model; tests build fixtures by instantiating the model with no args. Do NOT
duplicate default lists in migration JSON, fixtures, or test code — derive
them from the model class.
- **Use `RegionConfig` for bounding boxes.** Defined in
`central.config_models.RegionConfig`. Validates north/south/east/west,
rejects zero-width bboxes, rejects antimeridian crossings. Reuse it; do not
reinvent it.
- **Optional region.** Some adapters strongly recommend a region (NWIS, WFIGS)
but accept `None` so an operator can test before configuring. Log a WARN at
`startup()` if a region is recommended but unset; do not refuse to start.
### 5.2 What gets stored where
| State | Location | Adapter access |
|---|---|---|
| Adapter config (cadence, settings, enabled flag, paused state) | DB table `config.adapters`, fronted by `ConfigStore` | Passed in `__init__` as `config: AdapterConfig`; refreshed via `apply_config()` |
| API keys | DB table `config.api_keys`, fronted by `ConfigStore` | Read via `self._config_store.get_api_key(alias)`; never persisted on the adapter |
| Dedup cursor / observed sets | sqlite3 at `cursor_db_path` (`/var/lib/central/cursors.db`) | Each adapter creates its own table(s); the file is shared across adapters by convention but per-adapter rows are namespaced by the `adapter` column |
| Stream retention / max_bytes | DB table `config.streams`; managed by supervisor | Adapters do not read or write stream config |
`AdapterConfig.settings` is a `dict[str, Any]` — re-validate against
`settings_schema` before consuming structured fields. Most adapters do this
once in `__init__` and again in `apply_config()`.
### 5.3 preview_for_settings boundary
`preview_for_settings()` is the one place that does NOT have access to
`self._config_store` or `cursor_db_path` semantics. See
[§11](#11-settings-preview-hook). The framework may instantiate adapters
with a stub config store solely to call this method, and the GUI process never
calls `startup()`. Do not assume the adapter's HTTP session, cursor DB, or any
other startup-time state exists when preview runs.
---
## 6. Subject namespace conventions
### 6.1 The shape
Every subject starts with `central.` and decomposes as:
```
central.<domain>.<subtype>[.<dimensions>...]
```
- `<domain>` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`,
`meta` (the current set — see [§8](#8-the-streamentry-registry) for adding
one). Operators MUST be able to subscribe to all of one domain with
`central.<domain>.>`.
- `<subtype>` is adapter-driven and identifies the event category within the
domain (e.g. `hotspot`, `alert`, `incident`, `kindex`, `proton_flux`).
- `<dimensions>` are zero or more refinement tokens — geographic, temporal,
upstream identifier — that the consumer would plausibly want to filter on.
Token rules:
- **All lowercase.** No uppercase, no mixed-case.
- **`snake_case` tokens.** Multi-word tokens use underscores
(`proton_flux`, `sea_lake_ice`), not hyphens.
- **No empty tokens.** Fall back to a literal `unknown` token when a dimension
is genuinely absent (NWS does this for alerts lacking a primary region).
- **No `.removed` injected mid-token.** Removal subjects always read
`<subtype>.removed.<region>` — see [§9](#9-removal--fall-off-patterns).
### 6.2 Where the canonical subjects live
`docs/CONSUMER-INTEGRATION.md` §4 is the SoT for the concrete subject patterns
that exist today. Adapter authors should mirror new subjects there as part of
the same PR; the consumer test suite asserts every discovered adapter has a
per-adapter subsection.
### 6.3 Subject-token decomposition belongs in the adapter
When an upstream identifier is composite — agency-prefix + bare site number,
state + county, satellite + confidence — the adapter is responsible for
splitting it into tokens. Two conventions:
- **One helper function per decomposition, one place.** NWIS' helper sits at
module scope in `nwis.py`:
```python
def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]:
"""Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no)."""
...
```
Both `subject_for()` and `Event.category` construction call through it. WFIGS
does the same with `subject_suffix()` in `wfigs_common.py`. Never inline the
decomposition in two places — they will drift.
- **Round-trip via `Event.category`.** Adapters set `Event.category` to a
Central-relative form (`hydro.<param>.<agency>.<bare_site_no>`,
`fire.incident.<state>.<county>`); `subject_for()` then prepends `central.`
and may rearrange tokens. The category lives in the wire payload (see
[`CONSUMER-INTEGRATION.md` §5b](./CONSUMER-INTEGRATION.md#5b-inner-event-payload));
the subject is computed from it at publish time.
### 6.4 Adding a new top-level domain
Top-level domains are stream-mapped 1:1 (see [§8](#8-the-streamentry-registry)).
Adding `central.<new_domain>.>` requires:
1. One new `StreamEntry` in `src/central/streams.py`.
2. One new migration row that seeds `config.streams` with retention and
`max_bytes`.
3. Update `docs/CONSUMER-INTEGRATION.md` §3 and §4 to advertise the new
stream / subject pattern.
`supervisor.STREAM_SUBJECTS`, `archive.STREAMS`, and `gui.DASHBOARD_STREAMS`
all derive from `STREAMS` at import time; no code edits in those modules are
required.
---
## 7. Dedup key construction
### 7.1 What "dedup key" means here
Adapters maintain a per-adapter sqlite3 table at `cursor_db_path` (typically
`published_ids` keyed by `(adapter, event_id)`) to suppress re-yielding the
same event across successive `poll()` runs. The string the adapter writes to
`published_ids.event_id` is the **adapter-side dedup key**.
Separately, every yielded `Event.id` flows into the CloudEvents envelope's
`id` field (also set as the `Nats-Msg-Id` header on publish). Most adapters
use the same string for both — adapter-side dedup key and `Event.id` are
identical — which keeps consumer-side dedup ([§9 of CONSUMER-INTEGRATION.md](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide))
trivial.
A few adapters intentionally diverge: `eonet` stores a richer composite
adapter-side (id + timeline date) while keeping `Event.id` stable, because
consumers usually want timeline updates to overwrite the same id. The
`Event.id` is always the consumer contract; adapter-side composites are an
implementation detail.
### 7.2 Two shapes
**Single-token.** Upstream provides a stable, globally-unique identifier:
- `usgs_quake` — USGS feature id (`ci10240102`)
- `gdacs` — RSS guid (`WF1028708`)
- `wfigs_incidents` / `wfigs_perimeters` — IRWIN UUID
- `inciweb` — numeric incident id
- `swpc_kindex` — bin timestamp (single fixed subject, time is the natural key)
- `nws` — alert URL
- `eonet``EONET_NNNNN` (consumer-facing); composite tracked separately
adapter-side as described in §7.1
Use the upstream id directly. No prefix, no separators.
**Composite.** Upstream id is not by itself sufficient — needs a parameter, a
timestamp, or a tombstone marker to dedup correctly:
| Adapter | Shape |
|---|---|
| `firms` | `<satellite>:<acq_date>:<acq_time>:<lat_3dp>:<lon_3dp>` |
| `nwis` | `nwis:<monitoring_location_id>:<parameter_code>:<time_iso>` |
| `wfigs_incidents` / `wfigs_perimeters` removals | `<IrwinID>:removed:<iso_now>` |
### 7.3 Separator convention
Use `:` as the dedup-key separator for new adapters.
There is a pre-existing wrinkle: two SWPC adapters use `|` instead.
| Adapter | Separator | Shape |
|---|---|---|
| `swpc_alerts` | `\|` | `<product_id>\|<issue_timestamp>` |
| `swpc_protons` | `\|` | `<time_tag>\|<energy>` |
This is the historical reality at the time of writing — not a recommendation.
Do not propagate `|` to new adapters; do not refactor the SWPC pair to `:`
without an independent rename PR (the strings are persisted in
`/var/lib/central/cursors.db` rows).
### 7.4 Implementation
Every existing adapter follows the same pattern:
```python
async def startup(self) -> None:
...
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
```
Then `is_published(event_id) -> bool`, `mark_published(event_id) -> None`, and
a periodic `sweep_old_ids()` (typical retention: 7 to 30 days, tuned per
adapter). Copy from `swpc_kindex.py` — it is the canonical reference shape.
### 7.5 The CloudEvents `id` field
`Event.id` becomes the CloudEvents envelope `id` and the `Nats-Msg-Id` header.
JetStream applies its own short dedup window keyed on `Nats-Msg-Id`, but
consumers do their own dedup ([CONSUMER-INTEGRATION.md §9](./CONSUMER-INTEGRATION.md#9-dedup-implementation-guide))
and should not rely on JetStream's window. Stability of `Event.id` across
re-publishes is the producer's contract; the consumer side trusts it.
---
## 8. The StreamEntry registry
`src/central/streams.py` is the SoT for stream definitions. The whole file is
short enough to quote:
```python
@dataclass(frozen=True)
class StreamEntry:
name: str
subject_filter: str
event_bearing: bool = True
dashboard: bool = True
STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_WX", "central.wx.>"),
StreamEntry("CENTRAL_FIRE", "central.fire.>"),
StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
```
### 8.1 What derives from STREAMS
Three import-time consumers read `STREAMS` and build their own structures:
| Derived constant | Module | Shape |
|---|---|---|
| `STREAM_SUBJECTS` | `central.supervisor` | `dict[str, list[str]]` — used to create JetStream streams |
| `STREAMS` | `central.archive` | `list[tuple[str, str]]` for `event_bearing=True` entries only |
| `DASHBOARD_STREAMS` | `central.gui.routes` | `list[str]` for `dashboard=True` entries |
`tests/test_stream_registry.py` asserts each derivation matches the registry —
adding a stream automatically extends the test surface; no new test code is
required.
### 8.2 Adding a stream
When a new adapter needs a new top-level domain:
1. **One line in `src/central/streams.py`.** Append a `StreamEntry(...)` to
the `STREAMS` list.
2. **One migration row.** Seed `config.streams` with retention and
`max_bytes`. Existing migrations in `migrations/` are the template.
`event_bearing=False` is reserved for `CENTRAL_META` today; adding a second
non-event-bearing stream silently excludes it from the archive. The registry
test explicitly guards against this and requires a deliberate test update if
the invariant changes.
### 8.3 The consistency gate
`tests/test_stream_registry.py` is the property-test gate. It asserts:
- Stream names and subject filters are unique.
- Every subject filter matches `^central\.[a-z][a-z_]*\.>$`.
- `CENTRAL_META` is the only non-event-bearing stream.
- The supervisor / archive / GUI derivations all match the registry.
Mirror that discipline in any new test you write for adapter-side stream
behavior: derive expected values from `STREAMS` rather than hand-typing them.
---
## 9. Removal / fall-off patterns
When an upstream record disappears, an adapter has two options:
### 9.1 Explicit removal subject
Emit an `Event` whose subject is `<subtype>.removed.<region>` and whose
`category` ends in `.removed`. This is the right choice when:
- Upstream signals retirement explicitly (`gdacs`'s `iscurrent: false`).
- The upstream is a "current state" feed and absence is meaningful
(`wfigs_incidents`, `wfigs_perimeters`, `eonet`).
The subtype-before-removed token order is a hard convention — consumers
filtering on `central.<domain>.<subtype>.>` receive both live and removal
events with a single subscription. See
[`CONSUMER-INTEGRATION.md` §7a](./CONSUMER-INTEGRATION.md#7a-explicit-removal-subjects-consumer-must-subscribe-to-handle-cleanly)
for the consumer side.
### 9.2 Absence-as-signal
No removal subject; consumers infer disappearance from a time gap or an
upstream-specific marker (`expires` field, supersession text). This is the
right choice when:
- The upstream is a stream of point-in-time observations (`firms` hotspot
pixels, `swpc_kindex` 3-hour bins, `swpc_protons` cadence-driven samples).
- Upstream signals expiry inline (`nws` `expires` field, `nws`
`messageType: Cancel`).
- The upstream is durable / append-only (`usgs_quake` catalog, `inciweb` RSS).
See [`CONSUMER-INTEGRATION.md` §7b](./CONSUMER-INTEGRATION.md#7b-absence-as-signal-no-removal-subject--consumer-infers-from-time-gap)
for the consumer-side enumeration.
### 9.3 Composite removal dedup key
When an adapter emits explicit removals, the same upstream id can fall off
and re-appear multiple times over an adapter's lifetime. The dedup key for a
removal event therefore needs a removal-detection timestamp:
```python
Event(
id=f"{irwin_id}:removed:{now.isoformat()}",
category="fire.incident.removed",
# ...
)
```
The `:removed:<iso_now>` suffix is the cross-adapter convention. Keep it.
### 9.4 Removal payload schema
Removal events ship an empty `geo` block and a sparse `data` payload. The
exact shape (id field name, reason values, last_observed_at, adapter-specific
context fields) is documented in
[`CONSUMER-INTEGRATION.md` §7c](./CONSUMER-INTEGRATION.md#7c-removal-event-payload-schema-cross-cutting);
adapter authors should mirror it. Do not restate it here.
---
## 10. Anti-patterns — what NOT to do
These are the patterns prior reviews have explicitly rejected. Reject them
again on sight in a new-adapter PR.
### 10.1 Enrichment during `poll()`
No calls to upstream metadata endpoints, no reverse-geocode, no consultation
of a second source to fill in fields the primary feed omitted. The "NWIS
enrichment" Phase 3 proposal — joining live measurements against the
monitoring-locations metadata endpoint during `poll()` — was rejected on the
[§2](#2-the-design-principle) principle. Future proposals along the same
lines get the same answer.
If enrichment is genuinely necessary, the right shape is a separate adapter
(or a downstream consumer) — not an `if metadata_missing: await
fetch_metadata()` branch buried in an adapter's `poll()`.
### 10.2 Silent field dropping
`Event.data` carries the upstream payload verbatim. Do not omit a field
because it "looks redundant" or "isn't useful right now." Operators
debugging a downstream consumer often need every field the upstream sent;
dropping fields makes that impossible after the fact.
The two acceptable transforms — token normalization (case, separators) and
dedup-key construction — are not field-drops. They produce new fields
derived from the upstream payload; the upstream payload itself is preserved.
### 10.3 Magic-number asserts in tests
Adapter tests that pin event counts to literal integers (`assert
len(events) == 47`) break the next time the upstream's fixture is
refreshed and contribute zero diagnostic value. Either assert structural
invariants (every yielded event has a non-empty `id`; subjects match the
adapter's `subject_for()` output; etc.) or assert against a count computed
from the input fixture (`assert len(events) == len(fixture.features)`).
### 10.4 Hardcoded stream / adapter lists in tests
Tests that enumerate `["CENTRAL_WX", "CENTRAL_FIRE", ...]` or `["firms",
"inciweb", ...]` as literals drift the moment a stream or adapter is added
or removed. Derive from the SoT:
```python
from central.streams import STREAMS
from central.adapter_discovery import discover_adapters
stream_names = {s.name for s in STREAMS}
adapter_names = set(discover_adapters().keys())
```
`tests/test_stream_registry.py` and `tests/test_consumer_doc.py` are the
in-tree templates. The sibling test for this doc
(`tests/test_producer_doc.py`) follows the same rule.
### 10.5 Persistent state outside `cursor_db_path`
Adapters MUST NOT write to disk outside the cursor DB at
`cursor_db_path`. No JSON files in `/var/lib/central/<adapter>/`, no
state stashed in `/tmp`. Multi-process invariants (the supervisor may
restart an adapter; the GUI may instantiate it independently to call
`preview_for_settings()`) only hold for the cursor DB and `config.adapters`.
### 10.6 Blocking calls in async paths
`poll()`, `apply_config()`, `startup()`, `shutdown()`, and
`preview_for_settings()` are async. Use `aiohttp`, not `requests`; use
`asyncio.to_thread()` to push CPU-bound or genuinely blocking work
(`shapely` geometry operations, large JSON parsing) off the event loop.
The supervisor runs every adapter on a single event loop — one blocked
adapter stalls every other adapter.
### 10.7 Reaching into framework internals from `preview_for_settings`
`preview_for_settings()` is a pure function of `settings`. Do NOT access
`self._config_store`, `self._cursor_db_path`, or any state established
in `startup()`. The framework may instantiate the adapter with a stub
config store solely to call this method. See [§11](#11-settings-preview-hook).
### 10.8 Hardcoded subject strings in tests
A test that asserts `subject == "central.fire.incident.mt.flathead"` for a
specific fixture is fine; a test that hardcodes the subject pattern as a
string and re-derives it instead of calling the adapter's
`subject_for(event)` is the wrong shape. Round-trip through the adapter so
the test breaks the day someone shifts a token.
---
## 11. Settings preview hook
`preview_for_settings()` is the optional GUI-side hook that surfaces a
settings-driven preview on the adapter edit page. NWIS uses it to show the
monitoring-locations inside the configured bbox before the operator hits
save; most adapters opt out by inheriting the base class's `return None`.
### 11.1 The contract (verbatim from `central.adapter.SourceAdapter`)
> Optional. Override to surface a settings-driven preview on the edit page.
>
> Return `list[dict]` (framework renders as a generic table; columns come from
> the first dict's keys, in insertion order). Return `None` to skip preview.
> Raise to surface an error banner — framework catches at the route boundary.
>
> Contract:
> - Preview is a pure function of `settings`. Do NOT access
> `self._config_store` or `cursor_db` state — the framework may instantiate
> adapters with a stub config_store solely to call this method.
> - Network preview implementations must open their own short-lived
> `aiohttp` session (the adapter's polling session may not exist; the GUI
> process never calls `startup()`).
> - Return `None` when preview is not meaningful (e.g., required settings
> like region are unset). Return `[]` explicitly if the query ran and
> matched zero rows — the framework renders that distinctly from `None`.
### 11.2 The three return states
| Return | Framework renders |
|---|---|
| `None` | Nothing — no preview block on the edit page |
| `[]` | `"Preview (0 rows)"` legend, no table |
| `list[dict]` with rows | Generic table; columns from the first dict's keys in insertion order; legend `"Preview (N rows)"` |
`tests/test_preview_hook.py` is the test surface for the partial; mirror its
discipline for any adapter-specific preview test.
### 11.3 Generic rendering — no per-adapter Jinja
The framework is duck-typed on `list[dict]`. Column headers are the first
dict's keys. There is no per-adapter branch in the template, and no
adapter-name conditionals are accepted on review. If an adapter's preview
needs a column the framework does not surface, the adapter rewrites the dict
shape, not the template.
### 11.4 Implementation pattern
NWIS is the canonical example. Skeleton:
```python
async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
if settings.region is None:
return None
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=15),
) as session:
async with session.get(url, headers=...) as resp:
resp.raise_for_status()
page = await resp.json()
return [
{"site_id": ..., "name": ..., "site_type": ..., "state": ...}
for feat in page.get("features") or []
]
```
Key points:
- Fresh `aiohttp.ClientSession` per call — `self._session` may not exist.
- Short timeout (NWIS uses 15s) — the edit page must render quickly.
- Small row cap (NWIS uses 50) — the preview is for orientation, not bulk
inspection.
- Raise on HTTP / parse failure — the framework catches at the route boundary
and renders an error banner.
---
## 12. Acceptance gate for a new adapter
A new-adapter PR is accepted when every item below holds. The list is
mechanical; reviewers and the PR author should both run through it before
requesting / granting merge.
- [ ] **Upstream curl evidence captured.** A real HTTP transcript (curl
command + redacted response body) is pinned somewhere in the PR
description or a fixture file. Documents what the adapter was written
against.
- [ ] **`SourceAdapter` subclass conforms to [§4](#4-the-sourceadapter-base-class).**
Class attributes set, three abstract methods implemented, constructor
signature matches the supervisor call exactly.
- [ ] **`tests/test_<adapter>.py` exists.** Unit tests for the adapter; no
magic-number asserts ([§10.3](#103-magic-number-asserts-in-tests)); no
hardcoded stream / adapter lists
([§10.4](#104-hardcoded-stream--adapter-lists-in-tests)).
- [ ] **Dry-run on CT104 passes.** The supervisor instantiates the adapter
without errors; one full `poll()` cycle yields events without raising.
- [ ] **Dedup keys stable across multiple ticks.** Two successive `poll()`
runs against a frozen upstream fixture yield the same event-count delta
as expected (zero new events on the second run if upstream is
unchanged).
- [ ] **No hardcoded values that should be derived.** Subject prefixes,
stream names, default parameter lists, etc., all derive from the
registry / settings model / discovery surface.
- [ ] **`CONSUMER-INTEGRATION.md` updated.** New per-adapter subsection in
§6, new subject row in §4, new stream row in §3 if a new domain was
introduced. `tests/test_consumer_doc.py` passes.
- [ ] **`PRODUCER-INTEGRATION.md` mentions the adapter only if it
introduces a new convention.** This doc is contract-shaped, not
catalog-shaped — most new adapters need no edit here.
- [ ] **Full pytest suite green.**
---

View file

@ -0,0 +1,21 @@
-- Migration: 022_add_eonet_adapter
-- Adds the NASA EONET adapter row to config.adapters.
-- Ships disabled; operator enables via GUI.
--
-- The settings JSON below is the literal output of EONETSettings().model_dump_json()
-- at migration-author time. Regenerate via:
-- sudo -u central .venv/bin/python -c \
-- "from central.adapters.eonet import EONETSettings; print(EONETSettings().model_dump_json())"
-- Do NOT hand-edit the category_allowlist here — _DEFAULT_CATEGORIES in
-- src/central/adapters/eonet.py is the single source of truth.
--
-- Idempotent: uses ON CONFLICT DO NOTHING.
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'eonet',
false,
1800,
'{"category_allowlist":["drought","dustHaze","earthquakes","floods","landslides","manmade","seaLakeIce","severeStorms","snow","tempExtremes","volcanoes","waterColor","wildfires"],"region":null}'::jsonb
)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,29 @@
-- Migration: 023_add_nwis_adapter_and_hydro_stream
-- Adds the CENTRAL_HYDRO JetStream stream row AND the NWIS adapter row.
-- Folded into a single migration because the adapter publishes onto
-- central.hydro.> — both rows ship together.
--
-- Stream retention mirrors CENTRAL_DISASTER (7 days, 1 GiB).
-- Adapter ships disabled; operator enables via GUI after setting a bbox.
--
-- The settings JSON below is the literal output of NWISSettings().model_dump_json()
-- at migration-author time. Regenerate via:
-- sudo -u central .venv/bin/python -c \
-- "from central.adapters.nwis import NWISSettings; print(NWISSettings().model_dump_json())"
-- Do NOT hand-edit the parameter_codes here — _DEFAULT_PARAMETER_CODES in
-- src/central/adapters/nwis.py is the single source of truth.
--
-- Idempotent: both inserts use ON CONFLICT DO NOTHING.
INSERT INTO config.streams (name, max_age_s, max_bytes)
VALUES ('CENTRAL_HYDRO', 604800, 1073741824)
ON CONFLICT (name) DO NOTHING;
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'nwis',
false,
900,
'{"parameter_codes":["00060","00065","00010"],"region":null}'::jsonb
)
ON CONFLICT (name) DO NOTHING;

View file

@ -78,3 +78,23 @@ class SourceAdapter(ABC):
async def shutdown(self) -> None: async def shutdown(self) -> None:
"""Optional lifecycle hook called on graceful shutdown.""" """Optional lifecycle hook called on graceful shutdown."""
pass pass
async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None:
"""Optional. Override to surface a settings-driven preview on the edit page.
Return list[dict] (framework renders as a generic table; columns come from
the first dict's keys, in insertion order). Return None to skip preview.
Raise to surface an error banner framework catches at the route boundary.
Contract:
- Preview is a pure function of `settings`. Do NOT access
self._config_store or cursor_db state the framework may instantiate
adapters with a stub config_store solely to call this method.
- Network preview implementations must open their own short-lived
aiohttp session (the adapter's polling session may not exist; the GUI
process never calls startup()).
- Return None when preview is not meaningful (e.g., required settings
like region are unset). Return [] explicitly if the query ran and
matched zero rows the framework renders that distinctly from None.
"""
return None

View file

@ -0,0 +1,434 @@
"""NASA EONET (Earth Observatory Natural Event Tracker) adapter."""
import json
import logging
import re
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from pydantic import BaseModel, Field
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
EONET_EVENTS_URL = "https://eonet.gsfc.nasa.gov/api/v3/events"
# Single source of truth for default category list. Mirrors the upstream
# /api/v3/categories registry at the time of integration. Do NOT duplicate
# this list in tests, fixtures, or migrations — derive from EONETSettings'
# default instead. Refresh by curling /api/v3/categories if upstream adds
# new IDs.
_DEFAULT_CATEGORIES: list[str] = [
"drought",
"dustHaze",
"earthquakes",
"floods",
"landslides",
"manmade",
"seaLakeIce",
"severeStorms",
"snow",
"tempExtremes",
"volcanoes",
"waterColor",
"wildfires",
]
def _subject_category(category_id: str | None) -> str:
"""Convert upstream EONET camelCase category id to lower_snake_case subject component.
Examples: seaLakeIce -> sea_lake_ice, dustHaze -> dust_haze,
severeStorms -> severe_storms. Single-word ids pass through lowercased.
Empty/None -> 'unknown'. This is the ONLY place this mapping lives.
"""
if not category_id:
return "unknown"
return re.sub(r"(?<!^)(?=[A-Z])", "_", category_id).lower()
def _parse_iso_utc(raw: str | None) -> datetime | None:
"""Parse an ISO 8601 timestamp ('...Z' or with offset) to UTC datetime."""
if not raw:
return None
try:
dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
except (ValueError, TypeError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _dedup_key(event_id: str, latest_geometry_date_iso: str) -> str:
"""Composite dedup key: same id + same timeline -> suppress; timeline advance -> re-publish."""
return f"eonet:{event_id}:{latest_geometry_date_iso}"
def init_eonet_observed_table(db: sqlite3.Connection) -> None:
db.execute("""
CREATE TABLE IF NOT EXISTS eonet_observed (
event_id TEXT PRIMARY KEY,
category_id TEXT,
last_observed_at TEXT NOT NULL
)
""")
db.commit()
def get_observed(db: sqlite3.Connection) -> dict[str, str | None]:
cur = db.execute("SELECT event_id, category_id FROM eonet_observed")
return {row[0]: row[1] for row in cur.fetchall()}
def mark_observed(db: sqlite3.Connection, event_id: str, category_id: str | None) -> None:
db.execute(
"""
INSERT INTO eonet_observed (event_id, category_id, last_observed_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT (event_id) DO UPDATE SET
category_id = excluded.category_id,
last_observed_at = CURRENT_TIMESTAMP
""",
(event_id, category_id),
)
db.commit()
def mark_retired(db: sqlite3.Connection, event_ids: set[str]) -> None:
for event_id in event_ids:
db.execute("DELETE FROM eonet_observed WHERE event_id = ?", (event_id,))
db.commit()
class EONETSettings(BaseModel):
"""Settings schema for NASA EONET adapter.
category_allowlist defaults to ALL upstream categories. PM call: keep the
knob symmetric with GDACS event_types operator opts OUT per-category if
duplicate events on gdacs.* and eonet.* subjects become a problem in
practice. Empirical note: in a 200-event upstream sample, ~77.5% of events
for wildfires/floods/severeStorms/volcanoes were GDACS-sourced. Disable
those categories here if downstream subscribers already consume the
GDACS adapter.
"""
category_allowlist: list[str] = Field(default=list(_DEFAULT_CATEGORIES))
region: RegionConfig | None = None
class EONETAdapter(SourceAdapter):
"""NASA EONET v3 natural-event tracker adapter."""
name = "eonet"
display_name = "NASA EONET — Earth Observatory"
description = (
"Natural event tracker from NASA EONET v3. Note: heavy GDACS overlap "
"for wildfires/floods/severeStorms/volcanoes — disable per-category "
"via the allowlist below if duplicate events on gdacs.* and eonet.* "
"subjects are not wanted."
)
settings_schema = EONETSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 1800
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
self.category_allowlist: list[str] = list(
config.settings.get("category_allowlist", _DEFAULT_CATEGORIES)
)
region_dict = config.settings.get("region")
self.region: RegionConfig | None = (
RegionConfig(**region_dict) if region_dict else None
)
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
init_eonet_observed_table(self._db)
self._db.commit()
logger.info(
"EONET adapter started",
extra={
"categories": self.category_allowlist,
"region": self.region.model_dump() if self.region else None,
},
)
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("EONET adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
self.category_allowlist = list(
new_config.settings.get("category_allowlist", _DEFAULT_CATEGORIES)
)
region_dict = new_config.settings.get("region")
self.region = RegionConfig(**region_dict) if region_dict else None
logger.info(
"EONET config updated",
extra={
"categories": self.category_allowlist,
"region": self.region.model_dump() if self.region else None,
},
)
def is_published(self, dedup_key: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, dedup_key),
)
return cur.fetchone() is not None
def mark_published(self, dedup_key: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, dedup_key),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("EONET swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
# event.category is "disaster.eonet.<subject_category>[.removed]"
parts = event.category.split(".")
subj_cat = parts[2] if len(parts) >= 3 else "unknown"
if len(parts) >= 4 and parts[-1] == "removed":
return f"central.disaster.eonet.{subj_cat}.removed.global"
return f"central.disaster.eonet.{subj_cat}.global"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch(self) -> str:
if not self._session:
raise RuntimeError("Session not initialized")
async with self._session.get(
EONET_EVENTS_URL, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
text = await resp.text()
return text
async def poll(self) -> AsyncIterator[Event]:
if not self._db:
raise RuntimeError("Database not initialized")
try:
content = await self._fetch()
except Exception as e:
logger.error("EONET fetch failed", extra={"error": str(e)})
raise
try:
payload = json.loads(content)
except json.JSONDecodeError as e:
logger.error("EONET JSON parse error", extra={"error": str(e)})
raise
items = payload.get("events", [])
logger.info("EONET fetch completed", extra={"item_count": len(items)})
observed_before = get_observed(self._db)
current_ids: set[str] = set()
events_yielded = 0
for item in items:
event_id = item.get("id")
if not event_id:
continue
categories = item.get("categories") or []
category_id: str | None = None
if categories and isinstance(categories[0], dict):
category_id = categories[0].get("id")
if not category_id or category_id not in self.category_allowlist:
continue
subject_cat = _subject_category(category_id)
geometry = item.get("geometry") or []
if geometry:
latest = max(geometry, key=lambda g: g.get("date") or "")
else:
latest = None
latest_date_iso = (latest or {}).get("date") or ""
event_time = _parse_iso_utc(latest_date_iso) or datetime.now(timezone.utc)
coords = (latest or {}).get("coordinates")
centroid: tuple[float, float] | None = None
if (
isinstance(coords, list)
and len(coords) == 2
and all(isinstance(c, (int, float)) for c in coords)
):
centroid = (float(coords[0]), float(coords[1])) # GeoJSON (lon, lat)
if self.region is not None:
if centroid is None:
continue
lon, lat = centroid
if not (
self.region.west <= lon <= self.region.east
and self.region.south <= lat <= self.region.north
):
continue
current_ids.add(event_id)
geo = Geo(
centroid=centroid,
bbox=None,
regions=[],
primary_region=None,
)
magnitude_value = (latest or {}).get("magnitudeValue")
magnitude_unit = (latest or {}).get("magnitudeUnit")
sources = item.get("sources") or []
data: dict[str, Any] = {
"event_id": event_id,
"category_id": category_id,
"title": item.get("title") or "",
"description": item.get("description") or "",
"url": item.get("link") or "",
"closed": item.get("closed"),
"sources": [
{"id": s.get("id"), "url": s.get("url")} for s in sources
],
"magnitudeValue": magnitude_value,
"magnitudeUnit": magnitude_unit,
"latest_geometry_date": latest_date_iso or None,
}
dedup_key = _dedup_key(event_id, latest_date_iso)
if self.is_published(dedup_key):
mark_observed(self._db, event_id, category_id)
continue
event = Event(
id=event_id,
adapter=self.name,
category=f"disaster.eonet.{subject_cat}",
time=event_time,
severity=0,
geo=geo,
data=data,
)
yield event
self.mark_published(dedup_key)
mark_observed(self._db, event_id, category_id)
events_yielded += 1
# Fall-off: events present in observed_before but absent from this poll
fallen_off = set(observed_before.keys()) - current_ids
for event_id in fallen_off:
prior_category_id = observed_before[event_id]
if prior_category_id and prior_category_id not in self.category_allowlist:
# Was published before settings narrowed; clean up silently.
mark_retired(self._db, {event_id})
continue
subject_cat = _subject_category(prior_category_id)
tombstone_id = f"{event_id}:removed"
tombstone_dedup_key = _dedup_key(tombstone_id, "")
if self.is_published(tombstone_dedup_key):
mark_retired(self._db, {event_id})
continue
tombstone = Event(
id=tombstone_id,
adapter=self.name,
category=f"disaster.eonet.{subject_cat}.removed",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={
"event_id": event_id,
"category_id": prior_category_id,
"reason": "missing_from_feed",
},
)
yield tombstone
self.mark_published(tombstone_dedup_key)
mark_retired(self._db, {event_id})
events_yielded += 1
self.sweep_old_ids()
logger.info(
"EONET poll completed",
extra={
"events_yielded": events_yielded,
"current_observed": len(current_ids),
"fallen_off": len(fallen_off),
},
)

View file

@ -0,0 +1,435 @@
"""USGS NWIS (National Water Information System) adapter — OGC API v0."""
import json
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
import aiohttp
from pydantic import BaseModel, Field
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
NWIS_LATEST_CONTINUOUS_URL = (
"https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items"
)
NWIS_MONITORING_LOCATIONS_URL = (
"https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items"
)
# Per-render cap for the settings-driven preview (PR G.5). Keep small so the
# /adapters/<name> edit page renders quickly.
_PREVIEW_LIMIT = 50
# Single source of truth for the parameter-code default. Operators tune via
# NWISSettings.parameter_codes; do NOT duplicate this list elsewhere
# (tests, fixtures, migration JSON all derive from NWISSettings defaults).
# Codes are USGS pcodes — see /api/v3/parameter-codes for the registry.
# 00060 = Discharge, cubic feet per second
# 00065 = Gage height, feet
# 00010 = Temperature, water, degrees Celsius
_DEFAULT_PARAMETER_CODES: list[str] = ["00060", "00065", "00010"]
# Per-request page size cap. Upstream maxes around 10000; we use a
# moderate value to balance pagination overhead vs latency.
_PAGE_LIMIT = 1000
def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]:
"""Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no).
Examples:
USGS-05420500 -> ("usgs", "05420500")
MO005-400105... -> ("mo005", "400105...")
no-dash-id -> ("unknown", "no-dash-id"-lowercased; effectively the whole id)
This is the ONLY place this decomposition lives subject_for() and
Event.category construction both call through here.
"""
if "-" not in monitoring_location_id:
return ("unknown", monitoring_location_id.lower())
agency, bare = monitoring_location_id.split("-", 1)
return (agency.lower(), bare)
def _parse_iso_utc(raw: str | None) -> datetime | None:
"""Parse an ISO 8601 timestamp ('...Z' or with offset) to UTC datetime."""
if not raw:
return None
try:
dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
except (ValueError, TypeError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _dedup_key(monitoring_location_id: str, parameter_code: str, time_iso: str) -> str:
"""Composite dedup: same site+param+measurement-time -> suppress; new time -> re-publish."""
return f"nwis:{monitoring_location_id}:{parameter_code}:{time_iso}"
def _next_link(page: dict) -> str | None:
"""Extract OGC API pagination 'next' link href, or None if absent."""
for link in page.get("links") or []:
if link.get("rel") == "next" and link.get("href"):
return link["href"]
return None
class NWISSettings(BaseModel):
"""Settings schema for USGS NWIS adapter.
bbox via RegionConfig is REQUIRED in practice without a region the
upstream endpoint returns CONUS-wide records (tens of thousands per poll).
Adapter logs WARN at startup if region is None; it does not refuse to
start (operator may be testing).
"""
parameter_codes: list[str] = Field(default=list(_DEFAULT_PARAMETER_CODES))
region: RegionConfig | None = None
class NWISAdapter(SourceAdapter):
"""USGS NWIS adapter via the OGC API v0 `latest-continuous` collection."""
name = "nwis"
display_name = "USGS NWIS — Water Data (OGC)"
description = (
"USGS National Water Information System via the OGC API "
"(latest-continuous collection). Polls the configured parameter codes "
"within the configured bbox. Default params: discharge (00060), "
"gage height (00065), water temperature (00010). Operator opts in to "
"more via parameter_codes. bbox is REQUIRED — without one the endpoint "
"returns the entire US (tens of thousands of records per poll)."
)
settings_schema = NWISSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 900
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
self.parameter_codes: list[str] = list(
config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES)
)
region_dict = config.settings.get("region")
self.region: RegionConfig | None = (
RegionConfig(**region_dict) if region_dict else None
)
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
if self.region is None:
logger.warning(
"NWIS started without region bbox — upstream will return CONUS-wide records on every poll. "
"Set region via the GUI before relying on this adapter."
)
logger.info(
"NWIS adapter started",
extra={
"parameter_codes": self.parameter_codes,
"region": self.region.model_dump() if self.region else None,
},
)
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("NWIS adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
self.parameter_codes = list(
new_config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES)
)
region_dict = new_config.settings.get("region")
self.region = RegionConfig(**region_dict) if region_dict else None
logger.info(
"NWIS config updated",
extra={
"parameter_codes": self.parameter_codes,
"region": self.region.model_dump() if self.region else None,
},
)
def is_published(self, dedup_key: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, dedup_key),
)
return cur.fetchone() is not None
def mark_published(self, dedup_key: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, dedup_key),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("NWIS swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
# event.category is "hydro.<parameter_code>.<agency>.<bare_site_no>"
parts = event.category.split(".")
if len(parts) >= 4:
return f"central.hydro.{parts[1]}.{parts[2]}.{parts[3]}"
return "central.hydro.unknown.unknown.unknown"
def _initial_url(self, parameter_code: str) -> str:
params: dict[str, str] = {
"parameter_code": parameter_code,
"limit": str(_PAGE_LIMIT),
}
if self.region is not None:
params["bbox"] = (
f"{self.region.west},{self.region.south},"
f"{self.region.east},{self.region.north}"
)
return f"{NWIS_LATEST_CONTINUOUS_URL}?{urlencode(params)}"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch(self, url: str) -> str:
if not self._session:
raise RuntimeError("Session not initialized")
async with self._session.get(
url, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
return await resp.text()
async def poll(self) -> AsyncIterator[Event]:
if not self._db:
raise RuntimeError("Database not initialized")
events_yielded = 0
for parameter_code in self.parameter_codes:
url: str | None = self._initial_url(parameter_code)
pages_fetched = 0
features_seen = 0
while url:
try:
content = await self._fetch(url)
except Exception as e:
logger.error(
"NWIS fetch failed",
extra={"error": str(e), "parameter_code": parameter_code},
)
raise
try:
page = json.loads(content)
except json.JSONDecodeError as e:
logger.error(
"NWIS JSON parse error",
extra={"error": str(e), "parameter_code": parameter_code},
)
raise
pages_fetched += 1
features = page.get("features") or []
features_seen += len(features)
for feature in features:
event = self._build_event(feature, parameter_code)
if event is None:
continue
dedup_key = _dedup_key(
event.data["monitoring_location_id"],
parameter_code,
event.data["time"],
)
if self.is_published(dedup_key):
continue
yield event
self.mark_published(dedup_key)
events_yielded += 1
url = _next_link(page)
logger.info(
"NWIS parameter poll completed",
extra={
"parameter_code": parameter_code,
"pages_fetched": pages_fetched,
"features_seen": features_seen,
},
)
self.sweep_old_ids()
logger.info(
"NWIS poll completed",
extra={"events_yielded": events_yielded},
)
def _build_event(self, feature: dict, parameter_code: str) -> Event | None:
props = feature.get("properties") or {}
monitoring_location_id = props.get("monitoring_location_id")
if not monitoring_location_id:
return None
time_iso = props.get("time")
event_time = _parse_iso_utc(time_iso)
if event_time is None or not time_iso:
return None
value_raw = props.get("value")
try:
value = float(value_raw) if value_raw is not None else None
except (TypeError, ValueError):
value = None
if value is None:
return None
geom = feature.get("geometry") or {}
coords = geom.get("coordinates")
centroid: tuple[float, float] | None = None
if (
isinstance(coords, list)
and len(coords) == 2
and all(isinstance(c, (int, float)) for c in coords)
):
centroid = (float(coords[0]), float(coords[1])) # GeoJSON (lon, lat)
agency, bare_site_no = _subject_tokens_for_id(monitoring_location_id)
data: dict[str, Any] = {
"monitoring_location_id": monitoring_location_id,
"parameter_code": parameter_code,
"time": time_iso,
"value": value,
"unit_of_measure": props.get("unit_of_measure"),
"statistic_id": props.get("statistic_id"),
"approval_status": props.get("approval_status"),
"qualifier": props.get("qualifier"),
"time_series_id": props.get("time_series_id"),
"last_modified": props.get("last_modified"),
}
return Event(
id=f"{monitoring_location_id}:{parameter_code}:{time_iso}",
adapter=self.name,
category=f"hydro.{parameter_code}.{agency}.{bare_site_no}",
time=event_time,
severity=0,
geo=Geo(centroid=centroid),
data=data,
)
async def _fetch_preview_text(self, url: str) -> str:
"""One-shot GET for the preview render.
Uses a fresh aiohttp session preview must work even when the adapter
isn't started (the GUI process never calls startup()). Factored out so
tests can mock the HTTP call without touching aiohttp internals.
"""
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=15),
) as session:
async with session.get(
url, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
return await resp.text()
async def preview_for_settings(self, settings: NWISSettings) -> list[dict] | None:
"""Surface monitoring-locations inside the configured bbox.
Returns up to _PREVIEW_LIMIT rows from the monitoring-locations
collection. Returns None if region is unset (no useful preview).
Raises on HTTP / JSON / shape failure framework catches at the route.
"""
if settings.region is None:
return None
params = {
"bbox": (
f"{settings.region.west},{settings.region.south},"
f"{settings.region.east},{settings.region.north}"
),
"limit": str(_PREVIEW_LIMIT),
}
url = f"{NWIS_MONITORING_LOCATIONS_URL}?{urlencode(params)}"
text = await self._fetch_preview_text(url)
page = json.loads(text)
features = page.get("features") or []
rows: list[dict] = []
for feat in features:
props = feat.get("properties") or {}
rows.append(
{
"site_id": feat.get("id"),
"name": props.get("monitoring_location_name"),
"site_type": props.get("site_type_code"),
"state": props.get("state_name"),
}
)
return rows

View file

@ -46,6 +46,9 @@ from central.gui.audit import (
) )
from functools import cache from functools import cache
from pathlib import Path
from central.config_models import AdapterConfig
from central.gui.db import get_pool from central.gui.db import get_pool
from central.gui.form_descriptors import describe_fields, FieldDescriptor from central.gui.form_descriptors import describe_fields, FieldDescriptor
from central.adapter_discovery import discover_adapters from central.adapter_discovery import discover_adapters
@ -55,13 +58,23 @@ from pydantic import ValidationError
@cache @cache
def _adapter_classes() -> dict: def _adapter_classes() -> dict:
"""Cached adapter class discovery. """Cached adapter class discovery.
GUI is a separate process from supervisor; walks pkgutil itself. GUI is a separate process from supervisor; walks pkgutil itself.
Python's import cache makes subsequent calls free. Python's import cache makes subsequent calls free.
""" """
return discover_adapters() return discover_adapters()
class _PreviewConfigStore:
"""No-op stand-in passed to adapter __init__ when calling preview_for_settings.
preview_for_settings implementations must create their own one-shot HTTP
session and must not depend on config_store / cursor_db state the GUI
process has no live ConfigStore (the supervisor owns the real one)."""
pass
router = APIRouter() router = APIRouter()
# Streams to display on dashboard -- derived from the registry's dashboard flag. # Streams to display on dashboard -- derived from the registry's dashboard flag.
@ -1450,6 +1463,28 @@ async def adapters_edit_form(
) )
api_key_missing = not has_key api_key_missing = not has_key
# Generic settings-driven preview. Adapters opt in by overriding
# SourceAdapter.preview_for_settings; the framework is duck-typed on the
# returned list[dict] shape and never branches on adapter name.
preview_rows: list[dict] | None = None
preview_error: str | None = None
if adapter_cls is not None and hasattr(adapter_cls, "settings_schema"):
try:
settings_obj = adapter_cls.settings_schema(**settings)
preview_cfg = AdapterConfig(
name=row["name"],
enabled=row["enabled"],
cadence_s=row["cadence_s"],
settings=settings,
updated_at=row["updated_at"],
)
preview_adapter = adapter_cls(
preview_cfg, _PreviewConfigStore(), Path("/dev/null")
)
preview_rows = await preview_adapter.preview_for_settings(settings_obj)
except Exception as exc:
preview_error = f"Preview unavailable: {exc}"
csrf_token = request.state.csrf_token csrf_token = request.state.csrf_token
response = templates.TemplateResponse( response = templates.TemplateResponse(
request=request, request=request,
@ -1466,6 +1501,8 @@ async def adapters_edit_form(
"tile_attribution": tile_attribution, "tile_attribution": tile_attribution,
"api_key_missing": api_key_missing, "api_key_missing": api_key_missing,
"requires_api_key_alias": requires_api_key_alias, "requires_api_key_alias": requires_api_key_alias,
"preview_rows": preview_rows,
"preview_error": preview_error,
}, },
) )
return response return response

View file

@ -0,0 +1,25 @@
{% if preview_error %}
<article aria-label="Preview Unavailable" style="background-color: var(--pico-del-color); margin-bottom: 1rem;">
<strong>{{ preview_error }}</strong>
</article>
{% elif preview_rows is not none %}
<fieldset>
<legend>Preview ({{ preview_rows|length }} rows)</legend>
{% if preview_rows %}
<table class="preview-table" role="grid">
<thead>
<tr>
{% for col in preview_rows[0].keys() %}<th>{{ col }}</th>{% endfor %}
</tr>
</thead>
<tbody>
{% for row in preview_rows %}
<tr>
{% for col in preview_rows[0].keys() %}<td>{{ row[col] }}</td>{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
</fieldset>
{% endif %}

View file

@ -178,6 +178,8 @@
</fieldset> </fieldset>
{% endif %} {% endif %}
{% include "_adapter_preview.html" %}
<button type="submit">Save Changes</button> <button type="submit">Save Changes</button>
<a href="/adapters" role="button" class="outline">Cancel</a> <a href="/adapters" role="button" class="outline">Cancel</a>
</form> </form>

View file

@ -28,5 +28,6 @@ STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_QUAKE", "central.quake.>"), StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
] ]

108
tests/fixtures/eonet_sample.json vendored Normal file
View file

@ -0,0 +1,108 @@
{
"title": "EONET Events (frozen test fixture)",
"description": "Trimmed from upstream /api/v3/events for tests. Real-shape entries from live data plus synthetic items to exercise categories absent from the live sample (seaLakeIce, dustHaze, tempExtremes) and multi-geometry. Do not refresh casually.",
"link": "https://eonet.gsfc.nasa.gov/api/v3/events",
"events": [
{
"id": "EONET_20098",
"title": "Eagle Lake Fire Wildfire, Hancock, Iowa",
"description": null,
"link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_20098",
"closed": null,
"categories": [
{"id": "wildfires", "title": "Wildfires"}
],
"sources": [
{"id": "IRWIN", "url": "https://irwin.doi.gov/observer/incidents/c283aec9-aab3-4720-addc-47bb6a433a32"}
],
"geometry": [
{
"magnitudeValue": 550.00,
"magnitudeUnit": "acres",
"date": "2026-05-14T11:04:00Z",
"type": "Point",
"coordinates": [-93.72, 43.11]
}
]
},
{
"id": "EONET_31000",
"title": "Synthetic Sea Ice off Greenland",
"description": "Synthetic test event for seaLakeIce category.",
"link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_31000",
"closed": null,
"categories": [
{"id": "seaLakeIce", "title": "Sea and Lake Ice"}
],
"sources": [],
"geometry": [
{
"date": "2026-05-10T00:00:00Z",
"type": "Point",
"coordinates": [-60.0, 70.5]
}
]
},
{
"id": "EONET_42000",
"title": "Synthetic Sahara Dust",
"description": "Synthetic test event for dustHaze category.",
"link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_42000",
"closed": null,
"categories": [
{"id": "dustHaze", "title": "Dust and Haze"}
],
"sources": [],
"geometry": [
{
"date": "2026-05-11T12:00:00Z",
"type": "Point",
"coordinates": [30.0, 25.0]
}
]
},
{
"id": "EONET_55000",
"title": "Synthetic Tropical Storm Path",
"description": "Synthetic multi-geometry event for severeStorms category.",
"link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_55000",
"closed": null,
"categories": [
{"id": "severeStorms", "title": "Severe Storms"}
],
"sources": [
{"id": "JTWC", "url": "https://www.metoc.navy.mil/jtwc/jtwc.html"}
],
"geometry": [
{
"date": "2026-05-12T00:00:00Z",
"type": "Point",
"coordinates": [140.0, 18.0]
},
{
"date": "2026-05-13T06:00:00Z",
"type": "Point",
"coordinates": [142.5, 19.2]
}
]
},
{
"id": "EONET_66000",
"title": "Synthetic Heat Wave",
"description": "Synthetic test event for tempExtremes category.",
"link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_66000",
"closed": null,
"categories": [
{"id": "tempExtremes", "title": "Temperature Extremes"}
],
"sources": [],
"geometry": [
{
"date": "2026-05-09T18:00:00Z",
"type": "Point",
"coordinates": [12.5, 41.9]
}
]
}
]
}

105
tests/fixtures/nwis_latest_sample.json vendored Normal file
View file

@ -0,0 +1,105 @@
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"id": "b28554ea-25f0-485c-ade2-0a4c73901768",
"geometry": {
"type": "Point",
"coordinates": [-90.2520730305021, 41.7805863501123]
},
"properties": {
"id": "b28554ea-25f0-485c-ade2-0a4c73901768",
"time_series_id": "fbe039224c874449ae574fe6668e11d8",
"monitoring_location_id": "USGS-05420500",
"parameter_code": "00060",
"statistic_id": "00011",
"time": "2026-05-19T15:30:00+00:00",
"value": "57800",
"unit_of_measure": "ft^3/s",
"approval_status": "Provisional",
"qualifier": null,
"last_modified": "2026-05-19T15:43:28.012704+00:00"
}
},
{
"type": "Feature",
"id": "8a4eb91f-1d33-4a02-9c4a-92c5b1ac0d12",
"geometry": {
"type": "Point",
"coordinates": [-91.6510, 41.6628]
},
"properties": {
"id": "8a4eb91f-1d33-4a02-9c4a-92c5b1ac0d12",
"time_series_id": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
"monitoring_location_id": "USGS-05454500",
"parameter_code": "00060",
"statistic_id": "00011",
"time": "2026-05-19T15:30:00+00:00",
"value": "2110",
"unit_of_measure": "ft^3/s",
"approval_status": "Approved",
"qualifier": null,
"last_modified": "2026-05-19T15:40:00.000000+00:00"
}
},
{
"type": "Feature",
"id": "synthetic-non-usgs-record",
"geometry": {
"type": "Point",
"coordinates": [-93.98775, 40.01822222222222]
},
"properties": {
"id": "synthetic-non-usgs-record",
"time_series_id": "synth-mo005-ts-001",
"monitoring_location_id": "MO005-400105093591601",
"parameter_code": "00060",
"statistic_id": "00011",
"time": "2026-05-19T15:00:00+00:00",
"value": "12.5",
"unit_of_measure": "ft^3/s",
"approval_status": "Provisional",
"qualifier": null,
"last_modified": "2026-05-19T15:05:00.000000+00:00"
}
},
{
"type": "Feature",
"id": "synthetic-no-dash-record",
"geometry": {
"type": "Point",
"coordinates": [-92.0, 42.0]
},
"properties": {
"id": "synthetic-no-dash-record",
"time_series_id": "synth-nodash-ts",
"monitoring_location_id": "STANDALONE12345",
"parameter_code": "00060",
"statistic_id": "00011",
"time": "2026-05-19T14:45:00+00:00",
"value": "1.0",
"unit_of_measure": "ft^3/s",
"approval_status": "Provisional",
"qualifier": null,
"last_modified": "2026-05-19T14:50:00.000000+00:00"
}
}
],
"numberReturned": 4,
"links": [
{
"type": "application/json",
"rel": "self",
"title": "This document as JSON",
"href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?f=json&parameter_code=00060&limit=1000"
},
{
"type": "application/json",
"rel": "collection",
"title": "Parent collection",
"href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous"
}
],
"timeStamp": "2026-05-19T16:00:00.000000+00:00"
}

121
tests/test_consumer_doc.py Normal file
View file

@ -0,0 +1,121 @@
"""Consistency tests for docs/CONSUMER-INTEGRATION.md.
The doc is the consumer contract. These tests catch drift between the doc and
the live code:
- Every StreamEntry in src/central/streams.py must appear in the doc's
"Stream layout" table and vice versa.
- Every adapter discovered via central.adapter_discovery.discover_adapters()
must have a per-adapter subsection (### <name>) in the doc — and vice versa.
The doc's subject patterns and per-adapter copy are NOT directly asserted (they
are operator-readable prose, not machine-parsable); the registry-level checks
guard against the most common drift (adding a stream / adapter and forgetting
to document it).
"""
import re
from pathlib import Path
from central.adapter_discovery import discover_adapters
from central.streams import STREAMS
DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "CONSUMER-INTEGRATION.md"
def _doc_text() -> str:
assert DOC_PATH.is_file(), f"missing: {DOC_PATH}"
return DOC_PATH.read_text()
def _stream_layout_rows(doc: str) -> list[tuple[str, str]]:
"""Parse the doc's "Stream layout" table -> list of (stream_name, subject_filter)."""
section_re = re.compile(
r"^## 3\. Stream layout\s*\n(.*?)(?=^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '## 3. Stream layout' section"
section = m.group(1)
rows: list[tuple[str, str]] = []
# Each row: | `CENTRAL_XX` | `central.xx.>` | ... |
row_re = re.compile(r"^\|\s*`([A-Z_]+)`\s*\|\s*`(central\.[a-z_]+\.>)`\s*\|", re.MULTILINE)
for name, subj in row_re.findall(section):
rows.append((name, subj))
return rows
def _per_adapter_subsections(doc: str) -> list[str]:
"""Pull adapter names from the per-adapter section headings: '### <adapter> — ...'.
Only counts subsections inside '## 6. Per-adapter reference'.
"""
section_re = re.compile(
r"^## 6\. Per-adapter reference\s*\n(.*?)(?=^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '## 6. Per-adapter reference' section"
section = m.group(1)
heading_re = re.compile(r"^### ([a-z_]+) — ", re.MULTILINE)
return heading_re.findall(section)
def test_doc_exists():
assert DOC_PATH.is_file(), f"doc missing: {DOC_PATH}"
def test_stream_table_matches_registry():
"""Every StreamEntry in streams.py must appear in the doc's stream layout table."""
doc_rows = _stream_layout_rows(_doc_text())
doc_names = {n for n, _ in doc_rows}
doc_filters = {f for _, f in doc_rows}
code_names = {s.name for s in STREAMS}
code_filters = {s.subject_filter for s in STREAMS}
assert doc_names == code_names, (
f"stream-name drift: doc-only={doc_names - code_names}, "
f"code-only={code_names - doc_names}"
)
assert doc_filters == code_filters, (
f"subject-filter drift: doc-only={doc_filters - code_filters}, "
f"code-only={code_filters - doc_filters}"
)
def test_stream_table_name_subject_pairs_consistent():
"""Each (stream_name, subject_filter) pair in the doc must match the registry exactly.
Catches the case where someone swaps the subject filter on one stream
without updating its row.
"""
doc_rows = set(_stream_layout_rows(_doc_text()))
code_rows = {(s.name, s.subject_filter) for s in STREAMS}
assert doc_rows == code_rows, (
f"row drift: doc-only={doc_rows - code_rows}, code-only={code_rows - doc_rows}"
)
def test_every_adapter_has_a_subsection():
"""Every adapter discovered in central.adapters must have a per-adapter doc subsection."""
doc_adapters = set(_per_adapter_subsections(_doc_text()))
code_adapters = set(discover_adapters().keys())
assert doc_adapters == code_adapters, (
f"adapter coverage drift: doc-only={doc_adapters - code_adapters}, "
f"code-only={code_adapters - doc_adapters}"
)
def test_subsections_appear_in_doc_order_matches_registry_size():
"""Sanity: the count of '### <adapter>' headings inside §6 equals the registry size.
Independent count check; catches the case where one heading is duplicated.
"""
doc_adapters = _per_adapter_subsections(_doc_text())
assert len(doc_adapters) == len(set(doc_adapters)), (
f"duplicate per-adapter sections: {[a for a in doc_adapters if doc_adapters.count(a) > 1]}"
)
assert len(doc_adapters) == len(discover_adapters())

252
tests/test_eonet.py Normal file
View file

@ -0,0 +1,252 @@
"""Tests for NASA EONET adapter."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event, Geo
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "eonet_sample.json"
def _fixture_text() -> str:
return FIXTURE_PATH.read_text()
def _fixture_json() -> dict:
return json.loads(_fixture_text())
def _config(settings: dict | None = None) -> AdapterConfig:
return AdapterConfig(
name="eonet",
enabled=True,
cadence_s=1800,
settings=settings or {},
updated_at=datetime.now(timezone.utc),
)
class TestEONETHelpers:
def test_camelcase_subject_conversion(self):
"""Verify the camelCase -> lower_snake_case conversion for every default category id.
Inputs are read from _DEFAULT_CATEGORIES, the single source of truth no
per-test hardcoded list of category strings.
"""
from central.adapters.eonet import _DEFAULT_CATEGORIES, _subject_category
for cat_id in _DEFAULT_CATEGORIES:
subj = _subject_category(cat_id)
assert re.match(r"^[a-z_]+$", subj), f"{cat_id} -> {subj}: must be lower_snake_case"
# Round-trip: removing underscores from the converted form must yield
# the lowercased upstream id. Catches both missed and spurious boundaries.
assert subj.replace("_", "") == cat_id.lower(), f"{cat_id} -> {subj}: round-trip failed"
def test_empty_category_subject(self):
from central.adapters.eonet import EONETAdapter, _subject_category
assert _subject_category(None) == "unknown"
assert _subject_category("") == "unknown"
# Through subject_for: a category with no upstream component yields .unknown.global
adapter = EONETAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="X",
adapter="eonet",
category="disaster.eonet.unknown",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={},
)
assert adapter.subject_for(event).endswith(".unknown.global")
def test_dedup_key_includes_latest_geometry_date(self):
from central.adapters.eonet import _dedup_key
date_a = "2026-05-14T11:04:00Z"
date_b = "2026-05-15T00:00:00Z"
event_id = "EONET_TEST_1"
key_a = _dedup_key(event_id, date_a)
assert date_a in key_a
assert event_id in key_a
# Different timeline date -> different dedup key
assert _dedup_key(event_id, date_b) != key_a
class TestEONETSettings:
def test_category_allowlist_default_is_full_set(self):
"""The default allowlist equals _DEFAULT_CATEGORIES — no parallel literal anywhere."""
from central.adapters.eonet import EONETSettings, _DEFAULT_CATEGORIES
assert EONETSettings().category_allowlist == _DEFAULT_CATEGORIES
class TestEONETAdapter:
def test_class_attrs_complete(self):
from central.adapters.eonet import EONETAdapter, EONETSettings
assert EONETAdapter.name == "eonet"
assert isinstance(EONETAdapter.display_name, str) and EONETAdapter.display_name
assert isinstance(EONETAdapter.description, str) and EONETAdapter.description
assert EONETAdapter.settings_schema is EONETSettings
assert EONETAdapter.requires_api_key is None
assert EONETAdapter.api_key_field is None
assert EONETAdapter.wizard_order is None
assert EONETAdapter.default_cadence_s == 1800
@pytest.mark.asyncio
async def test_geometry_singular_key(self, tmp_path: Path):
"""Adapter reads 'geometry' (singular) per upstream divergence from the spec brief."""
from central.adapters.eonet import EONETAdapter
fix = _fixture_json()
# Sanity-check the fixture itself is shaped per upstream:
assert all("geometry" in e for e in fix["events"]), "fixture must use 'geometry' (singular)"
assert all("geometries" not in e for e in fix["events"]), "fixture must not use 'geometries'"
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
# If the adapter were reading 'geometries' instead, centroids would be absent.
assert any(e.geo.centroid is not None for e in events)
@pytest.mark.asyncio
async def test_lonlat_coordinate_order(self, tmp_path: Path):
"""Upstream coordinates [lon, lat] (GeoJSON) map directly to Geo.centroid=(lon, lat)."""
from central.adapters.eonet import EONETAdapter
fix = _fixture_json()
src = next(e for e in fix["events"] if e.get("geometry"))
lon_in, lat_in = src["geometry"][0]["coordinates"]
# Sanity-check orientation of fixture datum so the assertion below isn't trivially passing.
# The first fixture event is in the western/northern hemisphere (Iowa).
assert lon_in < 0, "fixture event 0 should have western-hemisphere lon"
assert 0 < lat_in < 90, "fixture event 0 should have northern lat in (0,90)"
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
emitted = next(e for e in events if e.id == src["id"])
assert emitted.geo.centroid is not None
out_lon, out_lat = emitted.geo.centroid
assert out_lon == lon_in, "first centroid element must equal upstream lon (no swap)"
assert out_lat == lat_in, "second centroid element must equal upstream lat (no swap)"
@pytest.mark.asyncio
async def test_country_always_global(self, tmp_path: Path):
"""Every emitted event has subject suffix '.global' (no country resolution in v1)."""
from central.adapters.eonet import EONETAdapter
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert events, "fixture should produce at least one emitted event"
for e in events:
assert adapter.subject_for(e).endswith(".global"), e.category
@pytest.mark.asyncio
async def test_magnitude_value_surfaced(self, tmp_path: Path):
"""magnitudeValue from the most-recent geometry point is surfaced on Event.data."""
from central.adapters.eonet import EONETAdapter
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
with_mag = [e for e in events if e.data.get("magnitudeValue") is not None]
assert with_mag, "fixture should contain at least one event with magnitudeValue"
for e in with_mag:
assert "magnitudeUnit" in e.data
@pytest.mark.asyncio
async def test_category_allowlist_filters(self, tmp_path: Path):
"""Narrowing category_allowlist drops events outside the allowlist."""
from central.adapters.eonet import EONETAdapter
fix = _fixture_json()
# Pick the first fixture event's category as the sole allowed category.
target = fix["events"][0]["categories"][0]["id"]
adapter = EONETAdapter(
_config({"category_allowlist": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert events, "fixture should include at least one event matching the target category"
for e in events:
assert e.data["category_id"] == target
@pytest.mark.asyncio
async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path):
"""Second poll with identical upstream yields no new events (composite dedup hits)."""
from central.adapters.eonet import EONETAdapter
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert first_pass
assert second_pass == []
@pytest.mark.asyncio
async def test_fall_off_emits_removed_subject(self, tmp_path: Path):
"""Event in observed_before but absent from this poll -> removal emitted."""
from central.adapters.eonet import EONETAdapter, _subject_category
fix = _fixture_json()
first_event = fix["events"][0]
second_fix = {**fix, "events": fix["events"][1:]}
adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
assert any(e.id == first_event["id"] for e in first_pass)
adapter._fetch = AsyncMock(return_value=json.dumps(second_fix))
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
tombstones = [e for e in second_pass if e.category.endswith(".removed")]
assert len(tombstones) == 1
ts = tombstones[0]
assert ts.id == f"{first_event['id']}:removed"
assert ts.data["reason"] == "missing_from_feed"
# Subject pattern: subtype BEFORE 'removed' per §8 canonical pattern.
# Subscriber filtering on central.disaster.eonet.<cat>.> must match the
# removal subject central.disaster.eonet.<cat>.removed.global.
expected_cat = _subject_category(first_event["categories"][0]["id"])
subj = adapter.subject_for(ts)
assert subj.startswith(f"central.disaster.eonet.{expected_cat}.")
assert ".removed." in subj
assert subj.endswith(".global")

359
tests/test_nwis.py Normal file
View file

@ -0,0 +1,359 @@
"""Tests for USGS NWIS adapter (OGC API)."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event, Geo
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "nwis_latest_sample.json"
def _fixture_text() -> str:
return FIXTURE_PATH.read_text()
def _fixture_json() -> dict:
return json.loads(_fixture_text())
def _config(settings: dict | None = None) -> AdapterConfig:
return AdapterConfig(
name="nwis",
enabled=True,
cadence_s=900,
settings=settings or {},
updated_at=datetime.now(timezone.utc),
)
class TestNWISHelpers:
def test_subject_decomposes_usgs(self):
"""USGS-05420500 -> agency='usgs', bare='05420500'; subject endswith .00060.usgs.05420500."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
agency, bare = _subject_tokens_for_id("USGS-05420500")
assert agency == "usgs"
assert bare == "05420500"
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="USGS-05420500:00060:2026-05-19T15:30:00+00:00",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime(2026, 5, 19, 15, 30, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={},
)
assert adapter.subject_for(event).endswith(".00060.usgs.05420500")
def test_subject_decomposes_non_usgs(self):
"""MO005-400105093591601 -> agency='mo005', bare='400105093591601'; subject reflects both."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
agency, bare = _subject_tokens_for_id("MO005-400105093591601")
assert agency == "mo005"
assert bare == "400105093591601"
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="MO005-400105093591601:00060:t",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={},
)
assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601")
def test_subject_unprefixed_id_falls_back(self):
"""ID with no dash falls back to agency='unknown'."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
bare_input = "STANDALONE12345"
agency, bare = _subject_tokens_for_id(bare_input)
assert agency == "unknown"
assert bare == bare_input.lower()
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="X",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={},
)
subj = adapter.subject_for(event)
assert subj.endswith(f".00060.unknown.{bare}")
def test_dedup_key_composite(self):
"""Same id+param+time -> same key; different time -> different key."""
from central.adapters.nwis import _dedup_key
a = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
b = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
c = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:45:00+00:00")
assert a == b
assert a != c
# All three components are present in the key
assert "USGS-05420500" in a
assert "00060" in a
assert "2026-05-19T15:30:00+00:00" in a
class TestNWISSettings:
def test_parameter_codes_default_is_full_set(self):
"""Default equals _DEFAULT_PARAMETER_CODES — no parallel literal anywhere."""
from central.adapters.nwis import NWISSettings, _DEFAULT_PARAMETER_CODES
assert NWISSettings().parameter_codes == _DEFAULT_PARAMETER_CODES
class TestNWISAdapter:
def test_class_attrs_complete(self):
from central.adapters.nwis import NWISAdapter, NWISSettings
assert NWISAdapter.name == "nwis"
assert isinstance(NWISAdapter.display_name, str) and NWISAdapter.display_name
assert isinstance(NWISAdapter.description, str) and NWISAdapter.description
assert NWISAdapter.settings_schema is NWISSettings
assert NWISAdapter.requires_api_key is None
assert NWISAdapter.api_key_field is None
assert NWISAdapter.wizard_order is None
assert NWISAdapter.default_cadence_s == 900
@pytest.mark.asyncio
async def test_lonlat_coordinate_order(self, tmp_path: Path):
"""Upstream coords [lon, lat] -> Geo.centroid=(lon, lat); no axis swap."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
src = fix["features"][0]
lon_in, lat_in = src["geometry"]["coordinates"]
# Sanity: fixture event 0 is in western/northern hemisphere.
assert lon_in < 0
assert 0 < lat_in < 90
adapter = NWISAdapter(
_config({"parameter_codes": [src["properties"]["parameter_code"]]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
target_site = src["properties"]["monitoring_location_id"]
emitted = next(e for e in events if e.data["monitoring_location_id"] == target_site)
assert emitted.geo.centroid == (lon_in, lat_in)
@pytest.mark.asyncio
async def test_parameter_allowlist_filters(self, tmp_path: Path):
"""parameter_codes setting filters which queries we issue per poll.
With one allowed code, _fetch should be called exactly once
(per page; fixture has no next link one call total).
"""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert events
# All emitted events carry the only allowed parameter_code.
for e in events:
assert e.data["parameter_code"] == target
# Per-page fetch invoked once per parameter_code in the allowlist.
assert adapter._fetch.await_count == 1
@pytest.mark.asyncio
async def test_pagination_follows_next_link(self, tmp_path: Path):
"""Adapter follows OGC 'next' link until absent."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
page1 = {
"type": "FeatureCollection",
"features": fix["features"][:2],
"numberReturned": 2,
"links": [
{"rel": "next", "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?cursor=PAGE2"}
],
}
page2 = {
"type": "FeatureCollection",
"features": fix["features"][2:],
"numberReturned": len(fix["features"]) - 2,
"links": [],
}
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
# Both pages drained, fetch called exactly twice.
assert adapter._fetch.await_count == 2
# Events from both pages emitted.
sites = {e.data["monitoring_location_id"] for e in events}
expected_sites = {f["properties"]["monitoring_location_id"] for f in fix["features"]}
assert sites == expected_sites
@pytest.mark.asyncio
async def test_no_removal_pattern(self, tmp_path: Path):
"""Sites missing from a later poll do NOT emit a .removed event in v1."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
# First poll: full fixture.
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
assert first_pass
# Second poll: empty result (every previously-seen site is now missing).
empty_page = {"type": "FeatureCollection", "features": [], "links": []}
adapter._fetch = AsyncMock(return_value=json.dumps(empty_page))
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
# No removal events of any flavor.
assert all(".removed" not in e.category for e in second_pass)
assert second_pass == []
@pytest.mark.asyncio
async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path):
"""Second poll with identical fixture yields no new events (composite dedup hits)."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert first_pass
assert second_pass == []
class TestNWISPreview:
"""Preview hook (PR G.5) — exercised without starting the adapter."""
@pytest.mark.asyncio
async def test_preview_returns_none_without_region(self, tmp_path: Path):
from central.adapters.nwis import NWISAdapter, NWISSettings
adapter = NWISAdapter(_config({}), MagicMock(), tmp_path / "cursors.db")
result = await adapter.preview_for_settings(NWISSettings())
assert result is None
@pytest.mark.asyncio
async def test_preview_returns_rows_with_region(self, tmp_path: Path):
"""Given a bbox, preview returns one dict per monitoring-locations feature
with the contract column order: site_id, name, site_type, state."""
from central.adapters.nwis import NWISAdapter, NWISSettings
sample_response = json.dumps({
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"id": "USGS-05420500",
"geometry": {"type": "Point", "coordinates": [-90.25, 41.78]},
"properties": {
"monitoring_location_name": "MISSISSIPPI RIVER AT CLINTON, IA",
"site_type_code": "ST",
"state_name": "Iowa",
},
},
{
"type": "Feature",
"id": "USGS-05454500",
"geometry": {"type": "Point", "coordinates": [-91.65, 41.66]},
"properties": {
"monitoring_location_name": "IOWA RIVER AT IOWA CITY, IA",
"site_type_code": "ST",
"state_name": "Iowa",
},
},
],
})
adapter = NWISAdapter(
_config({"region": {"west": -94.0, "south": 40.0, "east": -93.0, "north": 41.0}}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch_preview_text = AsyncMock(return_value=sample_response)
settings = NWISSettings(region=adapter.region)
rows = await adapter.preview_for_settings(settings)
assert rows is not None
assert len(rows) == 2
# Column order is part of the contract — first row's keys must match exactly.
expected_keys = ["site_id", "name", "site_type", "state"]
assert list(rows[0].keys()) == expected_keys
# Row content reflects fixture data.
assert rows[0]["site_id"] == "USGS-05420500"
assert rows[0]["name"] == "MISSISSIPPI RIVER AT CLINTON, IA"
assert rows[0]["site_type"] == "ST"
assert rows[0]["state"] == "Iowa"
assert rows[1]["site_id"] == "USGS-05454500"
@pytest.mark.asyncio
async def test_preview_propagates_http_error(self, tmp_path: Path):
"""Preview must not swallow upstream errors — the framework needs them
to render the operator-visible 'Preview unavailable: …' banner."""
from central.adapters.nwis import NWISAdapter, NWISSettings
adapter = NWISAdapter(
_config({"region": {"west": -94.0, "south": 40.0, "east": -93.0, "north": 41.0}}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch_preview_text = AsyncMock(side_effect=RuntimeError("upstream 503"))
settings = NWISSettings(region=adapter.region)
with pytest.raises(RuntimeError, match="upstream 503"):
await adapter.preview_for_settings(settings)

120
tests/test_preview_hook.py Normal file
View file

@ -0,0 +1,120 @@
"""Tests for the SourceAdapter.preview_for_settings hook + framework rendering."""
from collections.abc import AsyncIterator
from pathlib import Path
import jinja2
import pytest
from pydantic import BaseModel
from central.adapter import SourceAdapter
from central.models import Event
TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "src" / "central" / "gui" / "templates"
class _StubSettings(BaseModel):
pass
class _StubAdapter(SourceAdapter):
"""Minimal SourceAdapter subclass that does NOT override preview_for_settings."""
name = "stub"
display_name = "Stub Adapter"
description = "Test fixture"
settings_schema = _StubSettings
default_cadence_s = 60
def __init__(self) -> None:
pass
async def poll(self) -> AsyncIterator[Event]: # pragma: no cover - never invoked
if False:
yield # type: ignore[unreachable]
return
async def apply_config(self, new_config) -> None: # pragma: no cover
pass
def subject_for(self, event: Event) -> str: # pragma: no cover
return "stub"
@pytest.mark.asyncio
async def test_default_returns_none():
"""The base SourceAdapter's preview_for_settings default returns None.
Any adapter that does not override the hook gets a no-op preview, so the
framework renders the page without a preview block (no crash, no opt-out
boilerplate required in each adapter).
"""
adapter = _StubAdapter()
result = await adapter.preview_for_settings(_StubSettings())
assert result is None
def _render_partial(**context) -> str:
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(str(TEMPLATES_DIR)),
autoescape=jinja2.select_autoescape(["html"]),
)
tmpl = env.get_template("_adapter_preview.html")
return tmpl.render(**context)
def test_partial_renders_list():
"""Given list[dict] with insertion-ordered keys, partial renders a table.
Framework is duck-typed: columns come from the first dict's keys. No
adapter-name branches, no per-adapter Jinja.
"""
rows = [
{"a": "x1", "b": "y1"},
{"a": "x2", "b": "y2"},
]
out = _render_partial(preview_rows=rows, preview_error=None)
# Header row uses first dict's keys in order.
assert "<th>a</th>" in out
assert "<th>b</th>" in out
# Body rows carry every value.
for cell in ("x1", "y1", "x2", "y2"):
assert cell in out
# Row count surfaces in the legend.
assert "Preview (2 rows)" in out
# No error banner when preview_error is None.
assert "Preview Unavailable" not in out
def test_partial_renders_error():
"""When preview_error is set, partial renders the error banner and no table."""
out = _render_partial(preview_rows=None, preview_error="Preview unavailable: boom")
assert "Preview unavailable: boom" in out
# No table when an error is set.
assert "<table" not in out
assert "<th>" not in out
def test_partial_renders_nothing_when_both_none():
"""No preview_rows and no preview_error -> partial renders no preview section.
Lets adapters that don't opt in (e.g. EONET, GDACS) render normally without
any preview-related markup on the page.
"""
out = _render_partial(preview_rows=None, preview_error=None).strip()
assert "<table" not in out
assert "Preview Unavailable" not in out
# Either empty or only whitespace/newlines from the template.
assert "Preview (" not in out
def test_partial_renders_empty_list():
"""Empty list -> legend with (0 rows), no table.
Distinct from None (which renders nothing at all). Lets adapters signal
'query ran, matched zero rows' separately from 'preview not meaningful'.
"""
out = _render_partial(preview_rows=[], preview_error=None)
assert "Preview (0 rows)" in out
assert "<table" not in out
assert "<th>" not in out

215
tests/test_producer_doc.py Normal file
View file

@ -0,0 +1,215 @@
"""Consistency tests for docs/PRODUCER-INTEGRATION.md.
The doc is the producer-side contract what an adapter author implements and
the conventions Central enforces around it. These tests catch drift between
the doc and the live code:
- Every overridable SourceAdapter method documented in §4 must exist on
central.adapter.SourceAdapter and vice versa.
- The preview_for_settings contract quoted in §11.1 must come from the
actual SourceAdapter.preview_for_settings docstring.
- The set of top-level domain tokens documented in §6.1 must equal the set
derived from central.streams.STREAMS subject_filter prefixes.
- The verbatim STREAMS snippet quoted in §8 must match the live registry.
Per the doc's own §10.4, NO hardcoded stream / adapter list literals: every
expected value derives from central.streams, central.adapter, or
central.adapter_discovery at runtime.
"""
import inspect
import re
from pathlib import Path
from central.adapter import SourceAdapter
from central.adapter_discovery import discover_adapters
from central.streams import STREAMS
DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md"
def _doc_text() -> str:
assert DOC_PATH.is_file(), f"missing: {DOC_PATH}"
return DOC_PATH.read_text()
def _documented_override_methods(doc: str) -> set[str]:
"""Extract method names documented under '## 4. The SourceAdapter base class'.
Looks for the '**`async def <name>(...)`**' / '**`def <name>(...)`**'
method headings inside §4.
"""
section_re = re.compile(
r"^## 4\. The SourceAdapter base class\s*\n(.*?)(?=^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '## 4. The SourceAdapter base class' section"
section = m.group(1)
heading_re = re.compile(r"\*\*`(?:async\s+)?def\s+(\w+)\s*\(", re.MULTILINE)
return set(heading_re.findall(section))
def _sourceadapter_overridable_methods() -> set[str]:
"""Methods on SourceAdapter that an adapter author is expected to implement
or may override. Excludes Python internals (dunder), the constructor, and
private helpers.
"""
methods: set[str] = set()
for name, member in inspect.getmembers(SourceAdapter):
if name.startswith("_"):
continue
if not (inspect.isfunction(member) or inspect.iscoroutinefunction(member)):
continue
methods.add(name)
return methods
def _streams_domains() -> set[str]:
"""Top-level <domain> tokens derived from STREAMS subject filters
(central.<domain>.>).
"""
domain_re = re.compile(r"^central\.([a-z_]+)\.>$")
out: set[str] = set()
for s in STREAMS:
m = domain_re.match(s.subject_filter)
assert m, f"unexpected subject filter shape: {s.subject_filter!r}"
out.add(m.group(1))
return out
def _documented_domains(doc: str) -> set[str]:
"""Domain tokens enumerated in §6.1 as backtick literals (`wx`, `fire`, …)."""
section_re = re.compile(
r"`<domain>` is one of ([^.]+)\.",
re.DOTALL,
)
m = section_re.search(doc)
assert m, "doc missing the '`<domain>` is one of ...' enumeration in §6.1"
enum_text = m.group(1)
return set(re.findall(r"`([a-z_]+)`", enum_text))
def test_doc_exists():
assert DOC_PATH.is_file(), f"doc missing: {DOC_PATH}"
def test_documented_methods_match_sourceadapter_api():
"""Every override-able SourceAdapter method must appear in the §4 contract,
and the doc may not advertise methods that don't exist."""
doc_methods = _documented_override_methods(_doc_text())
code_methods = _sourceadapter_overridable_methods()
assert doc_methods == code_methods, (
f"override-method drift: "
f"doc-only={doc_methods - code_methods}, "
f"code-only={code_methods - doc_methods}"
)
def test_preview_hook_contract_matches_docstring():
"""The contract block quoted in §11.1 must come from the live
SourceAdapter.preview_for_settings docstring.
Normalizes both sides by collapsing whitespace and stripping the doc's
Markdown blockquote prefix (`> `).
"""
doc = _doc_text()
section_re = re.compile(
r"^### 11\.1[^\n]*\n(.*?)(?=^### |^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '### 11.1' subsection"
blockquote = "\n".join(
line[2:] if line.startswith("> ") else line.lstrip(">").lstrip()
for line in m.group(1).splitlines()
if line.lstrip().startswith(">")
)
docstring = inspect.getdoc(SourceAdapter.preview_for_settings) or ""
def norm(s: str) -> str:
# Strip markdown backticks; collapse whitespace.
return re.sub(r"\s+", " ", s.replace("`", "")).strip()
norm_block = norm(blockquote)
norm_doc = norm(docstring)
# Bidirectional: every non-empty sentence of the docstring must appear in
# the doc's blockquote, and the blockquote must not introduce new sentences
# the docstring lacks.
sentences = lambda s: {x.strip() for x in re.split(r"(?<=[.:])\s+", s) if x.strip()}
doc_sents = sentences(norm_block)
code_sents = sentences(norm_doc)
assert doc_sents == code_sents, (
f"preview_for_settings contract drift: "
f"doc-only={doc_sents - code_sents}, "
f"code-only={code_sents - doc_sents}"
)
def test_top_level_domains_match_streams_registry():
"""The §6.1 domain enumeration must equal the domain tokens derived from
central.streams.STREAMS bidirectional, no hardcoded list."""
doc_domains = _documented_domains(_doc_text())
code_domains = _streams_domains()
assert doc_domains == code_domains, (
f"domain-token drift: "
f"doc-only={doc_domains - code_domains}, "
f"code-only={code_domains - doc_domains}"
)
def test_streams_snippet_quotes_live_registry():
"""The §8 verbatim STREAMS snippet must agree with central.streams.STREAMS
on (name, subject_filter, event_bearing).
"""
doc = _doc_text()
section_re = re.compile(
r"^## 8\. The StreamEntry registry\s*\n(.*?)(?=^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '## 8. The StreamEntry registry' section"
section = m.group(1)
# Each documented entry: StreamEntry("NAME", "central.x.>"[, event_bearing=False])
entry_re = re.compile(
r'StreamEntry\(\s*"([A-Z_]+)"\s*,\s*"(central\.[a-z_]+\.>)"'
r'(?:\s*,\s*event_bearing\s*=\s*(False|True))?\s*\)',
)
doc_rows: set[tuple[str, str, bool]] = set()
for name, subj, eb in entry_re.findall(section):
event_bearing = (eb != "False") # default True if unspecified
doc_rows.add((name, subj, event_bearing))
code_rows = {(s.name, s.subject_filter, s.event_bearing) for s in STREAMS}
assert doc_rows == code_rows, (
f"STREAMS snippet drift: "
f"doc-only={doc_rows - code_rows}, code-only={code_rows - doc_rows}"
)
def test_no_orphan_adapter_references_in_anti_patterns():
"""Anti-patterns section names two real adapter modules as examples
(firms, inciweb in §10.4). Those names must still resolve via
central.adapter_discovery protects against a silent rename leaving
dead example references in the doc.
"""
doc = _doc_text()
section_re = re.compile(
r"^## 10\. Anti-patterns.*?\n(.*?)(?=^## )",
re.DOTALL | re.MULTILINE,
)
m = section_re.search(doc)
assert m, "doc missing '## 10. Anti-patterns' section"
section = m.group(1)
quoted = set(re.findall(r'"([a-z][a-z_]*)"', section))
# Whitelist Python-syntax tokens that incidentally appear in the section;
# everything else in this set is asserted to be a real adapter name.
# Derived from STREAMS per §10.4 — stream names appear quoted as examples
# and would otherwise look like orphan adapter references.
syntax_tokens = {s.name for s in STREAMS}
candidate_adapter_names = quoted - {t.lower() for t in syntax_tokens}
known_adapters = set(discover_adapters().keys())
orphans = {n for n in candidate_adapter_names if n not in known_adapters}
assert not orphans, (
f"anti-patterns section references unknown adapter names: {orphans} "
f"(known adapters: {sorted(known_adapters)})"
)