meshai/tests/test_notification_toggles.py
K7ZVX 053d67db6e feat(v0.5.8b): persistence foundation + WFIGS handler + universal cold-start grace
Three integrated pieces that ship together because they were designed as one safety story: (1) PERSISTENCE FOUNDATION -- new meshai/persistence/ module with SQLite db.py, schema migration framework (v1), 13 tables covering all adapter event shapes (traffic_events, fires, firms_pixels, quake_events, nws_alerts, gauge_readings, swpc_events) + mesh state (mesh_nodes, mesh_telemetry, mesh_positions, mesh_messages_in, mesh_broadcasts_out, mesh_health_events) + cross-cutting event_log + schema_meta. WAL mode for reader concurrency, single-writer pattern, MESHAI_DB_PATH env var, mounted at /data/meshai.sqlite via existing docker-compose meshai_data volume. .gitignore updated. (2) WFIGS HANDLER -- meshai/central/wfigs_handler.py implements the first per-adapter handler that uses the persistence layer. Format: MEDIUM style with town/landclass/county fallback chain, lat/lon at 3-decimal precision, New:/Update: prefix. 8h-rate-limited change-detection per IRWIN via fires.last_broadcast_at. Skips tombstones and perimeters silently (logged to event_log with handled=0). Acres fallback chain DailyAcres -> IncidentSize -> raw.DiscoveryAcres -> raw.FinalAcres -> N/A. Pass-through Initial Attack auto-numbered names (IA 1, IA 2). (3) UNIVERSAL COLD-START GRACE -- meshai/notifications/pipeline/dispatcher.py grows a configurable grace window (cold_start_grace_seconds, default 60s, GUI-editable per Rule 17). Anchored to first-event-seen (not container boot), so the grace activates the moment broadcasts could fire. Suppresses mesh delivery during the window; handler-side persistence (fires UPSERT, event_log) still happens normally. New _cold_start_dropped counter exposed in dispatch_stats(). Designed to protect against JetStream backlog spam at toggle-flip time, applies universally to ALL adapters. 
(4) WFIGS HANDLER CALLBACK REFACTOR -- New:/Update: prefix now keys on fires.last_broadcast_at IS NULL (not row-missing), and last_broadcast_* field updates moved to a post-broadcast commit callback that the dispatcher invokes ONLY on successful delivery. This means: cold-start-suppressed events leave fires.last_broadcast_at NULL, so when they eventually broadcast post-grace, they correctly render as New: (first ACTUAL delivery for that IRWIN), not Update:. event_log.handled and mesh_broadcasts_out audit row also gated on the same callback -- decoupling persistence rows from broadcast rows for an honest audit trail. New tests: 15 in test_wfigs_handler.py, 15 in test_persistence.py, additional cold-start grace tests in test_dispatcher.py (+4 WFIGS callback scenarios). Synthetic probes wfigs-cleaned-samples.md (initial) and wfigs-cleaned-samples-v2.md (cold-start verification) generated against isolated temp SQLite databases. CT108 /data/meshai.sqlite untouched during build. Master stays off. No live toggle flips. Test count: was 535 (v0.5.7 baseline) -> 566 (persistence) -> 581 (wfigs handler) -> 589 expected (cold-start grace).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-05 03:54:04 +00:00

128 lines
4.9 KiB
Python

"""v0.5 Section 1: NotificationToggle dispatch routing tests."""
import asyncio
from meshai.config import Config
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.events import make_event
class RecChannel:
    """Recording stand-in for a delivery channel.

    Each ``deliver`` call appends a dict describing the rule it was invoked
    with to the shared ``rec`` list, so a test can inspect what was
    dispatched after the fact.
    """

    def __init__(self, rec):
        # ``rec`` is owned by the caller, which reads it after dispatching.
        self.rec = rec

    async def deliver(self, payload, rule):
        """Record the routing attributes of ``rule`` and report success.

        ``payload`` is accepted but not recorded. Always returns ``True``.
        """
        self.rec.append({
            "delivery_type": rule.delivery_type,
            "name": rule.name,
            "broadcast_channel": rule.broadcast_channel,
            # Copy the node list so the record is independent of the rule;
            # a rule without a node list (None) is recorded as empty rather
            # than raising TypeError inside the recorder.
            "node_ids": list(rule.node_ids or []),
            "override_quiet": rule.override_quiet,
        })
        return True
def _dispatch(cfg, event):
    """Dispatch ``event`` through a fresh Dispatcher and return the recordings.

    The Dispatcher is built from ``cfg`` with a channel factory that always
    hands back a ``RecChannel`` bound to one shared list, and with
    ``connector=None``. The list is returned once ``dispatch`` has completed.
    """
    deliveries = []

    def channel_factory(rule, conn):
        # Every rule gets a recorder that writes into the same list.
        return RecChannel(deliveries)

    dispatcher = Dispatcher(cfg, channel_factory, connector=None)
    asyncio.run(dispatcher.dispatch(event))
    return deliveries
def _cfg(enable="weather", **kw):
    """Build a Config with no rules and the ``enable`` toggle switched on.

    Recognised keyword overrides (anything else in ``kw`` is ignored):
      min_severity       -- defaults to "priority"
      regions            -- defaults to []
      severity_channels  -- defaults to {"priority": ["mesh_broadcast"]}
    """
    # Built per call so no default list/dict is shared between configs.
    fallbacks = {
        "min_severity": "priority",
        "regions": [],
        "severity_channels": {"priority": ["mesh_broadcast"]},
    }

    cfg = Config()
    notifications = cfg.notifications
    notifications.rules = []
    # v0.5.8b: legacy tests -- zero out the cold-start grace window.
    notifications.cold_start_grace_seconds = 0

    toggle = notifications.toggles[enable]
    toggle.enabled = True
    for field, fallback in fallbacks.items():
        setattr(toggle, field, kw.get(field, fallback))
    return cfg
