mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Compare commits
6 commits
a477285b3f
...
3c27534e9e
| SHA1 |
|---|
| 3c27534e9e |
| b694fc0c9d |
| bd809846ea |
| 04c1d07b3f |
| 54238093a5 |
| 98b050b2af |
24 changed files with 1865 additions and 50 deletions
|
|
@ -296,6 +296,36 @@ ground-survey workflows.
|
|||
archive is at `https://firms.modaps.eosdis.nasa.gov/`.)
|
||||
- **Removal semantics:** none. FIRMS publishes detections; absence is the signal
|
||||
if a fire stops burning. Consumers should not expect explicit "removal" events.
|
||||
- **Enrichment (`data._enriched.geocoder`):** FIRMS is the enrichment pilot, so
|
||||
each event carries a Central-derived geocoder bundle under
|
||||
`data._enriched.geocoder`. It is *not* an upstream FIRMS field — Central
|
||||
reverse-geocodes the hotspot's `latitude`/`longitude` and attaches the result.
|
||||
The bundle always has these nine keys (any unresolved field is `null`, never
|
||||
missing):
|
||||
|
||||
| key | meaning |
|---|---|
| `name` | place / feature name |
| `city` | city / town / village |
| `county` | county (or equivalent) |
| `state` | state / province |
| `country` | country |
| `postal_code` | postal / ZIP code |
| `timezone` | IANA tz (e.g. `America/Boise`) |
| `landclass` | land-management class (US PAD-US) |
| `elevation_m` | ground elevation, metres |
|
||||
|
||||
**Coverage by region (v0.5.0):** US hotspots get the full bundle (with
|
||||
sparsity gaps in deep wilderness); non-US hotspots currently get only
|
||||
`timezone` and `elevation_m` populated (both planet-scale), the rest `null`,
|
||||
pending an upstream planet expansion. Treat `null` as "not resolved," not
|
||||
"does not exist."
|
||||
|
||||
**Known wrinkle — `landclass` antimeridian false-positive:** a non-US hotspot
|
||||
near 51–53°N can spuriously get a non-`null` `landclass` (it false-matches the
|
||||
Aleutian "Rat Islands" US land-management polygon across the dateline). If you
|
||||
consume `landclass`, treat a non-`null` value on a clearly non-US point as
|
||||
suspect. Fix is tracked upstream.
|
||||
- **Live example (verbatim from CT104):**
|
||||
|
||||
```json
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ are intentionally not restated. Cross-references point into that doc.
|
|||
10. [Anti-patterns — what NOT to do](#10-anti-patterns--what-not-to-do)
|
||||
11. [Settings preview hook](#11-settings-preview-hook)
|
||||
12. [Acceptance gate for a new adapter](#12-acceptance-gate-for-a-new-adapter)
|
||||
13. [Enrichment contract](#13-enrichment-contract)
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -49,33 +50,47 @@ concerns (live in [`CONSUMER-INTEGRATION.md`](./CONSUMER-INTEGRATION.md)).
|
|||
> what it will."
|
||||
> — Matt, 2026-05-19
|
||||
|
||||
-Adapter authors translate that single sentence into a small number of concrete
-rules:
+The correct reading of that sentence: **Central is the consumer's only data
+plane.** A downstream consumer sees exactly what's on the wire and nothing
+more — it cannot do a follow-up lookup, cannot re-query the upstream, cannot
+reverse-geocode a coordinate on its own. So whatever Central does NOT put on
+the wire is, for every consumer, simply missing. "Gives it all" therefore means
+*give the consumer everything a reasonable consumer needs to act on the event*
+— not "give the upstream payload only."
+
+Adapter authors translate that into a small number of concrete rules:

 - **Preserve every upstream field.** Anything the upstream returns lives in
 `Event.data` verbatim. Adapters do not silently drop fields, even ones that
-look redundant or low-value today.
-- **No enrichment.** Adapters do not reverse-geocode, do not call out to
-upstream metadata endpoints during normal `poll()` flow, do not consult a
-second source to "fill in" a missing field. If a downstream consumer wants
-enrichment, that is consumer-side work.
-- **No opinionated translation.** Adapters do not coerce units, do not rename
-fields to match a Central-wide vocabulary, do not collapse upstream
-enumerations into Central's preferred labels.
-- **The only adapter-side transforms are mechanical.** Specifically:
-subject-token normalization (camelCase → snake_case, agency-prefix splitting,
-whitespace → underscore, lowercase) and dedup-key construction. Both are
-deterministic functions of upstream identifiers. Nothing else.
+look redundant or low-value today (see [§10.2](#102-silent-field-dropping)).
+- **Enrich, deliberately and centrally.** Location, timezone, elevation,
+landclass and similar context that consumers reliably need should be resolved
+once, by Central, and attached — not left for twelve consumers to each
+re-derive (most of them can't). Enrichment runs through the framework
+([§13](#13-enrichment-contract)): an adapter declares `enrichment_locations`
+and the supervisor attaches results under `Event.data["_enriched"]`.
+- **Namespace enrichment for provenance.** Central-derived fields live under
+`_enriched.<enricher_name>`; everything else in `data` is upstream verbatim.
+A consumer can always tell which is which.
+- **Fail gracefully to null, never to an exception.** Enrichment that can't
+resolve a field returns `null` for it (a stable, documented field set), and a
+total enrichment failure returns an all-null bundle. A geocoder outage must
+never drop or corrupt the underlying event.
+- **No opinionated translation of the upstream payload.** Enrichment *adds*
+namespaced fields; it does not rewrite upstream ones. Adapters still do not
+coerce units, rename upstream fields, or collapse upstream enumerations inside
+`data`. The only in-place adapter transforms remain mechanical: subject-token
+normalization (camelCase → snake_case, agency-prefix splitting, whitespace →
+underscore, lowercase) and dedup-key construction.

-This rules out a whole category of plausible-sounding work that prior reviews
-have already rejected. For instance, "enrich NWIS site rows with USGS
-monitoring-locations metadata during `poll()`" was proposed for Phase 3 and
-killed on this principle. The producer adds the field-preserving pipe; the pipe
-ends at JetStream publish; everything richer is a downstream concern.
-
-See [§10](#10-anti-patterns--what-not-to-do) for the enforced list of
-anti-patterns. Future authors should reject the same proposals on the same
-grounds.
+This reframes a Phase 2 rule. The earlier draft of this doc said "no
+enrichment — that's consumer-side work," and a proposal to enrich NWIS rows was
+rejected on those grounds. That reasoning is now inverted: consumers have no
+practical way to do that work, so Central does it. The constraint that survives
+is *where* and *how* — through the framework, namespaced, cached, failing to
+null — not *whether*. See [§10.1](#101-enrichment-outside-the-framework) for the
+remaining anti-pattern (enrichment done outside the framework) and
+[§13](#13-enrichment-contract) for the full contract.
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -625,18 +640,30 @@ adapter authors should mirror it. Do not restate it here.
|
|||
These are the patterns prior reviews have explicitly rejected. Reject them
|
||||
again on sight in a new-adapter PR.
|
||||
|
||||
-### 10.1 Enrichment during `poll()`
+### 10.1 Enrichment outside the framework

-No calls to upstream metadata endpoints, no reverse-geocode, no consultation
-of a second source to fill in fields the primary feed omitted. The "NWIS
-enrichment" Phase 3 proposal — joining live measurements against the
-monitoring-locations metadata endpoint during `poll()` — was rejected on the
-[§2](#2-the-design-principle) principle. Future proposals along the same
-lines get the same answer.
+Enrichment itself is **expected**, not forbidden — see
+[§2](#2-the-design-principle) and [§13](#13-enrichment-contract). Any adapter
+with location data should opt in by declaring `enrichment_locations` on the
+adapter class; the supervisor then runs the registered enrichers and attaches
+results under `Event.data["_enriched"]`.

-If enrichment is genuinely necessary, the right shape is a separate adapter
-(or a downstream consumer) — not an `if metadata_missing: await
-fetch_metadata()` branch buried in an adapter's `poll()`.
+The anti-pattern is doing enrichment the *wrong* way — outside the framework:
+
+- An `if missing: await fetch_metadata()` branch buried in an adapter's
+`poll()`. This bypasses the cache (so every poll re-hits the geocoder), skips
+the `_enriched` namespacing (so consumers can't tell upstream from
+Central-derived), and gives up the never-raise/all-null safety net (so a
+geocoder hiccup can take down the poll).
+- Writing enriched fields directly into the top level of `Event.data` instead
+of under `_enriched`. That destroys provenance — a consumer can no longer
+tell which fields came from the upstream feed and which Central added.
+- Standing up a parallel enrichment path (a second HTTP client, a private cache)
+inside one adapter instead of registering a backend with the framework.
+
+The rule of thumb: declare `enrichment_locations`, let the supervisor do the
+work. If the framework can't express what you need, extend the framework
+([§13](#13-enrichment-contract)) — don't route around it inside an adapter.
|
||||
|
||||
### 10.2 Silent field dropping
|
||||
|
||||
|
|
@ -821,3 +848,111 @@ requesting / granting merge.
|
|||
- [ ] **Full pytest suite green.**
|
||||
|
||||
---
|
||||
|
||||
## 13. Enrichment contract
|
||||
|
||||
Enrichment is how Central adds consumer-needed context (location names,
|
||||
timezone, elevation, landclass, …) that the upstream feed doesn't carry and a
|
||||
downstream consumer can't look up itself. It runs in the supervisor, after
|
||||
dedup and before the CloudEvents wrap, for any adapter that opts in. Results are
|
||||
namespaced under `Event.data["_enriched"]` so provenance stays explicit:
|
||||
everything under `_enriched` is Central-derived; everything else in `data` is
|
||||
upstream verbatim.
|
||||
|
||||
### 13.1 Opting an adapter in
|
||||
|
||||
Declare `enrichment_locations` on the adapter class — a list of
|
||||
`(lat_field, lon_field)` tuples naming top-level keys in `Event.data`:
|
||||
|
||||
```python
class FIRMSAdapter(SourceAdapter):
    enrichment_locations = [("latitude", "longitude")]
```
|
||||
|
||||
Empty (the default on `SourceAdapter`) means "no enrichment, publish as-is."
|
||||
The supervisor uses the first tuple that resolves to a non-null coordinate pair,
|
||||
runs each registered enricher over `{"lat": …, "lon": …}`, and attaches the
|
||||
results. No adapter code calls enrichers directly.
|
||||
|
||||
### 13.2 The `Enricher` Protocol
|
||||
|
||||
An enricher is any object satisfying this Protocol (`central.enrichment.base`):
|
||||
|
||||
- `name: str` — short identifier, used as the key under
|
||||
`Event.data["_enriched"]`.
|
||||
- `async def enrich(self, location: dict[str, float]) -> dict[str, Any]` —
|
||||
given `{"lat": float, "lon": float}`, return a flat dict of enrichment
|
||||
fields. Fields it can't resolve are present with value `None` (not omitted).
|
||||
**Must never raise** — implementations handle their own failures and return
|
||||
an all-null bundle on total failure.
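
As a rough illustration of the two bullets above, the Protocol might look like the sketch below. The actual definition lives in `central.enrichment.base` and is not shown in this diff, so the class body here is reconstructed from the description only. `HemisphereEnricher` and its two fields are hypothetical, invented purely to show the "never raise, null on failure" shape.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Enricher(Protocol):
    """Sketch of the Protocol as described in the text (not the real source)."""

    name: str

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]: ...


class HemisphereEnricher:
    """Hypothetical enricher, for illustration only."""

    name = "hemisphere"
    FIELDS = ("ns", "ew")  # the stable, documented field set of this enricher

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        try:
            lat, lon = location["lat"], location["lon"]
            return {
                "ns": "N" if lat >= 0 else "S",
                "ew": "E" if lon >= 0 else "W",
            }
        except Exception:
            # Never raise: a total failure yields every field, set to None.
            return {field: None for field in self.FIELDS}
```

Because the Protocol is structural, `HemisphereEnricher` does not inherit from `Enricher`; it satisfies it by having a `name` attribute and an `enrich` coroutine.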
|
||||
|
||||
### 13.3 `GeocoderEnricher` and the `GeocoderBackend` Protocol
|
||||
|
||||
`GeocoderEnricher` (`central.enrichment.geocoder`, `name = "geocoder"`) is the
|
||||
only enricher today. It owns the cache and the canonical field normalization;
|
||||
the actual reverse-geocode is delegated to a pluggable backend satisfying the
|
||||
`GeocoderBackend` Protocol:
|
||||
|
||||
- `async def reverse(self, lat: float, lon: float) -> dict[str, Any]` — return
|
||||
the canonical geocoder fields (see [§13.5](#135-per-field-coverage)); fields
|
||||
the backend can't resolve return `None`. Must never raise.
|
||||
|
||||
Backends shipped: `NaviBackend` (composed Navi `/api/reverse/<lat>/<lon>`
|
||||
endpoint — name/address + timezone + landclass + elevation in one call),
|
||||
`PhotonBackend` (raw Photon, name/address only), `NominatimBackend` (OSM
|
||||
Nominatim, name/address only, with a configurable rate limit + `User-Agent`),
|
||||
and `NoOpBackend` (all-null — the default until an operator configures a real
|
||||
backend).
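
To show what "name/address only" means for a backend, here is a hedged sketch of mapping a raw Photon reverse response onto the nine canonical fields. The response shape (`features[0].properties` with `postcode`) is taken from the `PhotonBackend` module docstring in this diff; the function name `photon_to_bundle` is hypothetical, and the shipped backend may map fields differently.

```python
from typing import Any

# The nine canonical field names, as listed in the documentation tables.
GEOCODER_FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def photon_to_bundle(payload: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical mapping from a Photon reverse payload to the canonical bundle."""
    bundle: dict[str, Any] = {field: None for field in GEOCODER_FIELDS}
    features = payload.get("features") or []
    if not features:
        return bundle  # nothing matched: every field stays None
    props = features[0].get("properties") or {}
    for field in ("name", "city", "county", "state", "country"):
        bundle[field] = props.get(field)
    bundle["postal_code"] = props.get("postcode")  # Photon calls it "postcode"
    # timezone, landclass and elevation_m are not in the Photon response,
    # so they stay None here.
    return bundle
```

A real backend would additionally wrap the HTTP call and this mapping in a `try`/`except` so that `reverse()` honors the never-raise contract.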
|
||||
|
||||
### 13.4 Cache + failure semantics
|
||||
|
||||
`GeocoderEnricher` is backed by a sqlite cache (`central.enrichment.cache`,
|
||||
`/var/lib/central/enrichment_cache.db`):
|
||||
|
||||
- Key: `(enricher_name, lat_rounded, lon_rounded)`, coordinates rounded to 4
|
||||
decimal places (~11 m). TTL is per-enricher, default 24h.
|
||||
- **Cache hit** → return cached bundle, no backend call.
|
||||
- **Cache miss** → call backend, cache the normalized result (**even an
|
||||
all-null bundle** — so known-empty coordinates aren't re-hammered), return it.
|
||||
- **Backend raises** (a violation of the never-raise contract, or an
|
||||
infrastructure error the backend chose to surface) → return an all-null
|
||||
bundle and **do not cache** it, so the next event for that coordinate retries.
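
The three cases above (hit, miss, backend raises) can be sketched with an in-memory dict standing in for the sqlite cache. Everything here is an assumption made for illustration: `CachingEnricherSketch`, `CountingBackend` and `FailingBackend` are hypothetical names, input validation is omitted, and the real `GeocoderEnricher` may be structured differently.

```python
import asyncio
import time
from typing import Any

GEOCODER_FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def all_null_bundle() -> dict[str, Any]:
    return {field: None for field in GEOCODER_FIELDS}


class CachingEnricherSketch:
    """In-memory stand-in for the sqlite-backed cache; illustration only."""

    name = "geocoder"

    def __init__(self, backend: Any, ttl_s: float = 86400.0) -> None:
        self._backend = backend
        self._ttl_s = ttl_s
        self._cache: dict[tuple[str, float, float], tuple[float, dict[str, Any]]] = {}

    async def enrich(self, location: dict[str, float]) -> dict[str, Any]:
        lat, lon = location["lat"], location["lon"]
        # Key: (enricher_name, lat, lon) with coordinates rounded to 4 decimals.
        key = (self.name, round(lat, 4), round(lon, 4))
        now = time.monotonic()
        cached = self._cache.get(key)
        if cached is not None and now - cached[0] < self._ttl_s:
            return dict(cached[1])  # cache hit: no backend call
        try:
            raw = await self._backend.reverse(lat, lon)
        except Exception:
            return all_null_bundle()  # backend raised: all-null, NOT cached
        bundle = {field: raw.get(field) for field in GEOCODER_FIELDS}
        self._cache[key] = (now, bundle)  # cached even if every field is None
        return dict(bundle)


class CountingBackend:
    """Hypothetical backend that counts how often it is called."""

    def __init__(self) -> None:
        self.calls = 0

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        self.calls += 1
        return {"timezone": "America/Boise"}


class FailingBackend(CountingBackend):
    """Hypothetical backend that violates the never-raise contract."""

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        self.calls += 1
        raise RuntimeError("backend down")
```

Two nearby points that round to the same four-decimal key share one backend call, while a raising backend is called again for every event at that coordinate because its all-null result is never stored.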
|
||||
|
||||
Enrichment config (`EnrichmentConfig`: `enricher_class`, `backend_class`,
|
||||
`backend_settings`, `cache_ttl_s`) is read once at supervisor startup. Changing
|
||||
the enricher set is a restart, not a hot-reload.
|
||||
|
||||
### 13.5 Per-field coverage
|
||||
|
||||
The canonical geocoder bundle is exactly nine fields. They mirror
|
||||
`central.enrichment.geocoder.GEOCODER_FIELDS` (the single source of truth — this
|
||||
table must match it):
|
||||
|
||||
| Field | US events | Non-US events today | Non-US after Photon planet expansion |
|---|---|---|---|
| `name` | populated (wilderness sparsity gaps) | null | populated |
| `city` | populated (wilderness sparsity gaps) | null | populated |
| `county` | populated (wilderness sparsity gaps) | null | populated |
| `state` | populated (wilderness sparsity gaps) | null | populated |
| `country` | populated (wilderness sparsity gaps) | null | populated |
| `postal_code` | populated (wilderness sparsity gaps) | null | populated |
| `timezone` | populated | populated (`tz_world` is planet-scale) | populated |
| `landclass` | populated where PAD-US covers | null (PAD-US is US-only) | null |
| `elevation_m` | populated | populated (planet-DEM) | populated |
|
||||
|
||||
**Net for v0.5.0:** US events get a rich bundle; non-US events get `timezone` +
|
||||
`elevation_m` and the rest null. Photon planet expansion is queued on the Navi
|
||||
side with no firm ETA; when it lands, `NaviBackend` picks it up automatically
|
||||
with zero Central code changes.
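
From the consumer side, the coverage table boils down to "check which fields are non-null before using them". A minimal sketch, assuming the event's `data` has already been decoded into a dict; `resolved_geocoder_fields` is a hypothetical helper name, and the sample values in the usage note are made up for illustration rather than taken from a real event.

```python
from typing import Any


def resolved_geocoder_fields(event_data: dict[str, Any]) -> set[str]:
    """Hypothetical consumer helper: names of geocoder fields that resolved.

    A None value means "not resolved", not "does not exist", so this only
    reports which fields are usable for this particular event. A missing
    `_enriched` or `geocoder` key is treated the same as an all-null bundle.
    """
    bundle = (event_data.get("_enriched") or {}).get("geocoder") or {}
    return {key for key, value in bundle.items() if value is not None}
```

For a non-US event under v0.5.0 coverage (for example a bundle with `timezone` set to `"Europe/Lisbon"`, `elevation_m` set to `312.0` and the other seven keys `None`), the helper would return `{"timezone", "elevation_m"}`.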
|
||||
|
||||
### 13.6 Known wrinkle — landclass antimeridian false-positive
|
||||
|
||||
`landclass` is derived from a PostGIS `ST_Intersects` against PAD-US polygons.
|
||||
Points near 51–53°N **outside** the US can spuriously match the Aleutian "Rat
|
||||
Islands" PAD-US polygon (false matching across the antimeridian), yielding a
|
||||
non-null `landclass` that doesn't actually apply. This is a Navi-side bug being
|
||||
worked separately; until it's fixed, treat a non-null `landclass` on a clearly
|
||||
non-US point as suspect. Documented for consumers in
|
||||
`CONSUMER-INTEGRATION.md`.
|
||||
|
||||
---
|
||||
|
|
|
|||
44
sql/migrations/024_add_config_enrichment.sql
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
-- Migration: 024_add_config_enrichment
|
||||
-- Adds config.enrichment — the single-row, operator-settable enrichment config
|
||||
-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY.
|
||||
--
|
||||
-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)).
|
||||
-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty
|
||||
-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs,
|
||||
-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page
|
||||
-- after this merges.
|
||||
--
|
||||
-- The seed mirrors central.config_models.EnrichmentConfig() defaults.
|
||||
-- Regenerate via:
|
||||
-- sudo -u central .venv/bin/python -c \
|
||||
-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())"
|
||||
--
|
||||
-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ...
|
||||
-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER).
|
||||
|
||||
CREATE TABLE IF NOT EXISTS config.enrichment (
|
||||
id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true),
|
||||
enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher',
|
||||
backend_class TEXT NOT NULL DEFAULT 'NoOpBackend',
|
||||
backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
cache_ttl_s INTEGER NOT NULL DEFAULT 86400,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
-- Reuse the existing updated_at trigger function (migration 002).
|
||||
DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment;
|
||||
CREATE TRIGGER enrichment_set_updated_at
|
||||
BEFORE UPDATE ON config.enrichment
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION config.set_updated_at();
|
||||
|
||||
-- Reuse the existing NOTIFY function (migration 001) so the supervisor's
|
||||
-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE
|
||||
-- branch emits 'enrichment:' (empty key — single-row table has no natural key).
|
||||
DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment;
|
||||
CREATE TRIGGER enrichment_notify
|
||||
AFTER INSERT OR UPDATE OR DELETE ON config.enrichment
|
||||
FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
|
||||
|
||||
-- Seed the single framework-default row (NoOp; no deployment-specific values).
|
||||
INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING;
|
||||
|
|
@ -8,7 +8,7 @@ import logging
|
|||
from collections.abc import Awaitable, Callable
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
-from central.config_models import AdapterConfig
+from central.config_models import AdapterConfig, EnrichmentConfig
|
||||
from central.config_store import ConfigStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -26,6 +26,10 @@ class ConfigSource(Protocol):
|
|||
"""Get configuration for a specific adapter."""
|
||||
...
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Get the enrichment configuration."""
|
||||
...
|
||||
|
||||
async def watch_for_changes(
|
||||
self,
|
||||
callback: Callable[[str, str], Awaitable[None] | None],
|
||||
|
|
@ -65,6 +69,10 @@ class DbConfigSource:
|
|||
"""Get a specific adapter from database."""
|
||||
return await self._store.get_adapter(name)
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Get the enrichment configuration from database."""
|
||||
return await self._store.get_enrichment_config()
|
||||
|
||||
async def watch_for_changes(
|
||||
self,
|
||||
callback: Callable[[str, str], Awaitable[None] | None],
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ from typing import Any
|
|||
|
||||
import asyncpg
|
||||
|
||||
-from central.config_models import AdapterConfig, StreamConfig
+from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
|
||||
from central.crypto import decrypt, encrypt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -129,6 +129,48 @@ class ConfigStore:
|
|||
name,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Enrichment configuration (single-row config.enrichment)
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Read the single config.enrichment row.
|
||||
|
||||
Falls back to EnrichmentConfig() framework defaults if the row is
|
||||
somehow absent (it is migration-seeded, so this is belt-and-suspenders).
|
||||
"""
|
||||
async with self._pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||
FROM config.enrichment
|
||||
WHERE id = true
|
||||
"""
|
||||
)
|
||||
if row is None:
|
||||
return EnrichmentConfig()
|
||||
return EnrichmentConfig(**dict(row))
|
||||
|
||||
async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None:
|
||||
"""Write the single config.enrichment row (id = true)."""
|
||||
async with self._pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO config.enrichment
|
||||
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||
VALUES (true, $1, $2, $3, $4)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
enricher_class = EXCLUDED.enricher_class,
|
||||
backend_class = EXCLUDED.backend_class,
|
||||
backend_settings = EXCLUDED.backend_settings,
|
||||
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||
""",
|
||||
config.enricher_class,
|
||||
config.backend_class,
|
||||
config.backend_settings, # JSON-encoded by the codec
|
||||
config.cache_ttl_s,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Stream configuration
|
||||
# -------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
"""Geocoder backend implementations."""
|
||||
|
||||
+from central.enrichment.backends.navi import NaviBackend
+from central.enrichment.backends.nominatim import NominatimBackend
 from central.enrichment.backends.no_op import NoOpBackend
+from central.enrichment.backends.photon import PhotonBackend

-__all__ = ["NoOpBackend"]
+__all__ = ["NoOpBackend", "NaviBackend", "PhotonBackend", "NominatimBackend"]
|
||||
|
|
|
|||
98
src/central/enrichment/backends/navi.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
"""Navi reverse-geocoding backend.
|
||||
|
||||
Hits the composed Navi endpoint `<base_url>/api/reverse/<lat>/<lon>`, which
|
||||
already returns the canonical 9-field bundle (name, city, county, state,
|
||||
country, postal_code, timezone, landclass, elevation_m). Navi composes Photon
|
||||
(name/address) + tz_world (timezone) + PAD-US (landclass) + planet-DEM
|
||||
(elevation_m), so this backend is a near-passthrough mapping.
|
||||
|
||||
Coverage today: US events get a rich bundle; non-US events get timezone +
|
||||
elevation_m populated (both planet-scale) and the rest null until Navi's
|
||||
Photon planet expansion lands (no Central change needed when it does).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Generic default — operators point this at their Navi instance via the
|
||||
# /enrichment config page (backend_settings.base_url). No deployment-specific
|
||||
# host belongs in source.
|
||||
DEFAULT_BASE_URL = "http://localhost:8440"
|
||||
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
|
||||
_WARMUP_LAT = 43.6150
|
||||
_WARMUP_LON = -116.2023
|
||||
|
||||
|
||||
class NaviBackendSettings(BaseModel):
|
||||
"""Settings for NaviBackend. Mirrors __init__ defaults exactly."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
base_url: str = Field(default=DEFAULT_BASE_URL, description="Navi /api/reverse base URL")
|
||||
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
|
||||
headers: dict[str, str] | None = Field(
|
||||
default=None, description="Extra request headers (e.g. Authorization)"
|
||||
)
|
||||
warmup: bool = Field(default=True, description="Fire a warmup ping on construction")
|
||||
|
||||
|
||||
class NaviBackend:
|
||||
"""GeocoderBackend backed by the composed Navi /api/reverse endpoint."""
|
||||
|
||||
settings_schema = NaviBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
timeout_s: float = 10.0,
|
||||
headers: dict[str, str] | None = None,
|
||||
warmup: bool = True,
|
||||
) -> None:
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._timeout_s = timeout_s
|
||||
# Future-proof: drop an Authorization: Bearer … here config-only, no code change.
|
||||
self._headers = dict(headers or {})
|
||||
if warmup:
|
||||
# Fire-and-forget warmup ping; only if a loop is running (it is under
|
||||
# the supervisor's asyncio.run, not under sync test construction).
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.create_task(self._warmup())
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
def _url(self, lat: float, lon: float) -> str:
|
||||
return f"{self._base_url}/api/reverse/{lat}/{lon}"
|
||||
|
||||
async def _warmup(self) -> None:
|
||||
try:
|
||||
await self._fetch(_WARMUP_LAT, _WARMUP_LON)
|
||||
except Exception:
|
||||
# Warmup is best-effort; a failure here must not break startup.
|
||||
logger.debug("NaviBackend warmup ping failed (non-fatal)")
|
||||
|
||||
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
async with aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
|
||||
) as session:
|
||||
async with session.get(self._url(lat, lon), headers=self._headers) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
try:
|
||||
data = await self._fetch(lat, lon)
|
||||
except Exception:
|
||||
# Non-200, network error, timeout, malformed JSON — never raise.
|
||||
logger.debug("NaviBackend reverse failed; returning all-null bundle")
|
||||
return all_null_bundle()
|
||||
# Navi's response already matches the canonical shape; map defensively.
|
||||
return {field: data.get(field) for field in GEOCODER_FIELDS}
|
||||
|
|
@ -7,11 +7,23 @@ which satisfies the GeocoderBackend contract while resolving nothing.
|
|||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
|
||||
class NoOpBackendSettings(BaseModel):
|
||||
"""No-op backend takes no settings. extra='forbid' makes switching to
|
||||
NoOpBackend while stale backend_settings (e.g. a base_url) remain a clean
|
||||
ValidationError instead of a TypeError at construction."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
|
||||
class NoOpBackend:
|
||||
"""GeocoderBackend that resolves no fields."""
|
||||
|
||||
settings_schema = NoOpBackendSettings
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
return all_null_bundle()
|
||||
|
|
|
|||
113
src/central/enrichment/backends/nominatim.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
"""OSM Nominatim reverse-geocoding backend.
|
||||
|
||||
Works against public OSM Nominatim (1 req/sec + User-Agent required) or a
|
||||
self-hosted instance (no limit). Resolves name + address only; timezone,
|
||||
landclass, and elevation_m are nulled (not in the Nominatim reverse response).
|
||||
|
||||
Nominatim jsonv2 reverse response shape:
|
||||
{"display_name": "...", "name": "...",
|
||||
"address": {city|town|village, county, state, country, postcode, ...}}
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org"
|
||||
DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)"
|
||||
|
||||
|
||||
class NominatimBackendSettings(BaseModel):
|
||||
"""Settings for NominatimBackend. Mirrors __init__ defaults exactly."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
base_url: str = Field(default=DEFAULT_BASE_URL, description="Nominatim /reverse base URL")
|
||||
user_agent: str = Field(
|
||||
default=DEFAULT_USER_AGENT, description="User-Agent (public OSM requires one)"
|
||||
)
|
||||
rate_limit_per_sec: float = Field(
|
||||
default=1.0, description="Outbound request cap; 0 disables (self-hosted)"
|
||||
)
|
||||
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
|
||||
|
||||
|
||||
class NominatimBackend:
|
||||
"""GeocoderBackend backed by an OSM Nominatim /reverse endpoint.
|
||||
|
||||
rate_limit_per_sec throttles outbound requests (public OSM requires <= 1/s);
|
||||
set it to 0 to disable for self-hosted instances.
|
||||
"""
|
||||
|
||||
settings_schema = NominatimBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
user_agent: str = DEFAULT_USER_AGENT,
|
||||
rate_limit_per_sec: float = 1.0,
|
||||
timeout_s: float = 10.0,
|
||||
) -> None:
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._user_agent = user_agent
|
||||
self._min_interval = (1.0 / rate_limit_per_sec) if rate_limit_per_sec > 0 else 0.0
|
||||
self._timeout_s = timeout_s
|
||||
self._rl_lock = asyncio.Lock()
|
||||
self._last_request_at = 0.0
|
||||
|
||||
def _url(self, lat: float, lon: float) -> str:
|
||||
qs = urlencode({"lat": lat, "lon": lon, "format": "jsonv2"})
|
||||
return f"{self._base_url}/reverse?{qs}"
|
||||
|
||||
def _request_headers(self) -> dict[str, str]:
|
||||
# Public Nominatim rejects requests without an identifying User-Agent.
|
||||
return {"User-Agent": self._user_agent}
|
||||
|
||||
async def _throttle(self) -> None:
|
||||
if self._min_interval <= 0:
|
||||
return
|
||||
async with self._rl_lock:
|
||||
now = time.monotonic()
|
||||
wait = self._last_request_at + self._min_interval - now
|
||||
if wait > 0:
|
||||
await asyncio.sleep(wait)
|
||||
self._last_request_at = time.monotonic()
|
||||
|
||||
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
async with aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
|
||||
) as session:
|
||||
async with session.get(
|
||||
self._url(lat, lon), headers=self._request_headers()
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
try:
|
||||
await self._throttle()
|
||||
data = await self._fetch(lat, lon)
|
||||
addr = data.get("address", {}) or {}
|
||||
except Exception:
|
||||
logger.debug("NominatimBackend reverse failed; returning all-null bundle")
|
||||
return all_null_bundle()
|
||||
return {
|
||||
"name": data.get("name") or data.get("display_name"),
|
||||
"city": addr.get("city") or addr.get("town") or addr.get("village"),
|
||||
"county": addr.get("county"),
|
||||
"state": addr.get("state"),
|
||||
"country": addr.get("country"),
|
||||
"postal_code": addr.get("postcode"),
|
||||
"timezone": None, # not in Nominatim reverse response
|
||||
"landclass": None, # Navi-composed-endpoint only
|
||||
"elevation_m": None, # Navi-composed-endpoint only
|
||||
}
|
||||
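The `_throttle` method above holds an `asyncio.Lock` across the sleep, so concurrent callers queue up instead of bursting past the rate limit. The following is a minimal standalone sketch of that pattern, not the PR's code: the `Throttle` class, the `demo` helper, and the 0.05 s interval are illustrative assumptions.

```python
import asyncio
import time


class Throttle:
    """Let callers through one at a time, at least min_interval seconds apart."""

    def __init__(self, min_interval: float) -> None:
        self._min_interval = min_interval
        self._lock = asyncio.Lock()
        self._last = 0.0  # monotonic timestamp of the previous pass

    async def wait(self) -> float:
        # Holding the lock while sleeping forces later callers to queue.
        async with self._lock:
            delay = self._last + self._min_interval - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._last = time.monotonic()
            return self._last


async def demo(n: int = 3, min_interval: float = 0.05) -> list[float]:
    # Hypothetical driver: fire n concurrent callers at one throttle.
    throttle = Throttle(min_interval)
    return list(await asyncio.gather(*(throttle.wait() for _ in range(n))))


stamps = asyncio.run(demo())
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
```

Each entry in `gaps` should come out close to the configured interval, which is the property the public Nominatim rate limit depends on.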
82
src/central/enrichment/backends/photon.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""Raw Photon reverse-geocoding backend.
|
||||
|
||||
For deployers who run a Photon instance directly, without the composed
|
||||
Navi-style endpoint. Photon resolves name + address only — timezone,
|
||||
landclass, and elevation_m are Navi-composed-endpoint extras and are nulled
|
||||
here.
|
||||
|
||||
Photon reverse response shape:
|
||||
{"features": [{"properties": {name, city, county, state, country,
|
||||
postcode, ...}, "geometry": {...}}]}
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_BASE_URL = "http://localhost:2322"
|
||||
|
||||
|
||||
class PhotonBackendSettings(BaseModel):
|
||||
"""Settings for PhotonBackend. Mirrors __init__ defaults exactly."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
base_url: str = Field(default=DEFAULT_BASE_URL, description="Photon /reverse base URL")
|
||||
timeout_s: float = Field(default=10.0, description="Per-request timeout in seconds")
|
||||
headers: dict[str, str] | None = Field(default=None, description="Extra request headers")
|
||||
|
||||
|
||||
class PhotonBackend:
|
||||
"""GeocoderBackend backed by a raw Photon /reverse endpoint."""
|
||||
|
||||
settings_schema = PhotonBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
timeout_s: float = 10.0,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> None:
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._timeout_s = timeout_s
|
||||
self._headers = dict(headers or {})
|
||||
|
||||
def _url(self, lat: float, lon: float) -> str:
|
||||
qs = urlencode({"lat": lat, "lon": lon, "limit": 1})
|
||||
return f"{self._base_url}/reverse?{qs}"
|
||||
|
||||
async def _fetch(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
async with aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=self._timeout_s),
|
||||
) as session:
|
||||
async with session.get(self._url(lat, lon), headers=self._headers) as resp:
|
||||
resp.raise_for_status()
|
||||
return await resp.json()
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
try:
|
||||
data = await self._fetch(lat, lon)
|
||||
features = data.get("features") or []
|
||||
props = features[0].get("properties", {}) if features else {}
|
||||
except Exception:
|
||||
logger.debug("PhotonBackend reverse failed; returning all-null bundle")
|
||||
return all_null_bundle()
|
||||
return {
|
||||
"name": props.get("name"),
|
||||
"city": props.get("city"),
|
||||
"county": props.get("county"),
|
||||
"state": props.get("state"),
|
||||
"country": props.get("country"),
|
||||
"postal_code": props.get("postcode"), # Photon names it 'postcode'
|
||||
"timezone": None, # not provided by raw Photon
|
||||
"landclass": None, # Navi-composed-endpoint only
|
||||
"elevation_m": None, # Navi-composed-endpoint only
|
||||
}
|
||||
|
|
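The mapping in `PhotonBackend.reverse` can be exercised without a network call. The sketch below pulls it into a pure function; `bundle_from_photon`, the sample payload, and the literal `GEOCODER_FIELDS` tuple (copied from the nine keys in the FIRMS enrichment docs) are assumptions made for illustration.

```python
from typing import Any

# Assumed to match the nine canonical keys documented for data._enriched.geocoder.
GEOCODER_FIELDS = (
    "name", "city", "county", "state", "country",
    "postal_code", "timezone", "landclass", "elevation_m",
)


def bundle_from_photon(data: dict[str, Any]) -> dict[str, Any]:
    """Map a Photon /reverse response onto the canonical bundle."""
    features = data.get("features") or []
    props = features[0].get("properties", {}) if features else {}
    bundle: dict[str, Any] = dict.fromkeys(GEOCODER_FIELDS)  # every key present, None
    for key in ("name", "city", "county", "state", "country"):
        bundle[key] = props.get(key)
    bundle["postal_code"] = props.get("postcode")  # Photon calls it 'postcode'
    return bundle


# Illustrative payload only; not a recorded Photon response.
sample = {"features": [{"properties": {"name": "Boise", "state": "Idaho", "postcode": "83702"}}]}
hit = bundle_from_photon(sample)
miss = bundle_from_photon({"features": []})
```

An empty `features` list yields the same shape as `all_null_bundle()` is described to return: all nine keys present, each `None`.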
@ -124,3 +124,27 @@ class EnrichmentCache:
|
|||
) -> None:
|
||||
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
|
||||
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
|
||||
|
||||
def _invalidate_sync(self, enricher_name: str | None) -> int:
|
||||
conn = self._connect()
|
||||
try:
|
||||
if enricher_name is None:
|
||||
cur = conn.execute("DELETE FROM enrichment_cache")
|
||||
else:
|
||||
cur = conn.execute(
|
||||
"DELETE FROM enrichment_cache WHERE enricher_name = ?",
|
||||
(enricher_name,),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
async def invalidate(self, enricher_name: str | None = None) -> int:
|
||||
"""Drop cached bundles. Scoped to one enricher when given, else all.
|
||||
|
||||
Called when enrichment config changes — a new backend would otherwise
|
||||
keep returning the previous backend's cached results until TTL expiry.
|
||||
Returns the number of rows deleted.
|
||||
"""
|
||||
return await asyncio.to_thread(self._invalidate_sync, enricher_name)
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ land in PR K.
|
|||
import logging
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -38,6 +40,12 @@ def all_null_bundle() -> dict[str, Any]:
|
|||
class GeocoderBackend(Protocol):
|
||||
"""The pluggable reverse-geocoding layer beneath GeocoderEnricher."""
|
||||
|
||||
# Pydantic model (extra='forbid') describing this backend's accepted
|
||||
# settings. The supervisor validates config.enrichment.backend_settings
|
||||
# against it before instantiating, turning a config/settings mismatch into
|
||||
# a clean ValidationError instead of a constructor TypeError.
|
||||
settings_schema: type[BaseModel]
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
"""Return canonical geocoder fields (see GEOCODER_FIELDS).
|
||||
|
||||
|
|
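The comment on `settings_schema` describes validating stored settings before the constructor runs. A minimal sketch of that idea with pydantic v2 follows; `NoOpSettings`, `NoOp`, and `construct` are hypothetical stand-ins rather than the classes in this PR.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class NoOpSettings(BaseModel):
    """Hypothetical schema for a backend that accepts no settings."""

    model_config = ConfigDict(extra="forbid")


class NoOp:
    settings_schema = NoOpSettings

    def __init__(self) -> None:
        pass


def construct(backend_cls, raw_settings: dict):
    # Validate first: unknown keys fail here, before the constructor is called.
    validated = backend_cls.settings_schema.model_validate(raw_settings)
    return backend_cls(**validated.model_dump())


stale = {"base_url": "http://x"}  # settings left over from a previous backend
try:
    construct(NoOp, stale)
    outcome = "constructed"
except ValidationError as exc:
    outcome = exc.errors()[0]["type"]

ok = construct(NoOp, {})
```

Without the validation step, `NoOp(**stale)` would raise a `TypeError` from the constructor, which is harder to attach to a specific form field.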
|
|||
|
|
@ -66,6 +66,8 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
|||
return "text", None
|
||||
if field_type is int:
|
||||
return "number", None
|
||||
if field_type is float:
|
||||
return "number", None
|
||||
if field_type is bool:
|
||||
return "checkbox", None
|
||||
if field_type is RegionConfig:
|
||||
|
|
@ -88,6 +90,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
|||
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
|
||||
)
|
||||
|
||||
# dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings).
|
||||
# The form renders the value as JSON; the POST handler parses it back.
|
||||
if field_type is dict or origin is dict:
|
||||
return "json", None
|
||||
|
||||
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
|
||||
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
|
||||
raise NotImplementedError(
|
||||
|
|
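The two additions to `_type_to_widget_and_options` (float and dict) can be illustrated with a reduced dispatch function. This sketch assumes a simplified `widget_for` helper that returns only the widget name; it leaves out lists, `RegionConfig`, nested models, and any `Optional[...]` unwrapping the real function may perform.

```python
from typing import Any, get_origin


def widget_for(field_type: Any) -> str:
    """Return a form widget name for a small set of field types."""
    origin = get_origin(field_type)  # dict for dict[str, str], None for plain types
    if field_type is str:
        return "text"
    if field_type is int or field_type is float:
        return "number"
    if field_type is bool:
        return "checkbox"
    if field_type is dict or origin is dict:
        return "json"  # rendered as a JSON textarea, parsed back on POST
    raise NotImplementedError(f"unsupported field type: {field_type!r}")


widgets = {
    "base_url": widget_for(str),
    "timeout_s": widget_for(float),
    "warmup": widget_for(bool),
    "headers": widget_for(dict[str, str]),
}
```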
|
|||
|
|
@ -1990,6 +1990,199 @@ async def streams_update(
|
|||
return RedirectResponse(url="/streams", status_code=302)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Enrichment config route
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _outer_enrichment_fields(current: dict) -> list[FieldDescriptor]:
|
||||
"""EnrichmentConfig form fields EXCEPT backend_settings — that one is
|
||||
rendered as a per-backend <fieldset> via _backend_fields()."""
|
||||
from central.config_models import EnrichmentConfig
|
||||
|
||||
return [
|
||||
f for f in describe_fields(EnrichmentConfig, current)
|
||||
if f.name != "backend_settings"
|
||||
]
|
||||
|
||||
|
||||
def _backend_fields(backend_class: str | None, current_bs: dict) -> list[FieldDescriptor]:
|
||||
"""Field descriptors for the selected backend's settings_schema, or [] when
|
||||
the backend class is unknown. Same generic describe_fields machinery."""
|
||||
from central.supervisor import _BACKEND_REGISTRY
|
||||
|
||||
cls = _BACKEND_REGISTRY.get(backend_class or "")
|
||||
if cls is None:
|
||||
return []
|
||||
return describe_fields(cls.settings_schema, current_bs or {})
|
||||
|
||||
|
||||
async def _read_enrichment_row(conn) -> dict:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||
FROM config.enrichment WHERE id = true
|
||||
"""
|
||||
)
|
||||
return dict(row) if row is not None else {}
|
||||
|
||||
|
||||
def _enrichment_context(request, *, outer_fields, backend_fields, backend_class,
|
||||
errors=None, form_data=None, backend_form_data=None):
|
||||
return {
|
||||
"operator": request.state.operator,
|
||||
"csrf_token": request.state.csrf_token,
|
||||
"outer_fields": outer_fields,
|
||||
"backend_fields": backend_fields,
|
||||
"backend_class": backend_class,
|
||||
"errors": errors,
|
||||
"form_data": form_data,
|
||||
"backend_form_data": backend_form_data,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/enrichment", response_class=HTMLResponse)
|
||||
async def enrichment_form(request: Request) -> HTMLResponse:
|
||||
"""Render the enrichment config form (outer fields + a per-backend fieldset
|
||||
for the currently-selected backend_class)."""
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
current = await _read_enrichment_row(conn)
|
||||
|
||||
backend_class = current.get("backend_class") or "NoOpBackend"
|
||||
current_bs = current.get("backend_settings") or {}
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="enrichment.html",
|
||||
context=_enrichment_context(
|
||||
request,
|
||||
outer_fields=_outer_enrichment_fields(current),
|
||||
backend_fields=_backend_fields(backend_class, current_bs),
|
||||
backend_class=backend_class,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@router.post("/enrichment")
|
||||
async def enrichment_update(request: Request) -> Response:
|
||||
"""Validate + persist the enrichment config. Hot-reload picks it up via the
|
||||
config.enrichment NOTIFY trigger. backend_settings is validated against the
|
||||
SUBMITTED backend_class's settings_schema."""
|
||||
from central.config_models import EnrichmentConfig
|
||||
from central.supervisor import _BACKEND_REGISTRY
|
||||
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
|
||||
form = await request.form()
|
||||
if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
|
||||
raise CsrfValidationError("Invalid CSRF token")
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
form_data: dict[str, Any] = {}
|
||||
backend_form_data: dict[str, Any] = {}
|
||||
parsed: dict[str, Any] = {}
|
||||
|
||||
# --- outer EnrichmentConfig fields (backend_settings excluded) ---
|
||||
for field in _outer_enrichment_fields({}):
|
||||
raw = form.get(field.name, "")
|
||||
form_data[field.name] = raw
|
||||
if field.widget == "number":
|
||||
try:
|
||||
parsed[field.name] = int(raw) if raw else None
|
||||
except ValueError:
|
||||
errors[field.name] = f"{field.label} must be a number"
|
||||
else: # text
|
||||
parsed[field.name] = raw.strip() if raw else None
|
||||
|
||||
submitted_backend_class = parsed.get("backend_class")
|
||||
|
||||
# --- backend settings fieldset, validated against the SUBMITTED backend ---
|
||||
backend_settings: dict[str, Any] = {}
|
||||
backend_cls = _BACKEND_REGISTRY.get(submitted_backend_class or "")
|
||||
if backend_cls is None and submitted_backend_class:
|
||||
errors["backend_class"] = f"Unknown backend: {submitted_backend_class}"
|
||||
elif backend_cls is not None:
|
||||
for f in describe_fields(backend_cls.settings_schema, {}):
|
||||
formkey = f"bs_{f.name}"
|
||||
raw = form.get(formkey, "")
|
||||
backend_form_data[formkey] = raw
|
||||
if f.widget == "checkbox":
|
||||
backend_settings[f.name] = formkey in form
|
||||
elif f.widget == "json":
|
||||
if raw and raw.strip():
|
||||
try:
|
||||
backend_settings[f.name] = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
errors[formkey] = f"{f.label} is not valid JSON: {e}"
|
||||
# blank -> omit, schema default applies
|
||||
else: # text / number — let pydantic coerce, omit blanks for defaults
|
||||
if raw.strip() != "":
|
||||
backend_settings[f.name] = raw.strip()
|
||||
if not errors:
|
||||
try:
|
||||
backend_settings = backend_cls.settings_schema.model_validate(
|
||||
backend_settings
|
||||
).model_dump()
|
||||
except ValidationError as e:
|
||||
for err in e.errors():
|
||||
loc = err["loc"][0] if err["loc"] else "unknown"
|
||||
errors[f"bs_{loc}"] = err["msg"]
|
||||
|
||||
# --- outer EnrichmentConfig validation ---
|
||||
if not errors:
|
||||
try:
|
||||
validated = EnrichmentConfig(
|
||||
**{k: v for k, v in parsed.items() if v is not None},
|
||||
backend_settings=backend_settings,
|
||||
)
|
||||
except ValidationError as e:
|
||||
for err in e.errors():
|
||||
loc = err["loc"][0] if err["loc"] else "unknown"
|
||||
errors[str(loc)] = err["msg"]
|
||||
|
||||
if errors:
|
||||
# Re-render against the SUBMITTED backend_class so field errors attach
|
||||
# to the right schema (operator may be mid-switch with a typo).
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="enrichment.html",
|
||||
context=_enrichment_context(
|
||||
request,
|
||||
outer_fields=_outer_enrichment_fields({}),
|
||||
backend_fields=_backend_fields(submitted_backend_class, backend_settings),
|
||||
backend_class=submitted_backend_class,
|
||||
errors=errors,
|
||||
form_data=form_data,
|
||||
backend_form_data=backend_form_data,
|
||||
),
|
||||
status_code=200,
|
||||
)
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO config.enrichment
|
||||
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||
VALUES (true, $1, $2, $3, $4)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
enricher_class = EXCLUDED.enricher_class,
|
||||
backend_class = EXCLUDED.backend_class,
|
||||
backend_settings = EXCLUDED.backend_settings,
|
||||
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||
""",
|
||||
validated.enricher_class,
|
||||
validated.backend_class,
|
||||
validated.backend_settings, # encoded as jsonb by the pool codec
|
||||
validated.cache_ttl_s,
|
||||
)
|
||||
|
||||
return RedirectResponse(url="/enrichment", status_code=302)
|
||||
|
||||
|
||||
# Alias validation regex
|
||||
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')
|
||||
|
||||
|
|
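The backend fieldset parsing in `enrichment_update` follows three rules: checkboxes are read by key presence, JSON fields are parsed with errors collected per form key, and blank text or number inputs are omitted so schema defaults apply. The sketch below isolates those rules; `parse_backend_form` and the plain-dict form are assumptions standing in for the route's `FieldDescriptor` list and Starlette form object.

```python
import json
from typing import Any


def parse_backend_form(
    form: dict[str, str], widgets: dict[str, str]
) -> tuple[dict[str, Any], dict[str, str]]:
    """Turn 'bs_'-prefixed form values into settings plus per-field errors."""
    settings: dict[str, Any] = {}
    errors: dict[str, str] = {}
    for name, widget in widgets.items():
        key = f"bs_{name}"
        raw = form.get(key, "")
        if widget == "checkbox":
            settings[name] = key in form  # unchecked boxes are absent from the form
        elif widget == "json":
            if raw.strip():
                try:
                    settings[name] = json.loads(raw)
                except json.JSONDecodeError as exc:
                    errors[key] = f"{name} is not valid JSON: {exc}"
        elif raw.strip():
            settings[name] = raw.strip()  # left as a string for the schema to coerce
    return settings, errors


widgets = {"base_url": "text", "timeout_s": "number", "headers": "json", "warmup": "checkbox"}
good, good_errors = parse_backend_form(
    {"bs_base_url": " http://navi:8440 ", "bs_timeout_s": "", "bs_headers": '{"X-Key": "abc"}'},
    widgets,
)
bad, bad_errors = parse_backend_form({"bs_headers": "{oops"}, widgets)
```

Here `timeout_s` is absent from `good`, so the schema default would apply when the dict is validated.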
|
|||
|
|
@ -19,6 +19,7 @@
|
|||
<li><a href="/adapters">Adapters</a></li>
|
||||
<li><a href="/events">Events</a></li>
|
||||
<li><a href="/streams">Streams</a></li>
|
||||
<li><a href="/enrichment">Enrichment</a></li>
|
||||
<li><a href="/api-keys">API Keys</a></li>
|
||||
<li>{{ operator.username }}</li>
|
||||
<li><a href="/change-password">Change Password</a></li>
|
||||
|
|
|
|||
90
src/central/gui/templates/enrichment.html
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Central — Enrichment{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Enrichment</h1>
|
||||
<p class="secondary">
|
||||
Central-side event enrichment. Results are attached to each event under
|
||||
<code>data._enriched.&lt;enricher&gt;</code>. Changes hot-reload into the
|
||||
supervisor; switching backend invalidates the enrichment cache. Backend
|
||||
settings below are validated against the selected backend; switching
|
||||
backend then saving re-renders this form with the new backend's fields.
|
||||
</p>
|
||||
|
||||
<form method="post" action="/enrichment">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
|
||||
|
||||
<fieldset>
|
||||
<legend>Configuration</legend>
|
||||
{% for field in outer_fields %}
|
||||
{% if field.widget == "text" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
|
||||
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||
{% if field.required %}required{% endif %}>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "number" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="number" step="any" id="{{ field.name }}" name="{{ field.name }}"
|
||||
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||
{% if field.required %}required{% endif %}>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</fieldset>
|
||||
|
||||
<fieldset>
|
||||
<legend>Backend settings — {{ backend_class }}</legend>
|
||||
{% if not backend_fields %}
|
||||
<small>This backend takes no settings.</small>
|
||||
{% endif %}
|
||||
{% for field in backend_fields %}
|
||||
{% set fk = "bs_" ~ field.name %}
|
||||
{% if field.widget == "text" %}
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<input type="text" id="{{ fk }}" name="{{ fk }}"
|
||||
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "number" %}
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<input type="number" step="any" id="{{ fk }}" name="{{ fk }}"
|
||||
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "checkbox" %}
|
||||
<label>
|
||||
<input type="checkbox" name="{{ fk }}"
|
||||
{% if backend_form_data %}{% if backend_form_data[fk] %}checked{% endif %}{% elif field.current_value %}checked{% endif %}>
|
||||
{{ field.label }}
|
||||
</label>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "json" %}
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<textarea id="{{ fk }}" name="{{ fk }}" rows="4"
|
||||
placeholder="{}">{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else (field.current_value | tojson if field.current_value else '') }}</textarea>
|
||||
<small>JSON object{% if field.description %} — {{ field.description }}{% endif %}</small>
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</fieldset>
|
||||
|
||||
<button type="submit">Save Changes</button>
|
||||
</form>
|
||||
{% endblock %}
|
||||
|
|
@ -12,6 +12,7 @@ from typing import Any
|
|||
|
||||
import nats
|
||||
from nats.js import JetStreamContext
|
||||
from pydantic import ValidationError
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapter_discovery import discover_adapters
|
||||
|
|
@ -24,7 +25,10 @@ from central.api_key_resolver import resolve_api_key_alias
|
|||
from central.config_models import EnrichmentConfig
|
||||
from central.enrichment.base import Enricher
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
from central.enrichment.backends.navi import NaviBackend
|
||||
from central.enrichment.backends.nominatim import NominatimBackend
|
||||
from central.enrichment.backends.no_op import NoOpBackend
|
||||
from central.enrichment.backends.photon import PhotonBackend
|
||||
from central.enrichment.geocoder import GeocoderEnricher
|
||||
from central.models import Event
|
||||
from central.stream_manager import StreamManager
|
||||
|
|
@ -33,23 +37,32 @@ CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
 ENRICHMENT_CACHE_DB_PATH = Path("/var/lib/central/enrichment_cache.db")
 
 # Enricher / backend class-name registries for EnrichmentConfig resolution.
-# PR J ships GeocoderEnricher + NoOpBackend only; PR K extends these.
 _ENRICHER_REGISTRY: dict[str, type] = {"GeocoderEnricher": GeocoderEnricher}
-_BACKEND_REGISTRY: dict[str, type] = {"NoOpBackend": NoOpBackend}
+_BACKEND_REGISTRY: dict[str, type] = {
+    "NoOpBackend": NoOpBackend,
+    "NaviBackend": NaviBackend,
+    "PhotonBackend": PhotonBackend,
+    "NominatimBackend": NominatimBackend,
+}
 
 
 def build_enrichers(
     enrichment_config: EnrichmentConfig,
-    cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH,
+    cache: EnrichmentCache,
 ) -> list[Enricher]:
-    """Instantiate the configured enricher(s) with their backend + cache.
+    """Instantiate the configured enricher(s) with their backend + the given cache.
 
-    Read once at supervisor startup — enrichment config is NOT hot-reloaded
-    in PR J (see EnrichmentConfig docstring).
+    The supervisor owns the cache (so it can invalidate it on a config change)
+    and passes it in. Enrichment config is read from config.enrichment at
+    startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
     """
     backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
-    backend = backend_cls(**enrichment_config.backend_settings)
-    cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s)
+    # Validate backend_settings against the backend's settings_schema BEFORE
+    # constructing. A mismatch (e.g. a stale base_url left in the row after
+    # switching to NoOpBackend) raises a clean pydantic ValidationError here
+    # instead of a TypeError inside the backend constructor.
+    validated = backend_cls.settings_schema.model_validate(enrichment_config.backend_settings)
+    backend = backend_cls(**validated.model_dump())
     enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
     return [enricher_cls(backend, cache=cache)]
 
@ -158,9 +171,15 @@ class Supervisor:
         self._config_store = config_store
         self._nats_url = nats_url
         self._cloudevents_config = cloudevents_config
-        # Enrichment is read once at startup (no hot-reload in PR J).
+        # Enrichment: a valid default set is built now so the supervisor is
+        # never in a half-state; start() then overrides it from the live
+        # config.enrichment row, and _on_config_change hot-reloads it.
+        self._active_enrichment_config = enrichment_config or EnrichmentConfig()
+        self._enrichment_cache = EnrichmentCache(
+            ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s
+        )
         self._enrichers: list[Enricher] = build_enrichers(
-            enrichment_config or EnrichmentConfig()
+            self._active_enrichment_config, self._enrichment_cache
         )
         self._adapters = discover_adapters()
         self._nc: nats.NATS | None = None
@ -666,11 +685,74 @@ class Supervisor:
|
|||
extra={"stream": stream_config.name, "error": str(e)},
|
||||
)
|
||||
|
||||
def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
|
||||
"""Rebuild the active enricher set + cache from an EnrichmentConfig.
|
||||
|
||||
Builds into locals first and commits to instance state only on success,
|
||||
so a ValidationError (bad backend_settings) leaves the previously-active
|
||||
enrichers/config/cache untouched and propagates to the caller.
|
||||
"""
|
||||
cache = EnrichmentCache(ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s)
|
||||
enrichers = build_enrichers(config, cache) # may raise ValidationError
|
||||
self._active_enrichment_config = config
|
||||
self._enrichment_cache = cache
|
||||
self._enrichers = enrichers
|
||||
|
||||
async def _handle_enrichment_change(self) -> None:
|
||||
"""Re-read config.enrichment and rebuild enrichers. Invalidate the cache
|
||||
when the backend changed, so stale results from the previous backend
|
||||
don't survive until TTL expiry.
|
||||
|
||||
Invalid backend_settings (ValidationError) leave the previous backend
|
||||
running — the supervisor stays up; the operator fixes the row and the
|
||||
next NOTIFY brings it in cleanly.
|
||||
"""
|
||||
new_config = await self._config_source.get_enrichment_config()
|
||||
old_config = self._active_enrichment_config
|
||||
backend_changed = (
|
||||
new_config.backend_class != old_config.backend_class
|
||||
or new_config.backend_settings != old_config.backend_settings
|
||||
or new_config.enricher_class != old_config.enricher_class
|
||||
)
|
||||
try:
|
||||
self._rebuild_enrichers(new_config)
|
||||
except ValidationError as e:
|
||||
logger.error(
|
||||
"Enrichment config invalid; keeping previous backend",
|
||||
extra={
|
||||
"enricher_class": new_config.enricher_class,
|
||||
"backend_class": new_config.backend_class,
|
||||
"backend_settings": new_config.backend_settings,
|
||||
"errors": e.errors(),
|
||||
},
|
||||
)
|
||||
return
|
||||
if backend_changed:
|
||||
deleted = await self._enrichment_cache.invalidate()
|
||||
logger.info(
|
||||
"Enrichment backend changed; cache invalidated",
|
||||
extra={
|
||||
"enricher_class": new_config.enricher_class,
|
||||
"backend_class": new_config.backend_class,
|
||||
"rows_deleted": deleted,
|
||||
},
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Enrichment config changed (cache retained)",
|
||||
extra={"cache_ttl_s": new_config.cache_ttl_s},
|
||||
)
|
||||
|
||||
async def _on_config_change(self, table: str, key: str) -> None:
|
||||
"""Handle a configuration change notification.
|
||||
|
||||
Called when NOTIFY fires for config changes.
|
||||
"""
|
||||
# Handle enrichment config changes (single-row config.enrichment)
|
||||
if table == "enrichment":
|
||||
await self._handle_enrichment_change()
|
||||
return
|
||||
|
||||
# Handle stream changes
|
||||
if table == "streams":
|
||||
stream_name = key
|
||||
|
|
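`_rebuild_enrichers` builds into locals and assigns instance state only after the build succeeds, which is what lets `_handle_enrichment_change` keep the previous backend when validation fails. A small sketch of that commit-on-success pattern follows; `Holder`, `build`, and the use of `ValueError` in place of pydantic's `ValidationError` are illustrative assumptions.

```python
KNOWN_BACKENDS = {"navi", "noop"}


def build(config: dict) -> list[str]:
    """Stand-in for build_enrichers: raises on an unusable config."""
    if config["backend"] not in KNOWN_BACKENDS:
        raise ValueError(f"unknown backend: {config['backend']}")
    return [f"{config['backend']}-enricher"]


class Holder:
    def __init__(self) -> None:
        self.config = {"backend": "navi"}
        self.enrichers = build(self.config)

    def rebuild(self, config: dict) -> None:
        # Build into a local first; if this raises, nothing below runs
        # and the previously active state is untouched.
        enrichers = build(config)
        self.config = config
        self.enrichers = enrichers


holder = Holder()
try:
    holder.rebuild({"backend": "bogus"})
except ValueError:
    pass  # the caller logs and keeps running on the old state
state_after_failure = (dict(holder.config), list(holder.enrichers))

holder.rebuild({"backend": "noop"})
state_after_success = (dict(holder.config), list(holder.enrichers))
```

Assigning `self.config` before calling `build` would leave the holder claiming a configuration it never managed to apply, which is the half-state the docstring warns about.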
@ -762,6 +844,18 @@ class Supervisor:
|
|||
# Ensure streams exist with correct configuration
|
||||
await self._ensure_streams()
|
||||
|
||||
# Load the operator-set enrichment config (overrides the constructor
|
||||
# default); hot-reloaded thereafter via _on_config_change.
|
||||
enrichment_config = await self._config_source.get_enrichment_config()
|
||||
self._rebuild_enrichers(enrichment_config)
|
||||
logger.info(
|
||||
"Enrichment configured",
|
||||
extra={
|
||||
"enricher_class": enrichment_config.enricher_class,
|
||||
"backend_class": enrichment_config.backend_class,
|
||||
},
|
||||
)
|
||||
|
||||
# Load and start enabled adapters
|
||||
enabled_adapters = await self._config_source.list_enabled_adapters()
|
||||
for config in enabled_adapters:
|
||||
|
|
|
|||
235
tests/test_backend_settings_schema.py
Normal file
|
|
@ -0,0 +1,235 @@
|
|||
"""Tests for per-backend settings schemas (PR L.5).
|
||||
|
||||
Each GeocoderBackend declares a Pydantic settings_schema (extra='forbid').
|
||||
build_enrichers validates backend_settings against it BEFORE constructing, so
|
||||
a config/settings mismatch is a clean ValidationError, not a TypeError. The
|
||||
supervisor's hot-reload keeps the previous backend running on ValidationError;
|
||||
the GUI POST re-renders with field errors and does not write the DB row.
|
||||
|
||||
Regression guard for the 2026-05-20 incident: switching backend_class to
|
||||
NoOpBackend while backend_settings still held {"base_url": ...} crashed
|
||||
_rebuild_enrichers with `TypeError: NoOpBackend() takes no arguments`.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from central.config_models import EnrichmentConfig
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
from central.supervisor import _BACKEND_REGISTRY, build_enrichers
|
||||
|
||||
|
||||
# --- schemas exist + extra='forbid' -----------------------------------------
|
||||
|
||||
def test_every_backend_declares_a_settings_schema():
|
||||
for name, cls in _BACKEND_REGISTRY.items():
|
||||
schema = getattr(cls, "settings_schema", None)
|
||||
assert schema is not None, f"{name} has no settings_schema"
|
||||
assert isinstance(schema, type) and issubclass(schema, BaseModel), name
|
||||
|
||||
|
||||
def test_every_settings_schema_forbids_extra():
|
||||
for name, cls in _BACKEND_REGISTRY.items():
|
||||
with pytest.raises(ValidationError):
|
||||
cls.settings_schema.model_validate({"definitely_not_a_field": 1})
|
||||
|
||||
|
||||
def test_navi_schema_accepts_valid_kwargs_and_preserves_defaults():
|
||||
from central.enrichment.backends.navi import NaviBackendSettings
|
||||
|
||||
m = NaviBackendSettings()
|
||||
assert m.base_url == "http://localhost:8440"
|
||||
assert m.timeout_s == 10.0 # preserved from __init__, NOT 5.0
|
||||
assert m.headers is None
|
||||
assert m.warmup is True
|
||||
m2 = NaviBackendSettings(base_url="http://navi:8440", timeout_s=3.0, warmup=False)
|
||||
assert m2.base_url == "http://navi:8440" and m2.timeout_s == 3.0
|
||||
|
||||
|
||||
def test_noop_schema_has_no_fields():
|
||||
from central.enrichment.backends.no_op import NoOpBackendSettings
|
||||
|
||||
assert list(NoOpBackendSettings.model_fields) == []
|
||||
|
||||
|
||||
# --- build_enrichers validation ----------------------------------------------
|
||||
|
||||
def _cache(tmp_path) -> EnrichmentCache:
|
||||
return EnrichmentCache(tmp_path / "c.db")
|
||||
|
||||
|
||||
def test_build_enrichers_raises_validation_error_not_typeerror(tmp_path):
|
||||
"""The exact 2026-05-20 bug: NoOpBackend + stale base_url. Must be a clean
|
||||
ValidationError, never a TypeError from the constructor."""
|
||||
cfg = EnrichmentConfig(backend_class="NoOpBackend", backend_settings={"base_url": "http://x"})
|
||||
with pytest.raises(ValidationError):
|
||||
build_enrichers(cfg, _cache(tmp_path))
|
||||
|
||||
|
||||
def test_build_enrichers_navi_valid_settings(tmp_path):
|
||||
from central.enrichment.backends.navi import NaviBackend
|
||||
|
||||
cfg = EnrichmentConfig(
|
||||
backend_class="NaviBackend",
|
||||
backend_settings={"base_url": "http://navi:8440", "warmup": False},
|
||||
)
|
||||
enrichers = build_enrichers(cfg, _cache(tmp_path))
|
||||
assert isinstance(enrichers[0]._backend, NaviBackend)
|
||||
|
||||
|
||||
def test_build_enrichers_navi_unknown_setting_rejected(tmp_path):
|
||||
cfg = EnrichmentConfig(
|
||||
backend_class="NaviBackend",
|
||||
backend_settings={"base_url": "http://navi:8440", "bogus": 1},
|
||||
)
|
||||
with pytest.raises(ValidationError):
|
||||
build_enrichers(cfg, _cache(tmp_path))
|
||||
|
||||
|
||||
# --- supervisor hot-reload keeps previous backend on ValidationError ---------

def _supervisor():
    from central import supervisor as sup_mod

    config_source = MagicMock()
    config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig())
    return sup_mod.Supervisor(
        config_source=config_source,
        config_store=MagicMock(),
        nats_url="nats://localhost:4222",
    )


@pytest.mark.asyncio
async def test_handle_enrichment_change_keeps_previous_on_validation_error():
    """Navi active, then a bad NOTIFY (NoOp + leftover base_url) arrives. The
    supervisor must keep NaviBackend running and not crash."""
    from central.enrichment.backends.navi import NaviBackend
    from central.enrichment.backends.no_op import NoOpBackend

    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
        sup = _supervisor()
        # Establish a valid NaviBackend active state.
        sup._rebuild_enrichers(EnrichmentConfig(
            backend_class="NaviBackend",
            backend_settings={"base_url": "http://navi:8440", "warmup": False},
        ))
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)
        active_before = sup._active_enrichment_config

        # Bad config arrives via NOTIFY: NoOp + stale base_url -> ValidationError.
        sup._config_source.get_enrichment_config = AsyncMock(
            return_value=EnrichmentConfig(
                backend_class="NoOpBackend",
                backend_settings={"base_url": "http://navi:8440"},
            )
        )
        await sup._handle_enrichment_change()  # must NOT raise

        # Previous backend still active; config unchanged; cache NOT invalidated.
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)
        assert sup._active_enrichment_config is active_before
        cache_cls.return_value.invalidate.assert_not_awaited()


@pytest.mark.asyncio
async def test_handle_enrichment_change_applies_valid_noop_with_empty_settings():
    """Switching to NoOp with empty backend_settings (the correct way) applies
    cleanly and invalidates the cache."""
    from central.enrichment.backends.no_op import NoOpBackend

    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=3))
        sup = _supervisor()
        sup._rebuild_enrichers(EnrichmentConfig(
            backend_class="NaviBackend",
            backend_settings={"base_url": "http://navi:8440", "warmup": False},
        ))
        sup._config_source.get_enrichment_config = AsyncMock(
            return_value=EnrichmentConfig(backend_class="NoOpBackend", backend_settings={})
        )
        await sup._handle_enrichment_change()
        assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
        cache_cls.return_value.invalidate.assert_awaited()


# --- GUI POST validates backend_settings + does not write on error ----------

def _mock_pool(conn):
    pool = MagicMock()
    cm = MagicMock()
    cm.__aenter__ = AsyncMock(return_value=conn)
    cm.__aexit__ = AsyncMock(return_value=None)
    pool.acquire = MagicMock(return_value=cm)
    return pool


@pytest.mark.asyncio
async def test_gui_post_rejects_bad_backend_settings_without_writing():
    """POST with a bad backend setting (timeout_s non-numeric) re-renders with a
    field error keyed bs_timeout_s and does NOT execute the DB upsert."""
    from central.gui.routes import enrichment_update

    request = MagicMock()
    request.state.operator = MagicMock(username="op")
    request.state.csrf_token = "tok"

    form = {
        "csrf_token": "tok",
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NaviBackend",
        "cache_ttl_s": "86400",
        "bs_base_url": "http://navi:8440",
        "bs_timeout_s": "not-a-number",
    }
    request.form = AsyncMock(return_value=form)

    conn = MagicMock()
    conn.execute = AsyncMock()
    conn.fetchrow = AsyncMock(return_value={})
    templates = MagicMock()
    templates.TemplateResponse.return_value = MagicMock()

    with patch("central.gui.routes._get_templates", return_value=templates), \
         patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
        await enrichment_update(request)

    conn.execute.assert_not_awaited()  # no DB write on validation failure
    ctx = templates.TemplateResponse.call_args.kwargs["context"]
    assert "bs_timeout_s" in ctx["errors"]
    assert ctx["backend_class"] == "NaviBackend"  # re-rendered against SUBMITTED backend


@pytest.mark.asyncio
async def test_gui_post_writes_on_valid_settings():
    from central.gui.routes import enrichment_update

    request = MagicMock()
    request.state.operator = MagicMock(username="op")
    request.state.csrf_token = "tok"
    form = {
        "csrf_token": "tok",
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NaviBackend",
        "cache_ttl_s": "86400",
        "bs_base_url": "http://navi:8440",
        "bs_timeout_s": "10",
    }
    request.form = AsyncMock(return_value=form)
    conn = MagicMock()
    conn.execute = AsyncMock()
    templates = MagicMock()

    with patch("central.gui.routes._get_templates", return_value=templates), \
         patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
        resp = await enrichment_update(request)

    conn.execute.assert_awaited()  # DB upsert ran
    # the backend_settings written are the validated/dumped dict
    args = conn.execute.call_args.args
    assert "INSERT INTO config.enrichment" in args[0]
    assert resp.status_code == 302
213
tests/test_enrichment_config_plumbing.py
Normal file
@@ -0,0 +1,213 @@
"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5).

Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload
rebuild, cache invalidation on backend change (but not on TTL-only change),
EnrichmentCache.invalidate, the generic json widget for backend_settings, and
the /enrichment GUI render. No real DB / NATS — pool, config_source, and the
EnrichmentCache class are mocked.
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from central.config_models import EnrichmentConfig
from central.enrichment.cache import EnrichmentCache
from central.enrichment.backends.navi import NaviBackend
from central.enrichment.backends.no_op import NoOpBackend
from central.gui.form_descriptors import describe_fields


# --- mock pool/conn helpers -------------------------------------------------

def _mock_pool(conn: MagicMock) -> MagicMock:
    pool = MagicMock()
    acquire_cm = MagicMock()
    acquire_cm.__aenter__ = AsyncMock(return_value=conn)
    acquire_cm.__aexit__ = AsyncMock(return_value=None)
    pool.acquire = MagicMock(return_value=acquire_cm)
    return pool


# --- ConfigStore --------------------------------------------------------------

@pytest.mark.asyncio
async def test_config_store_reads_enrichment_row():
    from central.config_store import ConfigStore

    conn = MagicMock()
    conn.fetchrow = AsyncMock(return_value={
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NaviBackend",
        "backend_settings": {"base_url": "http://example.test:8440"},
        "cache_ttl_s": 3600,
    })
    store = ConfigStore(_mock_pool(conn))
    cfg = await store.get_enrichment_config()
    assert isinstance(cfg, EnrichmentConfig)
    assert cfg.backend_class == "NaviBackend"
    assert cfg.backend_settings == {"base_url": "http://example.test:8440"}
    assert cfg.cache_ttl_s == 3600


@pytest.mark.asyncio
async def test_config_store_falls_back_to_defaults_when_row_absent():
    from central.config_store import ConfigStore

    conn = MagicMock()
    conn.fetchrow = AsyncMock(return_value=None)
    store = ConfigStore(_mock_pool(conn))
    cfg = await store.get_enrichment_config()
    assert cfg == EnrichmentConfig()  # framework defaults
    assert cfg.backend_class == "NoOpBackend"


@pytest.mark.asyncio
async def test_config_store_upsert_passes_dict_settings():
    from central.config_store import ConfigStore

    conn = MagicMock()
    conn.execute = AsyncMock()
    store = ConfigStore(_mock_pool(conn))
    cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"})
    await store.upsert_enrichment_config(cfg)
    args = conn.execute.call_args.args
    assert "INSERT INTO config.enrichment" in args[0]
    # backend_settings passed as a dict (pool codec encodes to jsonb), not a str.
    assert {"base_url": "x"} in args


# --- EnrichmentCache.invalidate ----------------------------------------------

@pytest.mark.asyncio
async def test_cache_invalidate_all(tmp_path):
    cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
    await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
    await cache.set("geocoder", 3.0, 4.0, {"name": "y"})
    deleted = await cache.invalidate()
    assert deleted == 2
    assert await cache.get("geocoder", 1.0, 2.0) is None


@pytest.mark.asyncio
async def test_cache_invalidate_scoped_to_enricher(tmp_path):
    cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
    await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
    await cache.set("other", 1.0, 2.0, {"name": "z"})
    deleted = await cache.invalidate("geocoder")
    assert deleted == 1
    assert await cache.get("geocoder", 1.0, 2.0) is None
    assert await cache.get("other", 1.0, 2.0) == {"name": "z"}


# --- Supervisor startup read + hot-reload ------------------------------------

def _supervisor_with(enrichment_cfg: EnrichmentConfig):
    """Build a Supervisor with mocked deps and a mocked EnrichmentCache class
    (so no real /var/lib cache file is touched)."""
    from central import supervisor as sup_mod

    config_source = MagicMock()
    config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg)
    config_store = MagicMock()
    sup = sup_mod.Supervisor(
        config_source=config_source,
        config_store=config_store,
        nats_url="nats://localhost:4222",
    )
    return sup


@pytest.mark.asyncio
async def test_supervisor_builds_navi_from_config():
    """Given a config naming NaviBackend, the supervisor's enricher set wraps a
    NaviBackend — proves the registry resolution end-to-end."""
    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
        sup = _supervisor_with(
            EnrichmentConfig(backend_class="NaviBackend",
                             backend_settings={"base_url": "http://x:8440", "warmup": False})
        )
        cfg = await sup._config_source.get_enrichment_config()
        sup._rebuild_enrichers(cfg)
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)


@pytest.mark.asyncio
async def test_hot_reload_rebuilds_and_invalidates_on_backend_change():
    from central import supervisor as sup_mod

    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        invalidate = AsyncMock(return_value=5)
        cache_cls.return_value = MagicMock(invalidate=invalidate)
        # Start at NoOp.
        sup = _supervisor_with(EnrichmentConfig())
        sup._rebuild_enrichers(EnrichmentConfig())
        assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
        # Config flips to Navi.
        sup._config_source.get_enrichment_config = AsyncMock(
            return_value=EnrichmentConfig(
                backend_class="NaviBackend",
                backend_settings={"base_url": "http://x:8440", "warmup": False},
            )
        )
        await sup._handle_enrichment_change()
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)
        invalidate.assert_awaited()  # backend changed -> cache wiped


@pytest.mark.asyncio
async def test_hot_reload_does_not_invalidate_on_ttl_only_change():
    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        invalidate = AsyncMock(return_value=0)
        cache_cls.return_value = MagicMock(invalidate=invalidate)
        sup = _supervisor_with(EnrichmentConfig())
        sup._rebuild_enrichers(EnrichmentConfig())
        # Same backend, only TTL changes.
        sup._config_source.get_enrichment_config = AsyncMock(
            return_value=EnrichmentConfig(cache_ttl_s=3600)
        )
        await sup._handle_enrichment_change()
        invalidate.assert_not_awaited()


# --- generic json widget + GUI render ----------------------------------------

def test_describe_fields_renders_dict_as_json_widget():
    fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})}
    assert fields["backend_settings"] == "json"
    assert fields["enricher_class"] == "text"
    assert fields["cache_ttl_s"] == "number"


@pytest.mark.asyncio
async def test_enrichment_form_renders():
    from central.gui.routes import enrichment_form

    request = MagicMock()
    request.state.operator = MagicMock(username="op")
    request.state.csrf_token = "tok"

    conn = MagicMock()
    conn.fetchrow = AsyncMock(return_value={
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NoOpBackend",
        "backend_settings": {},
        "cache_ttl_s": 86400,
    })
    templates = MagicMock()
    templates.TemplateResponse.return_value = MagicMock()

    with patch("central.gui.routes._get_templates", return_value=templates), \
         patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
        await enrichment_form(request)

    ctx = templates.TemplateResponse.call_args.kwargs["context"]
    # PR L.5: outer fields exclude backend_settings (now a per-backend fieldset);
    # NoOpBackend's fieldset has zero fields.
    outer = {f.name for f in ctx["outer_fields"]}
    assert "backend_settings" not in outer
    assert "backend_class" in outer
    assert ctx["backend_class"] == "NoOpBackend"
    assert ctx["backend_fields"] == []
    assert ctx["csrf_token"] == "tok"
@@ -456,6 +456,7 @@ class TestEnrichmentIntegration:
         """A FIRMS event run through the supervisor's enrichment stage emerges
         with data._enriched.geocoder populated (all-null under NoOpBackend)."""
         from central.config_models import EnrichmentConfig
+        from central.enrichment.cache import EnrichmentCache
         from central.enrichment.geocoder import all_null_bundle
         from central.supervisor import apply_enrichment, build_enrichers

@@ -469,9 +470,8 @@ class TestEnrichmentIntegration:
         event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
         assert "_enriched" not in event.data

-        enrichers = build_enrichers(
-            EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db"
-        )
+        cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
+        enrichers = build_enrichers(EnrichmentConfig(), cache)
         await apply_enrichment(event, adapter.enrichment_locations, enrichers)

         assert "_enriched" in event.data
125
tests/test_navi_backend.py
Normal file
@@ -0,0 +1,125 @@
"""Tests for NaviBackend (composed Navi /api/reverse endpoint).

HTTP is exercised via patching the backend's `_fetch` (the codebase has no
aioresponses/respx dep); URL construction is asserted on the pure `_url`
helper. An env-gated integration smoke against the live Navi endpoint is
skipped by default.
"""

import os
from unittest.mock import AsyncMock

import pytest

from central.enrichment.backends.navi import NaviBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

# Full Navi response — already canonical shape.
_NAVI_OK = {
    "name": "Where you are",
    "city": "Boise",
    "county": "Ada",
    "state": "Idaho",
    "country": "United States",
    "postal_code": "83702",
    "timezone": "America/Boise",
    "landclass": "Public — National Forest",
    "elevation_m": 824,
}


def _backend() -> NaviBackend:
    # warmup=False so construction issues no background task in tests.
    return NaviBackend(base_url="http://navi.test:8440", warmup=False)


def test_url_construction():
    b = _backend()
    assert b._url(43.615, -116.2023) == "http://navi.test:8440/api/reverse/43.615/-116.2023"


def test_base_url_trailing_slash_stripped():
    b = NaviBackend(base_url="http://navi.test:8440/", warmup=False)
    assert b._url(1.0, 2.0) == "http://navi.test:8440/api/reverse/1.0/2.0"


@pytest.mark.asyncio
async def test_happy_path_passthrough():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_NAVI_OK))
    result = await b.reverse(43.615, -116.2023)
    assert result == _NAVI_OK
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_partial_nulls_preserved():
    """Navi 200-with-nulls (non-US: timezone + elevation, rest null)."""
    partial = {**all_null_bundle(), "timezone": "Europe/Paris", "elevation_m": 35}
    b = _backend()
    b._fetch = AsyncMock(return_value=partial)
    result = await b.reverse(48.85, 2.35)
    assert result["timezone"] == "Europe/Paris"
    assert result["elevation_m"] == 35
    assert result["city"] is None
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_extra_keys_dropped():
    b = _backend()
    b._fetch = AsyncMock(return_value={**_NAVI_OK, "debug_internal": "leak"})
    result = await b.reverse(1.0, 2.0)
    assert "debug_internal" not in result
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_network_error_returns_all_null_never_raises():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("boom"))
    result = await b.reverse(1.0, 2.0)
    assert result == all_null_bundle()


@pytest.mark.asyncio
async def test_timeout_returns_all_null():
    import asyncio

    b = _backend()
    b._fetch = AsyncMock(side_effect=asyncio.TimeoutError())
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_response_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("not json"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_headers_passed_through_config():
    b = NaviBackend(base_url="http://navi.test", headers={"Authorization": "Bearer x"}, warmup=False)
    assert b._headers == {"Authorization": "Bearer x"}


@pytest.mark.asyncio
@pytest.mark.skipif(
    os.environ.get("NAVI_INTEGRATION_TEST") != "1",
    reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
)
async def test_live_navi_boise():
    """Integration smoke against the real endpoint (default skipped).

    The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific
    address lives in source; defaults to localhost when unset.
    """
    base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440")
    b = NaviBackend(base_url=base_url, warmup=False)
    result = await b.reverse(43.6150, -116.2023)
    assert result["name"] == "Where you are"
    assert result["city"] == "Boise"
    assert result["state"] == "Idaho"
    assert result["elevation_m"] is not None
    assert abs(float(result["elevation_m"]) - 824) < 50
118
tests/test_nominatim_backend.py
Normal file
@@ -0,0 +1,118 @@
"""Tests for NominatimBackend (OSM Nominatim /reverse jsonv2)."""

from unittest.mock import AsyncMock, patch

import pytest

from central.enrichment.backends.nominatim import (
    DEFAULT_USER_AGENT,
    NominatimBackend,
)
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

_NOMINATIM_OK = {
    "name": "Boise",
    "display_name": "Boise, Ada County, Idaho, United States",
    "address": {
        "city": "Boise",
        "county": "Ada County",
        "state": "Idaho",
        "country": "United States",
        "postcode": "83702",
    },
}


def _backend(**kw) -> NominatimBackend:
    kw.setdefault("base_url", "http://nominatim.test")
    kw.setdefault("rate_limit_per_sec", 0)  # disabled by default in tests
    return NominatimBackend(**kw)


def test_url_and_format():
    b = _backend()
    url = b._url(43.6, -116.2)
    assert url.startswith("http://nominatim.test/reverse?")
    assert "format=jsonv2" in url and "lat=43.6" in url and "lon=-116.2" in url


def test_user_agent_header_present():
    b = _backend()
    assert b._request_headers()["User-Agent"] == DEFAULT_USER_AGENT


def test_custom_user_agent():
    b = _backend(user_agent="myapp/1.0 (me@example.com)")
    assert b._request_headers()["User-Agent"] == "myapp/1.0 (me@example.com)"


@pytest.mark.asyncio
async def test_happy_path_maps_address_block():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
    result = await b.reverse(43.6, -116.2)
    assert set(result.keys()) == set(GEOCODER_FIELDS)
    assert result["name"] == "Boise"
    assert result["city"] == "Boise"
    assert result["county"] == "Ada County"
    assert result["state"] == "Idaho"
    assert result["country"] == "United States"
    assert result["postal_code"] == "83702"


@pytest.mark.asyncio
async def test_navi_only_fields_null():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_NOMINATIM_OK))
    result = await b.reverse(43.6, -116.2)
    assert result["timezone"] is None
    assert result["landclass"] is None
    assert result["elevation_m"] is None


@pytest.mark.asyncio
async def test_city_falls_back_to_town_then_village():
    b = _backend()
    b._fetch = AsyncMock(return_value={"address": {"village": "Tinytown"}})
    result = await b.reverse(1.0, 2.0)
    assert result["city"] == "Tinytown"


@pytest.mark.asyncio
async def test_network_error_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("bad"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_rate_limit_disabled_does_not_sleep():
    b = _backend(rate_limit_per_sec=0)
    with patch("central.enrichment.backends.nominatim.asyncio.sleep") as slp:
        await b._throttle()
        await b._throttle()
        slp.assert_not_called()


@pytest.mark.asyncio
async def test_rate_limit_spaces_consecutive_requests():
    """With a 1/sec limit, the second back-to-back call must sleep a positive
    interval. Mock monotonic to a fixed instant so the throttle computes a wait."""
    b = _backend(rate_limit_per_sec=1.0)
    sleeps: list[float] = []

    async def _fake_sleep(d):
        sleeps.append(d)

    with patch("central.enrichment.backends.nominatim.time.monotonic", return_value=100.0), \
         patch("central.enrichment.backends.nominatim.asyncio.sleep", side_effect=_fake_sleep):
        await b._throttle()  # first: last_request_at was 0 -> no wait
        await b._throttle()  # second: now==100, last==100 -> wait ~1.0s
        assert any(d > 0 for d in sleeps), f"expected a positive throttle sleep, got {sleeps}"
92
tests/test_photon_backend.py
Normal file
@@ -0,0 +1,92 @@
"""Tests for PhotonBackend (raw Photon /reverse)."""

from unittest.mock import AsyncMock

import pytest

from central.enrichment.backends.photon import PhotonBackend
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle

# Photon reverse response shape.
_PHOTON_OK = {
    "features": [
        {
            "properties": {
                "name": "Boise",
                "city": "Boise",
                "county": "Ada County",
                "state": "Idaho",
                "country": "United States",
                "postcode": "83702",
                "osm_key": "place",
            },
            "geometry": {"type": "Point", "coordinates": [-116.2, 43.6]},
        }
    ]
}


def _backend() -> PhotonBackend:
    return PhotonBackend(base_url="http://photon.test:2322")


def test_url_construction():
    b = _backend()
    url = b._url(43.6, -116.2)
    assert url.startswith("http://photon.test:2322/reverse?")
    assert "lat=43.6" in url and "lon=-116.2" in url and "limit=1" in url


@pytest.mark.asyncio
async def test_happy_path_maps_to_canonical():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
    result = await b.reverse(43.6, -116.2)
    assert set(result.keys()) == set(GEOCODER_FIELDS)
    assert result["city"] == "Boise"
    assert result["county"] == "Ada County"
    assert result["state"] == "Idaho"
    assert result["country"] == "United States"
    assert result["postal_code"] == "83702"  # mapped from Photon 'postcode'


@pytest.mark.asyncio
async def test_navi_only_fields_are_null():
    b = _backend()
    b._fetch = AsyncMock(return_value=dict(_PHOTON_OK))
    result = await b.reverse(43.6, -116.2)
    assert result["timezone"] is None
    assert result["landclass"] is None
    assert result["elevation_m"] is None


@pytest.mark.asyncio
async def test_empty_features_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(return_value={"features": []})
    assert await b.reverse(0.0, 0.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_missing_properties_keys_become_null():
    b = _backend()
    b._fetch = AsyncMock(return_value={"features": [{"properties": {"name": "X"}}]})
    result = await b.reverse(1.0, 2.0)
    assert result["name"] == "X"
    assert result["city"] is None
    assert result["postal_code"] is None
    assert set(result.keys()) == set(GEOCODER_FIELDS)


@pytest.mark.asyncio
async def test_network_error_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ConnectionError("down"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()


@pytest.mark.asyncio
async def test_malformed_json_returns_all_null():
    b = _backend()
    b._fetch = AsyncMock(side_effect=ValueError("bad json"))
    assert await b.reverse(1.0, 2.0) == all_null_bundle()
@@ -23,8 +23,14 @@ from pathlib import Path

 from central.adapter import SourceAdapter
 from central.adapter_discovery import discover_adapters
+from central.enrichment.geocoder import GEOCODER_FIELDS
 from central.streams import STREAMS

+# The verbatim design-principle quote that must stay in §2 (Matt, 2026-05-19).
+_DESIGN_PRINCIPLE_QUOTE = (
+    "Central takes it all and gives it all. It's up to the pipe to do with it"
+)
+
 DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "PRODUCER-INTEGRATION.md"


@@ -186,6 +192,48 @@ def test_streams_snippet_quotes_live_registry():
     )


+def _section(doc: str, header_re: str) -> str:
+    """Return the body of the section whose header matches header_re, up to the
+    next same-or-higher-level header."""
+    m = re.search(header_re + r"\s*\n(.*?)(?=^## |\Z)", doc, re.DOTALL | re.MULTILINE)
+    assert m, f"doc missing section matching {header_re!r}"
+    return m.group(1)
+
+
+def test_design_principle_quote_present_in_section_2():
+    """§2 must still carry the verbatim Matt quote — the reframe changes the
+    translation beneath it, not the quote itself."""
+    section = _section(_doc_text(), r"^## 2\. The design principle")
+    assert _DESIGN_PRINCIPLE_QUOTE in section, "verbatim design-principle quote missing from §2"
+
+
+def test_anti_pattern_10_1_section_exists():
+    """§10.1 must still exist as a subsection (content reframed to
+    'enrichment outside the framework', structure preserved)."""
+    doc = _doc_text()
+    assert re.search(r"^### 10\.1 ", doc, re.MULTILINE), "doc missing '### 10.1' subsection"
+
+
+def test_enrichment_contract_section_13_has_all_protocol_references():
+    """New §13 must name all four enrichment contract types verbatim."""
+    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
+    for ref in ("Enricher", "GeocoderEnricher", "GeocoderBackend", "NoOpBackend"):
+        assert ref in section, f"§13 missing reference to {ref!r}"
+
+
+def test_enrichment_coverage_matrix_lists_exactly_geocoder_fields():
+    """The §13 per-field coverage matrix must list exactly the canonical
+    GEOCODER_FIELDS — derived from code, not hardcoded here."""
+    section = _section(_doc_text(), r"^## 13\. Enrichment contract")
+    # Matrix rows look like: | `field_name` | ... |
+    row_fields = set(re.findall(r"^\|\s*`([a-z_]+)`\s*\|", section, re.MULTILINE))
+    assert row_fields == set(GEOCODER_FIELDS), (
+        f"coverage-matrix field drift: "
+        f"doc-only={row_fields - set(GEOCODER_FIELDS)}, "
+        f"code-only={set(GEOCODER_FIELDS) - row_fields}"
+    )
+
+
 def test_no_orphan_adapter_references_in_anti_patterns():
     """Anti-patterns section names two real adapter modules as examples
     (firms, inciweb in §10.4). Those names must still resolve via