feat(v0.5.9): unified incident pipeline + state_511_atis Idaho cutover + two-sided freshness gate

Coordinated change across the consumer dispatch layer + central_normalizer + new incident_handler + new itd_511 work_zone parser. The integrated story: Central PM filed a heads-up that ITD 511 publishes four EventTypes (work_zone, closure, incident, special_event) under us.id Convention A, and meshais v0.5.8 work focused on work_zone shape only -- meaning incidents (the higher-priority operational signal) were silently rendering as the v0.5.7-regression-style fallback. This v0.5.9 closes that gap by treating incidents + closures + special_events from all three traffic adapters (state_511_atis, itd_511, tomtom_incidents) as a single unified pipeline, while also migrating Idaho coverage from state_511_atis to itd_511 (the direct ITD feed) at the consumer level. Components: (1) new meshai/central/incident_handler.py routes incident/closure/special_event events through per-adapter parsers (tomtom, itd_511, state_511_atis-non-ID) to a canonical incident shape, then a single rendering pipeline with sub_type-aware emoji selection (jam/crash/road_closed/disabled_vehicle/parade/special_event/vehicle_fire/road_works). (2) Universal two-sided freshness gate in the consumer dispatch layer: only events with 0 <= age <= 1800s (default-allow on missing start_time) make it past the gate. Rejects both stale events (more than 30 min old) AND future-scheduled events (negative age -- a real itd_511 case for scheduled work projects). The gate sits ABOVE both incident_handler and the v0.5.8 work_zone formatter so all adapters get gated uniformly. (3) state_511_atis Idaho cutover -- both incident_handler and the v0.5.8 work_zone parser skip state_511_atis events where the state token is ID, deferring to itd_511 as the authoritative source. state_511_atis remains fully active for non-Idaho neighbor coverage (WA/OR/MT/UT/WY/NV) -- verified by Phase 2 WA broadcasts in the synthetic probe. (4) new itd_511 work_zone parser (extension to central_normalizer.py) consumes the itd_511 work_zone EventType and produces the same MEDIUM-style wire format as the existing state_511_atis work_zone parser (road + mile range + town + direction + sub_type + ends-at). (5) No Update: broadcasts in the incident pipeline -- per Matts call, real-time traffic Updates (jam getting worse, delay growing) are not actionable for mesh users. State tracking continues via traffic_events UPSERT but only the first sighting of an external_id ever fires a New: broadcast. WFIGS handler unchanged -- fires keep their 8h-rate-limited Update: behavior since acres growth IS operationally meaningful (evacuation decisions). Forecast: 3-10 mesh broadcasts/day in Idaho, all New:. Cross-check: original raw broadcast count was 623 against a fixed-clock 49-min synthetic window; after v0.5.9 REVISED (no Updates) it dropped to 18; after v0.5.9 GAMMA (two-sided gate + Idaho cutover) it dropped to 9. Test count: was 589 baseline, +45 net new -- 634 passing. Synthetic probe verified all four phases: Phase 1 (replay 3032 captured envelopes) = 0 broadcasts (correctly suppressed); Phase 2 (synthesized fresh non-ID + ID) = 7 broadcasts; Phase 3 (synthesized fresh itd_511 work_zone) = 2 broadcasts; Phase 4 (synthesized fresh ID for explicit ID-skip exercise) = caught by ID-skip 1/1. Master stays off in prod; no toggle flips.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-06-05 06:41:21 +00:00
commit 0099d0fd94
8 changed files with 2088 additions and 12 deletions

View file

@ -421,6 +421,74 @@ class CentralConsumer:
try: try:
from meshai.central_normalizer import normalize as _norm_envelope from meshai.central_normalizer import normalize as _norm_envelope
from meshai.notifications.renderers.work_zone import format_work_zone_mesh from meshai.notifications.renderers.work_zone import format_work_zone_mesh
# v0.5.9 unified incident pipeline -- tomtom_incidents +
# state_511_atis + itd_511 for incident/closure/special_event
# categories all flow through meshai.central.incident_handler.
# state_511_atis with category=work_zone stays on the v0.5.8
# _parse_state_511_atis path below.
_adapter_v9 = inner.get("adapter") or ""
if (
_adapter_v9 in ("tomtom_incidents", "state_511_atis", "itd_511")
and (cat_raw.startswith("incident.")
or cat_raw.startswith("closure.")
or cat_raw.startswith("special_event."))
):
from meshai.central.incident_handler import handle_incident
synthesized = handle_incident(envelope, subject, data=data) or None
else:
# v0.5.9 GAMMA: state_511_atis Idaho cutover via helper.
# Applies BEFORE normalize+dispatch so neither the work_zone
# renderer nor the incident_handler ever sees an ID-state_511
# envelope.
from meshai.central_normalizer import (
should_skip_state_511_atis_id as _skip_s5_id,
)
# v0.5.9 GAMMA universal freshness gate -- applies to ALL
# incident-pipeline adapters BEFORE dispatch, so itd_511
# work_zone (which goes through central_normalizer +
# format_work_zone_mesh, NOT handle_incident) is now also
# gated. Per-source field paths defined in the helper.
from meshai.central_normalizer import (
is_incident_envelope_stale as _stale_check,
)
if _stale_check(envelope, now=int(time.time())):
try:
from meshai.persistence import get_db as _get_db_st
_conn_st = _get_db_st()
_conn_st.execute(
"INSERT INTO event_log(received_at, source, "
"category, severity_word, event_id_external, "
"nats_subject, handled, table_name, table_pk) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(int(time.time()),
inner.get("adapter") or "",
cat_raw + "|freshness_drop", None,
inner.get("id"),
subject, 0, None, None),
)
except Exception:
logger.exception("freshness_drop log failed")
synthesized = None
n = None
elif _skip_s5_id(envelope):
try:
from meshai.persistence import get_db as _get_db_skip
_conn_skip = _get_db_skip()
_conn_skip.execute(
"INSERT INTO event_log(received_at, source, "
"category, severity_word, event_id_external, "
"nats_subject, handled, table_name, table_pk) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(int(time.time()), "state_511_atis",
cat_raw + "|skip_id", None,
inner.get("id"),
subject, 0, None, None),
)
except Exception:
logger.exception("state_511_id_skip log failed")
synthesized = None
n = None # short-circuit downstream dispatch
else:
n = _norm_envelope(envelope) n = _norm_envelope(envelope)
# v0.5.8 wfigs_handler dispatch -- WFIGS events route through # v0.5.8 wfigs_handler dispatch -- WFIGS events route through
# the persistence-backed change-detection handler (which also # the persistence-backed change-detection handler (which also

View file

