diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 70036b8..009741a 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -296,6 +296,36 @@ ground-survey workflows. archive is at `https://firms.modaps.eosdis.nasa.gov/`.) - **Removal semantics:** none. FIRMS publishes detections; absence is the signal if a fire stops burning. Consumers should not expect explicit "removal" events. +- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so + each event carries a Central-derived geocoder bundle under + `data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central + reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result. + The bundle always has these nine keys (any unresolved field is `null`, never + missing): + + | key | meaning | + |---|---| + | `name` | place / feature name | + | `city` | city / town / village | + | `county` | county (or equivalent) | + | `state` | state / province | + | `country` | country | + | `postal_code` | postal / ZIP code | + | `timezone` | IANA tz (e.g. `America/Boise`) | + | `landclass` | land-management class (US PAD-US) | + | `elevation_m` | ground elevation, metres | + + **Coverage by region (v0.5.0):** US hotspots get the full bundle (with + sparsity gaps in deep wilderness); non-US hotspots currently get only + `timezone` and `elevation_m` populated (both planet-scale), the rest `null`, + pending an upstream planet expansion. Treat `null` as "not resolved," not + "does not exist." + + **Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot + near 51–53°N can spuriously get a non-`null` `landclass` (it false-matches the + Aleutian "Rat Islands" US land-management polygon across the dateline). If you + consume `landclass`, treat a non-`null` value on a clearly non-US point as + suspect. Fix is tracked upstream. - **Live example (verbatim from CT104):** ```json diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 02ce1bf..542c466 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -24,6 +24,7 @@ are intentionally not restated. Cross-references point into that doc. 10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do) 11. [Settings preview hook](#11-settings-preview-hook) 12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter) +13. [Enrichment contract](#13-enrichment-contract) --- @@ -49,33 +50,47 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)). > what it will." > — Matt, 2026-05-19 -Adapter authors translate that single sentence into a small number of concrete -rules: +The correct reading of that sentence: **Central is the consumer's only data +plane.** A downstream consumer sees exactly what's on the wire and nothing +more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot +reverse-geocode a coordinate on its own. So whatever Central does NOT put on +the wire is, for every consumer, simply missing. "Gives it all" therefore means +*give the consumer everything a reasonable consumer needs to act on the event* +— not "give the upstream payload only." + +Adapter authors translate that into a small number of concrete rules: - **Preserve every upstream field.** Anything the upstream returns lives in `Event.data` verbatim. Adapters do not silently drop fields, even ones that - look redundant or low-value today. -- **No enrichment.** Adapters do not reverse-geocode, do not call out to - upstream metadata endpoints during normal `poll()` flow, do not consult a - second source to "fill in" a missing field. If a downstream consumer wants - enrichment, that is consumer-side work. -- **No opinionated translation.** Adapters do not coerce units, do not rename - fields to match a Central-wide vocabulary, do not collapse upstream - enumerations into Central's preferred labels. -- **The only adapter-side transforms are mechanical.** Specifically: - subject-token normalization (camelCase → snake_case, agency-prefix splitting, - whitespace → underscore, lowercase) and dedup-key construction. Both are - deterministic functions of upstream identifiers. Nothing else. + look redundant or low-value today (see [§10.2](#102-silent-field-dropping)). +- **Enrich, deliberately and centrally.** Location, timezone, elevation, + landclass and similar context that consumers reliably need should be resolved + once, by Central, and attached — not left for twelve consumers to each + re-derive (most of them can't). Enrichment runs through the framework + ([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations` + and the supervisor attaches results under `Event.data["_enriched"]`. +- **Namespace enrichment for provenance.** Central-derived fields live under + `_enriched.`; everything else in `data` is upstream verbatim. + A consumer can always tell which is which. +- **Fail gracefully to null, never to an exception.** Enrichment that can't + resolve a field returns `null` for it (a stable, documented field set), and a + total enrichment failure returns an all-null bundle. A geocoder outage must + never drop or corrupt the underlying event. +- **No opinionated translation of the upstream payload.** Enrichment *adds* + namespaced fields; it does not rewrite upstream ones. Adapters still do not + coerce units, rename upstream fields, or collapse upstream enumerations inside + `data`. The only in-place adapter transforms remain mechanical: subject-token + normalization (camelCase → snake_case, agency-prefix splitting, whitespace → + underscore, lowercase) and dedup-key construction. -This rules out a whole category of plausible-sounding work that prior reviews -have already rejected. For instance, "enrich NWIS site rows with USGS -monitoring-locations metadata during `poll()`" was proposed for Phase 3 and -killed on this principle. The producer adds the field-preserving pipe; the pipe -ends at JetStream publish; everything richer is a downstream concern. - -See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of -anti-patterns. Future authors should reject the same proposals on the same -grounds. +This reframes a Phase 2 rule. The earlier draft of this doc said "no +enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was +rejected on those grounds. That reasoning is now inverted: consumers have no +practical way to do that work, so Central does it. The constraint that survives +is *where* and *how* — through the framework, namespaced, cached, failing to +null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the +remaining anti-pattern (enrichment done outside the framework) and +[§13](#13-enrichment-contract) for the full contract. --- @@ -625,18 +640,30 @@ adapter authors should mirror it. Do not restate it here. These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR. -### 10.1 Enrichment during `poll()` +### 10.1 Enrichment outside the framework -No calls to upstream metadata endpoints, no reverse-geocode, no consultation -of a second source to fill in fields the primary feed omitted. The "NWIS -enrichment" Phase 3 proposal — joining live measurements against the -monitoring-locations metadata endpoint during `poll()` — was rejected on the -[§2](#2-the-design-principle) principle. Future proposals along the same -lines get the same answer. +Enrichment itself is **expected**, not forbidden — see +[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). Any adapter +with location data should opt in by declaring `enrichment_locations` on the +adapter class; the supervisor then runs the registered enrichers and attaches +results under `Event.data["_enriched"]`. -If enrichment is genuinely necessary, the right shape is a separate adapter -(or a downstream consumer) — not an `if metadata_missing: await -fetch_metadata()` branch buried in an adapter's `poll()`. +The anti-pattern is doing enrichment the *wrong* way — outside the framework: + +- An `if missing: await fetch_metadata()` branch buried in an adapter's + `poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips + the `_enriched` namespacing (so consumers can't tell upstream from + Central-derived), and gives up the never-raise/all-null safety net (so a + geocoder hiccup can take down the poll). +- Writing enriched fields directly into the top level of `Event.data` instead + of under `_enriched`. That destroys provenance — a consumer can no longer + tell which fields came from the upstream feed and which Central added. +- Standing up a parallel enrichment path (a second HTTP client, a private cache) + inside one adapter instead of registering a backend with the framework. + +The rule of thumb: declare `enrichment_locations`, let the supervisor do the +work. If the framework can't express what you need, extend the framework +([§13](#13-enrichment-contract)) — don't route around it inside an adapter. ### 10.2 Silent field dropping @@ -821,3 +848,111 @@ requesting / granting merge. - [ ] **Full pytest suite green.** --- + +## 13. Enrichment contract + +Enrichment is how Central adds consumer-needed context (location names, +timezone, elevation, landclass, …) that the upstream feed doesn't carry and a +downstream consumer can't look up itself. It runs in the supervisor, after +dedup and before the CloudEvents wrap, for any adapter that opts in. Results are +namespaced under `Event.data["_enriched"]` so provenance stays explicit: +everything under `_enriched` is Central-derived; everything else in `data` is +upstream verbatim. + +### 13.1 Opting an adapter in + +Declare `enrichment_locations` on the adapter class — a list of +`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`: + +```python +class FIRMSAdapter(SourceAdapter): + enrichment_locations = [("latitude", "longitude")] +``` + +Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is." +The supervisor uses the first tuple that resolves to a non-null coordinate pair, +runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the +results. No adapter code calls enrichers directly. + +### 13.2 The `Enricher` Protocol + +An enricher is any object satisfying this Protocol (`central.enrichment.base`): + +- `name: str` — short identifier, used as the key under + `Event.data["_enriched"]`. +- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]` — + given `{"lat": float, "lon": float}`, return a flat dict of enrichment + fields. Fields it can't resolve are present with value `None` (not omitted). + **Must never raise** — implementations handle their own failures and return + an all-null bundle on total failure. + +### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol + +`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the +only enricher today. It owns the cache and the canonical field normalization; +the actual reverse-geocode is delegated to a pluggable backend satisfying the +`GeocoderBackend` Protocol: + +- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return + the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields + the backend can't resolve return `None`. Must never raise. + +Backends shipped: `NaviBackend` (composed Navi `/api/reverse//` +endpoint — name/address + timezone + landclass + elevation in one call), +`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM +Nominatim, name/address only, with a configurable rate limit + `User-Agent`), +and `NoOpBackend` (all-null — the default until an operator configures a real +backend). + +### 13.4 Cache + failure semantics + +`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`, +`/var/lib/central/enrichment_cache.db`): + +- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4 + decimal places (~11 m). TTL is per-enricher, default 24h. +- **Cache hit** → return cached bundle, no backend call. +- **Cache miss** → call backend, cache the normalized result (**even an + all-null bundle** — so known-empty coordinates aren't re-hammered), return it. +- **Backend raises** (a violation of the never-raise contract, or an + infrastructure error the backend chose to surface) → return an all-null + bundle and **do not cache** it, so the next event for that coordinate retries. + +Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`, +`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing +the enricher set is a restart, not a hot-reload. + +### 13.5 Per-field coverage + +The canonical geocoder bundle is exactly nine fields. They mirror +`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this +table must match it): + +| Field | US events | Non-US events today | Non-US after Photon planet expansion | +|---|---|---|---| +| `name` | populated (wilderness sparsity gaps) | null | populated | +| `city` | populated (wilderness sparsity gaps) | null | populated | +| `county` | populated (wilderness sparsity gaps) | null | populated | +| `state` | populated (wilderness sparsity gaps) | null | populated | +| `country` | populated (wilderness sparsity gaps) | null | populated | +| `postal_code` | populated (wilderness sparsity gaps) | null | populated | +| `timezone` | populated | populated (`tz_world` is planet-scale) | populated | +| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null | +| `elevation_m` | populated | populated (planet-DEM) | populated | + +**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` + +`elevation_m` and the rest null. Photon planet expansion is queued on the Navi +side with no firm ETA; when it lands, `NaviBackend` picks it up automatically +with zero Central code changes. + +### 13.6 Known wrinkle — landclass antimeridian false-positive + +`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons. +Points near 51–53°N **outside** the US can spuriously match the Aleutian "Rat +Islands" PAD-US polygon (false matching across the antimeridian), yielding a +non-null `landclass` that doesn't actually apply. This is a Navi-side bug being +worked separately; until it's fixed, treat a non-null `landclass` on a clearly +non-US point as suspect. Documented for consumers in +`CONSUMER-INTEGRATION.md`. + +--- diff --git a/sql/migrations/024_add_config_enrichment.sql b/sql/migrations/024_add_config_enrichment.sql new file mode 100644 index 0000000..236c1a2 --- /dev/null +++ b/sql/migrations/024_add_config_enrichment.sql @@ -0,0 +1,44 @@ +-- Migration: 024_add_config_enrichment +-- Adds config.enrichment — the single-row, operator-settable enrichment config +-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY. +-- +-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)). +-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty +-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs, +-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page +-- after this merges. +-- +-- The seed mirrors central.config_models.EnrichmentConfig() defaults. +-- Regenerate via: +-- sudo -u central .venv/bin/python -c \ +-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())" +-- +-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ... +-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER). + +CREATE TABLE IF NOT EXISTS config.enrichment ( + id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true), + enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher', + backend_class TEXT NOT NULL DEFAULT 'NoOpBackend', + backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb, + cache_ttl_s INTEGER NOT NULL DEFAULT 86400, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Reuse the existing updated_at trigger function (migration 002). +DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment; +CREATE TRIGGER enrichment_set_updated_at + BEFORE UPDATE ON config.enrichment + FOR EACH ROW + EXECUTE FUNCTION config.set_updated_at(); + +-- Reuse the existing NOTIFY function (migration 001) so the supervisor's +-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE +-- branch emits 'enrichment:' (empty key — single-row table has no natural key). +DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment; +CREATE TRIGGER enrichment_notify + AFTER INSERT OR UPDATE OR DELETE ON config.enrichment + FOR EACH ROW EXECUTE FUNCTION config.notify_config_change(); + +-- Seed the single framework-default row (NoOp; no deployment-specific values). +INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING; diff --git a/src/central/config_source.py b/src/central/config_source.py index b70ddf1..04331f3 100644 --- a/src/central/config_source.py +++ b/src/central/config_source.py @@ -8,7 +8,7 @@ import logging from collections.abc import Awaitable, Callable from typing import Protocol, runtime_checkable -from central.config_models import AdapterConfig +from central.config_models import AdapterConfig, EnrichmentConfig from central.config_store import ConfigStore logger = logging.getLogger(__name__) @@ -26,6 +26,10 @@ class ConfigSource(Protocol): """Get configuration for a specific adapter.""" ... + async def get_enrichment_config(self) -> EnrichmentConfig: + """Get the enrichment configuration.""" + ... + async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], @@ -65,6 +69,10 @@ class DbConfigSource: """Get a specific adapter from database.""" return await self._store.get_adapter(name) + async def get_enrichment_config(self) -> EnrichmentConfig: + """Get the enrichment configuration from database.""" + return await self._store.get_enrichment_config() + async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], diff --git a/src/central/config_store.py b/src/central/config_store.py index 826f899..0d51658 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -12,7 +12,7 @@ from typing import Any import asyncpg -from central.config_models import AdapterConfig, StreamConfig +from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig from central.crypto import decrypt, encrypt logger = logging.getLogger(__name__) @@ -129,6 +129,48 @@ class ConfigStore: name, ) + # ------------------------------------------------------------------------- + # Enrichment configuration (single-row config.enrichment) + # ------------------------------------------------------------------------- + + async def get_enrichment_config(self) -> EnrichmentConfig: + """Read the single config.enrichment row. + + Falls back to EnrichmentConfig() framework defaults if the row is + somehow absent (it is migration-seeded, so this is belt-and-suspenders). + """ + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT enricher_class, backend_class, backend_settings, cache_ttl_s + FROM config.enrichment + WHERE id = true + """ + ) + if row is None: + return EnrichmentConfig() + return EnrichmentConfig(**dict(row)) + + async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None: + """Write the single config.enrichment row (id = true).""" + async with self._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO config.enrichment + (id, enricher_class, backend_class, backend_settings, cache_ttl_s) + VALUES (true, $1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + enricher_class = EXCLUDED.enricher_class, + backend_class = EXCLUDED.backend_class, + backend_settings = EXCLUDED.backend_settings, + cache_ttl_s = EXCLUDED.cache_ttl_s + """, + config.enricher_class, + config.backend_class, + config.backend_settings, # JSON-encoded by the codec + config.cache_ttl_s, + ) + # ------------------------------------------------------------------------- # Stream configuration # ------------------------------------------------------------------------- diff --git a/src/central/enrichment/backends/__init__.py b/src/central/enrichment/backends/__init__.py index 309c273..7efd2f5 100644 --- a/src/central/enrichment/backends/__init__.py +++ b/src/central/enrichment/backends/__init__.py @@ -1,5 +1,8 @@ """Geocoder backend implementations.""" +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend +from central.enrichment.backends.photon import PhotonBackend -__all__ = ["NoOpBackend"] +__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"] diff --git a/src/central/enrichment/backends/navi.py b/src/central/enrichment/backends/navi.py new file mode 100644 index 0000000..eaec11d --- /dev/null +++ b/src/central/enrichment/backends/navi.py @@ -0,0 +1,98 @@ +"""Navi reverse-geocoding backend. + +Hits the composed Navi endpoint `/api/reverse//`, which +already returns the canonical 9-field bundle (name, city, county, state, +country, postal_code, timezone, landclass, elevation_m). Navi composes Photon +(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM +(elevation_m), so this backend is a near-passthrough mapping. + +Coverage today: US events get a rich bundle; non-US events get timezone + +elevation_m populated (both planet-scale) and the rest null until Navi's +Photon planet expansion lands (no Central change needed when it does). +""" + +import asyncio +import logging +from typing import Any + +import aiohttp +from pydantic import BaseModel, ConfigDict, Field + +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +logger = logging.getLogger(__name__) + +# Generic default — operators point this at their Navi instance via the +# /enrichment config page (backend_settings.base_url). No deployment-specific +# host belongs in source. +DEFAULT_BASE_URL = "http://localhost:8440" +# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup. +_WARMUP_LAT = 43.6150 +_WARMUP_LON = -116.2023 + + +class NaviBackendSettings(BaseModel): + """Settings for NaviBackend. Mirrors __init__ defaults exactly.""" + + model_config = ConfigDict(extra="forbid") + + base_url: str = Field(default=DEFAULT_BASE_URL, description="Navi /api/reverse base URL") + timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") + headers: dict[str, str] | None = Field( + default=None, description="Extra request headers (e.g. Authorization)" + ) + warmup: bool = Field(default=True, description="Fire a warmup ping on construction") + + +class NaviBackend: + """GeocoderBackend backed by the composed Navi /api/reverse endpoint.""" + + settings_schema = NaviBackendSettings + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + timeout_s: float = 10.0, + headers: dict[str, str] | None = None, + warmup: bool = True, + ) -> None: + self._base_url = base_url.rstrip("/") + self._timeout_s = timeout_s + # Future-proof: drop an Authorization: Bearer … here config-only, no code change. + self._headers = dict(headers or {}) + if warmup: + # Fire-and-forget warmup ping; only if a loop is running (it is under + # the supervisor's asyncio.run, not under sync test construction). + try: + loop = asyncio.get_running_loop() + loop.create_task(self._warmup()) + except RuntimeError: + pass + + def _url(self, lat: float, lon: float) -> str: + return f"{self._base_url}/api/reverse/{lat}/{lon}" + + async def _warmup(self) -> None: + try: + await self._fetch(_WARMUP_LAT, _WARMUP_LON) + except Exception: + # Warmup is best-effort; a failure here must not break startup. + logger.debug("NaviBackend warmup ping failed (non-fatal)") + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get(self._url(lat, lon), headers=self._headers) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + data = await self._fetch(lat, lon) + except Exception: + # Non-200, network error, timeout, malformed JSON — never raise. + logger.debug("NaviBackend reverse failed; returning all-null bundle") + return all_null_bundle() + # Navi's response already matches the canonical shape; map defensively. + return {field: data.get(field) for field in GEOCODER_FIELDS} diff --git a/src/central/enrichment/backends/no_op.py b/src/central/enrichment/backends/no_op.py index 77b0567..3b29f94 100644 --- a/src/central/enrichment/backends/no_op.py +++ b/src/central/enrichment/backends/no_op.py @@ -7,11 +7,23 @@ which satisfies the GeocoderBackend contract while resolving nothing. from typing import Any +from pydantic import BaseModel, ConfigDict + from central.enrichment.geocoder import all_null_bundle +class NoOpBackendSettings(BaseModel): + """No-op backend takes no settings. extra='forbid' makes switching to + NoOpBackend while stale backend_settings (e.g. a base_url) remain a clean + ValidationError instead of a TypeError at construction.""" + + model_config = ConfigDict(extra="forbid") + + class NoOpBackend: """GeocoderBackend that resolves no fields.""" + settings_schema = NoOpBackendSettings + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: return all_null_bundle() diff --git a/src/central/enrichment/backends/nominatim.py b/src/central/enrichment/backends/nominatim.py new file mode 100644 index 0000000..b2023b5 --- /dev/null +++ b/src/central/enrichment/backends/nominatim.py @@ -0,0 +1,113 @@ +"""OSM Nominatim reverse-geocoding backend. + +Works against public OSM Nominatim (1 req/sec + User-Agent required) or a +self-hosted instance (no limit). Resolves name + address only; timezone, +landclass, and elevation_m are nulled (not in the Nominatim reverse response). + +Nominatim jsonv2 reverse response shape: + {"display_name": "...", "name": "...", + "address": {city|town|village, county, state, country, postcode, ...}} +""" + +import asyncio +import logging +import time +from typing import Any +from urllib.parse import urlencode + +import aiohttp +from pydantic import BaseModel, ConfigDict, Field + +from central.enrichment.geocoder import all_null_bundle + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org" +DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)" + + +class NominatimBackendSettings(BaseModel): + """Settings for NominatimBackend. Mirrors __init__ defaults exactly.""" + + model_config = ConfigDict(extra="forbid") + + base_url: str = Field(default=DEFAULT_BASE_URL, description="Nominatim /reverse base URL") + user_agent: str = Field( + default=DEFAULT_USER_AGENT, description="User-Agent (public OSM requires one)" + ) + rate_limit_per_sec: float = Field( + default=1.0, description="Outbound request cap; 0 disables (self-hosted)" + ) + timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") + + +class NominatimBackend: + """GeocoderBackend backed by an OSM Nominatim /reverse endpoint. + + rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s); + set it to 0 to disable for self-hosted instances. + """ + + settings_schema = NominatimBackendSettings + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + user_agent: str = DEFAULT_USER_AGENT, + rate_limit_per_sec: float = 1.0, + timeout_s: float = 10.0, + ) -> None: + self._base_url = base_url.rstrip("/") + self._user_agent = user_agent + self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0 + self._timeout_s = timeout_s + self._rl_lock = asyncio.Lock() + self._last_request_at = 0.0 + + def _url(self, lat: float, lon: float) -> str: + qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"}) + return f"{self._base_url}/reverse?{qs}" + + def _request_headers(self) -> dict[str, str]: + # Public Nominatim rejects requests without an identifying User-Agent. + return {"User-Agent": self._user_agent} + + async def _throttle(self) -> None: + if self._min_interval <= 0: + return + async with self._rl_lock: + now = time.monotonic() + wait = self._last_request_at + self._min_interval - now + if wait > 0: + await asyncio.sleep(wait) + self._last_request_at = time.monotonic() + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get( + self._url(lat, lon), headers=self._request_headers() + ) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + await self._throttle() + data = await self._fetch(lat, lon) + addr = data.get("address", {}) or {} + except Exception: + logger.debug("NominatimBackend reverse failed; returning all-null bundle") + return all_null_bundle() + return { + "name": data.get("name") or data.get("display_name"), + "city": addr.get("city") or addr.get("town") or addr.get("village"), + "county": addr.get("county"), + "state": addr.get("state"), + "country": addr.get("country"), + "postal_code": addr.get("postcode"), + "timezone": None, # not in Nominatim reverse response + "landclass": None, # Navi-composed-endpoint only + "elevation_m": None, # Navi-composed-endpoint only + } diff --git a/src/central/enrichment/backends/photon.py b/src/central/enrichment/backends/photon.py new file mode 100644 index 0000000..7503aef --- /dev/null +++ b/src/central/enrichment/backends/photon.py @@ -0,0 +1,82 @@ +"""Raw Photon reverse-geocoding backend. + +For deployers who run a Photon instance directly, without the composed +Navi-style endpoint. Photon resolves name + address only — timezone, +landclass, and elevation_m are Navi-composed-endpoint extras and are nulled +here. + +Photon reverse response shape: + {"features": [{"properties": {name, city, county, state, country, + postcode, ...}, "geometry": {...}}]} +""" + +import logging +from typing import Any +from urllib.parse import urlencode + +import aiohttp +from pydantic import BaseModel, ConfigDict, Field + +from central.enrichment.geocoder import all_null_bundle + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_URL = "http://localhost:2322" + + +class PhotonBackendSettings(BaseModel): + """Settings for PhotonBackend. Mirrors __init__ defaults exactly.""" + + model_config = ConfigDict(extra="forbid") + + base_url: str = Field(default=DEFAULT_BASE_URL, description="Photon /reverse base URL") + timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") + headers: dict[str, str] | None = Field(default=None, description="Extra request headers") + + +class PhotonBackend: + """GeocoderBackend backed by a raw Photon /reverse endpoint.""" + + settings_schema = PhotonBackendSettings + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + timeout_s: float = 10.0, + headers: dict[str, str] | None = None, + ) -> None: + self._base_url = base_url.rstrip("/") + self._timeout_s = timeout_s + self._headers = dict(headers or {}) + + def _url(self, lat: float, lon: float) -> str: + qs = urlencode({"lat": lat, "lon": lon, "limit": 1}) + return f"{self._base_url}/reverse?{qs}" + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get(self._url(lat, lon), headers=self._headers) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + data = await self._fetch(lat, lon) + features = data.get("features") or [] + props = features[0].get("properties", {}) if features else {} + except Exception: + logger.debug("PhotonBackend reverse failed; returning all-null bundle") + return all_null_bundle() + return { + "name": props.get("name"), + "city": props.get("city"), + "county": props.get("county"), + "state": props.get("state"), + "country": props.get("country"), + "postal_code": props.get("postcode"), # Photon names it 'postcode' + "timezone": None, # not provided by raw Photon + "landclass": None, # Navi-composed-endpoint only + "elevation_m": None, # Navi-composed-endpoint only + } diff --git a/src/central/enrichment/cache.py b/src/central/enrichment/cache.py index 06be7af..d36691e 100644 --- a/src/central/enrichment/cache.py +++ b/src/central/enrichment/cache.py @@ -124,3 +124,27 @@ class EnrichmentCache: ) -> None: """Cache a bundle (idempotent upsert on the rounded-coords key).""" await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload) + + def _invalidate_sync(self, enricher_name: str | None) -> int: + conn = self._connect() + try: + if enricher_name is None: + cur = conn.execute("DELETE FROM enrichment_cache") + else: + cur = conn.execute( + "DELETE FROM enrichment_cache WHERE enricher_name = ?", + (enricher_name,), + ) + conn.commit() + return cur.rowcount + finally: + conn.close() + + async def invalidate(self, enricher_name: str | None = None) -> int: + """Drop cached bundles. Scoped to one enricher when given, else all. + + Called when enrichment config changes — a new backend would otherwise + keep returning the previous backend's cached results until TTL expiry. + Returns the number of rows deleted. + """ + return await asyncio.to_thread(self._invalidate_sync, enricher_name) diff --git a/src/central/enrichment/geocoder.py b/src/central/enrichment/geocoder.py index ab75017..195a734 100644 --- a/src/central/enrichment/geocoder.py +++ b/src/central/enrichment/geocoder.py @@ -9,6 +9,8 @@ land in PR K. import logging from typing import Any, Protocol, runtime_checkable +from pydantic import BaseModel + from central.enrichment.cache import EnrichmentCache logger = logging.getLogger(__name__) @@ -38,6 +40,12 @@ def all_null_bundle() -> dict[str, Any]: class GeocoderBackend(Protocol): """The pluggable reverse-geocoding layer beneath GeocoderEnricher.""" + # Pydantic model (extra='forbid') describing this backend's accepted + # settings. The supervisor validates config.enrichment.backend_settings + # against it before instantiating, turning a config/settings mismatch into + # a clean ValidationError instead of a constructor TypeError. + settings_schema: type[BaseModel] + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: """Return canonical geocoder fields (see GEOCODER_FIELDS). diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py index ef7588e..adc76a4 100644 --- a/src/central/gui/form_descriptors.py +++ b/src/central/gui/form_descriptors.py @@ -66,6 +66,8 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, return "text", None if field_type is int: return "number", None + if field_type is float: + return "number", None if field_type is bool: return "checkbox", None if field_type is RegionConfig: @@ -88,6 +90,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" ) + # dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings). + # The form renders the value as JSON; the POST handler parses it back. + if field_type is dict or origin is dict: + return "json", None + # Check if it's a BaseModel subclass (nested model other than RegionConfig) if isinstance(field_type, type) and issubclass(field_type, BaseModel): raise NotImplementedError( diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index ef7ba49..e6bb580 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1990,6 +1990,199 @@ async def streams_update( return RedirectResponse(url="/streams", status_code=302) +# ============================================================================= +# Enrichment config route +# ============================================================================= + + +def _outer_enrichment_fields(current: dict) -> list[FieldDescriptor]: + """EnrichmentConfig form fields EXCEPT backend_settings — that one is + rendered as a per-backend
via _backend_fields().""" + from central.config_models import EnrichmentConfig + + return [ + f for f in describe_fields(EnrichmentConfig, current) + if f.name != "backend_settings" + ] + + +def _backend_fields(backend_class: str | None, current_bs: dict) -> list[FieldDescriptor]: + """Field descriptors for the selected backend's settings_schema, or [] when + the backend class is unknown. Same generic describe_fields machinery.""" + from central.supervisor import _BACKEND_REGISTRY + + cls = _BACKEND_REGISTRY.get(backend_class or "") + if cls is None: + return [] + return describe_fields(cls.settings_schema, current_bs or {}) + + +async def _read_enrichment_row(conn) -> dict: + row = await conn.fetchrow( + """ + SELECT enricher_class, backend_class, backend_settings, cache_ttl_s + FROM config.enrichment WHERE id = true + """ + ) + return dict(row) if row is not None else {} + + +def _enrichment_context(request, *, outer_fields, backend_fields, backend_class, + errors=None, form_data=None, backend_form_data=None): + return { + "operator": request.state.operator, + "csrf_token": request.state.csrf_token, + "outer_fields": outer_fields, + "backend_fields": backend_fields, + "backend_class": backend_class, + "errors": errors, + "form_data": form_data, + "backend_form_data": backend_form_data, + } + + +@router.get("/enrichment", response_class=HTMLResponse) +async def enrichment_form(request: Request) -> HTMLResponse: + """Render the enrichment config form (outer fields + a per-backend fieldset + for the currently-selected backend_class).""" + templates = _get_templates() + pool = get_pool() + + async with pool.acquire() as conn: + current = await _read_enrichment_row(conn) + + backend_class = current.get("backend_class") or "NoOpBackend" + current_bs = current.get("backend_settings") or {} + + return templates.TemplateResponse( + request=request, + name="enrichment.html", + context=_enrichment_context( + request, + outer_fields=_outer_enrichment_fields(current), + backend_fields=_backend_fields(backend_class, current_bs), + backend_class=backend_class, + ), + ) + + +@router.post("/enrichment") +async def enrichment_update(request: Request) -> Response: + """Validate + persist the enrichment config. Hot-reload picks it up via the + config.enrichment NOTIFY trigger. backend_settings is validated against the + SUBMITTED backend_class's settings_schema.""" + from central.config_models import EnrichmentConfig + from central.supervisor import _BACKEND_REGISTRY + + templates = _get_templates() + pool = get_pool() + + form = await request.form() + if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token: + raise CsrfValidationError("Invalid CSRF token") + + errors: dict[str, str] = {} + form_data: dict[str, Any] = {} + backend_form_data: dict[str, Any] = {} + parsed: dict[str, Any] = {} + + # --- outer EnrichmentConfig fields (backend_settings excluded) --- + for field in _outer_enrichment_fields({}): + raw = form.get(field.name, "") + form_data[field.name] = raw + if field.widget == "number": + try: + parsed[field.name] = int(raw) if raw else None + except ValueError: + errors[field.name] = f"{field.label} must be a number" + else: # text + parsed[field.name] = raw.strip() if raw else None + + submitted_backend_class = parsed.get("backend_class") + + # --- backend settings fieldset, validated against the SUBMITTED backend --- + backend_settings: dict[str, Any] = {} + backend_cls = _BACKEND_REGISTRY.get(submitted_backend_class or "") + if backend_cls is None and submitted_backend_class: + errors["backend_class"] = f"Unknown backend: {submitted_backend_class}" + elif backend_cls is not None: + for f in describe_fields(backend_cls.settings_schema, {}): + formkey = f"bs_{f.name}" + raw = form.get(formkey, "") + backend_form_data[formkey] = raw + if f.widget == "checkbox": + backend_settings[f.name] = formkey in form + elif f.widget == "json": + if raw and raw.strip(): + try: + backend_settings[f.name] = json.loads(raw) + except json.JSONDecodeError as e: + errors[formkey] = f"{f.label} is not valid JSON: {e}" + # blank -> omit, schema default applies + else: # text / number — let pydantic coerce, omit blanks for defaults + if raw.strip() != "": + backend_settings[f.name] = raw.strip() + if not errors: + try: + backend_settings = backend_cls.settings_schema.model_validate( + backend_settings + ).model_dump() + except ValidationError as e: + for err in e.errors(): + loc = err["loc"][0] if err["loc"] else "unknown" + errors[f"bs_{loc}"] = err["msg"] + + # --- outer EnrichmentConfig validation --- + if not errors: + try: + validated = EnrichmentConfig( + **{k: v for k, v in parsed.items() if v is not None}, + backend_settings=backend_settings, + ) + except ValidationError as e: + for err in e.errors(): + loc = err["loc"][0] if err["loc"] else "unknown" + errors[str(loc)] = err["msg"] + + if errors: + # Re-render against the SUBMITTED backend_class so field errors attach + # to the right schema (operator may be mid-switch with a typo). + return templates.TemplateResponse( + request=request, + name="enrichment.html", + context=_enrichment_context( + request, + outer_fields=_outer_enrichment_fields({}), + backend_fields=_backend_fields(submitted_backend_class, backend_settings), + backend_class=submitted_backend_class, + errors=errors, + form_data=form_data, + backend_form_data=backend_form_data, + ), + status_code=200, + ) + + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO config.enrichment + (id, enricher_class, backend_class, backend_settings, cache_ttl_s) + VALUES (true, $1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + enricher_class = EXCLUDED.enricher_class, + backend_class = EXCLUDED.backend_class, + backend_settings = EXCLUDED.backend_settings, + cache_ttl_s = EXCLUDED.cache_ttl_s + """, + validated.enricher_class, + validated.backend_class, + validated.backend_settings, # encoded as jsonb by the pool codec + validated.cache_ttl_s, + ) + + return RedirectResponse(url="/enrichment", status_code=302) + + # Alias validation regex ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$') diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index a7a667d..be7dbca 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -19,6 +19,7 @@
  • Adapters
  • Events
  • Streams
  • +
  • Enrichment
  • API Keys
  • {{ operator.username }}
  • Change Password
  • diff --git a/src/central/gui/templates/enrichment.html b/src/central/gui/templates/enrichment.html new file mode 100644 index 0000000..e2e4d1c --- /dev/null +++ b/src/central/gui/templates/enrichment.html @@ -0,0 +1,90 @@ +{% extends "base.html" %} + +{% block title %}Central — Enrichment{% endblock %} + +{% block content %} +

    Enrichment

    +

    + Central-side event enrichment. Results are attached to each event under + data._enriched.<enricher>. Changes hot-reload into the + supervisor; switching backend invalidates the enrichment cache. Backend + settings below are validated against the selected backend; switching + backend then saving re-renders this form with the new backend's fields. +

    + +
    + + +
    + Configuration + {% for field in outer_fields %} + {% if field.widget == "text" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + {% elif field.widget == "number" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + {% endif %} + {% endfor %} +
    + +
    + Backend settings — {{ backend_class }} + {% if not backend_fields %} + This backend takes no settings. + {% endif %} + {% for field in backend_fields %} + {% set fk = "bs_" ~ field.name %} + {% if field.widget == "text" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[fk] %} + {{ errors[fk] }} + {% endif %} + {% elif field.widget == "number" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[fk] %} + {{ errors[fk] }} + {% endif %} + {% elif field.widget == "checkbox" %} + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[fk] %} + {{ errors[fk] }} + {% endif %} + {% elif field.widget == "json" %} + + + JSON object{% if field.description %} — {{ field.description }}{% endif %} + {% if errors and errors[fk] %} + {{ errors[fk] }} + {% endif %} + {% endif %} + {% endfor %} +
    + + +
    +{% endblock %} diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 8ca6be9..90913a7 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -12,6 +12,7 @@ from typing import Any import nats from nats.js import JetStreamContext +from pydantic import ValidationError from central.adapter import SourceAdapter from central.adapter_discovery import discover_adapters @@ -24,7 +25,10 @@ from central.api_key_resolver import resolve_api_key_alias from central.config_models import EnrichmentConfig from central.enrichment.base import Enricher from central.enrichment.cache import EnrichmentCache +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend +from central.enrichment.backends.photon import PhotonBackend from central.enrichment.geocoder import GeocoderEnricher from central.models import Event from central.stream_manager import StreamManager @@ -33,23 +37,32 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db") # Enricher / backend class-name registries for EnrichmentConfig resolution. -# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these. _ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher} -_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend} +_BACKEND_REGISTRY: dict[str, type] = { + "NoOpBackend": NoOpBackend, + "NaviBackend": NaviBackend, + "PhotonBackend": PhotonBackend, + "NominatimBackend": NominatimBackend, +} def build_enrichers( enrichment_config: EnrichmentConfig, - cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH, + cache: EnrichmentCache, ) -> list[Enricher]: - """Instantiate the configured enricher(s) with their backend + cache. + """Instantiate the configured enricher(s) with their backend + the given cache. - Read once at supervisor startup — enrichment config is NOT hot-reloaded - in PR J (see EnrichmentConfig docstring). + The supervisor owns the cache (so it can invalidate it on a config change) + and passes it in. Enrichment config is read from config.enrichment at + startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change). """ backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class] - backend = backend_cls(**enrichment_config.backend_settings) - cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s) + # Validate backend_settings against the backend's settings_schema BEFORE + # constructing. A mismatch (e.g. a stale base_url left in the row after + # switching to NoOpBackend) raises a clean pydantic ValidationError here + # instead of a TypeError inside the backend constructor. + validated = backend_cls.settings_schema.model_validate(enrichment_config.backend_settings) + backend = backend_cls(**validated.model_dump()) enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class] return [enricher_cls(backend, cache=cache)] @@ -158,9 +171,15 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config - # Enrichment is read once at startup (no hot-reload in PR J). + # Enrichment: a valid default set is built now so the supervisor is + # never in a half-state; start() then overrides it from the live + # config.enrichment row, and _on_config_change hot-reloads it. + self._active_enrichment_config = enrichment_config or EnrichmentConfig() + self._enrichment_cache = EnrichmentCache( + ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s + ) self._enrichers: list[Enricher] = build_enrichers( - enrichment_config or EnrichmentConfig() + self._active_enrichment_config, self._enrichment_cache ) self._adapters = discover_adapters() self._nc: nats.NATS | None = None @@ -666,11 +685,74 @@ class Supervisor: extra={"stream": stream_config.name, "error": str(e)}, ) + def _rebuild_enrichers(self, config: EnrichmentConfig) -> None: + """Rebuild the active enricher set + cache from an EnrichmentConfig. + + Builds into locals first and commits to instance state only on success, + so a ValidationError (bad backend_settings) leaves the previously-active + enrichers/config/cache untouched and propagates to the caller. + """ + cache = EnrichmentCache(ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s) + enrichers = build_enrichers(config, cache) # may raise ValidationError + self._active_enrichment_config = config + self._enrichment_cache = cache + self._enrichers = enrichers + + async def _handle_enrichment_change(self) -> None: + """Re-read config.enrichment and rebuild enrichers. Invalidate the cache + when the backend changed, so stale results from the previous backend + don't survive until TTL expiry. + + Invalid backend_settings (ValidationError) leave the previous backend + running — the supervisor stays up; the operator fixes the row and the + next NOTIFY brings it in cleanly. + """ + new_config = await self._config_source.get_enrichment_config() + old_config = self._active_enrichment_config + backend_changed = ( + new_config.backend_class != old_config.backend_class + or new_config.backend_settings != old_config.backend_settings + or new_config.enricher_class != old_config.enricher_class + ) + try: + self._rebuild_enrichers(new_config) + except ValidationError as e: + logger.error( + "Enrichment config invalid; keeping previous backend", + extra={ + "enricher_class": new_config.enricher_class, + "backend_class": new_config.backend_class, + "backend_settings": new_config.backend_settings, + "errors": e.errors(), + }, + ) + return + if backend_changed: + deleted = await self._enrichment_cache.invalidate() + logger.info( + "Enrichment backend changed; cache invalidated", + extra={ + "enricher_class": new_config.enricher_class, + "backend_class": new_config.backend_class, + "rows_deleted": deleted, + }, + ) + else: + logger.info( + "Enrichment config changed (cache retained)", + extra={"cache_ttl_s": new_config.cache_ttl_s}, + ) + async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. Called when NOTIFY fires for config changes. """ + # Handle enrichment config changes (single-row config.enrichment) + if table == "enrichment": + await self._handle_enrichment_change() + return + # Handle stream changes if table == "streams": stream_name = key @@ -762,6 +844,18 @@ class Supervisor: # Ensure streams exist with correct configuration await self._ensure_streams() + # Load the operator-set enrichment config (overrides the constructor + # default); hot-reloaded thereafter via _on_config_change. + enrichment_config = await self._config_source.get_enrichment_config() + self._rebuild_enrichers(enrichment_config) + logger.info( + "Enrichment configured", + extra={ + "enricher_class": enrichment_config.enricher_class, + "backend_class": enrichment_config.backend_class, + }, + ) + # Load and start enabled adapters enabled_adapters = await self._config_source.list_enabled_adapters() for config in enabled_adapters: diff --git a/tests/test_backend_settings_schema.py b/tests/test_backend_settings_schema.py new file mode 100644 index 0000000..1d02fcc --- /dev/null +++ b/tests/test_backend_settings_schema.py @@ -0,0 +1,235 @@ +"""Tests for per-backend settings schemas (PR L.5). + +Each GeocoderBackend declares a Pydantic settings_schema (extra='forbid'). +build_enrichers validates backend_settings against it BEFORE constructing, so +a config/settings mismatch is a clean ValidationError, not a TypeError. The +supervisor's hot-reload keeps the previous backend running on ValidationError; +the GUI POST re-renders with field errors and does not write the DB row. + +Regression guard for the 2026-05-20 incident: switching backend_class to +NoOpBackend while backend_settings still held {"base_url": ...} crashed +_rebuild_enrichers with `TypeError: NoOpBackend() takes no arguments`. +""" + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from pydantic import BaseModel, ValidationError + +from central.config_models import EnrichmentConfig +from central.enrichment.cache import EnrichmentCache +from central.supervisor import _BACKEND_REGISTRY, build_enrichers + + +# --- schemas exist + extra='forbid' ----------------------------------------- + +def test_every_backend_declares_a_settings_schema(): + for name, cls in _BACKEND_REGISTRY.items(): + schema = getattr(cls, "settings_schema", None) + assert schema is not None, f"{name} has no settings_schema" + assert isinstance(schema, type) and issubclass(schema, BaseModel), name + + +def test_every_settings_schema_forbids_extra(): + for name, cls in _BACKEND_REGISTRY.items(): + with pytest.raises(ValidationError): + cls.settings_schema.model_validate({"definitely_not_a_field": 1}) + + +def test_navi_schema_accepts_valid_kwargs_and_preserves_defaults(): + from central.enrichment.backends.navi import NaviBackendSettings + + m = NaviBackendSettings() + assert m.base_url == "http://localhost:8440" + assert m.timeout_s == 10.0 # preserved from __init__, NOT 5.0 + assert m.headers is None + assert m.warmup is True + m2 = NaviBackendSettings(base_url="http://navi:8440", timeout_s=3.0, warmup=False) + assert m2.base_url == "http://navi:8440" and m2.timeout_s == 3.0 + + +def test_noop_schema_has_no_fields(): + from central.enrichment.backends.no_op import NoOpBackendSettings + + assert list(NoOpBackendSettings.model_fields) == [] + + +# --- build_enrichers validation ---------------------------------------------- + +def _cache(tmp_path) -> EnrichmentCache: + return EnrichmentCache(tmp_path / "c.db") + + +def test_build_enrichers_raises_validation_error_not_typeerror(tmp_path): + """The exact 2026-05-20 bug: NoOpBackend + stale base_url. Must be a clean + ValidationError, never a TypeError from the constructor.""" + cfg = EnrichmentConfig(backend_class="NoOpBackend", backend_settings={"base_url": "http://x"}) + with pytest.raises(ValidationError): + build_enrichers(cfg, _cache(tmp_path)) + + +def test_build_enrichers_navi_valid_settings(tmp_path): + from central.enrichment.backends.navi import NaviBackend + + cfg = EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://navi:8440", "warmup": False}, + ) + enrichers = build_enrichers(cfg, _cache(tmp_path)) + assert isinstance(enrichers[0]._backend, NaviBackend) + + +def test_build_enrichers_navi_unknown_setting_rejected(tmp_path): + cfg = EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://navi:8440", "bogus": 1}, + ) + with pytest.raises(ValidationError): + build_enrichers(cfg, _cache(tmp_path)) + + +# --- supervisor hot-reload keeps previous backend on ValidationError --------- + +def _supervisor(): + from central import supervisor as sup_mod + + config_source = MagicMock() + config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig()) + return sup_mod.Supervisor( + config_source=config_source, + config_store=MagicMock(), + nats_url="nats://localhost:4222", + ) + + +@pytest.mark.asyncio +async def test_handle_enrichment_change_keeps_previous_on_validation_error(): + """Navi active, then a bad NOTIFY (NoOp + leftover base_url) arrives. The + supervisor must keep NaviBackend running and not crash.""" + from central.enrichment.backends.navi import NaviBackend + from central.enrichment.backends.no_op import NoOpBackend + + with patch("central.supervisor.EnrichmentCache") as cache_cls: + cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0)) + sup = _supervisor() + # Establish a valid NaviBackend active state. + sup._rebuild_enrichers(EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://navi:8440", "warmup": False}, + )) + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + active_before = sup._active_enrichment_config + + # Bad config arrives via NOTIFY: NoOp + stale base_url -> ValidationError. + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig( + backend_class="NoOpBackend", + backend_settings={"base_url": "http://navi:8440"}, + ) + ) + await sup._handle_enrichment_change() # must NOT raise + + # Previous backend still active; config unchanged; cache NOT invalidated. + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + assert sup._active_enrichment_config is active_before + cache_cls.return_value.invalidate.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_handle_enrichment_change_applies_valid_noop_with_empty_settings(): + """Switching to NoOp with empty backend_settings (the correct way) applies + cleanly and invalidates the cache.""" + from central.enrichment.backends.no_op import NoOpBackend + + with patch("central.supervisor.EnrichmentCache") as cache_cls: + cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=3)) + sup = _supervisor() + sup._rebuild_enrichers(EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://navi:8440", "warmup": False}, + )) + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig(backend_class="NoOpBackend", backend_settings={}) + ) + await sup._handle_enrichment_change() + assert isinstance(sup._enrichers[0]._backend, NoOpBackend) + cache_cls.return_value.invalidate.assert_awaited() + + +# --- GUI POST validates backend_settings + does not write on error ---------- + +def _mock_pool(conn): + pool = MagicMock() + cm = MagicMock() + cm.__aenter__ = AsyncMock(return_value=conn) + cm.__aexit__ = AsyncMock(return_value=None) + pool.acquire = MagicMock(return_value=cm) + return pool + + +@pytest.mark.asyncio +async def test_gui_post_rejects_bad_backend_settings_without_writing(): + """POST with a bad backend setting (timeout_s non-numeric) re-renders with a + field error keyed bs_timeout_s and does NOT execute the DB upsert.""" + from central.gui.routes import enrichment_update + + request = MagicMock() + request.state.operator = MagicMock(username="op") + request.state.csrf_token = "tok" + + form = { + "csrf_token": "tok", + "enricher_class": "GeocoderEnricher", + "backend_class": "NaviBackend", + "cache_ttl_s": "86400", + "bs_base_url": "http://navi:8440", + "bs_timeout_s": "not-a-number", + } + request.form = AsyncMock(return_value=form) + + conn = MagicMock() + conn.execute = AsyncMock() + conn.fetchrow = AsyncMock(return_value={}) + templates = MagicMock() + templates.TemplateResponse.return_value = MagicMock() + + with patch("central.gui.routes._get_templates", return_value=templates), \ + patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): + await enrichment_update(request) + + conn.execute.assert_not_awaited() # no DB write on validation failure + ctx = templates.TemplateResponse.call_args.kwargs["context"] + assert "bs_timeout_s" in ctx["errors"] + assert ctx["backend_class"] == "NaviBackend" # re-rendered against SUBMITTED backend + + +@pytest.mark.asyncio +async def test_gui_post_writes_on_valid_settings(): + from central.gui.routes import enrichment_update + + request = MagicMock() + request.state.operator = MagicMock(username="op") + request.state.csrf_token = "tok" + form = { + "csrf_token": "tok", + "enricher_class": "GeocoderEnricher", + "backend_class": "NaviBackend", + "cache_ttl_s": "86400", + "bs_base_url": "http://navi:8440", + "bs_timeout_s": "10", + } + request.form = AsyncMock(return_value=form) + conn = MagicMock() + conn.execute = AsyncMock() + templates = MagicMock() + + with patch("central.gui.routes._get_templates", return_value=templates), \ + patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): + resp = await enrichment_update(request) + + conn.execute.assert_awaited() # DB upsert ran + # the backend_settings written are the validated/dumped dict + args = conn.execute.call_args.args + assert "INSERT INTO config.enrichment" in args[0] + assert resp.status_code == 302 diff --git a/tests/test_enrichment_config_plumbing.py b/tests/test_enrichment_config_plumbing.py new file mode 100644 index 0000000..1ac5e36 --- /dev/null +++ b/tests/test_enrichment_config_plumbing.py @@ -0,0 +1,213 @@ +"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5). + +Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload +rebuild, cache invalidation on backend change (but not on TTL-only change), +EnrichmentCache.invalidate, the generic json widget for backend_settings, and +the /enrichment GUI render. No real DB / NATS — pool, config_source, and the +EnrichmentCache class are mocked. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.config_models import EnrichmentConfig +from central.enrichment.cache import EnrichmentCache +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.no_op import NoOpBackend +from central.gui.form_descriptors import describe_fields + + +# --- mock pool/conn helpers ------------------------------------------------- + +def _mock_pool(conn: MagicMock) -> MagicMock: + pool = MagicMock() + acquire_cm = MagicMock() + acquire_cm.__aenter__ = AsyncMock(return_value=conn) + acquire_cm.__aexit__ = AsyncMock(return_value=None) + pool.acquire = MagicMock(return_value=acquire_cm) + return pool + + +# --- ConfigStore -------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_config_store_reads_enrichment_row(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "enricher_class": "GeocoderEnricher", + "backend_class": "NaviBackend", + "backend_settings": {"base_url": "http://example.test:8440"}, + "cache_ttl_s": 3600, + }) + store = ConfigStore(_mock_pool(conn)) + cfg = await store.get_enrichment_config() + assert isinstance(cfg, EnrichmentConfig) + assert cfg.backend_class == "NaviBackend" + assert cfg.backend_settings == {"base_url": "http://example.test:8440"} + assert cfg.cache_ttl_s == 3600 + + +@pytest.mark.asyncio +async def test_config_store_falls_back_to_defaults_when_row_absent(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value=None) + store = ConfigStore(_mock_pool(conn)) + cfg = await store.get_enrichment_config() + assert cfg == EnrichmentConfig() # framework defaults + assert cfg.backend_class == "NoOpBackend" + + +@pytest.mark.asyncio +async def test_config_store_upsert_passes_dict_settings(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.execute = AsyncMock() + store = ConfigStore(_mock_pool(conn)) + cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"}) + await store.upsert_enrichment_config(cfg) + args = conn.execute.call_args.args + assert "INSERT INTO config.enrichment" in args[0] + # backend_settings passed as a dict (pool codec encodes to jsonb), not a str. + assert {"base_url": "x"} in args + + +# --- EnrichmentCache.invalidate ---------------------------------------------- + +@pytest.mark.asyncio +async def test_cache_invalidate_all(tmp_path): + cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) + await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) + await cache.set("geocoder", 3.0, 4.0, {"name": "y"}) + deleted = await cache.invalidate() + assert deleted == 2 + assert await cache.get("geocoder", 1.0, 2.0) is None + + +@pytest.mark.asyncio +async def test_cache_invalidate_scoped_to_enricher(tmp_path): + cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) + await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) + await cache.set("other", 1.0, 2.0, {"name": "z"}) + deleted = await cache.invalidate("geocoder") + assert deleted == 1 + assert await cache.get("geocoder", 1.0, 2.0) is None + assert await cache.get("other", 1.0, 2.0) == {"name": "z"} + + +# --- Supervisor startup read + hot-reload ------------------------------------ + +def _supervisor_with(enrichment_cfg: EnrichmentConfig): + """Build a Supervisor with mocked deps and a mocked EnrichmentCache class + (so no real /var/lib cache file is touched).""" + from central import supervisor as sup_mod + + config_source = MagicMock() + config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg) + config_store = MagicMock() + sup = sup_mod.Supervisor( + config_source=config_source, + config_store=config_store, + nats_url="nats://localhost:4222", + ) + return sup + + +@pytest.mark.asyncio +async def test_supervisor_builds_navi_from_config(): + """Given a config naming NaviBackend, the supervisor's enricher set wraps a + NaviBackend — proves the registry resolution end-to-end.""" + with patch("central.supervisor.EnrichmentCache") as cache_cls: + cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0)) + sup = _supervisor_with( + EnrichmentConfig(backend_class="NaviBackend", + backend_settings={"base_url": "http://x:8440", "warmup": False}) + ) + cfg = await sup._config_source.get_enrichment_config() + sup._rebuild_enrichers(cfg) + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + + +@pytest.mark.asyncio +async def test_hot_reload_rebuilds_and_invalidates_on_backend_change(): + from central import supervisor as sup_mod + + with patch("central.supervisor.EnrichmentCache") as cache_cls: + invalidate = AsyncMock(return_value=5) + cache_cls.return_value = MagicMock(invalidate=invalidate) + # Start at NoOp. + sup = _supervisor_with(EnrichmentConfig()) + sup._rebuild_enrichers(EnrichmentConfig()) + assert isinstance(sup._enrichers[0]._backend, NoOpBackend) + # Config flips to Navi. + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://x:8440", "warmup": False}, + ) + ) + await sup._handle_enrichment_change() + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + invalidate.assert_awaited() # backend changed -> cache wiped + + +@pytest.mark.asyncio +async def test_hot_reload_does_not_invalidate_on_ttl_only_change(): + with patch("central.supervisor.EnrichmentCache") as cache_cls: + invalidate = AsyncMock(return_value=0) + cache_cls.return_value = MagicMock(invalidate=invalidate) + sup = _supervisor_with(EnrichmentConfig()) + sup._rebuild_enrichers(EnrichmentConfig()) + # Same backend, only TTL changes. + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig(cache_ttl_s=3600) + ) + await sup._handle_enrichment_change() + invalidate.assert_not_awaited() + + +# --- generic json widget + GUI render ---------------------------------------- + +def test_describe_fields_renders_dict_as_json_widget(): + fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})} + assert fields["backend_settings"] == "json" + assert fields["enricher_class"] == "text" + assert fields["cache_ttl_s"] == "number" + + +@pytest.mark.asyncio +async def test_enrichment_form_renders(): + from central.gui.routes import enrichment_form + + request = MagicMock() + request.state.operator = MagicMock(username="op") + request.state.csrf_token = "tok" + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "enricher_class": "GeocoderEnricher", + "backend_class": "NoOpBackend", + "backend_settings": {}, + "cache_ttl_s": 86400, + }) + templates = MagicMock() + templates.TemplateResponse.return_value = MagicMock() + + with patch("central.gui.routes._get_templates", return_value=templates), \ + patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): + await enrichment_form(request) + + ctx = templates.TemplateResponse.call_args.kwargs["context"] + # PR L.5: outer fields exclude backend_settings (now a per-backend fieldset); + # NoOpBackend's fieldset has zero fields. + outer = {f.name for f in ctx["outer_fields"]} + assert "backend_settings" not in outer + assert "backend_class" in outer + assert ctx["backend_class"] == "NoOpBackend" + assert ctx["backend_fields"] == [] + assert ctx["csrf_token"] == "tok" diff --git a/tests/test_firms.py b/tests/test_firms.py index 2ab1e1d..6a974e2 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -456,6 +456,7 @@ class TestEnrichmentIntegration: """A FIRMS event run through the supervisor's enrichment stage emerges with data._enriched.geocoder populated (all-null under NoOpBackend).""" from central.config_models import EnrichmentConfig + from central.enrichment.cache import EnrichmentCache from central.enrichment.geocoder import all_null_bundle from central.supervisor import apply_enrichment, build_enrichers @@ -469,9 +470,8 @@ class TestEnrichmentIntegration: event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") assert "_enriched" not in event.data - enrichers = build_enrichers( - EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db" - ) + cache = EnrichmentCache(tmp_path / "enrichment_cache.db") + enrichers = build_enrichers(EnrichmentConfig(), cache) await apply_enrichment(event, adapter.enrichment_locations, enrichers) assert "_enriched" in event.data diff --git a/tests/test_navi_backend.py b/tests/test_navi_backend.py new file mode 100644 index 0000000..2adf90d --- /dev/null +++ b/tests/test_navi_backend.py @@ -0,0 +1,125 @@ +"""Tests for NaviBackend (composed Navi /api/reverse endpoint). + +HTTP is exercised via patching the backend's `_fetch` (the codebase has no +aioresponses/respx dep); URL construction is asserted on the pure `_url` +helper. An env-gated integration smoke against the live Navi endpoint is +skipped by default. +""" + +import os +from unittest.mock import AsyncMock + +import pytest + +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +# Full Navi response — already canonical shape. +_NAVI_OK = { + "name": "Where you are", + "city": "Boise", + "county": "Ada", + "state": "Idaho", + "country": "United States", + "postal_code": "83702", + "timezone": "America/Boise", + "landclass": "Public — National Forest", + "elevation_m": 824, +} + + +def _backend() -> NaviBackend: + # warmup=False so construction issues no background task in tests. + return NaviBackend(base_url="http://navi.test:8440", warmup=False) + + +def test_url_construction(): + b = _backend() + assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023" + + +def test_base_url_trailing_slash_stripped(): + b = NaviBackend(base_url="http://navi.test:8440/", warmup=False) + assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0" + + +@pytest.mark.asyncio +async def test_happy_path_passthrough(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_NAVI_OK)) + result = await b.reverse(43.615, -116.2023) + assert result == _NAVI_OK + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_partial_nulls_preserved(): + """Navi 200-with-nulls (non-US: timezone + elevation, rest null).""" + partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35} + b = _backend() + b._fetch = AsyncMock(return_value=partial) + result = await b.reverse(48.85, 2.35) + assert result["timezone"] == "Europe/Paris" + assert result["elevation_m"] == 35 + assert result["city"] is None + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_extra_keys_dropped(): + b = _backend() + b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"}) + result = await b.reverse(1.0, 2.0) + assert "debug_internal" not in result + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_network_error_returns_all_null_never_raises(): + b = _backend() + b._fetch = AsyncMock(side_effect=ConnectionError("boom")) + result = await b.reverse(1.0, 2.0) + assert result == all_null_bundle() + + +@pytest.mark.asyncio +async def test_timeout_returns_all_null(): + import asyncio + + b = _backend() + b._fetch = AsyncMock(side_effect=asyncio.TimeoutError()) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_malformed_response_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(side_effect=ValueError("not json")) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_headers_passed_through_config(): + b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False) + assert b._headers == {"Authorization": "Bearer x"} + + +@pytest.mark.asyncio +@pytest.mark.skipif( + os.environ.get("NAVI_INTEGRATION_TEST") != "1", + reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint", +) +async def test_live_navi_boise(): + """Integration smoke against the real endpoint (default skipped). + + The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific + address lives in source; defaults to localhost when unset. + """ + base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440") + b = NaviBackend(base_url=base_url, warmup=False) + result = await b.reverse(43.6150, -116.2023) + assert result["name"] == "Where you are" + assert result["city"] == "Boise" + assert result["state"] == "Idaho" + assert result["elevation_m"] is not None + assert abs(float(result["elevation_m"]) - 824) < 50 diff --git a/tests/test_nominatim_backend.py b/tests/test_nominatim_backend.py new file mode 100644 index 0000000..6663d23 --- /dev/null +++ b/tests/test_nominatim_backend.py @@ -0,0 +1,118 @@ +"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2).""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from central.enrichment.backends.nominatim import ( + DEFAULT_USER_AGENT, + NominatimBackend, +) +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +_NOMINATIM_OK = { + "name": "Boise", + "display_name": "Boise, Ada County, Idaho, United States", + "address": { + "city": "Boise", + "county": "Ada County", + "state": "Idaho", + "country": "United States", + "postcode": "83702", + }, +} + + +def _backend(**kw) -> NominatimBackend: + kw.setdefault("base_url", "http://nominatim.test") + kw.setdefault("rate_limit_per_sec", 0) # disabled by default in tests + return NominatimBackend(**kw) + + +def test_url_and_format(): + b = _backend() + url = b._url(43.6, -116.2) + assert url.startswith("http://nominatim.test/reverse?") + assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url + + +def test_user_agent_header_present(): + b = _backend() + assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT + + +def test_custom_user_agent(): + b = _backend(user_agent="myapp/1.0 (me@example.com)") + assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)" + + +@pytest.mark.asyncio +async def test_happy_path_maps_address_block(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK)) + result = await b.reverse(43.6, -116.2) + assert set(result.keys()) == set(GEOCODER_FIELDS) + assert result["name"] == "Boise" + assert result["city"] == "Boise" + assert result["county"] == "Ada County" + assert result["state"] == "Idaho" + assert result["country"] == "United States" + assert result["postal_code"] == "83702" + + +@pytest.mark.asyncio +async def test_navi_only_fields_null(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK)) + result = await b.reverse(43.6, -116.2) + assert result["timezone"] is None + assert result["landclass"] is None + assert result["elevation_m"] is None + + +@pytest.mark.asyncio +async def test_city_falls_back_to_town_then_village(): + b = _backend() + b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}}) + result = await b.reverse(1.0, 2.0) + assert result["city"] == "Tinytown" + + +@pytest.mark.asyncio +async def test_network_error_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(side_effect=ConnectionError("down")) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_malformed_json_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(side_effect=ValueError("bad")) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_rate_limit_disabled_does_not_sleep(): + b = _backend(rate_limit_per_sec=0) + with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp: + await b._throttle() + await b._throttle() + slp.assert_not_called() + + +@pytest.mark.asyncio +async def test_rate_limit_spaces_consecutive_requests(): + """With a 1/sec limit, the second back-to-back call must sleep a positive + interval. Mock monotonic to a fixed instant so the throttle computes a wait.""" + b = _backend(rate_limit_per_sec=1.0) + sleeps: list[float] = [] + + async def _fake_sleep(d): + sleeps.append(d) + + with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \ + patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep): + await b._throttle() # first: last_request_at was 0 -> no wait + await b._throttle() # second: now==100, last==100 -> wait ~1.0s + assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}" diff --git a/tests/test_photon_backend.py b/tests/test_photon_backend.py new file mode 100644 index 0000000..f224d21 --- /dev/null +++ b/tests/test_photon_backend.py @@ -0,0 +1,92 @@ +"""Tests for PhotonBackend (raw Photon /reverse).""" + +from unittest.mock import AsyncMock + +import pytest + +from central.enrichment.backends.photon import PhotonBackend +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +# Photon reverse response shape. +_PHOTON_OK = { + "features": [ + { + "properties": { + "name": "Boise", + "city": "Boise", + "county": "Ada County", + "state": "Idaho", + "country": "United States", + "postcode": "83702", + "osm_key": "place", + }, + "geometry": {"type": "Point", "coordinates": [-116.2, 43.6]}, + } + ] +} + + +def _backend() -> PhotonBackend: + return PhotonBackend(base_url="http://photon.test:2322") + + +def test_url_construction(): + b = _backend() + url = b._url(43.6, -116.2) + assert url.startswith("http://photon.test:2322/reverse?") + assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url + + +@pytest.mark.asyncio +async def test_happy_path_maps_to_canonical(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_PHOTON_OK)) + result = await b.reverse(43.6, -116.2) + assert set(result.keys()) == set(GEOCODER_FIELDS) + assert result["city"] == "Boise" + assert result["county"] == "Ada County" + assert result["state"] == "Idaho" + assert result["country"] == "United States" + assert result["postal_code"] == "83702" # mapped from Photon 'postcode' + + +@pytest.mark.asyncio +async def test_navi_only_fields_are_null(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_PHOTON_OK)) + result = await b.reverse(43.6, -116.2) + assert result["timezone"] is None + assert result["landclass"] is None + assert result["elevation_m"] is None + + +@pytest.mark.asyncio +async def test_empty_features_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(return_value={"features": []}) + assert await b.reverse(0.0, 0.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_missing_properties_keys_become_null(): + b = _backend() + b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]}) + result = await b.reverse(1.0, 2.0) + assert result["name"] == "X" + assert result["city"] is None + assert result["postal_code"] is None + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_network_error_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(side_effect=ConnectionError("down")) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_malformed_json_returns_all_null(): + b = _backend() + b._fetch = AsyncMock(side_effect=ValueError("bad json")) + assert await b.reverse(1.0, 2.0) == all_null_bundle() diff --git a/tests/test_producer_doc.py b/tests/test_producer_doc.py index aef32e7..07abbef 100644 --- a/tests/test_producer_doc.py +++ b/tests/test_producer_doc.py @@ -23,8 +23,14 @@ from pathlib import Path from central.adapter import SourceAdapter from central.adapter_discovery import discover_adapters +from central.enrichment.geocoder import GEOCODER_FIELDS from central.streams import STREAMS +# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19). +_DESIGN_PRINCIPLE_QUOTE = ( + "Central takes it all and gives it all. It's up to the pipe to do with it" +) + DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md" @@ -186,6 +192,48 @@ def test_streams_snippet_quotes_live_registry(): ) +def _section(doc: str, header_re: str) -> str: + """Return the body of the section whose header matches header_re, up to the + next same-or-higher-level header.""" + m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE) + assert m, f"doc missing section matching {header_re!r}" + return m.group(1) + + +def test_design_principle_quote_present_in_section_2(): + """§2 must still carry the verbatim Matt quote — the reframe changes the + translation beneath it, not the quote itself.""" + section = _section(_doc_text(), r"^## 2\. The design principle") + assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2" + + +def test_anti_pattern_10_1_section_exists(): + """§10.1 must still exist as a subsection (content reframed to + 'enrichment outside the framework', structure preserved).""" + doc = _doc_text() + assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection" + + +def test_enrichment_contract_section_13_has_all_protocol_references(): + """New §13 must name all four enrichment contract types verbatim.""" + section = _section(_doc_text(), r"^## 13\. Enrichment contract") + for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"): + assert ref in section, f"§13 missing reference to {ref!r}" + + +def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields(): + """The §13 per-field coverage matrix must list exactly the canonical + GEOCODER_FIELDS — derived from code, not hardcoded here.""" + section = _section(_doc_text(), r"^## 13\. Enrichment contract") + # Matrix rows look like: | `field_name` | ... | + row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE)) + assert row_fields == set(GEOCODER_FIELDS), ( + f"coverage-matrix field drift: " + f"doc-only={row_fields - set(GEOCODER_FIELDS)}, " + f"code-only={set(GEOCODER_FIELDS) - row_fields}" + ) + + def test_no_orphan_adapter_references_in_anti_patterns(): """Anti-patterns section names two real adapter modules as examples (firms, inciweb in §10.4). Those names must still resolve via