diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 70036b8..009741a 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -296,6 +296,36 @@ ground-survey workflows. archive is at `https://firms.modaps.eosdis.nasa.gov/`.) - **Removal semantics:** none. FIRMS publishes detections; absence is the signal if a fire stops burning. Consumers should not expect explicit "removal" events. +- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so + each event carries a Central-derived geocoder bundle under + `data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central + reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result. + The bundle always has these nine keys (any unresolved field is `null`, never + missing): + + | key | meaning | + |---|---| + | `name` | place / feature name | + | `city` | city / town / village | + | `county` | county (or equivalent) | + | `state` | state / province | + | `country` | country | + | `postal_code` | postal / ZIP code | + | `timezone` | IANA tz (e.g. `America/Boise`) | + | `landclass` | land-management class (US PAD-US) | + | `elevation_m` | ground elevation, metres | + + **Coverage by region (v0.5.0):** US hotspots get the full bundle (with + sparsity gaps in deep wilderness); non-US hotspots currently get only + `timezone` and `elevation_m` populated (both planet-scale), the rest `null`, + pending an upstream planet expansion. Treat `null` as "not resolved," not + "does not exist." + + **Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot + near 51–53°N can spuriously get a non-`null` `landclass` (it false-matches the + Aleutian "Rat Islands" US land-management polygon across the dateline). If you + consume `landclass`, treat a non-`null` value on a clearly non-US point as + suspect. Fix is tracked upstream. 
- **Live example (verbatim from CT104):** ```json diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 02ce1bf..542c466 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -24,6 +24,7 @@ are intentionally not restated. Cross-references point into that doc. 10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do) 11. [Settings preview hook](#11-settings-preview-hook) 12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter) +13. [Enrichment contract](#13-enrichment-contract) --- @@ -49,33 +50,47 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)). > what it will." > — Matt, 2026-05-19 -Adapter authors translate that single sentence into a small number of concrete -rules: +The correct reading of that sentence: **Central is the consumer's only data +plane.** A downstream consumer sees exactly what's on the wire and nothing +more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot +reverse-geocode a coordinate on its own. So whatever Central does NOT put on +the wire is, for every consumer, simply missing. "Gives it all" therefore means +*give the consumer everything a reasonable consumer needs to act on the event* +— not "give the upstream payload only." + +Adapter authors translate that into a small number of concrete rules: - **Preserve every upstream field.** Anything the upstream returns lives in `Event.data` verbatim. Adapters do not silently drop fields, even ones that - look redundant or low-value today. -- **No enrichment.** Adapters do not reverse-geocode, do not call out to - upstream metadata endpoints during normal `poll()` flow, do not consult a - second source to "fill in" a missing field. If a downstream consumer wants - enrichment, that is consumer-side work. 
-- **No opinionated translation.** Adapters do not coerce units, do not rename - fields to match a Central-wide vocabulary, do not collapse upstream - enumerations into Central's preferred labels. -- **The only adapter-side transforms are mechanical.** Specifically: - subject-token normalization (camelCase → snake_case, agency-prefix splitting, - whitespace → underscore, lowercase) and dedup-key construction. Both are - deterministic functions of upstream identifiers. Nothing else. + look redundant or low-value today (see [§10.2](#102-silent-field-dropping)). +- **Enrich, deliberately and centrally.** Location, timezone, elevation, + landclass and similar context that consumers reliably need should be resolved + once, by Central, and attached — not left for twelve consumers to each + re-derive (most of them can't). Enrichment runs through the framework + ([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations` + and the supervisor attaches results under `Event.data["_enriched"]`. +- **Namespace enrichment for provenance.** Central-derived fields live under + `_enriched.`; everything else in `data` is upstream verbatim. + A consumer can always tell which is which. +- **Fail gracefully to null, never to an exception.** Enrichment that can't + resolve a field returns `null` for it (a stable, documented field set), and a + total enrichment failure returns an all-null bundle. A geocoder outage must + never drop or corrupt the underlying event. +- **No opinionated translation of the upstream payload.** Enrichment *adds* + namespaced fields; it does not rewrite upstream ones. Adapters still do not + coerce units, rename upstream fields, or collapse upstream enumerations inside + `data`. The only in-place adapter transforms remain mechanical: subject-token + normalization (camelCase → snake_case, agency-prefix splitting, whitespace → + underscore, lowercase) and dedup-key construction. 
-This rules out a whole category of plausible-sounding work that prior reviews -have already rejected. For instance, "enrich NWIS site rows with USGS -monitoring-locations metadata during `poll()`" was proposed for Phase 3 and -killed on this principle. The producer adds the field-preserving pipe; the pipe -ends at JetStream publish; everything richer is a downstream concern. - -See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of -anti-patterns. Future authors should reject the same proposals on the same -grounds. +This reframes a Phase 2 rule. The earlier draft of this doc said "no +enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was +rejected on those grounds. That reasoning is now inverted: consumers have no +practical way to do that work, so Central does it. The constraint that survives +is *where* and *how* — through the framework, namespaced, cached, failing to +null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the +remaining anti-pattern (enrichment done outside the framework) and +[§13](#13-enrichment-contract) for the full contract. --- @@ -625,18 +640,30 @@ adapter authors should mirror it. Do not restate it here. These are the patterns prior reviews have explicitly rejected. Reject them again on sight in a new-adapter PR. -### 10.1 Enrichment during `poll()` +### 10.1 Enrichment outside the framework -No calls to upstream metadata endpoints, no reverse-geocode, no consultation -of a second source to fill in fields the primary feed omitted. The "NWIS -enrichment" Phase 3 proposal — joining live measurements against the -monitoring-locations metadata endpoint during `poll()` — was rejected on the -[§2](#2-the-design-principle) principle. Future proposals along the same -lines get the same answer. +Enrichment itself is **expected**, not forbidden — see +[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). 
Any adapter +with location data should opt in by declaring `enrichment_locations` on the +adapter class; the supervisor then runs the registered enrichers and attaches +results under `Event.data["_enriched"]`. -If enrichment is genuinely necessary, the right shape is a separate adapter -(or a downstream consumer) — not an `if metadata_missing: await -fetch_metadata()` branch buried in an adapter's `poll()`. +The anti-pattern is doing enrichment the *wrong* way — outside the framework: + +- An `if missing: await fetch_metadata()` branch buried in an adapter's + `poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips + the `_enriched` namespacing (so consumers can't tell upstream from + Central-derived), and gives up the never-raise/all-null safety net (so a + geocoder hiccup can take down the poll). +- Writing enriched fields directly into the top level of `Event.data` instead + of under `_enriched`. That destroys provenance — a consumer can no longer + tell which fields came from the upstream feed and which Central added. +- Standing up a parallel enrichment path (a second HTTP client, a private cache) + inside one adapter instead of registering a backend with the framework. + +The rule of thumb: declare `enrichment_locations`, let the supervisor do the +work. If the framework can't express what you need, extend the framework +([§13](#13-enrichment-contract)) — don't route around it inside an adapter. ### 10.2 Silent field dropping @@ -821,3 +848,111 @@ requesting / granting merge. - [ ] **Full pytest suite green.** --- + +## 13. Enrichment contract + +Enrichment is how Central adds consumer-needed context (location names, +timezone, elevation, landclass, …) that the upstream feed doesn't carry and a +downstream consumer can't look up itself. It runs in the supervisor, after +dedup and before the CloudEvents wrap, for any adapter that opts in. 
Results are +namespaced under `Event.data["_enriched"]` so provenance stays explicit: +everything under `_enriched` is Central-derived; everything else in `data` is +upstream verbatim. + +### 13.1 Opting an adapter in + +Declare `enrichment_locations` on the adapter class — a list of +`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`: + +```python +class FIRMSAdapter(SourceAdapter): + enrichment_locations = [("latitude", "longitude")] +``` + +Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is." +The supervisor uses the first tuple that resolves to a non-null coordinate pair, +runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the +results. No adapter code calls enrichers directly. + +### 13.2 The `Enricher` Protocol + +An enricher is any object satisfying this Protocol (`central.enrichment.base`): + +- `name: str` — short identifier, used as the key under + `Event.data["_enriched"]`. +- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]` — + given `{"lat": float, "lon": float}`, return a flat dict of enrichment + fields. Fields it can't resolve are present with value `None` (not omitted). + **Must never raise** — implementations handle their own failures and return + an all-null bundle on total failure. + +### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol + +`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the +only enricher today. It owns the cache and the canonical field normalization; +the actual reverse-geocode is delegated to a pluggable backend satisfying the +`GeocoderBackend` Protocol: + +- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return + the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields + the backend can't resolve return `None`. Must never raise. 
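As a rough illustration of the `GeocoderBackend` Protocol above, here is a minimal sketch of a conforming backend. `StaticBackend` and its lookup table are hypothetical, and `GEOCODER_FIELDS` / `all_null_bundle` are re-declared locally as stand-ins for the real ones in `central.enrichment.geocoder` so the block runs on its own; the real definitions may differ in detail.

```python
import asyncio
from typing import Any, Protocol

# Local stand-ins (assumption): the real definitions live in
# central.enrichment.geocoder; names follow the documented nine-key bundle.
GEOCODER_FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def all_null_bundle() -> dict[str, Any]:
    return {field: None for field in GEOCODER_FIELDS}


class GeocoderBackend(Protocol):
    async def reverse(self, lat: float, lon: float) -> dict[str, Any]: ...


class StaticBackend:
    """Hypothetical backend that answers from an in-memory table."""

    def __init__(self, table: dict[tuple[float, float], dict[str, Any]]) -> None:
        self._table = table

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            raw = self._table[(lat, lon)]
            # Emit every canonical field; unresolved ones become None.
            return {field: raw.get(field) for field in GEOCODER_FIELDS}
        except Exception:
            # Never raise: any lookup or shape problem degrades to all-null.
            return all_null_bundle()


backend: GeocoderBackend = StaticBackend(
    {(43.615, -116.2023): {"city": "Boise", "timezone": "America/Boise"}}
)
hit = asyncio.run(backend.reverse(43.615, -116.2023))
miss = asyncio.run(backend.reverse(0.0, 0.0))
```

The point of the sketch is the shape of the return value: the key set is the same on a hit and on a miss, so callers never need to distinguish "missing key" from "unresolved field".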
+ +Backends shipped: `NaviBackend` (composed Navi `/api/reverse//` +endpoint — name/address + timezone + landclass + elevation in one call), +`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM +Nominatim, name/address only, with a configurable rate limit + `User-Agent`), +and `NoOpBackend` (all-null — the default until an operator configures a real +backend). + +### 13.4 Cache + failure semantics + +`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`, +`/var/lib/central/enrichment_cache.db`): + +- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4 + decimal places (~11 m). TTL is per-enricher, default 24h. +- **Cache hit** → return cached bundle, no backend call. +- **Cache miss** → call backend, cache the normalized result (**even an + all-null bundle** — so known-empty coordinates aren't re-hammered), return it. +- **Backend raises** (a violation of the never-raise contract, or an + infrastructure error the backend chose to surface) → return an all-null + bundle and **do not cache** it, so the next event for that coordinate retries. + +Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`, +`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing +the enricher set is a restart, not a hot-reload. + +### 13.5 Per-field coverage + +The canonical geocoder bundle is exactly nine fields. 
They mirror +`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this +table must match it): + +| Field | US events | Non-US events today | Non-US after Photon planet expansion | +|---|---|---|---| +| `name` | populated (wilderness sparsity gaps) | null | populated | +| `city` | populated (wilderness sparsity gaps) | null | populated | +| `county` | populated (wilderness sparsity gaps) | null | populated | +| `state` | populated (wilderness sparsity gaps) | null | populated | +| `country` | populated (wilderness sparsity gaps) | null | populated | +| `postal_code` | populated (wilderness sparsity gaps) | null | populated | +| `timezone` | populated | populated (`tz_world` is planet-scale) | populated | +| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null | +| `elevation_m` | populated | populated (planet-DEM) | populated | + +**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` + +`elevation_m` and the rest null. Photon planet expansion is queued on the Navi +side with no firm ETA; when it lands, `NaviBackend` picks it up automatically +with zero Central code changes. + +### 13.6 Known wrinkle — landclass antimeridian false-positive + +`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons. +Points near 51–53°N **outside** the US can spuriously match the Aleutian "Rat +Islands" PAD-US polygon (false matching across the antimeridian), yielding a +non-null `landclass` that doesn't actually apply. This is a Navi-side bug being +worked separately; until it's fixed, treat a non-null `landclass` on a clearly +non-US point as suspect. Documented for consumers in +`CONSUMER-INTEGRATION.md`. 
+ +--- diff --git a/src/central/enrichment/backends/__init__.py b/src/central/enrichment/backends/__init__.py index 309c273..7efd2f5 100644 --- a/src/central/enrichment/backends/__init__.py +++ b/src/central/enrichment/backends/__init__.py @@ -1,5 +1,8 @@ """Geocoder backend implementations.""" +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend +from central.enrichment.backends.photon import PhotonBackend -__all__ = ["NoOpBackend"] +__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"] diff --git a/src/central/enrichment/backends/navi.py b/src/central/enrichment/backends/navi.py new file mode 100644 index 0000000..5335388 --- /dev/null +++ b/src/central/enrichment/backends/navi.py @@ -0,0 +1,79 @@ +"""Navi reverse-geocoding backend. + +Hits the composed Navi endpoint `/api/reverse//`, which +already returns the canonical 9-field bundle (name, city, county, state, +country, postal_code, timezone, landclass, elevation_m). Navi composes Photon +(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM +(elevation_m), so this backend is a near-passthrough mapping. + +Coverage today: US events get a rich bundle; non-US events get timezone + +elevation_m populated (both planet-scale) and the rest null until Navi's +Photon planet expansion lands (no Central change needed when it does). +""" + +import asyncio +import logging +from typing import Any + +import aiohttp + +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_URL = "http://192.168.1.130:8440" +# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup. 
+_WARMUP_LAT = 43.6150 +_WARMUP_LON = -116.2023 + + +class NaviBackend: + """GeocoderBackend backed by the composed Navi /api/reverse endpoint.""" + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + timeout_s: float = 10.0, + headers: dict[str, str] | None = None, + warmup: bool = True, + ) -> None: + self._base_url = base_url.rstrip("/") + self._timeout_s = timeout_s + # Future-proof: drop an Authorization: Bearer … here config-only, no code change. + self._headers = dict(headers or {}) + if warmup: + # Fire-and-forget warmup ping; only if a loop is running (it is under + # the supervisor's asyncio.run, not under sync test construction). + try: + loop = asyncio.get_running_loop() + self._warmup_task = loop.create_task(self._warmup()) + except RuntimeError: + pass + + def _url(self, lat: float, lon: float) -> str: + return f"{self._base_url}/api/reverse/{lat}/{lon}" + + async def _warmup(self) -> None: + try: + await self._fetch(_WARMUP_LAT, _WARMUP_LON) + except Exception: + # Warmup is best-effort; a failure here must not break startup. + logger.debug("NaviBackend warmup ping failed (non-fatal)") + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get(self._url(lat, lon), headers=self._headers) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + data = await self._fetch(lat, lon) + except Exception: + # Non-200, network error, timeout, malformed JSON — never raise. + logger.debug("NaviBackend reverse failed; returning all-null bundle") + return all_null_bundle() + # Navi's response already matches the canonical shape; map defensively. 
+ return {field: data.get(field) for field in GEOCODER_FIELDS} diff --git a/src/central/enrichment/backends/nominatim.py b/src/central/enrichment/backends/nominatim.py new file mode 100644 index 0000000..0b3a9cb --- /dev/null +++ b/src/central/enrichment/backends/nominatim.py @@ -0,0 +1,95 @@ +"""OSM Nominatim reverse-geocoding backend. + +Works against public OSM Nominatim (1 req/sec + User-Agent required) or a +self-hosted instance (no limit). Resolves name + address only; timezone, +landclass, and elevation_m are nulled (not in the Nominatim reverse response). + +Nominatim jsonv2 reverse response shape: + {"display_name": "...", "name": "...", + "address": {city|town|village, county, state, country, postcode, ...}} +""" + +import asyncio +import logging +import time +from typing import Any +from urllib.parse import urlencode + +import aiohttp + +from central.enrichment.geocoder import all_null_bundle + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org" +DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)" + + +class NominatimBackend: + """GeocoderBackend backed by an OSM Nominatim /reverse endpoint. + + rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s); + set it to 0 to disable for self-hosted instances. 
+ """ + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + user_agent: str = DEFAULT_USER_AGENT, + rate_limit_per_sec: float = 1.0, + timeout_s: float = 10.0, + ) -> None: + self._base_url = base_url.rstrip("/") + self._user_agent = user_agent + self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0 + self._timeout_s = timeout_s + self._rl_lock = asyncio.Lock() + self._last_request_at = 0.0 + + def _url(self, lat: float, lon: float) -> str: + qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"}) + return f"{self._base_url}/reverse?{qs}" + + def _request_headers(self) -> dict[str, str]: + # Public Nominatim rejects requests without an identifying User-Agent. + return {"User-Agent": self._user_agent} + + async def _throttle(self) -> None: + if self._min_interval <= 0: + return + async with self._rl_lock: + now = time.monotonic() + wait = self._last_request_at + self._min_interval - now + if wait > 0: + await asyncio.sleep(wait) + self._last_request_at = time.monotonic() + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get( + self._url(lat, lon), headers=self._request_headers() + ) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + await self._throttle() + data = await self._fetch(lat, lon) + addr = data.get("address", {}) or {} + except Exception: + logger.debug("NominatimBackend reverse failed; returning all-null bundle") + return all_null_bundle() + return { + "name": data.get("name") or data.get("display_name"), + "city": addr.get("city") or addr.get("town") or addr.get("village"), + "county": addr.get("county"), + "state": addr.get("state"), + "country": addr.get("country"), + "postal_code": addr.get("postcode"), + "timezone": None, # not in Nominatim reverse response + 
"landclass": None, # Navi-composed-endpoint only + "elevation_m": None, # Navi-composed-endpoint only + } diff --git a/src/central/enrichment/backends/photon.py b/src/central/enrichment/backends/photon.py new file mode 100644 index 0000000..2e75814 --- /dev/null +++ b/src/central/enrichment/backends/photon.py @@ -0,0 +1,69 @@ +"""Raw Photon reverse-geocoding backend. + +For deployers who run a Photon instance directly, without the composed +Navi-style endpoint. Photon resolves name + address only — timezone, +landclass, and elevation_m are Navi-composed-endpoint extras and are nulled +here. + +Photon reverse response shape: + {"features": [{"properties": {name, city, county, state, country, + postcode, ...}, "geometry": {...}}]} +""" + +import logging +from typing import Any +from urllib.parse import urlencode + +import aiohttp + +from central.enrichment.geocoder import all_null_bundle + +logger = logging.getLogger(__name__) + +DEFAULT_BASE_URL = "http://localhost:2322" + + +class PhotonBackend: + """GeocoderBackend backed by a raw Photon /reverse endpoint.""" + + def __init__( + self, + base_url: str = DEFAULT_BASE_URL, + timeout_s: float = 10.0, + headers: dict[str, str] | None = None, + ) -> None: + self._base_url = base_url.rstrip("/") + self._timeout_s = timeout_s + self._headers = dict(headers or {}) + + def _url(self, lat: float, lon: float) -> str: + qs = urlencode({"lat": lat, "lon": lon, "limit": 1}) + return f"{self._base_url}/reverse?{qs}" + + async def _fetch(self, lat: float, lon: float) -> dict[str, Any]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self._timeout_s), + ) as session: + async with session.get(self._url(lat, lon), headers=self._headers) as resp: + resp.raise_for_status() + return await resp.json() + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + try: + data = await self._fetch(lat, lon) + features = data.get("features") or [] + props = features[0].get("properties", {}) if features 
else {} + except Exception: + logger.debug("PhotonBackend reverse failed; returning all-null bundle") + return all_null_bundle() + return { + "name": props.get("name"), + "city": props.get("city"), + "county": props.get("county"), + "state": props.get("state"), + "country": props.get("country"), + "postal_code": props.get("postcode"), # Photon names it 'postcode' + "timezone": None, # not provided by raw Photon + "landclass": None, # Navi-composed-endpoint only + "elevation_m": None, # Navi-composed-endpoint only + } diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 8ca6be9..b3ec997 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -24,7 +24,10 @@ from central.api_key_resolver import resolve_api_key_alias from central.config_models import EnrichmentConfig from central.enrichment.base import Enricher from central.enrichment.cache import EnrichmentCache +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.nominatim import NominatimBackend from central.enrichment.backends.no_op import NoOpBackend +from central.enrichment.backends.photon import PhotonBackend from central.enrichment.geocoder import GeocoderEnricher from central.models import Event from central.stream_manager import StreamManager @@ -33,9 +36,13 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db") # Enricher / backend class-name registries for EnrichmentConfig resolution. -# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these. 
_ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher} -_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend} +_BACKEND_REGISTRY: dict[str, type] = { + "NoOpBackend": NoOpBackend, + "NaviBackend": NaviBackend, + "PhotonBackend": PhotonBackend, + "NominatimBackend": NominatimBackend, +} def build_enrichers( diff --git a/tests/test_navi_backend.py b/tests/test_navi_backend.py new file mode 100644 index 0000000..98870ba --- /dev/null +++ b/tests/test_navi_backend.py @@ -0,0 +1,120 @@ +"""Tests for NaviBackend (composed Navi /api/reverse endpoint). + +HTTP is exercised via patching the backend's `_fetch` (the codebase has no +aioresponses/respx dep); URL construction is asserted on the pure `_url` +helper. An env-gated integration smoke against the live Navi endpoint is +skipped by default. +""" + +import os +from unittest.mock import AsyncMock + +import pytest + +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle + +# Full Navi response — already canonical shape. +_NAVI_OK = { + "name": "Where you are", + "city": "Boise", + "county": "Ada", + "state": "Idaho", + "country": "United States", + "postal_code": "83702", + "timezone": "America/Boise", + "landclass": "Public — National Forest", + "elevation_m": 824, +} + + +def _backend() -> NaviBackend: + # warmup=False so construction issues no background task in tests. 
+ return NaviBackend(base_url="http://navi.test:8440", warmup=False) + + +def test_url_construction(): + b = _backend() + assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023" + + +def test_base_url_trailing_slash_stripped(): + b = NaviBackend(base_url="http://navi.test:8440/", warmup=False) + assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0" + + +@pytest.mark.asyncio +async def test_happy_path_passthrough(): + b = _backend() + b._fetch = AsyncMock(return_value=dict(_NAVI_OK)) + result = await b.reverse(43.615, -116.2023) + assert result == _NAVI_OK + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_partial_nulls_preserved(): + """Navi 200-with-nulls (non-US: timezone + elevation, rest null).""" + partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35} + b = _backend() + b._fetch = AsyncMock(return_value=partial) + result = await b.reverse(48.85, 2.35) + assert result["timezone"] == "Europe/Paris" + assert result["elevation_m"] == 35 + assert result["city"] is None + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_extra_keys_dropped(): + b = _backend() + b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"}) + result = await b.reverse(1.0, 2.0) + assert "debug_internal" not in result + assert set(result.keys()) == set(GEOCODER_FIELDS) + + +@pytest.mark.asyncio +async def test_network_error_returns_all_null_never_raises(): + b = _backend() + b._fetch = AsyncMock(side_effect=ConnectionError("boom")) + result = await b.reverse(1.0, 2.0) + assert result == all_null_bundle() + + +@pytest.mark.asyncio +async def test_timeout_returns_all_null(): + import asyncio + + b = _backend() + b._fetch = AsyncMock(side_effect=asyncio.TimeoutError()) + assert await b.reverse(1.0, 2.0) == all_null_bundle() + + +@pytest.mark.asyncio +async def test_malformed_response_returns_all_null(): + b = _backend() 
+    b._fetch = AsyncMock(side_effect=ValueError("not json"))
+    assert await b.reverse(1.0, 2.0) == all_null_bundle()
+
+
+@pytest.mark.asyncio
+async def test_headers_passed_through_config():
+    b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False)
+    assert b._headers == {"Authorization": "Bearer x"}
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(
+    os.environ.get("NAVI_INTEGRATION_TEST") != "1",
+    reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
+)
+async def test_live_navi_boise():
+    """Integration smoke against the real endpoint (default skipped)."""
+    b = NaviBackend(warmup=False)  # default base_url
+    result = await b.reverse(43.6150, -116.2023)
+    assert result["name"] == "Where you are"
+    assert result["city"] == "Boise"
+    assert result["state"] == "Idaho"
+    assert result["elevation_m"] is not None
+    assert abs(float(result["elevation_m"]) - 824) < 50
diff --git a/tests/test_nominatim_backend.py b/tests/test_nominatim_backend.py
new file mode 100644
index 0000000..6663d23
--- /dev/null
+++ b/tests/test_nominatim_backend.py
@@ -0,0 +1,118 @@
+"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2)."""
+
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from central.enrichment.backends.nominatim import (
+    DEFAULT_USER_AGENT,
+    NominatimBackend,
+)
+from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
+
+_NOMINATIM_OK = {
+    "name": "Boise",
+    "display_name": "Boise, Ada County, Idaho, United States",
+    "address": {
+        "city": "Boise",
+        "county": "Ada County",
+        "state": "Idaho",
+        "country": "United States",
+        "postcode": "83702",
+    },
+}
+
+
+def _backend(**kw) -> NominatimBackend:
+    kw.setdefault("base_url", "http://nominatim.test")
+    kw.setdefault("rate_limit_per_sec", 0)  # disabled by default in tests
+    return NominatimBackend(**kw)
+
+
+def test_url_and_format():
+    b = _backend()
+    url = b._url(43.6, -116.2)
+    assert url.startswith("http://nominatim.test/reverse?")
+    assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url
+
+
+def test_user_agent_header_present():
+    b = _backend()
+    assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT
+
+
+def test_custom_user_agent():
+    b = _backend(user_agent="myapp/1.0 (me@example.com)")
+    assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)"
+
+
+@pytest.mark.asyncio
+async def test_happy_path_maps_address_block():
+    b = _backend()
+    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
+    result = await b.reverse(43.6, -116.2)
+    assert set(result.keys()) == set(GEOCODER_FIELDS)
+    assert result["name"] == "Boise"
+    assert result["city"] == "Boise"
+    assert result["county"] == "Ada County"
+    assert result["state"] == "Idaho"
+    assert result["country"] == "United States"
+    assert result["postal_code"] == "83702"
+
+
+@pytest.mark.asyncio
+async def test_navi_only_fields_null():
+    b = _backend()
+    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
+    result = await b.reverse(43.6, -116.2)
+    assert result["timezone"] is None
+    assert result["landclass"] is None
+    assert result["elevation_m"] is None
+
+
+@pytest.mark.asyncio
+async def test_city_falls_back_to_town_then_village():
+    b = _backend()
+    b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}})
+    result = await b.reverse(1.0, 2.0)
+    assert result["city"] == "Tinytown"
+
+
+@pytest.mark.asyncio
+async def test_network_error_returns_all_null():
+    b = _backend()
+    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
+    assert await b.reverse(1.0, 2.0) == all_null_bundle()
+
+
+@pytest.mark.asyncio
+async def test_malformed_json_returns_all_null():
+    b = _backend()
+    b._fetch = AsyncMock(side_effect=ValueError("bad"))
+    assert await b.reverse(1.0, 2.0) == all_null_bundle()
+
+
+@pytest.mark.asyncio
+async def test_rate_limit_disabled_does_not_sleep():
+    b = _backend(rate_limit_per_sec=0)
+    with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp:
+        await b._throttle()
+        await b._throttle()
+    slp.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_rate_limit_spaces_consecutive_requests():
+    """With a 1/sec limit, the second back-to-back call must sleep a positive
+    interval. Mock monotonic to a fixed instant so the throttle computes a wait."""
+    b = _backend(rate_limit_per_sec=1.0)
+    sleeps: list[float] = []
+
+    async def _fake_sleep(d):
+        sleeps.append(d)
+
+    with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \
+         patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep):
+        await b._throttle()  # first: last_request_at was 0 -> no wait
+        await b._throttle()  # second: now==100, last==100 -> wait ~1.0s
+    assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}"
diff --git a/tests/test_photon_backend.py b/tests/test_photon_backend.py
new file mode 100644
index 0000000..f224d21
--- /dev/null
+++ b/tests/test_photon_backend.py
@@ -0,0 +1,92 @@
+"""Tests for PhotonBackend (raw Photon /reverse)."""
+
+from unittest.mock import AsyncMock
+
+import pytest
+
+from central.enrichment.backends.photon import PhotonBackend
+from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
+
+# Photon reverse response shape.
+_PHOTON_OK = {
+    "features": [
+        {
+            "properties": {
+                "name": "Boise",
+                "city": "Boise",
+                "county": "Ada County",
+                "state": "Idaho",
+                "country": "United States",
+                "postcode": "83702",
+                "osm_key": "place",
+            },
+            "geometry": {"type": "Point", "coordinates": [-116.2, 43.6]},
+        }
+    ]
+}
+
+
+def _backend() -> PhotonBackend:
+    return PhotonBackend(base_url="http://photon.test:2322")
+
+
+def test_url_construction():
+    b = _backend()
+    url = b._url(43.6, -116.2)
+    assert url.startswith("http://photon.test:2322/reverse?")
+    assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url
+
+
+@pytest.mark.asyncio
+async def test_happy_path_maps_to_canonical():
+    b = _backend()
+    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
+    result = await b.reverse(43.6, -116.2)
+    assert set(result.keys()) == set(GEOCODER_FIELDS)
+    assert result["city"] == "Boise"
+    assert result["county"] == "Ada County"
+    assert result["state"] == "Idaho"
+    assert result["country"] == "United States"
+    assert result["postal_code"] == "83702"  # mapped from Photon 'postcode'
+
+
+@pytest.mark.asyncio
+async def test_navi_only_fields_are_null():
+    b = _backend()
+    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
+    result = await b.reverse(43.6, -116.2)
+    assert result["timezone"] is None
+    assert result["landclass"] is None
+    assert result["elevation_m"] is None
+
+
+@pytest.mark.asyncio
+async def test_empty_features_returns_all_null():
+    b = _backend()
+    b._fetch = AsyncMock(return_value={"features": []})
+    assert await b.reverse(0.0, 0.0) == all_null_bundle()
+
+
+@pytest.mark.asyncio
+async def test_missing_properties_keys_become_null():
+    b = _backend()
+    b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]})
+    result = await b.reverse(1.0, 2.0)
+    assert result["name"] == "X"
+    assert result["city"] is None
+    assert result["postal_code"] is None
+    assert set(result.keys()) == set(GEOCODER_FIELDS)
+
+
+@pytest.mark.asyncio
+async def test_network_error_returns_all_null():
+    b = _backend()
+    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
+    assert await b.reverse(1.0, 2.0) == all_null_bundle()
+
+
+@pytest.mark.asyncio
+async def test_malformed_json_returns_all_null():
+    b = _backend()
+    b._fetch = AsyncMock(side_effect=ValueError("bad json"))
+    assert await b.reverse(1.0, 2.0) == all_null_bundle()
diff --git a/tests/test_producer_doc.py b/tests/test_producer_doc.py
index aef32e7..07abbef 100644
--- a/tests/test_producer_doc.py
+++ b/tests/test_producer_doc.py
@@ -23,8 +23,14 @@ from pathlib import Path
 
 from central.adapter import SourceAdapter
 from central.adapter_discovery import discover_adapters
+from central.enrichment.geocoder import GEOCODER_FIELDS
 from central.streams import STREAMS
 
+# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19).
+_DESIGN_PRINCIPLE_QUOTE = (
+    "Central takes it all and gives it all. It's up to the pipe to do with it"
+)
+
 DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md"
 
 
@@ -186,6 +192,48 @@ def test_streams_snippet_quotes_live_registry():
     )
 
 
+def _section(doc: str, header_re: str) -> str:
+    """Return the body of the section whose header matches header_re, up to the
+    next same-or-higher-level header."""
+    m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE)
+    assert m, f"doc missing section matching {header_re!r}"
+    return m.group(1)
+
+
+def test_design_principle_quote_present_in_section_2():
+    """§2 must still carry the verbatim Matt quote — the reframe changes the
+    translation beneath it, not the quote itself."""
+    section = _section(_doc_text(), r"^## 2\. The design principle")
+    assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2"
+
+
+def test_anti_pattern_10_1_section_exists():
+    """§10.1 must still exist as a subsection (content reframed to
+    'enrichment outside the framework', structure preserved)."""
+    doc = _doc_text()
+    assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection"
+
+
+def test_enrichment_contract_section_13_has_all_protocol_references():
+    """New §13 must name all four enrichment contract types verbatim."""
+    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
+    for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"):
+        assert ref in section, f"§13 missing reference to {ref!r}"
+
+
+def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields():
+    """The §13 per-field coverage matrix must list exactly the canonical
+    GEOCODER_FIELDS — derived from code, not hardcoded here."""
+    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
+    # Matrix rows look like: | `field_name` | ... |
+    row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE))
+    assert row_fields == set(GEOCODER_FIELDS), (
+        f"coverage-matrix field drift: "
+        f"doc-only={row_fields - set(GEOCODER_FIELDS)}, "
+        f"code-only={set(GEOCODER_FIELDS) - row_fields}"
+    )
+
+
 def test_no_orphan_adapter_references_in_anti_patterns():
     """Anti-patterns section names two real adapter modules as examples
     (firms, inciweb in §10.4). Those names must still resolve via