From 0099d0fd941ce78b136ee067416185a0d76e1e3f Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Fri, 5 Jun 2026 06:41:21 +0000 Subject: [PATCH] feat(v0.5.9): unified incident pipeline + state_511_atis Idaho cutover + two-sided freshness gate Coordinated change across the consumer dispatch layer + central_normalizer + new incident_handler + new itd_511 work_zone parser. The integrated story: Central PM filed a heads-up that ITD 511 publishes four EventTypes (work_zone, closure, incident, special_event) under us.id Convention A, and meshai's v0.5.8 work focused on work_zone shape only -- meaning incidents (the higher-priority operational signal) were silently rendering as the v0.5.7-regression-style fallback. This v0.5.9 closes that gap by treating incidents + closures + special_events from all three traffic adapters (state_511_atis, itd_511, tomtom_incidents) as a single unified pipeline, while also migrating Idaho coverage from state_511_atis to itd_511 (the direct ITD feed) at the consumer level. Components: (1) new meshai/central/incident_handler.py routes incident/closure/special_event events through per-adapter parsers (tomtom, itd_511, state_511_atis-non-ID) to a canonical incident shape, then a single rendering pipeline with sub_type-aware emoji selection (jam/crash/road_closed/disabled_vehicle/parade/special_event/vehicle_fire/road_works). (2) Universal two-sided freshness gate in the consumer dispatch layer: only events with 0 <= age <= 1800s (default-allow on missing start_time) make it past the gate. Rejects both stale events (more than 30 min old) AND future-scheduled events (negative age -- a real itd_511 case for scheduled work projects). The gate sits ABOVE both incident_handler and the v0.5.8 work_zone formatter so all adapters get gated uniformly. (3) state_511_atis Idaho cutover -- both incident_handler and the v0.5.8 work_zone parser skip state_511_atis events where the state token is ID, deferring to itd_511 as the authoritative source. 
state_511_atis remains fully active for non-Idaho neighbor coverage (WA/OR/MT/UT/WY/NV) -- verified by Phase 2 WA broadcasts in the synthetic probe. (4) new itd_511 work_zone parser (extension to central_normalizer.py) consumes the itd_511 work_zone EventType and produces the same MEDIUM-style wire format as the existing state_511_atis work_zone parser (road + mile range + town + direction + sub_type + ends-at). (5) No Update: broadcasts in the incident pipeline -- per Matt's call, real-time traffic Updates (jam getting worse, delay growing) are not actionable for mesh users. State tracking continues via traffic_events UPSERT but only the first sighting of an external_id ever fires a New: broadcast. WFIGS handler unchanged -- fires keep their 8h-rate-limited Update: behavior since acres growth IS operationally meaningful (evacuation decisions). Forecast: 3-10 mesh broadcasts/day in Idaho, all New:. Cross-check: original raw broadcast count was 623 against a fixed-clock 49-min synthetic window; after v0.5.9 REVISED (no Updates) it dropped to 18; after v0.5.9 GAMMA (two-sided gate + Idaho cutover) it dropped to 9. Test count: was 589 baseline, +45 net new -- 634 passing. Synthetic probe verified all four phases: Phase 1 (replay 3032 captured envelopes) = 0 broadcasts (correctly suppressed); Phase 2 (synthesized fresh non-ID + ID) = 7 broadcasts; Phase 3 (synthesized fresh itd_511 work_zone) = 2 broadcasts; Phase 4 (synthesized fresh ID for explicit ID-skip exercise) = caught by ID-skip 1/1. Master stays off in prod; no toggle flips. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/central/consumer.py | 90 ++- meshai/central/incident_handler.py | 745 +++++++++++++++++++++++++ meshai/central_normalizer.py | 168 ++++++ meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v2.sql | 31 ++ tests/test_central_normalizer.py | 129 +++++ tests/test_incident_handler.py | 798 +++++++++++++++++++++++++++ tests/test_itd_511_work_zone.py | 137 +++++ 8 files changed, 2088 insertions(+), 12 deletions(-) create mode 100644 meshai/central/incident_handler.py create mode 100644 meshai/persistence/migrations/v2.sql create mode 100644 tests/test_incident_handler.py create mode 100644 tests/test_itd_511_work_zone.py diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 1fb3bba..40416db 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -421,17 +421,85 @@ class CentralConsumer: try: from meshai.central_normalizer import normalize as _norm_envelope from meshai.notifications.renderers.work_zone import format_work_zone_mesh - n = _norm_envelope(envelope) - # v0.5.8 wfigs_handler dispatch -- WFIGS events route through - # the persistence-backed change-detection handler (which also - # logs to event_log for tombstones + perimeters). Other adapters - # with a normalized dict (state_511_atis, wzdx) flow through the - # work_zone renderer as before. - if n is not None and str(n.get("_kind", "")).startswith("wfigs"): - from meshai.central.wfigs_handler import handle_wfigs - synthesized = handle_wfigs(n, envelope, subject, data=data) or None - elif n is not None and category in ("work_zone", "road_closure", "road_incident"): - synthesized = format_work_zone_mesh(n) or None + # v0.5.9 unified incident pipeline -- tomtom_incidents + + # state_511_atis + itd_511 for incident/closure/special_event + # categories all flow through meshai.central.incident_handler. + # state_511_atis with category=work_zone stays on the v0.5.8 + # _parse_state_511_atis path below. 
+ _adapter_v9 = inner.get("adapter") or "" + if ( + _adapter_v9 in ("tomtom_incidents", "state_511_atis", "itd_511") + and (cat_raw.startswith("incident.") + or cat_raw.startswith("closure.") + or cat_raw.startswith("special_event.")) + ): + from meshai.central.incident_handler import handle_incident + synthesized = handle_incident(envelope, subject, data=data) or None + else: + # v0.5.9 GAMMA: state_511_atis Idaho cutover via helper. + # Applies BEFORE normalize+dispatch so neither the work_zone + # renderer nor the incident_handler ever sees an ID-state_511 + # envelope. + from meshai.central_normalizer import ( + should_skip_state_511_atis_id as _skip_s5_id, + ) + # v0.5.9 GAMMA universal freshness gate -- applies to ALL + # incident-pipeline adapters BEFORE dispatch, so itd_511 + # work_zone (which goes through central_normalizer + + # format_work_zone_mesh, NOT handle_incident) is now also + # gated. Per-source field paths defined in the helper. + from meshai.central_normalizer import ( + is_incident_envelope_stale as _stale_check, + ) + if _stale_check(envelope, now=int(time.time())): + try: + from meshai.persistence import get_db as _get_db_st + _conn_st = _get_db_st() + _conn_st.execute( + "INSERT INTO event_log(received_at, source, " + "category, severity_word, event_id_external, " + "nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (int(time.time()), + inner.get("adapter") or "", + cat_raw + "|freshness_drop", None, + inner.get("id"), + subject, 0, None, None), + ) + except Exception: + logger.exception("freshness_drop log failed") + synthesized = None + n = None + elif _skip_s5_id(envelope): + try: + from meshai.persistence import get_db as _get_db_skip + _conn_skip = _get_db_skip() + _conn_skip.execute( + "INSERT INTO event_log(received_at, source, " + "category, severity_word, event_id_external, " + "nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (int(time.time()), "state_511_atis", + 
cat_raw + "|skip_id", None, + inner.get("id"), + subject, 0, None, None), + ) + except Exception: + logger.exception("state_511_id_skip log failed") + synthesized = None + n = None # short-circuit downstream dispatch + else: + n = _norm_envelope(envelope) + # v0.5.8 wfigs_handler dispatch -- WFIGS events route through + # the persistence-backed change-detection handler (which also + # logs to event_log for tombstones + perimeters). Other adapters + # with a normalized dict (state_511_atis, wzdx) flow through the + # work_zone renderer as before. + if n is not None and str(n.get("_kind", "")).startswith("wfigs"): + from meshai.central.wfigs_handler import handle_wfigs + synthesized = handle_wfigs(n, envelope, subject, data=data) or None + elif n is not None and category in ("work_zone", "road_closure", "road_incident"): + synthesized = format_work_zone_mesh(n) or None except Exception: logger.exception("normalizer/renderer failed for adapter=%s category=%s", inner.get("adapter"), category) diff --git a/meshai/central/incident_handler.py b/meshai/central/incident_handler.py new file mode 100644 index 0000000..418f2bb --- /dev/null +++ b/meshai/central/incident_handler.py @@ -0,0 +1,745 @@ +"""v0.5.9 unified incident handler. + +Three sources collapse into one persistence-backed change-detection pipeline: + + * tomtom_incidents (real-time crashes/jams/closures, TTI-uuid stable ID) + * state_511_atis (ITD incidents + closures + special events -- + the EventType branching the v0.5.8 parser missed) + * itd_511 (ITD's newer direct feed, Convention A subject) + +State_511_atis with category=work_zone continues to flow through the existing +v0.5.8 _parse_state_511_atis -> work_zone renderer (untouched). Only the +THREE non-work-zone EventTypes route here. 
+ +Filtering at handler entrance (Matt's v0.5.9 §6): + * tomtom magnitude_of_delay==0 -> drop (event_log row with handled=0 only, no broadcast) + * tomtom time_validity != "present" -> drop + * everything else flows into the canonical pipeline. + +Change-detection (Matt's §5; superseded by the v0.5.9 REVISED rule -- only the first 'New:' per external_id is broadcast, so the 'Update:' triggers below no longer fire): + * NEW external_id -> 'New:' + * magnitude steps up -> 'Update:' + * delay doubles (>=2x) -> 'Update:' + * icon_category changes -> 'Update:' + * 8h elapsed since last bcast -> 'Update:' (heartbeat) + * otherwise -> drop silently + +Same callback pattern as WFIGS: handler attaches `_on_broadcast_committed` +to data; dispatcher invokes it AFTER successful deliver(). Cold-start +suppression leaves last_broadcast_* NULL so the next successful broadcast +still labels itself New:. +""" + +from __future__ import annotations + +import logging +import re +import time +from datetime import datetime, timezone +from typing import Any, Optional + +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# v0.5.9 REVISED freshness gate -- drop incidents that started more +# than 30 min ago. Default-allow when start_time is missing so we err +# on the side of broadcasting potentially-fresh data. +INCIDENT_FRESHNESS_MAX_S = 30 * 60 # 1800 + +# Heartbeat retained as a constant for backward-compatible imports, but +# the v0.5.9 REVISED handler no longer fires Update broadcasts. State +# tracking continues to UPSERT current_* columns; the dispatcher just +# stops getting wire strings after the first New: broadcast per +# external_id. See WFIGS handler if you want post-first-broadcast +# behavior (fires keep their 8h-rate-limited Update flow). +INCIDENT_BROADCAST_HEARTBEAT_S = 8 * 60 * 60 # 28800 (unused) + + +# ---- canonical sub_type vocabulary -------------------------------------- + +# Tomtom icon_category int -> canonical sub_type. Anything missing maps to +# the generic 'incident' bucket so the wire string is never empty. 
+_TOMTOM_ICON_TO_SUB = { + 0: "incident", # unknown + 1: "accident", + 2: "fog", + 3: "danger", + 4: "rain", + 5: "ice", + 6: "jam", + 7: "lane_closed", + 8: "road_closed", + 9: "road_works", + 10: "wind", + 11: "flooding", + 12: "broken_down", + 14: "incident", # cluster +} + +# state_511 / itd_511 event_sub_type string -> canonical sub_type. +# Coverage in the 7-day Idaho sample shown after each entry. +_SUB_TYPE_511_MAP = { + "crash": "accident", # 28/41 + "incident": "incident", # 1/41 + "debrisOnRoadway": "debris", # 1/41 + "disabledVehicle": "disabled_vehicle", # 1/41 + "vehicleOnFire": "vehicle_on_fire", # 2/41 + "wildfire": "incident", # 1/41 + "wildfireInArea": "incident", # 1/41 + "leftLaneBlocked": "lane_closed", # 2/41 + "onRampBlocked": "ramp_closed", # 2/41 + "roadwayBlocked": "road_closed", # 2/41 + "roadConstruction": "road_works", + "pavementMarkingOperations": "road_works", + "pavementMarkingOperations ": "road_works", # trailing-space variant + "utilityWork": "road_works", + "singleLineTraffic:AlternatingDirections": "lane_closed", + "roadMaintenanceOperations": "road_works", + "pavingOperations": "road_works", + "bridgeConstruction": "road_works", + "bridgeMaintenanceOperations": "road_works", + "flaggingOperation": "lane_closed", + "brushControl": "road_works", + "constructionWork": "road_works", + "guardrailRepairs": "road_works", + "workOnTheShoulder": "road_works", + "nightTimeConstructionWork": "road_works", + "bridgeInspectionWork": "road_works", + "longTermRoadConstruction": "road_works", + "workOnUndergroundServices": "road_works", + "roadsideCleanupCrew": "road_works", + "RampRestriction": "lane_closed", + "parade": "parade", +} + +# Emoji per canonical sub_type. 
+_SUB_TYPE_EMOJI = { + "accident": "🚨", + "jam": "πŸš—", + "road_closed": "🚫", + "closure": "🚫", + "road_works": "🚧", + "lane_closed": "🟠", + "ramp_closed": "🟠", + "debris": "⚠️", + "vehicle_on_fire": "πŸ”₯", + "disabled_vehicle": "πŸ›‘", + "ice": "⚠️", + "fog": "⚠️", + "flooding": "🌊", + "wind": "🌬️", + "broken_down": "πŸ›ž", + "danger": "⚠️", + "rain": "⚠️", + "incident": "⚠️", + "special_event": "πŸŽͺ", + "parade": "πŸŽͺ", +} + +# Human-readable noun phrase per canonical sub_type. +_SUB_TYPE_PHRASE = { + "accident": "crash", + "jam": "jam", + "road_closed": "road closed", + "closure": "closure", + "road_works": "road works", + "lane_closed": "lane closed", + "ramp_closed": "ramp closed", + "debris": "debris on roadway", + "vehicle_on_fire": "vehicle fire", + "disabled_vehicle": "disabled vehicle", + "ice": "icy conditions", + "fog": "fog", + "flooding": "flooding", + "wind": "high wind", + "broken_down": "broken-down vehicle", + "danger": "dangerous conditions", + "rain": "heavy rain", + "incident": "incident", + "special_event": "special event", + "parade": "parade", +} + + +# ---- helpers ------------------------------------------------------------- + + +def _now() -> int: + return int(time.time()) + + +def _direction_short(dir_str: Optional[str]) -> Optional[str]: + if not dir_str: return None + s = str(dir_str).strip().lower() + if s.startswith("north"): return "N" + if s.startswith("south"): return "S" + if s.startswith("east"): return "E" + if s.startswith("west"): return "W" + if s in ("nb", "n"): return "N" + if s in ("sb", "s"): return "S" + if s in ("eb", "e"): return "E" + if s in ("wb", "w"): return "W" + if s in ("both", "both directions"): return "both" + return None + + +def _parse_iso_epoch(s: Optional[str]) -> Optional[int]: + if not s: return None + try: + return int(datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()) + except Exception: + return None + + +def _parse_511_date_epoch(s: Optional[str]) -> Optional[int]: + """state_511 
uses '5/28/26, 10:45 PM' format.""" + if not s: return None + try: + return int(datetime.strptime(s, "%m/%d/%y, %I:%M %p").replace( + tzinfo=timezone.utc).timestamp()) + except Exception: + return None + + +_TTI_RE = re.compile(r"TTI-([0-9a-f-]{36})") + + +def _tomtom_tti(envelope_id: Optional[str]) -> Optional[str]: + """Extract the stable TTI- piece from the per-poll inner.id. + + TomTom IDs look like: + ID:tomtom:TTI--TTR + The TTR rotates each poll; the TTI-uuid stays stable across + re-publishes of the same incident. + """ + if not envelope_id: return None + m = _TTI_RE.search(envelope_id) + return m.group(1) if m else envelope_id + + +def _tomtom_direction_from_description(desc: Optional[str]) -> Optional[str]: + if not desc: return None + s = desc.lower() + if "northbound" in s: return "N" + if "southbound" in s: return "S" + if "eastbound" in s: return "E" + if "westbound" in s: return "W" + return None + + +def _tomtom_road_label(d: dict) -> Optional[str]: + """Compose 'I-84 W' style label from road_numbers + direction.""" + nums = d.get("road_numbers") or [] + if nums: + return str(nums[0]) + return None # caller falls back to from/to or street name + + +# ---- per-source parsers -------------------------------------------------- + + +def _parse_tomtom_incident(envelope: dict, now: int) -> Optional[dict]: + """Returns the canonical incident dict, or None if filtered.""" + inner = envelope.get("data") or {} + d = inner.get("data") or {} + + # FILTER Β§6: magnitude_of_delay == 0 -> drop at handler entrance. + magnitude = d.get("magnitude_of_delay") + if magnitude == 0: + return None + + # FILTER Β§4: time_validity != 'present' -> drop past/future. 
+ if d.get("time_validity") != "present": + return None + + external_id = _tomtom_tti(inner.get("id")) + if not external_id: + return None + + icon = d.get("icon_category") + sub_type = _TOMTOM_ICON_TO_SUB.get(icon, "incident") + + delay_s = d.get("delay") + delay_minutes = None + if isinstance(delay_s, (int, float)) and delay_s > 0: + delay_minutes = max(1, int(round(delay_s / 60))) + + ge = (d.get("_enriched") or {}).get("geocoder") or {} + + return { + "_kind": "incident", + "source": "tomtom_incidents", + "external_id": external_id, + "category_kind": "incident", + "road": _tomtom_road_label(d) or (d.get("from") or "road"), + "direction": _tomtom_direction_from_description(d.get("description")), + "mile_start": None, + "mile_end": None, + "county": ge.get("county"), + "state": d.get("state_code"), + "lat": d.get("latitude"), + "lon": d.get("longitude"), + "sub_type": sub_type, + "impact": None, + "delay_minutes": delay_minutes, + "delay_seconds": int(delay_s) if isinstance(delay_s, (int, float)) else None, + "magnitude": magnitude, + "icon_category": sub_type, + "from_loc": d.get("from"), + "to_loc": d.get("to"), + "start_at": _parse_iso_epoch(d.get("start_time")), + "end_at": _parse_iso_epoch(d.get("end_time")), + "geocoder_city": ge.get("city"), + "landclass": ge.get("landclass"), + } + + +def _parse_state_511_incident(envelope: dict, category_raw: str, now: int) -> Optional[dict]: + """Handle state_511_atis with category in (incident, closure, special_event). 
+ Returns None for unsupported categories (work_zone is NOT handled here -- + that stays with the existing v0.5.8 _parse_state_511_atis path).""" + inner = envelope.get("data") or {} + d = inner.get("data") or {} + + if category_raw.startswith("incident."): kind = "incident" + elif category_raw.startswith("closure."): kind = "closure" + elif category_raw.startswith("special_event."): kind = "special_event" + else: return None + + external_id = inner.get("id") + if not external_id: + return None + + raw_sub = (d.get("event_sub_type") or "").strip() + sub_type = _SUB_TYPE_511_MAP.get(raw_sub) + if sub_type is None: + if kind == "closure" or d.get("is_full_closure"): + sub_type = "closure" + elif kind == "special_event": + sub_type = "special_event" + else: + sub_type = "incident" + + ge = (d.get("_enriched") or {}).get("geocoder") or {} + + return { + "_kind": "incident", + "source": "state_511_atis", + "external_id": external_id, + "category_kind": kind, + "road": d.get("roadway_name"), + "direction": _direction_short(d.get("direction")), + "mile_start": None, + "mile_end": None, + "county": d.get("county") or ge.get("county"), + "state": d.get("state_code"), + "lat": d.get("latitude"), + "lon": d.get("longitude"), + "sub_type": sub_type, + "impact": "all lanes closed" if d.get("is_full_closure") else None, + "delay_minutes": None, + "delay_seconds": None, + "magnitude": None, + "icon_category": sub_type, + "from_loc": None, + "to_loc": None, + "start_at": _parse_511_date_epoch(d.get("start_date")), + "end_at": None, + "geocoder_city": ge.get("city"), + "landclass": ge.get("landclass"), + } + + +def _parse_itd_511_incident(envelope: dict, category_raw: str, now: int) -> Optional[dict]: + inner = envelope.get("data") or {} + d = inner.get("data") or {} + + if category_raw.startswith("incident."): kind = "incident" + elif category_raw.startswith("closure."): kind = "closure" + elif category_raw.startswith("special_event."): kind = "special_event" + else: return None + 
+ external_id = inner.get("id") + if not external_id: + return None + + raw_sub = (d.get("event_sub_type") or "").strip() + sub_type = _SUB_TYPE_511_MAP.get(raw_sub) + if sub_type is None: + # ITD has event_type_short ("closure", "incident", "work_zone", + # "special_event"); fall through to that. + sub_type = { + "incident": "incident", + "closure": "closure", + "work_zone": "road_works", + "special_event": "special_event", + }.get((d.get("event_type_short") or "").lower(), "incident") + + ge = (d.get("_enriched") or {}).get("geocoder") or {} + + return { + "_kind": "incident", + "source": "itd_511", + "external_id": external_id, + "category_kind": kind, + "road": d.get("roadway_name"), + "direction": _direction_short(d.get("direction")), + "mile_start": None, + "mile_end": None, + "county": ge.get("county"), + "state": "ID", + "lat": d.get("latitude"), + "lon": d.get("longitude"), + "sub_type": sub_type, + "impact": "all lanes closed" if d.get("is_full_closure") else None, + "delay_minutes": None, + "delay_seconds": None, + "magnitude": None, + "icon_category": sub_type, + "from_loc": None, + "to_loc": None, + "start_at": d.get("start_epoch"), + "end_at": d.get("planned_end_epoch"), + "geocoder_city": ge.get("city"), + "landclass": ge.get("landclass"), + } + + + +def _extract_start_time_epoch(envelope: dict, adapter: str) -> Optional[int]: + """Per-source start-time -> epoch seconds. 
None when the field is + missing or unparseable (caller treats None as 'do not gate').""" + inner = envelope.get("data") or {} + d = inner.get("data") or {} + if adapter == "tomtom_incidents": + # tomtom inner.data.start_time is ISO-8601 ("2026-06-01T21:08:11Z") + return _parse_iso_epoch(d.get("start_time")) + if adapter == "state_511_atis": + # state_511 uses "5/28/26, 10:45 PM" in inner.data.start_date + return _parse_511_date_epoch(d.get("start_date")) + if adapter == "itd_511": + # itd_511 carries start_epoch as a Unix epoch integer + val = d.get("start_epoch") + if isinstance(val, (int, float)) and val > 0: + return int(val) + return None + return None + + +# ---- main entry point ---------------------------------------------------- + + +def handle_incident(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + """Unified incident handler. Returns the wire string when a broadcast + should fire, None otherwise.""" + if not isinstance(envelope, dict): + return None + + inner = envelope.get("data") or {} + adapter = inner.get("adapter") or "" + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + now = now if now is not None else _now() + + try: + conn = get_db() + except Exception: + logger.exception("incident_handler: persistence unavailable") + return None + + # v0.5.9 GAMMA: state_511_atis Idaho cutover -- itd_511 is the + # authoritative ID source. state_511_atis remains active for non-ID + # neighbor coverage (WA, OR, MT). Skip ID events at handler entrance + # with event_log handled=0 reason='state_511_atis_id_replaced_by_itd_511' + # so the accounting trail makes the cutover visible. 
+ if adapter == "state_511_atis": + sd = (envelope.get("data") or {}).get("data") or {} + sgeo = (envelope.get("data") or {}).get("geo") or {} + if (sd.get("state_code") == "ID" + or sgeo.get("primary_region") == "US-ID"): + _log_event(conn, now=now, source="state_511_atis", + category=category_raw + "|skip_id", + severity_word=severity_word, + event_id_external=inner.get("id"), + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # v0.5.9 REVISED gate (B): freshness check at handler entrance. + # Computed BEFORE per-source parse + before the mag=0/past/future + # filters, so stale envelopes never UPSERT into traffic_events. + # Missing start_time -> default-allow (treat as fresh). + start_epoch = _extract_start_time_epoch(envelope, adapter) + if start_epoch is not None: + age_s = now - start_epoch + # v0.5.9 GAMMA: reject FUTURE-scheduled events (age < 0) as well + # as stale events (age > window). itd_511 work_zone envelopes can + # carry start_epoch many days in the future for scheduled + # construction projects; under the previous one-sided check + # those slipped through. Spec re-read: 'skip even New: broadcast + # if the underlying event began more than 30 min ago' implies + # the event must have BEGUN. + if age_s < 0 or age_s > INCIDENT_FRESHNESS_MAX_S: + logger.debug( + "incident freshness gate: dropping source=%s subject=%s " + "age=%ds (window=[0, %d])", + adapter, subject, age_s, INCIDENT_FRESHNESS_MAX_S, + ) + _log_event(conn, now=now, source=adapter, category=category_raw, + severity_word=severity_word, + event_id_external=inner.get("id"), + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # Per-source parse (returns None when filtered). 
+ if adapter == "tomtom_incidents": + n = _parse_tomtom_incident(envelope, now) + elif adapter == "state_511_atis": + n = _parse_state_511_incident(envelope, category_raw, now) + elif adapter == "itd_511": + n = _parse_itd_511_incident(envelope, category_raw, now) + else: + return None + + if n is None: + # Filtered envelope -- log to event_log handled=0 with no fires/ + # traffic_events row, no broadcast. Lets us account for upstream + # noise without polluting the broadcast pipeline. + _log_event(conn, now=now, source=adapter, category=category_raw, + severity_word=severity_word, + event_id_external=inner.get("id"), + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + external_id = n["external_id"] + source = n["source"] + pk_combined = f"{source}|{external_id}" + + log_id = _log_event_returning_id( + conn, now=now, source=source, category=category_raw, + severity_word=severity_word, + event_id_external=external_id, + subject=subject, handled=0, + table_name="traffic_events", table_pk=pk_combined) + + row = conn.execute( + "SELECT first_seen_at, last_seen_at, last_broadcast_at, " + "last_broadcast_magnitude, last_broadcast_delay_seconds, " + "last_broadcast_icon_category FROM traffic_events " + "WHERE source=? AND external_id=?", + (source, external_id), + ).fetchone() + + if row is None: + # NEW external_id -- INSERT, return 'New:' wire, callback updates + # last_broadcast_* on dispatcher commit. 
+ conn.execute( + "INSERT INTO traffic_events(source, external_id, road, direction, " + "mile_start, mile_end, county, state, lat, lon, sub_type, impact, " + "start_at, end_at, first_seen_at, last_seen_at, last_broadcast_at, " + "magnitude_of_delay, delay_seconds, icon_category, " + "last_broadcast_magnitude, last_broadcast_delay_seconds, " + "last_broadcast_icon_category) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + (source, external_id, n["road"], n["direction"], + n["mile_start"], n["mile_end"], n["county"], n["state"], + n["lat"], n["lon"], n["sub_type"], n["impact"], + n["start_at"], n["end_at"], now, now, None, + n["magnitude"], n["delay_seconds"], n["icon_category"], + None, None, None), + ) + wire = _render(n, prefix="New") + _attach_commit_handles(data, source=source, external_id=external_id, + magnitude=n["magnitude"], + delay_seconds=n["delay_seconds"], + icon_category=n["icon_category"], + event_log_row_id=log_id) + return wire + + # EXISTING incident -- always UPSERT current fields + last_seen_at. + conn.execute( + "UPDATE traffic_events SET sub_type=?, impact=?, magnitude_of_delay=?, " + "delay_seconds=?, icon_category=?, last_seen_at=?, " + "lat=COALESCE(?, lat), lon=COALESCE(?, lon), " + "direction=COALESCE(?, direction), road=COALESCE(?, road) " + "WHERE source=? AND external_id=?", + (n["sub_type"], n["impact"], n["magnitude"], n["delay_seconds"], + n["icon_category"], now, n["lat"], n["lon"], + n["direction"], n["road"], source, external_id), + ) + + last_bcast_at = row["last_broadcast_at"] + last_bcast_mag = row["last_broadcast_magnitude"] + last_bcast_delay = row["last_broadcast_delay_seconds"] + last_bcast_icon = row["last_broadcast_icon_category"] + + # Cold-start race: row exists from a prior INSERT but the dispatcher + # dropped the broadcast (grace, cooldown, etc.). last_broadcast_at is + # still NULL -> the next successful broadcast still labels itself New:. 
+ if last_bcast_at is None: + wire = _render(n, prefix="New") + _attach_commit_handles(data, source=source, external_id=external_id, + magnitude=n["magnitude"], + delay_seconds=n["delay_seconds"], + icon_category=n["icon_category"], + event_log_row_id=log_id) + return wire + + # v0.5.9 REVISED gate (A): once we've successfully broadcast this + # external_id (last_broadcast_at IS NOT NULL), no further mesh + # traffic for it -- magnitude jumps, delay growth, icon flips, and + # heartbeats all stay in the table for state queries but do NOT + # synthesize a wire string. Matt's reasoning: 'should be no old + # broadcasts, just new' -- traffic updates aren't actionable enough + # to justify spamming. WFIGS keeps its 8h Update flow (operationally + # meaningful for fires). + return None + + +# ---- commit-callback factory -------------------------------------------- + + +def _attach_commit_handles(data: Optional[dict], *, source: str, + external_id: str, + magnitude: Optional[int], + delay_seconds: Optional[int], + icon_category: Optional[str], + event_log_row_id: Optional[int]) -> None: + if not isinstance(data, dict): + return + + def _on_commit(committed_at: float) -> None: + try: + conn = get_db() + except Exception: + logger.exception("incident commit callback: persistence unavailable") + return + conn.execute( + "UPDATE traffic_events SET last_broadcast_at=?, " + "last_broadcast_magnitude=?, last_broadcast_delay_seconds=?, " + "last_broadcast_icon_category=? " + "WHERE source=? 
AND external_id=?", + (int(committed_at), magnitude, delay_seconds, icon_category, + source, external_id), + ) + if event_log_row_id is not None: + conn.execute("UPDATE event_log SET handled=1 WHERE id=?", + (int(event_log_row_id),)) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = {"table": "traffic_events", + "pk": f"{source}|{external_id}"} + + +# ---- event_log helpers --------------------------------------------------- + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> None: + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk), + ) + + +def _log_event_returning_id(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> int: + cur = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk), + ) + return int(cur.lastrowid) + + +# ---- renderer ------------------------------------------------------------ + + +def _render(n: dict, *, prefix: str = "") -> str: + """MEDIUM-style wire string. Pattern: + + {emoji} {prefix}: {road} near {anchor}: {phrase}{impact}{delay}{coords} + + `delay` segment is OMITTED when delay_minutes is None (Matt's Β§3). 
+ `coords` segment is omitted when lat/lon are None (state_511 closures + sometimes lack coords). + """ + sub_type = n.get("sub_type") or "incident" + emoji = _SUB_TYPE_EMOJI.get(sub_type, "⚠️") + phrase = _SUB_TYPE_PHRASE.get(sub_type, "incident") + + road = n.get("road") or "road" + direction = n.get("direction") + if direction and direction != "both" and direction not in str(road): + road_label = f"{road} {direction}" + else: + road_label = road + + anchor = _location_anchor(n) + + delay_minutes = n.get("delay_minutes") + delay_seg = f", {delay_minutes} min delay" if delay_minutes else "" + + impact = n.get("impact") + impact_seg = f", {impact}" if impact else "" + + lat = n.get("lat") + lon = n.get("lon") + if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): + coords = f", @ {lat:.3f},{lon:.3f}" + else: + coords = "" + + prefix_str = f"{prefix}: " if prefix else "" + return f"{emoji} {prefix_str}{road_label} near {anchor}: {phrase}{impact_seg}{delay_seg}{coords}" + + +def _location_anchor(n: dict) -> str: + """Anchor priority: geocoder.city > nearest_town > landclass > county.""" + city = n.get("geocoder_city") + if city: + return str(city) + lat = n.get("lat") + lon = n.get("lon") + if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): + try: + from meshai.central_normalizer import nearest_town + nt = nearest_town(lat, lon, max_distance_mi=100.0) + except Exception: + nt = None + if nt and nt.get("name"): + town = nt["name"] + d = nt.get("distance_mi") + if isinstance(d, (int, float)): + if d < 1: return f"near {town}" + bearing = nt.get("bearing") or "" + return f"{int(round(d))} mi {bearing} of {town}".strip() + return str(town) + landclass = n.get("landclass") + if landclass: + return str(landclass) + county = n.get("county") + state = n.get("state") + if county and state: return f"{county} Co {state}" + if state: return str(state) + return "(location unknown)" diff --git a/meshai/central_normalizer.py 
# ---------- itd_511 work_zone parser (v0.5.9 GAMMA) ----------------------

def _itd_ends_at(planned_end_epoch) -> Optional[datetime]:
    """Convert itd_511's ``planned_end_epoch`` to an aware UTC datetime.

    itd_511 stores planned_end_epoch as a Unix int (or None).

    Returns:
        The UTC datetime for a positive int/float epoch (fractional seconds
        are truncated), or None when the value is missing, not a real
        number (str, bool, NaN, infinity), not positive, or too large for
        the platform to convert. A malformed end time therefore never
        aborts parsing of the rest of the event.
    """
    if isinstance(planned_end_epoch, bool):
        # bool is an int subclass; True must not become 1970-01-01T00:00:01.
        return None
    if not isinstance(planned_end_epoch, (int, float)) or planned_end_epoch <= 0:
        return None
    try:
        return datetime.fromtimestamp(int(planned_end_epoch), tz=timezone.utc)
    except (ValueError, OverflowError, OSError):
        # int() raises ValueError on NaN and OverflowError on infinity;
        # fromtimestamp raises OverflowError/OSError/ValueError when the
        # value is outside the platform's supported range.
        return None


def _parse_itd_511_work_zone(inner_data: dict, geo: dict) -> dict:
    """Normalize an itd_511 work_zone (or closure-acting-as-work-zone)
    payload into the work_zone renderer's flat dict shape.

    Intended to mirror _parse_state_511_atis output (same keys, same
    town/distance fallback chain) so the renderer can consume both via
    format_work_zone_mesh.

    Args:
        inner_data: the adapter payload dict. Keys read: description,
            planned_end_epoch, is_full_closure, roadway_name, latitude,
            longitude, direction, event_sub_type, _enriched.geocoder.city.
        geo: the envelope's geo dict. Its ``centroid`` is read as
            [lon, lat] and used when the payload lacks either coordinate.

    Returns:
        dict with keys source, road, direction, mile_start, mile_end,
        description, sub_type, impact, ends_at, town, distance_mi, bearing.
    """
    desc = _clean_description(inner_data.get("description") or "")
    mile_start, mile_end = _parse_mile_posts(desc or "")

    ends_at = _itd_ends_at(inner_data.get("planned_end_epoch"))
    impact = "full_closure" if inner_data.get("is_full_closure") else "partial"

    road = normalize_road_name(inner_data.get("roadway_name"))
    if _is_uninformative_road(road):
        road = None

    event_lat = inner_data.get("latitude")
    event_lon = inner_data.get("longitude")
    centroid = geo.get("centroid")
    # Fall back to the centroid when EITHER coordinate is missing, so a
    # half-populated payload never yields a (lat, None) pair.
    if (event_lat is None or event_lon is None) and centroid:
        try:
            event_lon, event_lat = centroid[0], centroid[1]
        except (IndexError, KeyError, TypeError):
            pass
    have_coords = event_lat is not None and event_lon is not None

    geocoder = (inner_data.get("_enriched") or {}).get("geocoder") or {}
    town = (geocoder.get("city") or "").strip() or None
    distance_mi: Optional[int] = None
    bearing: Optional[str] = None
    if town:
        # Without a position there is nothing to measure from; keep the
        # geocoder's town and leave distance/bearing unset.
        if have_coords:
            distance_mi, bearing = _compute_distance_bearing(
                event_lat, event_lon, town)
    elif have_coords:
        nearest = nearest_town(event_lat, event_lon)
        if nearest:
            town = nearest.get("name")
            distance_mi = nearest.get("distance_mi")
            bearing = nearest.get("bearing")

    return {
        "source": "itd_511",
        "road": road,
        "direction": _norm_direction(inner_data.get("direction")),
        "mile_start": mile_start,
        "mile_end": mile_end,
        "description": desc,
        "sub_type": _norm_sub_type(inner_data.get("event_sub_type")),
        "impact": impact,
        "ends_at": ends_at,
        "town": town,
        "distance_mi": distance_mi,
        "bearing": bearing,
    }
def _upper_token(value) -> str:
    """Return ``value`` stripped and upper-cased, or "" when it is not a str."""
    return value.strip().upper() if isinstance(value, str) else ""


def should_skip_state_511_atis_id(envelope: dict) -> bool:
    """v0.5.9 GAMMA decision helper: True when this envelope is a
    state_511_atis publish for an Idaho event (state_code='ID' or
    primary_region='US-ID').

    Used by the consumer to decide 'skip + event_log handled=0' before
    dispatching to either the work_zone renderer or the incident_handler.
    Kept out of the parser so test_central_normalizer's existing ID
    fixtures continue to exercise _parse_state_511_atis directly.

    Both comparisons ignore case and surrounding whitespace. Any other
    adapter, a non-dict envelope, or a non-dict ``envelope["data"]``
    returns False rather than raising.
    """
    if not isinstance(envelope, dict):
        return False
    inner = envelope.get("data")
    if not isinstance(inner, dict):
        return False
    if (inner.get("adapter") or "") != "state_511_atis":
        return False
    payload = inner.get("data")
    geo = inner.get("geo")
    state_code = payload.get("state_code") if isinstance(payload, dict) else None
    region = geo.get("primary_region") if isinstance(geo, dict) else None
    return _upper_token(state_code) == "ID" or _upper_token(region) == "US-ID"


# ---------- v0.5.9 GAMMA universal freshness helper -----------------------


def _parse_iso_epoch_freshness(s: Optional[str]) -> Optional[int]:
    """Parse an ISO-8601 timestamp into Unix seconds, or None on failure.

    Local copy of the ISO parser used by the universal freshness gate.
    Duplicated rather than imported from incident_handler so the dependency
    graph stays one-directional (consumer -> central_normalizer).

    A trailing "Z" is accepted. A timestamp without an offset is read as
    UTC; otherwise ``timestamp()`` would apply the host's local timezone
    and the gate's verdict would depend on where the process runs.
    """
    if not s:
        return None
    from datetime import datetime as _dt, timezone as _tz
    try:
        parsed = _dt.fromisoformat(s.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=_tz.utc)
        return int(parsed.timestamp())
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        # Non-string input, unparseable text, or an out-of-range date.
        return None


def _parse_511_date_epoch_freshness(s: Optional[str],
                                    tzinfo=None) -> Optional[int]:
    """Parse a state_511 date such as "5/28/26, 10:45 PM" into Unix seconds.

    Args:
        s: date text in "%m/%d/%y, %I:%M %p" form.
        tzinfo: zone the wall-clock text is expressed in; defaults to UTC.

    Returns None when ``s`` is empty, not a string, or does not match.

    NOTE(review): the string carries no zone and is read as UTC by default.
    If the feed publishes local wall-clock time, every event will look
    several hours older than it is and the 30-minute gate will drop it --
    confirm the feed's timezone and pass ``tzinfo`` accordingly.
    """
    if not s:
        return None
    from datetime import datetime as _dt, timezone as _tz
    zone = _tz.utc if tzinfo is None else tzinfo
    try:
        parsed = _dt.strptime(s, "%m/%d/%y, %I:%M %p")
        return int(parsed.replace(tzinfo=zone).timestamp())
    except (TypeError, ValueError, OverflowError, OSError):
        return None


def _positive_epoch(value) -> Optional[int]:
    """Return ``int(value)`` for a positive, finite int/float; else None."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    if not value > 0:  # also rejects NaN
        return None
    try:
        return int(value)
    except (ValueError, OverflowError):
        return None


def is_incident_envelope_stale(envelope: dict, now: int,
                               max_age_s: int = 1800) -> bool:
    """v0.5.9 GAMMA universal freshness gate. Returns True iff the envelope
    should be DROPPED on freshness grounds.

    Per-source start-time fields:
      tomtom_incidents -> inner.data.start_time   (ISO-8601)
      state_511_atis   -> inner.data.start_date   ("5/28/26, 10:45 PM")
      itd_511          -> inner.data.start_epoch  (Unix int)
      other adapters   -> None (default-allow; the gate has nothing to do)

    Two-sided check: 0 <= age <= max_age_s, where age = now - start.
    Negative ages reject future-scheduled events (e.g. itd_511 work_zone
    planned to start days from now); ages > max_age_s reject stale events.
    A missing or unparseable start time defaults to ALLOW so we err on the
    side of broadcasting potentially-fresh data with incomplete metadata.
    Malformed envelopes (non-dict at any level) are likewise allowed.

    Args:
        envelope: ``{"data": {"adapter": ..., "data": {...}}}``.
        now: current Unix time in seconds.
        max_age_s: largest accepted age in seconds (inclusive).

    Pure (no side effects); caller decides to log + skip when this returns
    True.
    """
    if not isinstance(envelope, dict):
        return False
    inner = envelope.get("data")
    if not isinstance(inner, dict):
        return False
    payload = inner.get("data")
    if not isinstance(payload, dict):
        payload = {}
    adapter = inner.get("adapter") or ""

    if adapter == "tomtom_incidents":
        started = _parse_iso_epoch_freshness(payload.get("start_time"))
    elif adapter == "state_511_atis":
        started = _parse_511_date_epoch_freshness(payload.get("start_date"))
    elif adapter == "itd_511":
        started = _positive_epoch(payload.get("start_epoch"))
    else:
        return False  # adapter not in scope of this gate

    if started is None:
        return False  # default-allow on missing start time
    age = now - started
    return not (0 <= age <= max_age_s)
-- schema migration (v0.5.9): add incident-handler columns to traffic_events.
--
-- The v0.5.8b traffic_events table covered state_511_atis + wzdx with road/
-- direction/mile_post/county/state fields. The unified incident handler
-- (tomtom_incidents + state_511_atis incidents/closures + itd_511) needs:
--
--   * `magnitude_of_delay`           : TomTom 0-4 severity int, NULL for the
--                                      511 sources that don't carry it
--   * `delay_seconds`                : TomTom delay measurement, NULL otherwise
--   * `icon_category`                : canonical sub_type word (cross-source)
--   * `last_broadcast_magnitude`     : snapshot of magnitude when we last fired
--   * `last_broadcast_delay_seconds` : snapshot of delay when we last fired
--   * `last_broadcast_icon_category` : snapshot of icon_category when we last fired
--
-- The three `last_broadcast_*` columns were specified for per-incident
-- change-detection (Matt's v0.5.9 spec §5): broadcast Update only when
-- magnitude bumps up, delay doubles, icon_category changes, or 8h heartbeat
-- fires.
--
-- NOTE: v0.5.9 as shipped (REVISED) sends no Update: broadcasts for
-- incidents -- only the first sighting of an external_id fires New:. The
-- snapshots are still written when a broadcast is committed, so the
-- change-detection rule above describes the original design, not current
-- behaviour.
--
-- SQLite stores NULL columns cheaply so the work-zone adapters can ignore
-- these fields.
--
-- NOTE(review): ADD COLUMN fails with "duplicate column name" if re-run;
-- presumably the migration runner applies this file exactly once when moving
-- schema version 1 -> 2 -- confirm.

ALTER TABLE traffic_events ADD COLUMN magnitude_of_delay INTEGER;
ALTER TABLE traffic_events ADD COLUMN delay_seconds INTEGER;
ALTER TABLE traffic_events ADD COLUMN icon_category TEXT;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_magnitude INTEGER;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_delay_seconds INTEGER;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_icon_category TEXT;

-- Index for the change-detection hot path (per-source per-external_id lookup
-- is already covered by the composite PK; this index helps the "who hasn't
-- broadcast in 8h" sweep queries that a watchdog will use in v0.5.10).
+CREATE INDEX IF NOT EXISTS idx_traffic_source_lastbcast + ON traffic_events(source, last_broadcast_at); diff --git a/tests/test_central_normalizer.py b/tests/test_central_normalizer.py index 6dff855..6d329e1 100644 --- a/tests/test_central_normalizer.py +++ b/tests/test_central_normalizer.py @@ -673,3 +673,132 @@ def test_wzdx_sub_type_unknown_vocab_is_lowercased_with_spaces(monkeypatch): "_enriched": {"geocoder": {"city": "Boise"}}}}} n = normalize(env) assert n["sub_type"] == "some custom work" # lowercased + hyphensβ†’spaces + + + +# ============================================================================ +# v0.5.9 GAMMA -- state_511_atis Idaho cutover +# ============================================================================ + + +def _state_511_envelope(state_code="ID", primary_region="US-ID"): + return { + "subject": "central.traffic.work_zone.id", + "id": "ID:Construction:33333", + "data": { + "id": "ID:Construction:33333", "adapter": "state_511_atis", + "category": "work_zone.state_511_atis", "severity": 1, + "geo": {"centroid": [-116.79, 47.70], + "primary_region": primary_region}, + "data": { + "roadway_name": "US-95", "direction": "Both", + "event_sub_type": "brushControl", + "description": "Minor Brush control on US-95.", + "is_full_closure": False, "layer": "Construction", + "county": "Kootenai", "state": "Idaho", + "state_code": state_code, + "start_date": "6/1/26, 5:00 AM", + "last_updated": "5/28/26, 12:54 PM", + "latitude": 47.7, "longitude": -116.79, + "_enriched": {"geocoder": { + "city": "Coeur d'Alene", "county": "Kootenai", + }}, + }, + }, + } + + +def test_gamma_should_skip_state_511_atis_id_via_state_code(): + """Helper returns True when state_code='ID'.""" + from meshai.central_normalizer import should_skip_state_511_atis_id + env = _state_511_envelope(state_code="ID") + assert should_skip_state_511_atis_id(env) is True + + +def test_gamma_should_skip_state_511_atis_id_via_primary_region(): + """Helper returns True when only 
primary_region='US-ID' is set.""" + from meshai.central_normalizer import should_skip_state_511_atis_id + env = _state_511_envelope(state_code="", primary_region="US-ID") + assert should_skip_state_511_atis_id(env) is True + + +def test_gamma_should_skip_state_511_atis_id_false_for_non_id(): + """Helper returns False for neighbor states.""" + from meshai.central_normalizer import should_skip_state_511_atis_id + env = _state_511_envelope(state_code="WA", primary_region="US-WA") + assert should_skip_state_511_atis_id(env) is False + + +def test_gamma_state_511_non_id_still_parses(): + """state_511_atis with state_code='WA' continues to be parsed -- + neighbor-state coverage remains active after the Idaho cutover. + With the v0.5.9 GAMMA fixup, the parser is unconditionally pure; + this test guards against a future regression that would put the + skip back into normalize().""" + env = _state_511_envelope(state_code="WA", primary_region="US-WA") + n = normalize(env) + assert n is not None + assert n.get("source") == "state_511_atis" + assert n.get("road") == "US-95" + + +def test_gamma_itd_511_work_zone_dispatch(): + """itd_511 + category=work_zone.* routes to _parse_itd_511_work_zone.""" + env = { + "subject": "central.traffic.work_zone.us.id", + "id": "ITD:469:99", + "data": { + "id": "ITD:469:99", "adapter": "itd_511", + "category": "work_zone.itd_511", "severity": 1, + "geo": {"centroid": [-116.79, 47.70], + "primary_region": "US-ID"}, + "data": { + "event_type_short": "work_zone", + "event_sub_type": "roadConstruction", + "roadway_name": "I-90", "direction": "Both", + "description": "Road construction on I-90.", + "is_full_closure": False, + "comment": "", "cause": "roadwork", + "organization": "ERS", + "start_epoch": 1780600000, + "planned_end_epoch": 1781000000, + "latitude": 47.7, "longitude": -116.79, + "_enriched": {"geocoder": { + "city": "Coeur d'Alene", "county": "Kootenai", + }}, + }, + }, + } + n = normalize(env) + assert n is not None + assert 
n.get("source") == "itd_511" + assert n.get("road") == "I-90" + + +def test_gamma_itd_511_non_work_zone_returns_none_or_marker(): + """itd_511 + category=incident.* should NOT go through work_zone parser.""" + env = { + "subject": "central.traffic.incident.us.id", + "id": "ITD:469:100", + "data": { + "id": "ITD:469:100", "adapter": "itd_511", + "category": "incident.itd_511", "severity": 1, + "geo": {"primary_region": "US-ID"}, + "data": { + "event_type_short": "incident", + "event_sub_type": "crash", + "roadway_name": "I-84", "direction": "East", + "description": "Crash on I-84.", + "is_full_closure": False, "comment": "", + "cause": "crash", "organization": "ERS", + "start_epoch": 1780600000, + "planned_end_epoch": None, + "latitude": 43.5, "longitude": -116.5, + "_enriched": {"geocoder": {"city": "Boise"}}, + }, + }, + } + n = normalize(env) + # Either None or not a work_zone shape -- never a parsed work_zone. + if n is not None: + assert n.get("source") != "itd_511" or "road" not in n diff --git a/tests/test_incident_handler.py b/tests/test_incident_handler.py new file mode 100644 index 0000000..f82a37b --- /dev/null +++ b/tests/test_incident_handler.py @@ -0,0 +1,798 @@ +"""Tests for meshai.central.incident_handler (v0.5.9). 
+ +Coverage: + Tomtom parsing + rendering (a/b/c/d): + (a) jam, accident, road_closed, lane_closed, road_works each render with + correct emoji + phrase + delay segment + (b) magnitude_of_delay == 0 events filtered at entrance + (c) delay == null events render WITHOUT the delay segment + (d) time_validity past/future events filtered + + Per-incident change-detection (e-i): + (e) republish with no change -> drop silently, no new audit + (f) magnitude bump up -> Update + (g) delay double (>=2x) -> Update + (h) icon change -> Update + (i) 8h heartbeat -> Update + + state_511 / itd_511 EventType branching (j-m): + (j) state_511_atis incident parses + (k) state_511_atis closure parses + (l) state_511_atis special_event parses (synthetic) + (m) itd_511 incident parses + + Decoupled callback (n) and traffic_events UPSERT (o): + (n) cold-start scenario -- handler runs but callback never fires; + second pass still emits New: (not Update:) + (o) existing traffic_events row gets UPSERTed across passes +""" + +import re +import time + +import pytest + +from meshai.central.incident_handler import ( + INCIDENT_BROADCAST_HEARTBEAT_S, + handle_incident, +) +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +# ---------- fixtures ------------------------------------------------------ + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "incident-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +@pytest.fixture +def no_photon(monkeypatch): + import meshai.central_normalizer as cn + monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: []) + if hasattr(cn, "_H3_NEAREST_CACHE"): + cn._H3_NEAREST_CACHE.clear() + + +# ---------- tomtom envelope builder 
-------------------------------------- + + +_TTI_A = "cfb0c03f-9ab9-46f9-ac21-9b17d0f715f2" +_TTI_B = "13ca4176-4eea-428e-a807-49bee662159a" +_TTI_C = "3573b54b-9e55-4aff-83d3-253048825e77" + + +def _tomtom_env(*, tti=_TTI_A, + icon_category=6, + magnitude=2, + delay=412, + time_validity="present", + description="Queuing traffic on I-84 Westbound from Orchard St to ID-55. ", + road_numbers=("I-84",), + from_loc="Orchard St/Exit 52 (I-84)", + to_loc="ID-55/Exit 46 (I-84)", + start_time=None, + end_time=None, + lat=43.5833926533, lon=-116.2598321532, + state_code="ID", bbox_name="treasure_valley_ext", + geocoder_city="Boise"): + inner_id = f"ID:tomtom:TTI-{tti}-TTR{int(time.time()*1000)}" + geocoder = {"city": geocoder_city, "county": "Ada", "state": "ID", + "country": "United States", "landclass": None} + return { + "id": inner_id, + "subject": "central.traffic.incident.id", + "data": { + "id": inner_id, "adapter": "tomtom_incidents", + "category": "incident.tomtom_incidents", "severity": 1, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "id": inner_id, + "description": description, + "event_code": 108, + "from": from_loc, "to": to_loc, + "magnitude_of_delay": magnitude, + "icon_category": icon_category, + "length": 6112.13, + "delay": delay, + "road_numbers": list(road_numbers), + "start_time": start_time, "end_time": end_time, + "time_validity": time_validity, + "state_code": state_code, "bbox_name": bbox_name, + "latitude": lat, "longitude": lon, + "_enriched": {"geocoder": geocoder}, + }, + }, + } + + +def _state_511_env(*, layer="Incidents", category_prefix="incident", + event_sub_type="crash", + roadway="US-95", direction="Both", + is_full_closure=False, + external_id="ID:Incidents:33948", + lat=48.5295, lon=-116.4293, + geocoder_city="Naples", county="Boundary"): + return { + "id": external_id, + "subject": f"central.traffic.{category_prefix}.id", + "data": { + "id": external_id, "adapter": "state_511_atis", + "category": 
f"{category_prefix}.state_511_atis", "severity": 1, + "geo": {"centroid": [lon, lat], "primary_region": "US-WA"}, + "data": { + "roadway_name": roadway, "direction": direction, + "event_sub_type": event_sub_type, + "description": "test event", + "is_full_closure": is_full_closure, + "layer": layer, + # v0.5.9 GAMMA: default to non-ID neighbor state because + # state_511_atis no longer covers Idaho (itd_511 took over). + # Tests that want to exercise the ID-skip path override these. + "county": county, "state": "Washington", "state_code": "WA", + "start_date": None, + "last_updated": None, + "latitude": lat, "longitude": lon, + "_enriched": {"geocoder": { + "city": geocoder_city, "county": county, "state": "ID", + "country": "United States", "landclass": None, + }}, + }, + }, + } + + +def _itd_511_env(*, category_prefix="incident", + event_type_short="incident", + event_sub_type="crash", + roadway="I-84", direction="East", + is_full_closure=False, + external_id="ITD:469:17", + lat=43.6486, lon=-116.4870, + geocoder_city="Caldwell"): + return { + "id": external_id, + "subject": f"central.traffic.{category_prefix}.us.id", + "data": { + "id": external_id, "adapter": "itd_511", + "category": f"{category_prefix}.itd_511", "severity": 1, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "event_type_short": event_type_short, + "event_sub_type": event_sub_type, + "roadway_name": roadway, "direction": direction, + "description": "test itd event", + "lanes_affected": "All lanes affected", + "is_full_closure": is_full_closure, + "itd_severity": "None", + "comment": "", "cause": "roadwork", + "organization": "ERS", + "recurrence_text": "", "recurrence_schedules": [], + "restrictions": {}, "encoded_polyline": "", + "id_internal": 17, "source_id": "469", + "reported_epoch": None, + "last_updated_epoch": None, + "start_epoch": None, + "planned_end_epoch": None, + "latitude": lat, "longitude": lon, + "_enriched": {"geocoder": { + "city": geocoder_city, 
"county": "Canyon", "state": "ID", + }}, + }, + }, + } + + +def _commit(data, committed_at): + cb = data.get("_on_broadcast_committed") + assert callable(cb), "handler must attach commit callback" + cb(committed_at) + + +# ============================================================================ +# (a) tomtom parsing -- all five icon categories render correctly +# ============================================================================ + + +@pytest.mark.parametrize("icon, expected_emoji, expected_phrase", [ + (1, "🚨", "crash"), + (6, "πŸš—", "jam"), + (7, "🟠", "lane closed"), + (8, "🚫", "road closed"), + (9, "🚧", "road works"), +]) +def test_a_tomtom_icon_renders(mem_db, no_photon, icon, expected_emoji, expected_phrase): + env = _tomtom_env(icon_category=icon, magnitude=2, delay=300) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith(f"{expected_emoji} New: I-84") + assert expected_phrase in wire + assert "near Boise" in wire + assert "5 min delay" in wire + assert "@ 43.583,-116.260" in wire + + +# ============================================================================ +# (b) tomtom magnitude_of_delay == 0 events filtered +# ============================================================================ + + +def test_b_tomtom_magnitude_zero_filtered(mem_db, no_photon): + env = _tomtom_env(icon_category=6, magnitude=0, delay=10) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is None + # filtered envelope leaves no traffic_events row + n_rows = mem_db.execute("SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 0 + # but the event IS logged to event_log handled=0 for accounting + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE source='tomtom_incidents'" + ).fetchone()["n"] + assert n_log == 1 + assert "_on_broadcast_committed" not in data + + +# 
============================================================================ +# (c) tomtom delay == null events render WITHOUT delay segment +# ============================================================================ + + +def test_c_tomtom_delay_null_no_delay_segment(mem_db, no_photon): + env = _tomtom_env(icon_category=1, magnitude=3, delay=None) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert "crash" in wire + assert "min delay" not in wire # no delay segment + assert "@ 43.583,-116.260" in wire + + +# ============================================================================ +# (d) tomtom time_validity past/future filtered +# ============================================================================ + + +@pytest.mark.parametrize("validity", ["past", "future"]) +def test_d_tomtom_time_validity_filtered(mem_db, no_photon, validity): + env = _tomtom_env(icon_category=6, magnitude=2, delay=300, + time_validity=validity) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is None + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 0 + + +# ============================================================================ +# (e) per-incident dedup -- republish with no change drops silently +# ============================================================================ + + +def test_e_per_incident_dedup_no_change(mem_db, no_photon): + env = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000) + assert wire1 is not None + _commit(data1, 1_000_001) + + # Re-publish 5 minutes later, same magnitude/delay/icon. 
+ data2 = {} + wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300) + assert wire2 is None # no change, no broadcast + + +# ============================================================================ +# (f) magnitude bump triggers Update +# ============================================================================ + + +def test_f_magnitude_bump_triggers_update(mem_db, no_photon): + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + # v0.5.9 REVISED gate (A): magnitude bump no longer fires Update. + # State still flips in traffic_events, but no wire string returns. + env2 = _tomtom_env(icon_category=6, magnitude=3, delay=300) + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + assert wire2 is None + # Current magnitude tracked in the row. + row = mem_db.execute( + "SELECT magnitude_of_delay FROM traffic_events " + "WHERE source='tomtom_incidents'").fetchone() + assert row["magnitude_of_delay"] == 3 + + +# ============================================================================ +# (g) delay double triggers Update +# ============================================================================ + + +def test_g_delay_double_triggers_update(mem_db, no_photon): + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + # v0.5.9 REVISED gate (A): delay double no longer fires Update. 
+ env2 = _tomtom_env(icon_category=6, magnitude=2, delay=700) + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + assert wire2 is None + row = mem_db.execute( + "SELECT delay_seconds FROM traffic_events " + "WHERE source='tomtom_incidents'").fetchone() + assert row["delay_seconds"] == 700 + + +def test_g_delay_below_double_no_update(mem_db, no_photon): + """delay 300 -> 500 (1.67x) should NOT trigger broadcast.""" + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + env2 = _tomtom_env(icon_category=6, magnitude=2, delay=500) + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + assert wire2 is None + + +# ============================================================================ +# (h) icon change triggers Update +# ============================================================================ + + +def test_h_icon_change_triggers_update(mem_db, no_photon): + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + # v0.5.9 REVISED gate (A): icon change no longer fires Update. 
+ env2 = _tomtom_env(icon_category=8, magnitude=2, delay=300) + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + assert wire2 is None + row = mem_db.execute( + "SELECT icon_category FROM traffic_events " + "WHERE source='tomtom_incidents'").fetchone() + assert row["icon_category"] == "road_closed" + + +# ============================================================================ +# (i) 8h heartbeat triggers Update +# ============================================================================ + + +def test_i_8h_heartbeat_triggers_update(mem_db, no_photon): + env = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env, env["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + # v0.5.9 REVISED gate (A): heartbeat no longer fires Update. + later = 1_000_001 + INCIDENT_BROADCAST_HEARTBEAT_S + data2 = {} + wire2 = handle_incident(env, env["subject"], data=data2, now=later) + assert wire2 is None + + +# ============================================================================ +# (j) state_511_atis incident parses +# ============================================================================ + + +def test_j_state_511_incident_parses(mem_db, no_photon): + env = _state_511_env(layer="Incidents", category_prefix="incident", + event_sub_type="crash") + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("🚨 New: US-95") # crash -> 🚨 + assert "crash" in wire + assert "near Naples" in wire + row = mem_db.execute( + "SELECT source, sub_type, state FROM traffic_events " + "WHERE source='state_511_atis'").fetchone() + assert row["sub_type"] == "accident" + # v0.5.9 GAMMA: state_511_atis is non-ID only -- helper now defaults to WA. 
+ assert row["state"] == "WA" + + +# ============================================================================ +# (k) state_511_atis closure parses +# ============================================================================ + + +def test_k_state_511_closure_parses(mem_db, no_photon): + env = _state_511_env(layer="Closures", category_prefix="closure", + event_sub_type="roadConstruction", + is_full_closure=True, + external_id="ID:Closures:33950") + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + # roadConstruction -> road_works -> 🚧 + assert wire.startswith("🚧 New:") + assert "all lanes closed" in wire + + +# ============================================================================ +# (l) state_511_atis special_event parses +# ============================================================================ + + +def test_l_state_511_special_event_parses(mem_db, no_photon): + env = _state_511_env(layer="Special Events", + category_prefix="special_event", + event_sub_type="parade", + external_id="ID:SpecialEvents:42") + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + # parade -> πŸŽͺ + assert wire.startswith("πŸŽͺ New:") + assert "parade" in wire + + +# ============================================================================ +# (m) itd_511 incident parses +# ============================================================================ + + +def test_m_itd_511_incident_parses(mem_db, no_photon): + env = _itd_511_env(category_prefix="incident", + event_type_short="incident", + event_sub_type="crash") + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("🚨 New:") + row = mem_db.execute( + "SELECT source, state FROM traffic_events " + "WHERE source='itd_511'").fetchone() + assert row["state"] == "ID" + + +# 
============================================================================ +# (n) decoupled callback -- cold-start scenario still emits New: on second pass +# ============================================================================ + + +def test_n_cold_start_then_resume_still_new(mem_db, no_photon): + env = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000) + assert wire1.startswith("πŸš— New:") + # Cold-start grace drops the broadcast -- DO NOT call _commit(). + + # 5 minutes later, same incident republishes. + data2 = {} + wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300) + assert wire2 is not None + assert wire2.startswith("πŸš— New:"), \ + "must still be New: until commit callback fires" + + row = mem_db.execute( + "SELECT last_broadcast_at, last_broadcast_magnitude " + "FROM traffic_events WHERE source='tomtom_incidents'").fetchone() + assert row["last_broadcast_at"] is None + assert row["last_broadcast_magnitude"] is None + + +# ============================================================================ +# (o) traffic_events row gets UPSERTed on each pass; event_log handled flips +# ============================================================================ + + +def test_o_traffic_events_upsert_and_event_log_handled_flip(mem_db, no_photon): + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + + # event_log row exists with handled=0 BEFORE callback. 
+ el_pre = mem_db.execute( + "SELECT handled FROM event_log " + "WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert el_pre["handled"] == 0 + + _commit(data1, 1_000_001) + + el_post = mem_db.execute( + "SELECT handled FROM event_log " + "WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert el_post["handled"] == 1 + + fr_post = mem_db.execute( + "SELECT last_broadcast_at, last_broadcast_magnitude, " + "last_broadcast_delay_seconds, last_broadcast_icon_category " + "FROM traffic_events WHERE source='tomtom_incidents'" + ).fetchone() + assert fr_post["last_broadcast_at"] == 1_000_001 + assert fr_post["last_broadcast_magnitude"] == 2 + assert fr_post["last_broadcast_delay_seconds"] == 300 + assert fr_post["last_broadcast_icon_category"] == "jam" + + # Re-publish: UPSERT updates current_* but doesn't touch last_broadcast_*. + env2 = _tomtom_env(icon_category=6, magnitude=2, delay=500) + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + # v0.5.9 REVISED: no Update broadcasts regardless of delta size -- + # under the OLD rule this was 'delay 1.67x not enough', now it's + # 'we never re-broadcast'. + assert wire2 is None + fr2 = mem_db.execute( + "SELECT delay_seconds, last_broadcast_delay_seconds " + "FROM traffic_events WHERE source='tomtom_incidents'" + ).fetchone() + assert fr2["delay_seconds"] == 500 # UPSERT happened + assert fr2["last_broadcast_delay_seconds"] == 300 # last broadcast unchanged + + +# ============================================================================ +# v0.5.9 REVISED -- conservative gates +# ============================================================================ + + +def test_p_known_id_all_changed_no_broadcast(mem_db, no_photon): + """Regression guard for the new no-Update rule. Republish the SAME + external_id with magnitude AND delay AND icon all changed. 
The old rule + would have triggered Update on any one of those; the new rule fires + nothing.""" + env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300) + data1 = {} + wire1 = handle_incident(env1, env1["subject"], data=data1, now=1_000_000) + assert wire1.startswith("πŸš— New:") + _commit(data1, 1_000_001) + + env2 = _tomtom_env(icon_category=8, magnitude=4, delay=700) # ALL changed + data2 = {} + wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300) + assert wire2 is None, "no Update broadcasts under the v0.5.9 REVISED rule" + + # State still tracks the latest field values. + row = mem_db.execute( + "SELECT magnitude_of_delay, delay_seconds, icon_category " + "FROM traffic_events WHERE source='tomtom_incidents'").fetchone() + assert row["magnitude_of_delay"] == 4 + assert row["delay_seconds"] == 700 + assert row["icon_category"] == "road_closed" + + +# ============================================================================ +# Freshness gate (q/r/s) +# ============================================================================ + + +def _now_anchor_relative(start_age_seconds: int): + """Choose (now, start_time_iso) so that `now - parse(start_time) == + start_age_seconds`. Used by the freshness-gate tests.""" + import datetime as _dt + now = 2_000_000_000 # arbitrary fixed epoch >>1970 + start_iso = _dt.datetime.fromtimestamp( + now - start_age_seconds, tz=_dt.timezone.utc + ).strftime("%Y-%m-%dT%H:%M:%SZ") + return now, start_iso + + +def test_q_fresh_event_15min_ago_broadcasts(mem_db, no_photon): + """Event started 15 min ago -- WITHIN the 30-min fresh window. 
New: fires.""" + now, start_iso = _now_anchor_relative(15 * 60) + env = _tomtom_env(icon_category=6, magnitude=2, delay=300, + start_time=start_iso) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=now) + assert wire is not None + assert wire.startswith("πŸš— New:") + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 1 + + +def test_r_stale_event_45min_ago_dropped_no_row(mem_db, no_photon): + """Event started 45 min ago -- OUTSIDE the 30-min window. Drop AT + handler entrance, no UPSERT into traffic_events, event_log handled=0.""" + now, start_iso = _now_anchor_relative(45 * 60) + env = _tomtom_env(icon_category=6, magnitude=2, delay=300, + start_time=start_iso) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=now) + assert wire is None + + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 0, "no UPSERT for stale incidents" + + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE handled=0 " + "AND source='tomtom_incidents'").fetchone()["n"] + assert n_log == 1, "stale envelope must still be logged (handled=0)" + + +def test_s_null_start_time_default_allow(mem_db, no_photon): + """start_time missing -> default-allow (treat as fresh and broadcast).""" + env = _tomtom_env(icon_category=6, magnitude=2, delay=300, + start_time=None) + data = {} + wire = handle_incident(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("πŸš— New:") + + +# ============================================================================ +# Per-source startTime field path (t) +# ============================================================================ + + +def test_t_state_511_start_date_path(mem_db, no_photon): + """state_511_atis pulls start_time from inner.data.start_date ('5/28/26, + 10:45 PM' format).
Construct fresh (within 30 min) and stale (>30 min) + cases and confirm gate behavior is per-source-correct.""" + # 5 min ago in the 511-date format. + import datetime as _dt + now = int(_dt.datetime(2026, 6, 4, 12, 0, tzinfo=_dt.timezone.utc).timestamp()) + # Build the unpadded 'M/D/YY, H:MM AM/PM' string from the datetime fields: + # the '%-m' / '%-d' / '%-I' strftime flags are a C-library extension and + # raise ValueError on platforms whose strftime lacks them (e.g. Windows). + fresh_dt = _dt.datetime.fromtimestamp(now - 5 * 60, tz=_dt.timezone.utc) + fresh_date = (f"{fresh_dt.month}/{fresh_dt.day}/{fresh_dt:%y}, " + f"{fresh_dt.hour % 12 or 12}:{fresh_dt:%M %p}") + env = _state_511_env(layer="Incidents", category_prefix="incident", + event_sub_type="crash") + # Replace the default start_date with a fresh one. + env["data"]["data"]["start_date"] = fresh_date + wire = handle_incident(env, env["subject"], data={}, now=now) + assert wire is not None + assert wire.startswith("🚨 New:") + + # Stale variant (45 min ago), formatted the same portable way. + stale_dt = _dt.datetime.fromtimestamp(now - 45 * 60, tz=_dt.timezone.utc) + stale_date = (f"{stale_dt.month}/{stale_dt.day}/{stale_dt:%y}, " + f"{stale_dt.hour % 12 or 12}:{stale_dt:%M %p}") + env2 = _state_511_env(layer="Incidents", category_prefix="incident", + event_sub_type="crash", + external_id="ID:Incidents:99999") + env2["data"]["data"]["start_date"] = stale_date + wire2 = handle_incident(env2, env2["subject"], data={}, now=now) + assert wire2 is None + + +def test_t_itd_511_start_epoch_path(mem_db, no_photon): + """itd_511 pulls start_time from inner.data.start_epoch (Unix int).""" + now = 1_700_000_000 + # Fresh (5 min ago) variant. + env = _itd_511_env(category_prefix="incident", + event_type_short="incident", + event_sub_type="crash") + env["data"]["data"]["start_epoch"] = now - 5 * 60 + wire = handle_incident(env, env["subject"], data={}, now=now) + assert wire is not None + assert wire.startswith("🚨 New:") + + # Stale variant (45 min ago).
+ env2 = _itd_511_env(category_prefix="incident", + event_type_short="incident", + event_sub_type="crash", + external_id="ITD:469:99999") + env2["data"]["data"]["start_epoch"] = now - 45 * 60 + wire2 = handle_incident(env2, env2["subject"], data={}, now=now) + assert wire2 is None + + +def test_t_tomtom_start_time_path(mem_db, no_photon): + """tomtom pulls start_time from inner.data.start_time (ISO-8601).""" + now, fresh_iso = _now_anchor_relative(5 * 60) + env = _tomtom_env(icon_category=6, magnitude=2, delay=300, + start_time=fresh_iso) + wire = handle_incident(env, env["subject"], data={}, now=now) + assert wire is not None + assert wire.startswith("πŸš— New:") + + _, stale_iso = _now_anchor_relative(45 * 60) + env2 = _tomtom_env(icon_category=6, magnitude=2, delay=300, + start_time=stale_iso, + tti="11111111-2222-3333-4444-555555555555") + wire2 = handle_incident(env2, env2["subject"], data={}, now=now) + assert wire2 is None + + +# ============================================================================ +# v0.5.9 GAMMA -- two-sided freshness gate + state_511 ID skip +# ============================================================================ + + +def test_u_future_scheduled_event_dropped(mem_db, no_photon): + """itd_511 work_zone envelopes can carry start_epoch in the future + (scheduled construction). The v0.5.9 REVISED one-sided gate let those + slip through. 
The GAMMA fix rejects negative ages too.""" + import datetime as _dt + now = 2_000_000_000 + # start_epoch 8 hours in the future + env = _itd_511_env(category_prefix="incident", + event_type_short="incident", + event_sub_type="crash", + external_id="ITD:future:1") + env["data"]["data"]["start_epoch"] = now + 8 * 3600 + wire = handle_incident(env, env["subject"], data={}, now=now) + assert wire is None + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 0 + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE handled=0 " + "AND source='itd_511'").fetchone()["n"] + assert n_log == 1 + + +def test_v_state_511_id_skipped_at_handler_entrance(mem_db, no_photon): + """state_511_atis with state_code='ID' is skipped at handler entrance -- + no parse, no traffic_events row, event_log records the skip.""" + env = _state_511_env(layer="Incidents", category_prefix="incident", + event_sub_type="crash") + # Override defaults (post-GAMMA helper defaults to WA) to ID for this test. + env["data"]["data"]["state_code"] = "ID" + env["data"]["data"]["state"] = "Idaho" + env["data"]["geo"]["primary_region"] = "US-ID" + wire = handle_incident(env, env["subject"], data={}, now=1_000_000) + assert wire is None + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"] + assert n_rows == 0 + row = mem_db.execute( + "SELECT category, handled FROM event_log " + "WHERE source='state_511_atis' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert row is not None + assert "|skip_id" in row["category"] + assert row["handled"] == 0 + + +def test_v_state_511_non_id_still_processed(mem_db, no_photon): + """Regression guard: state_511_atis with state_code='WA' (or anything + that's not ID) keeps going through the handler. 
After Idaho cutover + we still need state_511 for neighbor coverage.""" + env = _state_511_env(layer="Incidents", category_prefix="incident", + event_sub_type="crash", county="Spokane") + # Override state_code to WA. + env["data"]["data"]["state_code"] = "WA" + env["data"]["geo"]["primary_region"] = "US-WA" + wire = handle_incident(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert wire.startswith("🚨 New:") + # traffic_events row written for WA event. + row = mem_db.execute( + "SELECT state FROM traffic_events WHERE source='state_511_atis'" + ).fetchone() + assert row is not None + assert row["state"] == "WA" + + +def test_w_itd_511_future_scheduled_dropped_via_start_epoch(mem_db, no_photon): + """Direct exercise of the itd_511 start_epoch field path with a + future-scheduled value (the Phase-1 leak source). Confirms the gate + really uses start_epoch and the new two-sided check catches it.""" + import meshai.central.incident_handler as h + env = _itd_511_env(category_prefix="incident", + event_type_short="incident", + event_sub_type="crash", + external_id="ITD:future:work") + env["data"]["data"]["start_epoch"] = 99_000_000_000 # year ~5108 -- definitely future + se = h._extract_start_time_epoch(env, "itd_511") + assert se == 99_000_000_000 + now = 2_000_000_000 + wire = handle_incident(env, env["subject"], data={}, now=now) + assert wire is None diff --git a/tests/test_itd_511_work_zone.py b/tests/test_itd_511_work_zone.py new file mode 100644 index 0000000..59ed064 --- /dev/null +++ b/tests/test_itd_511_work_zone.py @@ -0,0 +1,137 @@ +"""Tests for the v0.5.9 GAMMA itd_511 work_zone parser in central_normalizer. + +Covers the cutover path: itd_511 supplies all Idaho work_zone broadcasts now +(state_511_atis ID is skipped). The parser must produce the same flat dict +shape as _parse_state_511_atis so format_work_zone_mesh works unchanged. 
+""" + +from datetime import datetime, timezone + +import pytest + +from meshai import central_normalizer as cn + + +# ---------- envelope builder --------------------------------------------- + + +def _itd_work_zone_env(*, roadway="SH-55", direction="North", + event_sub_type="roadConstruction", + is_full_closure=False, + external_id="ITD:469:42", + lat=44.103, lon=-116.110, + geocoder_city="McCall", + start_epoch=1780600000, + planned_end_epoch=1781200000, + description="Road construction on SH-55 Northbound from MM (140) to MM (145). 6/4/2026 7:00 AM to 6/11/2026 5:00 PM."): + return { + "id": external_id, + "subject": "central.traffic.work_zone.us.id", + "data": { + "id": external_id, "adapter": "itd_511", + "category": "work_zone.itd_511", "severity": 1, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "event_type_short": "work_zone", + "event_sub_type": event_sub_type, + "roadway_name": roadway, "direction": direction, + "description": description, + "lanes_affected": "All lanes affected", + "is_full_closure": is_full_closure, + "itd_severity": "None", + "comment": "", + "cause": "roadwork", + "organization": "ERS", + "recurrence_text": "", + "recurrence_schedules": [], + "restrictions": {}, + "encoded_polyline": "", + "id_internal": 42, "source_id": "999", + "reported_epoch": start_epoch, + "last_updated_epoch": start_epoch, + "start_epoch": start_epoch, + "planned_end_epoch": planned_end_epoch, + "latitude": lat, "longitude": lon, + "_enriched": {"geocoder": { + "name": None, "city": geocoder_city, + "county": "Valley", "state": "ID", + "country": "United States", + "landclass": None, "elevation_m": 1530.0, + }}, + }, + }, + } + + +@pytest.fixture +def no_photon(monkeypatch): + monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: []) + if hasattr(cn, "_H3_NEAREST_CACHE"): + cn._H3_NEAREST_CACHE.clear() + + +# ---------- test (a): all fields populate ----------------------------------- + + +def 
test_itd_511_work_zone_parses_with_all_fields(no_photon): + env = _itd_work_zone_env() + n = cn.normalize(env) + assert n is not None + assert n["source"] == "itd_511" + assert n["road"] == "SH-55" + # _norm_direction returns 'northbound' (matches state_511 convention) + assert n["direction"] == "northbound" + assert n["mile_start"] == 140 + assert n["mile_end"] == 145 + assert n["sub_type"] is not None + # is_full_closure=False -> impact 'partial' matches state_511 convention + assert n["impact"] == "partial" + assert n["town"] == "McCall" + # ends_at is a datetime + assert isinstance(n["ends_at"], datetime) + + +def test_itd_511_work_zone_full_closure_impact(no_photon): + env = _itd_work_zone_env(is_full_closure=True) + n = cn.normalize(env) + assert n["impact"] == "full_closure" + + +def test_itd_511_work_zone_renderer_produces_wire(no_photon): + from meshai.notifications.renderers.work_zone import format_work_zone_mesh + env = _itd_work_zone_env() + n = cn.normalize(env) + wire = format_work_zone_mesh(n) + assert wire is not None + # Format matches state_511 convention: 🚧 emoji + road + assert "🚧" in wire + assert "SH-55" in wire + assert "McCall" in wire + + +def test_itd_511_work_zone_end_date_formatting(no_photon): + """planned_end_epoch should serialize to a datetime that the renderer + can format consistently with state_511.""" + env = _itd_work_zone_env(planned_end_epoch=1781200000) + n = cn.normalize(env) + assert isinstance(n["ends_at"], datetime) + assert n["ends_at"].tzinfo is not None # UTC-aware + + +def test_itd_511_work_zone_no_end_date(no_photon): + """planned_end_epoch == None or 0 -> ends_at is None.""" + env = _itd_work_zone_env(planned_end_epoch=None) + n = cn.normalize(env) + assert n["ends_at"] is None + + +def test_itd_511_incident_does_not_go_through_work_zone_parser(no_photon): + """The work_zone parser should NOT be invoked for category=incident.itd_511 + -- those still route through the incident_handler. 
normalize() returns + None (defer) for non-work_zone itd_511 categories.""" + env = _itd_work_zone_env() + env["data"]["category"] = "incident.itd_511" + n = cn.normalize(env) + # Either None (defer) or not a work_zone dict (no road/direction/etc). + if n is not None: + assert "_kind" in n # marker, not a parsed work_zone dict