Compare commits

..

6 commits

Author SHA1 Message Date
3c27534e9e
Merge pull request #44 from zvx-echo6/feature/3-l5-backend-settings-schema
fix(3-L.5): per-backend settings schemas (fixes build_enrichers TypeError)
2026-05-20 17:27:13 -06:00
Matt Johnson
b694fc0c9d fix(3-L.5): per-backend settings schemas (fixes build_enrichers TypeError)
Surfaced during the 2026-05-20 NaviBackend activation: toggling
config.enrichment.backend_class to NoOpBackend while backend_settings still
held {"base_url": ...} crashed _rebuild_enrichers with
`TypeError: NoOpBackend() takes no arguments`, BEFORE invalidate() ran. Fixed
by mirroring the SourceAdapter.settings_schema pattern: each backend declares a
Pydantic settings_schema; validation happens at write-time (GUI POST) and
read-time (supervisor). A mismatch is now a clean ValidationError, never a
constructor TypeError.

Backends — each gets a `<Name>BackendSettings(BaseModel, extra="forbid")` +
`settings_schema` class attr, mirroring __init__ defaults EXACTLY (note:
timeout_s stays 10.0 — the brief's "5.0" was a transcription slip; preserve the
production default):
  NoOpBackend     -> NoOpBackendSettings        (no fields)
  NaviBackend     -> NaviBackendSettings        (base_url, timeout_s, headers, warmup)
  PhotonBackend   -> PhotonBackendSettings       (base_url, timeout_s, headers)
  NominatimBackend-> NominatimBackendSettings   (base_url, user_agent, rate_limit_per_sec, timeout_s)

GeocoderBackend Protocol (in geocoder.py, where the base actually lives — not
base.py, which only has Enricher) gains `settings_schema: type[BaseModel]`.

supervisor:
- build_enrichers validates backend_cls.settings_schema.model_validate(
  backend_settings) before instantiating, and constructs from the validated
  .model_dump(). ValidationError (not TypeError) on mismatch.
- _rebuild_enrichers builds into locals and commits to instance state only on
  success — a ValidationError leaves the previously-active enrichers/config/
  cache untouched.
- _handle_enrichment_change wraps the rebuild in try/except ValidationError:
  logs and returns, keeping the previous backend running (supervisor stays up;
  operator fixes the row; next NOTIFY applies cleanly). No cache invalidation
  on a failed change.

GUI /enrichment:
- GET skips the outer EnrichmentConfig.backend_settings field and renders a
  separate <fieldset> from describe_fields(backend_cls.settings_schema, ...)
  for the row's current backend_class. Backend fields namespaced bs_<name>.
- POST reassembles bs_<name> inputs into a backend_settings dict, validates it
  against the SUBMITTED backend_class's schema (so errors attach to the right
  fields when an operator is mid-switch), then validates the outer
  EnrichmentConfig. DB row written only if both pass; otherwise re-renders with
  field-level errors against the submitted backend.
- backend_class stays a plain text field (no <select>, no client-side reshape).

form_descriptors: generic `float -> "number"` widget (2 lines, mirrors K.5's
`dict -> "json"`), needed because backend schemas have float fields
(timeout_s, rate_limit_per_sec). Benefits any float field codebase-wide.

DB schema unchanged: backend_settings stays JSONB; validation moved to
use-site. _BACKEND_REGISTRY / _ENRICHER_REGISTRY unchanged beyond schema lookup.

Tests (test_backend_settings_schema.py, 11): schemas exist + extra='forbid';
Navi schema preserves defaults (timeout_s == 10.0); NoOp has zero fields;
build_enrichers raises ValidationError-not-TypeError for the exact 2026-05-20
case; supervisor keeps previous backend on a bad NOTIFY (the incident
scenario); valid NoOp-with-empty-settings applies + invalidates; GUI POST
rejects bad backend_settings without writing + re-renders against submitted
backend; GUI POST writes on valid settings. test_enrichment_config_plumbing
updated for the new context shape (outer_fields/backend_fields).

Verification: full pytest 546 passed, 1 skipped (was 535; +11). grep
subject_for_event/_ADAPTER_REGISTRY and grep 100.64.0./192.168.1. in src both
empty.

Does NOT touch PR L scope (events tab, remaining adapter enrichment_locations),
the DB schema, or the registries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 23:10:10 +00:00
bd809846ea
Merge pull request #43 from zvx-echo6/feature/3-k5-enrichment-config-plumbing
feat(3-K.5): operator-settable EnrichmentConfig (config plumbing)
2026-05-20 13:56:54 -06:00
Matt Johnson
04c1d07b3f feat(3-K.5): operator-settable EnrichmentConfig (config plumbing)
Bridge PR for v0.5.0. PR J wired the supervisor with a hardcoded
EnrichmentConfig() default; PR K added real backends to the registry but
left no operator path to select one. K.5 closes that gap by mirroring the
config.adapters storage + LISTEN/NOTIFY hot-reload pattern.

config.enrichment (migration 024): single-row table (id BOOLEAN PK CHECK
(id = true), mirroring config.system). Columns enricher_class, backend_class,
backend_settings JSONB, cache_ttl_s, updated_at. Reuses the existing
config.set_updated_at + config.notify_config_change triggers (the NOTIFY
function's ELSE branch emits 'enrichment:' for this keyless single-row table).
Seeds framework DEFAULTS ONLY — GeocoderEnricher + NoOpBackend, empty
backend_settings, 24h TTL. NO URLs/IPs/auth in the seed; a fresh deploy runs
NoOp out of the box. Idempotent (CREATE IF NOT EXISTS / DROP TRIGGER IF
EXISTS / INSERT ON CONFLICT DO NOTHING).

Supervisor:
- Reads config.enrichment at startup (start() -> config_source
  .get_enrichment_config()), overriding the constructor default.
- Hot-reloads via _on_config_change(table == "enrichment"): re-reads the row,
  rebuilds the enricher set, and invalidates the enrichment cache when the
  enricher/backend/settings changed (a new backend must not keep serving the
  old backend's cached bundles until TTL). TTL-only changes retain the cache.
- build_enrichers now takes an explicit EnrichmentCache (the supervisor owns
  it so it can invalidate); cache no longer built inside build_enrichers.

ConfigStore / ConfigSource: get_enrichment_config() (falls back to defaults if
the row is somehow absent) + upsert_enrichment_config(). Mirrors the adapter
accessors.

cache.py: EnrichmentCache.invalidate(enricher_name=None) — DELETE all or
enricher-scoped; returns rows deleted.

GUI /enrichment: GET renders the EnrichmentConfig form via the generic
describe_fields machinery (no enrichment-specific Jinja); POST validates via
Pydantic, writes config.enrichment, and lets the NOTIFY trigger propagate the
hot-reload. New enrichment.html + a nav link. backend_settings (a dict field)
needed a generic "json" widget in describe_fields + the template — usable by
any dict-typed settings field, not enrichment-specific.

Necessary deviation (surfaced): PR K shipped a deployment-specific default
DEFAULT_BASE_URL = "http://192.168.1.130:8440" in navi.py. Bar (b) forbids
deployer IPs in src, and operator-settable base_url is exactly K.5's purpose,
so the default is changed to http://localhost:8440 (matching Photon/Nominatim
defaults). The live integration smoke (tests/, env-gated, skipped) now reads
the endpoint from NAVI_BASE_URL — no IP anywhere in src.

Tests (test_enrichment_config_plumbing.py, 10): ConfigStore read / default
fallback / upsert-passes-dict; cache invalidate all + scoped; supervisor builds
NaviBackend from config; hot-reload rebuilds + invalidates on backend change;
no-invalidate on TTL-only change; describe_fields json widget; /enrichment GET
render. test_firms updated for the build_enrichers signature change.

Hot-reload mechanism mirrored: Postgres LISTEN/NOTIFY on channel
'config_changed' (payload 'table:key'), same path adapters/streams use; the
supervisor's existing _on_config_change dispatch gains an "enrichment" branch.

Verification: full pytest 535 passed, 1 skipped (was 525; +10). Migration
applied cleanly on the live prod schema; SELECT * FROM config.enrichment
returns the NoOp default row. grep subject_for_event/_ADAPTER_REGISTRY and
grep 100.64.0./192.168.1. in src both empty.

Does NOT activate NaviBackend (ships NoOp default; operator action) and does
NOT declare enrichment_locations on other adapters (PR L scope).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 18:52:22 +00:00
54238093a5
Merge pull request #42 from zvx-echo6/feature/3-k-geocoder-backends
feat(3-K): real geocoder backends + producer-doc reframe + consumer-doc enrichment
2026-05-20 10:24:33 -06:00
Matt Johnson
98b050b2af feat(3-K): real geocoder backends + producer-doc reframe + consumer-doc enrichment
Second of three PRs for v0.5.0 (J shipped the framework; this fills in real
backends + documents the reframed design principle in-tree; L is the events
tab + map fix, then tag).

Backends (all satisfy GeocoderBackend; never raise, all-null on any failure):
- NaviBackend — composed Navi /api/reverse/<lat>/<lon> (name/address + timezone
  + landclass + elevation in one call). Near-passthrough: response already
  matches the canonical 9-field shape. Best-effort warmup ping (Boise) on
  construction when a loop is running; config `headers` slot for a future
  Authorization: Bearer (config-only, no code change). Default base_url
  http://192.168.1.130:8440.
- PhotonBackend — raw Photon /reverse?lat&lon&limit=1 (name/address only).
  Maps features[0].properties; postal_code <- postcode; timezone/landclass/
  elevation_m null (Navi-composed-endpoint extras).
- NominatimBackend — OSM Nominatim /reverse?format=jsonv2 (name/address only).
  Configurable rate limit (default 1/sec; 0 disables for self-hosted) +
  required User-Agent. Maps the address block; landclass/elevation_m/timezone
  null.

Registered all three in supervisor _BACKEND_REGISTRY (resolved by EnrichmentConfig
backend_class name).

Docs — design pivot now in-tree:
- PRODUCER §2 reframed: the verbatim Matt quote stays; the translation inverts.
  Central is the consumer's only data plane (consumers can't do follow-up
  lookups), so enrich deliberately and centrally, namespaced under _enriched,
  failing to null. "No enrichment" is gone.
