mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-22 18:44:40 +02:00
feat(3-J): enrichment framework + GeocoderEnricher + NoOpBackend + FIRMS pilot
First of three PRs for v0.5.0 (J: framework; K: real geocoder backends +
doc revisions; L: operator events tab + per-adapter render + events-map fix).
Design pivot: the Phase 2 "no enrichment, upstream verbatim" reading of
Matt's principle is reframed — consumers can't do follow-up lookups, they
only see what's on the wire, so whatever Central doesn't enrich is
effectively missing downstream. Enrichment is now expected. The producer-doc
§2/§10.1 rewrite lands in PR K; this PR builds the framework PR K documents.
New package src/central/enrichment/:
- base.py: Enricher Protocol (name + async enrich(location) -> dict).
- geocoder.py: GeocoderEnricher + GeocoderBackend Protocol + the locked
  GEOCODER_FIELDS set (name, city, county, state, country,
  postal_code, timezone, landclass, elevation_m) + all_null_bundle().
- cache.py: EnrichmentCache — stdlib sqlite3 off the event loop via
  asyncio.to_thread (no async-sqlite dep). Keyed on
  (enricher_name, lat_4dp, lon_4dp); per-enricher TTL (24h
  default); fresh connection per op (sqlite3 isn't thread-safe
  to share). Cache even all-null results; never cache backend failures.
- backends/no_op.py: NoOpBackend — all-null bundle, the PR J default.
Provenance: enrichment results land under event.data["_enriched"][<name>];
everything else in data stays upstream verbatim.
Wiring:
- adapter.py: enrichment_locations: list[tuple[str,str]] = [] class attr.
  Empty (default) = publish as-is, no enrichment.
- config_models.py: EnrichmentConfig (enricher_class, backend_class,
  backend_settings, cache_ttl_s). Read once at startup.
- supervisor.py: build_enrichers() + apply_enrichment(); enrichment runs
  after dedup, before wrap_event, in the poll loop. Class-name
  registries for enricher/backend resolution (PR K extends).
