diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index ce8003d..d65062d 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -77,6 +77,30 @@ def map_category(central_category: str) -> str: return "other" +# Subject-domain fallback: some Central categories are not domain-prefixed +# (e.g. traffic's "work_zone.wzdx"), so when the category table misses we map by +# the stable subject domain token (central..<...>) instead of "other". +_SUBJECT_DOMAIN_CATEGORY = { + "wx": "weather_warning", + "fire": "wildfire_incident", + "quake": "earthquake_event", + "hydro": "stream_flow", + "space": "geomagnetic_storm", + "disaster": "disaster_event", + "traffic": "traffic_congestion", + "traffic_flow": "traffic_flow", + "traffic_cameras": "traffic_camera", +} + + +def category_from_subject(subject: str) -> Optional[str]: + """Map a NATS subject (central..<...>) to a meshai category.""" + parts = (subject or "").split(".") + if len(parts) >= 2 and parts[0] == "central": + return _SUBJECT_DOMAIN_CATEGORY.get(parts[1]) + return None + + def map_severity(sev: Optional[int]) -> str: """Central int severity (0-4 / None) -> meshai severity string. @@ -152,6 +176,9 @@ class CentralConsumer: group_key = re.sub(r":removed$", "", group_key) cat_raw = inner.get("category") or envelope.get("centralcategory") or "" + category = map_category(cat_raw) + if category == "other": + category = category_from_subject(subject) or "other" geo = inner.get("geo") or {} lat = lon = None @@ -187,7 +214,7 @@ class CentralConsumer: return make_event( source=inner.get("adapter") or "central", - category=map_category(cat_raw), + category=category, severity=map_severity(inner.get("severity")), **kwargs, ) diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index a3c716a..f923bcb 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -144,3 +144,22 @@ def test_consumer_config_uses_deliver_policy_new(): from meshai.central.consumer import consumer_config from nats.js.api import DeliverPolicy assert consumer_config().deliver_policy == DeliverPolicy.NEW + + +def test_subject_domain_fallback_for_unmapped_category(): + """D.1: an unmapped category (traffic 'work_zone.wzdx') falls back to the + subject domain instead of returning 'other'.""" + import json + from meshai.central.consumer import CentralConsumer, category_from_subject + from meshai.config import EnvironmentalConfig + from meshai.notifications.pipeline.bus import EventBus + assert category_from_subject("central.traffic.work_zone.ok") == "traffic_congestion" + rec = [] + bus = EventBus(); bus.subscribe(rec.append) + c = CentralConsumer(EnvironmentalConfig(), bus) + env = {"id": "wz1", "data": {"id": "wz1", "adapter": "wzdx", + "category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1, + "geo": {"centroid": [-96.2, 36.15], "primary_region": "US-OK", "regions": ["US-OK"]}, + "data": {"road": "I-44"}}} + ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) + assert ev is not None and ev.category == "traffic_congestion"