- PRODUCER §10.1 inverted: enrichment is expected; the anti-pattern is doing it
  OUTSIDE the framework (inline in poll(), bypassing cache + _enriched
  namespacing + the never-raise safety net).
- PRODUCER new §13 Enrichment contract: Enricher / GeocoderEnricher /
  GeocoderBackend Protocols, NoOpBackend default, sqlite cache + TTL +
  cache-all-null + don't-cache-on-raise semantics, _enriched.<name> provenance,
  per-field coverage matrix (cross-checked against GEOCODER_FIELDS), and the
  landclass antimeridian known wrinkle.
- CONSUMER FIRMS section: documents the data._enriched.geocoder bundle (9
  fields), per-region coverage (US-full, non-US timezone+elevation), and the
  antimeridian landclass caveat.

Tests:
- test_navi/photon/nominatim_backend.py — happy-path field mapping, null
  handling, extra-key drop, network/timeout/non-200/malformed -> all-null
  (never raises), Nominatim rate-limit (disabled + spacing) + User-Agent.
  Env-gated live Navi smoke (NAVI_INTEGRATION_TEST=1; skipped by default — the
  192.168.1.130 endpoint isn't reachable from CT104's segment).
- test_producer_doc.py — +4: §2 verbatim quote present, §10.1 subsection exists,
  §13 names all four protocol types, §13 coverage matrix == GEOCODER_FIELDS
  (derived from code, not hardcoded).

Verification: full pytest 525 passed, 1 skipped (was 495; +30 backend +
4 doc tests, -1 the env-gated skip). grep subject_for_event/_ADAPTER_REGISTRY
clean. All three backends import + resolve via the registry.

Flagged for later (NOT done here): adapters besides FIRMS that should declare
enrichment_locations (nwis, eonet, gdacs, usgs_quake, wfigs_*) — that's PR L
scope alongside the events tab. See PR description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 16:10:44 +00:00
24 changed files with 1865 additions and 50 deletions

View file

@ -296,6 +296,36 @@ ground-survey workflows.
archive is at `https://firms.modaps.eosdis.nasa.gov/`.)
- **Removal semantics:** none. FIRMS publishes detections; absence is the signal
if a fire stops burning. Consumers should not expect explicit "removal" events.
- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so
each event carries a Central-derived geocoder bundle under
`data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central
reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result.
The bundle always has these nine keys (any unresolved field is `null`, never
missing):
| key | meaning |
|---|---|
| `name` | place / feature name |
| `city` | city / town / village |
| `county` | county (or equivalent) |
| `state` | state / province |
| `country` | country |
| `postal_code` | postal / ZIP code |
| `timezone` | IANA tz (e.g. `America/Boise`) |
| `landclass` | land-management class (US PAD-US) |
| `elevation_m` | ground elevation, metres |
**Coverage by region (v0.5.0):** US hotspots get the full bundle (with
sparsity gaps in deep wilderness); non-US hotspots currently get only
`timezone` and `elevation_m` populated (both planet-scale), the rest `null`,
pending an upstream planet expansion. Treat `null` as "not resolved," not
"does not exist."
**Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot
near 5153°N can spuriously get a non-`null` `landclass` (it false-matches the
Aleutian "Rat Islands" US land-management polygon across the dateline). If you
consume `landclass`, treat a non-`null` value on a clearly non-US point as
suspect. Fix is tracked upstream.
- **Live example (verbatim from CT104):**
```json

View file

@ -24,6 +24,7 @@ are intentionally not restated. Cross-references point into that doc.
10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do)
11. [Settings preview hook](#11-settings-preview-hook)
12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter)
13. [Enrichment contract](#13-enrichment-contract)
---
@ -49,33 +50,47 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)).
> what it will."
> — Matt, 2026-05-19
Adapter authors translate that single sentence into a small number of concrete
rules:
The correct reading of that sentence: **Central is the consumer's only data
plane.** A downstream consumer sees exactly what's on the wire and nothing
more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot
reverse-geocode a coordinate on its own. So whatever Central does NOT put on
the wire is, for every consumer, simply missing. "Gives it all" therefore means
*give the consumer everything a reasonable consumer needs to act on the event*
— not "give the upstream payload only."
Adapter authors translate that into a small number of concrete rules:
- **Preserve every upstream field.** Anything the upstream returns lives in
`Event.data` verbatim. Adapters do not silently drop fields, even ones that
look redundant or low-value today.
- **No enrichment.** Adapters do not reverse-geocode, do not call out to
upstream metadata endpoints during normal `poll()` flow, do not consult a
second source to "fill in" a missing field. If a downstream consumer wants
enrichment, that is consumer-side work.
- **No opinionated translation.** Adapters do not coerce units, do not rename
fields to match a Central-wide vocabulary, do not collapse upstream
enumerations into Central's preferred labels.
- **The only adapter-side transforms are mechanical.** Specifically:
subject-token normalization (camelCase → snake_case, agency-prefix splitting,
whitespace → underscore, lowercase) and dedup-key construction. Both are
deterministic functions of upstream identifiers. Nothing else.
look redundant or low-value today (see [§10.2](#102-silent-field-dropping)).
- **Enrich, deliberately and centrally.** Location, timezone, elevation,
landclass and similar context that consumers reliably need should be resolved
once, by Central, and attached — not left for twelve consumers to each
re-derive (most of them can't). Enrichment runs through the framework
([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations`
and the supervisor attaches results under `Event.data["_enriched"]`.
- **Namespace enrichment for provenance.** Central-derived fields live under
`_enriched.<enricher_name>`; everything else in `data` is upstream verbatim.
A consumer can always tell which is which.
- **Fail gracefully to null, never to an exception.** Enrichment that can't
resolve a field returns `null` for it (a stable, documented field set), and a
total enrichment failure returns an all-null bundle. A geocoder outage must
never drop or corrupt the underlying event.
- **No opinionated translation of the upstream payload.** Enrichment *adds*
namespaced fields; it does not rewrite upstream ones. Adapters still do not
coerce units, rename upstream fields, or collapse upstream enumerations inside
`data`. The only in-place adapter transforms remain mechanical: subject-token
normalization (camelCase → snake_case, agency-prefix splitting, whitespace →
underscore, lowercase) and dedup-key construction.
This rules out a whole category of plausible-sounding work that prior reviews
have already rejected. For instance, "enrich NWIS site rows with USGS
monitoring-locations metadata during `poll()`" was proposed for Phase 3 and
killed on this principle. The producer adds the field-preserving pipe; the pipe
ends at JetStream publish; everything richer is a downstream concern.
See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of
anti-patterns. Future authors should reject the same proposals on the same
grounds.
This reframes a Phase 2 rule. The earlier draft of this doc said "no
enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was
rejected on those grounds. That reasoning is now inverted: consumers have no
practical way to do that work, so Central does it. The constraint that survives
is *where* and *how* — through the framework, namespaced, cached, failing to
null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the
remaining anti-pattern (enrichment done outside the framework) and
[§13](#13-enrichment-contract) for the full contract.
---
@ -625,18 +640,30 @@ adapter authors should mirror it. Do not restate it here.
These are the patterns prior reviews have explicitly rejected. Reject them
again on sight in a new-adapter PR.
### 10.1 Enrichment during `poll()`
### 10.1 Enrichment outside the framework
No calls to upstream metadata endpoints, no reverse-geocode, no consultation
of a second source to fill in fields the primary feed omitted. The "NWIS
enrichment" Phase 3 proposal — joining live measurements against the
monitoring-locations metadata endpoint during `poll()` — was rejected on the
[§2](#2-the-design-principle) principle. Future proposals along the same
lines get the same answer.
Enrichment itself is **expected**, not forbidden — see
[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). Any adapter
with location data should opt in by declaring `enrichment_locations` on the
adapter class; the supervisor then runs the registered enrichers and attaches
results under `Event.data["_enriched"]`.
If enrichment is genuinely necessary, the right shape is a separate adapter
(or a downstream consumer) — not an `if metadata_missing: await
fetch_metadata()` branch buried in an adapter's `poll()`.
The anti-pattern is doing enrichment the *wrong* way — outside the framework:
- An `if missing: await fetch_metadata()` branch buried in an adapter's
`poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips
the `_enriched` namespacing (so consumers can't tell upstream from
Central-derived), and gives up the never-raise/all-null safety net (so a
geocoder hiccup can take down the poll).
- Writing enriched fields directly into the top level of `Event.data` instead
of under `_enriched`. That destroys provenance — a consumer can no longer
tell which fields came from the upstream feed and which Central added.
- Standing up a parallel enrichment path (a second HTTP client, a private cache)
inside one adapter instead of registering a backend with the framework.
The rule of thumb: declare `enrichment_locations`, let the supervisor do the
work. If the framework can't express what you need, extend the framework
([§13](#13-enrichment-contract)) — don't route around it inside an adapter.
### 10.2 Silent field dropping
@ -821,3 +848,111 @@ requesting / granting merge.
- [ ] **Full pytest suite green.**
---
## 13. Enrichment contract
Enrichment is how Central adds consumer-needed context (location names,
timezone, elevation, landclass, …) that the upstream feed doesn't carry and a
downstream consumer can't look up itself. It runs in the supervisor, after
dedup and before the CloudEvents wrap, for any adapter that opts in. Results are
namespaced under `Event.data["_enriched"]` so provenance stays explicit:
everything under `_enriched` is Central-derived; everything else in `data` is
upstream verbatim.
### 13.1 Opting an adapter in
Declare `enrichment_locations` on the adapter class — a list of
`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`:
```python
class FIRMSAdapter(SourceAdapter):
enrichment_locations = [("latitude", "longitude")]
```
Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is."
The supervisor uses the first tuple that resolves to a non-null coordinate pair,
runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the
results. No adapter code calls enrichers directly.
### 13.2 The `Enricher` Protocol
An enricher is any object satisfying this Protocol (`central.enrichment.base`):
- `name: str` — short identifier, used as the key under
`Event.data["_enriched"]`.
- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]`
given `{"lat": float, "lon": float}`, return a flat dict of enrichment
fields. Fields it can't resolve are present with value `None` (not omitted).
**Must never raise** — implementations handle their own failures and return
an all-null bundle on total failure.
### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol
`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the
only enricher today. It owns the cache and the canonical field normalization;
the actual reverse-geocode is delegated to a pluggable backend satisfying the
`GeocoderBackend` Protocol:
- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return
the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields
the backend can't resolve return `None`. Must never raise.
Backends shipped: `NaviBackend` (composed Navi `/api/reverse/<lat>/<lon>`
endpoint — name/address + timezone + landclass + elevation in one call),
`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM
Nominatim, name/address only, with a configurable rate limit + `User-Agent`),
and `NoOpBackend` (all-null — the default until an operator configures a real
backend).
### 13.4 Cache + failure semantics
`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`,
`/var/lib/central/enrichment_cache.db`):
- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4
decimal places (~11 m). TTL is per-enricher, default 24h.
- **Cache hit** → return cached bundle, no backend call.
- **Cache miss** → call backend, cache the normalized result (**even an
all-null bundle** — so known-empty coordinates aren't re-hammered), return it.
- **Backend raises** (a violation of the never-raise contract, or an
infrastructure error the backend chose to surface) → return an all-null
bundle and **do not cache** it, so the next event for that coordinate retries.
Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`,
`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing
the enricher set is a restart, not a hot-reload.
### 13.5 Per-field coverage
The canonical geocoder bundle is exactly nine fields. They mirror
`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this
table must match it):
| Field | US events | Non-US events today | Non-US after Photon planet expansion |
|---|---|---|---|
| `name` | populated (wilderness sparsity gaps) | null | populated |
| `city` | populated (wilderness sparsity gaps) | null | populated |
| `county` | populated (wilderness sparsity gaps) | null | populated |
| `state` | populated (wilderness sparsity gaps) | null | populated |
| `country` | populated (wilderness sparsity gaps) | null | populated |
| `postal_code` | populated (wilderness sparsity gaps) | null | populated |
| `timezone` | populated | populated (`tz_world` is planet-scale) | populated |
| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null |
| `elevation_m` | populated | populated (planet-DEM) | populated |
**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` +
`elevation_m` and the rest null. Photon planet expansion is queued on the Navi
side with no firm ETA; when it lands, `NaviBackend` picks it up automatically
with zero Central code changes.
### 13.6 Known wrinkle — landclass antimeridian false-positive
`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons.
Points near 5153°N **outside** the US can spuriously match the Aleutian "Rat
Islands" PAD-US polygon (false matching across the antimeridian), yielding a
non-null `landclass` that doesn't actually apply. This is a Navi-side bug being
worked separately; until it's fixed, treat a non-null `landclass` on a clearly
non-US point as suspect. Documented for consumers in
`CONSUMER-INTEGRATION.md`.
---

View file

@ -0,0 +1,44 @@
-- Migration: 024_add_config_enrichment
-- Adds config.enrichment — the single-row, operator-settable enrichment config
-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY.
--
-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)).
-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty
-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs,
-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page
-- after this merges.
--
-- The seed mirrors central.config_models.EnrichmentConfig() defaults.
-- Regenerate via:
-- sudo -u central .venv/bin/python -c \
-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())"
--
-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ...
-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER).
CREATE TABLE IF NOT EXISTS config.enrichment (
id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true),
enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher',
backend_class TEXT NOT NULL DEFAULT 'NoOpBackend',
backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb,
cache_ttl_s INTEGER NOT NULL DEFAULT 86400,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Reuse the existing updated_at trigger function (migration 002).
DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment;
CREATE TRIGGER enrichment_set_updated_at
BEFORE UPDATE ON config.enrichment
FOR EACH ROW
EXECUTE FUNCTION config.set_updated_at();
-- Reuse the existing NOTIFY function (migration 001) so the supervisor's
-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE
-- branch emits 'enrichment:' (empty key — single-row table has no natural key).
DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment;
CREATE TRIGGER enrichment_notify
AFTER INSERT OR UPDATE OR DELETE ON config.enrichment
FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
-- Seed the single framework-default row (NoOp; no deployment-specific values).
INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING;

View file

@ -8,7 +8,7 @@ import logging
from collections.abc import Awaitable, Callable
from typing import Protocol, runtime_checkable
from central.config_models import AdapterConfig
from central.config_models import AdapterConfig, EnrichmentConfig
from central.config_store import ConfigStore
logger = logging.getLogger(__name__)
@ -26,6 +26,10 @@ class ConfigSource(Protocol):
"""Get configuration for a specific adapter."""
...
async def get_enrichment_config(self) -> EnrichmentConfig:
"""Get the enrichment configuration."""
...
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],
@ -65,6 +69,10 @@ class DbConfigSource:
"""Get a specific adapter from database."""
return await self._store.get_adapter(name)
async def get_enrichment_config(self) -> EnrichmentConfig:
"""Get the enrichment configuration from database."""
return await self._store.get_enrichment_config()
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],

