From ea0c68097a7a4a97e71ce05ad1f3971fc3882395 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 05:05:12 +0000 Subject: [PATCH] fix(central): v0.4 D.1 -- subject-domain category fallback (traffic 'work_zone.wzdx' was mapping to 'other') Surfaced during the Phase D rollout flipping all five remaining domains to central. Central's traffic categories are NOT domain-prefixed -- the inner Event.category for a work zone is "work_zone.wzdx", not "traffic.work_zone". The prefix table in map_category therefore missed and returned "other", which would break category-based routing/digest grouping for central-sourced traffic. before: map_category("work_zone.wzdx") -> "other" after: when the category table misses, fall back to the stable subject domain token (central..<...>): central.traffic.* -> traffic_congestion. Added category_from_subject() + a domain->category map (wx, fire, quake, hydro, space, disaster, traffic, traffic_flow, traffic_cameras). The well-prefixed domains (wx.alert, fire.incident, hydro., space.alert) still match the primary table; the fallback only fires on a miss, so a known domain never yields "other" again. Test: tests/test_central_consumer.py gains test_subject_domain_fallback_for_unmapped_category (category_from_subject + a 'work_zone.wzdx' message -> traffic_congestion). Full suite: 277 passed. Verified in prod (rebuilt, all 5 flipped to central): the per-domain LAST_PER_SUBJECT normalize probe now shows traffic -> category=traffic_congestion (was 'other'); the other four domains unchanged and clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/consumer.py | 29 ++++++++++++++++++++++++++++- tests/test_central_consumer.py | 19 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index ce8003d..d65062d 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -77,6 +77,30 @@ def map_category(central_category: str) -> str: return "other" +# Subject-domain fallback: some Central categories are not domain-prefixed +# (e.g. traffic's "work_zone.wzdx"), so when the category table misses we map by +# the stable subject domain token (central..<...>) instead of "other". +_SUBJECT_DOMAIN_CATEGORY = { + "wx": "weather_warning", + "fire": "wildfire_incident", + "quake": "earthquake_event", + "hydro": "stream_flow", + "space": "geomagnetic_storm", + "disaster": "disaster_event", + "traffic": "traffic_congestion", + "traffic_flow": "traffic_flow", + "traffic_cameras": "traffic_camera", +} + + +def category_from_subject(subject: str) -> Optional[str]: + """Map a NATS subject (central..<...>) to a meshai category.""" + parts = (subject or "").split(".") + if len(parts) >= 2 and parts[0] == "central": + return _SUBJECT_DOMAIN_CATEGORY.get(parts[1]) + return None + + def map_severity(sev: Optional[int]) -> str: """Central int severity (0-4 / None) -> meshai severity string. @@ -152,6 +176,9 @@ class CentralConsumer: group_key = re.sub(r":removed$", "", group_key) cat_raw = inner.get("category") or envelope.get("centralcategory") or "" + category = map_category(cat_raw) + if category == "other": + category = category_from_subject(subject) or "other" geo = inner.get("geo") or {} lat = lon = None @@ -187,7 +214,7 @@ class CentralConsumer: return make_event( source=inner.get("adapter") or "central", - category=map_category(cat_raw), + category=category, severity=map_severity(inner.get("severity")), **kwargs, ) diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index a3c716a..f923bcb 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -144,3 +144,22 @@ def test_consumer_config_uses_deliver_policy_new(): from meshai.central.consumer import consumer_config from nats.js.api import DeliverPolicy assert consumer_config().deliver_policy == DeliverPolicy.NEW + + +def test_subject_domain_fallback_for_unmapped_category(): + """D.1: an unmapped category (traffic 'work_zone.wzdx') falls back to the + subject domain instead of returning 'other'.""" + import json + from meshai.central.consumer import CentralConsumer, category_from_subject + from meshai.config import EnvironmentalConfig + from meshai.notifications.pipeline.bus import EventBus + assert category_from_subject("central.traffic.work_zone.ok") == "traffic_congestion" + rec = [] + bus = EventBus(); bus.subscribe(rec.append) + c = CentralConsumer(EnvironmentalConfig(), bus) + env = {"id": "wz1", "data": {"id": "wz1", "adapter": "wzdx", + "category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1, + "geo": {"centroid": [-96.2, 36.15], "primary_region": "US-OK", "regions": ["US-OK"]}, + "data": {"road": "I-44"}}} + ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) + assert ev is not None and ev.category == "traffic_congestion"