- firms.py: enrichment_locations = [("latitude","longitude")] — pilot.
Enrichment config is read once at supervisor startup; hot-reload is out of
scope for PR J (noted in EnrichmentConfig + build_enrichers docstrings).
Tests (16 new):
- test_enrichment_framework.py (9): parent-dir/table init, cache miss->hit,
TTL expiry, 4dp rounding, nearby-coord collapse, concurrent-set single-row,
backend-failure all-null-not-cached (retries), success cached (one backend
call), all-null cached.
- test_geocoder_enricher.py (5): NoOp all-null, field-set == GEOCODER_FIELDS,
null-coords short-circuit (no backend call), name=="geocoder", sequential
same-coords single backend call.
- test_firms.py (+2): enrichment_locations declared + paths resolve to floats
in a real event (structural, not literal); event through supervisor
apply_enrichment emerges with data._enriched.geocoder == all-null bundle.
Verification: full pytest 495 passed (was 479; +16). grep for
subject_for_event/_ADAPTER_REGISTRY clean. Module imports cleanly.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
765c07aa7f
commit
d960d1f5e0
13 changed files with 685 additions and 1 deletions
160
tests/test_enrichment_framework.py
Normal file
160
tests/test_enrichment_framework.py
Normal file
|
|
@ -0,0 +1,160 @@
|
|||
"""Tests for the enrichment cache + framework wiring.
|
||||
|
||||
Covers cache hit/miss/TTL/rounding, idempotent concurrent writes, and the
|
||||
"backend failure -> all-null, not cached" contract via GeocoderEnricher.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from central.enrichment.cache import EnrichmentCache, round_coord
|
||||
from central.enrichment.geocoder import GEOCODER_FIELDS, GeocoderEnricher, all_null_bundle
|
||||
|
||||
|
||||
@pytest.fixture
def cache_path(tmp_path: Path) -> Path:
    """Provide a cache database path under a not-yet-created subdirectory.

    Only the path is built here; neither the ``nested`` directory nor the
    database file is created by this fixture.
    """
    nested_dir = tmp_path / "nested"
    return nested_dir / "enrichment_cache.db"
|
||||
|
||||
|
||||
def test_init_creates_parent_dir_and_table(cache_path: Path):
    """Constructing the cache creates the parent directory and the table."""
    parent_dir = cache_path.parent
    assert not parent_dir.exists()

    cache = EnrichmentCache(cache_path, ttl_s=60)
    assert parent_dir.is_dir()

    # Inspect the file through an independent connection to confirm the
    # ``enrichment_cache`` table exists and is queryable.
    table_query = (
        "SELECT name FROM sqlite_master WHERE type='table' AND name='enrichment_cache'"
    )
    db = sqlite3.connect(cache_path)
    try:
        row = db.execute(table_query).fetchone()
        assert row is not None
    finally:
        db.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_cache_miss_then_hit(cache_path: Path):
    """A key is absent before ``set`` and returns the stored payload after."""
    cache = EnrichmentCache(cache_path, ttl_s=3600)

    before = await cache.get("geocoder", 45.0, -116.0)
    assert before is None  # miss

    stored = {"name": "Somewhere", "state": "ID"}
    await cache.set("geocoder", 45.0, -116.0, stored)

    after = await cache.get("geocoder", 45.0, -116.0)
    assert after == stored
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ttl_expiry_returns_miss(cache_path: Path):
    """With a zero TTL, a freshly written entry reads back as a miss."""
    # ttl_s=0 makes everything immediately stale.
    zero_ttl_cache = EnrichmentCache(cache_path, ttl_s=0)
    await zero_ttl_cache.set("geocoder", 1.0, 2.0, {"name": "x"})

    # ttl_s=0 -> age (>0) always exceeds ttl -> treated as expired.
    looked_up = await zero_ttl_cache.get("geocoder", 1.0, 2.0)
    assert looked_up is None
|
||||
|
||||
|
||||
def test_round_coord_4dp():
    """``round_coord`` maps each sample input to its expected 4dp value."""
    expectations = [
        (45.123456789, 45.1235),
        (-116.000049, -116.0),
        (12.99995, 13.0),
    ]
    for raw, expected in expectations:
        assert round_coord(raw) == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_rounding_collapses_nearby_coords_to_same_key(cache_path: Path):
    """Coordinates that agree after 4dp rounding read back the same entry."""
    cache = EnrichmentCache(cache_path, ttl_s=3600)
    await cache.set("geocoder", 45.12341, -116.45678, {"name": "rounded"})

    # 45.123413 / -116.456784 round to the same 4dp key -> same row.
    nearby_lat, nearby_lon = 45.123413, -116.456784
    found = await cache.get("geocoder", nearby_lat, nearby_lon)
    assert found == {"name": "rounded"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_concurrent_sets_do_not_double_write(cache_path: Path):
    """Twenty concurrent writes to one key leave exactly one row behind."""
    cache = EnrichmentCache(cache_path, ttl_s=3600)
    pending_writes = [
        cache.set("geocoder", 10.0, 20.0, {"n": i}) for i in range(20)
    ]
    await asyncio.gather(*pending_writes)

    # Count rows for the key directly in the database file.
    count_sql = (
        "SELECT COUNT(*) FROM enrichment_cache WHERE enricher_name='geocoder' "
        "AND lat_rounded=? AND lon_rounded=?"
    )
    db = sqlite3.connect(cache_path)
    try:
        row = db.execute(count_sql, (10.0, 20.0)).fetchone()
        row_count = row[0]
    finally:
        db.close()
    assert row_count == 1, "PRIMARY KEY must collapse concurrent writes to one row"
|
||||
|
||||
|
||||
class _CountingBackend:
    """Test backend that tallies ``reverse()`` calls so cache hits are provable."""

    def __init__(self) -> None:
        # Number of times reverse() has been invoked so far.
        self.calls = 0

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        """Record the call and return an all-null bundle with name/state set."""
        self.calls += 1
        bundle = {**all_null_bundle()}
        bundle["name"] = "Counted"
        bundle["state"] = "ID"
        return bundle
|
||||
|
||||
|
||||
class _ExplodingBackend:
    """Test backend whose ``reverse()`` always raises, violating the never-raise contract."""

    def __init__(self) -> None:
        # Number of times reverse() has been attempted so far.
        self.calls = 0

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        """Count the attempt, then fail with ``RuntimeError``."""
        self.calls = self.calls + 1
        raise RuntimeError("upstream geocoder down")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_backend_failure_returns_all_null_and_does_not_cache(cache_path: Path):
    """A raising backend yields the all-null bundle and leaves no cache entry."""
    cache = EnrichmentCache(cache_path, ttl_s=3600)
    failing = _ExplodingBackend()
    enricher = GeocoderEnricher(failing, cache=cache)

    first = await enricher.enrich({"lat": 5.0, "lon": 6.0})
    assert first == all_null_bundle()

    # Nothing cached -> a second call retries the backend (calls increments).
    cached = await cache.get("geocoder", 5.0, 6.0)
    assert cached is None
    await enricher.enrich({"lat": 5.0, "lon": 6.0})
    assert failing.calls == 2, "failed lookups must not be cached (must retry)"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_successful_result_is_cached_and_avoids_second_backend_call(cache_path: Path):
    """Two enrichments of the same coordinates cost a single backend call."""
    cache = EnrichmentCache(cache_path, ttl_s=3600)
    counting = _CountingBackend()
    enricher = GeocoderEnricher(counting, cache=cache)

    # Run the same lookup twice, one after the other.
    results = [await enricher.enrich({"lat": 7.5, "lon": 8.5}) for _ in range(2)]

    assert results[0] == results[1]
    assert counting.calls == 1, "second call with same coords must hit cache"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_all_null_result_is_cached(cache_path: Path):
    """An all-null backend answer is cached like any other result.

    The contract says to cache even all-null bundles so the backend is not
    re-hammered for coordinates already known to resolve to nothing.
    """

    class _NullCounting:
        """Backend that counts calls and always returns the all-null bundle."""

        def __init__(self) -> None:
            self.calls = 0

        async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
            self.calls += 1
            return all_null_bundle()

    cache = EnrichmentCache(cache_path, ttl_s=3600)
    backend = _NullCounting()
    enricher = GeocoderEnricher(backend, cache=cache)

    for _ in range(2):
        await enricher.enrich({"lat": 1.0, "lon": 1.0})
    assert backend.calls == 1

    stored = await cache.get("geocoder", 1.0, 1.0)
    assert stored == all_null_bundle()
|
||||
|
|
@ -421,3 +421,58 @@ class TestApplyConfig:
|
|||
assert adapter._satellites == ["VIIRS_NOAA20_NRT"]
|
||||
|
||||
await adapter.shutdown()
|
||||
|
||||
|
||||
class TestEnrichmentIntegration:
    """FIRMS is the PR J enrichment pilot."""

    def test_enrichment_locations_declared_and_resolvable(self, temp_db_path, mock_config_store):
        """FIRMS declares enrichment_locations and the declared paths resolve.

        Verified structurally (shape of the declaration, then a lookup of
        each declared key in a real event's data) rather than by hardcoding
        the literal tuple.
        """
        # Direct attribute access: getattr() with a constant name adds
        # nothing and hides the attribute from static analysis.
        locations = FIRMSAdapter.enrichment_locations
        assert isinstance(locations, list) and len(locations) >= 1
        for tup in locations:
            assert isinstance(tup, tuple) and len(tup) == 2
            assert all(isinstance(p, str) for p in tup)

        config = make_adapter_config()
        adapter = FIRMSAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT")
        event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
        # Every declared (lat_path, lon_path) must resolve to a float in data.
        for lat_path, lon_path in locations:
            assert isinstance(event.data.get(lat_path), float), (
                f"{lat_path!r} did not resolve to a float in event.data"
            )
            assert isinstance(event.data.get(lon_path), float), (
                f"{lon_path!r} did not resolve to a float in event.data"
            )

    @pytest.mark.asyncio
    async def test_event_passes_through_supervisor_enrichment(
        self, tmp_path, temp_db_path, mock_config_store
    ):
        """A FIRMS event gains data._enriched.geocoder from the supervisor stage.

        With a default ``EnrichmentConfig`` the expected value is the
        all-null bundle (the NoOpBackend result).
        """
        from central.config_models import EnrichmentConfig
        from central.enrichment.geocoder import all_null_bundle
        from central.supervisor import apply_enrichment, build_enrichers

        config = make_adapter_config()
        adapter = FIRMSAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT")
        event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
        # The adapter itself must not have produced any enrichment.
        assert "_enriched" not in event.data

        enrichers = build_enrichers(
            EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db"
        )
        await apply_enrichment(event, adapter.enrichment_locations, enrichers)

        assert "_enriched" in event.data
        assert event.data["_enriched"]["geocoder"] == all_null_bundle()
|
||||
|
|
|
|||
65
tests/test_geocoder_enricher.py
Normal file
65
tests/test_geocoder_enricher.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
"""Tests for GeocoderEnricher with the default NoOpBackend."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from central.enrichment.backends.no_op import NoOpBackend
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
from central.enrichment.geocoder import (
|
||||
GEOCODER_FIELDS,
|
||||
GeocoderEnricher,
|
||||
all_null_bundle,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_noop_backend_returns_all_null_bundle():
    """With NoOpBackend the enricher returns the all-null bundle."""
    noop_enricher = GeocoderEnricher(NoOpBackend())
    bundle = await noop_enricher.enrich({"lat": 45.0, "lon": -116.0})

    assert bundle == all_null_bundle()
    # No value in the bundle may be anything other than None.
    assert not any(value is not None for value in bundle.values())
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_field_set_matches_locked_protocol():
    """Result keys equal the locked GEOCODER_FIELDS set in both directions.

    Every locked field must be present (all None for NoOpBackend) and no
    extra keys may leak through.
    """
    result = await GeocoderEnricher(NoOpBackend()).enrich({"lat": 1.0, "lon": 2.0})
    returned_keys = set(result.keys())
    locked_keys = set(GEOCODER_FIELDS)
    assert returned_keys == locked_keys
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_missing_coords_returns_all_null_without_backend_call():
    """Null or absent coordinates short-circuit before reaching the backend."""

    class _Tripwire:
        """Backend that records, and fails on, any invocation."""

        def __init__(self) -> None:
            self.calls = 0

        async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
            self.calls += 1
            raise AssertionError("backend must not be called for null coords")

    backend = _Tripwire()
    enricher = GeocoderEnricher(backend)
    assert await enricher.enrich({"lat": None, "lon": None}) == all_null_bundle()  # type: ignore[dict-item]
    assert await enricher.enrich({}) == all_null_bundle()
    # Raising alone is not a reliable signal: a raising backend is turned
    # into the all-null bundle (see the backend-failure test), so if that
    # handling also covers AssertionError a stray call would go unnoticed.
    # Check the call count explicitly.
    assert backend.calls == 0, "backend must not be called for null coords"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_enricher_name_is_geocoder():
    """The name keys the result under event.data['_enriched'][name]."""
    enricher = GeocoderEnricher(NoOpBackend())
    assert enricher.name == "geocoder"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_sequential_calls_same_coords_hit_cache(tmp_path):
    """Five sequential lookups of identical coordinates reach the backend once."""

    class _CountingNoOp:
        """Backend that counts calls and always returns the all-null bundle."""

        def __init__(self) -> None:
            self.calls = 0

        async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
            self.calls += 1
            return all_null_bundle()

    cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
    backend = _CountingNoOp()
    enricher = GeocoderEnricher(backend, cache=cache)

    remaining = 5
    while remaining > 0:
        await enricher.enrich({"lat": 33.5, "lon": -111.9})
        remaining -= 1

    assert backend.calls == 1, "repeated identical coords must collapse to one backend call"
|
||||
Loading…
Add table
Add a link
Reference in a new issue