View file

@ -12,7 +12,7 @@ from typing import Any
import asyncpg
from central.config_models import AdapterConfig, StreamConfig
from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
from central.crypto import decrypt, encrypt
logger = logging.getLogger(__name__)
@ -129,6 +129,48 @@ class ConfigStore:
name,
)
# -------------------------------------------------------------------------
# Enrichment configuration (single-row config.enrichment)
# -------------------------------------------------------------------------
async def get_enrichment_config(self) -> EnrichmentConfig:
"""Read the single config.enrichment row.
Falls back to EnrichmentConfig() framework defaults if the row is
somehow absent (it is migration-seeded, so this is belt-and-suspenders).
"""
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
FROM config.enrichment
WHERE id = true
"""
)
if row is None:
return EnrichmentConfig()
return EnrichmentConfig(**dict(row))
async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None:
"""Write the single config.enrichment row (id = true)."""
async with self._pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO config.enrichment
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
VALUES (true, $1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
enricher_class = EXCLUDED.enricher_class,
backend_class = EXCLUDED.backend_class,
backend_settings = EXCLUDED.backend_settings,
cache_ttl_s = EXCLUDED.cache_ttl_s
""",
config.enricher_class,
config.backend_class,
config.backend_settings, # JSON-encoded by the codec
config.cache_ttl_s,
)
# -------------------------------------------------------------------------
# Stream configuration
# -------------------------------------------------------------------------

View file

@ -1,5 +1,8 @@
"""Geocoder backend implementations."""
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.backends.nominatim import NominatimBackend
from central.enrichment.backends.no_op import NoOpBackend
from central.enrichment.backends.photon import PhotonBackend
__all__ = ["NoOpBackend"]
__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"]

View file

@ -0,0 +1,98 @@
"""Navi reverse-geocoding backend.
Hits the composed Navi endpoint `<base_url>/api/reverse/<lat>/<lon>`, which
already returns the canonical 9-field bundle (name, city, county, state,
country, postal_code, timezone, landclass, elevation_m). Navi composes Photon
(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM
(elevation_m), so this backend is a near-passthrough mapping.
Coverage today: US events get a rich bundle; non-US events get timezone +
elevation_m populated (both planet-scale) and the rest null until Navi's
Photon planet expansion lands (no Central change needed when it does).
"""
import asyncio
import logging
from typing import Any
import aiohttp
from pydantic import BaseModel, ConfigDict, Field
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
logger = logging.getLogger(__name__)
# Generic default — operators point this at their Navi instance via the
# /enrichment config page (backend_settings.base_url). No deployment-specific
# host belongs in source.
DEFAULT_BASE_URL = "http://localhost:8440"
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
_WARMUP_LAT = 43.6150
_WARMUP_LON = -116.2023
class NaviBackendSettings(BaseModel):
"""Settings for NaviBackend. Mirrors __init__ defaults exactly."""
model_config = ConfigDict(extra="forbid")
base_url: str = Field(default=DEFAULT_BASE_URL, description="Navi /api/reverse base URL")
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
headers: dict[str, str] | None = Field(
default=None, description="Extra request headers (e.g. Authorization)"
)
warmup: bool = Field(default=True, description="Fire a warmup ping on construction")
class NaviBackend:
"""GeocoderBackend backed by the composed Navi /api/reverse endpoint."""
settings_schema = NaviBackendSettings
def __init__(
self,
base_url: str = DEFAULT_BASE_URL,
timeout_s: float = 10.0,
headers: dict[str, str] | None = None,
warmup: bool = True,
) -> None:
self._base_url = base_url.rstrip("/")
self._timeout_s = timeout_s
# Future-proof: drop an Authorization: Bearer … here config-only, no code change.
self._headers = dict(headers or {})
if warmup:
# Fire-and-forget warmup ping; only if a loop is running (it is under
# the supervisor's asyncio.run, not under sync test construction).
try:
loop = asyncio.get_running_loop()
loop.create_task(self._warmup())
except RuntimeError:
pass
def _url(self, lat: float, lon: float) -> str:
return f"{self._base_url}/api/reverse/{lat}/{lon}"
async def _warmup(self) -> None:
try:
await self._fetch(_WARMUP_LAT, _WARMUP_LON)
except Exception:
# Warmup is best-effort; a failure here must not break startup.
logger.debug("NaviBackend warmup ping failed (non-fatal)")
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
) as session:
async with session.get(self._url(lat, lon), headers=self._headers) as resp:
resp.raise_for_status()
return await resp.json()
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
try:
data = await self._fetch(lat, lon)
except Exception:
# Non-200, network error, timeout, malformed JSON — never raise.
logger.debug("NaviBackend reverse failed; returning all-null bundle")
return all_null_bundle()
# Navi's response already matches the canonical shape; map defensively.
return {field: data.get(field) for field in GEOCODER_FIELDS}