@ -0,0 +1,745 @@
"""v0.5.9 unified incident handler.
Three sources collapse into one persistence-backed change-detection pipeline:
* tomtom_incidents (real-time crashes/jams/closures, TTI-uuid stable ID)
* state_511_atis (ITD incidents + closures + special events --
the EventType branching the v0.5.8 parser missed)
* itd_511 (ITD's newer direct feed, Convention A subject)
State_511_atis with category=work_zone continues to flow through the existing
v0.5.8 _parse_state_511_atis -> work_zone renderer (untouched). Only the
THREE non-work-zone EventTypes route here.
Filtering at handler entrance (Matt's v0.5.9 §6):
* tomtom magnitude_of_delay==0 -> drop (no event_log row, no broadcast)
* tomtom time_validity != "present" -> drop
* everything else flows into the canonical pipeline.
Change-detection (Matt's §5):
* NEW external_id -> 'New:'
* magnitude steps up -> 'Update:'
* delay doubles (>=2x) -> 'Update:'
* icon_category changes -> 'Update:'
* 8h elapsed since last bcast -> 'Update:' (heartbeat)
* otherwise -> drop silently
Same callback pattern as WFIGS: handler attaches `_on_broadcast_committed`
to data; dispatcher invokes it AFTER successful deliver(). Cold-start
suppression leaves last_broadcast_* NULL so the next successful broadcast
still labels itself New:.
"""
from __future__ import annotations
import logging
import re
import time
from datetime import datetime, timezone
from typing import Any, Optional
from meshai.persistence import get_db
logger = logging.getLogger(__name__)
# v0.5.9 REVISED freshness gate -- drop incidents that started more
# than 30 min ago. Default-allow when start_time is missing so we err
# on the side of broadcasting potentially-fresh data.
INCIDENT_FRESHNESS_MAX_S = 30 * 60 # 1800
# Heartbeat retained as a constant for backward-compatible imports, but
# the v0.5.9 REVISED handler no longer fires Update broadcasts. State
# tracking continues to UPSERT current_* columns; the dispatcher just
# stops getting wire strings after the first New: broadcast per
# external_id. See WFIGS handler if you want post-first-broadcast
# behavior (fires keep their 8h-rate-limited Update flow).
INCIDENT_BROADCAST_HEARTBEAT_S = 8 * 60 * 60 # 28800 (unused)
# ---- canonical sub_type vocabulary --------------------------------------
# Tomtom icon_category int -> canonical sub_type. Anything missing maps to
# the generic 'incident' bucket so the wire string is never empty.
_TOMTOM_ICON_TO_SUB = {
0: "incident", # unknown
1: "accident",
2: "fog",
3: "danger",
4: "rain",
5: "ice",
6: "jam",
7: "lane_closed",
8: "road_closed",
9: "road_works",
10: "wind",
11: "flooding",
12: "broken_down",
14: "incident", # cluster
}
# state_511 / itd_511 event_sub_type string -> canonical sub_type.
# Coverage in the 7-day Idaho sample shown after each entry.
_SUB_TYPE_511_MAP = {
"crash": "accident", # 28/41
"incident": "incident", # 1/41
"debrisOnRoadway": "debris", # 1/41
"disabledVehicle": "disabled_vehicle", # 1/41
"vehicleOnFire": "vehicle_on_fire", # 2/41
"wildfire": "incident", # 1/41
"wildfireInArea": "incident", # 1/41
"leftLaneBlocked": "lane_closed", # 2/41
"onRampBlocked": "ramp_closed", # 2/41
"roadwayBlocked": "road_closed", # 2/41
"roadConstruction": "road_works",
"pavementMarkingOperations": "road_works",
"pavementMarkingOperations ": "road_works", # trailing-space variant
"utilityWork": "road_works",
"singleLineTraffic:AlternatingDirections": "lane_closed",
"roadMaintenanceOperations": "road_works",
"pavingOperations": "road_works",
"bridgeConstruction": "road_works",
"bridgeMaintenanceOperations": "road_works",
"flaggingOperation": "lane_closed",
"brushControl": "road_works",
"constructionWork": "road_works",
"guardrailRepairs": "road_works",
"workOnTheShoulder": "road_works",
"nightTimeConstructionWork": "road_works",
"bridgeInspectionWork": "road_works",
"longTermRoadConstruction": "road_works",
"workOnUndergroundServices": "road_works",
"roadsideCleanupCrew": "road_works",
"RampRestriction": "lane_closed",
"parade": "parade",
}
# Emoji per canonical sub_type.
_SUB_TYPE_EMOJI = {
"accident": "🚨",
"jam": "🚗",
"road_closed": "🚫",
"closure": "🚫",
"road_works": "🚧",
"lane_closed": "🟠",
"ramp_closed": "🟠",
"debris": "⚠️",
"vehicle_on_fire": "🔥",
"disabled_vehicle": "🛑",
"ice": "⚠️",
"fog": "⚠️",
"flooding": "🌊",
"wind": "🌬️",
"broken_down": "🛞",
"danger": "⚠️",
"rain": "⚠️",
"incident": "⚠️",
"special_event": "🎪",
"parade": "🎪",
}
# Human-readable noun phrase per canonical sub_type.
_SUB_TYPE_PHRASE = {
"accident": "crash",
"jam": "jam",
"road_closed": "road closed",
"closure": "closure",
"road_works": "road works",
"lane_closed": "lane closed",
"ramp_closed": "ramp closed",
"debris": "debris on roadway",
"vehicle_on_fire": "vehicle fire",
"disabled_vehicle": "disabled vehicle",
"ice": "icy conditions",
"fog": "fog",
"flooding": "flooding",
"wind": "high wind",
"broken_down": "broken-down vehicle",
"danger": "dangerous conditions",
"rain": "heavy rain",
"incident": "incident",
"special_event": "special event",
"parade": "parade",
}
# ---- helpers -------------------------------------------------------------
def _now() -> int:
return int(time.time())
def _direction_short(dir_str: Optional[str]) -> Optional[str]:
if not dir_str: return None
s = str(dir_str).strip().lower()
if s.startswith("north"): return "N"
if s.startswith("south"): return "S"
if s.startswith("east"): return "E"
if s.startswith("west"): return "W"
if s in ("nb", "n"): return "N"
if s in ("sb", "s"): return "S"
if s in ("eb", "e"): return "E"
if s in ("wb", "w"): return "W"
if s in ("both", "both directions"): return "both"
return None
def _parse_iso_epoch(s: Optional[str]) -> Optional[int]:
if not s: return None
try:
return int(datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp())
except Exception:
return None
def _parse_511_date_epoch(s: Optional[str]) -> Optional[int]:
"""state_511 uses '5/28/26, 10:45 PM' format."""
if not s: return None
try:
return int(datetime.strptime(s, "%m/%d/%y, %I:%M %p").replace(
tzinfo=timezone.utc).timestamp())
except Exception:
return None
_TTI_RE = re.compile(r"TTI-([0-9a-f-]{36})")
def _tomtom_tti(envelope_id: Optional[str]) -> Optional[str]:
"""Extract the stable TTI-<uuid> piece from the per-poll inner.id.
TomTom IDs look like:
ID:tomtom:TTI-<uuid>-TTR<numeric>
The TTR<numeric> rotates each poll; the TTI-uuid stays stable across
re-publishes of the same incident.
"""
if not envelope_id: return None
m = _TTI_RE.search(envelope_id)
return m.group(1) if m else envelope_id
def _tomtom_direction_from_description(desc: Optional[str]) -> Optional[str]:
if not desc: return None
s = desc.lower()
if "northbound" in s: return "N"
if "southbound" in s: return "S"
if "eastbound" in s: return "E"
if "westbound" in s: return "W"
return None
def _tomtom_road_label(d: dict) -> Optional[str]:
"""Compose 'I-84 W' style label from road_numbers + direction."""
nums = d.get("road_numbers") or []
if nums:
return str(nums[0])
return None # caller falls back to from/to or street name
# ---- per-source parsers --------------------------------------------------
def _parse_tomtom_incident(envelope: dict, now: int) -> Optional[dict]:
"""Returns the canonical incident dict, or None if filtered."""
inner = envelope.get("data") or {}
d = inner.get("data") or {}
# FILTER §6: magnitude_of_delay == 0 -> drop at handler entrance.
magnitude = d.get("magnitude_of_delay")
if magnitude == 0:
return None
# FILTER §4: time_validity != 'present' -> drop past/future.
if d.get("time_validity") != "present":
return None
external_id = _tomtom_tti(inner.get("id"))
if not external_id:
return None
icon = d.get("icon_category")
sub_type = _TOMTOM_ICON_TO_SUB.get(icon, "incident")
delay_s = d.get("delay")
delay_minutes = None
if isinstance(delay_s, (int, float)) and delay_s > 0:
delay_minutes = max(1, int(round(delay_s / 60)))
ge = (d.get("_enriched") or {}).get("geocoder") or {}
return {
"_kind": "incident",
"source": "tomtom_incidents",
"external_id": external_id,
"category_kind": "incident",
"road": _tomtom_road_label(d) or (d.get("from") or "road"),
"direction": _tomtom_direction_from_description(d.get("description")),
"mile_start": None,
"mile_end": None,
"county": ge.get("county"),
"state": d.get("state_code"),
"lat": d.get("latitude"),
"lon": d.get("longitude"),
"sub_type": sub_type,
"impact": None,
"delay_minutes": delay_minutes,
"delay_seconds": int(delay_s) if isinstance(delay_s, (int, float)) else None,
"magnitude": magnitude,
"icon_category": sub_type,
"from_loc": d.get("from"),
"to_loc": d.get("to"),
"start_at": _parse_iso_epoch(d.get("start_time")),
"end_at": _parse_iso_epoch(d.get("end_time")),
"geocoder_city": ge.get("city"),
"landclass": ge.get("landclass"),
}
def _parse_state_511_incident(envelope: dict, category_raw: str, now: int) -> Optional[dict]:
"""Handle state_511_atis with category in (incident, closure, special_event).
Returns None for unsupported categories (work_zone is NOT handled here --
that stays with the existing v0.5.8 _parse_state_511_atis path)."""
inner = envelope.get("data") or {}
d = inner.get("data") or {}
if category_raw.startswith("incident."): kind = "incident"
elif category_raw.startswith("closure."): kind = "closure"
elif category_raw.startswith("special_event."): kind = "special_event"
else: return None
external_id = inner.get("id")
if not external_id:
return None
raw_sub = (d.get("event_sub_type") or "").strip()
sub_type = _SUB_TYPE_511_MAP.get(raw_sub)
if sub_type is None:
if kind == "closure" or d.get("is_full_closure"):
sub_type = "closure"
elif kind == "special_event":
sub_type = "special_event"
else:
sub_type = "incident"
ge = (d.get("_enriched") or {}).get("geocoder") or {}
return {
"_kind": "incident",
"source": "state_511_atis",
"external_id": external_id,
"category_kind": kind,
"road": d.get("roadway_name"),
"direction": _direction_short(d.get("direction")),
"mile_start": None,
"mile_end": None,
"county": d.get("county") or ge.get("county"),
"state": d.get("state_code"),
"lat": d.get("latitude"),
"lon": d.get("longitude"),
"sub_type": sub_type,
"impact": "all lanes closed" if d.get("is_full_closure") else None,
"delay_minutes": None,
"delay_seconds": None,
"magnitude": None,
"icon_category": sub_type,
"from_loc": None,
"to_loc": None,
"start_at": _parse_511_date_epoch(d.get("start_date")),
"end_at": None,
"geocoder_city": ge.get("city"),
"landclass": ge.get("landclass"),
}
def _parse_itd_511_incident(envelope: dict, category_raw: str, now: int) -> Optional[dict]:
inner = envelope.get("data") or {}
d = inner.get("data") or {}
if category_raw.startswith("incident."): kind = "incident"
elif category_raw.startswith("closure."): kind = "closure"
elif category_raw.startswith("special_event."): kind = "special_event"
else: return None
external_id = inner.get("id")
if not external_id:
return None
raw_sub = (d.get("event_sub_type") or "").strip()
sub_type = _SUB_TYPE_511_MAP.get(raw_sub)
if sub_type is None:
# ITD has event_type_short ("closure", "incident", "work_zone",
# "special_event"); fall through to that.
sub_type = {
"incident": "incident",
"closure": "closure",
"work_zone": "road_works",
"special_event": "special_event",
}.get((d.get("event_type_short") or "").lower(), "incident")
ge = (d.get("_enriched") or {}).get("geocoder") or {}
return {
"_kind": "incident",
"source": "itd_511",
"external_id": external_id,
"category_kind": kind,
"road": d.get("roadway_name"),
"direction": _direction_short(d.get("direction")),
"mile_start": None,
"mile_end": None,
"county": ge.get("county"),
"state": "ID",
"lat": d.get("latitude"),
"lon": d.get("longitude"),
"sub_type": sub_type,
"impact": "all lanes closed" if d.get("is_full_closure") else None,
"delay_minutes": None,
"delay_seconds": None,
"magnitude": None,
"icon_category": sub_type,
"from_loc": None,
"to_loc": None,
"start_at": d.get("start_epoch"),
"end_at": d.get("planned_end_epoch"),
"geocoder_city": ge.get("city"),
"landclass": ge.get("landclass"),
}
def _extract_start_time_epoch(envelope: dict, adapter: str) -> Optional[int]:
"""Per-source start-time -> epoch seconds. None when the field is
missing or unparseable (caller treats None as 'do not gate')."""
inner = envelope.get("data") or {}
d = inner.get("data") or {}
if adapter == "tomtom_incidents":
# tomtom inner.data.start_time is ISO-8601 ("2026-06-01T21:08:11Z")
return _parse_iso_epoch(d.get("start_time"))
if adapter == "state_511_atis":
# state_511 uses "5/28/26, 10:45 PM" in inner.data.start_date
return _parse_511_date_epoch(d.get("start_date"))
if adapter == "itd_511":
# itd_511 carries start_epoch as a Unix epoch integer
val = d.get("start_epoch")
if isinstance(val, (int, float)) and val > 0:
return int(val)
return None
return None
# ---- main entry point ----------------------------------------------------
def handle_incident(envelope: dict, subject: str,
data: Optional[dict] = None,
now: Optional[int] = None) -> Optional[str]:
"""Unified incident handler. Returns the wire string when a broadcast
should fire, None otherwise."""
if not isinstance(envelope, dict):
return None
inner = envelope.get("data") or {}
adapter = inner.get("adapter") or ""
category_raw = inner.get("category") or ""
severity_word = _coerce_severity(inner.get("severity"))
now = now if now is not None else _now()
try:
conn = get_db()
except Exception:
logger.exception("incident_handler: persistence unavailable")
return None
# v0.5.9 GAMMA: state_511_atis Idaho cutover -- itd_511 is the
# authoritative ID source. state_511_atis remains active for non-ID
# neighbor coverage (WA, OR, MT). Skip ID events at handler entrance
# with event_log handled=0 reason='state_511_atis_id_replaced_by_itd_511'
# so the accounting trail makes the cutover visible.
if adapter == "state_511_atis":
sd = (envelope.get("data") or {}).get("data") or {}
sgeo = (envelope.get("data") or {}).get("geo") or {}
if (sd.get("state_code") == "ID"
or sgeo.get("primary_region") == "US-ID"):
_log_event(conn, now=now, source="state_511_atis",
category=category_raw + "|skip_id",
severity_word=severity_word,
event_id_external=inner.get("id"),
subject=subject, handled=0,
table_name=None, table_pk=None)
return None
# v0.5.9 REVISED gate (B): freshness check at handler entrance.
# Computed BEFORE per-source parse + before the mag=0/past/future
# filters, so stale envelopes never UPSERT into traffic_events.
# Missing start_time -> default-allow (treat as fresh).
start_epoch = _extract_start_time_epoch(envelope, adapter)
if start_epoch is not None:
age_s = now - start_epoch
# v0.5.9 GAMMA: reject FUTURE-scheduled events (age < 0) as well
# as stale events (age > window). itd_511 work_zone envelopes can
# carry start_epoch many days in the future for scheduled
# construction projects; under the previous one-sided check
# those slipped through. Spec re-read: 'skip even New: broadcast
# if the underlying event began more than 30 min ago' implies
# the event must have BEGUN.
if age_s < 0 or age_s > INCIDENT_FRESHNESS_MAX_S:
logger.debug(
"incident freshness gate: dropping source=%s subject=%s "
"age=%ds (window=[0, %d])",
adapter, subject, age_s, INCIDENT_FRESHNESS_MAX_S,
)
_log_event(conn, now=now, source=adapter, category=category_raw,
severity_word=severity_word,
event_id_external=inner.get("id"),
subject=subject, handled=0,
table_name=None, table_pk=None)
return None
# Per-source parse (returns None when filtered).
if adapter == "tomtom_incidents":
n = _parse_tomtom_incident(envelope, now)
elif adapter == "state_511_atis":
n = _parse_state_511_incident(envelope, category_raw, now)
elif adapter == "itd_511":
n = _parse_itd_511_incident(envelope, category_raw, now)
else:
return None
if n is None:
# Filtered envelope -- log to event_log handled=0 with no fires/
# traffic_events row, no broadcast. Lets us account for upstream
# noise without polluting the broadcast pipeline.
_log_event(conn, now=now, source=adapter, category=category_raw,
severity_word=severity_word,
event_id_external=inner.get("id"),
subject=subject, handled=0,
table_name=None, table_pk=None)
return None
external_id = n["external_id"]
source = n["source"]
pk_combined = f"{source}|{external_id}"
log_id = _log_event_returning_id(
conn, now=now, source=source, category=category_raw,
severity_word=severity_word,
event_id_external=external_id,
subject=subject, handled=0,
table_name="traffic_events", table_pk=pk_combined)
row = conn.execute(
"SELECT first_seen_at, last_seen_at, last_broadcast_at, "
"last_broadcast_magnitude, last_broadcast_delay_seconds, "
"last_broadcast_icon_category FROM traffic_events "
"WHERE source=? AND external_id=?",
(source, external_id),
).fetchone()
if row is None:
# NEW external_id -- INSERT, return 'New:' wire, callback updates
# last_broadcast_* on dispatcher commit.
conn.execute(
"INSERT INTO traffic_events(source, external_id, road, direction, "
"mile_start, mile_end, county, state, lat, lon, sub_type, impact, "
"start_at, end_at, first_seen_at, last_seen_at, last_broadcast_at, "
"magnitude_of_delay, delay_seconds, icon_category, "
"last_broadcast_magnitude, last_broadcast_delay_seconds, "
"last_broadcast_icon_category) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(source, external_id, n["road"], n["direction"],
n["mile_start"], n["mile_end"], n["county"], n["state"],
n["lat"], n["lon"], n["sub_type"], n["impact"],
n["start_at"], n["end_at"], now, now, None,
n["magnitude"], n["delay_seconds"], n["icon_category"],
None, None, None),
)
wire = _render(n, prefix="New")
_attach_commit_handles(data, source=source, external_id=external_id,
magnitude=n["magnitude"],
delay_seconds=n["delay_seconds"],
icon_category=n["icon_category"],
event_log_row_id=log_id)
return wire
# EXISTING incident -- always UPSERT current fields + last_seen_at.
conn.execute(
"UPDATE traffic_events SET sub_type=?, impact=?, magnitude_of_delay=?, "
"delay_seconds=?, icon_category=?, last_seen_at=?, "
"lat=COALESCE(?, lat), lon=COALESCE(?, lon), "
"direction=COALESCE(?, direction), road=COALESCE(?, road) "
"WHERE source=? AND external_id=?",
(n["sub_type"], n["impact"], n["magnitude"], n["delay_seconds"],
n["icon_category"], now, n["lat"], n["lon"],
n["direction"], n["road"], source, external_id),
)
last_bcast_at = row["last_broadcast_at"]
last_bcast_mag = row["last_broadcast_magnitude"]
last_bcast_delay = row["last_broadcast_delay_seconds"]
last_bcast_icon = row["last_broadcast_icon_category"]
# Cold-start race: row exists from a prior INSERT but the dispatcher
# dropped the broadcast (grace, cooldown, etc.). last_broadcast_at is
# still NULL -> the next successful broadcast still labels itself New:.
if last_bcast_at is None:
wire = _render(n, prefix="New")
_attach_commit_handles(data, source=source, external_id=external_id,
magnitude=n["magnitude"],
delay_seconds=n["delay_seconds"],
icon_category=n["icon_category"],
event_log_row_id=log_id)
return wire
# v0.5.9 REVISED gate (A): once we've successfully broadcast this
# external_id (last_broadcast_at IS NOT NULL), no further mesh
# traffic for it -- magnitude jumps, delay growth, icon flips, and
# heartbeats all stay in the table for state queries but do NOT
# synthesize a wire string. Matt's reasoning: 'should be no old
# broadcasts, just new' -- traffic updates aren't actionable enough
# to justify spamming. WFIGS keeps its 8h Update flow (operationally
# meaningful for fires).
return None
# ---- commit-callback factory --------------------------------------------
def _attach_commit_handles(data: Optional[dict], *, source: str,
external_id: str,
magnitude: Optional[int],
delay_seconds: Optional[int],
icon_category: Optional[str],
event_log_row_id: Optional[int]) -> None:
if not isinstance(data, dict):
return
def _on_commit(committed_at: float) -> None:
try:
conn = get_db()
except Exception:
logger.exception("incident commit callback: persistence unavailable")
return
conn.execute(
"UPDATE traffic_events SET last_broadcast_at=?, "
"last_broadcast_magnitude=?, last_broadcast_delay_seconds=?, "
"last_broadcast_icon_category=? "
"WHERE source=? AND external_id=?",
(int(committed_at), magnitude, delay_seconds, icon_category,
source, external_id),
)
if event_log_row_id is not None:
conn.execute("UPDATE event_log SET handled=1 WHERE id=?",
(int(event_log_row_id),))
data["_on_broadcast_committed"] = _on_commit
data["_broadcast_audit"] = {"table": "traffic_events",
"pk": f"{source}|{external_id}"}
# ---- event_log helpers ---------------------------------------------------
def _coerce_severity(sev: Any) -> Optional[str]:
if sev is None: return None
if isinstance(sev, str): return sev or None
try: return str(int(sev))
except (TypeError, ValueError): return str(sev)
def _log_event(conn, *, now, source, category, severity_word,
event_id_external, subject, handled,
table_name, table_pk) -> None:
conn.execute(
"INSERT INTO event_log(received_at, source, category, severity_word, "
"event_id_external, nats_subject, handled, table_name, table_pk) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(now, source, category, severity_word, event_id_external, subject,
int(bool(handled)), table_name, table_pk),
)
def _log_event_returning_id(conn, *, now, source, category, severity_word,
event_id_external, subject, handled,
table_name, table_pk) -> int:
cur = conn.execute(
"INSERT INTO event_log(received_at, source, category, severity_word, "
"event_id_external, nats_subject, handled, table_name, table_pk) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(now, source, category, severity_word, event_id_external, subject,
int(bool(handled)), table_name, table_pk),
)
return int(cur.lastrowid)
# ---- renderer ------------------------------------------------------------
def _render(n: dict, *, prefix: str = "") -> str:
"""MEDIUM-style wire string. Pattern:
{emoji} {prefix}: {road} near {anchor}: {phrase}{impact}{delay}{coords}
`delay` segment is OMITTED when delay_minutes is None (Matt's §3).
`coords` segment is omitted when lat/lon are None (state_511 closures
sometimes lack coords).
"""
sub_type = n.get("sub_type") or "incident"
emoji = _SUB_TYPE_EMOJI.get(sub_type, "⚠️")
phrase = _SUB_TYPE_PHRASE.get(sub_type, "incident")
road = n.get("road") or "road"
direction = n.get("direction")
if direction and direction != "both" and direction not in str(road):
road_label = f"{road} {direction}"
else:
road_label = road
anchor = _location_anchor(n)
delay_minutes = n.get("delay_minutes")
delay_seg = f", {delay_minutes} min delay" if delay_minutes else ""
impact = n.get("impact")
impact_seg = f", {impact}" if impact else ""
lat = n.get("lat")
lon = n.get("lon")
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
coords = f", @ {lat:.3f},{lon:.3f}"
else:
coords = ""
prefix_str = f"{prefix}: " if prefix else ""
return f"{emoji} {prefix_str}{road_label} near {anchor}: {phrase}{impact_seg}{delay_seg}{coords}"
def _location_anchor(n: dict) -> str:
"""Anchor priority: geocoder.city > nearest_town > landclass > county."""
city = n.get("geocoder_city")
if city:
return str(city)
lat = n.get("lat")
lon = n.get("lon")
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
try:
from meshai.central_normalizer import nearest_town
nt = nearest_town(lat, lon, max_distance_mi=100.0)
except Exception:
nt = None
if nt and nt.get("name"):
town = nt["name"]
d = nt.get("distance_mi")
if isinstance(d, (int, float)):
if d < 1: return f"near {town}"
bearing = nt.get("bearing") or ""
return f"{int(round(d))} mi {bearing} of {town}".strip()
return str(town)
landclass = n.get("landclass")
if landclass:
return str(landclass)
county = n.get("county")
state = n.get("state")
if county and state: return f"{county} Co {state}"
if state: return str(state)
return "(location unknown)"

