mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
fix(3-M.b): apply_enrichment always attaches _enriched for declared adapters
Coordless events such as removal tombstones with null lat/lon, from adapters that declare enrichment_locations, previously fell off the loop without writing _enriched and carried no geocoder bundle at all, violating the every-event-carries-_enriched design rule. Add a post-loop fallback that resolves the null location to an all-null bundle per enricher. Adapters with no enrichment_locations remain skipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bb6db03a07
commit
f0c044505f
2 changed files with 93 additions and 3 deletions
|
|
@ -76,9 +76,12 @@ async def apply_enrichment(
|
|||
|
||||
No-op when the adapter declares no enrichment_locations or no enrichers
|
||||
are registered. Uses the first (lat_path, lon_path) tuple that resolves to
|
||||
a non-null coordinate pair in event.data. Each enricher's result is keyed
|
||||
by enricher.name. Mutates the data dict in place (Event is frozen, but its
|
||||
data dict is not — this avoids a model_copy on every published event).
|
||||
a non-null coordinate pair in event.data. If no declared pair resolves to
|
||||
coordinates, still attaches an all-null bundle so that every event from an
|
||||
enriched adapter carries _enriched (consumers get a stable field set).
|
||||
Each enricher's result is keyed by enricher.name. Mutates the data dict in
|
||||
place (Event is frozen, but its data dict is not — this avoids a
|
||||
model_copy on every published event).
|
||||
"""
|
||||
if not enrichment_locations or not enrichers:
|
||||
return
|
||||
|
|
@ -93,6 +96,15 @@ async def apply_enrichment(
|
|||
enriched[enricher.name] = await enricher.enrich(location)
|
||||
event.data["_enriched"] = enriched
|
||||
return
|
||||
# No declared pair resolved to coordinates. Still attach _enriched: each
|
||||
# enricher resolves the null location to its own all-null bundle (per the
|
||||
# never-raise contract), so coordless events (e.g. removal tombstones)
|
||||
# carry the same shape as enriched ones.
|
||||
null_location = {"lat": None, "lon": None}
|
||||
enriched = {}
|
||||
for enricher in enrichers:
|
||||
enriched[enricher.name] = await enricher.enrich(null_location)
|
||||
event.data["_enriched"] = enriched
|
||||
|
||||
# Stream subject mappings -- derived from the registry; every stream is included
|
||||
# (META too: supervisor must create it in JetStream even though archive skips it).
|
||||
|
|
|
|||
78
tests/test_apply_enrichment_coordless.py
Normal file
78
tests/test_apply_enrichment_coordless.py
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
"""Regression tests for apply_enrichment's coordless path.
|
||||
|
||||
Design principle: every event from an adapter that declares enrichment_locations
|
||||
must carry data["_enriched"] — populated when coordinates resolve, an all-null
|
||||
bundle when they don't (e.g. removal tombstones with no lat/lon). Adapters that
|
||||
declare no enrichment_locations are still skipped entirely.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from central.config_models import EnrichmentConfig
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
from central.enrichment.geocoder import GeocoderEnricher, all_null_bundle
|
||||
from central.models import Event, Geo
|
||||
from central.supervisor import apply_enrichment, build_enrichers
|
||||
|
||||
|
||||
def _make_event(data: dict[str, Any]) -> Event:
|
||||
return Event(
|
||||
id="evt-1",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.test",
|
||||
time=datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||
geo=Geo(),
|
||||
data=data,
|
||||
)
|
||||
|
||||
|
||||
class _PopulatingBackend:
|
||||
"""Deterministic backend that resolves any real coords to a fixed place."""
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
return {**all_null_bundle(), "city": "Boise", "state": "ID"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_coordless_event_with_declared_locations_gets_null_bundle(tmp_path):
|
||||
"""An event whose declared coord paths are all None still gets _enriched."""
|
||||
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||
enrichers = build_enrichers(EnrichmentConfig(), cache)
|
||||
event = _make_event(
|
||||
{"latitude": None, "longitude": None, "reason": "fallen_off_current_service"}
|
||||
)
|
||||
assert "_enriched" not in event.data
|
||||
|
||||
await apply_enrichment(event, [("latitude", "longitude")], enrichers)
|
||||
|
||||
assert event.data["_enriched"]["geocoder"] == all_null_bundle()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_with_coords_still_enriches_normally(tmp_path):
|
||||
"""The coord-bearing path is unchanged: the backend is consulted and its
|
||||
resolved fields land in the bundle."""
|
||||
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||
enricher = GeocoderEnricher(_PopulatingBackend(), cache=cache)
|
||||
event = _make_event({"latitude": 43.0, "longitude": -116.0})
|
||||
|
||||
await apply_enrichment(event, [("latitude", "longitude")], [enricher])
|
||||
|
||||
bundle = event.data["_enriched"]["geocoder"]
|
||||
assert bundle["state"] == "ID"
|
||||
assert bundle["city"] == "Boise"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapter_with_no_enrichment_locations_still_skipped(tmp_path):
|
||||
"""Adapters declaring no enrichment_locations are skipped — no _enriched."""
|
||||
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||
enrichers = build_enrichers(EnrichmentConfig(), cache)
|
||||
event = _make_event({"latitude": 43.0, "longitude": -116.0})
|
||||
|
||||
await apply_enrichment(event, [], enrichers)
|
||||
|
||||
assert "_enriched" not in event.data
|
||||
Loading…
Add table
Add a link
Reference in a new issue