View file

@ -7,11 +7,23 @@ which satisfies the GeocoderBackend contract while resolving nothing.
from typing import Any
from pydantic import BaseModel, ConfigDict
from central.enrichment.geocoder import all_null_bundle
class NoOpBackendSettings(BaseModel):
"""No-op backend takes no settings. extra='forbid' makes switching to
NoOpBackend while stale backend_settings (e.g. a base_url) remain a clean
ValidationError instead of a TypeError at construction."""
model_config = ConfigDict(extra="forbid")
class NoOpBackend:
"""GeocoderBackend that resolves no fields."""
settings_schema = NoOpBackendSettings
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
return all_null_bundle()

View file

@ -0,0 +1,113 @@
"""OSM Nominatim reverse-geocoding backend.
Works against public OSM Nominatim (1 req/sec + User-Agent required) or a
self-hosted instance (no limit). Resolves name + address only; timezone,
landclass, and elevation_m are nulled (not in the Nominatim reverse response).
Nominatim jsonv2 reverse response shape:
{"display_name": "...", "name": "...",
"address": {city|town|village, county, state, country, postcode, ...}}
"""
import asyncio
import logging
import time
from typing import Any
from urllib.parse import urlencode
import aiohttp
from pydantic import BaseModel, ConfigDict, Field
from central.enrichment.geocoder import all_null_bundle
logger = logging.getLogger(__name__)
DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org"
DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)"
class NominatimBackendSettings(BaseModel):
"""Settings for NominatimBackend. Mirrors __init__ defaults exactly."""
model_config = ConfigDict(extra="forbid")
base_url: str = Field(default=DEFAULT_BASE_URL, description="Nominatim /reverse base URL")
user_agent: str = Field(
default=DEFAULT_USER_AGENT, description="User-Agent (public OSM requires one)"
)
rate_limit_per_sec: float = Field(
default=1.0, description="Outbound request cap; 0 disables (self-hosted)"
)
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
class NominatimBackend:
"""GeocoderBackend backed by an OSM Nominatim /reverse endpoint.
rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s);
set it to 0 to disable for self-hosted instances.
"""
settings_schema = NominatimBackendSettings
def __init__(
self,
base_url: str = DEFAULT_BASE_URL,
user_agent: str = DEFAULT_USER_AGENT,
rate_limit_per_sec: float = 1.0,
timeout_s: float = 10.0,
) -> None:
self._base_url = base_url.rstrip("/")
self._user_agent = user_agent
self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0
self._timeout_s = timeout_s
self._rl_lock = asyncio.Lock()
self._last_request_at = 0.0
def _url(self, lat: float, lon: float) -> str:
qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"})
return f"{self._base_url}/reverse?{qs}"
def _request_headers(self) -> dict[str, str]:
# Public Nominatim rejects requests without an identifying User-Agent.
return {"User-Agent": self._user_agent}
async def _throttle(self) -> None:
if self._min_interval <= 0:
return
async with self._rl_lock:
now = time.monotonic()
wait = self._last_request_at + self._min_interval - now
if wait > 0:
await asyncio.sleep(wait)
self._last_request_at = time.monotonic()
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
) as session:
async with session.get(
self._url(lat, lon), headers=self._request_headers()
) as resp:
resp.raise_for_status()
return await resp.json()
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
try:
await self._throttle()
data = await self._fetch(lat, lon)
addr = data.get("address", {}) or {}
except Exception:
logger.debug("NominatimBackend reverse failed; returning all-null bundle")
return all_null_bundle()
return {
"name": data.get("name") or data.get("display_name"),
"city": addr.get("city") or addr.get("town") or addr.get("village"),
"county": addr.get("county"),
"state": addr.get("state"),
"country": addr.get("country"),
"postal_code": addr.get("postcode"),
"timezone": None, # not in Nominatim reverse response
"landclass": None, # Navi-composed-endpoint only
"elevation_m": None, # Navi-composed-endpoint only
}

View file

@ -0,0 +1,82 @@
"""Raw Photon reverse-geocoding backend.
For deployers who run a Photon instance directly, without the composed
Navi-style endpoint. Photon resolves name + address only timezone,
landclass, and elevation_m are Navi-composed-endpoint extras and are nulled
here.
Photon reverse response shape:
{"features": [{"properties": {name, city, county, state, country,
postcode, ...}, "geometry": {...}}]}
"""
import logging
from typing import Any
from urllib.parse import urlencode
import aiohttp
from pydantic import BaseModel, ConfigDict, Field
from central.enrichment.geocoder import all_null_bundle
logger = logging.getLogger(__name__)
DEFAULT_BASE_URL = "http://localhost:2322"
class PhotonBackendSettings(BaseModel):
"""Settings for PhotonBackend. Mirrors __init__ defaults exactly."""
model_config = ConfigDict(extra="forbid")
base_url: str = Field(default=DEFAULT_BASE_URL, description="Photon /reverse base URL")
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
headers: dict[str, str] | None = Field(default=None, description="Extra request headers")
class PhotonBackend:
"""GeocoderBackend backed by a raw Photon /reverse endpoint."""
settings_schema = PhotonBackendSettings
def __init__(
self,
base_url: str = DEFAULT_BASE_URL,
timeout_s: float = 10.0,
headers: dict[str, str] | None = None,
) -> None:
self._base_url = base_url.rstrip("/")
self._timeout_s = timeout_s
self._headers = dict(headers or {})
def _url(self, lat: float, lon: float) -> str:
qs = urlencode({"lat": lat, "lon": lon, "limit": 1})
return f"{self._base_url}/reverse?{qs}"
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
) as session:
async with session.get(self._url(lat, lon), headers=self._headers) as resp:
resp.raise_for_status()
return await resp.json()
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
try:
data = await self._fetch(lat, lon)
features = data.get("features") or []
props = features[0].get("properties", {}) if features else {}
except Exception:
logger.debug("PhotonBackend reverse failed; returning all-null bundle")
return all_null_bundle()
return {
"name": props.get("name"),
"city": props.get("city"),
"county": props.get("county"),
"state": props.get("state"),
"country": props.get("country"),
"postal_code": props.get("postcode"), # Photon names it 'postcode'
"timezone": None, # not provided by raw Photon
"landclass": None, # Navi-composed-endpoint only
"elevation_m": None, # Navi-composed-endpoint only
}

View file

@ -124,3 +124,27 @@ class EnrichmentCache:
) -> None:
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
def _invalidate_sync(self, enricher_name: str | None) -> int:
conn = self._connect()
try:
if enricher_name is None:
cur = conn.execute("DELETE FROM enrichment_cache")
else:
cur = conn.execute(
"DELETE FROM enrichment_cache WHERE enricher_name = ?",
(enricher_name,),
)
conn.commit()
return cur.rowcount
finally:
conn.close()
async def invalidate(self, enricher_name: str | None = None) -> int:
"""Drop cached bundles. Scoped to one enricher when given, else all.
Called when enrichment config changes a new backend would otherwise
keep returning the previous backend's cached results until TTL expiry.
Returns the number of rows deleted.
"""
return await asyncio.to_thread(self._invalidate_sync, enricher_name)

View file

