From f0c044505f8311626289e8f101f801352d0a82bd Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Thu, 21 May 2026 04:04:25 +0000 Subject: [PATCH] fix(3-M.b): apply_enrichment always attaches _enriched for declared adapters Coordless events such as removal tombstones with null lat/lon, from adapters that declare enrichment_locations, previously fell off the loop without writing _enriched and carried no geocoder bundle at all, violating the every-event-carries-_enriched design rule. Add a post-loop fallback that resolves the null location to an all-null bundle per enricher. Adapters with no enrichment_locations remain skipped. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/supervisor.py | 18 +++++- tests/test_apply_enrichment_coordless.py | 78 ++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 tests/test_apply_enrichment_coordless.py diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 90913a7..26db1da 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -76,9 +76,12 @@ async def apply_enrichment( No-op when the adapter declares no enrichment_locations or no enrichers are registered. Uses the first (lat_path, lon_path) tuple that resolves to - a non-null coordinate pair in event.data. Each enricher's result is keyed - by enricher.name. Mutates the data dict in place (Event is frozen, but its - data dict is not — this avoids a model_copy on every published event). + a non-null coordinate pair in event.data. If no declared pair resolves to + coordinates, still attaches an all-null bundle so that every event from an + enriched adapter carries _enriched (consumers get a stable field set). + Each enricher's result is keyed by enricher.name. Mutates the data dict in + place (Event is frozen, but its data dict is not — this avoids a + model_copy on every published event). """ if not enrichment_locations or not enrichers: return @@ -93,6 +96,15 @@ async def apply_enrichment( enriched[enricher.name] = await enricher.enrich(location) event.data["_enriched"] = enriched return + # No declared pair resolved to coordinates. Still attach _enriched: each + # enricher resolves the null location to its own all-null bundle (per the + # never-raise contract), so coordless events (e.g. removal tombstones) + # carry the same shape as enriched ones. + null_location = {"lat": None, "lon": None} + enriched = {} + for enricher in enrichers: + enriched[enricher.name] = await enricher.enrich(null_location) + event.data["_enriched"] = enriched # Stream subject mappings -- derived from the registry; every stream is included # (META too: supervisor must create it in JetStream even though archive skips it). diff --git a/tests/test_apply_enrichment_coordless.py b/tests/test_apply_enrichment_coordless.py new file mode 100644 index 0000000..870485d --- /dev/null +++ b/tests/test_apply_enrichment_coordless.py @@ -0,0 +1,78 @@ +"""Regression tests for apply_enrichment's coordless path. + +Design principle: every event from an adapter that declares enrichment_locations +must carry data["_enriched"] — populated when coordinates resolve, an all-null +bundle when they don't (e.g. removal tombstones with no lat/lon). Adapters that +declare no enrichment_locations are still skipped entirely. +""" + +from datetime import datetime, timezone +from typing import Any + +import pytest + +from central.config_models import EnrichmentConfig +from central.enrichment.cache import EnrichmentCache +from central.enrichment.geocoder import GeocoderEnricher, all_null_bundle +from central.models import Event, Geo +from central.supervisor import apply_enrichment, build_enrichers + + +def _make_event(data: dict[str, Any]) -> Event: + return Event( + id="evt-1", + adapter="usgs_quake", + category="quake.event.test", + time=datetime(2026, 1, 1, tzinfo=timezone.utc), + geo=Geo(), + data=data, + ) + + +class _PopulatingBackend: + """Deterministic backend that resolves any real coords to a fixed place.""" + + async def reverse(self, lat: float, lon: float) -> dict[str, Any]: + return {**all_null_bundle(), "city": "Boise", "state": "ID"} + + +@pytest.mark.asyncio +async def test_coordless_event_with_declared_locations_gets_null_bundle(tmp_path): + """An event whose declared coord paths are all None still gets _enriched.""" + cache = EnrichmentCache(tmp_path / "enrichment_cache.db") + enrichers = build_enrichers(EnrichmentConfig(), cache) + event = _make_event( + {"latitude": None, "longitude": None, "reason": "fallen_off_current_service"} + ) + assert "_enriched" not in event.data + + await apply_enrichment(event, [("latitude", "longitude")], enrichers) + + assert event.data["_enriched"]["geocoder"] == all_null_bundle() + + +@pytest.mark.asyncio +async def test_event_with_coords_still_enriches_normally(tmp_path): + """The coord-bearing path is unchanged: the backend is consulted and its + resolved fields land in the bundle.""" + cache = EnrichmentCache(tmp_path / "enrichment_cache.db") + enricher = GeocoderEnricher(_PopulatingBackend(), cache=cache) + event = _make_event({"latitude": 43.0, "longitude": -116.0}) + + await apply_enrichment(event, [("latitude", "longitude")], [enricher]) + + bundle = event.data["_enriched"]["geocoder"] + assert bundle["state"] == "ID" + assert bundle["city"] == "Boise" + + +@pytest.mark.asyncio +async def test_adapter_with_no_enrichment_locations_still_skipped(tmp_path): + """Adapters declaring no enrichment_locations are skipped — no _enriched.""" + cache = EnrichmentCache(tmp_path / "enrichment_cache.db") + enrichers = build_enrichers(EnrichmentConfig(), cache) + event = _make_event({"latitude": 43.0, "longitude": -116.0}) + + await apply_enrichment(event, [], enrichers) + + assert "_enriched" not in event.data