def _ev(severity="priority", category="weather_warning", region=None, regions=None):
    """Create an event from source "nws" with title "t" via ``make_event``.

    A falsy ``regions`` (including the default ``None``) is passed on as an
    empty list.
    """
    fields = {
        "source": "nws",
        "category": category,
        "severity": severity,
        "region": region,
        "regions": regions or [],
        "title": "t",
    }
    return make_event(**fields)
def test_disabled_toggle_no_dispatch():
    """A default Config with no rules delivers nothing for a weather event."""
    cfg = Config()
    cfg.notifications.rules = []  # weather disabled by default
    cfg.notifications.cold_start_grace_seconds = 0
    delivered = _dispatch(cfg, _ev())
    assert delivered == []
def test_enabled_toggle_dispatches():
    """An enabled weather toggle yields one mesh_broadcast named toggle:weather."""
    delivered = _dispatch(_cfg(), _ev(severity="priority"))
    assert len(delivered) == 1
    only = delivered[0]
    assert only["delivery_type"] == "mesh_broadcast"
    assert only["name"] == "toggle:weather"
def test_region_empty_allows_all():
    """With an empty regions list on the toggle, a regional event is delivered."""
    cfg = _cfg(regions=[])
    event = _ev(region="Boise")
    delivered = _dispatch(cfg, event)
    assert len(delivered) == 1
def test_region_populated_blocks_mismatch():
    """A toggle limited to one region drops other regions and passes its own."""
    cfg = _cfg(regions=["Magic Valley"])

    mismatched = _dispatch(cfg, _ev(region="Boise"))
    assert mismatched == []

    matched = _dispatch(cfg, _ev(region="Magic Valley"))
    assert len(matched) == 1
def test_region_matches_via_regions_list():
    """An event with no ``region`` still matches through its ``regions`` list."""
    cfg = _cfg(regions=["Magic Valley"])
    event = _ev(region=None, regions=["Magic Valley", "X"])
    delivered = _dispatch(cfg, event)
    assert len(delivered) == 1
def test_severity_threshold():
    """Events below the toggle's min_severity are dropped; the rest deliver once."""
    every_level = ["mesh_broadcast"]
    cfg = _cfg(
        min_severity="priority",
        severity_channels={
            "routine": every_level,
            "priority": ["mesh_broadcast"],
            "immediate": ["mesh_broadcast"],
        },
    )
    expectations = (
        ("routine", 0),    # below threshold
        ("priority", 1),
        ("immediate", 1),
    )
    for severity, expected_count in expectations:
        delivered = _dispatch(cfg, _ev(severity=severity))
        assert len(delivered) == expected_count
def test_per_severity_channel_routing():
    """Each severity is delivered on the channels configured for that severity."""
    channels = {
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast", "mesh_dm"],
    }
    cfg = _cfg(min_severity="routine", severity_channels=channels)

    priority_hits = _dispatch(cfg, _ev(severity="priority"))
    assert len(priority_hits) == 1

    immediate_hits = _dispatch(cfg, _ev(severity="immediate"))
    seen_types = {hit["delivery_type"] for hit in immediate_hits}
    assert seen_types == {"mesh_broadcast", "mesh_dm"}
def test_digest_channel_skipped_in_live_dispatch():
    """A "digest" entry in severity_channels produces no live delivery."""
    cfg = _cfg(severity_channels={"priority": ["digest", "mesh_broadcast"]})
    delivered = _dispatch(cfg, _ev(severity="priority"))
    delivery_types = [entry["delivery_type"] for entry in delivered]
    # digest not live-dispatched
    assert delivery_types == ["mesh_broadcast"]
def test_quiet_hours_override_immediate_only():
    """With quiet_hours_override on, only immediate deliveries set override_quiet."""
    channels = {"priority": ["mesh_broadcast"], "immediate": ["mesh_broadcast"]}
    cfg = _cfg(min_severity="routine", severity_channels=channels)
    cfg.notifications.toggles["weather"].quiet_hours_override = True

    priority_first = _dispatch(cfg, _ev(severity="priority"))[0]
    assert priority_first["override_quiet"] is False

    immediate_first = _dispatch(cfg, _ev(severity="immediate"))[0]
    assert immediate_first["override_quiet"] is True
def test_category_maps_to_correct_family():
    """An earthquake_event is delivered under the seismic toggle's name."""
    # seismic family toggle handles earthquake_event via get_toggle fallback
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.cold_start_grace_seconds = 0  # v0.5.8b: legacy test

    seismic = cfg.notifications.toggles["seismic"]
    seismic.enabled = True
    seismic.severity_channels = {"priority": ["mesh_broadcast"]}

    event = _ev(severity="priority", category="earthquake_event")
    delivered = _dispatch(cfg, event)
    assert len(delivered) == 1
    assert delivered[0]["name"] == "toggle:seismic"
def test_rules_and_toggles_both_fire():
    """An explicit rule and an enabled toggle both deliver for the same event."""
    from meshai.config import NotificationRuleConfig

    legacy_rule = NotificationRuleConfig(
        name="legacy",
        enabled=True,
        trigger_type="condition",
        categories=["weather_warning"],
        min_severity="routine",
        delivery_type="mesh_broadcast",
    )
    cfg = _cfg()
    cfg.notifications.rules = [legacy_rule]

    delivered = _dispatch(cfg, _ev(severity="priority"))
    names = {entry["name"] for entry in delivered}
    # parallel paths both fire
    assert {"legacy", "toggle:weather"} <= names