@ -9,6 +9,8 @@ land in PR K.
import logging
from typing import Any, Protocol, runtime_checkable
from pydantic import BaseModel
from central.enrichment.cache import EnrichmentCache
logger = logging.getLogger(__name__)
@ -38,6 +40,12 @@ def all_null_bundle() -> dict[str, Any]:
class GeocoderBackend(Protocol):
"""The pluggable reverse-geocoding layer beneath GeocoderEnricher."""
# Pydantic model (extra='forbid') describing this backend's accepted
# settings. The supervisor validates config.enrichment.backend_settings
# against it before instantiating, turning a config/settings mismatch into
# a clean ValidationError instead of a constructor TypeError.
settings_schema: type[BaseModel]
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
"""Return canonical geocoder fields (see GEOCODER_FIELDS).

View file

@ -66,6 +66,8 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
return "text", None
if field_type is int:
return "number", None
if field_type is float:
return "number", None
if field_type is bool:
return "checkbox", None
if field_type is RegionConfig:
@ -88,6 +90,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
)
# dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings).
# The form renders the value as JSON; the POST handler parses it back.
if field_type is dict or origin is dict:
return "json", None
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
raise NotImplementedError(

View file

@ -1990,6 +1990,199 @@ async def streams_update(
return RedirectResponse(url="/streams", status_code=302)
# =============================================================================
# Enrichment config route
# =============================================================================
def _outer_enrichment_fields(current: dict) -> list[FieldDescriptor]:
"""EnrichmentConfig form fields EXCEPT backend_settings — that one is
rendered as a per-backend <fieldset> via _backend_fields()."""
from central.config_models import EnrichmentConfig
return [
f for f in describe_fields(EnrichmentConfig, current)
if f.name != "backend_settings"
]
def _backend_fields(backend_class: str | None, current_bs: dict) -> list[FieldDescriptor]:
"""Field descriptors for the selected backend's settings_schema, or [] when
the backend class is unknown. Same generic describe_fields machinery."""
from central.supervisor import _BACKEND_REGISTRY
cls = _BACKEND_REGISTRY.get(backend_class or "")
if cls is None:
return []
return describe_fields(cls.settings_schema, current_bs or {})
async def _read_enrichment_row(conn) -> dict:
row = await conn.fetchrow(
"""
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
FROM config.enrichment WHERE id = true
"""
)
return dict(row) if row is not None else {}
def _enrichment_context(request, *, outer_fields, backend_fields, backend_class,
errors=None, form_data=None, backend_form_data=None):
return {
"operator": request.state.operator,
"csrf_token": request.state.csrf_token,
"outer_fields": outer_fields,
"backend_fields": backend_fields,
"backend_class": backend_class,
"errors": errors,
"form_data": form_data,
"backend_form_data": backend_form_data,
}
@router.get("/enrichment", response_class=HTMLResponse)
async def enrichment_form(request: Request) -> HTMLResponse:
"""Render the enrichment config form (outer fields + a per-backend fieldset
for the currently-selected backend_class)."""
templates = _get_templates()
pool = get_pool()
async with pool.acquire() as conn:
current = await _read_enrichment_row(conn)
backend_class = current.get("backend_class") or "NoOpBackend"
current_bs = current.get("backend_settings") or {}
return templates.TemplateResponse(
request=request,
name="enrichment.html",
context=_enrichment_context(
request,
outer_fields=_outer_enrichment_fields(current),
backend_fields=_backend_fields(backend_class, current_bs),
backend_class=backend_class,
),
)
@router.post("/enrichment")
async def enrichment_update(request: Request) -> Response:
"""Validate + persist the enrichment config. Hot-reload picks it up via the
config.enrichment NOTIFY trigger. backend_settings is validated against the
SUBMITTED backend_class's settings_schema."""
from central.config_models import EnrichmentConfig
from central.supervisor import _BACKEND_REGISTRY
templates = _get_templates()
pool = get_pool()
form = await request.form()
if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
raise CsrfValidationError("Invalid CSRF token")
errors: dict[str, str] = {}
form_data: dict[str, Any] = {}
backend_form_data: dict[str, Any] = {}
parsed: dict[str, Any] = {}
# --- outer EnrichmentConfig fields (backend_settings excluded) ---
for field in _outer_enrichment_fields({}):
raw = form.get(field.name, "")
form_data[field.name] = raw
if field.widget == "number":
try:
parsed[field.name] = int(raw) if raw else None
except ValueError:
errors[field.name] = f"{field.label} must be a number"
else: # text
parsed[field.name] = raw.strip() if raw else None
submitted_backend_class = parsed.get("backend_class")
# --- backend settings fieldset, validated against the SUBMITTED backend ---
backend_settings: dict[str, Any] = {}
backend_cls = _BACKEND_REGISTRY.get(submitted_backend_class or "")
if backend_cls is None and submitted_backend_class:
errors["backend_class"] = f"Unknown backend: {submitted_backend_class}"
elif backend_cls is not None:
for f in describe_fields(backend_cls.settings_schema, {}):
formkey = f"bs_{f.name}"
raw = form.get(formkey, "")
backend_form_data[formkey] = raw
if f.widget == "checkbox":
backend_settings[f.name] = formkey in form
elif f.widget == "json":
if raw and raw.strip():
try:
backend_settings[f.name] = json.loads(raw)
except json.JSONDecodeError as e:
errors[formkey] = f"{f.label} is not valid JSON: {e}"
# blank -> omit, schema default applies
else: # text / number — let pydantic coerce, omit blanks for defaults
if raw.strip() != "":
backend_settings[f.name] = raw.strip()
if not errors:
try:
backend_settings = backend_cls.settings_schema.model_validate(
backend_settings
).model_dump()
except ValidationError as e:
for err in e.errors():
loc = err["loc"][0] if err["loc"] else "unknown"
errors[f"bs_{loc}"] = err["msg"]
# --- outer EnrichmentConfig validation ---
if not errors:
try:
validated = EnrichmentConfig(
**{k: v for k, v in parsed.items() if v is not None},
backend_settings=backend_settings,
)
except ValidationError as e:
for err in e.errors():
loc = err["loc"][0] if err["loc"] else "unknown"
errors[str(loc)] = err["msg"]
if errors:
# Re-render against the SUBMITTED backend_class so field errors attach
# to the right schema (operator may be mid-switch with a typo).
return templates.TemplateResponse(
request=request,
name="enrichment.html",
context=_enrichment_context(
request,
outer_fields=_outer_enrichment_fields({}),
backend_fields=_backend_fields(submitted_backend_class, backend_settings),
backend_class=submitted_backend_class,
errors=errors,
form_data=form_data,
backend_form_data=backend_form_data,
),
status_code=200,
)
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO config.enrichment
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
VALUES (true, $1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
enricher_class = EXCLUDED.enricher_class,
backend_class = EXCLUDED.backend_class,
backend_settings = EXCLUDED.backend_settings,
cache_ttl_s = EXCLUDED.cache_ttl_s
""",
validated.enricher_class,
validated.backend_class,
validated.backend_settings, # encoded as jsonb by the pool codec
validated.cache_ttl_s,
)
return RedirectResponse(url="/enrichment", status_code=302)
# Alias validation regex
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')

View file

@ -19,6 +19,7 @@
<li><a href="/adapters">Adapters</a></li>
<li><a href="/events">Events</a></li>
<li><a href="/streams">Streams</a></li>
<li><a href="/enrichment">Enrichment</a></li>
<li><a href="/api-keys">API Keys</a></li>
<li>{{ operator.username }}</li>
<li><a href="/change-password">Change Password</a></li>

View file

@ -0,0 +1,90 @@
{% extends "base.html" %}
{% block title %}Central — Enrichment{% endblock %}
{% block content %}
<h1>Enrichment</h1>
<p class="secondary">
Central-side event enrichment. Results are attached to each event under
<code>data._enriched.&lt;enricher&gt;</code>. Changes hot-reload into the
supervisor; switching backend invalidates the enrichment cache. Backend
settings below are validated against the selected backend; switching
backend then saving re-renders this form with the new backend's fields.
</p>
<form method="post" action="/enrichment">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<fieldset>
<legend>Configuration</legend>
{% for field in outer_fields %}
{% if field.widget == "text" %}
<label for="{{ field.name }}">{{ field.label }}</label>
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
{% if field.required %}required{% endif %}>
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% elif field.widget == "number" %}
<label for="{{ field.name }}">{{ field.label }}</label>
<input type="number" step="any" id="{{ field.name }}" name="{{ field.name }}"
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
{% if field.required %}required{% endif %}>
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% endif %}
{% endfor %}
</fieldset>
<fieldset>
<legend>Backend settings — {{ backend_class }}</legend>
{% if not backend_fields %}
<small>This backend takes no settings.</small>
{% endif %}
{% for field in backend_fields %}
{% set fk = "bs_" ~ field.name %}
{% if field.widget == "text" %}
<label for="{{ fk }}">{{ field.label }}</label>
<input type="text" id="{{ fk }}" name="{{ fk }}"
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[fk] %}
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
{% endif %}
{% elif field.widget == "number" %}
<label for="{{ fk }}">{{ field.label }}</label>
<input type="number" step="any" id="{{ fk }}" name="{{ fk }}"
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[fk] %}
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
{% endif %}
{% elif field.widget == "checkbox" %}
<label>
<input type="checkbox" name="{{ fk }}"
{% if backend_form_data %}{% if backend_form_data[fk] %}checked{% endif %}{% elif field.current_value %}checked{% endif %}>
{{ field.label }}
</label>
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[fk] %}
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
{% endif %}
{% elif field.widget == "json" %}
<label for="{{ fk }}">{{ field.label }}</label>
<textarea id="{{ fk }}" name="{{ fk }}" rows="4"
placeholder="{}">{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else (field.current_value | tojson if field.current_value else '') }}</textarea>
<small>JSON object{% if field.description %} — {{ field.description }}{% endif %}</small>
{% if errors and errors[fk] %}
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
{% endif %}
{% endif %}
{% endfor %}
</fieldset>
<button type="submit">Save Changes</button>
</form>
{% endblock %}

View file

@ -12,6 +12,7 @@ from typing import Any
import nats
from nats.js import JetStreamContext
from pydantic import ValidationError
from central.adapter import SourceAdapter
from central.adapter_discovery import discover_adapters
@ -24,7 +25,10 @@ from central.api_key_resolver import resolve_api_key_alias
from central.config_models import EnrichmentConfig
from central.enrichment.base import Enricher
from central.enrichment.cache import EnrichmentCache
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.backends.nominatim import NominatimBackend
from central.enrichment.backends.no_op import NoOpBackend
from central.enrichment.backends.photon import PhotonBackend
from central.enrichment.geocoder import GeocoderEnricher
from central.models import Event
from central.stream_manager import StreamManager
@ -33,23 +37,32 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db")
# Enricher / backend class-name registries for EnrichmentConfig resolution.
# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these.
_ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher}
_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend}
_BACKEND_REGISTRY: dict[str, type] = {
"NoOpBackend": NoOpBackend,
"NaviBackend": NaviBackend,
"PhotonBackend": PhotonBackend,
"NominatimBackend": NominatimBackend,
}
def build_enrichers(
enrichment_config: EnrichmentConfig,
cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH,
cache: EnrichmentCache,
) -> list[Enricher]:
"""Instantiate the configured enricher(s) with their backend + cache.
"""Instantiate the configured enricher(s) with their backend + the given cache.
Read once at supervisor startup enrichment config is NOT hot-reloaded
in PR J (see EnrichmentConfig docstring).
The supervisor owns the cache (so it can invalidate it on a config change)
and passes it in. Enrichment config is read from config.enrichment at
startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
"""
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
backend = backend_cls(**enrichment_config.backend_settings)
cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s)
# Validate backend_settings against the backend's settings_schema BEFORE
# constructing. A mismatch (e.g. a stale base_url left in the row after
# switching to NoOpBackend) raises a clean pydantic ValidationError here
# instead of a TypeError inside the backend constructor.
validated = backend_cls.settings_schema.model_validate(enrichment_config.backend_settings)
backend = backend_cls(**validated.model_dump())
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
return [enricher_cls(backend, cache=cache)]
@ -158,9 +171,15 @@ class Supervisor:
self._config_store = config_store
self._nats_url = nats_url
self._cloudevents_config = cloudevents_config
# Enrichment is read once at startup (no hot-reload in PR J).
# Enrichment: a valid default set is built now so the supervisor is
# never in a half-state; start() then overrides it from the live
# config.enrichment row, and _on_config_change hot-reloads it.
self._active_enrichment_config = enrichment_config or EnrichmentConfig()
self._enrichment_cache = EnrichmentCache(
ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s
)
self._enrichers: list[Enricher] = build_enrichers(
enrichment_config or EnrichmentConfig()
self._active_enrichment_config, self._enrichment_cache
)
self._adapters = discover_adapters()
self._nc: nats.NATS | None = None
@ -666,11 +685,74 @@ class Supervisor:
extra={"stream": stream_config.name, "error": str(e)},
)
def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
"""Rebuild the active enricher set + cache from an EnrichmentConfig.
Builds into locals first and commits to instance state only on success,
so a ValidationError (bad backend_settings) leaves the previously-active
enrichers/config/cache untouched and propagates to the caller.
"""
cache = EnrichmentCache(ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s)
enrichers = build_enrichers(config, cache) # may raise ValidationError
self._active_enrichment_config = config
self._enrichment_cache = cache
self._enrichers = enrichers
async def _handle_enrichment_change(self) -> None:
"""Re-read config.enrichment and rebuild enrichers. Invalidate the cache
when the backend changed, so stale results from the previous backend
don't survive until TTL expiry.
Invalid backend_settings (ValidationError) leave the previous backend
running the supervisor stays up; the operator fixes the row and the
next NOTIFY brings it in cleanly.
"""
new_config = await self._config_source.get_enrichment_config()
old_config = self._active_enrichment_config
backend_changed = (
new_config.backend_class != old_config.backend_class
or new_config.backend_settings != old_config.backend_settings
or new_config.enricher_class != old_config.enricher_class
)
try:
self._rebuild_enrichers(new_config)
except ValidationError as e:
logger.error(
"Enrichment config invalid; keeping previous backend",
extra={
"enricher_class": new_config.enricher_class,
"backend_class": new_config.backend_class,
"backend_settings": new_config.backend_settings,
"errors": e.errors(),
},
)
return
if backend_changed:
deleted = await self._enrichment_cache.invalidate()
logger.info(
"Enrichment backend changed; cache invalidated",
extra={
"enricher_class": new_config.enricher_class,
"backend_class": new_config.backend_class,
"rows_deleted": deleted,
},
)
else:
logger.info(
"Enrichment config changed (cache retained)",
extra={"cache_ttl_s": new_config.cache_ttl_s},
)
async def _on_config_change(self, table: str, key: str) -> None:
"""Handle a configuration change notification.
Called when NOTIFY fires for config changes.
"""
# Handle enrichment config changes (single-row config.enrichment)
if table == "enrichment":
await self._handle_enrichment_change()
return
# Handle stream changes
if table == "streams":
stream_name = key
@ -762,6 +844,18 @@ class Supervisor:
# Ensure streams exist with correct configuration
await self._ensure_streams()
# Load the operator-set enrichment config (overrides the constructor
# default); hot-reloaded thereafter via _on_config_change.
enrichment_config = await self._config_source.get_enrichment_config()
self._rebuild_enrichers(enrichment_config)
logger.info(
"Enrichment configured",
extra={
"enricher_class": enrichment_config.enricher_class,
"backend_class": enrichment_config.backend_class,
},
)
# Load and start enabled adapters
enabled_adapters = await self._config_source.list_enabled_adapters()
for config in enabled_adapters:

