feat(v0.5.8b): persistence foundation + WFIGS handler + universal cold-start grace

Three integrated pieces that ship together because they were designed as one safety story: (1) PERSISTENCE FOUNDATION -- new meshai/persistence/ module with SQLite db.py, schema migration framework (v1), 13 tables covering all adapter event shapes (traffic_events, fires, firms_pixels, quake_events, nws_alerts, gauge_readings, swpc_events) + mesh state (mesh_nodes, mesh_telemetry, mesh_positions, mesh_messages_in, mesh_broadcasts_out, mesh_health_events) + cross-cutting event_log + schema_meta. WAL mode for reader concurrency, single-writer pattern, MESHAI_DB_PATH env var, mounted at /data/meshai.sqlite via existing docker-compose meshai_data volume. .gitignore updated. (2) WFIGS HANDLER -- meshai/central/wfigs_handler.py implements the first per-adapter handler that uses the persistence layer. Format: MEDIUM style with town/landclass/county fallback chain, lat/lon at 3-decimal precision, New:/Update: prefix. 8h-rate-limited change-detection per IRWIN via fires.last_broadcast_at. Skips tombstones and perimeters silently (logged to event_log with handled=0). Acres fallback chain DailyAcres -> IncidentSize -> raw.DiscoveryAcres -> raw.FinalAcres -> N/A. Pass-through Initial Attack auto-numbered names (IA 1, IA 2). (3) UNIVERSAL COLD-START GRACE -- meshai/notifications/pipeline/dispatcher.py grows a configurable grace window (cold_start_grace_seconds, default 60s, GUI-editable per Rule 17). Anchored to first-event-seen (not container boot), so the grace activates the moment broadcasts could fire. Suppresses mesh delivery during the window; handler-side persistence (fires UPSERT, event_log) still happens normally. New _cold_start_dropped counter exposed in dispatch_stats(). Designed to protect against JetStream backlog spam at toggle-flip time, applies universally to ALL adapters. (4) WFIGS HANDLER CALLBACK REFACTOR -- New:/Update: prefix now keys on fires.last_broadcast_at IS NULL (not row-missing), and last_broadcast_* field updates moved to a post-broadcast commit callback that the dispatcher invokes ONLY on successful delivery. This means: cold-start-suppressed events leave fires.last_broadcast_at NULL, so when they eventually broadcast post-grace, they correctly render as New: (first ACTUAL delivery for that IRWIN), not Update:. event_log.handled and mesh_broadcasts_out audit row also gated on the same callback -- decoupling persistence rows from broadcast rows for an honest audit trail. New tests: 15 in test_wfigs_handler.py, 15 in test_persistence.py, additional cold-start grace tests in test_dispatcher.py (+4 WFIGS callback scenarios). Synthetic probes wfigs-cleaned-samples.md (initial) and wfigs-cleaned-samples-v2.md (cold-start verification) generated against isolated temp SQLite databases. CT108 /data/meshai.sqlite untouched during build. Master stays off. No live toggle flips. Test count: was 535 (v0.5.7 baseline) -> 566 (persistence) -> 581 (wfigs handler) -> 589 expected (cold-start grace).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-06-05 03:52:58 +00:00
commit 053d67db6e
16 changed files with 2652 additions and 1 deletions

View file