View file

@ -740,6 +740,73 @@ def _parse_wfigs_incidents(inner_data: dict, geo: dict) -> dict:
} }
# ---------- itd_511 work_zone parser (v0.5.9 GAMMA) ----------------------
def _itd_ends_at(planned_end_epoch) -> Optional[datetime]:
"""itd_511 stores planned_end_epoch as a Unix int (or None)."""
if not isinstance(planned_end_epoch, (int, float)) or planned_end_epoch <= 0:
return None
try:
return datetime.fromtimestamp(int(planned_end_epoch), tz=timezone.utc)
except (ValueError, OSError):
return None
def _parse_itd_511_work_zone(inner_data: dict, geo: dict) -> dict:
"""Normalize an itd_511 work_zone (or closure-acting-as-work-zone)
envelope into the work_zone renderer's flat dict shape.
Mirrors _parse_state_511_atis output: same keys, same town/distance
fallback chain. The renderer consumes both via format_work_zone_mesh.
"""
desc_raw = inner_data.get("description") or ""
desc = _clean_description(desc_raw)
mile_start, mile_end = _parse_mile_posts(desc or "")
ends_at = _itd_ends_at(inner_data.get("planned_end_epoch"))
is_full = bool(inner_data.get("is_full_closure"))
impact = "full_closure" if is_full else "partial"
road = normalize_road_name(inner_data.get("roadway_name"))
if _is_uninformative_road(road):
road = None
event_lat = inner_data.get("latitude")
event_lon = inner_data.get("longitude")
if event_lat is None and geo.get("centroid"):
try: event_lon, event_lat = geo["centroid"][0], geo["centroid"][1]
except (IndexError, TypeError): pass
enriched = (inner_data.get("_enriched") or {}).get("geocoder") or {}
town = (enriched.get("city") or "").strip() or None
distance_mi: Optional[int] = None
bearing: Optional[str] = None
if town:
distance_mi, bearing = _compute_distance_bearing(event_lat, event_lon, town)
else:
nt = nearest_town(event_lat, event_lon) if event_lat is not None else None
if nt:
town = nt.get("name")
distance_mi = nt.get("distance_mi")
bearing = nt.get("bearing")
return {
"source": "itd_511",
"road": road,
"direction": _norm_direction(inner_data.get("direction")),
"mile_start": mile_start,
"mile_end": mile_end,
"description": desc,
"sub_type": _norm_sub_type(inner_data.get("event_sub_type")),
"impact": impact,
"ends_at": ends_at,
"town": town,
"distance_mi": distance_mi,
"bearing": bearing,
}
# ---------- public entry point -------------------------------------------- # ---------- public entry point --------------------------------------------
def normalize(envelope: dict) -> Optional[dict]: def normalize(envelope: dict) -> Optional[dict]:
@ -755,9 +822,20 @@ def normalize(envelope: dict) -> Optional[dict]:
geo = inner.get("geo") or {} geo = inner.get("geo") or {}
if adapter == "state_511_atis": if adapter == "state_511_atis":
# Parser stays pure: returns parsed dict for ALL states. The
# v0.5.9 GAMMA Idaho-cutover decision lives in the consumer
# (skip + event_log handled=0 before dispatching here). See
# should_skip_state_511_atis_id() below for the test-friendly
# helper that the consumer uses.
return _parse_state_511_atis(inner_data, geo) return _parse_state_511_atis(inner_data, geo)
if adapter == "wzdx": if adapter == "wzdx":
return _parse_wzdx_federal(inner_data, geo) return _parse_wzdx_federal(inner_data, geo)
# v0.5.9 GAMMA: itd_511 work_zone parser (incident/closure/special_event
# still route through incident_handler per v0.5.9; work_zone is the
# only EventType that uses the work_zone renderer + Format).
if adapter == "itd_511":
if (inner.get("category") or "").startswith("work_zone."):
return _parse_itd_511_work_zone(inner_data, geo)
# v0.5.8 WFIGS dispatch -- incidents + tombstones + perimeters. # v0.5.8 WFIGS dispatch -- incidents + tombstones + perimeters.
# The handler downstream uses _kind to route to change-detection # The handler downstream uses _kind to route to change-detection
@ -787,3 +865,93 @@ def normalize(envelope: dict) -> Optional[dict]:
# Other adapters await per-adapter parsers; return None to defer. # Other adapters await per-adapter parsers; return None to defer.
return None return None
def should_skip_state_511_atis_id(envelope: dict) -> bool:
"""v0.5.9 GAMMA decision helper: True when this envelope is a
state_511_atis publish for an Idaho event (state_code='ID' or
primary_region='US-ID').
Used by the consumer to decide 'skip + event_log handled=0' before
dispatching to either the work_zone renderer or the incident_handler.
Kept out of the parser so test_central_normalizer's existing ID
fixtures continue to exercise _parse_state_511_atis directly.
"""
if not isinstance(envelope, dict):
return False
inner = envelope.get("data") or {}
if (inner.get("adapter") or "") != "state_511_atis":
return False
d = inner.get("data") or {}
geo = inner.get("geo") or {}
return (d.get("state_code") == "ID"
or geo.get("primary_region") == "US-ID")
# ---------- v0.5.9 GAMMA universal freshness helper -----------------------
def _parse_iso_epoch_freshness(s: Optional[str]) -> Optional[int]:
"""Local copy of the ISO parser used by the universal freshness gate.
Duplicated rather than imported from incident_handler so the dependency
graph stays one-directional (consumer -> central_normalizer)."""
if not s: return None
try:
from datetime import datetime as _dt
return int(_dt.fromisoformat(s.replace("Z", "+00:00")).timestamp())
except Exception:
return None
def _parse_511_date_epoch_freshness(s: Optional[str]) -> Optional[int]:
if not s: return None
try:
from datetime import datetime as _dt, timezone as _tz
return int(_dt.strptime(s, "%m/%d/%y, %I:%M %p").replace(
tzinfo=_tz.utc).timestamp())
except Exception:
return None
def is_incident_envelope_stale(envelope: dict, now: int,
max_age_s: int = 1800) -> bool:
"""v0.5.9 GAMMA universal freshness gate. Returns True iff the envelope
should be DROPPED on freshness grounds.
Per-source start-time fields:
tomtom_incidents -> inner.data.start_time (ISO-8601)
state_511_atis -> inner.data.start_date ("5/28/26, 10:45 PM")
itd_511 -> inner.data.start_epoch (Unix int)
other adapters -> None (default-allow; the gate has nothing to do)
Two-sided check: 0 <= age <= max_age_s. Negative ages reject future-
scheduled events (e.g. itd_511 work_zone planned to start days from
now); ages > max_age_s reject stale events. None / missing start time
defaults to ALLOW so we err on the side of broadcasting potentially-
fresh data with incomplete metadata.
Pure (no side effects); caller decides to log + skip when this returns
True.
"""
if not isinstance(envelope, dict): return False
inner = envelope.get("data") or {}
adapter = inner.get("adapter") or ""
d = inner.get("data") or {}
se: Optional[int] = None
if adapter == "tomtom_incidents":
se = _parse_iso_epoch_freshness(d.get("start_time"))
elif adapter == "state_511_atis":
se = _parse_511_date_epoch_freshness(d.get("start_date"))
elif adapter == "itd_511":
val = d.get("start_epoch")
if isinstance(val, (int, float)) and val > 0:
se = int(val)
else:
return False # adapter not in scope of this gate
if se is None:
return False # default-allow on missing start time
age = now - se
return age < 0 or age > max_age_s