View file

@ -0,0 +1,235 @@
"""Tests for per-backend settings schemas (PR L.5).
Each GeocoderBackend declares a Pydantic settings_schema (extra='forbid').
build_enrichers validates backend_settings against it BEFORE constructing, so
a config/settings mismatch is a clean ValidationError, not a TypeError. The
supervisor's hot-reload keeps the previous backend running on ValidationError;
the GUI POST re-renders with field errors and does not write the DB row.
Regression guard for the 2026-05-20 incident: switching backend_class to
NoOpBackend while backend_settings still held {"base_url": ...} crashed
_rebuild_enrichers with `TypeError: NoOpBackend() takes no arguments`.
"""
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pydantic import BaseModel, ValidationError
from central.config_models import EnrichmentConfig
from central.enrichment.cache import EnrichmentCache
from central.supervisor import _BACKEND_REGISTRY, build_enrichers
# --- schemas exist + extra='forbid' -----------------------------------------
def test_every_backend_declares_a_settings_schema():
for name, cls in _BACKEND_REGISTRY.items():
schema = getattr(cls, "settings_schema", None)
assert schema is not None, f"{name} has no settings_schema"
assert isinstance(schema, type) and issubclass(schema, BaseModel), name
def test_every_settings_schema_forbids_extra():
for name, cls in _BACKEND_REGISTRY.items():
with pytest.raises(ValidationError):
cls.settings_schema.model_validate({"definitely_not_a_field": 1})
def test_navi_schema_accepts_valid_kwargs_and_preserves_defaults():
from central.enrichment.backends.navi import NaviBackendSettings
m = NaviBackendSettings()
assert m.base_url == "http://localhost:8440"
assert m.timeout_s == 10.0 # preserved from __init__, NOT 5.0
assert m.headers is None
assert m.warmup is True
m2 = NaviBackendSettings(base_url="http://navi:8440", timeout_s=3.0, warmup=False)
assert m2.base_url == "http://navi:8440" and m2.timeout_s == 3.0
def test_noop_schema_has_no_fields():
from central.enrichment.backends.no_op import NoOpBackendSettings
assert list(NoOpBackendSettings.model_fields) == []
# --- build_enrichers validation ----------------------------------------------
def _cache(tmp_path) -> EnrichmentCache:
return EnrichmentCache(tmp_path / "c.db")
def test_build_enrichers_raises_validation_error_not_typeerror(tmp_path):
"""The exact 2026-05-20 bug: NoOpBackend + stale base_url. Must be a clean
ValidationError, never a TypeError from the constructor."""
cfg = EnrichmentConfig(backend_class="NoOpBackend", backend_settings={"base_url": "http://x"})
with pytest.raises(ValidationError):
build_enrichers(cfg, _cache(tmp_path))
def test_build_enrichers_navi_valid_settings(tmp_path):
from central.enrichment.backends.navi import NaviBackend
cfg = EnrichmentConfig(
backend_class="NaviBackend",
backend_settings={"base_url": "http://navi:8440", "warmup": False},
)
enrichers = build_enrichers(cfg, _cache(tmp_path))
assert isinstance(enrichers[0]._backend, NaviBackend)
def test_build_enrichers_navi_unknown_setting_rejected(tmp_path):
cfg = EnrichmentConfig(
backend_class="NaviBackend",
backend_settings={"base_url": "http://navi:8440", "bogus": 1},
)
with pytest.raises(ValidationError):
build_enrichers(cfg, _cache(tmp_path))
# --- supervisor hot-reload keeps previous backend on ValidationError ---------
def _supervisor():
from central import supervisor as sup_mod
config_source = MagicMock()
config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig())
return sup_mod.Supervisor(
config_source=config_source,
config_store=MagicMock(),
nats_url="nats://localhost:4222",
)
@pytest.mark.asyncio
async def test_handle_enrichment_change_keeps_previous_on_validation_error():
"""Navi active, then a bad NOTIFY (NoOp + leftover base_url) arrives. The
supervisor must keep NaviBackend running and not crash."""
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.backends.no_op import NoOpBackend
with patch("central.supervisor.EnrichmentCache") as cache_cls:
cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
sup = _supervisor()
# Establish a valid NaviBackend active state.
sup._rebuild_enrichers(EnrichmentConfig(
backend_class="NaviBackend",
backend_settings={"base_url": "http://navi:8440", "warmup": False},
))
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
active_before = sup._active_enrichment_config
# Bad config arrives via NOTIFY: NoOp + stale base_url -> ValidationError.
sup._config_source.get_enrichment_config = AsyncMock(
return_value=EnrichmentConfig(
backend_class="NoOpBackend",
backend_settings={"base_url": "http://navi:8440"},
)
)
await sup._handle_enrichment_change() # must NOT raise
# Previous backend still active; config unchanged; cache NOT invalidated.
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
assert sup._active_enrichment_config is active_before
cache_cls.return_value.invalidate.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_enrichment_change_applies_valid_noop_with_empty_settings():
"""Switching to NoOp with empty backend_settings (the correct way) applies
cleanly and invalidates the cache."""
from central.enrichment.backends.no_op import NoOpBackend
with patch("central.supervisor.EnrichmentCache") as cache_cls:
cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=3))
sup = _supervisor()
sup._rebuild_enrichers(EnrichmentConfig(
backend_class="NaviBackend",
backend_settings={"base_url": "http://navi:8440", "warmup": False},
))
sup._config_source.get_enrichment_config = AsyncMock(
return_value=EnrichmentConfig(backend_class="NoOpBackend", backend_settings={})
)
await sup._handle_enrichment_change()
assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
cache_cls.return_value.invalidate.assert_awaited()
# --- GUI POST validates backend_settings + does not write on error ----------
def _mock_pool(conn):
pool = MagicMock()
cm = MagicMock()
cm.__aenter__ = AsyncMock(return_value=conn)
cm.__aexit__ = AsyncMock(return_value=None)
pool.acquire = MagicMock(return_value=cm)
return pool
@pytest.mark.asyncio
async def test_gui_post_rejects_bad_backend_settings_without_writing():
"""POST with a bad backend setting (timeout_s non-numeric) re-renders with a
field error keyed bs_timeout_s and does NOT execute the DB upsert."""
from central.gui.routes import enrichment_update
request = MagicMock()
request.state.operator = MagicMock(username="op")
request.state.csrf_token = "tok"
form = {
"csrf_token": "tok",
"enricher_class": "GeocoderEnricher",
"backend_class": "NaviBackend",
"cache_ttl_s": "86400",
"bs_base_url": "http://navi:8440",
"bs_timeout_s": "not-a-number",
}
request.form = AsyncMock(return_value=form)
conn = MagicMock()
conn.execute = AsyncMock()
conn.fetchrow = AsyncMock(return_value={})
templates = MagicMock()
templates.TemplateResponse.return_value = MagicMock()
with patch("central.gui.routes._get_templates", return_value=templates), \
patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
await enrichment_update(request)
conn.execute.assert_not_awaited() # no DB write on validation failure
ctx = templates.TemplateResponse.call_args.kwargs["context"]
assert "bs_timeout_s" in ctx["errors"]
assert ctx["backend_class"] == "NaviBackend" # re-rendered against SUBMITTED backend
@pytest.mark.asyncio
async def test_gui_post_writes_on_valid_settings():
from central.gui.routes import enrichment_update
request = MagicMock()
request.state.operator = MagicMock(username="op")
request.state.csrf_token = "tok"
form = {
"csrf_token": "tok",
"enricher_class": "GeocoderEnricher",
"backend_class": "NaviBackend",
"cache_ttl_s": "86400",
"bs_base_url": "http://navi:8440",
"bs_timeout_s": "10",
}
request.form = AsyncMock(return_value=form)
conn = MagicMock()
conn.execute = AsyncMock()
templates = MagicMock()
with patch("central.gui.routes._get_templates", return_value=templates), \
patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
resp = await enrichment_update(request)
conn.execute.assert_awaited() # DB upsert ran
# the backend_settings written are the validated/dumped dict
args = conn.execute.call_args.args
assert "INSERT INTO config.enrichment" in args[0]
assert resp.status_code == 302

