From 14d168822bee6d06d09755c995802de17ffe8b7d Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Thu, 4 Jun 2026 06:10:12 +0000 Subject: [PATCH] fix(traffic): v0.5.7-traffic -- NATS pattern fix + itd_511 sub-adapter routing + categories audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second family of the v0.5.7 NATS-and-categories campaign. Weather went first because its NWS pattern was already legal; traffic was carrying invalid NATS syntax in production. FIX 1 -- Invalid `>` mid-subject in traffic. Pre-v0.5.7-traffic the subject builder shipped `central.traffic.>.{state}` for both the traffic and roads511 adapters. NATS rules say `>` is only legal at the tail token; mid-subject `>` is rejected by the broker at subscribe time (or silently delivers nothing depending on server version). Replaced with Convention B (per Central v0.10.0 meshai_integration_guide.md): single-token `*` in the event_type slot, bare state suffix -- `central.traffic.*.id` for Idaho. Shared by the wzdx, tomtom_incidents and state_511_atis adapters. FIX 2 -- roads511 dual subscribe. The new Idaho-only itd_511 adapter in Central v0.10.0 uses Convention A (`central.traffic..us.`, the us. form). Convention B (bare state) is shared with the rest of the traffic family. roads511 now owns BOTH: central.traffic.*.id (Convention B, shared with traffic via _subject_owned) central.traffic.*.us.id (Convention A, itd_511-only) Sub-adapter routing in CentralConsumer._subject_owned (v0.5.1) already keeps shared subjects scoped to the right meshai source -- no change needed. FIX 3 -- itd_511 -> roads511 in CENTRAL_ADAPTER_TO_SOURCE. Mirrors state_511_atis (added v0.5.3). Both Idaho 511 feeds collapse to a single meshai source for UX simplicity; future v0.6 may split them if Matt needs differential rules. FIX 4 -- Roads-family categories audit + finer event_type mapping. Pre-v0.5.7-traffic the central path flattened every traffic-domain event to `traffic_congestion` because work_zone / incident / closure had no entries in _CATEGORY_MAP and fell through to the `traffic.` catchall (then the subject-domain fallback). Added three explicit map entries before the catchall: ("work_zone", "work_zone") # catches "work_zone" and "work_zone.wzdx" ("incident", "road_incident") # catches incident.tomtom_incidents + bare ("closure", "road_closure") # catches closure + closure.itd_511 ALERT_CATEGORIES gains two new roads-family entries so the Advanced Rules editor can target them: work_zone -- Active construction/maintenance work zone road_incident -- Reported incident (crash, hazard, debris) Existing entries `road_closure` and `traffic_congestion` kept. composer._CATEGORY_EMOJI gains matching glyphs (🚧 work_zone, 🚨 road_incident) so the live LoRa rendering lines up with the category example_message glyphs. Audit cross-check (test_alert_categories_roads_complete enforces parity): Native emit: traffic.py -> traffic_congestion; roads511.py -> road_closure Central path emit (via map_category): {road_closure, traffic_congestion, work_zone, road_incident} ALERT_CATEGORIES{toggle=roads}: {road_closure, traffic_congestion, work_zone, road_incident} Parity. No orphans, no missing. DEFERRED to v0.5.8: itd_511_cameras / traffic_cameras stream lives at a different subject domain (central.traffic_cameras.>) and needs a new meshai source (roads_cameras or similar). Out of scope for v0.5.7. Tests ----- PYTHONPATH=. pytest -q: 366 passed (was 345; +21 net). - tests/test_traffic_v057.py (new): NATS-syntax checks (`>` only at tail, single-token `*`), traffic Convention B, roads511 dual-subscribe, shared bare-state subject, itd_511 + state_511_atis remap, map_category event_type preservation, ALERT_CATEGORIES roads parity (reflection-based scan of native emit + central path), required-fields check on the four roads entries. - tests/test_central_region_routing.py: updated `test_subjects_for_traffic_and_roads511_share_state_token` -> two new tests covering Convention B (traffic) and dual-subscribe (roads511). - tests/test_central_consumer.py: updated `test_subject_domain_fallback_for_unmapped_category` (work_zone.wzdx is now mapped, switched to a genuinely-unmapped category) + new `test_v057_traffic_work_zone_now_mapped` asserting wzdx envelopes land on ev.category=="work_zone". Safe-mode preserved (master off, all family toggles off, all adapters native, central disabled). No live toggle flipped. Not tagging yet -- v0.5.7 tag waits until all families ship. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/consumer.py | 41 ++++- meshai/notifications/categories.py | 19 +++ meshai/notifications/renderers/composer.py | 2 + tests/test_central_consumer.py | 30 +++- tests/test_central_region_routing.py | 20 ++- tests/test_traffic_v057.py | 177 +++++++++++++++++++++ 6 files changed, 278 insertions(+), 11 deletions(-) create mode 100644 tests/test_traffic_v057.py diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index fe7785e..4f248d1 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -64,16 +64,28 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]: - region BEFORE the wildcard (nws): central.wx.alert.us.id.> - - region AFTER the wildcard (quake / firms / usgs / traffic): + - region AFTER the wildcard (quake / firms / usgs): central.quake.event.>.us.id central.fire.hotspot.>.us.id central.hydro.>.us.id (+ ".unknown" workaround, see below) - central.traffic.>.id (state only, no us. prefix) - state-only token at a fixed depth (fires): central.fire.incident..> central.fire.perimeter..> + - traffic family — Convention B, bare state, no wildcard: + central.traffic..id (wzdx, tomtom_incidents, + state_511_atis) + - traffic family — Convention A, us.: + central.traffic..us.id (itd_511, Idaho-only) - region ignored (swpc) — space weather is planetary. + NATS rule: `>` is only legal at the tail. Pre-v0.5.7-traffic this file + shipped `central.traffic.>.{state}` for traffic+roads511, which was + syntactically invalid (`>` mid-subject). Fixed by switching to single- + token `*` wildcards for the per-event-type slot. roads511 now owns + BOTH the bare-state (Convention B, shared with traffic) and the + us. (Convention A, itd_511-only) subjects so itd_511 events + attribute to roads511 in meshai. + The .unknown workaround: v0.9.20 leaves USGS hydro events whose gauge state can't be inferred on `central.hydro.>.unknown`. Subscribing to both avoids losing those rows until v0.9.20.1 backfills the state tag. @@ -93,8 +105,16 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]: "usgs": [f"central.hydro.>.{region}", "central.hydro.>.unknown"], "swpc": ["central.space.>"], - "traffic": [f"central.traffic.>.{state}"], - "roads511": [f"central.traffic.>.{state}"], # shared with traffic + # Convention B (bare state) — shared by traffic family (wzdx, + # tomtom_incidents, state_511_atis). Single-token `*` matches the + # event_type slot; `>` was illegal here. + "traffic": [f"central.traffic.*.{state}"], + # roads511 dual-subscribes: bare state (shared with traffic) + the + # us. form that the new itd_511 Idaho-only adapter publishes + # (Convention A). Sub-adapter routing (_subject_owned) keeps the + # shared bare-state subject scoped to both source names. + "roads511": [f"central.traffic.*.{state}", + f"central.traffic.*.{region}"], } return list(table.get(adapter, [])) @@ -113,6 +133,11 @@ CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = { "wzdx": "traffic", "tomtom_incidents": "traffic", "state_511_atis": "roads511", + # v0.5.7-traffic: itd_511 is the new Idaho-only Central adapter + # (Convention A publishing). Routes to meshai's roads511 source so + # ALERT_CATEGORIES roads-family rules cover both 511 feeds. A future + # v0.6 may split them; for now collapsed for UX simplicity. + "itd_511": "roads511", "firms": "firms", } @@ -134,6 +159,14 @@ _CATEGORY_MAP: list[tuple[str, str]] = [ ("disaster.", "disaster_event"), ("traffic_flow", "traffic_flow"), ("traffic_cameras", "traffic_camera"), + # v0.5.7-traffic: preserve traffic event_type distinctions instead of + # flattening to traffic_congestion. Central publishes category strings + # like "work_zone.wzdx", "incident.tomtom_incidents", "closure" (raw + # from state_511_atis / itd_511). startswith() catches both the bare + # form and the "." suffixed form. + ("work_zone", "work_zone"), + ("incident", "road_incident"), + ("closure", "road_closure"), ("traffic.", "traffic_congestion"), ] diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 7f55b0a..8028226 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -296,6 +296,11 @@ ALERT_CATEGORIES = { }, # Environmental - Roads + # v0.5.7-traffic audit (test_alert_categories_roads_complete enforces parity): + # Native: traffic.py -> traffic_congestion; roads511.py -> road_closure. + # Central path (via map_category): work_zone (wzdx), road_incident + # (tomtom_incidents + state_511_atis/itd_511 'incident'), road_closure + # (state_511_atis/itd_511 'closure'), traffic_congestion (traffic. catchall). "road_closure": { "name": "Road Closure", "description": "Full road closure on a monitored corridor", @@ -310,6 +315,20 @@ ALERT_CATEGORIES = { "example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio", "toggle": "roads", }, + "work_zone": { + "name": "Work Zone", + "description": "Active construction or maintenance work zone affecting traffic — possible lane closures, reduced speed, or detour", + "default_severity": "routine", + "example_message": "🚧 Work Zone: I-84 EB MP 168-173 — right lane closed, 55 mph zone. Expect delays.", + "toggle": "roads", + }, + "road_incident": { + "name": "Road Incident", + "description": "Reported incident on a monitored corridor (crash, disabled vehicle, debris, hazard)", + "default_severity": "priority", + "example_message": "🚨 Road Incident: US-93 NB at MP 47 — crash blocking left lane, expect 30-min delay.", + "toggle": "roads", + }, # Environmental - Avalanche "avalanche_warning": { diff --git a/meshai/notifications/renderers/composer.py b/meshai/notifications/renderers/composer.py index 099b074..3569f06 100644 --- a/meshai/notifications/renderers/composer.py +++ b/meshai/notifications/renderers/composer.py @@ -54,6 +54,8 @@ _CATEGORY_EMOJI: dict[str, str] = { # Roads "road_closure": "🚧", "traffic_congestion": "🚗", + "work_zone": "🚧", + "road_incident": "🚨", # Avalanche "avalanche_warning": "⛷", "avalanche_considerable": "⛷", diff --git a/tests/test_central_consumer.py b/tests/test_central_consumer.py index c28f93d..0f1385f 100644 --- a/tests/test_central_consumer.py +++ b/tests/test_central_consumer.py @@ -150,8 +150,14 @@ def test_consumer_config_uses_deliver_policy_new(): def test_subject_domain_fallback_for_unmapped_category(): - """D.1: an unmapped category (traffic 'work_zone.wzdx') falls back to the - subject domain instead of returning 'other'.""" + """D.1: an unmapped category falls back to the subject domain instead + of returning 'other'. + + v0.5.7-traffic note: 'work_zone.wzdx' is now MAPPED (-> 'work_zone'), + so we use a genuinely-unmapped category string here to exercise the + fallback path. The subject-domain fallback for central.traffic.* is + still 'traffic_congestion'. + """ import json from meshai.central.consumer import CentralConsumer, category_from_subject from meshai.config import EnvironmentalConfig @@ -161,13 +167,31 @@ def test_subject_domain_fallback_for_unmapped_category(): bus = EventBus(); bus.subscribe(rec.append) c = CentralConsumer(EnvironmentalConfig(), bus) env = {"id": "wz1", "data": {"id": "wz1", "adapter": "wzdx", - "category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1, + "category": "telematics.unknown_thing", "time": "2026-05-28T00:00:00Z", "severity": 1, "geo": {"centroid": [-96.2, 36.15], "primary_region": "US-OK", "regions": ["US-OK"]}, "data": {"road": "I-44"}}} ev = c._handle("central.traffic.work_zone.ok", json.dumps(env).encode()) assert ev is not None and ev.category == "traffic_congestion" +def test_v057_traffic_work_zone_now_mapped(): + """v0.5.7-traffic: 'work_zone.wzdx' maps to the new 'work_zone' meshai + category (not flattened to traffic_congestion).""" + import json + from meshai.central.consumer import CentralConsumer + from meshai.config import EnvironmentalConfig + from meshai.notifications.pipeline.bus import EventBus + rec = [] + bus = EventBus(); bus.subscribe(rec.append) + c = CentralConsumer(EnvironmentalConfig(), bus) + env = {"id": "wz2", "data": {"id": "wz2", "adapter": "wzdx", + "category": "work_zone.wzdx", "time": "2026-05-28T00:00:00Z", "severity": 1, + "geo": {"centroid": [-114.0, 42.0], "primary_region": "US-ID", "regions": ["US-ID"]}, + "data": {"road": "I-84"}}} + ev = c._handle("central.traffic.work_zone.id", json.dumps(env).encode()) + assert ev is not None and ev.category == "work_zone" + + @pytest.mark.parametrize("adapter,expected", [ ("wfigs_incidents", "fires"), ("nwis", "usgs"), diff --git a/tests/test_central_region_routing.py b/tests/test_central_region_routing.py index d29aaf7..1c69ad0 100644 --- a/tests/test_central_region_routing.py +++ b/tests/test_central_region_routing.py @@ -39,10 +39,22 @@ def test_subjects_for_fires_us_id_uses_state_token(): ] -def test_subjects_for_traffic_and_roads511_share_state_token(): - """Traffic family: bare-state suffix (no us. prefix), shared by both adapters.""" - assert _subjects_for("traffic", "us.id") == ["central.traffic.>.id"] - assert _subjects_for("roads511", "us.id") == ["central.traffic.>.id"] +def test_subjects_for_traffic_uses_convention_b(): + """v0.5.7-traffic: traffic adapter -> bare-state Convention B with `*` + in the event_type slot. Pre-v0.5.7-traffic this was `>.{state}` which + is invalid NATS (`>` must be at the tail). The bare-state subject is + shared with roads511 (sub-adapter routing picks the right meshai source).""" + assert _subjects_for("traffic", "us.id") == ["central.traffic.*.id"] + + +def test_subjects_for_roads511_dual_subscribes_convention_a_and_b(): + """v0.5.7-traffic: roads511 owns BOTH the shared bare-state subject + (Convention B, shared with traffic) AND the us. subject + (Convention A) where the new Idaho-only itd_511 adapter publishes.""" + assert _subjects_for("roads511", "us.id") == [ + "central.traffic.*.id", + "central.traffic.*.us.id", + ] def test_subjects_for_usgs_includes_unknown_workaround(): diff --git a/tests/test_traffic_v057.py b/tests/test_traffic_v057.py new file mode 100644 index 0000000..ae1e1d3 --- /dev/null +++ b/tests/test_traffic_v057.py @@ -0,0 +1,177 @@ +"""v0.5.7-traffic: NATS pattern fix + itd_511 sub-adapter routing + categories audit. + +Covers four things shipped in v0.5.7-traffic: + +1. NATS pattern syntax — `>` is legal only at the tail. Pre-v0.5.7-traffic + we shipped `central.traffic.>.` (mid-subject `>`), invalid per + NATS rules. Now: `central.traffic.*.` (Convention B, bare state) + for traffic; roads511 dual-subscribes both Convention B and + `central.traffic.*.us.` (Convention A, itd_511 form). +2. roads511 dual subscription — owns both shared bare-state and us. + subjects so itd_511 events route to the roads511 source in meshai. +3. CENTRAL_ADAPTER_TO_SOURCE['itd_511'] == 'roads511'. +4. ALERT_CATEGORIES roads-family parity — every category we can emit + (native + central path post-map_category) has a registry entry. +""" + +import inspect + +import pytest + +from meshai.central.consumer import ( + CENTRAL_ADAPTER_TO_SOURCE, + _SUBJECTS_BARE, + _subjects_for, + map_category, +) +from meshai.notifications.categories import ALERT_CATEGORIES + + +# ---------- NATS pattern validation (Convention A / B) --------------------- + + +def _assert_legal_nats(subject: str) -> None: + """Assert NATS multi-level wildcard `>` only appears at the tail token.""" + tokens = subject.split(".") + if ">" in tokens: + assert tokens[-1] == ">", f"`>` not at tail in {subject!r}" + assert tokens.count(">") == 1, f"multiple `>` in {subject!r}" + for tok in tokens: + # `*` and `>` are wildcards; everything else must be a non-empty + # token without further wildcard characters mixed in. + assert tok, f"empty token in {subject!r}" + if tok not in {"*", ">"}: + assert "*" not in tok and ">" not in tok, f"mixed wildcard in token {tok!r}" + + +def test_subjects_for_traffic_uses_convention_b(): + """traffic adapter -> bare-state Convention B; no `>` anywhere.""" + subs = _subjects_for("traffic", "us.id") + assert subs == ["central.traffic.*.id"] + for s in subs: + _assert_legal_nats(s) + assert ">" not in s, f"`>` in {s!r}" + + +def test_subjects_for_roads511_dual_subscribes(): + """roads511 owns bare-state (shared with traffic) AND us. (itd_511).""" + subs = _subjects_for("roads511", "us.id") + assert subs == ["central.traffic.*.id", "central.traffic.*.us.id"] + for s in subs: + _assert_legal_nats(s) + assert ">" not in s, f"`>` in {s!r}" + + +def test_traffic_and_roads511_share_convention_b_subject(): + """The bare-state subject is shared so sub-adapter routing kicks in.""" + traffic_subs = set(_subjects_for("traffic", "us.id")) + roads511_subs = set(_subjects_for("roads511", "us.id")) + shared = traffic_subs & roads511_subs + assert shared == {"central.traffic.*.id"} + + +def test_no_invalid_mid_subject_wildcards_in_traffic_family(): + """Sanity sweep, scoped to this phase: traffic + roads511 region-aware + subjects are NATS-legal (no `>` mid-subject). Other adapters (firms, + usgs, usgs_quake, fires, nws) carry the v0.5.4 mid-`>` patterns and + are intentionally OUT OF SCOPE for v0.5.7-traffic -- they'll be fixed + per-family later in the v0.5.7 campaign.""" + for adapter in ("traffic", "roads511"): + for s in _subjects_for(adapter, "us.id"): + _assert_legal_nats(s) + assert ">" not in s, f"`>` still present in {adapter} subject {s!r}" + + +def test_bare_form_unchanged_when_region_empty(): + """Empty region returns _SUBJECTS_BARE for backward compat.""" + assert _subjects_for("traffic", "") == ["central.traffic.>"] + assert _subjects_for("roads511", None) == ["central.traffic.>"] + + +# ---------- itd_511 -> roads511 remap -------------------------------------- + + +def test_itd_511_remaps_to_roads511(): + assert CENTRAL_ADAPTER_TO_SOURCE.get("itd_511") == "roads511" + + +def test_state_511_atis_still_remaps_to_roads511(): + """v0.5.3 mapping must survive the v0.5.7-traffic edit.""" + assert CENTRAL_ADAPTER_TO_SOURCE.get("state_511_atis") == "roads511" + + +# ---------- map_category preserves event_type distinctions ----------------- + + +@pytest.mark.parametrize("central_cat,expected", [ + ("work_zone.wzdx", "work_zone"), + ("work_zone", "work_zone"), + ("incident.tomtom_incidents", "road_incident"), + ("incident", "road_incident"), + ("closure.itd_511", "road_closure"), + ("closure", "road_closure"), + # The catchall still flattens unknown traffic.* shapes. + ("traffic.unknown_thing", "traffic_congestion"), +]) +def test_map_category_traffic_event_types(central_cat, expected): + assert map_category(central_cat) == expected + + +# ---------- ALERT_CATEGORIES roads-family parity --------------------------- + + +def _native_emitted_roads_categories() -> set[str]: + """Walk traffic.py and roads511.py for category= literals.""" + import re + from meshai.env import traffic as traffic_mod + from meshai.env import roads511 as roads511_mod + emitted: set[str] = set() + for mod in (traffic_mod, roads511_mod): + src = inspect.getsource(mod) + emitted |= set(re.findall(r'category="([a-z_]+)"', src)) + return emitted + + +def _central_path_roads_categories() -> set[str]: + """Categories the central path can deliver into the roads family. + + Drives off map_category() so the test breaks if the routing changes. + """ + central_inputs = [ + "work_zone.wzdx", + "incident.tomtom_incidents", + "closure.itd_511", + "closure", + "incident", + "traffic.flow_slow", + ] + return {map_category(c) for c in central_inputs} + + +def test_alert_categories_roads_complete(): + """Every category emitted by native traffic/roads511 OR delivered via + the central path (post-map_category) must have an ALERT_CATEGORIES + entry with toggle='roads'. No orphans. + """ + registry_roads = { + cid for cid, info in ALERT_CATEGORIES.items() + if info.get("toggle") == "roads" + } + emitted = _native_emitted_roads_categories() | _central_path_roads_categories() + missing = emitted - registry_roads + orphans = registry_roads - emitted + assert not missing, f"emit set has roads categories missing from ALERT_CATEGORIES: {missing}" + assert not orphans, f"ALERT_CATEGORIES has orphan roads entries: {orphans}" + + +@pytest.mark.parametrize( + "cat", + ["road_closure", "traffic_congestion", "work_zone", "road_incident"], +) +def test_roads_categories_have_required_fields(cat): + info = ALERT_CATEGORIES[cat] + assert info["toggle"] == "roads" + assert info["name"] + assert info["description"] + assert info["default_severity"] in {"routine", "priority", "immediate"} + assert info["example_message"]