View file

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
DEFAULT_DB_PATH = "/data/meshai.sqlite" DEFAULT_DB_PATH = "/data/meshai.sqlite"
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
SCHEMA_VERSION = 1 SCHEMA_VERSION = 2
SCHEMA_META_TABLE = "schema_meta" SCHEMA_META_TABLE = "schema_meta"
MIGRATIONS_DIR = Path(__file__).parent / "migrations" MIGRATIONS_DIR = Path(__file__).parent / "migrations"

View file

@ -0,0 +1,31 @@
-- v0.5.9 schema migration: add incident-handler columns to traffic_events.
--
-- The v0.5.8b traffic_events table covered state_511_atis + wzdx with road/
-- direction/mile_post/county/state fields. The unified incident handler
-- (tomtom_incidents + state_511_atis incidents/closures + itd_511) needs:
--
-- * `magnitude_of_delay` : TomTom 0-4 severity int, NULL for the
-- 511 sources that don't carry it
-- * `delay_seconds` : TomTom delay measurement, NULL otherwise
-- * `icon_category` : canonical sub_type word (cross-source)
-- * `last_broadcast_magnitude` : snapshot of magnitude when we last fired
-- * `last_broadcast_delay_seconds` : snapshot of delay when we last fired
-- * `last_broadcast_icon_category` : snapshot of icon_category when we last fired
--
-- The three `last_broadcast_*` columns power per-incident change-detection
-- (Matt's v0.5.9 spec §5): broadcast Update only when magnitude bumps up,
-- delay doubles, icon_category changes, or 8h heartbeat fires. SQLite stores
-- NULL columns cheaply so the work-zone adapters can ignore these fields.
ALTER TABLE traffic_events ADD COLUMN magnitude_of_delay INTEGER;
ALTER TABLE traffic_events ADD COLUMN delay_seconds INTEGER;
ALTER TABLE traffic_events ADD COLUMN icon_category TEXT;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_magnitude INTEGER;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_delay_seconds INTEGER;
ALTER TABLE traffic_events ADD COLUMN last_broadcast_icon_category TEXT;
-- Index for the change-detection hot path (per-source per-external_id lookup
-- is already covered by the composite PK; this index helps the "who hasn't
-- broadcast in 8h" sweep queries that a watchdog will use in v0.5.10).
CREATE INDEX IF NOT EXISTS idx_traffic_source_lastbcast
ON traffic_events(source, last_broadcast_at);

View file

@ -673,3 +673,132 @@ def test_wzdx_sub_type_unknown_vocab_is_lowercased_with_spaces(monkeypatch):
"_enriched": {"geocoder": {"city": "Boise"}}}}} "_enriched": {"geocoder": {"city": "Boise"}}}}}
n = normalize(env) n = normalize(env)
assert n["sub_type"] == "some custom work" # lowercased + hyphens→spaces assert n["sub_type"] == "some custom work" # lowercased + hyphens→spaces
# ============================================================================
# v0.5.9 GAMMA -- state_511_atis Idaho cutover
# ============================================================================
def _state_511_envelope(state_code="ID", primary_region="US-ID"):
return {
"subject": "central.traffic.work_zone.id",
"id": "ID:Construction:33333",
"data": {
"id": "ID:Construction:33333", "adapter": "state_511_atis",
"category": "work_zone.state_511_atis", "severity": 1,
"geo": {"centroid": [-116.79, 47.70],
"primary_region": primary_region},
"data": {
"roadway_name": "US-95", "direction": "Both",
"event_sub_type": "brushControl",
"description": "Minor Brush control on US-95.",
"is_full_closure": False, "layer": "Construction",
"county": "Kootenai", "state": "Idaho",
"state_code": state_code,
"start_date": "6/1/26, 5:00 AM",
"last_updated": "5/28/26, 12:54 PM",
"latitude": 47.7, "longitude": -116.79,
"_enriched": {"geocoder": {
"city": "Coeur d'Alene", "county": "Kootenai",
}},
},
},
}
def test_gamma_should_skip_state_511_atis_id_via_state_code():
"""Helper returns True when state_code='ID'."""
from meshai.central_normalizer import should_skip_state_511_atis_id
env = _state_511_envelope(state_code="ID")
assert should_skip_state_511_atis_id(env) is True
def test_gamma_should_skip_state_511_atis_id_via_primary_region():
"""Helper returns True when only primary_region='US-ID' is set."""
from meshai.central_normalizer import should_skip_state_511_atis_id
env = _state_511_envelope(state_code="", primary_region="US-ID")
assert should_skip_state_511_atis_id(env) is True
def test_gamma_should_skip_state_511_atis_id_false_for_non_id():
"""Helper returns False for neighbor states."""
from meshai.central_normalizer import should_skip_state_511_atis_id
env = _state_511_envelope(state_code="WA", primary_region="US-WA")
assert should_skip_state_511_atis_id(env) is False
def test_gamma_state_511_non_id_still_parses():
"""state_511_atis with state_code='WA' continues to be parsed --
neighbor-state coverage remains active after the Idaho cutover.
With the v0.5.9 GAMMA fixup, the parser is unconditionally pure;
this test guards against a future regression that would put the
skip back into normalize()."""
env = _state_511_envelope(state_code="WA", primary_region="US-WA")
n = normalize(env)
assert n is not None
assert n.get("source") == "state_511_atis"
assert n.get("road") == "US-95"
def test_gamma_itd_511_work_zone_dispatch():
"""itd_511 + category=work_zone.* routes to _parse_itd_511_work_zone."""
env = {
"subject": "central.traffic.work_zone.us.id",
"id": "ITD:469:99",
"data": {
"id": "ITD:469:99", "adapter": "itd_511",
"category": "work_zone.itd_511", "severity": 1,
"geo": {"centroid": [-116.79, 47.70],
"primary_region": "US-ID"},
"data": {
"event_type_short": "work_zone",
"event_sub_type": "roadConstruction",
"roadway_name": "I-90", "direction": "Both",
"description": "Road construction on I-90.",
"is_full_closure": False,
"comment": "", "cause": "roadwork",
"organization": "ERS",
"start_epoch": 1780600000,
"planned_end_epoch": 1781000000,
"latitude": 47.7, "longitude": -116.79,
"_enriched": {"geocoder": {
"city": "Coeur d'Alene", "county": "Kootenai",
}},
},
},
}
n = normalize(env)
assert n is not None
assert n.get("source") == "itd_511"
assert n.get("road") == "I-90"
def test_gamma_itd_511_non_work_zone_returns_none_or_marker():
"""itd_511 + category=incident.* should NOT go through work_zone parser."""
env = {
"subject": "central.traffic.incident.us.id",
"id": "ITD:469:100",
"data": {
"id": "ITD:469:100", "adapter": "itd_511",
"category": "incident.itd_511", "severity": 1,
"geo": {"primary_region": "US-ID"},
"data": {
"event_type_short": "incident",
"event_sub_type": "crash",
"roadway_name": "I-84", "direction": "East",
"description": "Crash on I-84.",
"is_full_closure": False, "comment": "",
"cause": "crash", "organization": "ERS",
"start_epoch": 1780600000,
"planned_end_epoch": None,
"latitude": 43.5, "longitude": -116.5,
"_enriched": {"geocoder": {"city": "Boise"}},
},
},
}
n = normalize(env)
# Either None or not a work_zone shape -- never a parsed work_zone.
if n is not None:
assert n.get("source") != "itd_511" or "road" not in n

