feat(3-K): real geocoder backends + producer-doc reframe + consumer-doc enrichment

Second of three PRs for v0.5.0 (J shipped the framework; this fills in real
backends + documents the reframed design principle in-tree; L is the events
tab + map fix, then tag).

Backends (all satisfy GeocoderBackend; never raise, all-null on any failure):
- NaviBackend — composed Navi /api/reverse/<lat>/<lon> (name/address + timezone
  + landclass + elevation in one call). Near-passthrough: response already
  matches the canonical 9-field shape. Best-effort warmup ping (Boise) on
  construction when a loop is running; config `headers` slot for a future
  Authorization: Bearer (config-only, no code change). Default base_url
  http://192.168.1.130:8440.
- PhotonBackend — raw Photon /reverse?lat&lon&limit=1 (name/address only).
  Maps features[0].properties; postal_code <- postcode; timezone/landclass/
  elevation_m null (Navi-composed-endpoint extras).
- NominatimBackend — OSM Nominatim /reverse?format=jsonv2 (name/address only).
  Configurable rate limit (default 1/sec; 0 disables for self-hosted) +
  required User-Agent. Maps the address block; landclass/elevation_m/timezone
  null.

Registered all three in supervisor _BACKEND_REGISTRY (resolved by EnrichmentConfig
backend_class name).

Docs — design pivot now in-tree:
- PRODUCER §2 reframed: the verbatim Matt quote stays; the translation inverts.
  Central is the consumer's only data plane (consumers can't do follow-up
  lookups), so enrich deliberately and centrally, namespaced under _enriched,
  failing to null. "No enrichment" is gone.
- PRODUCER §10.1 inverted: enrichment is expected; the anti-pattern is doing it
  OUTSIDE the framework (inline in poll(), bypassing cache + _enriched
  namespacing + the never-raise safety net).
- PRODUCER new §13 Enrichment contract: Enricher / GeocoderEnricher /
  GeocoderBackend Protocols, NoOpBackend default, sqlite cache + TTL +
  cache-all-null + don't-cache-on-raise semantics, _enriched.<name> provenance,
  per-field coverage matrix (cross-checked against GEOCODER_FIELDS), and the
  landclass antimeridian known wrinkle.
- CONSUMER FIRMS section: documents the data._enriched.geocoder bundle (9
  fields), per-region coverage (US-full, non-US timezone+elevation), and the
  antimeridian landclass caveat.

Tests:
- test_navi/photon/nominatim_backend.py — happy-path field mapping, null
  handling, extra-key drop, network/timeout/non-200/malformed -> all-null
  (never raises), Nominatim rate-limit (disabled + spacing) + User-Agent.
  Env-gated live Navi smoke (NAVI_INTEGRATION_TEST=1; skipped by default — the
  192.168.1.130 endpoint isn't reachable from CT104's segment).
- test_producer_doc.py — +4: §2 verbatim quote present, §10.1 subsection exists,
  §13 names all four protocol types, §13 coverage matrix == GEOCODER_FIELDS
  (derived from code, not hardcoded).

Verification: full pytest 525 passed, 1 skipped (was 495; +30 backend +
4 doc tests, -1 the env-gated skip). grep subject_for_event/_ADAPTER_REGISTRY
clean. All three backends import + resolve via the registry.

Flagged for later (NOT done here): adapters besides FIRMS that should declare
enrichment_locations (nwis, eonet, gdacs, usgs_quake, wfigs_*) — that's PR L
scope alongside the events tab. See PR description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Matt Johnson 2026-05-20 16:10:44 +00:00
commit 98b050b2af
11 changed files with 833 additions and 37 deletions


@ -296,6 +296,36 @@ ground-survey workflows.
archive is at `https://firms.modaps.eosdis.nasa.gov/`.)
- **Removal semantics:** none. FIRMS publishes detections; absence is the signal
if a fire stops burning. Consumers should not expect explicit "removal" events.
- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so
each event carries a Central-derived geocoder bundle under
`data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central
reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result.
The bundle always has these nine keys (any unresolved field is `null`, never
missing):
| key | meaning |
|---|---|
| `name` | place / feature name |
| `city` | city / town / village |
| `county` | county (or equivalent) |
| `state` | state / province |
| `country` | country |
| `postal_code` | postal / ZIP code |
| `timezone` | IANA tz (e.g. `America/Boise`) |
| `landclass` | land-management class (US PAD-US) |
| `elevation_m` | ground elevation, metres |
**Coverage by region (v0.5.0):** US hotspots get the full bundle (with
sparsity gaps in deep wilderness); non-US hotspots currently get only
`timezone` and `elevation_m` populated (both planet-scale), the rest `null`,
pending an upstream planet expansion. Treat `null` as "not resolved," not
"does not exist."
**Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot
near 51–53°N can spuriously get a non-`null` `landclass` (it false-matches the
Aleutian "Rat Islands" US land-management polygon across the dateline). If you
consume `landclass`, treat a non-`null` value on a clearly non-US point as
suspect. Fix is tracked upstream.
- **Live example (verbatim from CT104):**
```json


@ -24,6 +24,7 @@ are intentionally not restated. Cross-references point into that doc.
10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do)
11. [Settings preview hook](#11-settings-preview-hook)
12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter)
13. [Enrichment contract](#13-enrichment-contract)

---
@ -49,33 +50,47 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)).
 > what it will."
 > — Matt, 2026-05-19

-Adapter authors translate that single sentence into a small number of concrete
-rules:
+The correct reading of that sentence: **Central is the consumer's only data
+plane.** A downstream consumer sees exactly what's on the wire and nothing
+more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot
+reverse-geocode a coordinate on its own. So whatever Central does NOT put on
+the wire is, for every consumer, simply missing. "Gives it all" therefore means
+*give the consumer everything a reasonable consumer needs to act on the event*
+— not "give the upstream payload only."
+
+Adapter authors translate that into a small number of concrete rules:

 - **Preserve every upstream field.** Anything the upstream returns lives in
 `Event.data` verbatim. Adapters do not silently drop fields, even ones that
-look redundant or low-value today.
-- **No enrichment.** Adapters do not reverse-geocode, do not call out to
-upstream metadata endpoints during normal `poll()` flow, do not consult a
-second source to "fill in" a missing field. If a downstream consumer wants
-enrichment, that is consumer-side work.
-- **No opinionated translation.** Adapters do not coerce units, do not rename
-fields to match a Central-wide vocabulary, do not collapse upstream
-enumerations into Central's preferred labels.
-- **The only adapter-side transforms are mechanical.** Specifically:
-subject-token normalization (camelCase → snake_case, agency-prefix splitting,
-whitespace → underscore, lowercase) and dedup-key construction. Both are
-deterministic functions of upstream identifiers. Nothing else.
+look redundant or low-value today (see [§10.2](#102-silent-field-dropping)).
+- **Enrich, deliberately and centrally.** Location, timezone, elevation,
+landclass and similar context that consumers reliably need should be resolved
+once, by Central, and attached — not left for twelve consumers to each
+re-derive (most of them can't). Enrichment runs through the framework
+([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations`
+and the supervisor attaches results under `Event.data["_enriched"]`.
+- **Namespace enrichment for provenance.** Central-derived fields live under
+`_enriched.<enricher_name>`; everything else in `data` is upstream verbatim.
+A consumer can always tell which is which.
+- **Fail gracefully to null, never to an exception.** Enrichment that can't
+resolve a field returns `null` for it (a stable, documented field set), and a
+total enrichment failure returns an all-null bundle. A geocoder outage must
+never drop or corrupt the underlying event.
+- **No opinionated translation of the upstream payload.** Enrichment *adds*
+namespaced fields; it does not rewrite upstream ones. Adapters still do not
+coerce units, rename upstream fields, or collapse upstream enumerations inside
+`data`. The only in-place adapter transforms remain mechanical: subject-token
+normalization (camelCase → snake_case, agency-prefix splitting, whitespace →
+underscore, lowercase) and dedup-key construction.

-This rules out a whole category of plausible-sounding work that prior reviews
-have already rejected. For instance, "enrich NWIS site rows with USGS
-monitoring-locations metadata during `poll()`" was proposed for Phase 3 and
-killed on this principle. The producer adds the field-preserving pipe; the pipe
-ends at JetStream publish; everything richer is a downstream concern.
-
-See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of
-anti-patterns. Future authors should reject the same proposals on the same
-grounds.
+This reframes a Phase 2 rule. The earlier draft of this doc said "no
+enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was
+rejected on those grounds. That reasoning is now inverted: consumers have no
+practical way to do that work, so Central does it. The constraint that survives
+is *where* and *how* — through the framework, namespaced, cached, failing to
+null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the
+remaining anti-pattern (enrichment done outside the framework) and
+[§13](#13-enrichment-contract) for the full contract.

 ---
@ -625,18 +640,30 @@ adapter authors should mirror it. Do not restate it here.
 These are the patterns prior reviews have explicitly rejected. Reject them
 again on sight in a new-adapter PR.

-### 10.1 Enrichment during `poll()`
+### 10.1 Enrichment outside the framework

-No calls to upstream metadata endpoints, no reverse-geocode, no consultation
-of a second source to fill in fields the primary feed omitted. The "NWIS
-enrichment" Phase 3 proposal — joining live measurements against the
-monitoring-locations metadata endpoint during `poll()` — was rejected on the
-[§2](#2-the-design-principle) principle. Future proposals along the same
-lines get the same answer.
-
-If enrichment is genuinely necessary, the right shape is a separate adapter
-(or a downstream consumer) — not an `if metadata_missing: await
-fetch_metadata()` branch buried in an adapter's `poll()`.
+Enrichment itself is **expected**, not forbidden — see
+[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). Any adapter
+with location data should opt in by declaring `enrichment_locations` on the
+adapter class; the supervisor then runs the registered enrichers and attaches
+results under `Event.data["_enriched"]`.
+
+The anti-pattern is doing enrichment the *wrong* way — outside the framework:
+
+- An `if missing: await fetch_metadata()` branch buried in an adapter's
+`poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips
+the `_enriched` namespacing (so consumers can't tell upstream from
+Central-derived), and gives up the never-raise/all-null safety net (so a
+geocoder hiccup can take down the poll).
+- Writing enriched fields directly into the top level of `Event.data` instead
+of under `_enriched`. That destroys provenance — a consumer can no longer
+tell which fields came from the upstream feed and which Central added.
+- Standing up a parallel enrichment path (a second HTTP client, a private cache)
+inside one adapter instead of registering a backend with the framework.
+
+The rule of thumb: declare `enrichment_locations`, let the supervisor do the
+work. If the framework can't express what you need, extend the framework
+([§13](#13-enrichment-contract)) — don't route around it inside an adapter.

 ### 10.2 Silent field dropping
@ -821,3 +848,111 @@ requesting / granting merge.
- [ ] **Full pytest suite green.**

---

## 13. Enrichment contract
Enrichment is how Central adds consumer-needed context (location names,
timezone, elevation, landclass, …) that the upstream feed doesn't carry and a
downstream consumer can't look up itself. It runs in the supervisor, after
dedup and before the CloudEvents wrap, for any adapter that opts in. Results are
namespaced under `Event.data["_enriched"]` so provenance stays explicit:
everything under `_enriched` is Central-derived; everything else in `data` is
upstream verbatim.
### 13.1 Opting an adapter in
Declare `enrichment_locations` on the adapter class — a list of
`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`:
```python
class FIRMSAdapter(SourceAdapter):
    enrichment_locations = [("latitude", "longitude")]
```
Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is."
The supervisor uses the first tuple that resolves to a non-null coordinate pair,
runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the
results. No adapter code calls enrichers directly.
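
The selection and attach step described above could look roughly like the following. This is a minimal sketch, not the supervisor's actual code: `first_location`, `attach_enrichment`, and `EchoEnricher` are hypothetical names, and the "publish unchanged when no coordinate resolves" branch is an assumption about behaviour the text does not spell out.

```python
from __future__ import annotations

import asyncio
from typing import Any


def first_location(
    data: dict[str, Any], pairs: list[tuple[str, str]]
) -> dict[str, float] | None:
    """Return the first declared pair whose two fields are both non-null."""
    for lat_field, lon_field in pairs:
        lat, lon = data.get(lat_field), data.get(lon_field)
        if lat is not None and lon is not None:
            return {"lat": float(lat), "lon": float(lon)}
    return None


async def attach_enrichment(
    data: dict[str, Any], pairs: list[tuple[str, str]], enrichers: list[Any]
) -> dict[str, Any]:
    """Return a copy of data with enricher results under "_enriched"."""
    location = first_location(data, pairs)
    if location is None:
        # Assumption: with no usable coordinate the event is published as-is.
        return dict(data)
    enriched = {e.name: await e.enrich(location) for e in enrichers}
    # Upstream keys are copied verbatim; only "_enriched" is added.
    return {**data, "_enriched": enriched}


class EchoEnricher:
    """Hypothetical stand-in enricher, used only for this demo."""

    name = "echo"

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        return {"lat_seen": location["lat"], "lon_seen": location["lon"]}


if __name__ == "__main__":
    event = {"latitude": 43.6, "longitude": -116.2, "frp": 1.5}
    print(asyncio.run(attach_enrichment(event, [("latitude", "longitude")], [EchoEnricher()])))
```

The upstream keys stay at the top level and every Central-derived value sits under `_enriched.<name>`, which is the provenance split the section describes.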
### 13.2 The `Enricher` Protocol
An enricher is any object satisfying this Protocol (`central.enrichment.base`):
- `name: str` — short identifier, used as the key under
`Event.data["_enriched"]`.
- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]`:
given `{"lat": float, "lon": float}`, return a flat dict of enrichment
fields. Fields it can't resolve are present with value `None` (not omitted).
**Must never raise** — implementations handle their own failures and return
an all-null bundle on total failure.
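
As an illustration of the shape this Protocol describes, here is a hedged sketch. The real definition lives in `central.enrichment.base` and is not shown in this diff, so the `Enricher` class below is a reconstruction from the two bullets above, and `HemisphereEnricher` is a purely hypothetical toy implementation.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Enricher(Protocol):
    """Sketch of the Protocol shape described in the text (not the real source)."""

    name: str

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]: ...


class HemisphereEnricher:
    """Hypothetical toy enricher: a stable field set, never raises."""

    name = "hemisphere"
    FIELDS = ("ns", "ew")

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        try:
            lat, lon = location["lat"], location["lon"]
            return {"ns": "N" if lat >= 0 else "S", "ew": "E" if lon >= 0 else "W"}
        except Exception:
            # Total failure: every documented field present, all None.
            return {field: None for field in self.FIELDS}


if __name__ == "__main__":
    print(asyncio.run(HemisphereEnricher().enrich({"lat": 43.6, "lon": -116.2})))
```

Because the Protocol is structural, `HemisphereEnricher` satisfies it without inheriting from anything; the only obligations are the `name` attribute, the `enrich` coroutine, and the never-raise behaviour.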
### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol
`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the
only enricher today. It owns the cache and the canonical field normalization;
the actual reverse-geocode is delegated to a pluggable backend satisfying the
`GeocoderBackend` Protocol:
- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return
the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields
the backend can't resolve return `None`. Must never raise.
Backends shipped: `NaviBackend` (composed Navi `/api/reverse/<lat>/<lon>`
endpoint — name/address + timezone + landclass + elevation in one call),
`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM
Nominatim, name/address only, with a configurable rate limit + `User-Agent`),
and `NoOpBackend` (all-null — the default until an operator configures a real
backend).
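
A custom backend only has to honour the `reverse` contract. The sketch below assumes the nine canonical field names listed in §13.5 and redefines `GEOCODER_FIELDS` and `all_null_bundle` locally as stand-ins for the real ones in `central.enrichment.geocoder`; `FixtureBackend` is a hypothetical backend serving canned answers, for example in offline tests.

```python
import asyncio
from typing import Any, Protocol

# Stand-ins for central.enrichment.geocoder (field order here is an assumption).
GEOCODER_FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def all_null_bundle() -> dict[str, Any]:
    return {field: None for field in GEOCODER_FIELDS}


class GeocoderBackend(Protocol):
    """Sketch of the backend Protocol shape described in the text."""

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]: ...


class FixtureBackend:
    """Hypothetical backend that answers from an in-memory table."""

    def __init__(self, fixtures: dict[tuple[float, float], dict[str, Any]]) -> None:
        # Normalize keys the same way lookups are normalized.
        self._fixtures = {
            (round(lat, 4), round(lon, 4)): row for (lat, lon), row in fixtures.items()
        }

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            row = self._fixtures.get((round(lat, 4), round(lon, 4)), {})
            # Keep exactly the canonical keys: unknown keys dropped, gaps become None.
            return {field: row.get(field) for field in GEOCODER_FIELDS}
        except Exception:
            return all_null_bundle()


if __name__ == "__main__":
    backend = FixtureBackend({(43.615, -116.2023): {"city": "Boise", "state": "Idaho"}})
    print(asyncio.run(backend.reverse(43.615, -116.2023)))
```

Registering such a class under its name in the supervisor's backend registry is what would make it selectable from config.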
### 13.4 Cache + failure semantics
`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`,
`/var/lib/central/enrichment_cache.db`):
- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4
decimal places (~11 m). TTL is per-enricher, default 24h.
- **Cache hit** → return cached bundle, no backend call.
- **Cache miss** → call backend, cache the normalized result (**even an
all-null bundle** — so known-empty coordinates aren't re-hammered), return it.
- **Backend raises** (a violation of the never-raise contract, or an
infrastructure error the backend chose to surface) → return an all-null
bundle and **do not cache** it, so the next event for that coordinate retries.
Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`,
`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing
the enricher set is a restart, not a hot-reload.
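
Resolution of `backend_class` by name might be sketched as follows. Everything here is a stand-in: the dataclass mirrors only the four field names given above (its defaults are assumptions), the two backend classes are empty placeholders, `resolve_backend` is a hypothetical helper, and raising `ValueError` on an unknown name is an assumption about behaviour the text does not specify.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EnrichmentConfig:
    """Stand-in with the four documented field names; defaults are assumed."""

    enricher_class: str
    backend_class: str
    backend_settings: dict[str, Any] = field(default_factory=dict)
    cache_ttl_s: int = 24 * 3600


class NoOpBackend:
    """Placeholder for the real all-null backend."""


class PhotonBackend:
    """Placeholder accepting the constructor settings shown in the diff."""

    def __init__(self, base_url: str = "http://localhost:2322", timeout_s: float = 10.0) -> None:
        self.base_url = base_url.rstrip("/")
        self.timeout_s = timeout_s


_BACKEND_REGISTRY: dict[str, type] = {
    "NoOpBackend": NoOpBackend,
    "PhotonBackend": PhotonBackend,
}


def resolve_backend(cfg: EnrichmentConfig) -> Any:
    """Look the class up by name and build it from backend_settings."""
    try:
        backend_cls = _BACKEND_REGISTRY[cfg.backend_class]
    except KeyError:
        known = ", ".join(sorted(_BACKEND_REGISTRY))
        raise ValueError(f"unknown backend_class {cfg.backend_class!r}; known: {known}") from None
    return backend_cls(**cfg.backend_settings)
```

Because the config is read once at startup, a lookup like this runs a single time per process, which is consistent with the restart-not-hot-reload note above.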
### 13.5 Per-field coverage
The canonical geocoder bundle is exactly nine fields. They mirror
`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this
table must match it):
| Field | US events | Non-US events today | Non-US after Photon planet expansion |
|---|---|---|---|
| `name` | populated (wilderness sparsity gaps) | null | populated |
| `city` | populated (wilderness sparsity gaps) | null | populated |
| `county` | populated (wilderness sparsity gaps) | null | populated |
| `state` | populated (wilderness sparsity gaps) | null | populated |
| `country` | populated (wilderness sparsity gaps) | null | populated |
| `postal_code` | populated (wilderness sparsity gaps) | null | populated |
| `timezone` | populated | populated (`tz_world` is planet-scale) | populated |
| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null |
| `elevation_m` | populated | populated (planet-DEM) | populated |
**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` +
`elevation_m` and the rest null. Photon planet expansion is queued on the Navi
side with no firm ETA; when it lands, `NaviBackend` picks it up automatically
with zero Central code changes.
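
The "table must match `GEOCODER_FIELDS`" rule lends itself to a mechanical check. The sketch below shows one way a doc test might extract the first column of the coverage table; `table_fields` is a hypothetical helper, not the code in `test_producer_doc.py`, and the sample table is abbreviated.

```python
import re


def table_fields(markdown: str) -> list[str]:
    """Collect backticked identifiers from the first column of table rows."""
    fields = []
    for line in markdown.splitlines():
        # Matches rows like "| `name` | populated ... |"; the header and
        # separator rows have no backticked first cell, so they are skipped.
        match = re.match(r"^\|\s*`([a-z_]+)`\s*\|", line.strip())
        if match:
            fields.append(match.group(1))
    return fields


SAMPLE = """
| Field | US events | Non-US events today |
|---|---|---|
| `name` | populated | null |
| `timezone` | populated | populated |
| `elevation_m` | populated | populated |
"""

if __name__ == "__main__":
    print(table_fields(SAMPLE))
```

A test would then compare the extracted list against the tuple imported from code, so adding a field in one place without the other fails the suite.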
### 13.6 Known wrinkle — landclass antimeridian false-positive
`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons.
Points near 51–53°N **outside** the US can spuriously match the Aleutian "Rat
Islands" PAD-US polygon (false matching across the antimeridian), yielding a
non-null `landclass` that doesn't actually apply. This is a Navi-side bug being
worked separately; until it's fixed, treat a non-null `landclass` on a clearly
non-US point as suspect. Documented for consumers in
`CONSUMER-INTEGRATION.md`.
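
On the consumer side, one conservative way to act on this caveat is to accept `landclass` only when the same bundle resolved a US country. This is a hedged sketch: `trusted_landclass` is a hypothetical helper, the country string `"United States"` is taken from the test fixtures in this commit and may not cover every spelling, and the rule also discards a genuine `landclass` for US points whose `country` came back `null`.

```python
from __future__ import annotations

from typing import Any

# Assumption: the geocoder reports US points with this country name.
US_COUNTRY_NAMES = frozenset({"United States"})


def trusted_landclass(data: dict[str, Any]) -> str | None:
    """Return landclass only when the bundle also says the point is in the US."""
    geo = (data.get("_enriched") or {}).get("geocoder") or {}
    landclass = geo.get("landclass")
    if landclass is None:
        return None
    if geo.get("country") not in US_COUNTRY_NAMES:
        # Non-null landclass without a US country: treat as suspect.
        return None
    return landclass
```

A consumer that prefers to keep borderline values could return the value together with a "suspect" flag instead of dropping it.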
---


@ -1,5 +1,8 @@
 """Geocoder backend implementations."""

+from central.enrichment.backends.navi import NaviBackend
+from central.enrichment.backends.nominatim import NominatimBackend
 from central.enrichment.backends.no_op import NoOpBackend
+from central.enrichment.backends.photon import PhotonBackend

-__all__ = ["NoOpBackend"]
+__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"]


@ -0,0 +1,79 @@
"""Navi reverse-geocoding backend.

Hits the composed Navi endpoint `<base_url>/api/reverse/<lat>/<lon>`, which
already returns the canonical 9-field bundle (name, city, county, state,
country, postal_code, timezone, landclass, elevation_m). Navi composes Photon
(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM
(elevation_m), so this backend is a near-passthrough mapping.

Coverage today: US events get a rich bundle; non-US events get timezone +
elevation_m populated (both planet-scale) and the rest null until Navi's
Photon planet expansion lands (no Central change needed when it does).
"""

import asyncio
import logging
from typing import Any

import aiohttp

from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

logger = logging.getLogger(__name__)

DEFAULT_BASE_URL = "http://192.168.1.130:8440"
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
_WARMUP_LAT = 43.6150
_WARMUP_LON = -116.2023


class NaviBackend:
    """GeocoderBackend backed by the composed Navi /api/reverse endpoint."""

    def __init__(
        self,
        base_url: str = DEFAULT_BASE_URL,
        timeout_s: float = 10.0,
        headers: dict[str, str] | None = None,
        warmup: bool = True,
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout_s = timeout_s
        # Future-proof: drop an Authorization: Bearer … here config-only, no code change.
        self._headers = dict(headers or {})
        if warmup:
            # Fire-and-forget warmup ping; only if a loop is running (it is under
            # the supervisor's asyncio.run, not under sync test construction).
            try:
                loop = asyncio.get_running_loop()
                loop.create_task(self._warmup())
            except RuntimeError:
                pass

    def _url(self, lat: float, lon: float) -> str:
        return f"{self._base_url}/api/reverse/{lat}/{lon}"

    async def _warmup(self) -> None:
        try:
            await self._fetch(_WARMUP_LAT, _WARMUP_LON)
        except Exception:
            # Warmup is best-effort; a failure here must not break startup.
            logger.debug("NaviBackend warmup ping failed (non-fatal)")

    async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self._timeout_s),
        ) as session:
            async with session.get(self._url(lat, lon), headers=self._headers) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            data = await self._fetch(lat, lon)
        except Exception:
            # Non-200, network error, timeout, malformed JSON — never raise.
            logger.debug("NaviBackend reverse failed; returning all-null bundle")
            return all_null_bundle()
        # Navi's response already matches the canonical shape; map defensively.
        return {field: data.get(field) for field in GEOCODER_FIELDS}


@ -0,0 +1,95 @@
"""OSM Nominatim reverse-geocoding backend.

Works against public OSM Nominatim (1 req/sec + User-Agent required) or a
self-hosted instance (no limit). Resolves name + address only; timezone,
landclass, and elevation_m are nulled (not in the Nominatim reverse response).

Nominatim jsonv2 reverse response shape:
    {"display_name": "...", "name": "...",
     "address": {city|town|village, county, state, country, postcode, ...}}
"""

import asyncio
import logging
import time
from typing import Any
from urllib.parse import urlencode

import aiohttp

from central.enrichment.geocoder import all_null_bundle

logger = logging.getLogger(__name__)

DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org"
DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)"


class NominatimBackend:
    """GeocoderBackend backed by an OSM Nominatim /reverse endpoint.

    rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s);
    set it to 0 to disable for self-hosted instances.
    """

    def __init__(
        self,
        base_url: str = DEFAULT_BASE_URL,
        user_agent: str = DEFAULT_USER_AGENT,
        rate_limit_per_sec: float = 1.0,
        timeout_s: float = 10.0,
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._user_agent = user_agent
        self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0
        self._timeout_s = timeout_s
        self._rl_lock = asyncio.Lock()
        self._last_request_at = 0.0

    def _url(self, lat: float, lon: float) -> str:
        qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"})
        return f"{self._base_url}/reverse?{qs}"

    def _request_headers(self) -> dict[str, str]:
        # Public Nominatim rejects requests without an identifying User-Agent.
        return {"User-Agent": self._user_agent}

    async def _throttle(self) -> None:
        if self._min_interval <= 0:
            return
        async with self._rl_lock:
            now = time.monotonic()
            wait = self._last_request_at + self._min_interval - now
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_request_at = time.monotonic()

    async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self._timeout_s),
        ) as session:
            async with session.get(
                self._url(lat, lon), headers=self._request_headers()
            ) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            await self._throttle()
            data = await self._fetch(lat, lon)
            addr = data.get("address", {}) or {}
        except Exception:
            logger.debug("NominatimBackend reverse failed; returning all-null bundle")
            return all_null_bundle()
        return {
            "name": data.get("name") or data.get("display_name"),
            "city": addr.get("city") or addr.get("town") or addr.get("village"),
            "county": addr.get("county"),
            "state": addr.get("state"),
            "country": addr.get("country"),
            "postal_code": addr.get("postcode"),
            "timezone": None,  # not in Nominatim reverse response
            "landclass": None,  # Navi-composed-endpoint only
            "elevation_m": None,  # Navi-composed-endpoint only
        }
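
The lock-plus-minimum-interval pattern used by `_throttle` above also serializes concurrent callers, which is easy to miss when reading it. The standalone sketch below demonstrates that effect; `MinIntervalThrottle` and `timed_burst` are hypothetical names, and unlike the backend it initializes the last-request time to negative infinity so the first call never waits regardless of the monotonic clock's origin.

```python
import asyncio
import time


class MinIntervalThrottle:
    """Sketch: allow at most one pass per min-interval, shared across tasks."""

    def __init__(self, rate_per_sec: float) -> None:
        self._min_interval = 1.0 / rate_per_sec if rate_per_sec > 0 else 0.0
        self._lock = asyncio.Lock()
        self._last = float("-inf")  # first caller passes immediately

    async def wait(self) -> None:
        if self._min_interval <= 0:
            return  # rate 0 disables throttling entirely
        async with self._lock:
            delay = self._last + self._min_interval - time.monotonic()
            if delay > 0:
                # Sleeping while holding the lock queues later callers behind us.
                await asyncio.sleep(delay)
            self._last = time.monotonic()


async def timed_burst(rate_per_sec: float, n: int) -> float:
    """Fire n concurrent waits and return the elapsed wall time in seconds."""
    throttle = MinIntervalThrottle(rate_per_sec)
    start = time.monotonic()
    await asyncio.gather(*(throttle.wait() for _ in range(n)))
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"3 calls at 20/s took {asyncio.run(timed_burst(20, 3)):.3f}s")
```

With a rate of 20 per second, three concurrent calls are spaced roughly 50 ms apart, so the burst takes about 100 ms in total; with a rate of 0 the same burst returns almost immediately.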


@ -0,0 +1,69 @@
"""Raw Photon reverse-geocoding backend.

For deployers who run a Photon instance directly, without the composed
Navi-style endpoint. Photon resolves name + address only; timezone,
landclass, and elevation_m are Navi-composed-endpoint extras and are nulled
here.

Photon reverse response shape:
    {"features": [{"properties": {name, city, county, state, country,
                                  postcode, ...}, "geometry": {...}}]}
"""

import logging
from typing import Any
from urllib.parse import urlencode

import aiohttp

from central.enrichment.geocoder import all_null_bundle

logger = logging.getLogger(__name__)

DEFAULT_BASE_URL = "http://localhost:2322"


class PhotonBackend:
    """GeocoderBackend backed by a raw Photon /reverse endpoint."""

    def __init__(
        self,
        base_url: str = DEFAULT_BASE_URL,
        timeout_s: float = 10.0,
        headers: dict[str, str] | None = None,
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout_s = timeout_s
        self._headers = dict(headers or {})

    def _url(self, lat: float, lon: float) -> str:
        qs = urlencode({"lat": lat, "lon": lon, "limit": 1})
        return f"{self._base_url}/reverse?{qs}"

    async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self._timeout_s),
        ) as session:
            async with session.get(self._url(lat, lon), headers=self._headers) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        try:
            data = await self._fetch(lat, lon)
            features = data.get("features") or []
            props = features[0].get("properties", {}) if features else {}
        except Exception:
            logger.debug("PhotonBackend reverse failed; returning all-null bundle")
            return all_null_bundle()
        return {
            "name": props.get("name"),
            "city": props.get("city"),
            "county": props.get("county"),
            "state": props.get("state"),
            "country": props.get("country"),
            "postal_code": props.get("postcode"),  # Photon names it 'postcode'
            "timezone": None,  # not provided by raw Photon
            "landclass": None,  # Navi-composed-endpoint only
            "elevation_m": None,  # Navi-composed-endpoint only
        }


@ -24,7 +24,10 @@ from central.api_key_resolver import resolve_api_key_alias
 from central.config_models import EnrichmentConfig
 from central.enrichment.base import Enricher
 from central.enrichment.cache import EnrichmentCache
+from central.enrichment.backends.navi import NaviBackend
+from central.enrichment.backends.nominatim import NominatimBackend
 from central.enrichment.backends.no_op import NoOpBackend
+from central.enrichment.backends.photon import PhotonBackend
 from central.enrichment.geocoder import GeocoderEnricher
 from central.models import Event
 from central.stream_manager import StreamManager
@ -33,9 +36,13 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
 ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db")

 # Enricher / backend class-name registries for EnrichmentConfig resolution.
-# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these.
 _ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher}
-_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend}
+_BACKEND_REGISTRY: dict[str, type] = {
+    "NoOpBackend": NoOpBackend,
+    "NaviBackend": NaviBackend,
+    "PhotonBackend": PhotonBackend,
+    "NominatimBackend": NominatimBackend,
+}


 def build_enrichers(

tests/test_navi_backend.py

@ -0,0 +1,120 @@
"""Tests for NaviBackend (composed Navi /api/reverse endpoint).
HTTP is exercised via patching the backend's `_fetch` (the codebase has no
aioresponses/respx dep); URL construction is asserted on the pure `_url`
helper. An env-gated integration smoke against the live Navi endpoint is
skipped by default.
"""
import os
from unittest.mock import AsyncMock
import pytest
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
# Full Navi response — already canonical shape.
_NAVI_OK = {
"name": "Where you are",
"city": "Boise",
"county": "Ada",
"state": "Idaho",
"country": "United States",
"postal_code": "83702",
"timezone": "America/Boise",
"landclass": "Public — National Forest",
"elevation_m": 824,
}
def _backend() -> NaviBackend:
# warmup=False so construction issues no background task in tests.
return NaviBackend(base_url="http://navi.test:8440", warmup=False)
def test_url_construction():
b = _backend()
assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023"
def test_base_url_trailing_slash_stripped():
b = NaviBackend(base_url="http://navi.test:8440/", warmup=False)
assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0"
@pytest.mark.asyncio
async def test_happy_path_passthrough():
b = _backend()
b._fetch = AsyncMock(return_value=dict(_NAVI_OK))
result = await b.reverse(43.615, -116.2023)
assert result == _NAVI_OK
assert set(result.keys()) == set(GEOCODER_FIELDS)
@pytest.mark.asyncio
async def test_partial_nulls_preserved():
"""Navi 200-with-nulls (non-US: timezone + elevation, rest null)."""
partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35}
b = _backend()
b._fetch = AsyncMock(return_value=partial)
result = await b.reverse(48.85, 2.35)
assert result["timezone"] == "Europe/Paris"
assert result["elevation_m"] == 35
assert result["city"] is None
assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_extra_keys_dropped():
    b = _backend()
    b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"})
    result = await b.reverse(1.0, 2.0)
    assert "debug_internal" not in result
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_network_error_returns_all_null_never_raises():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("boom"))
    result = await b.reverse(1.0, 2.0)
    assert result == all_null_bundle()


@pytest.mark.asyncio
async def test_timeout_returns_all_null():
    import asyncio

    b = _backend()
    b._fetch = AsyncMock(side_effect=asyncio.TimeoutError())
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_response_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("not json"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_headers_passed_through_config():
    b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False)
    assert b._headers == {"Authorization": "Bearer x"}


@pytest.mark.asyncio
@pytest.mark.skipif(
    os.environ.get("NAVI_INTEGRATION_TEST") != "1",
    reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
)
async def test_live_navi_boise():
    """Integration smoke against the real endpoint (default skipped)."""
    b = NaviBackend(warmup=False)  # default base_url
    result = await b.reverse(43.6150, -116.2023)
    assert result["name"] == "Where you are"
    assert result["city"] == "Boise"
    assert result["state"] == "Idaho"
    assert result["elevation_m"] is not None
    assert abs(float(result["elevation_m"]) - 824) < 50
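
Review note: the Navi tests above pin down a small contract: `reverse()` never raises, always returns exactly the canonical keys, and drops anything extra. The sketch below shows one way such a contract could be met. It is not the actual `NaviBackend` code; `FIELDS`, `all_null`, and `safe_reverse` are hypothetical names, and the nine field names are assumed from the assertions in these tests rather than read from `GEOCODER_FIELDS`.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Assumed field names, collected from the assertions in these tests; the real
# list lives in central.enrichment.geocoder.GEOCODER_FIELDS.
FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)

Fetch = Callable[[float, float], Awaitable[Any]]


def all_null() -> dict[str, Any]:
    """Bundle with every canonical field present and set to None."""
    return {field: None for field in FIELDS}


async def safe_reverse(fetch: Fetch, lat: float, lon: float) -> dict[str, Any]:
    """Hypothetical wrapper: project the response onto FIELDS, never raise."""
    try:
        raw = await fetch(lat, lon)
    except Exception:  # network error, timeout, bad JSON: fail to null
        return all_null()
    if not isinstance(raw, dict):
        return all_null()
    # Missing keys become None; unknown keys are dropped.
    return {field: raw.get(field) for field in FIELDS}
```

Catching `Exception` rather than `BaseException` leaves task cancellation untouched, which is probably what a long-running supervisor wants.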

@ -0,0 +1,118 @@
"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2)."""
from unittest.mock import AsyncMock, patch

import pytest

from central.enrichment.backends.nominatim import (
    DEFAULT_USER_AGENT,
    NominatimBackend,
)
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

_NOMINATIM_OK = {
    "name": "Boise",
    "display_name": "Boise, Ada County, Idaho, United States",
    "address": {
        "city": "Boise",
        "county": "Ada County",
        "state": "Idaho",
        "country": "United States",
        "postcode": "83702",
    },
}


def _backend(**kw) -> NominatimBackend:
    kw.setdefault("base_url", "http://nominatim.test")
    kw.setdefault("rate_limit_per_sec", 0)  # disabled by default in tests
    return NominatimBackend(**kw)


def test_url_and_format():
    b = _backend()
    url = b._url(43.6, -116.2)
    assert url.startswith("http://nominatim.test/reverse?")
    assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url


def test_user_agent_header_present():
    b = _backend()
    assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT


def test_custom_user_agent():
    b = _backend(user_agent="myapp/1.0 (me@example.com)")
    assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)"


@pytest.mark.asyncio
async def test_happy_path_maps_address_block():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
    result = await b.reverse(43.6, -116.2)
    assert set(result.keys()) == set(GEOCODER_FIELDS)
    assert result["name"] == "Boise"
    assert result["city"] == "Boise"
    assert result["county"] == "Ada County"
    assert result["state"] == "Idaho"
    assert result["country"] == "United States"
    assert result["postal_code"] == "83702"


@pytest.mark.asyncio
async def test_navi_only_fields_null():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
    result = await b.reverse(43.6, -116.2)
    assert result["timezone"] is None
    assert result["landclass"] is None
    assert result["elevation_m"] is None


@pytest.mark.asyncio
async def test_city_falls_back_to_town_then_village():
    b = _backend()
    b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}})
    result = await b.reverse(1.0, 2.0)
    assert result["city"] == "Tinytown"


@pytest.mark.asyncio
async def test_network_error_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("bad"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_rate_limit_disabled_does_not_sleep():
    b = _backend(rate_limit_per_sec=0)
    with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp:
        await b._throttle()
        await b._throttle()
    slp.assert_not_called()


@pytest.mark.asyncio
async def test_rate_limit_spaces_consecutive_requests():
    """With a 1/sec limit, the second back-to-back call must sleep a positive
    interval. Mock monotonic to a fixed instant so the throttle computes a wait."""
    b = _backend(rate_limit_per_sec=1.0)
    sleeps: list[float] = []

    async def _fake_sleep(d):
        sleeps.append(d)

    with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \
            patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep):
        await b._throttle()  # first: last_request_at was 0 -> no wait
        await b._throttle()  # second: now==100, last==100 -> wait ~1.0s
    assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}"
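
Review note: the two rate-limit tests above describe a minimum-interval throttle: a rate of 0 never sleeps, and with 1/sec a second back-to-back call waits about one second. The sketch below is one possible shape for that logic, not the actual `NominatimBackend._throttle`. The `Throttle` class and its injectable `clock` and `sleep` parameters are hypothetical, added so the behaviour can be exercised without patching. Unlike the test comment, which implies an initial timestamp of 0, this sketch uses `None` for "no request yet".

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class Throttle:
    """Hypothetical minimum-interval limiter; rate_per_sec <= 0 disables it."""

    def __init__(
        self,
        rate_per_sec: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    ) -> None:
        self._min_interval = 1.0 / rate_per_sec if rate_per_sec > 0 else 0.0
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None  # time of the previous request

    async def wait(self) -> None:
        """Sleep just long enough to keep requests min_interval apart."""
        if self._min_interval <= 0:
            return
        if self._last is not None:
            remaining = self._min_interval - (self._clock() - self._last)
            if remaining > 0:
                await self._sleep(remaining)
        self._last = self._clock()
```

This version is not safe for concurrent callers; if several tasks share one backend, wrapping `wait()` in an `asyncio.Lock` would be the usual addition.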

@ -0,0 +1,92 @@
"""Tests for PhotonBackend (raw Photon /reverse)."""
from unittest.mock import AsyncMock

import pytest

from central.enrichment.backends.photon import PhotonBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

# Photon reverse response shape.
_PHOTON_OK = {
    "features": [
        {
            "properties": {
                "name": "Boise",
                "city": "Boise",
                "county": "Ada County",
                "state": "Idaho",
                "country": "United States",
                "postcode": "83702",
                "osm_key": "place",
            },
            "geometry": {"type": "Point", "coordinates": [-116.2, 43.6]},
        }
    ]
}


def _backend() -> PhotonBackend:
    return PhotonBackend(base_url="http://photon.test:2322")


def test_url_construction():
    b = _backend()
    url = b._url(43.6, -116.2)
    assert url.startswith("http://photon.test:2322/reverse?")
    assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url


@pytest.mark.asyncio
async def test_happy_path_maps_to_canonical():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
    result = await b.reverse(43.6, -116.2)
    assert set(result.keys()) == set(GEOCODER_FIELDS)
    assert result["city"] == "Boise"
    assert result["county"] == "Ada County"
    assert result["state"] == "Idaho"
    assert result["country"] == "United States"
    assert result["postal_code"] == "83702"  # mapped from Photon 'postcode'


@pytest.mark.asyncio
async def test_navi_only_fields_are_null():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
    result = await b.reverse(43.6, -116.2)
    assert result["timezone"] is None
    assert result["landclass"] is None
    assert result["elevation_m"] is None


@pytest.mark.asyncio
async def test_empty_features_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(return_value={"features": []})
    assert await b.reverse(0.0, 0.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_missing_properties_keys_become_null():
    b = _backend()
    b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]})
    result = await b.reverse(1.0, 2.0)
    assert result["name"] == "X"
    assert result["city"] is None
    assert result["postal_code"] is None
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_network_error_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("bad json"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()
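
Review note: the Photon tests above imply a simple mapping step: take `features[0].properties`, copy the name and address keys, rename `postcode` to `postal_code`, and leave `timezone`, `landclass`, and `elevation_m` null. A minimal sketch of that mapping follows. `map_photon` and `FIELDS` are hypothetical names, the field list is assumed from the assertions in these tests, and the real `PhotonBackend` may be structured differently.

```python
from typing import Any

# Assumed canonical field names (see central.enrichment.geocoder.GEOCODER_FIELDS
# for the authoritative list).
FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)

# Keys that Photon and the canonical bundle are assumed to share by name.
_SAME_NAME = ("name", "city", "county", "state", "country")


def map_photon(raw: Any) -> dict[str, Any]:
    """Hypothetical mapper from a Photon /reverse payload to the canonical bundle."""
    bundle: dict[str, Any] = {field: None for field in FIELDS}
    features = raw.get("features") if isinstance(raw, dict) else None
    if not features or not isinstance(features[0], dict):
        return bundle  # no hit: every field stays None
    props = features[0].get("properties") or {}
    for key in _SAME_NAME:
        bundle[key] = props.get(key)
    bundle["postal_code"] = props.get("postcode")  # Photon's spelling
    # timezone, landclass, elevation_m are not provided by Photon and stay None.
    return bundle
```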

@ -23,8 +23,14 @@ from pathlib import Path
from central.adapter import SourceAdapter
from central.adapter_discovery import discover_adapters
from central.enrichment.geocoder import GEOCODER_FIELDS
from central.streams import STREAMS

# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19).
_DESIGN_PRINCIPLE_QUOTE = (
    "Central takes it all and gives it all. It's up to the pipe to do with it"
)

DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md"
@ -186,6 +192,48 @@ def test_streams_snippet_quotes_live_registry():
    )


def _section(doc: str, header_re: str) -> str:
    """Return the body of the section whose header matches header_re, up to the
    next same-or-higher-level header."""
    m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE)
    assert m, f"doc missing section matching {header_re!r}"
    return m.group(1)


def test_design_principle_quote_present_in_section_2():
    """§2 must still carry the verbatim Matt quote: the reframe changes the
    translation beneath it, not the quote itself."""
    section = _section(_doc_text(), r"^## 2\. The design principle")
    assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2"


def test_anti_pattern_10_1_section_exists():
    """§10.1 must still exist as a subsection (content reframed to
    'enrichment outside the framework', structure preserved)."""
    doc = _doc_text()
    assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection"


def test_enrichment_contract_section_13_has_all_protocol_references():
    """New §13 must name all four enrichment contract types verbatim."""
    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
    for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"):
        assert ref in section, f"§13 missing reference to {ref!r}"


def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields():
    """The §13 per-field coverage matrix must list exactly the canonical
    GEOCODER_FIELDS derived from code, not hardcoded here."""
    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
    # Matrix rows look like: | `field_name` | ... |
    row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE))
    assert row_fields == set(GEOCODER_FIELDS), (
        f"coverage-matrix field drift: "
        f"doc-only={row_fields - set(GEOCODER_FIELDS)}, "
        f"code-only={set(GEOCODER_FIELDS) - row_fields}"
    )


def test_no_orphan_adapter_references_in_anti_patterns():
    """Anti-patterns section names two real adapter modules as examples
    (firms, inciweb in §10.4). Those names must still resolve via