mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Merge pull request #46 from zvx-echo6/feature/3-mb-apply-enrichment-coordless
fix(3-M.b): apply_enrichment always attaches _enriched (coordless events)
This commit is contained in:
commit
1cf1eabb1c
2 changed files with 93 additions and 3 deletions
|
|
@ -76,9 +76,12 @@ async def apply_enrichment(
|
||||||
|
|
||||||
No-op when the adapter declares no enrichment_locations or no enrichers
|
No-op when the adapter declares no enrichment_locations or no enrichers
|
||||||
are registered. Uses the first (lat_path, lon_path) tuple that resolves to
|
are registered. Uses the first (lat_path, lon_path) tuple that resolves to
|
||||||
a non-null coordinate pair in event.data. Each enricher's result is keyed
|
a non-null coordinate pair in event.data. If no declared pair resolves to
|
||||||
by enricher.name. Mutates the data dict in place (Event is frozen, but its
|
coordinates, still attaches an all-null bundle so that every event from an
|
||||||
data dict is not — this avoids a model_copy on every published event).
|
enriched adapter carries _enriched (consumers get a stable field set).
|
||||||
|
Each enricher's result is keyed by enricher.name. Mutates the data dict in
|
||||||
|
place (Event is frozen, but its data dict is not — this avoids a
|
||||||
|
model_copy on every published event).
|
||||||
"""
|
"""
|
||||||
if not enrichment_locations or not enrichers:
|
if not enrichment_locations or not enrichers:
|
||||||
return
|
return
|
||||||
|
|
@ -93,6 +96,15 @@ async def apply_enrichment(
|
||||||
enriched[enricher.name] = await enricher.enrich(location)
|
enriched[enricher.name] = await enricher.enrich(location)
|
||||||
event.data["_enriched"] = enriched
|
event.data["_enriched"] = enriched
|
||||||
return
|
return
|
||||||
|
# No declared pair resolved to coordinates. Still attach _enriched: each
|
||||||
|
# enricher resolves the null location to its own all-null bundle (per the
|
||||||
|
# never-raise contract), so coordless events (e.g. removal tombstones)
|
||||||
|
# carry the same shape as enriched ones.
|
||||||
|
null_location = {"lat": None, "lon": None}
|
||||||
|
enriched = {}
|
||||||
|
for enricher in enrichers:
|
||||||
|
enriched[enricher.name] = await enricher.enrich(null_location)
|
||||||
|
event.data["_enriched"] = enriched
|
||||||
|
|
||||||
# Stream subject mappings -- derived from the registry; every stream is included
|
# Stream subject mappings -- derived from the registry; every stream is included
|
||||||
# (META too: supervisor must create it in JetStream even though archive skips it).
|
# (META too: supervisor must create it in JetStream even though archive skips it).
|
||||||
|
|
|
||||||
78
tests/test_apply_enrichment_coordless.py
Normal file
78
tests/test_apply_enrichment_coordless.py
Normal file
|
|
@ -0,0 +1,78 @@
|
||||||
|
"""Regression tests for apply_enrichment's coordless path.
|
||||||
|
|
||||||
|
Design principle: every event from an adapter that declares enrichment_locations
|
||||||
|
must carry data["_enriched"] — populated when coordinates resolve, an all-null
|
||||||
|
bundle when they don't (e.g. removal tombstones with no lat/lon). Adapters that
|
||||||
|
declare no enrichment_locations are still skipped entirely.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from central.config_models import EnrichmentConfig
|
||||||
|
from central.enrichment.cache import EnrichmentCache
|
||||||
|
from central.enrichment.geocoder import GeocoderEnricher, all_null_bundle
|
||||||
|
from central.models import Event, Geo
|
||||||
|
from central.supervisor import apply_enrichment, build_enrichers
|
||||||
|
|
||||||
|
|
||||||
|
def _make_event(data: dict[str, Any]) -> Event:
|
||||||
|
return Event(
|
||||||
|
id="evt-1",
|
||||||
|
adapter="usgs_quake",
|
||||||
|
category="quake.event.test",
|
||||||
|
time=datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||||
|
geo=Geo(),
|
||||||
|
data=data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _PopulatingBackend:
|
||||||
|
"""Deterministic backend that resolves any real coords to a fixed place."""
|
||||||
|
|
||||||
|
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||||
|
return {**all_null_bundle(), "city": "Boise", "state": "ID"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_coordless_event_with_declared_locations_gets_null_bundle(tmp_path):
|
||||||
|
"""An event whose declared coord paths are all None still gets _enriched."""
|
||||||
|
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||||
|
enrichers = build_enrichers(EnrichmentConfig(), cache)
|
||||||
|
event = _make_event(
|
||||||
|
{"latitude": None, "longitude": None, "reason": "fallen_off_current_service"}
|
||||||
|
)
|
||||||
|
assert "_enriched" not in event.data
|
||||||
|
|
||||||
|
await apply_enrichment(event, [("latitude", "longitude")], enrichers)
|
||||||
|
|
||||||
|
assert event.data["_enriched"]["geocoder"] == all_null_bundle()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_event_with_coords_still_enriches_normally(tmp_path):
|
||||||
|
"""The coord-bearing path is unchanged: the backend is consulted and its
|
||||||
|
resolved fields land in the bundle."""
|
||||||
|
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||||
|
enricher = GeocoderEnricher(_PopulatingBackend(), cache=cache)
|
||||||
|
event = _make_event({"latitude": 43.0, "longitude": -116.0})
|
||||||
|
|
||||||
|
await apply_enrichment(event, [("latitude", "longitude")], [enricher])
|
||||||
|
|
||||||
|
bundle = event.data["_enriched"]["geocoder"]
|
||||||
|
assert bundle["state"] == "ID"
|
||||||
|
assert bundle["city"] == "Boise"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_adapter_with_no_enrichment_locations_still_skipped(tmp_path):
|
||||||
|
"""Adapters declaring no enrichment_locations are skipped — no _enriched."""
|
||||||
|
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||||
|
enrichers = build_enrichers(EnrichmentConfig(), cache)
|
||||||
|
event = _make_event({"latitude": 43.0, "longitude": -116.0})
|
||||||
|
|
||||||
|
await apply_enrichment(event, [], enrichers)
|
||||||
|
|
||||||
|
assert "_enriched" not in event.data
|
||||||
Loading…
Add table
Add a link
Reference in a new issue