View file

@ -0,0 +1,798 @@
"""Tests for meshai.central.incident_handler (v0.5.9).
Coverage:
Tomtom parsing + rendering (a/b/c/d):
(a) jam, accident, road_closed, lane_closed, road_works each render with
correct emoji + phrase + delay segment
(b) magnitude_of_delay == 0 events filtered at entrance
(c) delay == null events render WITHOUT the delay segment
(d) time_validity past/future events filtered
Per-incident change-detection (e-i):
(e) republish with no change -> drop silently, no new audit
(f) magnitude bump up -> Update
(g) delay double (>=2x) -> Update
(h) icon change -> Update
(i) 8h heartbeat -> Update
state_511 / itd_511 EventType branching (j-m):
(j) state_511_atis incident parses
(k) state_511_atis closure parses
(l) state_511_atis special_event parses (synthetic)
(m) itd_511 incident parses
Decoupled callback (n) and traffic_events UPSERT (o):
(n) cold-start scenario -- handler runs but callback never fires;
second pass still emits New: (not Update:)
(o) existing traffic_events row gets UPSERTed across passes
"""
import re
import time
import pytest
from meshai.central.incident_handler import (
INCIDENT_BROADCAST_HEARTBEAT_S,
handle_incident,
)
from meshai.persistence import close_thread_connection, init_db
from meshai.persistence import db as persistence_db
# ---------- fixtures ------------------------------------------------------
@pytest.fixture
def mem_db(monkeypatch, tmp_path):
db_path = str(tmp_path / "incident-test.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
persistence_db._initialised.clear()
close_thread_connection()
conn = init_db()
yield conn
close_thread_connection()
persistence_db._initialised.discard(db_path)
@pytest.fixture
def no_photon(monkeypatch):
import meshai.central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
if hasattr(cn, "_H3_NEAREST_CACHE"):
cn._H3_NEAREST_CACHE.clear()
# ---------- tomtom envelope builder --------------------------------------
_TTI_A = "cfb0c03f-9ab9-46f9-ac21-9b17d0f715f2"
_TTI_B = "13ca4176-4eea-428e-a807-49bee662159a"
_TTI_C = "3573b54b-9e55-4aff-83d3-253048825e77"
def _tomtom_env(*, tti=_TTI_A,
icon_category=6,
magnitude=2,
delay=412,
time_validity="present",
description="Queuing traffic on I-84 Westbound from Orchard St to ID-55. ",
road_numbers=("I-84",),
from_loc="Orchard St/Exit 52 (I-84)",
to_loc="ID-55/Exit 46 (I-84)",
start_time=None,
end_time=None,
lat=43.5833926533, lon=-116.2598321532,
state_code="ID", bbox_name="treasure_valley_ext",
geocoder_city="Boise"):
inner_id = f"ID:tomtom:TTI-{tti}-TTR{int(time.time()*1000)}"
geocoder = {"city": geocoder_city, "county": "Ada", "state": "ID",
"country": "United States", "landclass": None}
return {
"id": inner_id,
"subject": "central.traffic.incident.id",
"data": {
"id": inner_id, "adapter": "tomtom_incidents",
"category": "incident.tomtom_incidents", "severity": 1,
"geo": {"centroid": [lon, lat], "primary_region": "US-ID"},
"data": {
"id": inner_id,
"description": description,
"event_code": 108,
"from": from_loc, "to": to_loc,
"magnitude_of_delay": magnitude,
"icon_category": icon_category,
"length": 6112.13,
"delay": delay,
"road_numbers": list(road_numbers),
"start_time": start_time, "end_time": end_time,
"time_validity": time_validity,
"state_code": state_code, "bbox_name": bbox_name,
"latitude": lat, "longitude": lon,
"_enriched": {"geocoder": geocoder},
},
},
}
def _state_511_env(*, layer="Incidents", category_prefix="incident",
event_sub_type="crash",
roadway="US-95", direction="Both",
is_full_closure=False,
external_id="ID:Incidents:33948",
lat=48.5295, lon=-116.4293,
geocoder_city="Naples", county="Boundary"):
return {
"id": external_id,
"subject": f"central.traffic.{category_prefix}.id",
"data": {
"id": external_id, "adapter": "state_511_atis",
"category": f"{category_prefix}.state_511_atis", "severity": 1,
"geo": {"centroid": [lon, lat], "primary_region": "US-WA"},
"data": {
"roadway_name": roadway, "direction": direction,
"event_sub_type": event_sub_type,
"description": "test event",
"is_full_closure": is_full_closure,
"layer": layer,
# v0.5.9 GAMMA: default to non-ID neighbor state because
# state_511_atis no longer covers Idaho (itd_511 took over).
# Tests that want to exercise the ID-skip path override these.
"county": county, "state": "Washington", "state_code": "WA",
"start_date": None,
"last_updated": None,
"latitude": lat, "longitude": lon,
"_enriched": {"geocoder": {
"city": geocoder_city, "county": county, "state": "ID",
"country": "United States", "landclass": None,
}},
},
},
}
def _itd_511_env(*, category_prefix="incident",
event_type_short="incident",
event_sub_type="crash",
roadway="I-84", direction="East",
is_full_closure=False,
external_id="ITD:469:17",
lat=43.6486, lon=-116.4870,
geocoder_city="Caldwell"):
return {
"id": external_id,
"subject": f"central.traffic.{category_prefix}.us.id",
"data": {
"id": external_id, "adapter": "itd_511",
"category": f"{category_prefix}.itd_511", "severity": 1,
"geo": {"centroid": [lon, lat], "primary_region": "US-ID"},
"data": {
"event_type_short": event_type_short,
"event_sub_type": event_sub_type,
"roadway_name": roadway, "direction": direction,
"description": "test itd event",
"lanes_affected": "All lanes affected",
"is_full_closure": is_full_closure,
"itd_severity": "None",
"comment": "", "cause": "roadwork",
"organization": "ERS",
"recurrence_text": "", "recurrence_schedules": [],
"restrictions": {}, "encoded_polyline": "",
"id_internal": 17, "source_id": "469",
"reported_epoch": None,
"last_updated_epoch": None,
"start_epoch": None,
"planned_end_epoch": None,
"latitude": lat, "longitude": lon,
"_enriched": {"geocoder": {
"city": geocoder_city, "county": "Canyon", "state": "ID",
}},
},
},
}
def _commit(data, committed_at):
cb = data.get("_on_broadcast_committed")
assert callable(cb), "handler must attach commit callback"
cb(committed_at)
# ============================================================================
# (a) tomtom parsing -- all five icon categories render correctly
# ============================================================================
@pytest.mark.parametrize("icon, expected_emoji, expected_phrase", [
(1, "🚨", "crash"),
(6, "🚗", "jam"),
(7, "🟠", "lane closed"),
(8, "🚫", "road closed"),
(9, "🚧", "road works"),
])
def test_a_tomtom_icon_renders(mem_db, no_photon, icon, expected_emoji, expected_phrase):
env = _tomtom_env(icon_category=icon, magnitude=2, delay=300)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
assert wire.startswith(f"{expected_emoji} New: I-84")
assert expected_phrase in wire
assert "near Boise" in wire
assert "5 min delay" in wire
assert "@ 43.583,-116.260" in wire
# ============================================================================
# (b) tomtom magnitude_of_delay == 0 events filtered
# ============================================================================
def test_b_tomtom_magnitude_zero_filtered(mem_db, no_photon):
env = _tomtom_env(icon_category=6, magnitude=0, delay=10)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is None
# filtered envelope leaves no traffic_events row
n_rows = mem_db.execute("SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 0
# but the event IS logged to event_log handled=0 for accounting
n_log = mem_db.execute(
"SELECT COUNT(*) AS n FROM event_log WHERE source='tomtom_incidents'"
).fetchone()["n"]
assert n_log == 1
assert "_on_broadcast_committed" not in data
# ============================================================================
# (c) tomtom delay == null events render WITHOUT delay segment
# ============================================================================
def test_c_tomtom_delay_null_no_delay_segment(mem_db, no_photon):
env = _tomtom_env(icon_category=1, magnitude=3, delay=None)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
assert "crash" in wire
assert "min delay" not in wire # no delay segment
assert "@ 43.583,-116.260" in wire
# ============================================================================
# (d) tomtom time_validity past/future filtered
# ============================================================================
@pytest.mark.parametrize("validity", ["past", "future"])
def test_d_tomtom_time_validity_filtered(mem_db, no_photon, validity):
env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
time_validity=validity)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is None
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 0
# ============================================================================
# (e) per-incident dedup -- republish with no change drops silently
# ============================================================================
def test_e_per_incident_dedup_no_change(mem_db, no_photon):
env = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000)
assert wire1 is not None
_commit(data1, 1_000_001)
# Re-publish 5 minutes later, same magnitude/delay/icon.
data2 = {}
wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300)
assert wire2 is None # no change, no broadcast
# ============================================================================
# (f) magnitude bump triggers Update
# ============================================================================
def test_f_magnitude_bump_triggers_update(mem_db, no_photon):
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
_commit(data1, 1_000_001)
# v0.5.9 REVISED gate (A): magnitude bump no longer fires Update.
# State still flips in traffic_events, but no wire string returns.
env2 = _tomtom_env(icon_category=6, magnitude=3, delay=300)
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
assert wire2 is None
# Current magnitude tracked in the row.
row = mem_db.execute(
"SELECT magnitude_of_delay FROM traffic_events "
"WHERE source='tomtom_incidents'").fetchone()
assert row["magnitude_of_delay"] == 3
# ============================================================================
# (g) delay double triggers Update
# ============================================================================
def test_g_delay_double_triggers_update(mem_db, no_photon):
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
_commit(data1, 1_000_001)
# v0.5.9 REVISED gate (A): delay double no longer fires Update.
env2 = _tomtom_env(icon_category=6, magnitude=2, delay=700)
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
assert wire2 is None
row = mem_db.execute(
"SELECT delay_seconds FROM traffic_events "
"WHERE source='tomtom_incidents'").fetchone()
assert row["delay_seconds"] == 700
def test_g_delay_below_double_no_update(mem_db, no_photon):
"""delay 300 -> 500 (1.67x) should NOT trigger broadcast."""
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
_commit(data1, 1_000_001)
env2 = _tomtom_env(icon_category=6, magnitude=2, delay=500)
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
assert wire2 is None
# ============================================================================
# (h) icon change triggers Update
# ============================================================================
def test_h_icon_change_triggers_update(mem_db, no_photon):
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
_commit(data1, 1_000_001)
# v0.5.9 REVISED gate (A): icon change no longer fires Update.
env2 = _tomtom_env(icon_category=8, magnitude=2, delay=300)
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
assert wire2 is None
row = mem_db.execute(
"SELECT icon_category FROM traffic_events "
"WHERE source='tomtom_incidents'").fetchone()
assert row["icon_category"] == "road_closed"
# ============================================================================
# (i) 8h heartbeat triggers Update
# ============================================================================
def test_i_8h_heartbeat_triggers_update(mem_db, no_photon):
env = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env, env["subject"], data=data1, now=1_000_000)
_commit(data1, 1_000_001)
# v0.5.9 REVISED gate (A): heartbeat no longer fires Update.
later = 1_000_001 + INCIDENT_BROADCAST_HEARTBEAT_S
data2 = {}
wire2 = handle_incident(env, env["subject"], data=data2, now=later)
assert wire2 is None
# ============================================================================
# (j) state_511_atis incident parses
# ============================================================================
def test_j_state_511_incident_parses(mem_db, no_photon):
env = _state_511_env(layer="Incidents", category_prefix="incident",
event_sub_type="crash")
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
assert wire.startswith("🚨 New: US-95") # crash -> 🚨
assert "crash" in wire
assert "near Naples" in wire
row = mem_db.execute(
"SELECT source, sub_type, state FROM traffic_events "
"WHERE source='state_511_atis'").fetchone()
assert row["sub_type"] == "accident"
# v0.5.9 GAMMA: state_511_atis is non-ID only -- helper now defaults to WA.
assert row["state"] == "WA"
# ============================================================================
# (k) state_511_atis closure parses
# ============================================================================
def test_k_state_511_closure_parses(mem_db, no_photon):
env = _state_511_env(layer="Closures", category_prefix="closure",
event_sub_type="roadConstruction",
is_full_closure=True,
external_id="ID:Closures:33950")
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
# roadConstruction -> road_works -> 🚧
assert wire.startswith("🚧 New:")
assert "all lanes closed" in wire
# ============================================================================
# (l) state_511_atis special_event parses
# ============================================================================
def test_l_state_511_special_event_parses(mem_db, no_photon):
env = _state_511_env(layer="Special Events",
category_prefix="special_event",
event_sub_type="parade",
external_id="ID:SpecialEvents:42")
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
# parade -> 🎪
assert wire.startswith("🎪 New:")
assert "parade" in wire
# ============================================================================
# (m) itd_511 incident parses
# ============================================================================
def test_m_itd_511_incident_parses(mem_db, no_photon):
env = _itd_511_env(category_prefix="incident",
event_type_short="incident",
event_sub_type="crash")
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
assert wire.startswith("🚨 New:")
row = mem_db.execute(
"SELECT source, state FROM traffic_events "
"WHERE source='itd_511'").fetchone()
assert row["state"] == "ID"
# ============================================================================
# (n) decoupled callback -- cold-start scenario still emits New: on second pass
# ============================================================================
def test_n_cold_start_then_resume_still_new(mem_db, no_photon):
env = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000)
assert wire1.startswith("🚗 New:")
# Cold-start grace drops the broadcast -- DO NOT call _commit().
# 5 minutes later, same incident republishes.
data2 = {}
wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300)
assert wire2 is not None
assert wire2.startswith("🚗 New:"), \
"must still be New: until commit callback fires"
row = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_magnitude "
"FROM traffic_events WHERE source='tomtom_incidents'").fetchone()
assert row["last_broadcast_at"] is None
assert row["last_broadcast_magnitude"] is None
# ============================================================================
# (o) traffic_events row gets UPSERTed on each pass; event_log handled flips
# ============================================================================
def test_o_traffic_events_upsert_and_event_log_handled_flip(mem_db, no_photon):
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
# event_log row exists with handled=0 BEFORE callback.
el_pre = mem_db.execute(
"SELECT handled FROM event_log "
"WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1"
).fetchone()
assert el_pre["handled"] == 0
_commit(data1, 1_000_001)
el_post = mem_db.execute(
"SELECT handled FROM event_log "
"WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1"
).fetchone()
assert el_post["handled"] == 1
fr_post = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_magnitude, "
"last_broadcast_delay_seconds, last_broadcast_icon_category "
"FROM traffic_events WHERE source='tomtom_incidents'"
).fetchone()
assert fr_post["last_broadcast_at"] == 1_000_001
assert fr_post["last_broadcast_magnitude"] == 2
assert fr_post["last_broadcast_delay_seconds"] == 300
assert fr_post["last_broadcast_icon_category"] == "jam"
# Re-publish: UPSERT updates current_* but doesn't touch last_broadcast_*.
env2 = _tomtom_env(icon_category=6, magnitude=2, delay=500)
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
# v0.5.9 REVISED: no Update broadcasts regardless of delta size --
# under the OLD rule this was 'delay 1.67x not enough', now it's
# 'we never re-broadcast'.
assert wire2 is None
fr2 = mem_db.execute(
"SELECT delay_seconds, last_broadcast_delay_seconds "
"FROM traffic_events WHERE source='tomtom_incidents'"
).fetchone()
assert fr2["delay_seconds"] == 500 # UPSERT happened
assert fr2["last_broadcast_delay_seconds"] == 300 # last broadcast unchanged
# ============================================================================
# v0.5.9 REVISED -- conservative gates
# ============================================================================
def test_p_known_id_all_changed_no_broadcast(mem_db, no_photon):
"""Regression guard for the new no-Update rule. Republish the SAME
external_id with magnitude AND delay AND icon all changed. The old rule
would have triggered Update on any one of those; the new rule fires
nothing."""
env1 = _tomtom_env(icon_category=6, magnitude=2, delay=300)
data1 = {}
wire1 = handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
assert wire1.startswith("🚗 New:")
_commit(data1, 1_000_001)
env2 = _tomtom_env(icon_category=8, magnitude=4, delay=700) # ALL changed
data2 = {}
wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
assert wire2 is None, "no Update broadcasts under the v0.5.9 REVISED rule"
# State still tracks the latest field values.
row = mem_db.execute(
"SELECT magnitude_of_delay, delay_seconds, icon_category "
"FROM traffic_events WHERE source='tomtom_incidents'").fetchone()
assert row["magnitude_of_delay"] == 4
assert row["delay_seconds"] == 700
assert row["icon_category"] == "road_closed"
# ============================================================================
# Freshness gate (q/r/s)
# ============================================================================
def _now_anchor_relative(start_age_seconds: int):
"""Choose (now, start_time_iso) so that `now - parse(start_time) ==
start_age_seconds`. Used by the freshness-gate tests."""
import datetime as _dt
now = 2_000_000_000 # arbitrary fixed epoch >>1970
start_iso = _dt.datetime.fromtimestamp(
now - start_age_seconds, tz=_dt.timezone.utc
).strftime("%Y-%m-%dT%H:%M:%SZ")
return now, start_iso
def test_q_fresh_event_15min_ago_broadcasts(mem_db, no_photon):
"""Event started 15 min ago -- WITHIN the 30-min fresh window. New: fires."""
now, start_iso = _now_anchor_relative(15 * 60)
env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
start_time=start_iso)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=now)
assert wire is not None
assert wire.startswith("🚗 New:")
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 1
def test_r_stale_event_45min_ago_dropped_no_row(mem_db, no_photon):
"""Event started 45 min ago -- OUTSIDE the 30-min window. Drop AT
handler entrance, no UPSERT into traffic_events, event_log handled=0."""
now, start_iso = _now_anchor_relative(45 * 60)
env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
start_time=start_iso)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=now)
assert wire is None
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 0, "no UPSERT for stale incidents"
n_log = mem_db.execute(
"SELECT COUNT(*) AS n FROM event_log WHERE handled=0 "
"AND source='tomtom_incidents'").fetchone()["n"]
assert n_log == 1, "stale envelope must still be logged (handled=0)"
def test_s_null_start_time_default_allow(mem_db, no_photon):
"""start_time missing -> default-allow (treat as fresh and broadcast)."""
env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
start_time=None)
data = {}
wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
assert wire is not None
assert wire.startswith("🚗 New:")
# ============================================================================
# Per-source startTime field path (t)
# ============================================================================
def test_t_state_511_start_date_path(mem_db, no_photon):
"""state_511_atis pulls start_time from inner.data.start_date ('5/28/26,
10:45 PM' format). Construct fresh (within 30 min) and stale (>30 min)
cases and confirm gate behavior is per-source-correct."""
# 5 min ago in the 511-date format.
import datetime as _dt
now = int(_dt.datetime(2026, 6, 4, 12, 0, tzinfo=_dt.timezone.utc).timestamp())
fresh_date = _dt.datetime.fromtimestamp(now - 5 * 60,
tz=_dt.timezone.utc).strftime(
"%-m/%-d/%y, %-I:%M %p")
env = _state_511_env(layer="Incidents", category_prefix="incident",
event_sub_type="crash")
# Replace the default start_date with a fresh one.
env["data"]["data"]["start_date"] = fresh_date
wire = handle_incident(env, env["subject"], data={}, now=now)
assert wire is not None
assert wire.startswith("🚨 New:")
# Stale variant (45 min ago).
stale_date = _dt.datetime.fromtimestamp(now - 45 * 60,
tz=_dt.timezone.utc).strftime(
"%-m/%-d/%y, %-I:%M %p")
env2 = _state_511_env(layer="Incidents", category_prefix="incident",
event_sub_type="crash",
external_id="ID:Incidents:99999")
env2["data"]["data"]["start_date"] = stale_date
wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
assert wire2 is None
def test_t_itd_511_start_epoch_path(mem_db, no_photon):
"""itd_511 pulls start_time from inner.data.start_epoch (Unix int)."""
now = 1_700_000_000
# Fresh (5 min ago) variant.
env = _itd_511_env(category_prefix="incident",
event_type_short="incident",
event_sub_type="crash")
env["data"]["data"]["start_epoch"] = now - 5 * 60
wire = handle_incident(env, env["subject"], data={}, now=now)
assert wire is not None
assert wire.startswith("🚨 New:")
# Stale variant (45 min ago).
env2 = _itd_511_env(category_prefix="incident",
event_type_short="incident",
event_sub_type="crash",
external_id="ITD:469:99999")
env2["data"]["data"]["start_epoch"] = now - 45 * 60
wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
assert wire2 is None
def test_t_tomtom_start_time_path(mem_db, no_photon):
"""tomtom pulls start_time from inner.data.start_time (ISO-8601)."""
now, fresh_iso = _now_anchor_relative(5 * 60)
env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
start_time=fresh_iso)
wire = handle_incident(env, env["subject"], data={}, now=now)
assert wire is not None
assert wire.startswith("🚗 New:")
_, stale_iso = _now_anchor_relative(45 * 60)
env2 = _tomtom_env(icon_category=6, magnitude=2, delay=300,
start_time=stale_iso,
tti="11111111-2222-3333-4444-555555555555")
wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
assert wire2 is None
# ============================================================================
# v0.5.9 GAMMA -- two-sided freshness gate + state_511 ID skip
# ============================================================================
def test_u_future_scheduled_event_dropped(mem_db, no_photon):
"""itd_511 work_zone envelopes can carry start_epoch in the future
(scheduled construction). The v0.5.9 REVISED one-sided gate let those
slip through. The GAMMA fix rejects negative ages too."""
import datetime as _dt
now = 2_000_000_000
# start_epoch 8 hours in the future
env = _itd_511_env(category_prefix="incident",
event_type_short="incident",
event_sub_type="crash",
external_id="ITD:future:1")
env["data"]["data"]["start_epoch"] = now + 8 * 3600
wire = handle_incident(env, env["subject"], data={}, now=now)
assert wire is None
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 0
n_log = mem_db.execute(
"SELECT COUNT(*) AS n FROM event_log WHERE handled=0 "
"AND source='itd_511'").fetchone()["n"]
assert n_log == 1
def test_v_state_511_id_skipped_at_handler_entrance(mem_db, no_photon):
"""state_511_atis with state_code='ID' is skipped at handler entrance --
no parse, no traffic_events row, event_log records the skip."""
env = _state_511_env(layer="Incidents", category_prefix="incident",
event_sub_type="crash")
# Override defaults (post-GAMMA helper defaults to WA) to ID for this test.
env["data"]["data"]["state_code"] = "ID"
env["data"]["data"]["state"] = "Idaho"
env["data"]["geo"]["primary_region"] = "US-ID"
wire = handle_incident(env, env["subject"], data={}, now=1_000_000)
assert wire is None
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
assert n_rows == 0
row = mem_db.execute(
"SELECT category, handled FROM event_log "
"WHERE source='state_511_atis' ORDER BY id DESC LIMIT 1"
).fetchone()
assert row is not None
assert "|skip_id" in row["category"]
assert row["handled"] == 0
def test_v_state_511_non_id_still_processed(mem_db, no_photon):
"""Regression guard: state_511_atis with state_code='WA' (or anything
that's not ID) keeps going through the handler. After Idaho cutover
we still need state_511 for neighbor coverage."""
env = _state_511_env(layer="Incidents", category_prefix="incident",
event_sub_type="crash", county="Spokane")
# Override state_code to WA.
env["data"]["data"]["state_code"] = "WA"
env["data"]["geo"]["primary_region"] = "US-WA"
wire = handle_incident(env, env["subject"], data={}, now=1_000_000)
assert wire is not None
assert wire.startswith("🚨 New:")
# traffic_events row written for WA event.
row = mem_db.execute(
"SELECT state FROM traffic_events WHERE source='state_511_atis'"
).fetchone()
assert row is not None
assert row["state"] == "WA"
def test_w_itd_511_future_scheduled_dropped_via_start_epoch(mem_db, no_photon):
"""Direct exercise of the itd_511 start_epoch field path with a
future-scheduled value (the Phase-1 leak source). Confirms the gate
really uses start_epoch and the new two-sided check catches it."""
import meshai.central.incident_handler as h
env = _itd_511_env(category_prefix="incident",
event_type_short="incident",
event_sub_type="crash",
external_id="ITD:future:work")
env["data"]["data"]["start_epoch"] = 99_000_000_000 # year ~5108 -- definitely future
se = h._extract_start_time_epoch(env, "itd_511")
assert se == 99_000_000_000
now = 2_000_000_000
wire = handle_incident(env, env["subject"], data={}, now=now)
assert wire is None

