diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 009741a..70036b8 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -296,36 +296,6 @@ ground-survey workflows. archive is at `https://firms.modaps.eosdis.nasa.gov/`.) - **Removal semantics:** none. FIRMS publishes detections; absence is the signal if a fire stops burning. Consumers should not expect explicit "removal" events. -- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so - each event carries a Central-derived geocoder bundle under - `data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central - reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result. - The bundle always has these nine keys (any unresolved field is `null`, never - missing): - - | key | meaning | - |---|---| - | `name` | place / feature name | - | `city` | city / town / village | - | `county` | county (or equivalent) | - | `state` | state / province | - | `country` | country | - | `postal_code` | postal / ZIP code | - | `timezone` | IANA tz (e.g. `America/Boise`) | - | `landclass` | land-management class (US PAD-US) | - | `elevation_m` | ground elevation, metres | - - **Coverage by region (v0.5.0):** US hotspots get the full bundle (with - sparsity gaps in deep wilderness); non-US hotspots currently get only - `timezone` and `elevation_m` populated (both planet-scale), the rest `null`, - pending an upstream planet expansion. Treat `null` as "not resolved," not - "does not exist." - - **Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot - near 51–53°N can spuriously get a non-`null` `landclass` (it false-matches the - Aleutian "Rat Islands" US land-management polygon across the dateline). If you - consume `landclass`, treat a non-`null` value on a clearly non-US point as - suspect. Fix is tracked upstream. - **Live example (verbatim from CT104):** ```json diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 542c466..02ce1bf 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -24,7 +24,6 @@ are intentionally not restated. Cross-references point into that doc. 10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do) 11. [Settings preview hook](#11-settings-preview-hook) 12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter) -13. [Enrichment contract](#13-enrichment-contract) --- @@ -50,47 +49,33 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)). > what it will." > — Matt, 2026-05-19 -The correct reading of that sentence: **Central is the consumer's only data -plane.** A downstream consumer sees exactly what's on the wire and nothing -more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot -reverse-geocode a coordinate on its own. So whatever Central does NOT put on -the wire is, for every consumer, simply missing. "Gives it all" therefore means -*give the consumer everything a reasonable consumer needs to act on the event* -— not "give the upstream payload only." - -Adapter authors translate that into a small number of concrete rules: +Adapter authors translate that single sentence into a small number of concrete +rules: - **Preserve every upstream field.** Anything the upstream returns lives in `Event.data` verbatim. Adapters do not silently drop fields, even ones that - look redundant or low-value today (see [§10.2](#102-silent-field-dropping)). -- **Enrich, deliberately and centrally.** Location, timezone, elevation, - landclass and similar context that consumers reliably need should be resolved - once, by Central, and attached — not left for twelve consumers to each - re-derive (most of them can't). Enrichment runs through the framework - ([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations` - and the supervisor attaches results under `Event.data["_enriched"]`. -- **Namespace enrichment for provenance.** Central-derived fields live under - `_enriched.`; everything else in `data` is upstream verbatim. - A consumer can always tell which is which. -- **Fail gracefully to null, never to an exception.** Enrichment that can't - resolve a field returns `null` for it (a stable, documented field set), and a - total enrichment failure returns an all-null bundle. A geocoder outage must - never drop or corrupt the underlying event. -- **No opinionated translation of the upstream payload.** Enrichment *adds* - namespaced fields; it does not rewrite upstream ones. Adapters still do not - coerce units, rename upstream fields, or collapse upstream enumerations inside - `data`. The only in-place adapter transforms remain mechanical: subject-token - normalization (camelCase → snake_case, agency-prefix splitting, whitespace → - underscore, lowercase) and dedup-key construction. + look redundant or low-value today. +- **No enrichment.** Adapters do not reverse-geocode, do not call out to + upstream metadata endpoints during normal `poll()` flow, do not consult a + second source to "fill in" a missing field. If a downstream consumer wants + enrichment, that is consumer-side work. +- **No opinionated translation.** Adapters do not coerce units, do not rename + fields to match a Central-wide vocabulary, do not collapse upstream + enumerations into Central's preferred labels. +- **The only adapter-side transforms are mechanical.** Specifically: + subject-token normalization (camelCase → snake_case, agency-prefix splitting, + whitespace → underscore, lowercase) and dedup-key construction. Both are + deterministic functions of upstream identifiers. Nothing else. -This reframes a Phase 2 rule. The earlier draft of this doc said "no -enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was -rejected on those grounds. That reasoning is now inverted: consumers have no -practical way to do that work, so Central does it. The constraint that survives -is *where* and *how* — through the framework, namespaced, cached, failing to -null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the -remaining anti-pattern (enrichment done outside the framework) and -[§13](#13-enrichment-contract) for the full contract. +This rules out a whole category of plausible-sounding work that prior reviews +have already rejected. For instance, "enrich NWIS site rows with USGS +monitoring-locations metadata during `poll()`" was proposed for Phase 3 and +killed on this principle. The producer adds the field-preserving pipe; the pipe +ends at JetStream publish; everything richer is a downstream concern. + +See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of +anti-patterns. Future authors should reject the same proposals on the same +grounds. --- @@ -640,30 +625,18 @@ adapter authors should mirror it. Do not restate it here. These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR. -### 10.1 Enrichment outside the framework +### 10.1 Enrichment during `poll()` -Enrichment itself is **expected**, not forbidden — see -[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). Any adapter -with location data should opt in by declaring `enrichment_locations` on the -adapter class; the supervisor then runs the registered enrichers and attaches -results under `Event.data["_enriched"]`. +No calls to upstream metadata endpoints, no reverse-geocode, no consultation +of a second source to fill in fields the primary feed omitted. The "NWIS +enrichment" Phase 3 proposal — joining live measurements against the +monitoring-locations metadata endpoint during `poll()` — was rejected on the +[§2](#2-the-design-principle) principle. Future proposals along the same +lines get the same answer. -The anti-pattern is doing enrichment the *wrong* way — outside the framework: - -- An `if missing: await fetch_metadata()` branch buried in an adapter's - `poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips - the `_enriched` namespacing (so consumers can't tell upstream from - Central-derived), and gives up the never-raise/all-null safety net (so a - geocoder hiccup can take down the poll). -- Writing enriched fields directly into the top level of `Event.data` instead - of under `_enriched`. That destroys provenance — a consumer can no longer - tell which fields came from the upstream feed and which Central added. -- Standing up a parallel enrichment path (a second HTTP client, a private cache) - inside one adapter instead of registering a backend with the framework. - -The rule of thumb: declare `enrichment_locations`, let the supervisor do the -work. If the framework can't express what you need, extend the framework -([§13](#13-enrichment-contract)) — don't route around it inside an adapter. +If enrichment is genuinely necessary, the right shape is a separate adapter +(or a downstream consumer) — not an `if metadata_missing: await +fetch_metadata()` branch buried in an adapter's `poll()`. ### 10.2 Silent field dropping @@ -848,111 +821,3 @@ requesting / granting merge. - [ ] **Full pytest suite green.** --- - -## 13. Enrichment contract - -Enrichment is how Central adds consumer-needed context (location names, -timezone, elevation, landclass, …) that the upstream feed doesn't carry and a -downstream consumer can't look up itself. It runs in the supervisor, after -dedup and before the CloudEvents wrap, for any adapter that opts in. Results are -namespaced under `Event.data["_enriched"]` so provenance stays explicit: -everything under `_enriched` is Central-derived; everything else in `data` is -upstream verbatim. - -### 13.1 Opting an adapter in - -Declare `enrichment_locations` on the adapter class — a list of -`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`: - -```python -class FIRMSAdapter(SourceAdapter): - enrichment_locations = [("latitude", "longitude")] -``` - -Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is." -The supervisor uses the first tuple that resolves to a non-null coordinate pair, -runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the -results. No adapter code calls enrichers directly. - -### 13.2 The `Enricher` Protocol - -An enricher is any object satisfying this Protocol (`central.enrichment.base`): - -- `name: str` — short identifier, used as the key under - `Event.data["_enriched"]`. -- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]` — - given `{"lat": float, "lon": float}`, return a flat dict of enrichment - fields. Fields it can't resolve are present with value `None` (not omitted). - **Must never raise** — implementations handle their own failures and return - an all-null bundle on total failure. - -### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol - -`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the -only enricher today. It owns the cache and the canonical field normalization; -the actual reverse-geocode is delegated to a pluggable backend satisfying the -`GeocoderBackend` Protocol: - -- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return - the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields - the backend can't resolve return `None`. Must never raise. - -Backends shipped: `NaviBackend` (composed Navi `/api/reverse//` -endpoint — name/address + timezone + landclass + elevation in one call), -`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM -Nominatim, name/address only, with a configurable rate limit + `User-Agent`), -and `NoOpBackend` (all-null — the default until an operator configures a real -backend). - -### 13.4 Cache + failure semantics - -`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`, -`/var/lib/central/enrichment_cache.db`): - -- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4 - decimal places (~11 m). TTL is per-enricher, default 24h. -- **Cache hit** → return cached bundle, no backend call. -- **Cache miss** → call backend, cache the normalized result (**even an - all-null bundle** — so known-empty coordinates aren't re-hammered), return it. -- **Backend raises** (a violation of the never-raise contract, or an - infrastructure error the backend chose to surface) → return an all-null - bundle and **do not cache** it, so the next event for that coordinate retries. - -Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`, -`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing -the enricher set is a restart, not a hot-reload. - -### 13.5 Per-field coverage - -The canonical geocoder bundle is exactly nine fields. They mirror -`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this -table must match it): - -| Field | US events | Non-US events today | Non-US after Photon planet expansion | -|---|---|---|---| -| `name` | populated (wilderness sparsity gaps) | null | populated | -| `city` | populated (wilderness sparsity gaps) | null | populated | -| `county` | populated (wilderness sparsity gaps) | null | populated | -| `state` | populated (wilderness sparsity gaps) | null | populated | -| `country` | populated (wilderness sparsity gaps) | null | populated | -| `postal_code` | populated (wilderness sparsity gaps) | null | populated | -| `timezone` | populated | populated (`tz_world` is planet-scale) | populated | -| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null | -| `elevation_m` | populated | populated (planet-DEM) | populated | - -**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` + -`elevation_m` and the rest null. Photon planet expansion is queued on the Navi -side with no firm ETA; when it lands, `NaviBackend` picks it up automatically -with zero Central code changes. - -### 13.6 Known wrinkle — landclass antimeridian false-positive - -`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons. -Points near 51–53°N **outside** the US can spuriously match the Aleutian "Rat -Islands" PAD-US polygon (false matching across the antimeridian), yielding a -non-null `landclass` that doesn't actually apply. This is a Navi-side bug being -worked separately; until it's fixed, treat a non-null `landclass` on a clearly -non-US point as suspect. Documented for consumers in -`CONSUMER-INTEGRATION.md`. - ---- diff --git a/sql/migrations/024_add_config_enrichment.sql b/sql/migrations/024_add_config_enrichment.sql deleted file mode 100644 index 236c1a2..0000000 --- a/sql/migrations/024_add_config_enrichment.sql +++ /dev/null @@ -1,44 +0,0 @@ --- Migration: 024_add_config_enrichment --- Adds config.enrichment — the single-row, operator-settable enrichment config --- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY. --- --- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)). --- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty --- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs, --- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page --- after this merges. --- --- The seed mirrors central.config_models.EnrichmentConfig() defaults. --- Regenerate via: --- sudo -u central .venv/bin/python -c \ --- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())" --- --- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ... --- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER). - -CREATE TABLE IF NOT EXISTS config.enrichment ( - id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true), - enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher', - backend_class TEXT NOT NULL DEFAULT 'NoOpBackend', - backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb, - cache_ttl_s INTEGER NOT NULL DEFAULT 86400, - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - --- Reuse the existing updated_at trigger function (migration 002). -DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment; -CREATE TRIGGER enrichment_set_updated_at - BEFORE UPDATE ON config.enrichment - FOR EACH ROW - EXECUTE FUNCTION config.set_updated_at(); - --- Reuse the existing NOTIFY function (migration 001) so the supervisor's --- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE --- branch emits 'enrichment:' (empty key — single-row table has no natural key). -DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment; -CREATE TRIGGER enrichment_notify - AFTER INSERT OR UPDATE OR DELETE ON config.enrichment - FOR EACH ROW EXECUTE FUNCTION config.notify_config_change(); - --- Seed the single framework-default row (NoOp; no deployment-specific values). -INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING; diff --git a/src/central/config_source.py b/src/central/config_source.py index 04331f3..b70ddf1 100644 --- a/src/central/config_source.py +++ b/src/central/config_source.py @@ -8,7 +8,7 @@ import logging from collections.abc import Awaitable, Callable from typing import Protocol, runtime_checkable -from central.config_models import AdapterConfig, EnrichmentConfig +from central.config_models import AdapterConfig from central.config_store import ConfigStore logger = logging.getLogger(__name__) @@ -26,10 +26,6 @@ class ConfigSource(Protocol): """Get configuration for a specific adapter.""" ... - async def get_enrichment_config(self) -> EnrichmentConfig: - """Get the enrichment configuration.""" - ... - async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], @@ -69,10 +65,6 @@ class DbConfigSource: """Get a specific adapter from database.""" return await self._store.get_adapter(name) - async def get_enrichment_config(self) -> EnrichmentConfig: - """Get the enrichment configuration from database.""" - return await self._store.get_enrichment_config() - async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], diff --git a/src/central/config_store.py b/src/central/config_store.py index 0d51658..826f899 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -12,7 +12,7 @@ from typing import Any import asyncpg -from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig +from central.config_models import AdapterConfig, StreamConfig from central.crypto import decrypt, encrypt logger = logging.getLogger(__name__) @@ -129,48 +129,6 @@ class ConfigStore: name, ) - # ------------------------------------------------------------------------- - # Enrichment configuration (single-row config.enrichment) - # ------------------------------------------------------------------------- - - async def get_enrichment_config(self) -> EnrichmentConfig: - """Read the single config.enrichment row. - - Falls back to EnrichmentConfig() framework defaults if the row is - somehow absent (it is migration-seeded, so this is belt-and-suspenders). - """ - async with self._pool.acquire() as conn: - row = await conn.fetchrow( - """ - SELECT enricher_class, backend_class, backend_settings, cache_ttl_s - FROM config.enrichment - WHERE id = true - """ - ) - if row is None: - return EnrichmentConfig() - return EnrichmentConfig(**dict(row)) - - async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None: - """Write the single config.enrichment row (id = true).""" - async with self._pool.acquire() as conn: - await conn.execute( - """ - INSERT INTO config.enrichment - (id, enricher_class, backend_class, backend_settings, cache_ttl_s) - VALUES (true, $1, $2, $3, $4) - ON CONFLICT (id) DO UPDATE SET - enricher_class = EXCLUDED.enricher_class, - backend_class = EXCLUDED.backend_class, - backend_settings = EXCLUDED.backend_settings, - cache_ttl_s = EXCLUDED.cache_ttl_s - """, - config.enricher_class, - config.backend_class, - config.backend_settings, # JSON-encoded by the codec - config.cache_ttl_s, - ) - # ------------------------------------------------------------------------- # Stream configuration # ------------------------------------------------------------------------- diff --git a/src/central/enrichment/backends/__init__.py b/src/central/enrichment/backends/__init__.py index 7efd2f5..309c273 100644 --- a/src/central/enrichment/backends/__init__.py +++ b/src/central/enrichment/backends/__init__.py @@ -1,8 +1,5 @@ """Geocoder backend implementations.""" -from central.enrichment.backends.navi import NaviBackend -from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend -from central.enrichment.backends.photon import PhotonBackend -__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"] +__all__ = ["NoOpBackend"] diff --git a/src/central/enrichment/backends/navi.py b/src/central/enrichment/backends/navi.py deleted file mode 100644 index eaec11d..0000000 --- a/src/central/enrichment/backends/navi.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Navi reverse-geocoding backend. - -Hits the composed Navi endpoint `/api/reverse//`, which -already returns the canonical 9-field bundle (name, city, county, state, -country, postal_code, timezone, landclass, elevation_m). Navi composes Photon -(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM -(elevation_m), so this backend is a near-passthrough mapping. - -Coverage today: US events get a rich bundle; non-US events get timezone + -elevation_m populated (both planet-scale) and the rest null until Navi's -Photon planet expansion lands (no Central change needed when it does). -""" - -import asyncio -import logging -from typing import Any - -import aiohttp -from pydantic import BaseModel, ConfigDict, Field - -from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle - -logger = logging.getLogger(__name__) - -# Generic default — operators point this at their Navi instance via the -# /enrichment config page (backend_settings.base_url). No deployment-specific -# host belongs in source. -DEFAULT_BASE_URL = "http://localhost:8440" -# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup. -_WARMUP_LAT = 43.6150 -_WARMUP_LON = -116.2023 - - -class NaviBackendSettings(BaseModel): - """Settings for NaviBackend. Mirrors __init__ defaults exactly.""" - - model_config = ConfigDict(extra="forbid") - - base_url: str = Field(default=DEFAULT_BASE_URL, description="Navi /api/reverse base URL") - timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") - headers: dict[str, str] | None = Field( - default=None, description="Extra request headers (e.g. Authorization)" - ) - warmup: bool = Field(default=True, description="Fire a warmup ping on construction") - - -class NaviBackend: - """GeocoderBackend backed by the composed Navi /api/reverse endpoint.""" - - settings_schema = NaviBackendSettings - - def __init__( - self, - base_url: str = DEFAULT_BASE_URL, - timeout_s: float = 10.0, - headers: dict[str, str] | None = None, - warmup: bool = True, - ) -> None: - self._base_url = base_url.rstrip("/") - self._timeout_s = timeout_s - # Future-proof: drop an Authorization: Bearer … here config-only, no code change. - self._headers = dict(headers or {}) - if warmup: - # Fire-and-forget warmup ping; only if a loop is running (it is under - # the supervisor's asyncio.run, not under sync test construction). - try: - loop = asyncio.get_running_loop() - loop.create_task(self._warmup()) - except RuntimeError: - pass - - def _url(self, lat: float, lon: float) -> str: - return f"{self._base_url}/api/reverse/{lat}/{lon}" - - async def _warmup(self) -> None: - try: - await self._fetch(_WARMUP_LAT, _WARMUP_LON) - except Exception: - # Warmup is best-effort; a failure here must not break startup. - logger.debug("NaviBackend warmup ping failed (non-fatal)") - - async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: - async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=self._timeout_s), - ) as session: - async with session.get(self._url(lat, lon), headers=self._headers) as resp: - resp.raise_for_status() - return await resp.json() - - async def reverse(self, lat: float, lon: float) -> dict[str, Any]: - try: - data = await self._fetch(lat, lon) - except Exception: - # Non-200, network error, timeout, malformed JSON — never raise. - logger.debug("NaviBackend reverse failed; returning all-null bundle") - return all_null_bundle() - # Navi's response already matches the canonical shape; map defensively. - return {field: data.get(field) for field in GEOCODER_FIELDS} diff --git a/src/central/enrichment/backends/no_op.py b/src/central/enrichment/backends/no_op.py index 3b29f94..77b0567 100644 --- a/src/central/enrichment/backends/no_op.py +++ b/src/central/enrichment/backends/no_op.py @@ -7,23 +7,11 @@ which satisfies the GeocoderBackend contract while resolving nothing. from typing import Any -from pydantic import BaseModel, ConfigDict - from central.enrichment.geocoder import all_null_bundle -class NoOpBackendSettings(BaseModel): - """No-op backend takes no settings. extra='forbid' makes switching to - NoOpBackend while stale backend_settings (e.g. a base_url) remain a clean - ValidationError instead of a TypeError at construction.""" - - model_config = ConfigDict(extra="forbid") - - class NoOpBackend: """GeocoderBackend that resolves no fields.""" - settings_schema = NoOpBackendSettings - async def reverse(self, lat: float, lon: float) -> dict[str, Any]: return all_null_bundle() diff --git a/src/central/enrichment/backends/nominatim.py b/src/central/enrichment/backends/nominatim.py deleted file mode 100644 index b2023b5..0000000 --- a/src/central/enrichment/backends/nominatim.py +++ /dev/null @@ -1,113 +0,0 @@ -"""OSM Nominatim reverse-geocoding backend. - -Works against public OSM Nominatim (1 req/sec + User-Agent required) or a -self-hosted instance (no limit). Resolves name + address only; timezone, -landclass, and elevation_m are nulled (not in the Nominatim reverse response). - -Nominatim jsonv2 reverse response shape: - {"display_name": "...", "name": "...", - "address": {city|town|village, county, state, country, postcode, ...}} -""" - -import asyncio -import logging -import time -from typing import Any -from urllib.parse import urlencode - -import aiohttp -from pydantic import BaseModel, ConfigDict, Field - -from central.enrichment.geocoder import all_null_bundle - -logger = logging.getLogger(__name__) - -DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org" -DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)" - - -class NominatimBackendSettings(BaseModel): - """Settings for NominatimBackend. Mirrors __init__ defaults exactly.""" - - model_config = ConfigDict(extra="forbid") - - base_url: str = Field(default=DEFAULT_BASE_URL, description="Nominatim /reverse base URL") - user_agent: str = Field( - default=DEFAULT_USER_AGENT, description="User-Agent (public OSM requires one)" - ) - rate_limit_per_sec: float = Field( - default=1.0, description="Outbound request cap; 0 disables (self-hosted)" - ) - timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") - - -class NominatimBackend: - """GeocoderBackend backed by an OSM Nominatim /reverse endpoint. - - rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s); - set it to 0 to disable for self-hosted instances. - """ - - settings_schema = NominatimBackendSettings - - def __init__( - self, - base_url: str = DEFAULT_BASE_URL, - user_agent: str = DEFAULT_USER_AGENT, - rate_limit_per_sec: float = 1.0, - timeout_s: float = 10.0, - ) -> None: - self._base_url = base_url.rstrip("/") - self._user_agent = user_agent - self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0 - self._timeout_s = timeout_s - self._rl_lock = asyncio.Lock() - self._last_request_at = 0.0 - - def _url(self, lat: float, lon: float) -> str: - qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"}) - return f"{self._base_url}/reverse?{qs}" - - def _request_headers(self) -> dict[str, str]: - # Public Nominatim rejects requests without an identifying User-Agent. - return {"User-Agent": self._user_agent} - - async def _throttle(self) -> None: - if self._min_interval <= 0: - return - async with self._rl_lock: - now = time.monotonic() - wait = self._last_request_at + self._min_interval - now - if wait > 0: - await asyncio.sleep(wait) - self._last_request_at = time.monotonic() - - async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: - async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=self._timeout_s), - ) as session: - async with session.get( - self._url(lat, lon), headers=self._request_headers() - ) as resp: - resp.raise_for_status() - return await resp.json() - - async def reverse(self, lat: float, lon: float) -> dict[str, Any]: - try: - await self._throttle() - data = await self._fetch(lat, lon) - addr = data.get("address", {}) or {} - except Exception: - logger.debug("NominatimBackend reverse failed; returning all-null bundle") - return all_null_bundle() - return { - "name": data.get("name") or data.get("display_name"), - "city": addr.get("city") or addr.get("town") or addr.get("village"), - "county": addr.get("county"), - "state": addr.get("state"), - "country": addr.get("country"), - "postal_code": addr.get("postcode"), - "timezone": None, # not in Nominatim reverse response - "landclass": None, # Navi-composed-endpoint only - "elevation_m": None, # Navi-composed-endpoint only - } diff --git a/src/central/enrichment/backends/photon.py b/src/central/enrichment/backends/photon.py deleted file mode 100644 index 7503aef..0000000 --- a/src/central/enrichment/backends/photon.py +++ /dev/null @@ -1,82 +0,0 @@ -"""Raw Photon reverse-geocoding backend. - -For deployers who run a Photon instance directly, without the composed -Navi-style endpoint. Photon resolves name + address only — timezone, -landclass, and elevation_m are Navi-composed-endpoint extras and are nulled -here. - -Photon reverse response shape: - {"features": [{"properties": {name, city, county, state, country, - postcode, ...}, "geometry": {...}}]} -""" - -import logging -from typing import Any -from urllib.parse import urlencode - -import aiohttp -from pydantic import BaseModel, ConfigDict, Field - -from central.enrichment.geocoder import all_null_bundle - -logger = logging.getLogger(__name__) - -DEFAULT_BASE_URL = "http://localhost:2322" - - -class PhotonBackendSettings(BaseModel): - """Settings for PhotonBackend. Mirrors __init__ defaults exactly.""" - - model_config = ConfigDict(extra="forbid") - - base_url: str = Field(default=DEFAULT_BASE_URL, description="Photon /reverse base URL") - timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds") - headers: dict[str, str] | None = Field(default=None, description="Extra request headers") - - -class PhotonBackend: - """GeocoderBackend backed by a raw Photon /reverse endpoint.""" - - settings_schema = PhotonBackendSettings - - def __init__( - self, - base_url: str = DEFAULT_BASE_URL, - timeout_s: float = 10.0, - headers: dict[str, str] | None = None, - ) -> None: - self._base_url = base_url.rstrip("/") - self._timeout_s = timeout_s - self._headers = dict(headers or {}) - - def _url(self, lat: float, lon: float) -> str: - qs = urlencode({"lat": lat, "lon": lon, "limit": 1}) - return f"{self._base_url}/reverse?{qs}" - - async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: - async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=self._timeout_s), - ) as session: - async with session.get(self._url(lat, lon), headers=self._headers) as resp: - resp.raise_for_status() - return await resp.json() - - async def reverse(self, lat: float, lon: float) -> dict[str, Any]: - try: - data = await self._fetch(lat, lon) - features = data.get("features") or [] - props = features[0].get("properties", {}) if features else {} - except Exception: - logger.debug("PhotonBackend reverse failed; returning all-null bundle") - return all_null_bundle() - return { - "name": props.get("name"), - "city": props.get("city"), - "county": props.get("county"), - "state": props.get("state"), - "country": props.get("country"), - "postal_code": props.get("postcode"), # Photon names it 'postcode' - "timezone": None, # not provided by raw Photon - "landclass": None, # Navi-composed-endpoint only - "elevation_m": None, # Navi-composed-endpoint only - } diff --git a/src/central/enrichment/cache.py b/src/central/enrichment/cache.py index d36691e..06be7af 100644 --- a/src/central/enrichment/cache.py +++ b/src/central/enrichment/cache.py @@ -124,27 +124,3 @@ class EnrichmentCache: ) -> None: """Cache a bundle (idempotent upsert on the rounded-coords key).""" await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload) - - def _invalidate_sync(self, enricher_name: str | None) -> int: - conn = self._connect() - try: - if enricher_name is None: - cur = conn.execute("DELETE FROM enrichment_cache") - else: - cur = conn.execute( - "DELETE FROM enrichment_cache WHERE enricher_name = ?", - (enricher_name,), - ) - conn.commit() - return cur.rowcount - finally: - conn.close() - - async def invalidate(self, enricher_name: str | None = None) -> int: - """Drop cached bundles. Scoped to one enricher when given, else all. - - Called when enrichment config changes — a new backend would otherwise - keep returning the previous backend's cached results until TTL expiry. - Returns the number of rows deleted. - """ - return await asyncio.to_thread(self._invalidate_sync, enricher_name) diff --git a/src/central/enrichment/geocoder.py b/src/central/enrichment/geocoder.py index 195a734..ab75017 100644 --- a/src/central/enrichment/geocoder.py +++ b/src/central/enrichment/geocoder.py @@ -9,8 +9,6 @@ land in PR K. import logging from typing import Any, Protocol, runtime_checkable -from pydantic import BaseModel - from central.enrichment.cache import EnrichmentCache logger = logging.getLogger(__name__) @@ -40,12 +38,6 @@ def all_null_bundle() -> dict[str, Any]: class GeocoderBackend(Protocol): """The pluggable reverse-geocoding layer beneath GeocoderEnricher.""" - # Pydantic model (extra='forbid') describing this backend's accepted - # settings. The supervisor validates config.enrichment.backend_settings - # against it before instantiating, turning a config/settings mismatch into - # a clean ValidationError instead of a constructor TypeError. - settings_schema: type[BaseModel] - async def reverse(self, lat: float, lon: float) -> dict[str, Any]: """Return canonical geocoder fields (see GEOCODER_FIELDS). diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py index adc76a4..ef7588e 100644 --- a/src/central/gui/form_descriptors.py +++ b/src/central/gui/form_descriptors.py @@ -66,8 +66,6 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, return "text", None if field_type is int: return "number", None - if field_type is float: - return "number", None if field_type is bool: return "checkbox", None if field_type is RegionConfig: @@ -90,11 +88,6 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" ) - # dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings). - # The form renders the value as JSON; the POST handler parses it back. - if field_type is dict or origin is dict: - return "json", None - # Check if it's a BaseModel subclass (nested model other than RegionConfig) if isinstance(field_type, type) and issubclass(field_type, BaseModel): raise NotImplementedError( diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index e6bb580..ef7ba49 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1990,199 +1990,6 @@ async def streams_update( return RedirectResponse(url="/streams", status_code=302) -# ============================================================================= -# Enrichment config route -# ============================================================================= - - -def _outer_enrichment_fields(current: dict) -> list[FieldDescriptor]: - """EnrichmentConfig form fields EXCEPT backend_settings — that one is - rendered as a per-backend
via _backend_fields().""" - from central.config_models import EnrichmentConfig - - return [ - f for f in describe_fields(EnrichmentConfig, current) - if f.name != "backend_settings" - ] - - -def _backend_fields(backend_class: str | None, current_bs: dict) -> list[FieldDescriptor]: - """Field descriptors for the selected backend's settings_schema, or [] when - the backend class is unknown. Same generic describe_fields machinery.""" - from central.supervisor import _BACKEND_REGISTRY - - cls = _BACKEND_REGISTRY.get(backend_class or "") - if cls is None: - return [] - return describe_fields(cls.settings_schema, current_bs or {}) - - -async def _read_enrichment_row(conn) -> dict: - row = await conn.fetchrow( - """ - SELECT enricher_class, backend_class, backend_settings, cache_ttl_s - FROM config.enrichment WHERE id = true - """ - ) - return dict(row) if row is not None else {} - - -def _enrichment_context(request, *, outer_fields, backend_fields, backend_class, - errors=None, form_data=None, backend_form_data=None): - return { - "operator": request.state.operator, - "csrf_token": request.state.csrf_token, - "outer_fields": outer_fields, - "backend_fields": backend_fields, - "backend_class": backend_class, - "errors": errors, - "form_data": form_data, - "backend_form_data": backend_form_data, - } - - -@router.get("/enrichment", response_class=HTMLResponse) -async def enrichment_form(request: Request) -> HTMLResponse: - """Render the enrichment config form (outer fields + a per-backend fieldset - for the currently-selected backend_class).""" - templates = _get_templates() - pool = get_pool() - - async with pool.acquire() as conn: - current = await _read_enrichment_row(conn) - - backend_class = current.get("backend_class") or "NoOpBackend" - current_bs = current.get("backend_settings") or {} - - return templates.TemplateResponse( - request=request, - name="enrichment.html", - context=_enrichment_context( - request, - outer_fields=_outer_enrichment_fields(current), - backend_fields=_backend_fields(backend_class, current_bs), - backend_class=backend_class, - ), - ) - - -@router.post("/enrichment") -async def enrichment_update(request: Request) -> Response: - """Validate + persist the enrichment config. Hot-reload picks it up via the - config.enrichment NOTIFY trigger. backend_settings is validated against the - SUBMITTED backend_class's settings_schema.""" - from central.config_models import EnrichmentConfig - from central.supervisor import _BACKEND_REGISTRY - - templates = _get_templates() - pool = get_pool() - - form = await request.form() - if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token: - raise CsrfValidationError("Invalid CSRF token") - - errors: dict[str, str] = {} - form_data: dict[str, Any] = {} - backend_form_data: dict[str, Any] = {} - parsed: dict[str, Any] = {} - - # --- outer EnrichmentConfig fields (backend_settings excluded) --- - for field in _outer_enrichment_fields({}): - raw = form.get(field.name, "") - form_data[field.name] = raw - if field.widget == "number": - try: - parsed[field.name] = int(raw) if raw else None - except ValueError: - errors[field.name] = f"{field.label} must be a number" - else: # text - parsed[field.name] = raw.strip() if raw else None - - submitted_backend_class = parsed.get("backend_class") - - # --- backend settings fieldset, validated against the SUBMITTED backend --- - backend_settings: dict[str, Any] = {} - backend_cls = _BACKEND_REGISTRY.get(submitted_backend_class or "") - if backend_cls is None and submitted_backend_class: - errors["backend_class"] = f"Unknown backend: {submitted_backend_class}" - elif backend_cls is not None: - for f in describe_fields(backend_cls.settings_schema, {}): - formkey = f"bs_{f.name}" - raw = form.get(formkey, "") - backend_form_data[formkey] = raw - if f.widget == "checkbox": - backend_settings[f.name] = formkey in form - elif f.widget == "json": - if raw and raw.strip(): - try: - backend_settings[f.name] = json.loads(raw) - except json.JSONDecodeError as e: - errors[formkey] = f"{f.label} is not valid JSON: {e}" - # blank -> omit, schema default applies - else: # text / number — let pydantic coerce, omit blanks for defaults - if raw.strip() != "": - backend_settings[f.name] = raw.strip() - if not errors: - try: - backend_settings = backend_cls.settings_schema.model_validate( - backend_settings - ).model_dump() - except ValidationError as e: - for err in e.errors(): - loc = err["loc"][0] if err["loc"] else "unknown" - errors[f"bs_{loc}"] = err["msg"] - - # --- outer EnrichmentConfig validation --- - if not errors: - try: - validated = EnrichmentConfig( - **{k: v for k, v in parsed.items() if v is not None}, - backend_settings=backend_settings, - ) - except ValidationError as e: - for err in e.errors(): - loc = err["loc"][0] if err["loc"] else "unknown" - errors[str(loc)] = err["msg"] - - if errors: - # Re-render against the SUBMITTED backend_class so field errors attach - # to the right schema (operator may be mid-switch with a typo). - return templates.TemplateResponse( - request=request, - name="enrichment.html", - context=_enrichment_context( - request, - outer_fields=_outer_enrichment_fields({}), - backend_fields=_backend_fields(submitted_backend_class, backend_settings), - backend_class=submitted_backend_class, - errors=errors, - form_data=form_data, - backend_form_data=backend_form_data, - ), - status_code=200, - ) - - async with pool.acquire() as conn: - await conn.execute( - """ - INSERT INTO config.enrichment - (id, enricher_class, backend_class, backend_settings, cache_ttl_s) - VALUES (true, $1, $2, $3, $4) - ON CONFLICT (id) DO UPDATE SET - enricher_class = EXCLUDED.enricher_class, - backend_class = EXCLUDED.backend_class, - backend_settings = EXCLUDED.backend_settings, - cache_ttl_s = EXCLUDED.cache_ttl_s - """, - validated.enricher_class, - validated.backend_class, - validated.backend_settings, # encoded as jsonb by the pool codec - validated.cache_ttl_s, - ) - - return RedirectResponse(url="/enrichment", status_code=302) - - # Alias validation regex ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$') diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index be7dbca..a7a667d 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -19,7 +19,6 @@
  • Adapters
  • Events
  • Streams
  • -
  • Enrichment
  • API Keys
  • {{ operator.username }}
  • Change Password
  • diff --git a/src/central/gui/templates/enrichment.html b/src/central/gui/templates/enrichment.html deleted file mode 100644 index e2e4d1c..0000000 --- a/src/central/gui/templates/enrichment.html +++ /dev/null @@ -1,90 +0,0 @@ -{% extends "base.html" %} - -{% block title %}Central — Enrichment{% endblock %} - -{% block content %} -

    Enrichment

    -

    - Central-side event enrichment. Results are attached to each event under - data._enriched.<enricher>. Changes hot-reload into the - supervisor; switching backend invalidates the enrichment cache. Backend - settings below are validated against the selected backend; switching - backend then saving re-renders this form with the new backend's fields. -

    - -
    - - -
    - Configuration - {% for field in outer_fields %} - {% if field.widget == "text" %} - - - {% if field.description %}{{ field.description }}{% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - {% elif field.widget == "number" %} - - - {% if field.description %}{{ field.description }}{% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - {% endif %} - {% endfor %} -
    - -
    - Backend settings — {{ backend_class }} - {% if not backend_fields %} - This backend takes no settings. - {% endif %} - {% for field in backend_fields %} - {% set fk = "bs_" ~ field.name %} - {% if field.widget == "text" %} - - - {% if field.description %}{{ field.description }}{% endif %} - {% if errors and errors[fk] %} - {{ errors[fk] }} - {% endif %} - {% elif field.widget == "number" %} - - - {% if field.description %}{{ field.description }}{% endif %} - {% if errors and errors[fk] %} - {{ errors[fk] }} - {% endif %} - {% elif field.widget == "checkbox" %} - - {% if field.description %}{{ field.description }}{% endif %} - {% if errors and errors[fk] %} - {{ errors[fk] }} - {% endif %} - {% elif field.widget == "json" %} - - - JSON object{% if field.description %} — {{ field.description }}{% endif %} - {% if errors and errors[fk] %} - {{ errors[fk] }} - {% endif %} - {% endif %} - {% endfor %} -
    - - -
    -{% endblock %} diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 90913a7..8ca6be9 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -12,7 +12,6 @@ from typing import Any import nats from nats.js import JetStreamContext -from pydantic import ValidationError from central.adapter import SourceAdapter from central.adapter_discovery import discover_adapters @@ -25,10 +24,7 @@ from central.api_key_resolver import resolve_api_key_alias from central.config_models import EnrichmentConfig from central.enrichment.base import Enricher from central.enrichment.cache import EnrichmentCache -from central.enrichment.backends.navi import NaviBackend -from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend -from central.enrichment.backends.photon import PhotonBackend from central.enrichment.geocoder import GeocoderEnricher from central.models import Event from central.stream_manager import StreamManager @@ -37,32 +33,23 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db") # Enricher / backend class-name registries for EnrichmentConfig resolution. +# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these. _ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher} -_BACKEND_REGISTRY: dict[str, type] = { - "NoOpBackend": NoOpBackend, - "NaviBackend": NaviBackend, - "PhotonBackend": PhotonBackend, - "NominatimBackend": NominatimBackend, -} +_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend} def build_enrichers( enrichment_config: EnrichmentConfig, - cache: EnrichmentCache, + cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH, ) -> list[Enricher]: - """Instantiate the configured enricher(s) with their backend + the given cache. + """Instantiate the configured enricher(s) with their backend + cache. - The supervisor owns the cache (so it can invalidate it on a config change) - and passes it in. Enrichment config is read from config.enrichment at - startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change). + Read once at supervisor startup — enrichment config is NOT hot-reloaded + in PR J (see EnrichmentConfig docstring). """ backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class] - # Validate backend_settings against the backend's settings_schema BEFORE - # constructing. A mismatch (e.g. a stale base_url left in the row after - # switching to NoOpBackend) raises a clean pydantic ValidationError here - # instead of a TypeError inside the backend constructor. - validated = backend_cls.settings_schema.model_validate(enrichment_config.backend_settings) - backend = backend_cls(**validated.model_dump()) + backend = backend_cls(**enrichment_config.backend_settings) + cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s) enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class] return [enricher_cls(backend, cache=cache)] @@ -171,15 +158,9 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config - # Enrichment: a valid default set is built now so the supervisor is - # never in a half-state; start() then overrides it from the live - # config.enrichment row, and _on_config_change hot-reloads it. - self._active_enrichment_config = enrichment_config or EnrichmentConfig() - self._enrichment_cache = EnrichmentCache( - ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s - ) + # Enrichment is read once at startup (no hot-reload in PR J). self._enrichers: list[Enricher] = build_enrichers( - self._active_enrichment_config, self._enrichment_cache + enrichment_config or EnrichmentConfig() ) self._adapters = discover_adapters() self._nc: nats.NATS | None = None @@ -685,74 +666,11 @@ class Supervisor: extra={"stream": stream_config.name, "error": str(e)}, ) - def _rebuild_enrichers(self, config: EnrichmentConfig) -> None: - """Rebuild the active enricher set + cache from an EnrichmentConfig. - - Builds into locals first and commits to instance state only on success, - so a ValidationError (bad backend_settings) leaves the previously-active - enrichers/config/cache untouched and propagates to the caller. - """ - cache = EnrichmentCache(ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s) - enrichers = build_enrichers(config, cache) # may raise ValidationError - self._active_enrichment_config = config - self._enrichment_cache = cache - self._enrichers = enrichers - - async def _handle_enrichment_change(self) -> None: - """Re-read config.enrichment and rebuild enrichers. Invalidate the cache - when the backend changed, so stale results from the previous backend - don't survive until TTL expiry. - - Invalid backend_settings (ValidationError) leave the previous backend - running — the supervisor stays up; the operator fixes the row and the - next NOTIFY brings it in cleanly. - """ - new_config = await self._config_source.get_enrichment_config() - old_config = self._active_enrichment_config - backend_changed = ( - new_config.backend_class != old_config.backend_class - or new_config.backend_settings != old_config.backend_settings - or new_config.enricher_class != old_config.enricher_class - ) - try: - self._rebuild_enrichers(new_config) - except ValidationError as e: - logger.error( - "Enrichment config invalid; keeping previous backend", - extra={ - "enricher_class": new_config.enricher_class, - "backend_class": new_config.backend_class, - "backend_settings": new_config.backend_settings, - "errors": e.errors(), - }, - ) - return - if backend_changed: - deleted = await self._enrichment_cache.invalidate() - logger.info( - "Enrichment backend changed; cache invalidated", - extra={ - "enricher_class": new_config.enricher_class, - "backend_class": new_config.backend_class, - "rows_deleted": deleted, - }, - ) - else: - logger.info( - "Enrichment config changed (cache retained)", - extra={"cache_ttl_s": new_config.cache_ttl_s}, - ) - async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. Called when NOTIFY fires for config changes. """ - # Handle enrichment config changes (single-row config.enrichment) - if table == "enrichment": - await self._handle_enrichment_change() - return - # Handle stream changes if table == "streams": stream_name = key @@ -844,18 +762,6 @@ class Supervisor: # Ensure streams exist with correct configuration await self._ensure_streams() - # Load the operator-set enrichment config (overrides the constructor - # default); hot-reloaded thereafter via _on_config_change. - enrichment_config = await self._config_source.get_enrichment_config() - self._rebuild_enrichers(enrichment_config) - logger.info( - "Enrichment configured", - extra={ - "enricher_class": enrichment_config.enricher_class, - "backend_class": enrichment_config.backend_class, - }, - ) - # Load and start enabled adapters enabled_adapters = await self._config_source.list_enabled_adapters() for config in enabled_adapters: diff --git a/tests/test_backend_settings_schema.py b/tests/test_backend_settings_schema.py deleted file mode 100644 index 1d02fcc..0000000 --- a/tests/test_backend_settings_schema.py +++ /dev/null @@ -1,235 +0,0 @@ -"""Tests for per-backend settings schemas (PR L.5). - -Each GeocoderBackend declares a Pydantic settings_schema (extra='forbid'). -build_enrichers validates backend_settings against it BEFORE constructing, so -a config/settings mismatch is a clean ValidationError, not a TypeError. The -supervisor's hot-reload keeps the previous backend running on ValidationError; -the GUI POST re-renders with field errors and does not write the DB row. - -Regression guard for the 2026-05-20 incident: switching backend_class to -NoOpBackend while backend_settings still held {"base_url": ...} crashed -_rebuild_enrichers with `TypeError: NoOpBackend() takes no arguments`. -""" - -from pathlib import Path -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -from pydantic import BaseModel, ValidationError - -from central.config_models import EnrichmentConfig -from central.enrichment.cache import EnrichmentCache -from central.supervisor import _BACKEND_REGISTRY, build_enrichers - - -# --- schemas exist + extra='forbid' ----------------------------------------- - -def test_every_backend_declares_a_settings_schema(): - for name, cls in _BACKEND_REGISTRY.items(): - schema = getattr(cls, "settings_schema", None) - assert schema is not None, f"{name} has no settings_schema" - assert isinstance(schema, type) and issubclass(schema, BaseModel), name - - -def test_every_settings_schema_forbids_extra(): - for name, cls in _BACKEND_REGISTRY.items(): - with pytest.raises(ValidationError): - cls.settings_schema.model_validate({"definitely_not_a_field": 1}) - - -def test_navi_schema_accepts_valid_kwargs_and_preserves_defaults(): - from central.enrichment.backends.navi import NaviBackendSettings - - m = NaviBackendSettings() - assert m.base_url == "http://localhost:8440" - assert m.timeout_s == 10.0 # preserved from __init__, NOT 5.0 - assert m.headers is None - assert m.warmup is True - m2 = NaviBackendSettings(base_url="http://navi:8440", timeout_s=3.0, warmup=False) - assert m2.base_url == "http://navi:8440" and m2.timeout_s == 3.0 - - -def test_noop_schema_has_no_fields(): - from central.enrichment.backends.no_op import NoOpBackendSettings - - assert list(NoOpBackendSettings.model_fields) == [] - - -# --- build_enrichers validation ---------------------------------------------- - -def _cache(tmp_path) -> EnrichmentCache: - return EnrichmentCache(tmp_path / "c.db") - - -def test_build_enrichers_raises_validation_error_not_typeerror(tmp_path): - """The exact 2026-05-20 bug: NoOpBackend + stale base_url. Must be a clean - ValidationError, never a TypeError from the constructor.""" - cfg = EnrichmentConfig(backend_class="NoOpBackend", backend_settings={"base_url": "http://x"}) - with pytest.raises(ValidationError): - build_enrichers(cfg, _cache(tmp_path)) - - -def test_build_enrichers_navi_valid_settings(tmp_path): - from central.enrichment.backends.navi import NaviBackend - - cfg = EnrichmentConfig( - backend_class="NaviBackend", - backend_settings={"base_url": "http://navi:8440", "warmup": False}, - ) - enrichers = build_enrichers(cfg, _cache(tmp_path)) - assert isinstance(enrichers[0]._backend, NaviBackend) - - -def test_build_enrichers_navi_unknown_setting_rejected(tmp_path): - cfg = EnrichmentConfig( - backend_class="NaviBackend", - backend_settings={"base_url": "http://navi:8440", "bogus": 1}, - ) - with pytest.raises(ValidationError): - build_enrichers(cfg, _cache(tmp_path)) - - -# --- supervisor hot-reload keeps previous backend on ValidationError --------- - -def _supervisor(): - from central import supervisor as sup_mod - - config_source = MagicMock() - config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig()) - return sup_mod.Supervisor( - config_source=config_source, - config_store=MagicMock(), - nats_url="nats://localhost:4222", - ) - - -@pytest.mark.asyncio -async def test_handle_enrichment_change_keeps_previous_on_validation_error(): - """Navi active, then a bad NOTIFY (NoOp + leftover base_url) arrives. The - supervisor must keep NaviBackend running and not crash.""" - from central.enrichment.backends.navi import NaviBackend - from central.enrichment.backends.no_op import NoOpBackend - - with patch("central.supervisor.EnrichmentCache") as cache_cls: - cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0)) - sup = _supervisor() - # Establish a valid NaviBackend active state. - sup._rebuild_enrichers(EnrichmentConfig( - backend_class="NaviBackend", - backend_settings={"base_url": "http://navi:8440", "warmup": False}, - )) - assert isinstance(sup._enrichers[0]._backend, NaviBackend) - active_before = sup._active_enrichment_config - - # Bad config arrives via NOTIFY: NoOp + stale base_url -> ValidationError. - sup._config_source.get_enrichment_config = AsyncMock( - return_value=EnrichmentConfig( - backend_class="NoOpBackend", - backend_settings={"base_url": "http://navi:8440"}, - ) - ) - await sup._handle_enrichment_change() # must NOT raise - - # Previous backend still active; config unchanged; cache NOT invalidated. - assert isinstance(sup._enrichers[0]._backend, NaviBackend) - assert sup._active_enrichment_config is active_before - cache_cls.return_value.invalidate.assert_not_awaited() - - -@pytest.mark.asyncio -async def test_handle_enrichment_change_applies_valid_noop_with_empty_settings(): - """Switching to NoOp with empty backend_settings (the correct way) applies - cleanly and invalidates the cache.""" - from central.enrichment.backends.no_op import NoOpBackend - - with patch("central.supervisor.EnrichmentCache") as cache_cls: - cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=3)) - sup = _supervisor() - sup._rebuild_enrichers(EnrichmentConfig( - backend_class="NaviBackend", - backend_settings={"base_url": "http://navi:8440", "warmup": False}, - )) - sup._config_source.get_enrichment_config = AsyncMock( - return_value=EnrichmentConfig(backend_class="NoOpBackend", backend_settings={}) - ) - await sup._handle_enrichment_change() - assert isinstance(sup._enrichers[0]._backend, NoOpBackend) - cache_cls.return_value.invalidate.assert_awaited() - - -# --- GUI POST validates backend_settings + does not write on error ---------- - -def _mock_pool(conn): - pool = MagicMock() - cm = MagicMock() - cm.__aenter__ = AsyncMock(return_value=conn) - cm.__aexit__ = AsyncMock(return_value=None) - pool.acquire = MagicMock(return_value=cm) - return pool - - -@pytest.mark.asyncio -async def test_gui_post_rejects_bad_backend_settings_without_writing(): - """POST with a bad backend setting (timeout_s non-numeric) re-renders with a - field error keyed bs_timeout_s and does NOT execute the DB upsert.""" - from central.gui.routes import enrichment_update - - request = MagicMock() - request.state.operator = MagicMock(username="op") - request.state.csrf_token = "tok" - - form = { - "csrf_token": "tok", - "enricher_class": "GeocoderEnricher", - "backend_class": "NaviBackend", - "cache_ttl_s": "86400", - "bs_base_url": "http://navi:8440", - "bs_timeout_s": "not-a-number", - } - request.form = AsyncMock(return_value=form) - - conn = MagicMock() - conn.execute = AsyncMock() - conn.fetchrow = AsyncMock(return_value={}) - templates = MagicMock() - templates.TemplateResponse.return_value = MagicMock() - - with patch("central.gui.routes._get_templates", return_value=templates), \ - patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): - await enrichment_update(request) - - conn.execute.assert_not_awaited() # no DB write on validation failure - ctx = templates.TemplateResponse.call_args.kwargs["context"] - assert "bs_timeout_s" in ctx["errors"] - assert ctx["backend_class"] == "NaviBackend" # re-rendered against SUBMITTED backend - - -@pytest.mark.asyncio -async def test_gui_post_writes_on_valid_settings(): - from central.gui.routes import enrichment_update - - request = MagicMock() - request.state.operator = MagicMock(username="op") - request.state.csrf_token = "tok" - form = { - "csrf_token": "tok", - "enricher_class": "GeocoderEnricher", - "backend_class": "NaviBackend", - "cache_ttl_s": "86400", - "bs_base_url": "http://navi:8440", - "bs_timeout_s": "10", - } - request.form = AsyncMock(return_value=form) - conn = MagicMock() - conn.execute = AsyncMock() - templates = MagicMock() - - with patch("central.gui.routes._get_templates", return_value=templates), \ - patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): - resp = await enrichment_update(request) - - conn.execute.assert_awaited() # DB upsert ran - # the backend_settings written are the validated/dumped dict - args = conn.execute.call_args.args - assert "INSERT INTO config.enrichment" in args[0] - assert resp.status_code == 302 diff --git a/tests/test_enrichment_config_plumbing.py b/tests/test_enrichment_config_plumbing.py deleted file mode 100644 index 1ac5e36..0000000 --- a/tests/test_enrichment_config_plumbing.py +++ /dev/null @@ -1,213 +0,0 @@ -"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5). - -Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload -rebuild, cache invalidation on backend change (but not on TTL-only change), -EnrichmentCache.invalidate, the generic json widget for backend_settings, and -the /enrichment GUI render. No real DB / NATS — pool, config_source, and the -EnrichmentCache class are mocked. -""" - -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from central.config_models import EnrichmentConfig -from central.enrichment.cache import EnrichmentCache -from central.enrichment.backends.navi import NaviBackend -from central.enrichment.backends.no_op import NoOpBackend -from central.gui.form_descriptors import describe_fields - - -# --- mock pool/conn helpers ------------------------------------------------- - -def _mock_pool(conn: MagicMock) -> MagicMock: - pool = MagicMock() - acquire_cm = MagicMock() - acquire_cm.__aenter__ = AsyncMock(return_value=conn) - acquire_cm.__aexit__ = AsyncMock(return_value=None) - pool.acquire = MagicMock(return_value=acquire_cm) - return pool - - -# --- ConfigStore -------------------------------------------------------------- - -@pytest.mark.asyncio -async def test_config_store_reads_enrichment_row(): - from central.config_store import ConfigStore - - conn = MagicMock() - conn.fetchrow = AsyncMock(return_value={ - "enricher_class": "GeocoderEnricher", - "backend_class": "NaviBackend", - "backend_settings": {"base_url": "http://example.test:8440"}, - "cache_ttl_s": 3600, - }) - store = ConfigStore(_mock_pool(conn)) - cfg = await store.get_enrichment_config() - assert isinstance(cfg, EnrichmentConfig) - assert cfg.backend_class == "NaviBackend" - assert cfg.backend_settings == {"base_url": "http://example.test:8440"} - assert cfg.cache_ttl_s == 3600 - - -@pytest.mark.asyncio -async def test_config_store_falls_back_to_defaults_when_row_absent(): - from central.config_store import ConfigStore - - conn = MagicMock() - conn.fetchrow = AsyncMock(return_value=None) - store = ConfigStore(_mock_pool(conn)) - cfg = await store.get_enrichment_config() - assert cfg == EnrichmentConfig() # framework defaults - assert cfg.backend_class == "NoOpBackend" - - -@pytest.mark.asyncio -async def test_config_store_upsert_passes_dict_settings(): - from central.config_store import ConfigStore - - conn = MagicMock() - conn.execute = AsyncMock() - store = ConfigStore(_mock_pool(conn)) - cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"}) - await store.upsert_enrichment_config(cfg) - args = conn.execute.call_args.args - assert "INSERT INTO config.enrichment" in args[0] - # backend_settings passed as a dict (pool codec encodes to jsonb), not a str. - assert {"base_url": "x"} in args - - -# --- EnrichmentCache.invalidate ---------------------------------------------- - -@pytest.mark.asyncio -async def test_cache_invalidate_all(tmp_path): - cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) - await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) - await cache.set("geocoder", 3.0, 4.0, {"name": "y"}) - deleted = await cache.invalidate() - assert deleted == 2 - assert await cache.get("geocoder", 1.0, 2.0) is None - - -@pytest.mark.asyncio -async def test_cache_invalidate_scoped_to_enricher(tmp_path): - cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) - await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) - await cache.set("other", 1.0, 2.0, {"name": "z"}) - deleted = await cache.invalidate("geocoder") - assert deleted == 1 - assert await cache.get("geocoder", 1.0, 2.0) is None - assert await cache.get("other", 1.0, 2.0) == {"name": "z"} - - -# --- Supervisor startup read + hot-reload ------------------------------------ - -def _supervisor_with(enrichment_cfg: EnrichmentConfig): - """Build a Supervisor with mocked deps and a mocked EnrichmentCache class - (so no real /var/lib cache file is touched).""" - from central import supervisor as sup_mod - - config_source = MagicMock() - config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg) - config_store = MagicMock() - sup = sup_mod.Supervisor( - config_source=config_source, - config_store=config_store, - nats_url="nats://localhost:4222", - ) - return sup - - -@pytest.mark.asyncio -async def test_supervisor_builds_navi_from_config(): - """Given a config naming NaviBackend, the supervisor's enricher set wraps a - NaviBackend — proves the registry resolution end-to-end.""" - with patch("central.supervisor.EnrichmentCache") as cache_cls: - cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0)) - sup = _supervisor_with( - EnrichmentConfig(backend_class="NaviBackend", - backend_settings={"base_url": "http://x:8440", "warmup": False}) - ) - cfg = await sup._config_source.get_enrichment_config() - sup._rebuild_enrichers(cfg) - assert isinstance(sup._enrichers[0]._backend, NaviBackend) - - -@pytest.mark.asyncio -async def test_hot_reload_rebuilds_and_invalidates_on_backend_change(): - from central import supervisor as sup_mod - - with patch("central.supervisor.EnrichmentCache") as cache_cls: - invalidate = AsyncMock(return_value=5) - cache_cls.return_value = MagicMock(invalidate=invalidate) - # Start at NoOp. - sup = _supervisor_with(EnrichmentConfig()) - sup._rebuild_enrichers(EnrichmentConfig()) - assert isinstance(sup._enrichers[0]._backend, NoOpBackend) - # Config flips to Navi. - sup._config_source.get_enrichment_config = AsyncMock( - return_value=EnrichmentConfig( - backend_class="NaviBackend", - backend_settings={"base_url": "http://x:8440", "warmup": False}, - ) - ) - await sup._handle_enrichment_change() - assert isinstance(sup._enrichers[0]._backend, NaviBackend) - invalidate.assert_awaited() # backend changed -> cache wiped - - -@pytest.mark.asyncio -async def test_hot_reload_does_not_invalidate_on_ttl_only_change(): - with patch("central.supervisor.EnrichmentCache") as cache_cls: - invalidate = AsyncMock(return_value=0) - cache_cls.return_value = MagicMock(invalidate=invalidate) - sup = _supervisor_with(EnrichmentConfig()) - sup._rebuild_enrichers(EnrichmentConfig()) - # Same backend, only TTL changes. - sup._config_source.get_enrichment_config = AsyncMock( - return_value=EnrichmentConfig(cache_ttl_s=3600) - ) - await sup._handle_enrichment_change() - invalidate.assert_not_awaited() - - -# --- generic json widget + GUI render ---------------------------------------- - -def test_describe_fields_renders_dict_as_json_widget(): - fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})} - assert fields["backend_settings"] == "json" - assert fields["enricher_class"] == "text" - assert fields["cache_ttl_s"] == "number" - - -@pytest.mark.asyncio -async def test_enrichment_form_renders(): - from central.gui.routes import enrichment_form - - request = MagicMock() - request.state.operator = MagicMock(username="op") - request.state.csrf_token = "tok" - - conn = MagicMock() - conn.fetchrow = AsyncMock(return_value={ - "enricher_class": "GeocoderEnricher", - "backend_class": "NoOpBackend", - "backend_settings": {}, - "cache_ttl_s": 86400, - }) - templates = MagicMock() - templates.TemplateResponse.return_value = MagicMock() - - with patch("central.gui.routes._get_templates", return_value=templates), \ - patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): - await enrichment_form(request) - - ctx = templates.TemplateResponse.call_args.kwargs["context"] - # PR L.5: outer fields exclude backend_settings (now a per-backend fieldset); - # NoOpBackend's fieldset has zero fields. - outer = {f.name for f in ctx["outer_fields"]} - assert "backend_settings" not in outer - assert "backend_class" in outer - assert ctx["backend_class"] == "NoOpBackend" - assert ctx["backend_fields"] == [] - assert ctx["csrf_token"] == "tok" diff --git a/tests/test_firms.py b/tests/test_firms.py index 6a974e2..2ab1e1d 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -456,7 +456,6 @@ class TestEnrichmentIntegration: """A FIRMS event run through the supervisor's enrichment stage emerges with data._enriched.geocoder populated (all-null under NoOpBackend).""" from central.config_models import EnrichmentConfig - from central.enrichment.cache import EnrichmentCache from central.enrichment.geocoder import all_null_bundle from central.supervisor import apply_enrichment, build_enrichers @@ -470,8 +469,9 @@ class TestEnrichmentIntegration: event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") assert "_enriched" not in event.data - cache = EnrichmentCache(tmp_path / "enrichment_cache.db") - enrichers = build_enrichers(EnrichmentConfig(), cache) + enrichers = build_enrichers( + EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db" + ) await apply_enrichment(event, adapter.enrichment_locations, enrichers) assert "_enriched" in event.data diff --git a/tests/test_navi_backend.py b/tests/test_navi_backend.py deleted file mode 100644 index 2adf90d..0000000 --- a/tests/test_navi_backend.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Tests for NaviBackend (composed Navi /api/reverse endpoint). - -HTTP is exercised via patching the backend's `_fetch` (the codebase has no -aioresponses/respx dep); URL construction is asserted on the pure `_url` -helper. An env-gated integration smoke against the live Navi endpoint is -skipped by default. -""" - -import os -from unittest.mock import AsyncMock - -import pytest - -from central.enrichment.backends.navi import NaviBackend -from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle - -# Full Navi response — already canonical shape. -_NAVI_OK = { - "name": "Where you are", - "city": "Boise", - "county": "Ada", - "state": "Idaho", - "country": "United States", - "postal_code": "83702", - "timezone": "America/Boise", - "landclass": "Public — National Forest", - "elevation_m": 824, -} - - -def _backend() -> NaviBackend: - # warmup=False so construction issues no background task in tests. - return NaviBackend(base_url="http://navi.test:8440", warmup=False) - - -def test_url_construction(): - b = _backend() - assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023" - - -def test_base_url_trailing_slash_stripped(): - b = NaviBackend(base_url="http://navi.test:8440/", warmup=False) - assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0" - - -@pytest.mark.asyncio -async def test_happy_path_passthrough(): - b = _backend() - b._fetch = AsyncMock(return_value=dict(_NAVI_OK)) - result = await b.reverse(43.615, -116.2023) - assert result == _NAVI_OK - assert set(result.keys()) == set(GEOCODER_FIELDS) - - -@pytest.mark.asyncio -async def test_partial_nulls_preserved(): - """Navi 200-with-nulls (non-US: timezone + elevation, rest null).""" - partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35} - b = _backend() - b._fetch = AsyncMock(return_value=partial) - result = await b.reverse(48.85, 2.35) - assert result["timezone"] == "Europe/Paris" - assert result["elevation_m"] == 35 - assert result["city"] is None - assert set(result.keys()) == set(GEOCODER_FIELDS) - - -@pytest.mark.asyncio -async def test_extra_keys_dropped(): - b = _backend() - b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"}) - result = await b.reverse(1.0, 2.0) - assert "debug_internal" not in result - assert set(result.keys()) == set(GEOCODER_FIELDS) - - -@pytest.mark.asyncio -async def test_network_error_returns_all_null_never_raises(): - b = _backend() - b._fetch = AsyncMock(side_effect=ConnectionError("boom")) - result = await b.reverse(1.0, 2.0) - assert result == all_null_bundle() - - -@pytest.mark.asyncio -async def test_timeout_returns_all_null(): - import asyncio - - b = _backend() - b._fetch = AsyncMock(side_effect=asyncio.TimeoutError()) - assert await b.reverse(1.0, 2.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_malformed_response_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(side_effect=ValueError("not json")) - assert await b.reverse(1.0, 2.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_headers_passed_through_config(): - b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False) - assert b._headers == {"Authorization": "Bearer x"} - - -@pytest.mark.asyncio -@pytest.mark.skipif( - os.environ.get("NAVI_INTEGRATION_TEST") != "1", - reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint", -) -async def test_live_navi_boise(): - """Integration smoke against the real endpoint (default skipped). - - The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific - address lives in source; defaults to localhost when unset. - """ - base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440") - b = NaviBackend(base_url=base_url, warmup=False) - result = await b.reverse(43.6150, -116.2023) - assert result["name"] == "Where you are" - assert result["city"] == "Boise" - assert result["state"] == "Idaho" - assert result["elevation_m"] is not None - assert abs(float(result["elevation_m"]) - 824) < 50 diff --git a/tests/test_nominatim_backend.py b/tests/test_nominatim_backend.py deleted file mode 100644 index 6663d23..0000000 --- a/tests/test_nominatim_backend.py +++ /dev/null @@ -1,118 +0,0 @@ -"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2).""" - -from unittest.mock import AsyncMock, patch - -import pytest - -from central.enrichment.backends.nominatim import ( - DEFAULT_USER_AGENT, - NominatimBackend, -) -from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle - -_NOMINATIM_OK = { - "name": "Boise", - "display_name": "Boise, Ada County, Idaho, United States", - "address": { - "city": "Boise", - "county": "Ada County", - "state": "Idaho", - "country": "United States", - "postcode": "83702", - }, -} - - -def _backend(**kw) -> NominatimBackend: - kw.setdefault("base_url", "http://nominatim.test") - kw.setdefault("rate_limit_per_sec", 0) # disabled by default in tests - return NominatimBackend(**kw) - - -def test_url_and_format(): - b = _backend() - url = b._url(43.6, -116.2) - assert url.startswith("http://nominatim.test/reverse?") - assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url - - -def test_user_agent_header_present(): - b = _backend() - assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT - - -def test_custom_user_agent(): - b = _backend(user_agent="myapp/1.0 (me@example.com)") - assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)" - - -@pytest.mark.asyncio -async def test_happy_path_maps_address_block(): - b = _backend() - b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK)) - result = await b.reverse(43.6, -116.2) - assert set(result.keys()) == set(GEOCODER_FIELDS) - assert result["name"] == "Boise" - assert result["city"] == "Boise" - assert result["county"] == "Ada County" - assert result["state"] == "Idaho" - assert result["country"] == "United States" - assert result["postal_code"] == "83702" - - -@pytest.mark.asyncio -async def test_navi_only_fields_null(): - b = _backend() - b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK)) - result = await b.reverse(43.6, -116.2) - assert result["timezone"] is None - assert result["landclass"] is None - assert result["elevation_m"] is None - - -@pytest.mark.asyncio -async def test_city_falls_back_to_town_then_village(): - b = _backend() - b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}}) - result = await b.reverse(1.0, 2.0) - assert result["city"] == "Tinytown" - - -@pytest.mark.asyncio -async def test_network_error_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(side_effect=ConnectionError("down")) - assert await b.reverse(1.0, 2.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_malformed_json_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(side_effect=ValueError("bad")) - assert await b.reverse(1.0, 2.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_rate_limit_disabled_does_not_sleep(): - b = _backend(rate_limit_per_sec=0) - with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp: - await b._throttle() - await b._throttle() - slp.assert_not_called() - - -@pytest.mark.asyncio -async def test_rate_limit_spaces_consecutive_requests(): - """With a 1/sec limit, the second back-to-back call must sleep a positive - interval. Mock monotonic to a fixed instant so the throttle computes a wait.""" - b = _backend(rate_limit_per_sec=1.0) - sleeps: list[float] = [] - - async def _fake_sleep(d): - sleeps.append(d) - - with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \ - patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep): - await b._throttle() # first: last_request_at was 0 -> no wait - await b._throttle() # second: now==100, last==100 -> wait ~1.0s - assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}" diff --git a/tests/test_photon_backend.py b/tests/test_photon_backend.py deleted file mode 100644 index f224d21..0000000 --- a/tests/test_photon_backend.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Tests for PhotonBackend (raw Photon /reverse).""" - -from unittest.mock import AsyncMock - -import pytest - -from central.enrichment.backends.photon import PhotonBackend -from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle - -# Photon reverse response shape. -_PHOTON_OK = { - "features": [ - { - "properties": { - "name": "Boise", - "city": "Boise", - "county": "Ada County", - "state": "Idaho", - "country": "United States", - "postcode": "83702", - "osm_key": "place", - }, - "geometry": {"type": "Point", "coordinates": [-116.2, 43.6]}, - } - ] -} - - -def _backend() -> PhotonBackend: - return PhotonBackend(base_url="http://photon.test:2322") - - -def test_url_construction(): - b = _backend() - url = b._url(43.6, -116.2) - assert url.startswith("http://photon.test:2322/reverse?") - assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url - - -@pytest.mark.asyncio -async def test_happy_path_maps_to_canonical(): - b = _backend() - b._fetch = AsyncMock(return_value=dict(_PHOTON_OK)) - result = await b.reverse(43.6, -116.2) - assert set(result.keys()) == set(GEOCODER_FIELDS) - assert result["city"] == "Boise" - assert result["county"] == "Ada County" - assert result["state"] == "Idaho" - assert result["country"] == "United States" - assert result["postal_code"] == "83702" # mapped from Photon 'postcode' - - -@pytest.mark.asyncio -async def test_navi_only_fields_are_null(): - b = _backend() - b._fetch = AsyncMock(return_value=dict(_PHOTON_OK)) - result = await b.reverse(43.6, -116.2) - assert result["timezone"] is None - assert result["landclass"] is None - assert result["elevation_m"] is None - - -@pytest.mark.asyncio -async def test_empty_features_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(return_value={"features": []}) - assert await b.reverse(0.0, 0.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_missing_properties_keys_become_null(): - b = _backend() - b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]}) - result = await b.reverse(1.0, 2.0) - assert result["name"] == "X" - assert result["city"] is None - assert result["postal_code"] is None - assert set(result.keys()) == set(GEOCODER_FIELDS) - - -@pytest.mark.asyncio -async def test_network_error_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(side_effect=ConnectionError("down")) - assert await b.reverse(1.0, 2.0) == all_null_bundle() - - -@pytest.mark.asyncio -async def test_malformed_json_returns_all_null(): - b = _backend() - b._fetch = AsyncMock(side_effect=ValueError("bad json")) - assert await b.reverse(1.0, 2.0) == all_null_bundle() diff --git a/tests/test_producer_doc.py b/tests/test_producer_doc.py index 07abbef..aef32e7 100644 --- a/tests/test_producer_doc.py +++ b/tests/test_producer_doc.py @@ -23,14 +23,8 @@ from pathlib import Path from central.adapter import SourceAdapter from central.adapter_discovery import discover_adapters -from central.enrichment.geocoder import GEOCODER_FIELDS from central.streams import STREAMS -# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19). -_DESIGN_PRINCIPLE_QUOTE = ( - "Central takes it all and gives it all. It's up to the pipe to do with it" -) - DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md" @@ -192,48 +186,6 @@ def test_streams_snippet_quotes_live_registry(): ) -def _section(doc: str, header_re: str) -> str: - """Return the body of the section whose header matches header_re, up to the - next same-or-higher-level header.""" - m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE) - assert m, f"doc missing section matching {header_re!r}" - return m.group(1) - - -def test_design_principle_quote_present_in_section_2(): - """§2 must still carry the verbatim Matt quote — the reframe changes the - translation beneath it, not the quote itself.""" - section = _section(_doc_text(), r"^## 2\. The design principle") - assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2" - - -def test_anti_pattern_10_1_section_exists(): - """§10.1 must still exist as a subsection (content reframed to - 'enrichment outside the framework', structure preserved).""" - doc = _doc_text() - assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection" - - -def test_enrichment_contract_section_13_has_all_protocol_references(): - """New §13 must name all four enrichment contract types verbatim.""" - section = _section(_doc_text(), r"^## 13\. Enrichment contract") - for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"): - assert ref in section, f"§13 missing reference to {ref!r}" - - -def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields(): - """The §13 per-field coverage matrix must list exactly the canonical - GEOCODER_FIELDS — derived from code, not hardcoded here.""" - section = _section(_doc_text(), r"^## 13\. Enrichment contract") - # Matrix rows look like: | `field_name` | ... | - row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE)) - assert row_fields == set(GEOCODER_FIELDS), ( - f"coverage-matrix field drift: " - f"doc-only={row_fields - set(GEOCODER_FIELDS)}, " - f"code-only={set(GEOCODER_FIELDS) - row_fields}" - ) - - def test_no_orphan_adapter_references_in_anti_patterns(): """Anti-patterns section names two real adapter modules as examples (firms, inciweb in §10.4). Those names must still resolve via