fix(central): v0.4 D.2 -- remap Central adapter names to meshai source for consistent dashboard attribution

Phase D catalogued a source-name divergence: central-sourced events carried
Central's adapter name (wfigs_incidents, nwis, swpc_alerts, wzdx) rather than
meshai's native source (fires, usgs, swpc, traffic), so the C.2 family-tab
per-adapter event filtering (which keys on the native source name) wouldn't
group central events under the right adapter.

Fix: CENTRAL_ADAPTER_TO_SOURCE table in consumer.py; normalize() now remaps
inner Event.adapter -> meshai source, falling back to the literal adapter name
for anything not in the table (logged at DEBUG when a translation happens).

before -> after (Event.source):
  wfigs_incidents / wfigs_perimeters -> fires
  nwis                               -> usgs
  swpc_alerts / swpc_kindex / swpc_protons -> swpc
  wzdx                               -> traffic
  nws, usgs_quake, firms             -> unchanged (1:1, omitted from table)
  unknown (e.g. experimental_foo)    -> passthrough as-is

Tests: tests/test_central_consumer.py parametrized test_central_adapter_source_remap
(6 cases: 4 remaps + nws passthrough + unknown passthrough). Full suite: 283 passed.

In-prod verify (rebuilt, ephemeral probe over real Central data): the four
observed adapters now normalize to source=fires/usgs/swpc/traffic; nws passes
through. No live flip needed; container stays native baseline + healthy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-28 06:12:47 +00:00
commit 11e37c4f48
2 changed files with 45 additions and 1 deletions

View file

@ -46,6 +46,21 @@ ADAPTER_SUBJECTS: dict[str, list[str]] = {
"traffic": ["central.traffic.>"], "traffic": ["central.traffic.>"],
} }
# Bridge between Central's adapter taxonomy and meshai's family-tab source names.
# Central names some adapters differently (e.g. "wfigs_incidents" vs meshai's
# "fires"); remap so dashboard per-adapter event filtering (which keys on the
# native source name) works whether a feed is native or central. 1:1 names
# (nws, usgs_quake, firms) are intentionally omitted -> passthrough.
CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = {
"wfigs_incidents": "fires",
"wfigs_perimeters": "fires",
"nwis": "usgs",
"swpc_alerts": "swpc",
"swpc_kindex": "swpc",
"swpc_protons": "swpc",
"wzdx": "traffic",
}
# Central hierarchical category prefix -> meshai flat category. # Central hierarchical category prefix -> meshai flat category.
# First matching prefix wins; order matters (most specific first). # First matching prefix wins; order matters (most specific first).
_CATEGORY_MAP: list[tuple[str, str]] = [ _CATEGORY_MAP: list[tuple[str, str]] = [
@ -212,8 +227,12 @@ class CentralConsumer:
if exp is not None: if exp is not None:
kwargs["expires"] = exp kwargs["expires"] = exp
raw_adapter = inner.get("adapter") or "central"
source = CENTRAL_ADAPTER_TO_SOURCE.get(raw_adapter, raw_adapter)
if source != raw_adapter:
logger.debug("Central adapter %r -> meshai source %r", raw_adapter, source)
return make_event( return make_event(
source=inner.get("adapter") or "central", source=source,
category=category, category=category,
severity=map_severity(inner.get("severity")), severity=map_severity(inner.get("severity")),
**kwargs, **kwargs,

View file

@ -163,3 +163,28 @@ def test_subject_domain_fallback_for_unmapped_category():
"data": {"road": "I-44"}}} "data": {"road": "I-44"}}}
ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode())
assert ev is not None and ev.category == "traffic_congestion" assert ev is not None and ev.category == "traffic_congestion"
@pytest.mark.parametrize("adapter,expected", [
("wfigs_incidents", "fires"),
("nwis", "usgs"),
("swpc_alerts", "swpc"),
("wzdx", "traffic"),
("nws", "nws"), # 1:1 passthrough
("experimental_foo", "experimental_foo"), # unknown -> passthrough
])
def test_central_adapter_source_remap(adapter, expected):
"""D.2: Central adapter names map to meshai source names (unknown passes through)."""
import json
from meshai.central.consumer import CentralConsumer
from meshai.config import EnvironmentalConfig
from meshai.notifications.pipeline.bus import EventBus
rec = []
bus = EventBus(); bus.subscribe(rec.append)
c = CentralConsumer(EnvironmentalConfig(), bus)
env = {"id": "e1", "data": {"id": "e1", "adapter": adapter, "category": "wx.alert.x",
"time": "2026-05-28T00:00:00Z", "severity": 1,
"geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]},
"data": {}}}
ev = c._handle("central.wx.alert.x", json.dumps(env).encode())
assert ev is not None and ev.source == expected