View file

@ -0,0 +1,137 @@
"""Tests for the v0.5.9 GAMMA itd_511 work_zone parser in central_normalizer.
Covers the cutover path: itd_511 supplies all Idaho work_zone broadcasts now
(state_511_atis ID is skipped). The parser must produce the same flat dict
shape as _parse_state_511_atis so format_work_zone_mesh works unchanged.
"""
from datetime import datetime, timezone
import pytest
from meshai import central_normalizer as cn
# ---------- envelope builder ---------------------------------------------
def _itd_work_zone_env(*, roadway="SH-55", direction="North",
event_sub_type="roadConstruction",
is_full_closure=False,
external_id="ITD:469:42",
lat=44.103, lon=-116.110,
geocoder_city="McCall",
start_epoch=1780600000,
planned_end_epoch=1781200000,
description="Road construction on SH-55 Northbound from MM (140) to MM (145). 6/4/2026 7:00 AM to 6/11/2026 5:00 PM."):
return {
"id": external_id,
"subject": "central.traffic.work_zone.us.id",
"data": {
"id": external_id, "adapter": "itd_511",
"category": "work_zone.itd_511", "severity": 1,
"geo": {"centroid": [lon, lat], "primary_region": "US-ID"},
"data": {
"event_type_short": "work_zone",
"event_sub_type": event_sub_type,
"roadway_name": roadway, "direction": direction,
"description": description,
"lanes_affected": "All lanes affected",
"is_full_closure": is_full_closure,
"itd_severity": "None",
"comment": "",
"cause": "roadwork",
"organization": "ERS",
"recurrence_text": "",
"recurrence_schedules": [],
"restrictions": {},
"encoded_polyline": "",
"id_internal": 42, "source_id": "999",
"reported_epoch": start_epoch,
"last_updated_epoch": start_epoch,
"start_epoch": start_epoch,
"planned_end_epoch": planned_end_epoch,
"latitude": lat, "longitude": lon,
"_enriched": {"geocoder": {
"name": None, "city": geocoder_city,
"county": "Valley", "state": "ID",
"country": "United States",
"landclass": None, "elevation_m": 1530.0,
}},
},
},
}
@pytest.fixture
def no_photon(monkeypatch):
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
if hasattr(cn, "_H3_NEAREST_CACHE"):
cn._H3_NEAREST_CACHE.clear()
# ---------- test (a): all fields populate -----------------------------------
def test_itd_511_work_zone_parses_with_all_fields(no_photon):
env = _itd_work_zone_env()
n = cn.normalize(env)
assert n is not None
assert n["source"] == "itd_511"
assert n["road"] == "SH-55"
# _norm_direction returns 'northbound' (matches state_511 convention)
assert n["direction"] == "northbound"
assert n["mile_start"] == 140
assert n["mile_end"] == 145
assert n["sub_type"] is not None
# is_full_closure=False -> impact 'partial' matches state_511 convention
assert n["impact"] == "partial"
assert n["town"] == "McCall"
# ends_at is a datetime
assert isinstance(n["ends_at"], datetime)
def test_itd_511_work_zone_full_closure_impact(no_photon):
env = _itd_work_zone_env(is_full_closure=True)
n = cn.normalize(env)
assert n["impact"] == "full_closure"
def test_itd_511_work_zone_renderer_produces_wire(no_photon):
from meshai.notifications.renderers.work_zone import format_work_zone_mesh
env = _itd_work_zone_env()
n = cn.normalize(env)
wire = format_work_zone_mesh(n)
assert wire is not None
# Format matches state_511 convention: 🚧 emoji + road
assert "🚧" in wire
assert "SH-55" in wire
assert "McCall" in wire
def test_itd_511_work_zone_end_date_formatting(no_photon):
"""planned_end_epoch should serialize to a datetime that the renderer
can format consistently with state_511."""
env = _itd_work_zone_env(planned_end_epoch=1781200000)
n = cn.normalize(env)
assert isinstance(n["ends_at"], datetime)
assert n["ends_at"].tzinfo is not None # UTC-aware
def test_itd_511_work_zone_no_end_date(no_photon):
"""planned_end_epoch == None or 0 -> ends_at is None."""
env = _itd_work_zone_env(planned_end_epoch=None)
n = cn.normalize(env)
assert n["ends_at"] is None
def test_itd_511_incident_does_not_go_through_work_zone_parser(no_photon):
"""The work_zone parser should NOT be invoked for category=incident.itd_511
-- those still route through the incident_handler. normalize() returns
None (defer) for non-work_zone itd_511 categories."""
env = _itd_work_zone_env()
env["data"]["category"] = "incident.itd_511"
n = cn.normalize(env)
# Either None (defer) or not a work_zone dict (no road/direction/etc).
if n is not None:
assert "_kind" in n # marker, not a parsed work_zone dict