View file

@ -0,0 +1,213 @@
"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5).
Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload
rebuild, cache invalidation on backend change (but not on TTL-only change),
EnrichmentCache.invalidate, the generic json widget for backend_settings, and
the /enrichment GUI render. No real DB / NATS pool, config_source, and the
EnrichmentCache class are mocked.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.config_models import EnrichmentConfig
from central.enrichment.cache import EnrichmentCache
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.backends.no_op import NoOpBackend
from central.gui.form_descriptors import describe_fields
# --- mock pool/conn helpers -------------------------------------------------
def _mock_pool(conn: MagicMock) -> MagicMock:
pool = MagicMock()
acquire_cm = MagicMock()
acquire_cm.__aenter__ = AsyncMock(return_value=conn)
acquire_cm.__aexit__ = AsyncMock(return_value=None)
pool.acquire = MagicMock(return_value=acquire_cm)
return pool
# --- ConfigStore --------------------------------------------------------------
@pytest.mark.asyncio
async def test_config_store_reads_enrichment_row():
from central.config_store import ConfigStore
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value={
"enricher_class": "GeocoderEnricher",
"backend_class": "NaviBackend",
"backend_settings": {"base_url": "http://example.test:8440"},
"cache_ttl_s": 3600,
})
store = ConfigStore(_mock_pool(conn))
cfg = await store.get_enrichment_config()
assert isinstance(cfg, EnrichmentConfig)
assert cfg.backend_class == "NaviBackend"
assert cfg.backend_settings == {"base_url": "http://example.test:8440"}
assert cfg.cache_ttl_s == 3600
@pytest.mark.asyncio
async def test_config_store_falls_back_to_defaults_when_row_absent():
from central.config_store import ConfigStore
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value=None)
store = ConfigStore(_mock_pool(conn))
cfg = await store.get_enrichment_config()
assert cfg == EnrichmentConfig() # framework defaults
assert cfg.backend_class == "NoOpBackend"
@pytest.mark.asyncio
async def test_config_store_upsert_passes_dict_settings():
from central.config_store import ConfigStore
conn = MagicMock()
conn.execute = AsyncMock()
store = ConfigStore(_mock_pool(conn))
cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"})
await store.upsert_enrichment_config(cfg)
args = conn.execute.call_args.args
assert "INSERT INTO config.enrichment" in args[0]
# backend_settings passed as a dict (pool codec encodes to jsonb), not a str.
assert {"base_url": "x"} in args
# --- EnrichmentCache.invalidate ----------------------------------------------
@pytest.mark.asyncio
async def test_cache_invalidate_all(tmp_path):
cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
await cache.set("geocoder", 3.0, 4.0, {"name": "y"})
deleted = await cache.invalidate()
assert deleted == 2
assert await cache.get("geocoder", 1.0, 2.0) is None
@pytest.mark.asyncio
async def test_cache_invalidate_scoped_to_enricher(tmp_path):
cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
await cache.set("other", 1.0, 2.0, {"name": "z"})
deleted = await cache.invalidate("geocoder")
assert deleted == 1
assert await cache.get("geocoder", 1.0, 2.0) is None
assert await cache.get("other", 1.0, 2.0) == {"name": "z"}
# --- Supervisor startup read + hot-reload ------------------------------------
def _supervisor_with(enrichment_cfg: EnrichmentConfig):
"""Build a Supervisor with mocked deps and a mocked EnrichmentCache class
(so no real /var/lib cache file is touched)."""
from central import supervisor as sup_mod
config_source = MagicMock()
config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg)
config_store = MagicMock()
sup = sup_mod.Supervisor(
config_source=config_source,
config_store=config_store,
nats_url="nats://localhost:4222",
)
return sup
@pytest.mark.asyncio
async def test_supervisor_builds_navi_from_config():
"""Given a config naming NaviBackend, the supervisor's enricher set wraps a
NaviBackend proves the registry resolution end-to-end."""
with patch("central.supervisor.EnrichmentCache") as cache_cls:
cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
sup = _supervisor_with(
EnrichmentConfig(backend_class="NaviBackend",
backend_settings={"base_url": "http://x:8440", "warmup": False})
)
cfg = await sup._config_source.get_enrichment_config()
sup._rebuild_enrichers(cfg)
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
@pytest.mark.asyncio
async def test_hot_reload_rebuilds_and_invalidates_on_backend_change():
from central import supervisor as sup_mod
with patch("central.supervisor.EnrichmentCache") as cache_cls:
invalidate = AsyncMock(return_value=5)
cache_cls.return_value = MagicMock(invalidate=invalidate)
# Start at NoOp.
sup = _supervisor_with(EnrichmentConfig())
sup._rebuild_enrichers(EnrichmentConfig())
assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
# Config flips to Navi.
sup._config_source.get_enrichment_config = AsyncMock(
return_value=EnrichmentConfig(
backend_class="NaviBackend",
backend_settings={"base_url": "http://x:8440", "warmup": False},
)
)
await sup._handle_enrichment_change()
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
invalidate.assert_awaited() # backend changed -> cache wiped
@pytest.mark.asyncio
async def test_hot_reload_does_not_invalidate_on_ttl_only_change():
with patch("central.supervisor.EnrichmentCache") as cache_cls:
invalidate = AsyncMock(return_value=0)
cache_cls.return_value = MagicMock(invalidate=invalidate)
sup = _supervisor_with(EnrichmentConfig())
sup._rebuild_enrichers(EnrichmentConfig())
# Same backend, only TTL changes.
sup._config_source.get_enrichment_config = AsyncMock(
return_value=EnrichmentConfig(cache_ttl_s=3600)
)
await sup._handle_enrichment_change()
invalidate.assert_not_awaited()
# --- generic json widget + GUI render ----------------------------------------
def test_describe_fields_renders_dict_as_json_widget():
fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})}
assert fields["backend_settings"] == "json"
assert fields["enricher_class"] == "text"
assert fields["cache_ttl_s"] == "number"
@pytest.mark.asyncio
async def test_enrichment_form_renders():
from central.gui.routes import enrichment_form
request = MagicMock()
request.state.operator = MagicMock(username="op")
request.state.csrf_token = "tok"
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value={
"enricher_class": "GeocoderEnricher",
"backend_class": "NoOpBackend",
"backend_settings": {},
"cache_ttl_s": 86400,
})
templates = MagicMock()
templates.TemplateResponse.return_value = MagicMock()
with patch("central.gui.routes._get_templates", return_value=templates), \
patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
await enrichment_form(request)
ctx = templates.TemplateResponse.call_args.kwargs["context"]
# PR L.5: outer fields exclude backend_settings (now a per-backend fieldset);
# NoOpBackend's fieldset has zero fields.
outer = {f.name for f in ctx["outer_fields"]}
assert "backend_settings" not in outer
assert "backend_class" in outer
assert ctx["backend_class"] == "NoOpBackend"
assert ctx["backend_fields"] == []
assert ctx["csrf_token"] == "tok"

View file

@ -456,6 +456,7 @@ class TestEnrichmentIntegration:
"""A FIRMS event run through the supervisor's enrichment stage emerges
with data._enriched.geocoder populated (all-null under NoOpBackend)."""
from central.config_models import EnrichmentConfig
from central.enrichment.cache import EnrichmentCache
from central.enrichment.geocoder import all_null_bundle
from central.supervisor import apply_enrichment, build_enrichers
@ -469,9 +470,8 @@ class TestEnrichmentIntegration:
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
assert "_enriched" not in event.data
enrichers = build_enrichers(
EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db"
)
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
enrichers = build_enrichers(EnrichmentConfig(), cache)
await apply_enrichment(event, adapter.enrichment_locations, enrichers)
assert "_enriched" in event.data

125
tests/test_navi_backend.py Normal file
View file