@ -392,3 +392,284 @@ def test_geocoder_name_is_never_used_as_town_fallback(monkeypatch):
n = normalize(env)
# Must NOT pick up "Cache Nf Road 444" from geocoder.name.
assert n["town"] is None
# ============================================================================
# v0.5.8-wzdx federal parser tests
# ============================================================================
# --- representative envelopes (flat shape, as Central actually publishes) ---
_WZDX_ID_FULL = {
"data": {
"adapter": "wzdx",
"category": "work_zone.wzdx",
"time": "2026-06-01T13:00:00Z",
"severity": 3,
"geo": {"centroid": [-112.408309608311, 43.0208066348276],
"primary_region": "US-ID", "regions": ["US-ID"]},
"data": {
"road_names": ["Exit 80 On Ramp"],
"direction": "southbound",
"description": " Road construction on Exit 80 On Ramp Southbound near MM (80)."
" All lanes closed. 6/1/2026 7:00 AM to 6/10/2026 6:00 PM Mon, Tue ...",
"vehicle_impact": "all-lanes-closed",
"event_status": None,
"start_date": "2026-06-01T13:00:00Z",
"end_date": "2026-06-11T00:00:00Z",
"data_source_id": "ERS",
"feed_name": "iddot",
"feed_state_code": "ID",
"latitude": 43.0208066348276,
"longitude": -112.408309608311,
"_enriched": {"geocoder": {"city": None, "name": "Ross Fork Creek",
"county": "Bannock", "state": "Idaho"}},
},
},
}
_WZDX_WA = {
"data": {
"adapter": "wzdx",
"category": "work_zone.wzdx",
"time": "2026-06-01T00:00:00+00:00",
"severity": 1,
"geo": {"centroid": [-117.33633, 46.433365], "primary_region": "US-WA"},
"data": {
"road_names": ["012"],
"direction": "westbound",
"description": "Contract - XE3608 SR 12",
"vehicle_impact": "unknown",
"event_status": "pending",
"start_date": "2026-06-01T00:00:00+00:00",
"end_date": "2026-06-05T00:00:00+00:00",
"data_source_id": "WSDOT-WZDB",
"feed_name": "wsdot",
"feed_state_code": "WA",
"latitude": 46.433365,
"longitude": -117.33633,
"_enriched": {"geocoder": {"city": None, "name": "US Highway 12",
"county": "Garfield", "state": "Washington"}},
},
},
}
_WZDX_MCCALL = {
"data": {
"adapter": "wzdx", "category": "work_zone.wzdx",
"time": "2026-05-28T23:00:00Z", "severity": 1,
"geo": {"centroid": [-116.09759, 44.9065083834611], "primary_region": "US-ID"},
"data": {
"road_names": ["SH-55"],
"direction": "unknown",
"description": " Emergency repairs on SH-55 Both Directions near Washington St."
" 5/28/2026 5:00 PM to 5/29/2026 8:00 AM Thu, Fri: ...",
"vehicle_impact": "all-lanes-open",
"start_date": "2026-05-28T23:00:00Z",
"end_date": "2026-05-29T14:00:00Z",
"feed_state_code": "ID",
"latitude": 44.9065083834611,
"longitude": -116.09759,
"_enriched": {"geocoder": {"city": "McCall", "county": "Valley", "state": "ID"}},
},
},
}
def _normalize_wzdx(env):
n = normalize(env)
assert n is not None
assert n["source"] == "wzdx"
return n
# --- (a) Idaho wzdx full-field parse ---------------------------------------
def test_wzdx_idaho_full_fields_normalized(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
# Mock Photon for the SECONDARY town path (city is null in this envelope).
monkeypatch.setattr(cn, "_photon_reverse_places",
lambda lat, lon: [
{"geometry": {"coordinates": [-112.4373, 43.0299]},
"properties": {"name": "Fort Hall",
"osm_key": "place", "osm_value": "village"}},
])
n = _normalize_wzdx(_WZDX_ID_FULL)
assert n["road"] is None # Exit-ramp pattern → uninformative-road drop
assert n["direction"] == "southbound"
# sub_type combines impact-phrase (suppressed under full-closure) + work_type
# (None here — types_of_work absent). With full-closure, sub_type stays None
# and the renderer prepends "all lanes closed".
assert n["sub_type"] is None
assert n["impact"] == "full_closure"
assert n["mile_start"] == 80 and n["mile_end"] is None
assert n["town"] == "Fort Hall" # via Photon nearest_town
assert isinstance(n["ends_at"], datetime)
assert n["ends_at"].year == 2026 and n["ends_at"].month == 6 and n["ends_at"].day == 11
def test_wzdx_wa_road_passes_through_verbatim(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
n = _normalize_wzdx(_WZDX_WA)
# Per spec: "honor upstream verbatim, no expansion" -- raw '012' passes through.
assert n["road"] == "012"
assert n["direction"] == "westbound"
# vehicle_impact='unknown' → impact_phrase=None; sub_type stays None.
assert n["sub_type"] is None
assert n["impact"] == "partial"
# No MM in WA descriptions; mile_start stays None.
assert n["mile_start"] is None
assert isinstance(n["ends_at"], datetime)
assert n["town"] is None # city null + Photon returned no places
# --- (c) vehicle_impact mapping for each main value ------------------------
@pytest.mark.parametrize("vi_raw,expected_sub_type,expected_impact", [
("all-lanes-closed", None, "full_closure"),
("some-lanes-closed", "lanes reduced", "partial"),
("alternating-one-way", "one-way alternating", "partial"),
("unknown", None, "partial"),
("all-lanes-open", None, "partial"),
("totally-made-up", None, "partial"),
])
def test_wzdx_vehicle_impact_mapping(vi_raw, expected_sub_type, expected_impact, monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
env = {"data": {"adapter": "wzdx", "category": "work_zone.wzdx", "time": "2026-06-01T00:00:00Z",
"geo": {"centroid": [-116.0, 44.0]},
"data": {"road_names": ["SH-1"], "direction": "northbound",
"description": "X", "vehicle_impact": vi_raw,
"end_date": "2026-06-05T17:00:00Z",
"latitude": 44.0, "longitude": -116.0,
"_enriched": {"geocoder": {"city": "Boise"}}}}}
n = normalize(env)
assert n["sub_type"] == expected_sub_type
assert n["impact"] == expected_impact
# --- (d) structured end_date parses to friendly format --------------------
def test_wzdx_end_date_iso_parsed_to_datetime(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
env = {"data": {"adapter": "wzdx", "category": "work_zone.wzdx",
"geo": {"centroid": [-116.0, 44.0]},
"data": {"road_names": ["SH-1"], "direction": "northbound",
"description": "x", "vehicle_impact": "unknown",
"end_date": "2026-06-15T18:30:00+00:00",
"latitude": 44.0, "longitude": -116.0,
"_enriched": {"geocoder": {"city": "Boise"}}}}}
n = normalize(env)
assert isinstance(n["ends_at"], datetime)
assert n["ends_at"].month == 6 and n["ends_at"].day == 15
assert n["ends_at"].hour in (18, 11, 12) # depending on local-tz coercion
# --- (e) MM regex extraction on ID description ----------------------------
def test_wzdx_mile_post_regex_from_description(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
env = {"data": {"adapter": "wzdx", "category": "work_zone.wzdx",
"geo": {"centroid": [-116.0, 44.0]},
"data": {"road_names": ["I-15"], "direction": "southbound",
"description": "Bridge work on I-15 SB from MM (89) to MM (93). 6/1/2026 7:00 AM to 6/3/2026 5:00 PM",
"vehicle_impact": "some-lanes-closed",
"end_date": "2026-06-03T22:00:00Z",
"latitude": 44.0, "longitude": -116.0,
"_enriched": {"geocoder": {"city": "Blackfoot"}}}}}
n = normalize(env)
assert n["mile_start"] == 89
assert n["mile_end"] == 93
# --- (f) WA event without MM yields mile_start=None -----------------------
def test_wzdx_wa_no_mm_in_description(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
n = _normalize_wzdx(_WZDX_WA)
assert n["mile_start"] is None
assert n["mile_end"] is None
# --- (g) town fallback chain ----------------------------------------------
def test_wzdx_town_uses_geocoder_city_when_present(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
calls = []
monkeypatch.setattr(cn, "_photon_reverse_places",
lambda lat, lon: calls.append("called") or [])
n = _normalize_wzdx(_WZDX_MCCALL)
assert n["town"] == "McCall"
assert calls == [] # city present → Photon NOT called
def test_wzdx_town_falls_back_to_nearest_town_when_city_null(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places",
lambda lat, lon: [
{"geometry": {"coordinates": [-117.293, 46.475]},
"properties": {"name": "Pomeroy",
"osm_key": "place", "osm_value": "city"}},
])
n = _normalize_wzdx(_WZDX_WA)
assert n["town"] == "Pomeroy"
# --- adapter dispatch routes wzdx → _parse_wzdx_federal -------------------
def test_wzdx_adapter_routes_to_wzdx_parser(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
n = normalize(_WZDX_WA)
assert n is not None
assert n["source"] == "wzdx"
# --- work_type from types_of_work or event_type ---------------------------
def test_wzdx_sub_type_from_types_of_work(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
env = {"data": {"adapter": "wzdx", "category": "work_zone.wzdx",
"geo": {"centroid": [-116.0, 44.0]},
"data": {"road_names": ["SH-1"], "direction": "both",
"description": "x",
"types_of_work": [{"type_name": "paving"}],
"vehicle_impact": "some-lanes-closed",
"end_date": "2026-06-05T17:00:00Z",
"latitude": 44.0, "longitude": -116.0,
"_enriched": {"geocoder": {"city": "Boise"}}}}}
n = normalize(env)
# Folded form: impact_phrase + work_type (paving)
assert n["sub_type"] == "lanes reduced, paving"
def test_wzdx_sub_type_unknown_vocab_is_lowercased_with_spaces(monkeypatch):
_clear_h3_cache()
from meshai import central_normalizer as cn
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
env = {"data": {"adapter": "wzdx", "category": "work_zone.wzdx",
"geo": {"centroid": [-116.0, 44.0]},
"data": {"road_names": ["SH-1"], "direction": "northbound",
"description": "x",
"types_of_work": [{"type_name": "Some-Custom-Work"}],
"vehicle_impact": "all-lanes-open",
"end_date": "2026-06-05T17:00:00Z",
"latitude": 44.0, "longitude": -116.0,
"_enriched": {"geocoder": {"city": "Boise"}}}}}
n = normalize(env)
assert n["sub_type"] == "some custom work" # lowercased + hyphens→spaces

View file

@ -0,0 +1,136 @@
"""v0.5.8b — cold-start grace + post-broadcast commit hook in Dispatcher.
The grace window suppresses mesh broadcasts for N seconds after the FIRST
event the dispatcher sees through an enabled toggle. The persistence layer
(handler-side) has already run by then, so fires/event_log rows exist;
only the broadcast (and the mesh_broadcasts_out audit + last_broadcast_*
callback) is gated.
"""
import asyncio
import time
from unittest.mock import patch
import pytest
from meshai.config import Config
from meshai.notifications.events import make_event
from meshai.notifications.pipeline.dispatcher import Dispatcher
# ---------- helpers -------------------------------------------------------
class RecChannel:
def __init__(self, rec):
self.rec = rec
async def deliver(self, payload, rule):
self.rec.append({
"name": rule.name,
"message": payload.message,
"delivery_type": rule.delivery_type,
})
return True
def _cfg(*, cold_start_grace_seconds=60, toggle_name="weather"):
cfg = Config()
cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = cold_start_grace_seconds
t = cfg.notifications.toggles[toggle_name]
t.enabled = True
t.min_severity = "routine"
t.severity_channels = {"routine": ["mesh_broadcast"]}
# Wide-open v0.5.2 gates so the cold-start gate is the only thing
# that can drop these events.
t.freshness_seconds = 0
t.cooldown_seconds = 0
return cfg
def _ev(*, source="nws", category="weather_warning",
severity="routine", title="t", **kw):
return make_event(
source=source, category=category, severity=severity,
title=title, timestamp=time.time(), **kw,
)
def _make(cfg):
rec: list = []
d = Dispatcher(cfg, lambda r, c: RecChannel(rec), connector=None)
return d, rec
# ---------- (a) first event during grace ----------------------------------
def test_cold_start_grace_drops_first_event_inside_window():
cfg = _cfg(cold_start_grace_seconds=60)
d, rec = _make(cfg)
fake_now = 1_000_000.0
with patch("meshai.notifications.pipeline.dispatcher.time.time",
return_value=fake_now):
asyncio.run(d.dispatch(_ev()))
assert rec == [], "broadcast must be suppressed inside grace window"
stats = d.dispatch_stats()
assert stats["cold_start_dropped"] == 1
assert stats["cold_start_anchor_at"] == fake_now
# ---------- (b) event arriving 30s into grace -- still dropped -----------
def test_cold_start_grace_drops_event_partway_through_window():
cfg = _cfg(cold_start_grace_seconds=60)
d, rec = _make(cfg)
t0 = 2_000_000.0
# First event anchors the window.
with patch("meshai.notifications.pipeline.dispatcher.time.time",
return_value=t0):
asyncio.run(d.dispatch(_ev()))
# 30s in, still inside the 60s window.
with patch("meshai.notifications.pipeline.dispatcher.time.time",
return_value=t0 + 30):
asyncio.run(d.dispatch(_ev()))
assert rec == []
assert d.dispatch_stats()["cold_start_dropped"] == 2
# ---------- (c) event 61s after first -- broadcasts ----------------------
def test_cold_start_grace_passes_event_after_window():
cfg = _cfg(cold_start_grace_seconds=60)
d, rec = _make(cfg)
t0 = 3_000_000.0
with patch("meshai.notifications.pipeline.dispatcher.time.time",
return_value=t0):
asyncio.run(d.dispatch(_ev())) # dropped
with patch("meshai.notifications.pipeline.dispatcher.time.time",
return_value=t0 + 61):
asyncio.run(d.dispatch(_ev())) # broadcasts
assert len(rec) == 1
stats = d.dispatch_stats()
assert stats["cold_start_dropped"] == 1
# ---------- (d) grace = 0 disables the feature ----------------------------
def test_cold_start_grace_zero_disables_feature():
cfg = _cfg(cold_start_grace_seconds=0)
d, rec = _make(cfg)
asyncio.run(d.dispatch(_ev()))
assert len(rec) == 1
stats = d.dispatch_stats()
assert stats["cold_start_dropped"] == 0
# Anchor not set when grace disabled (no gate ran).
assert stats["cold_start_anchor_at"] is None

View file

@ -32,6 +32,7 @@ def _dispatch(cfg, event):
def _cfg(enable="weather", **kw):
cfg = Config()
cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = 0 # v0.5.8b: legacy tests
t = cfg.notifications.toggles[enable]
t.enabled = True
t.min_severity = kw.get("min_severity", "priority")
@ -47,6 +48,7 @@ def _ev(severity="priority", category="weather_warning", region=None, regions=No
def test_disabled_toggle_no_dispatch():
cfg = Config(); cfg.notifications.rules = [] # weather disabled by default
cfg.notifications.cold_start_grace_seconds = 0
assert _dispatch(cfg, _ev()) == []
@ -107,6 +109,7 @@ def test_quiet_hours_override_immediate_only():
def test_category_maps_to_correct_family():
# seismic family toggle handles earthquake_event via get_toggle fallback
cfg = Config(); cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = 0 # v0.5.8b: legacy test
cfg.notifications.toggles["seismic"].enabled = True
cfg.notifications.toggles["seismic"].severity_channels = {"priority": ["mesh_broadcast"]}
rec = _dispatch(cfg, _ev(severity="priority", category="earthquake_event"))

339
tests/test_persistence.py Normal file
View file

@ -0,0 +1,339 @@
"""Tests for the meshai/persistence foundation (v1 schema).
Foundation-only: no adapter handlers wired yet. Tests cover migration
runner, schema shape, idempotency, WAL mode, env-var path resolution,
in-memory mode, and basic insert/query against representative tables.
"""
import os
import sqlite3
import tempfile
import time
from pathlib import Path
import pytest
from meshai.persistence import (
MESHAI_DB_PATH,
SCHEMA_VERSION,
close_thread_connection,
get_db,
init_db,
)
from meshai.persistence import db as persistence_db
# ---------- helpers --------------------------------------------------------
def _force_reinit(monkeypatch, db_path):
"""Point every get_db() at a fresh DB and reset the per-process init
cache + thread-local connection so tests don't bleed into one another."""
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
# Wipe the initialised-set so migrations re-run against the new file.
persistence_db._initialised.clear()
close_thread_connection()
@pytest.fixture
def tmp_db(monkeypatch, tmp_path):
db = str(tmp_path / "meshai-test.sqlite")
_force_reinit(monkeypatch, db)
yield db
close_thread_connection()
persistence_db._initialised.discard(db)
# ---------- migration runner ----------------------------------------------
# Every table the v1 schema is contracted to produce. The migration runner
# is tested by checking that ALL of these exist after init.
_V1_TABLES = {
"schema_meta",
"event_log",
"traffic_events",
"fires",
"firms_pixels",
"quake_events",
"nws_alerts",
"gauge_readings",
"swpc_events",
"mesh_nodes",
"mesh_telemetry",
"mesh_positions",
"mesh_messages_in",
"mesh_broadcasts_out",
"mesh_health_events",
}
def test_migration_v1_creates_all_tables(tmp_db):
conn = init_db()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' "
"AND name NOT LIKE 'sqlite_%' AND name NOT LIKE 'idx_%'"
).fetchall()
names = {r["name"] for r in rows}
missing = _V1_TABLES - names
extra = names - _V1_TABLES
assert not missing, f"missing tables after v1: {sorted(missing)}"
# extras are fine (autoindex tables etc.); just make sure we didn't
# leave the contract unfulfilled.
assert len(names) >= len(_V1_TABLES)
def test_schema_version_recorded(tmp_db):
conn = init_db()
row = conn.execute("SELECT value FROM schema_meta WHERE key='version'").fetchone()
assert row is not None
assert int(row["value"]) == SCHEMA_VERSION
def test_migration_idempotent_rerun(tmp_db):
init_db()
# Force a "second startup" by closing the connection and clearing the
# per-process initialised cache so the migration runner re-evaluates.
close_thread_connection()
persistence_db._initialised.discard(tmp_db)
conn = init_db()
# No exception means we didn't try to CREATE TABLE twice; verify the
# row count of schema_meta hasn't drifted.
rows = conn.execute("SELECT COUNT(*) AS n FROM schema_meta").fetchone()
assert rows["n"] >= 1
# ---------- WAL mode ------------------------------------------------------
def test_wal_mode_enabled_on_file_db(tmp_db):
conn = init_db()
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
assert mode.lower() == "wal", f"expected WAL, got {mode!r}"
# ---------- MESHAI_DB_PATH resolution -------------------------------------
def test_meshai_db_path_env_var_respected(monkeypatch, tmp_path):
custom = str(tmp_path / "custom-path.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", custom)
assert MESHAI_DB_PATH() == custom
def test_meshai_db_path_falls_back_to_default(monkeypatch):
monkeypatch.delenv("MESHAI_DB_PATH", raising=False)
assert MESHAI_DB_PATH() == persistence_db.DEFAULT_DB_PATH
# ---------- in-memory mode for tests --------------------------------------
def test_in_memory_db_works(monkeypatch):
# Special-cased :memory: routes via uri=shared-cache so init runs cleanly.
monkeypatch.setenv("MESHAI_DB_PATH", ":memory:")
persistence_db._initialised.clear()
close_thread_connection()
conn = init_db()
# All v1 tables present even in memory.
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' "
"AND name NOT LIKE 'sqlite_%'"
).fetchall()
names = {r["name"] for r in rows}
assert _V1_TABLES.issubset(names)
close_thread_connection()
# ---------- basic insert/query: fires -------------------------------------
def test_fires_insert_and_query(tmp_db):
conn = init_db()
now = int(time.time())
irwin = "{E7FCBC00-2D0A-49D6-889F-550D4EDCBFD6}"
conn.execute(
"INSERT INTO fires(irwin_id, incident_name, incident_type, "
"current_acres, current_contained_pct, status, lat, lon, "
"county, state, landclass, declared_at, last_event_at) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
(irwin, "IA 1", "wildfire", None, None, "active",
43.5213, -115.1665, "Elmore", "ID", "Sawtooth National Forest",
now - 3600, now),
)
row = conn.execute(
"SELECT incident_name, county, state, landclass, last_event_at "
"FROM fires WHERE irwin_id = ?", (irwin,)).fetchone()
assert row is not None
assert row["incident_name"] == "IA 1"
assert row["county"] == "Elmore"
assert row["state"] == "ID"
assert row["landclass"] == "Sawtooth National Forest"
assert row["last_event_at"] == now
def test_fires_last_broadcast_change_detection_columns(tmp_db):
"""The 8h-rate-limit change-detection logic uses last_broadcast_acres
and last_broadcast_contained -- verify they accept NULL and updates."""
conn = init_db()
now = int(time.time())
irwin = "{ABC}"
conn.execute(
"INSERT INTO fires(irwin_id, incident_name, last_event_at, "
"last_broadcast_at, last_broadcast_acres, last_broadcast_contained) "
"VALUES (?,?,?,?,?,?)",
(irwin, "X", now, now, 1847.0, 23),
)
row = conn.execute(
"SELECT last_broadcast_acres, last_broadcast_contained "
"FROM fires WHERE irwin_id = ?", (irwin,)).fetchone()
assert row["last_broadcast_acres"] == 1847.0
assert row["last_broadcast_contained"] == 23
# ---------- basic insert/query: mesh_nodes -------------------------------
def test_mesh_nodes_insert_and_query(tmp_db):
conn = init_db()
now = int(time.time())
conn.execute(
"INSERT INTO mesh_nodes(node_id, long_name, short_name, hw_model, "
"last_lat, last_lon, last_battery_pct, first_seen_at, last_seen_at) "
"VALUES (?,?,?,?,?,?,?,?,?)",
("!85098cea", "AIDA Northgate", "AIDA", "TBEAM",
43.6535, -116.2674, 88, now - 86400, now),
)
row = conn.execute(
"SELECT long_name, short_name, last_battery_pct, is_stale "
"FROM mesh_nodes WHERE node_id = ?", ("!85098cea",)).fetchone()
assert row is not None
assert row["long_name"] == "AIDA Northgate"
assert row["last_battery_pct"] == 88
assert row["is_stale"] == 0 # default
# ---------- traffic_events composite PK ----------------------------------
def test_traffic_events_composite_pk_uniqueness(tmp_db):
conn = init_db()
now = int(time.time())
conn.execute(
"INSERT INTO traffic_events(source, external_id, road, county, state, "
"first_seen_at, last_seen_at) VALUES (?,?,?,?,?,?,?)",
("state_511_atis", "ID:Construction:33868", "I-86", "Bannock", "ID", now, now),
)
# Same source + external_id should violate the PK constraint.
with pytest.raises(sqlite3.IntegrityError):
conn.execute(
"INSERT INTO traffic_events(source, external_id, road, county, state, "
"first_seen_at, last_seen_at) VALUES (?,?,?,?,?,?,?)",
("state_511_atis", "ID:Construction:33868", "I-86", "Bannock", "ID", now, now),
)
# Different source with same external_id is fine.
conn.execute(
"INSERT INTO traffic_events(source, external_id, road, county, state, "
"first_seen_at, last_seen_at) VALUES (?,?,?,?,?,?,?)",
("wzdx", "ID:Construction:33868", "I-86", "Bannock", "ID", now, now),
)
rows = conn.execute("SELECT COUNT(*) AS n FROM traffic_events").fetchone()
assert rows["n"] == 2
# ---------- event_log basic ------------------------------------------------
def test_event_log_insert_and_query(tmp_db):
conn = init_db()
now = int(time.time())
conn.execute(
"INSERT INTO event_log(received_at, source, category, severity_word, "
"event_id_external, nats_subject, handled, table_name, table_pk) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(now, "wfigs_incidents", "fire.incident.wildfire", "routine",
"{E7FCBC00}", "central.fire.incident.id.elmore", 1, "fires", "{E7FCBC00}"),
)
row = conn.execute(
"SELECT source, table_name, handled FROM event_log "
"WHERE event_id_external = ?", ("{E7FCBC00}",)).fetchone()
assert row["source"] == "wfigs_incidents"
assert row["table_name"] == "fires"
assert row["handled"] == 1
# ---------- firms_pixels nullable FK to fires ----------------------------
def test_firms_pixels_fk_to_fires_nullable(tmp_db):
conn = init_db()
now = int(time.time())
# Unattached pixel (no irwin_id yet -- v0.6 fire-tracker will attach later).
conn.execute(
"INSERT INTO firms_pixels(lat, lon, acq_time, frp, confidence, satellite) "
"VALUES (?,?,?,?,?,?)",
(42.197, -113.710, now, 135.93, "high", "N"),
)
row = conn.execute(
"SELECT irwin_id, frp, confidence FROM firms_pixels "
"WHERE acq_time = ?", (now,)).fetchone()
assert row is not None
assert row["irwin_id"] is None # nullable
assert row["frp"] == 135.93
# Insert a fire, then attach a new pixel via the FK.
irwin = "{LINK-IRWIN}"
conn.execute(
"INSERT INTO fires(irwin_id, incident_name, last_event_at) "
"VALUES (?,?,?)", (irwin, "Linked Fire", now),
)
conn.execute(
"INSERT INTO firms_pixels(irwin_id, lat, lon, acq_time, frp) "
"VALUES (?,?,?,?,?)", (irwin, 42.197, -113.710, now + 1, 50.0),
)
rows = conn.execute(
"SELECT COUNT(*) AS n FROM firms_pixels WHERE irwin_id = ?",
(irwin,)).fetchone()
assert rows["n"] == 1
# ---------- indexes present (spot-check) ---------------------------------
def test_v1_indexes_created(tmp_db):
conn = init_db()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' "
"AND name LIKE 'idx_%'"
).fetchall()
names = {r["name"] for r in rows}
must_have = {
"idx_event_log_received",
"idx_traffic_last_seen",
"idx_fires_last_event",
"idx_firms_pixels_acq_time",
"idx_mesh_tel_node_time",
"idx_mesh_pos_node_time",
"idx_mesh_nodes_last_seen",
"idx_gauge_site_time",
}
missing = must_have - names
assert not missing, f"missing indexes: {sorted(missing)}"
# ---------- gauge_readings autoincrement -------------------------------
def test_gauge_readings_autoincrement_pk(tmp_db):
conn = init_db()
now = int(time.time())
for value in (3490.0, 3520.0, 3505.0):
conn.execute(
"INSERT INTO gauge_readings(site_id, reading_value, reading_unit, "
"reading_time) VALUES (?,?,?,?)",
("USGS-13038000", value, "ft^3/s", now),
)
rows = conn.execute(
"SELECT COUNT(*) AS n FROM gauge_readings WHERE site_id = ?",
("USGS-13038000",)).fetchone()
assert rows["n"] == 3

View file

@ -58,6 +58,10 @@ def _cfg(toggle_name="weather", **kw):
"""Default config: one toggle enabled with mesh_broadcast on priority."""
cfg = Config()
cfg.notifications.rules = []
# v0.5.8b: disable the cold-start grace for these tests -- they
# exercise the v0.5.2 guards in isolation and expect the first
# event to broadcast.
cfg.notifications.cold_start_grace_seconds = 0
t = cfg.notifications.toggles[toggle_name]
t.enabled = True
t.min_severity = kw.get("min_severity", "routine")
@ -280,6 +284,7 @@ def test_hydro_event_maps_to_geohazards_toggle():
toggle alone must NOT fire on them anymore."""
cfg = Config()
cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = 0
# Enable BOTH weather and seismic toggles so we can prove routing.
cfg.notifications.toggles["weather"].enabled = True
cfg.notifications.toggles["weather"].min_severity = "routine"
@ -308,6 +313,7 @@ def test_hydro_high_water_also_seismic():
"""Same as above for stream_high_water (the lower-severity sibling)."""
cfg = Config()
cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = 0
cfg.notifications.toggles["seismic"].enabled = True
cfg.notifications.toggles["seismic"].min_severity = "routine"
cfg.notifications.toggles["seismic"].severity_channels = {
@ -331,5 +337,6 @@ def test_dispatch_stats_exposes_all_counters():
stats = d.dispatch_stats()
assert set(stats.keys()) == {
"stale_dropped", "cooldown_dropped", "dedup_dropped",
"cold_start_dropped", "cold_start_anchor_at",
"cooldown_keys", "dedup_lru_size",
}

589
tests/test_wfigs_handler.py Normal file
View file

@ -0,0 +1,589 @@
"""Tests for meshai/central/wfigs_handler.py -- WFIGS persistence wire-up.
Covers:
(a) parse clean active-incident envelope (all fields populated)
(b) acres fallback chain: top-null -> raw.DiscoveryAcres used
(c) acres absent at every level -> renders "N/A"
(d) IncidentName="IA 1" placeholder passes through verbatim
(e) tombstone subject -> handler returns None + event_log row handled=0
(f) perimeter subject -> handler returns None + event_log row handled=0
(g) NEW IRWIN -> "New:" prefix + fires INSERT + mesh_broadcasts_out audit row
(h) known IRWIN no change -> drop silently, last_broadcast_* unchanged
(i) known IRWIN acres up but <8h elapsed -> drop, last_broadcast_* unchanged
(j) known IRWIN acres up + >=8h elapsed -> "Update:" prefix + audit row
(k) location anchor priority: geocoder.city > nearest_town > landclass > county
"""
import os
import time
import pytest
from meshai import central_normalizer as cn
from meshai.central.wfigs_handler import (
WFIGS_BROADCAST_COOLDOWN_S,
handle_wfigs,
)
from meshai.persistence import close_thread_connection, init_db
from meshai.persistence import db as persistence_db
# ---------- fixtures ------------------------------------------------------
@pytest.fixture
def mem_db(monkeypatch, tmp_path):
"""Fresh on-disk SQLite per test (avoids in-memory shared-cache bleed)."""
db_path = str(tmp_path / "wfigs-test.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
persistence_db._initialised.clear()
close_thread_connection()
conn = init_db()
yield conn
close_thread_connection()
persistence_db._initialised.discard(db_path)
@pytest.fixture
def no_photon(monkeypatch):
"""Force nearest_town to return None so anchor falls through deterministically.
Tests that exercise nearest_town wire it in directly."""
monkeypatch.setattr(cn, "_photon_reverse_places", lambda lat, lon: [])
# Also reset the H3 LRU so cache state doesn't leak across tests.
if hasattr(cn, "_H3_NEAREST_CACHE"):
cn._H3_NEAREST_CACHE.clear()
# ---------- envelope builders --------------------------------------------
_IRWIN_A = "{E7FCBC00-2D0A-49D6-889F-550D4EDCBFD6}"
_IRWIN_B = "{ABCDEF01-2345-6789-ABCD-EF0123456789}"
_IRWIN_C = "{11111111-2222-3333-4444-555555555555}"
def _make_active_envelope(*, irwin_id=_IRWIN_A,
name="Cache Peak Fire",
incident_type="wildfire",
lat=42.197, lon=-113.710,
county="Cassia", state="ID",
landclass=None,
geocoder_city=None,
daily_acres=1847.0,
pct_contained=23,
raw_discovery_acres=None,
raw_pct_contained=None,
fire_discovery_dt_ms=1780529163000,
subject="central.fire.incident.id.cassia"):
"""Build the Central CloudEvents envelope shape we observe in prod."""
geocoder = {}
if geocoder_city is not None:
geocoder["city"] = geocoder_city
if landclass is not None:
geocoder["landclass"] = landclass
raw = {}
if raw_discovery_acres is not None:
raw["DiscoveryAcres"] = raw_discovery_acres
if raw_pct_contained is not None:
raw["PercentContained"] = raw_pct_contained
return {
"subject": subject,
"id": f"{irwin_id}:active:{int(time.time())}",
"data": {
"id": irwin_id,
"adapter": "wfigs_incidents",
"category": f"fire.incident.{incident_type}",
"severity": "routine",
"geo": {
"primary_region": f"US-{state}",
"centroid": [lon, lat],
"geocoder": geocoder,
},
"data": {
"IrwinID": irwin_id,
"IncidentName": name,
"IncidentTypeCategory": incident_type,
"latitude": lat,
"longitude": lon,
"POOCounty": county,
"POOState": state,
"DailyAcres": daily_acres,
"PercentContained": pct_contained,
"FireDiscoveryDateTime": fire_discovery_dt_ms,
"raw": raw,
},
},
}
def _make_tombstone(irwin_id=_IRWIN_A, state="ID", county="Boise",
subject="central.fire.incident.removed.id"):
return {
"subject": subject,
"id": f"{irwin_id}:removed:2026-06-04T02:57:04.684858+00:00",
"data": {
"id": f"{irwin_id}:removed:2026-06-04T02:57:04.684858+00:00",
"adapter": "wfigs_incidents",
"category": "fire.incident.removed",
"severity": "routine",
"geo": {"primary_region": f"US-{state}", "geocoder": {}},
"data": {
"irwin_id": irwin_id,
"last_observed_at": "2026-06-04T02:52:04.209539+00:00",
"state": state,
"county": county,
"reason": "fallen_off_current_service",
},
},
}
def _make_perimeter(irwin_id=_IRWIN_A, state="ID", county="Cassia",
subject="central.fire.perimeter.id.cassia"):
return {
"subject": subject,
"id": f"{irwin_id}:perimeter",
"data": {
"id": f"{irwin_id}:perimeter",
"adapter": "wfigs_perimeters",
"category": "fire.perimeter.wildfire",
"severity": "routine",
"geo": {"primary_region": f"US-{state}", "geocoder": {}},
"data": {
"irwin_id": irwin_id,
"state": state,
"county": county,
},
},
}
# ============================================================================
# (a) parse a clean active-incident envelope with all fields
# ============================================================================
def test_a_parse_clean_active_envelope(mem_db, no_photon):
env = _make_active_envelope()
n = cn.normalize(env)
assert n is not None
assert n["_kind"] == "wfigs_incident"
assert n["irwin_id"] == _IRWIN_A
assert n["incident_name"] == "Cache Peak Fire"
assert n["incident_type"] == "wildfire"
assert n["acres"] == 1847.0
assert n["contained_pct"] == 23
assert n["county"] == "Cassia"
assert n["state"] == "ID"
# FireDiscoveryDateTime epoch-ms -> epoch-s conversion
assert n["declared_at_epoch"] == 1780529163
# ============================================================================
# (b) null top-level acres -> raw.DiscoveryAcres fallback used
# ============================================================================
def test_b_acres_fallback_to_raw_discovery_acres(mem_db, no_photon):
env = _make_active_envelope(daily_acres=None, pct_contained=None,
raw_discovery_acres=0.1,
raw_pct_contained=0)
n = cn.normalize(env)
assert n["acres"] == 0.1
assert n["contained_pct"] == 0
# ============================================================================
# (c) no acres anywhere -> renders "N/A"
# ============================================================================
def test_c_acres_missing_renders_na(mem_db, no_photon):
env = _make_active_envelope(name="IA 7", daily_acres=None,
pct_contained=None,
irwin_id=_IRWIN_C,
landclass="Sawtooth National Forest")
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1_000_000)
assert wire is not None
assert "N/A" in wire
assert "containment unknown" in wire
# ============================================================================
# (d) "IA 1" placeholder name passes through verbatim
# ============================================================================
def test_d_ia_placeholder_passthrough(mem_db, no_photon):
env = _make_active_envelope(name="IA 1", county="Elmore",
daily_acres=None, pct_contained=None,
landclass="Sawtooth National Forest",
irwin_id=_IRWIN_B)
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1_000_000)
assert wire is not None
assert "IA 1" in wire
# ============================================================================
# (e) tombstone subject -> handler returns None + event_log handled=0
# ============================================================================
def test_e_tombstone_returns_none_and_logs(mem_db, no_photon):
env = _make_tombstone()
n = cn.normalize(env)
assert n["_kind"] == "wfigs_tombstone"
out = handle_wfigs(n, env, env["subject"], now=2_000_000)
assert out is None
row = mem_db.execute(
"SELECT source, category, handled, table_name, table_pk, nats_subject "
"FROM event_log WHERE event_id_external=?", (_IRWIN_A,)).fetchone()
assert row is not None
assert row["source"] == "wfigs_incidents"
assert row["category"] == "fire.incident.removed"
assert row["handled"] == 0
assert row["table_name"] is None
assert row["table_pk"] == _IRWIN_A
assert row["nats_subject"] == "central.fire.incident.removed.id"
# No row in fires.
n_fires = mem_db.execute("SELECT COUNT(*) AS n FROM fires").fetchone()["n"]
assert n_fires == 0
# ============================================================================
# (f) perimeter subject -> same as tombstone
# ============================================================================
def test_f_perimeter_returns_none_and_logs(mem_db, no_photon):
env = _make_perimeter()
n = cn.normalize(env)
assert n["_kind"] == "wfigs_perimeter"
out = handle_wfigs(n, env, env["subject"], now=3_000_000)
assert out is None
row = mem_db.execute(
"SELECT source, handled FROM event_log WHERE event_id_external=?",
(_IRWIN_A,)).fetchone()
assert row is not None
assert row["source"] == "wfigs_perimeters"
assert row["handled"] == 0
n_fires = mem_db.execute("SELECT COUNT(*) AS n FROM fires").fetchone()["n"]
assert n_fires == 0
# ============================================================================
# (g) NEW IRWIN -> "New:" prefix + fires INSERT + mesh_broadcasts_out audit
# ============================================================================
def test_g_new_irwin_inserts_and_broadcasts(mem_db, no_photon):
env = _make_active_envelope(geocoder_city="Burley") # avoids Photon path
now = 5_000_000
data = {}
wire = handle_wfigs(cn.normalize(env), env, env["subject"],
data=data, now=now)
assert wire is not None
assert wire.startswith("🔥 New: Cache Peak Fire")
assert "Burley" in wire
assert "1,847 ac" in wire
assert "23% contained" in wire
assert "@ 42.197,-113.710" in wire
# v0.5.8b: handler INSERTs the fires row with last_broadcast_*=NULL,
# then attaches a commit callback. The dispatcher fires the callback
# on successful broadcast; we simulate that here.
fr_pre = mem_db.execute(
"SELECT last_broadcast_at FROM fires WHERE irwin_id=?",
(_IRWIN_A,)).fetchone()
assert fr_pre["last_broadcast_at"] is None
data["_on_broadcast_committed"](float(now))
fr = mem_db.execute(
"SELECT current_acres, last_broadcast_at, last_broadcast_acres, "
"last_broadcast_contained, last_event_at "
"FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
assert fr is not None
assert fr["current_acres"] == 1847.0
assert fr["last_broadcast_at"] == now
assert fr["last_broadcast_acres"] == 1847.0
assert fr["last_broadcast_contained"] == 23
assert fr["last_event_at"] == now
# event_log row logged with handled=1.
el = mem_db.execute(
"SELECT handled, table_name, table_pk FROM event_log "
"WHERE event_id_external=?", (_IRWIN_A,)).fetchone()
assert el["handled"] == 1
assert el["table_name"] == "fires"
assert el["table_pk"] == _IRWIN_A
# v0.5.8b: mesh_broadcasts_out is inserted by the dispatcher
# (test_cold_start_grace covers that path). The handler only signals
# via data["_broadcast_audit"] that an audit row is wanted.
assert data["_broadcast_audit"] == {"table": "fires", "pk": _IRWIN_A}
# ============================================================================
# (h) known IRWIN no-change -> drop silently, last_broadcast_* unchanged
# ============================================================================
def test_h_known_irwin_no_change_drops(mem_db, no_photon):
env = _make_active_envelope(geocoder_city="Burley")
first_now = 5_000_000
data0 = {}
handle_wfigs(cn.normalize(env), env, env["subject"],
data=data0, now=first_now)
# v0.5.8b: dispatcher commit closes the broadcast.
data0["_on_broadcast_committed"](float(first_now))
# Re-publish the same incident exactly 30 min later: same acres + contained.
later = first_now + 1800
out = handle_wfigs(cn.normalize(env), env, env["subject"], now=later)
assert out is None
fr = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_acres, last_broadcast_contained, "
"last_event_at FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
# last_broadcast_* unchanged from the original.
assert fr["last_broadcast_at"] == first_now
assert fr["last_broadcast_acres"] == 1847.0
assert fr["last_broadcast_contained"] == 23
# last_event_at was refreshed.
assert fr["last_event_at"] == later
# v0.5.8b: mesh_broadcasts_out is inserted by the dispatcher, not the
# handler -- this test never invokes a real dispatcher, so count is 0.
cnt = mem_db.execute(
"SELECT COUNT(*) AS n FROM mesh_broadcasts_out WHERE source_event_pk=?",
(_IRWIN_A,)).fetchone()["n"]
assert cnt == 0
# ============================================================================
# (i) known IRWIN acres up but <8h elapsed -> drop, last_broadcast_* unchanged
# ============================================================================
def test_i_known_irwin_change_inside_cooldown_drops(mem_db, no_photon):
env_initial = _make_active_envelope(geocoder_city="Burley")
data0 = {}
handle_wfigs(cn.normalize(env_initial), env_initial,
env_initial["subject"], data=data0, now=5_000_000)
data0["_on_broadcast_committed"](float(5_000_000))
# Bigger fire, but only 4h later -- inside cooldown.
env_grown = _make_active_envelope(geocoder_city="Burley",
daily_acres=3000.0, pct_contained=23)
later = 5_000_000 + 4 * 3600
out = handle_wfigs(cn.normalize(env_grown), env_grown,
env_grown["subject"], now=later)
assert out is None
fr = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_acres, last_broadcast_contained, "
"current_acres FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
assert fr["last_broadcast_at"] == 5_000_000
assert fr["last_broadcast_acres"] == 1847.0
assert fr["last_broadcast_contained"] == 23
# current_acres was refreshed to the new value.
assert fr["current_acres"] == 3000.0
# ============================================================================
# (j) known IRWIN acres up AND >=8h elapsed -> "Update:" + audit row
# ============================================================================
def test_j_known_irwin_change_after_cooldown_broadcasts(mem_db, no_photon):
env_initial = _make_active_envelope(geocoder_city="Burley")
data_j0 = {}
handle_wfigs(cn.normalize(env_initial), env_initial,
env_initial["subject"], data=data_j0, now=5_000_000)
data_j0["_on_broadcast_committed"](float(5_000_000))
env_grown = _make_active_envelope(geocoder_city="Burley",
daily_acres=3000.0, pct_contained=35)
later = 5_000_000 + WFIGS_BROADCAST_COOLDOWN_S
data2 = {}
out = handle_wfigs(cn.normalize(env_grown), env_grown,
env_grown["subject"], data=data2, now=later)
assert out is not None
assert out.startswith("🔥 Update: Cache Peak Fire")
assert "3,000 ac" in out
assert "35% contained" in out
# Simulate dispatcher commit.
data2["_on_broadcast_committed"](float(later))
fr = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_acres, last_broadcast_contained "
"FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
assert fr["last_broadcast_at"] == later
assert fr["last_broadcast_acres"] == 3000.0
assert fr["last_broadcast_contained"] == 35
# ============================================================================
# (k) location anchor priority -- city > nearest_town > landclass > county
# ============================================================================
def test_k_anchor_geocoder_city_wins(mem_db, no_photon):
env = _make_active_envelope(geocoder_city="Twin Falls",
landclass="Sawtooth NF",
county="Cassia")
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1)
assert "Twin Falls" in wire
assert "Sawtooth NF" not in wire
assert "Cassia Co" not in wire
def test_k_anchor_falls_to_nearest_town(monkeypatch, mem_db):
"""When city missing, nearest_town(distance, bearing) feeds the anchor."""
fake = {"name": "Boise", "distance_mi": 47.0, "bearing": "S"}
monkeypatch.setattr(
"meshai.central_normalizer.nearest_town",
lambda lat, lon, max_distance_mi=50.0: fake,
)
env = _make_active_envelope(geocoder_city=None,
landclass="Sawtooth NF",
county="Cassia")
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1)
assert "47 mi S of Boise" in wire
# Lower-priority anchors NOT used when nearest_town hit.
assert "Sawtooth NF" not in wire
def test_k_anchor_falls_to_landclass(monkeypatch, mem_db):
monkeypatch.setattr(
"meshai.central_normalizer.nearest_town",
lambda lat, lon, max_distance_mi=50.0: None,
)
env = _make_active_envelope(geocoder_city=None,
landclass="Sawtooth National Forest",
county="Cassia")
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1)
assert "Sawtooth National Forest" in wire
assert "Cassia Co" not in wire
def test_k_anchor_falls_to_county(monkeypatch, mem_db):
monkeypatch.setattr(
"meshai.central_normalizer.nearest_town",
lambda lat, lon, max_distance_mi=50.0: None,
)
env = _make_active_envelope(geocoder_city=None, landclass=None,
county="Cassia", state="ID")
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1)
assert "Cassia Co ID" in wire
def test_k_anchor_nearest_town_under_one_mile_says_near(monkeypatch, mem_db):
fake = {"name": "Burley", "distance_mi": 0.3, "bearing": "N"}
monkeypatch.setattr(
"meshai.central_normalizer.nearest_town",
lambda lat, lon, max_distance_mi=50.0: fake,
)
env = _make_active_envelope(geocoder_city=None)
wire = handle_wfigs(cn.normalize(env), env, env["subject"], now=1)
assert "near Burley" in wire
# ============================================================================
# v0.5.8b refactor -- New:/Update: prefix survives cold-start drops
# ============================================================================
def _run_handler_only(env, data=None, now=None):
"""Run normalize + handler WITHOUT invoking any commit callback.
Simulates the dispatcher dropping the broadcast (grace/cooldown/stale)
after the handler has already written persistence rows."""
n = cn.normalize(env)
if data is None:
data = {}
wire = handle_wfigs(n, env, env["subject"], data=data, now=now)
return wire, data
def _commit(data, committed_at):
"""Simulate the dispatcher invoking the handler's post-commit callback."""
cb = data.get("_on_broadcast_committed")
assert callable(cb), "handler must attach _on_broadcast_committed"
cb(committed_at)
def test_e_cold_start_then_resume_still_new(mem_db, no_photon):
"""Cold-start drop scenario: first pass writes fires + event_log but
dispatcher drops the broadcast (we skip the callback). Second pass on
the SAME IRWIN must still produce "New:" because last_broadcast_at is
still NULL -- it really is the first delivery for that fire.
"""
env = _make_active_envelope(geocoder_city="Burley")
# Pass 1: handler runs, but the dispatcher drops the broadcast (we
# mimic that by not calling the commit callback).
wire1, data1 = _run_handler_only(env, now=10_000)
assert wire1.startswith("🔥 New: ")
fr = mem_db.execute(
"SELECT current_acres, last_broadcast_at, last_broadcast_acres "
"FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
assert fr is not None
assert fr["current_acres"] == 1847.0
assert fr["last_broadcast_at"] is None
assert fr["last_broadcast_acres"] is None
# Pass 2: same envelope 5 minutes later (still pre-broadcast).
wire2, data2 = _run_handler_only(env, now=10_300)
assert wire2.startswith("🔥 New: "), "must still be 'New:' until last_broadcast_at gets set"
fr2 = mem_db.execute(
"SELECT current_acres, last_broadcast_at, last_event_at "
"FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
# last_event_at advanced; last_broadcast_at still NULL.
assert fr2["last_event_at"] == 10_300
assert fr2["last_broadcast_at"] is None
def test_f_commit_callback_updates_last_broadcast(mem_db, no_photon):
"""After the dispatcher calls the callback, last_broadcast_* reflect
the committed timestamp + the acres/containment of THIS broadcast."""
env = _make_active_envelope(geocoder_city="Burley")
wire, data = _run_handler_only(env, now=20_000)
assert wire is not None
_commit(data, committed_at=20_005.0)
fr = mem_db.execute(
"SELECT last_broadcast_at, last_broadcast_acres, last_broadcast_contained "
"FROM fires WHERE irwin_id=?", (_IRWIN_A,)).fetchone()
assert fr["last_broadcast_at"] == 20_005
assert fr["last_broadcast_acres"] == 1847.0
assert fr["last_broadcast_contained"] == 23
# Third pass: same IRWIN, no growth, no callback (cooldown applies).
# Handler must return None this time because last_broadcast_at IS NOT NULL
# and the change-detection gates report no change.
env_same = _make_active_envelope(geocoder_city="Burley")
wire3, _ = _run_handler_only(env_same, now=20_010)
assert wire3 is None
def test_g_callback_not_called_means_last_broadcast_stays_null(mem_db, no_photon):
"""If dispatcher drops for any reason (grace, staleness, cooldown,
dedup) the callback is not invoked -- last_broadcast_* stays NULL and
the next successful broadcast emits 'New:' (not 'Update:'). This is
the inverse of test_e from the persistence-row side."""
env = _make_active_envelope(geocoder_city="Burley")
wire, data = _run_handler_only(env, now=30_000)
assert wire is not None
# No _commit() call.
fr = mem_db.execute(
"SELECT last_broadcast_at FROM fires WHERE irwin_id=?",
(_IRWIN_A,)).fetchone()
assert fr["last_broadcast_at"] is None
def test_h_no_audit_row_inserted_when_handler_skips_commit(mem_db, no_photon):
"""The handler no longer writes mesh_broadcasts_out -- the dispatcher
inserts it via `_broadcast_audit`. Until the dispatcher calls _commit,
there should be zero rows in mesh_broadcasts_out even though fires
has the new row."""
env = _make_active_envelope(geocoder_city="Burley")
wire, data = _run_handler_only(env, now=40_000)
assert wire is not None
n = mem_db.execute(
"SELECT COUNT(*) AS n FROM mesh_broadcasts_out").fetchone()["n"]
assert n == 0
# The handler signalled the dispatcher SHOULD insert an audit row.
audit = data["_broadcast_audit"]
assert audit == {"table": "fires", "pk": _IRWIN_A}
def test_h_handler_attaches_audit_descriptor_and_callback(mem_db, no_photon):
"""Sanity: every active wire-string return must come with the two
dispatcher hooks attached."""
env = _make_active_envelope(geocoder_city="Burley", irwin_id=_IRWIN_B)
data = {}
wire = handle_wfigs(cn.normalize(env), env, env["subject"],
data=data, now=50_000)
assert wire is not None
assert callable(data["_on_broadcast_committed"])
assert data["_broadcast_audit"]["table"] == "fires"
assert data["_broadcast_audit"]["pk"] == _IRWIN_B