From 11e37c4f48823843c8c3bee02fdd543ffbe1ba3f Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 06:12:47 +0000 Subject: [PATCH] fix(central): v0.4 D.2 -- remap Central adapter names to meshai source for consistent dashboard attribution Phase D catalogued a source-name divergence: central-sourced events carried Central's adapter name (wfigs_incidents, nwis, swpc_alerts, wzdx) rather than meshai's native source (fires, usgs, swpc, traffic), so the C.2 family-tab per-adapter event filtering (which keys on the native source name) wouldn't group central events under the right adapter. Fix: CENTRAL_ADAPTER_TO_SOURCE table in consumer.py; normalize() now remaps inner Event.adapter -> meshai source, falling back to the literal adapter name for anything not in the table (logged at DEBUG when a translation happens). before -> after (Event.source): wfigs_incidents / wfigs_perimeters -> fires nwis -> usgs swpc_alerts / swpc_kindex / swpc_protons -> swpc wzdx -> traffic nws, usgs_quake, firms -> unchanged (1:1, omitted from table) unknown (e.g. experimental_foo) -> passthrough as-is Tests: tests/test_central_consumer.py parametrized test_central_adapter_source_remap (6 cases: 4 remaps + nws passthrough + unknown passthrough). Full suite: 283 passed. In-prod verify (rebuilt, ephemeral probe over real Central data): the four observed adapters now normalize to source=fires/usgs/swpc/traffic; nws passes through. No live flip needed; container stays native baseline + healthy. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/consumer.py | 21 ++++++++++++++++++++- tests/test_central_consumer.py | 25 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index d65062d..d888340 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -46,6 +46,21 @@ ADAPTER_SUBJECTS: dict[str, list[str]] = { "traffic": ["central.traffic.>"], } +# Bridge between Central's adapter taxonomy and meshai's family-tab source names. +# Central names some adapters differently (e.g. "wfigs_incidents" vs meshai's +# "fires"); remap so dashboard per-adapter event filtering (which keys on the +# native source name) works whether a feed is native or central. 1:1 names +# (nws, usgs_quake, firms) are intentionally omitted -> passthrough. +CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = { + "wfigs_incidents": "fires", + "wfigs_perimeters": "fires", + "nwis": "usgs", + "swpc_alerts": "swpc", + "swpc_kindex": "swpc", + "swpc_protons": "swpc", + "wzdx": "traffic", +} + # Central hierarchical category prefix -> meshai flat category. # First matching prefix wins; order matters (most specific first). _CATEGORY_MAP: list[tuple[str, str]] = [ @@ -212,8 +227,12 @@ class CentralConsumer: if exp is not None: kwargs["expires"] = exp + raw_adapter = inner.get("adapter") or "central" + source = CENTRAL_ADAPTER_TO_SOURCE.get(raw_adapter, raw_adapter) + if source != raw_adapter: + logger.debug("Central adapter %r -> meshai source %r", raw_adapter, source) return make_event( - source=inner.get("adapter") or "central", + source=source, category=category, severity=map_severity(inner.get("severity")), **kwargs, diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index f923bcb..751bf0b 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -163,3 +163,28 @@ def test_subject_domain_fallback_for_unmapped_category(): "data": {"road": "I-44"}}} ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) assert ev is not None and ev.category == "traffic_congestion" + + +@pytest.mark.parametrize("adapter,expected", [ + ("wfigs_incidents", "fires"), + ("nwis", "usgs"), + ("swpc_alerts", "swpc"), + ("wzdx", "traffic"), + ("nws", "nws"), # 1:1 passthrough + ("experimental_foo", "experimental_foo"), # unknown -> passthrough +]) +def test_central_adapter_source_remap(adapter, expected): + """D.2: Central adapter names map to meshai source names (unknown passes through).""" + import json + from meshai.central.consumer import CentralConsumer + from meshai.config import EnvironmentalConfig + from meshai.notifications.pipeline.bus import EventBus + rec = [] + bus = EventBus(); bus.subscribe(rec.append) + c = CentralConsumer(EnvironmentalConfig(), bus) + env = {"id": "e1", "data": {"id": "e1", "adapter": adapter, "category": "wx.alert.x", + "time": "2026-05-28T00:00:00Z", "severity": 1, + "geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]}, + "data": {}}} + ev = c._handle("central.wx.alert.x", json.dumps(env).encode()) + assert ev is not None and ev.source == expected