@ -0,0 +1,125 @@
"""Tests for NaviBackend (composed Navi /api/reverse endpoint).
HTTP is exercised via patching the backend's `_fetch` (the codebase has no
aioresponses/respx dep); URL construction is asserted on the pure `_url`
helper. An env-gated integration smoke against the live Navi endpoint is
skipped by default.
"""
import os
from unittest.mock import AsyncMock
import pytest
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
# Full Navi response — already canonical shape.
_NAVI_OK = {
"name": "Where you are",
"city": "Boise",
"county": "Ada",
"state": "Idaho",
"country": "United States",
"postal_code": "83702",
"timezone": "America/Boise",
"landclass": "Public — National Forest",
"elevation_m": 824,
}
def _backend() -> NaviBackend:
# warmup=False so construction issues no background task in tests.
return NaviBackend(base_url="http://navi.test:8440", warmup=False)
def test_url_construction():
b = _backend()
assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023"
def test_base_url_trailing_slash_stripped():
b = NaviBackend(base_url="http://navi.test:8440/", warmup=False)
assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0"
@pytest.mark.asyncio
async def test_happy_path_passthrough():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_NAVI_OK))
result = await b.reverse(43.615, -116.2023)
assert result == _NAVI_OK
assert set(result.keys()) == set(GEOCODER_FIELDS)
@pytest.mark.asyncio
async def test_partial_nulls_preserved():
"""Navi 200-with-nulls (non-US: timezone + elevation, rest null)."""
partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35}
b = _backend()
b._fetch = AsyncMock(return_value=partial)
result = await b.reverse(48.85, 2.35)
assert result["timezone"] == "Europe/Paris"
assert result["elevation_m"] == 35
assert result["city"] is None
assert set(result.keys()) == set(GEOCODER_FIELDS)
@pytest.mark.asyncio
async def test_extra_keys_dropped():
b = _backend()
b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"})
result = await b.reverse(1.0, 2.0)
assert "debug_internal" not in result
assert set(result.keys()) == set(GEOCODER_FIELDS)
@pytest.mark.asyncio
async def test_network_error_returns_all_null_never_raises():
b = _backend()
b._fetch = AsyncMock(side_effect=ConnectionError("boom"))
result = await b.reverse(1.0, 2.0)
assert result == all_null_bundle()
@pytest.mark.asyncio
async def test_timeout_returns_all_null():
import asyncio
b = _backend()
b._fetch = AsyncMock(side_effect=asyncio.TimeoutError())
assert await b.reverse(1.0, 2.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_malformed_response_returns_all_null():
b = _backend()
b._fetch = AsyncMock(side_effect=ValueError("not json"))
assert await b.reverse(1.0, 2.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_headers_passed_through_config():
b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False)
assert b._headers == {"Authorization": "Bearer x"}
@pytest.mark.asyncio
@pytest.mark.skipif(
os.environ.get("NAVI_INTEGRATION_TEST") != "1",
reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
)
async def test_live_navi_boise():
"""Integration smoke against the real endpoint (default skipped).
The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific
address lives in source; defaults to localhost when unset.
"""
base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440")
b = NaviBackend(base_url=base_url, warmup=False)
result = await b.reverse(43.6150, -116.2023)
assert result["name"] == "Where you are"
assert result["city"] == "Boise"
assert result["state"] == "Idaho"
assert result["elevation_m"] is not None
assert abs(float(result["elevation_m"]) - 824) < 50

View file

@ -0,0 +1,118 @@
"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2)."""
from unittest.mock import AsyncMock, patch
import pytest
from central.enrichment.backends.nominatim import (
DEFAULT_USER_AGENT,
NominatimBackend,
)
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
_NOMINATIM_OK = {
"name": "Boise",
"display_name": "Boise, Ada County, Idaho, United States",
"address": {
"city": "Boise",
"county": "Ada County",
"state": "Idaho",
"country": "United States",
"postcode": "83702",
},
}
def _backend(**kw) -> NominatimBackend:
kw.setdefault("base_url", "http://nominatim.test")
kw.setdefault("rate_limit_per_sec", 0) # disabled by default in tests
return NominatimBackend(**kw)
def test_url_and_format():
b = _backend()
url = b._url(43.6, -116.2)
assert url.startswith("http://nominatim.test/reverse?")
assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url
def test_user_agent_header_present():
b = _backend()
assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT
def test_custom_user_agent():
b = _backend(user_agent="myapp/1.0 (me@example.com)")
assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)"
@pytest.mark.asyncio
async def test_happy_path_maps_address_block():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
result = await b.reverse(43.6, -116.2)
assert set(result.keys()) == set(GEOCODER_FIELDS)
assert result["name"] == "Boise"
assert result["city"] == "Boise"
assert result["county"] == "Ada County"
assert result["state"] == "Idaho"
assert result["country"] == "United States"
assert result["postal_code"] == "83702"
@pytest.mark.asyncio
async def test_navi_only_fields_null():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
result = await b.reverse(43.6, -116.2)
assert result["timezone"] is None
assert result["landclass"] is None
assert result["elevation_m"] is None
@pytest.mark.asyncio
async def test_city_falls_back_to_town_then_village():
b = _backend()
b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}})
result = await b.reverse(1.0, 2.0)
assert result["city"] == "Tinytown"
@pytest.mark.asyncio
async def test_network_error_returns_all_null():
b = _backend()
b._fetch = AsyncMock(side_effect=ConnectionError("down"))
assert await b.reverse(1.0, 2.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
b = _backend()
b._fetch = AsyncMock(side_effect=ValueError("bad"))
assert await b.reverse(1.0, 2.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_rate_limit_disabled_does_not_sleep():
b = _backend(rate_limit_per_sec=0)
with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp:
await b._throttle()
await b._throttle()
slp.assert_not_called()
@pytest.mark.asyncio
async def test_rate_limit_spaces_consecutive_requests():
"""With a 1/sec limit, the second back-to-back call must sleep a positive
interval. Mock monotonic to a fixed instant so the throttle computes a wait."""
b = _backend(rate_limit_per_sec=1.0)
sleeps: list[float] = []
async def _fake_sleep(d):
sleeps.append(d)
with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \
patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep):
await b._throttle() # first: last_request_at was 0 -> no wait
await b._throttle() # second: now==100, last==100 -> wait ~1.0s
assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}"

View file

@ -0,0 +1,92 @@
"""Tests for PhotonBackend (raw Photon /reverse)."""
from unittest.mock import AsyncMock
import pytest
from central.enrichment.backends.photon import PhotonBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
# Photon reverse response shape.
_PHOTON_OK = {
"features": [
{
"properties": {
"name": "Boise",
"city": "Boise",
"county": "Ada County",
"state": "Idaho",
"country": "United States",
"postcode": "83702",
"osm_key": "place",
},
"geometry": {"type": "Point", "coordinates": [-116.2, 43.6]},
}
]
}
def _backend() -> PhotonBackend:
return PhotonBackend(base_url="http://photon.test:2322")
def test_url_construction():
b = _backend()
url = b._url(43.6, -116.2)
assert url.startswith("http://photon.test:2322/reverse?")
assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url
@pytest.mark.asyncio
async def test_happy_path_maps_to_canonical():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
result = await b.reverse(43.6, -116.2)
assert set(result.keys()) == set(GEOCODER_FIELDS)
assert result["city"] == "Boise"
assert result["county"] == "Ada County"
assert result["state"] == "Idaho"
assert result["country"] == "United States"
assert result["postal_code"] == "83702" # mapped from Photon 'postcode'
@pytest.mark.asyncio
async def test_navi_only_fields_are_null():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
result = await b.reverse(43.6, -116.2)
assert result["timezone"] is None
assert result["landclass"] is None
assert result["elevation_m"] is None
@pytest.mark.asyncio
async def test_empty_features_returns_all_null():
b = _backend()
b._fetch = AsyncMock(return_value={"features": []})
assert await b.reverse(0.0, 0.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_missing_properties_keys_become_null():
b = _backend()
b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]})
result = await b.reverse(1.0, 2.0)
assert result["name"] == "X"
assert result["city"] is None
assert result["postal_code"] is None
assert set(result.keys()) == set(GEOCODER_FIELDS)
@pytest.mark.asyncio
async def test_network_error_returns_all_null():
b = _backend()
b._fetch = AsyncMock(side_effect=ConnectionError("down"))
assert await b.reverse(1.0, 2.0) == all_null_bundle()
@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
b = _backend()
b._fetch = AsyncMock(side_effect=ValueError("bad json"))
assert await b.reverse(1.0, 2.0) == all_null_bundle()

View file

@ -23,8 +23,14 @@ from pathlib import Path
from central.adapter import SourceAdapter
from central.adapter_discovery import discover_adapters
from central.enrichment.geocoder import GEOCODER_FIELDS
from central.streams import STREAMS
# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19).
_DESIGN_PRINCIPLE_QUOTE = (
"Central takes it all and gives it all. It's up to the pipe to do with it"
)
DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md"
@ -186,6 +192,48 @@ def test_streams_snippet_quotes_live_registry():
)
def _section(doc: str, header_re: str) -> str:
"""Return the body of the section whose header matches header_re, up to the
next same-or-higher-level header."""
m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE)
assert m, f"doc missing section matching {header_re!r}"
return m.group(1)
def test_design_principle_quote_present_in_section_2():
"""§2 must still carry the verbatim Matt quote — the reframe changes the
translation beneath it, not the quote itself."""
section = _section(_doc_text(), r"^## 2\. The design principle")
assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2"
def test_anti_pattern_10_1_section_exists():
"""§10.1 must still exist as a subsection (content reframed to
'enrichment outside the framework', structure preserved)."""
doc = _doc_text()
assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection"
def test_enrichment_contract_section_13_has_all_protocol_references():
"""New §13 must name all four enrichment contract types verbatim."""
section = _section(_doc_text(), r"^## 13\. Enrichment contract")
for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"):
assert ref in section, f"§13 missing reference to {ref!r}"
def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields():
"""The §13 per-field coverage matrix must list exactly the canonical
GEOCODER_FIELDS derived from code, not hardcoded here."""
section = _section(_doc_text(), r"^## 13\. Enrichment contract")
# Matrix rows look like: | `field_name` | ... |
row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE))
assert row_fields == set(GEOCODER_FIELDS), (
f"coverage-matrix field drift: "
f"doc-only={row_fields - set(GEOCODER_FIELDS)}, "
f"code-only={set(GEOCODER_FIELDS) - row_fields}"
)
def test_no_orphan_adapter_references_in_anti_patterns():
"""Anti-patterns section names two real adapter modules as examples
(firms, inciweb in §10.4). Those names must still resolve via