# Mirror of https://github.com/zvx-echo6/meshai.git
# Synced 2026-06-10 17:04:45 +02:00 -- 776 lines, 31 KiB, Python
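"""Tests for meshai.central.incident_handler (v0.5.9).

Coverage:

  Tomtom parsing + rendering (a/b/c/d):
    (a) jam, accident, road_closed, lane_closed, road_works each render with
        correct emoji + phrase + delay segment
    (b) magnitude_of_delay == 0 events filtered at entrance
    (c) delay == null events render WITHOUT the delay segment
    (d) time_validity past/future events filtered

  Per-incident change-detection (e-h):
    (e) republish with no change -> drop silently, no new audit
    (f) magnitude bump up -> originally Update; under the v0.5.9 REVISED gate
        no broadcast, but the traffic_events row tracks the new magnitude
    (g) delay double (>=2x) -> originally Update; under the v0.5.9 REVISED
        gate no broadcast, but the row tracks the new delay
    (h) icon change -> originally Update; under the v0.5.9 REVISED gate no
        broadcast, but the row tracks the new icon category

  state_511 / itd_511 EventType branching (j-m):
    (j) state_511_atis incident parses
    (k) state_511_atis closure parses
    (l) state_511_atis special_event parses (synthetic)
    (m) itd_511 incident parses

  Decoupled callback (n) and traffic_events UPSERT (o):
    (n) cold-start scenario -- handler runs but callback never fires;
        second pass still emits New: (not Update:)
    (o) existing traffic_events row gets UPSERTed across passes

  v0.5.9 REVISED conservative gates (p-t):
    (p) known id with magnitude, delay and icon all changed -> no broadcast
    (q/r/s) freshness gate: fresh event broadcasts, stale event dropped,
        missing start time default-allowed
    (t) per-source start-time field path (state_511_atis, itd_511, tomtom)

  v0.5.9 GAMMA (u-w):
    (u/w) future-scheduled events dropped by the two-sided freshness gate
    (v) state_511_atis Idaho events skipped; non-Idaho still processed
"""

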
import re
import time

import pytest

from meshai.central.incident_handler import (
    handle_incident,
)
from meshai.persistence import close_thread_connection, init_db
from meshai.persistence import db as persistence_db


# ---------- fixtures ------------------------------------------------------


@pytest.fixture
def mem_db(monkeypatch, tmp_path):
    """Yield a freshly initialised database connection for one test.

    Despite the name, the database is a SQLite file under pytest's
    ``tmp_path``; its location is published through ``MESHAI_DB_PATH``
    (presumably read by ``init_db`` -- confirm against meshai.persistence).
    """
    db_path = str(tmp_path / "incident-test.sqlite")
    monkeypatch.setenv("MESHAI_DB_PATH", db_path)
    # Forget previously initialised paths and drop any cached connection so
    # init_db() starts from a clean slate for this test.
    persistence_db._initialised.clear()
    close_thread_connection()
    conn = init_db()
    yield conn
    # Teardown: release the connection and un-register this test's path.
    close_thread_connection()
    persistence_db._initialised.discard(db_path)


@pytest.fixture
def no_photon(monkeypatch):
    """Stub out the normalizer's Photon reverse lookup for one test.

    ``_photon_reverse_places`` is replaced with a function that always
    returns an empty list, and the ``_H3_NEAREST_CACHE`` is emptied when the
    module has one, so results cached by earlier tests cannot leak in.
    """
    import meshai.central_normalizer as cn

    monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
    if hasattr(cn, "_H3_NEAREST_CACHE"):
        cn._H3_NEAREST_CACHE.clear()


# ---------- tomtom envelope builder --------------------------------------
|
|
|
|
|
|
_TTI_A = "cfb0c03f-9ab9-46f9-ac21-9b17d0f715f2"
|
|
_TTI_B = "13ca4176-4eea-428e-a807-49bee662159a"
|
|
_TTI_C = "3573b54b-9e55-4aff-83d3-253048825e77"
|
|
|
|
|
|
def _tomtom_env(*, tti=_TTI_A,
|
|
icon_category=6,
|
|
magnitude=4,
|
|
delay=412,
|
|
time_validity="present",
|
|
description="Queuing traffic on I-84 Westbound from Orchard St to ID-55. ",
|
|
road_numbers=("I-84",),
|
|
from_loc="Orchard St/Exit 52 (I-84)",
|
|
to_loc="ID-55/Exit 46 (I-84)",
|
|
start_time=None,
|
|
end_time=None,
|
|
lat=43.5833926533, lon=-116.2598321532,
|
|
state_code="ID", bbox_name="treasure_valley_ext",
|
|
geocoder_city="Boise"):
|
|
inner_id = f"ID:tomtom:TTI-{tti}-TTR{int(time.time()*1000)}"
|
|
geocoder = {"city": geocoder_city, "county": "Ada", "state": "ID",
|
|
"country": "United States", "landclass": None}
|
|
return {
|
|
"id": inner_id,
|
|
"subject": "central.traffic.incident.id",
|
|
"data": {
|
|
"id": inner_id, "adapter": "tomtom_incidents",
|
|
"category": "incident.tomtom_incidents", "severity": 1,
|
|
"geo": {"centroid": [lon, lat], "primary_region": "US-ID"},
|
|
"data": {
|
|
"id": inner_id,
|
|
"description": description,
|
|
"event_code": 108,
|
|
"from": from_loc, "to": to_loc,
|
|
"magnitude_of_delay": magnitude,
|
|
"icon_category": icon_category,
|
|
"length": 6112.13,
|
|
"delay": delay,
|
|
"road_numbers": list(road_numbers),
|
|
"start_time": start_time, "end_time": end_time,
|
|
"time_validity": time_validity,
|
|
"state_code": state_code, "bbox_name": bbox_name,
|
|
"latitude": lat, "longitude": lon,
|
|
"_enriched": {"geocoder": geocoder},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def _state_511_env(*, layer="Incidents", category_prefix="incident",
|
|
event_sub_type="crash",
|
|
roadway="US-95", direction="Both",
|
|
is_full_closure=False,
|
|
external_id="ID:Incidents:33948",
|
|
lat=48.5295, lon=-116.4293,
|
|
geocoder_city="Naples", county="Boundary"):
|
|
return {
|
|
"id": external_id,
|
|
"subject": f"central.traffic.{category_prefix}.id",
|
|
"data": {
|
|
"id": external_id, "adapter": "state_511_atis",
|
|
"category": f"{category_prefix}.state_511_atis", "severity": 1,
|
|
"geo": {"centroid": [lon, lat], "primary_region": "US-WA"},
|
|
"data": {
|
|
"roadway_name": roadway, "direction": direction,
|
|
"event_sub_type": event_sub_type,
|
|
"description": "test event",
|
|
"is_full_closure": is_full_closure,
|
|
"layer": layer,
|
|
# v0.5.9 GAMMA: default to non-ID neighbor state because
|
|
# state_511_atis no longer covers Idaho (itd_511 took over).
|
|
# Tests that want to exercise the ID-skip path override these.
|
|
"county": county, "state": "Washington", "state_code": "WA",
|
|
"start_date": None,
|
|
"last_updated": None,
|
|
"latitude": lat, "longitude": lon,
|
|
"_enriched": {"geocoder": {
|
|
"city": geocoder_city, "county": county, "state": "ID",
|
|
"country": "United States", "landclass": None,
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def _itd_511_env(*, category_prefix="incident",
|
|
event_type_short="incident",
|
|
event_sub_type="crash",
|
|
roadway="I-84", direction="East",
|
|
is_full_closure=False,
|
|
external_id="ITD:469:17",
|
|
lat=43.6486, lon=-116.4870,
|
|
geocoder_city="Caldwell"):
|
|
return {
|
|
"id": external_id,
|
|
"subject": f"central.traffic.{category_prefix}.us.id",
|
|
"data": {
|
|
"id": external_id, "adapter": "itd_511",
|
|
"category": f"{category_prefix}.itd_511", "severity": 1,
|
|
"geo": {"centroid": [lon, lat], "primary_region": "US-ID"},
|
|
"data": {
|
|
"event_type_short": event_type_short,
|
|
"event_sub_type": event_sub_type,
|
|
"roadway_name": roadway, "direction": direction,
|
|
"description": "test itd event",
|
|
"lanes_affected": "All lanes affected",
|
|
"is_full_closure": is_full_closure,
|
|
"itd_severity": "None",
|
|
"comment": "", "cause": "roadwork",
|
|
"organization": "ERS",
|
|
"recurrence_text": "", "recurrence_schedules": [],
|
|
"restrictions": {}, "encoded_polyline": "",
|
|
"id_internal": 17, "source_id": "469",
|
|
"reported_epoch": None,
|
|
"last_updated_epoch": None,
|
|
"start_epoch": None,
|
|
"planned_end_epoch": None,
|
|
"latitude": lat, "longitude": lon,
|
|
"_enriched": {"geocoder": {
|
|
"city": geocoder_city, "county": "Canyon", "state": "ID",
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def _commit(data, committed_at):
|
|
cb = data.get("_on_broadcast_committed")
|
|
assert callable(cb), "handler must attach commit callback"
|
|
cb(committed_at)
|
|
|
|
|
|
# ============================================================================
# (a) tomtom parsing -- all five icon categories render correctly
# ============================================================================


@pytest.mark.parametrize("icon, expected_emoji, expected_phrase", [
    (1, "🚨", "Crash"),
    (6, "🚗", "Stationary Traffic"),
    (7, "🟠", "Lane Reduction"),
    (8, "🚫", "Road Closed"),
    (9, "🚧", "Road Works"),
])
def test_a_tomtom_icon_renders(mem_db, no_photon, icon, expected_emoji, expected_phrase):
    """Each tomtom icon category renders its emoji, phrase, place, road and
    a 5-minute delay segment (delay=300 seconds)."""
    env = _tomtom_env(icon_category=icon, delay=300)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    assert wire.startswith(f"{expected_emoji} {expected_phrase}")
    assert "Near Boise, ID" in wire
    assert "I-84" in wire
    assert "5 min delay" in wire


# ============================================================================
# (b) tomtom magnitude_of_delay == 0 events filtered
# ============================================================================


def test_b_tomtom_magnitude_zero_filtered(mem_db, no_photon):
    """A magnitude-0 tomtom envelope yields no wire string, no traffic_events
    row and no commit callback, but is still recorded in event_log."""
    env = _tomtom_env(icon_category=6, magnitude=0, delay=10)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is None
    # filtered envelope leaves no traffic_events row
    n_rows = mem_db.execute("SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 0
    # but the event IS logged to event_log handled=0 for accounting
    n_log = mem_db.execute(
        "SELECT COUNT(*) AS n FROM event_log WHERE source='tomtom_incidents'"
    ).fetchone()["n"]
    assert n_log == 1
    assert "_on_broadcast_committed" not in data


# ============================================================================
# (c) tomtom delay == null events render WITHOUT delay segment
# ============================================================================


def test_c_tomtom_delay_null_no_delay_segment(mem_db, no_photon):
    """A tomtom crash with delay=None still renders, without 'min delay'."""
    env = _tomtom_env(icon_category=1, delay=None)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    assert "Crash" in wire
    assert "min delay" not in wire  # no delay segment


# ============================================================================
# (d) tomtom time_validity past/future filtered
# ============================================================================


@pytest.mark.parametrize("validity", ["past", "future"])
def test_d_tomtom_time_validity_filtered(mem_db, no_photon, validity):
    """Envelopes whose time_validity is 'past' or 'future' produce no wire
    string and no traffic_events row."""
    env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
                      time_validity=validity)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is None
    n_rows = mem_db.execute(
        "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 0


# ============================================================================
# (e) per-incident dedup -- republish with no change drops silently
# ============================================================================


def test_e_per_incident_dedup_no_change(mem_db, no_photon):
    """Re-handling the same committed envelope 300 s later broadcasts nothing."""
    env = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000)
    assert wire1 is not None
    _commit(data1, 1_000_001)

    # Re-publish 5 minutes later, same magnitude/delay/icon.
    data2 = {}
    wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300)
    assert wire2 is None  # no change, no broadcast


# ============================================================================
# (f) magnitude bump -- no Update under the v0.5.9 REVISED gate
# ============================================================================


def test_f_magnitude_bump_triggers_update(mem_db, no_photon):
    """Magnitude 4 -> 5 on a committed incident broadcasts nothing.

    The test name predates the v0.5.9 REVISED gate and is kept for
    continuity; the assertions below pin the current behavior: no wire
    string, while traffic_events records the new magnitude.
    """
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
    _commit(data1, 1_000_001)

    # v0.5.9 REVISED gate (A): magnitude bump no longer fires Update.
    # State still flips in traffic_events, but no wire string returns.
    env2 = _tomtom_env(icon_category=6, magnitude=5, delay=300)
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    assert wire2 is None
    # Current magnitude tracked in the row.
    row = mem_db.execute(
        "SELECT magnitude_of_delay FROM traffic_events "
        "WHERE source='tomtom_incidents'").fetchone()
    assert row["magnitude_of_delay"] == 5


# ============================================================================
# (g) delay double -- no Update under the v0.5.9 REVISED gate
# ============================================================================


def test_g_delay_double_triggers_update(mem_db, no_photon):
    """Delay 300 -> 700 on a committed incident broadcasts nothing.

    The test name predates the v0.5.9 REVISED gate and is kept for
    continuity; the assertions below pin the current behavior: no wire
    string, while traffic_events records the new delay.
    """
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
    _commit(data1, 1_000_001)

    # v0.5.9 REVISED gate (A): delay double no longer fires Update.
    env2 = _tomtom_env(icon_category=6, delay=700)
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    assert wire2 is None
    row = mem_db.execute(
        "SELECT delay_seconds FROM traffic_events "
        "WHERE source='tomtom_incidents'").fetchone()
    assert row["delay_seconds"] == 700


def test_g_delay_below_double_no_update(mem_db, no_photon):
    """delay 300 -> 500 (1.67x) should NOT trigger broadcast."""
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
    _commit(data1, 1_000_001)

    env2 = _tomtom_env(icon_category=6, delay=500)
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    assert wire2 is None


# ============================================================================
# (h) icon change -- no Update under the v0.5.9 REVISED gate
# ============================================================================


def test_h_icon_change_triggers_update(mem_db, no_photon):
    """Icon category 6 -> 8 on a committed incident broadcasts nothing.

    The test name predates the v0.5.9 REVISED gate and is kept for
    continuity; the assertions below pin the current behavior: no wire
    string, while traffic_events records icon_category 'road_closed'.
    """
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
    _commit(data1, 1_000_001)

    # v0.5.9 REVISED gate (A): icon change no longer fires Update.
    env2 = _tomtom_env(icon_category=8, delay=300)
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    assert wire2 is None
    row = mem_db.execute(
        "SELECT icon_category FROM traffic_events "
        "WHERE source='tomtom_incidents'").fetchone()
    assert row["icon_category"] == "road_closed"


# ============================================================================
# (j) state_511_atis incident parses
# ============================================================================


def test_j_state_511_incident_parses(mem_db, no_photon):
    """A state_511_atis crash renders as a Crash on US-95 near Naples and is
    stored with sub_type 'accident' and state 'WA'."""
    env = _state_511_env(layer="Incidents", category_prefix="incident",
                         event_sub_type="crash")
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    assert wire.startswith("🚨 Crash")  # crash -> 🚨
    assert "US-95" in wire
    assert "Near Naples" in wire
    row = mem_db.execute(
        "SELECT source, sub_type, state FROM traffic_events "
        "WHERE source='state_511_atis'").fetchone()
    assert row["sub_type"] == "accident"
    # v0.5.9 GAMMA: state_511_atis is non-ID only -- helper now defaults to WA.
    assert row["state"] == "WA"


# ============================================================================
# (k) state_511_atis closure parses
# ============================================================================


def test_k_state_511_closure_parses(mem_db, no_photon):
    """A state_511_atis full closure with sub-type roadConstruction renders
    as Road Works on US-95."""
    env = _state_511_env(layer="Closures", category_prefix="closure",
                         event_sub_type="roadConstruction",
                         is_full_closure=True,
                         external_id="ID:Closures:33950")
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    # roadConstruction -> road_works -> 🚧
    assert wire.startswith("🚧 Road Works")
    assert "US-95" in wire


# ============================================================================
# (l) state_511_atis special_event parses
# ============================================================================


def test_l_state_511_special_event_parses(mem_db, no_photon):
    """A synthetic state_511_atis special event (parade) renders as Parade
    on US-95."""
    env = _state_511_env(layer="Special Events",
                         category_prefix="special_event",
                         event_sub_type="parade",
                         external_id="ID:SpecialEvents:42")
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    # parade -> 🎪
    assert wire.startswith("🎪 Parade")
    assert "US-95" in wire


# ============================================================================
# (m) itd_511 incident parses
# ============================================================================


def test_m_itd_511_incident_parses(mem_db, no_photon):
    """An itd_511 crash renders as a Crash and is stored with state 'ID'."""
    env = _itd_511_env(category_prefix="incident",
                       event_type_short="incident",
                       event_sub_type="crash")
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    assert wire.startswith("🚨 Crash")
    row = mem_db.execute(
        "SELECT source, state FROM traffic_events "
        "WHERE source='itd_511'").fetchone()
    assert row["state"] == "ID"


# ============================================================================
# (n) decoupled callback -- cold-start scenario still emits New: on second pass
# ============================================================================


def test_n_cold_start_then_resume_still_new(mem_db, no_photon):
    """Without a commit callback call, a republished incident is broadcast
    again and the last_broadcast_* columns stay NULL."""
    env = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    wire1 = handle_incident(env, env["subject"], data=data1, now=1_000_000)
    # Guard first so a dropped broadcast fails as an assertion, not an
    # AttributeError on None.
    assert wire1 is not None
    assert wire1.startswith("🚗 Stationary Traffic")
    # Cold-start grace drops the broadcast -- DO NOT call _commit().

    # 5 minutes later, same incident republishes.
    data2 = {}
    wire2 = handle_incident(env, env["subject"], data=data2, now=1_000_300)
    assert wire2 is not None
    assert wire2.startswith("🚗 Stationary Traffic"), \
        "must still be New: until commit callback fires"

    row = mem_db.execute(
        "SELECT last_broadcast_at, last_broadcast_magnitude "
        "FROM traffic_events WHERE source='tomtom_incidents'").fetchone()
    assert row["last_broadcast_at"] is None
    assert row["last_broadcast_magnitude"] is None


# ============================================================================
# (o) traffic_events row gets UPSERTed on each pass; event_log handled flips
# ============================================================================


def test_o_traffic_events_upsert_and_event_log_handled_flip(mem_db, no_photon):
    """The commit callback flips event_log.handled from 0 to 1 and fills the
    last_broadcast_* columns; a later republish updates the current delay
    but leaves the last_broadcast_* values alone."""
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    handle_incident(env1, env1["subject"], data=data1, now=1_000_000)

    # event_log row exists with handled=0 BEFORE callback.
    el_pre = mem_db.execute(
        "SELECT handled FROM event_log "
        "WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert el_pre["handled"] == 0

    _commit(data1, 1_000_001)

    el_post = mem_db.execute(
        "SELECT handled FROM event_log "
        "WHERE source='tomtom_incidents' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert el_post["handled"] == 1

    fr_post = mem_db.execute(
        "SELECT last_broadcast_at, last_broadcast_magnitude, "
        "last_broadcast_delay_seconds, last_broadcast_icon_category "
        "FROM traffic_events WHERE source='tomtom_incidents'"
    ).fetchone()
    assert fr_post["last_broadcast_at"] == 1_000_001
    assert fr_post["last_broadcast_magnitude"] == 4
    assert fr_post["last_broadcast_delay_seconds"] == 300
    assert fr_post["last_broadcast_icon_category"] == "jam"

    # Re-publish: UPSERT updates current_* but doesn't touch last_broadcast_*.
    env2 = _tomtom_env(icon_category=6, delay=500)
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    # v0.5.9 REVISED: no Update broadcasts regardless of delta size --
    # under the OLD rule this was 'delay 1.67x not enough', now it's
    # 'we never re-broadcast'.
    assert wire2 is None
    fr2 = mem_db.execute(
        "SELECT delay_seconds, last_broadcast_delay_seconds "
        "FROM traffic_events WHERE source='tomtom_incidents'"
    ).fetchone()
    assert fr2["delay_seconds"] == 500  # UPSERT happened
    assert fr2["last_broadcast_delay_seconds"] == 300  # last broadcast unchanged


# ============================================================================
# v0.5.9 REVISED -- conservative gates
# ============================================================================


def test_p_known_id_all_changed_no_broadcast(mem_db, no_photon):
    """Regression guard for the new no-Update rule. Republish the SAME
    external_id with magnitude AND delay AND icon all changed. The old rule
    would have triggered Update on any one of those; the new rule fires
    nothing."""
    env1 = _tomtom_env(icon_category=6, delay=300)
    data1 = {}
    wire1 = handle_incident(env1, env1["subject"], data=data1, now=1_000_000)
    # Guard first so a dropped broadcast fails as an assertion, not an
    # AttributeError on None.
    assert wire1 is not None
    assert wire1.startswith("🚗 Stationary Traffic")
    _commit(data1, 1_000_001)

    env2 = _tomtom_env(icon_category=8, magnitude=5, delay=700)  # ALL changed
    data2 = {}
    wire2 = handle_incident(env2, env2["subject"], data=data2, now=1_000_300)
    assert wire2 is None, "no Update broadcasts under the v0.5.9 REVISED rule"

    # State still tracks the latest field values.
    row = mem_db.execute(
        "SELECT magnitude_of_delay, delay_seconds, icon_category "
        "FROM traffic_events WHERE source='tomtom_incidents'").fetchone()
    assert row["magnitude_of_delay"] == 5
    assert row["delay_seconds"] == 700
    assert row["icon_category"] == "road_closed"


# ============================================================================
|
|
# Freshness gate (q/r/s)
|
|
# ============================================================================
|
|
|
|
|
|
def _now_anchor_relative(start_age_seconds: int):
|
|
"""Choose (now, start_time_iso) so that `now - parse(start_time) ==
|
|
start_age_seconds`. Used by the freshness-gate tests."""
|
|
import datetime as _dt
|
|
now = 2_000_000_000 # arbitrary fixed epoch >>1970
|
|
start_iso = _dt.datetime.fromtimestamp(
|
|
now - start_age_seconds, tz=_dt.timezone.utc
|
|
).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
return now, start_iso
|
|
|
|
|
|
def test_q_fresh_event_15min_ago_broadcasts(mem_db, no_photon):
    """Event started 15 min ago -- WITHIN the 30-min fresh window. New: fires."""
    now, start_iso = _now_anchor_relative(15 * 60)
    env = _tomtom_env(icon_category=6, delay=300,
                      start_time=start_iso)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=now)
    assert wire is not None
    assert wire.startswith("🚗 Stationary Traffic")
    n_rows = mem_db.execute(
        "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 1


def test_r_stale_event_45min_ago_dropped_no_row(mem_db, no_photon):
    """Event started 45 min ago -- OUTSIDE the 30-min window. Drop AT
    handler entrance, no UPSERT into traffic_events, event_log handled=0."""
    now, start_iso = _now_anchor_relative(45 * 60)
    env = _tomtom_env(icon_category=6, magnitude=2, delay=300,
                      start_time=start_iso)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=now)
    assert wire is None

    n_rows = mem_db.execute(
        "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 0, "no UPSERT for stale incidents"

    n_log = mem_db.execute(
        "SELECT COUNT(*) AS n FROM event_log WHERE handled=0 "
        "AND source='tomtom_incidents'").fetchone()["n"]
    assert n_log == 1, "stale envelope must still be logged (handled=0)"


def test_s_null_start_time_default_allow(mem_db, no_photon):
    """start_time missing -> default-allow (treat as fresh and broadcast)."""
    env = _tomtom_env(icon_category=6, delay=300,
                      start_time=None)
    data = {}
    wire = handle_incident(env, env["subject"], data=data, now=1_000_000)
    assert wire is not None
    assert wire.startswith("🚗 Stationary Traffic")


# ============================================================================
# Per-source startTime field path (t)
# ============================================================================


def test_t_state_511_start_date_path(mem_db, no_photon):
    """state_511_atis pulls start_time from inner.data.start_date ('5/28/26,
    10:45 PM' format). Construct fresh (within 30 min) and stale (>30 min)
    cases and confirm gate behavior is per-source-correct."""
    import datetime as _dt

    def _fmt_511(epoch):
        # Build 'M/D/YY, H:MM AM|PM' by hand: the '%-m' / '%-d' / '%-I'
        # strftime flags are a glibc extension (unavailable on Windows) and
        # '%p' depends on the locale.
        moment = _dt.datetime.fromtimestamp(epoch, tz=_dt.timezone.utc)
        hour12 = moment.hour % 12 or 12
        meridiem = "AM" if moment.hour < 12 else "PM"
        return (f"{moment.month}/{moment.day}/{moment:%y}, "
                f"{hour12}:{moment:%M} {meridiem}")

    now = int(_dt.datetime(2026, 6, 4, 12, 0, tzinfo=_dt.timezone.utc).timestamp())

    # 5 min ago in the 511-date format.
    fresh_date = _fmt_511(now - 5 * 60)
    env = _state_511_env(layer="Incidents", category_prefix="incident",
                         event_sub_type="crash")
    # Replace the default start_date with a fresh one.
    env["data"]["data"]["start_date"] = fresh_date
    wire = handle_incident(env, env["subject"], data={}, now=now)
    assert wire is not None
    assert wire.startswith("🚨 Crash")

    # Stale variant (45 min ago).
    stale_date = _fmt_511(now - 45 * 60)
    env2 = _state_511_env(layer="Incidents", category_prefix="incident",
                          event_sub_type="crash",
                          external_id="ID:Incidents:99999")
    env2["data"]["data"]["start_date"] = stale_date
    wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
    assert wire2 is None


def test_t_itd_511_start_epoch_path(mem_db, no_photon):
    """itd_511 pulls start_time from inner.data.start_epoch (Unix int)."""
    now = 1_700_000_000
    # Fresh (5 min ago) variant.
    env = _itd_511_env(category_prefix="incident",
                       event_type_short="incident",
                       event_sub_type="crash")
    env["data"]["data"]["start_epoch"] = now - 5 * 60
    wire = handle_incident(env, env["subject"], data={}, now=now)
    assert wire is not None
    assert wire.startswith("🚨 Crash")

    # Stale variant (45 min ago).
    env2 = _itd_511_env(category_prefix="incident",
                        event_type_short="incident",
                        event_sub_type="crash",
                        external_id="ITD:469:99999")
    env2["data"]["data"]["start_epoch"] = now - 45 * 60
    wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
    assert wire2 is None


def test_t_tomtom_start_time_path(mem_db, no_photon):
    """tomtom pulls start_time from inner.data.start_time (ISO-8601)."""
    now, fresh_iso = _now_anchor_relative(5 * 60)
    env = _tomtom_env(icon_category=6, delay=300,
                      start_time=fresh_iso)
    wire = handle_incident(env, env["subject"], data={}, now=now)
    assert wire is not None
    assert wire.startswith("🚗 Stationary Traffic")

    # Stale variant (45 min ago) under a different TTI so it is a new incident.
    _, stale_iso = _now_anchor_relative(45 * 60)
    env2 = _tomtom_env(icon_category=6, delay=300,
                       start_time=stale_iso,
                       tti="11111111-2222-3333-4444-555555555555")
    wire2 = handle_incident(env2, env2["subject"], data={}, now=now)
    assert wire2 is None


# ============================================================================
# v0.5.9 GAMMA -- two-sided freshness gate + state_511 ID skip
# ============================================================================


def test_u_future_scheduled_event_dropped(mem_db, no_photon):
    """itd_511 work_zone envelopes can carry start_epoch in the future
    (scheduled construction). The v0.5.9 REVISED one-sided gate let those
    slip through. The GAMMA fix rejects negative ages too."""
    now = 2_000_000_000
    # start_epoch 8 hours in the future
    env = _itd_511_env(category_prefix="incident",
                       event_type_short="incident",
                       event_sub_type="crash",
                       external_id="ITD:future:1")
    env["data"]["data"]["start_epoch"] = now + 8 * 3600
    wire = handle_incident(env, env["subject"], data={}, now=now)
    assert wire is None
    n_rows = mem_db.execute(
        "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 0
    n_log = mem_db.execute(
        "SELECT COUNT(*) AS n FROM event_log WHERE handled=0 "
        "AND source='itd_511'").fetchone()["n"]
    assert n_log == 1


def test_v_state_511_id_skipped_at_handler_entrance(mem_db, no_photon):
    """state_511_atis with state_code='ID' is skipped at handler entrance --
    no parse, no traffic_events row, event_log records the skip."""
    env = _state_511_env(layer="Incidents", category_prefix="incident",
                         event_sub_type="crash")
    # Override defaults (post-GAMMA helper defaults to WA) to ID for this test.
    env["data"]["data"]["state_code"] = "ID"
    env["data"]["data"]["state"] = "Idaho"
    env["data"]["geo"]["primary_region"] = "US-ID"
    wire = handle_incident(env, env["subject"], data={}, now=1_000_000)
    assert wire is None
    n_rows = mem_db.execute(
        "SELECT COUNT(*) AS n FROM traffic_events").fetchone()["n"]
    assert n_rows == 0
    row = mem_db.execute(
        "SELECT category, handled FROM event_log "
        "WHERE source='state_511_atis' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert row is not None
    assert "|skip_id" in row["category"]
    assert row["handled"] == 0


def test_v_state_511_non_id_still_processed(mem_db, no_photon):
    """Regression guard: state_511_atis with state_code='WA' (or anything
    that's not ID) keeps going through the handler. After Idaho cutover
    we still need state_511 for neighbor coverage."""
    env = _state_511_env(layer="Incidents", category_prefix="incident",
                         event_sub_type="crash", county="Spokane")
    # Override state_code to WA.
    env["data"]["data"]["state_code"] = "WA"
    env["data"]["geo"]["primary_region"] = "US-WA"
    wire = handle_incident(env, env["subject"], data={}, now=1_000_000)
    assert wire is not None
    assert wire.startswith("🚨 Crash")
    # traffic_events row written for WA event.
    row = mem_db.execute(
        "SELECT state FROM traffic_events WHERE source='state_511_atis'"
    ).fetchone()
    assert row is not None
    assert row["state"] == "WA"


def test_w_itd_511_future_scheduled_dropped_via_start_epoch(mem_db, no_photon):
    """Direct exercise of the itd_511 start_epoch field path with a
    future-scheduled value (the Phase-1 leak source). Confirms the gate
    really uses start_epoch and the new two-sided check catches it."""
    import meshai.central.incident_handler as h

    env = _itd_511_env(category_prefix="incident",
                       event_type_short="incident",
                       event_sub_type="crash",
                       external_id="ITD:future:work")
    env["data"]["data"]["start_epoch"] = 99_000_000_000  # year ~5107 -- definitely future
    se = h._extract_start_time_epoch(env, "itd_511")
    assert se == 99_000_000_000
    now = 2_000_000_000
    wire = handle_incident(env, env["subject"], data={}, now=now)
    assert wire is None