diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index d65062d..d888340 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -46,6 +46,21 @@ ADAPTER_SUBJECTS: dict[str, list[str]] = { "traffic": ["central.traffic.>"], } +# Bridge between Central's adapter taxonomy and meshai's family-tab source names. +# Central names some adapters differently (e.g. "wfigs_incidents" vs meshai's +# "fires"); remap so dashboard per-adapter event filtering (which keys on the +# native source name) works whether a feed is native or central. 1:1 names +# (nws, usgs_quake, firms) are intentionally omitted -> passthrough. +CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = { + "wfigs_incidents": "fires", + "wfigs_perimeters": "fires", + "nwis": "usgs", + "swpc_alerts": "swpc", + "swpc_kindex": "swpc", + "swpc_protons": "swpc", + "wzdx": "traffic", +} + # Central hierarchical category prefix -> meshai flat category. # First matching prefix wins; order matters (most specific first). _CATEGORY_MAP: list[tuple[str, str]] = [ @@ -212,8 +227,12 @@ class CentralConsumer: if exp is not None: kwargs["expires"] = exp + raw_adapter = inner.get("adapter") or "central" + source = CENTRAL_ADAPTER_TO_SOURCE.get(raw_adapter, raw_adapter) + if source != raw_adapter: + logger.debug("Central adapter %r -> meshai source %r", raw_adapter, source) return make_event( - source=inner.get("adapter") or "central", + source=source, category=category, severity=map_severity(inner.get("severity")), **kwargs, diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index f923bcb..751bf0b 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -163,3 +163,28 @@ def test_subject_domain_fallback_for_unmapped_category(): "data": {"road": "I-44"}}} ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) assert ev is not None and ev.category == "traffic_congestion" + + +@pytest.mark.parametrize("adapter,expected", [ + ("wfigs_incidents", "fires"), + ("nwis", "usgs"), + ("swpc_alerts", "swpc"), + ("wzdx", "traffic"), + ("nws", "nws"), # 1:1 passthrough + ("experimental_foo", "experimental_foo"), # unknown -> passthrough +]) +def test_central_adapter_source_remap(adapter, expected): + """D.2: Central adapter names map to meshai source names (unknown passes through).""" + import json + from meshai.central.consumer import CentralConsumer + from meshai.config import EnvironmentalConfig + from meshai.notifications.pipeline.bus import EventBus + rec = [] + bus = EventBus(); bus.subscribe(rec.append) + c = CentralConsumer(EnvironmentalConfig(), bus) + env = {"id": "e1", "data": {"id": "e1", "adapter": adapter, "category": "wx.alert.x", + "time": "2026-05-28T00:00:00Z", "severity": 1, + "geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]}, + "data": {}}} + ev = c._handle("central.wx.alert.x", json.dumps(env).encode()) + assert ev is not None and ev.source == expected