meshai/tests/test_notification_toggles.py
Matt Johnson (via Claude) c333a97344 feat(v0.6-2): dispatcher state persistence -- cold-start, cooldowns, dedup LRU to SQLite
Closes Rule-20 dispatcher gap from audit doc v0.6-phase1-audit.md finding #1.
Before this commit, the cold-start anchor, 4 drop counters, per-toggle cooldown
map, and dedup OrderedDict all lived in Dispatcher instance memory and were
lost on every container restart.

v5.sql adds three tables:
  - dispatcher_state (singleton id=1): cold_start_anchor + 4 drop counters
  - dispatcher_cooldowns ((toggle,category,region) keyed): last_fired_at
  - dispatcher_dedup ((source,event_id) keyed): seen_at

Dispatcher refactor:
  - __init__ calls _restore_from_db -- counters, cold-start anchor, cooldown
    map, and dedup LRU (most-recent 10k by seen_at) all rehydrated from the
    three new tables
  - write-through on every mutation: _persist_state for counter/anchor,
    _persist_cooldown for cooldown UPSERT + 2*cooldown_s prune,
    _persist_dedup for dedup INSERT OR REPLACE + 7-day cleanup
  - in-memory caches stay authoritative on the fast read path
  - cumulative-since-install counters (NOT since-boot); LLM will be able
    to answer "we have dropped 47 stale events this week" after commit #5
    (env_reporter) lands
  - graceful degrade: missing v5 tables / persistence outage falls back to
    fresh in-memory state without crashing the constructor

Tests:
  - tests/test_dispatcher_persistence.py (17 tests): state restore on init,
    counter+cooldown+dedup survival across simulated restart, cooldown rearm
    within 2x window, dedup LRU rebuild caps at 10k, 7-day cleanup on insert,
    INSERT OR REPLACE on duplicate source+event_id, v5 migration idempotent,
    synthetic storm (50 events) -> restart -> replay (5 incl 1 duplicate)
    with the duplicate dedup-rejected and counters NOT resetting
  - tests/conftest.py (new): autouse MESHAI_DB_PATH redirection to per-test
    tmp file, so the dispatcher_* tables on production /data don't get
    polluted by tests that construct Dispatcher() without an explicit fixture
  - tests/test_notification_toggles.py: _dispatch helper wipes dedup/cooldown/
    state tables between calls (per-call independence preserved; pre-v0.6-2
    in-memory-only Dispatcher reset naturally per instance)

Test count: 680 -> 697 (+17 new, 0 regressions).

Refs audit doc v0.6-phase1-audit.md finding #1.
2026-06-05 16:35:40 +00:00

145 lines
5.6 KiB
Python

"""v0.5 Section 1: NotificationToggle dispatch routing tests."""
import asyncio
from meshai.config import Config
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.events import make_event
class RecChannel:
    """Stand-in delivery channel that records deliveries instead of sending them."""

    def __init__(self, rec):
        # Caller-owned list; one dict is appended to it per deliver() call.
        self.rec = rec

    async def deliver(self, payload, rule):
        """Append a snapshot of ``rule``'s routing fields to ``rec`` and return True.

        ``payload`` is accepted but never inspected.
        """
        snapshot = dict(
            delivery_type=rule.delivery_type,
            name=rule.name,
            broadcast_channel=rule.broadcast_channel,
            node_ids=list(rule.node_ids),  # copy so later mutation can't alter the record
            override_quiet=rule.override_quiet,
        )
        self.rec.append(snapshot)
        return True
def _dispatch(cfg, event):
    """Dispatch ``event`` through a fresh Dispatcher and return what was delivered.

    Args:
        cfg: Config handed to the Dispatcher.
        event: The event to dispatch.

    Returns:
        The list of dicts recorded by ``RecChannel``, one per delivery.
    """
    import logging

    rec = []
    # v0.6-2: wipe the persisted dedup, cooldown and counter/anchor state
    # between calls so each _dispatch is an independent "what happens if
    # this event arrives now?" check. The pre-v0.6-2 in-memory dedup
    # naturally reset per Dispatcher instance; the new persisted state
    # carries across instances unless we clear it here.
    try:
        from meshai.persistence import get_db
        conn = get_db()
        conn.execute("DELETE FROM dispatcher_dedup")
        conn.execute("DELETE FROM dispatcher_cooldowns")
        conn.execute(
            "UPDATE dispatcher_state SET cold_start_anchor=NULL, "
            "stale_dropped=0, cooldown_dropped=0, dedup_dropped=0, "
            "cold_start_dropped=0 WHERE id=1"
        )
    except Exception:
        # The wipe stays best-effort (persistence may be unavailable), but a
        # failure must leave a trace: if the state was not cleared, earlier
        # calls can influence this one and the cause would otherwise be invisible.
        logging.getLogger(__name__).warning(
            "could not reset persisted dispatcher state before dispatch",
            exc_info=True,
        )
    # Every channel the dispatcher builds records into the same ``rec`` list.
    d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None)
    asyncio.run(d.dispatch(event))
    return rec
def _cfg(enable="weather", **kw):
    """Build a Config with no rules and the ``enable`` toggle switched on.

    Optional keyword overrides: ``min_severity``, ``regions`` and
    ``severity_channels``; any other keyword is ignored.
    """
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.cold_start_grace_seconds = 0  # v0.5.8b: legacy tests
    toggle = cfg.notifications.toggles[enable]
    toggle.enabled = True
    # Field name -> value used when the caller supplies no override.
    fallbacks = {
        "min_severity": "priority",
        "regions": [],
        "severity_channels": {"priority": ["mesh_broadcast"]},
    }
    for field, fallback in fallbacks.items():
        setattr(toggle, field, kw.get(field, fallback))
    return cfg
def _ev(severity="priority", category="weather_warning", region=None, regions=None):
    """Create an "nws" test event titled "t"; a falsy ``regions`` becomes ``[]``."""
    region_list = regions if regions else []
    return make_event(
        source="nws",
        category=category,
        severity=severity,
        region=region,
        regions=region_list,
        title="t",
    )
def test_disabled_toggle_no_dispatch():
    """A default Config with no rules delivers nothing."""
    config = Config()
    config.notifications.rules = []  # weather disabled by default
    config.notifications.cold_start_grace_seconds = 0
    delivered = _dispatch(config, _ev())
    assert delivered == []
def test_enabled_toggle_dispatches():
    """An enabled toggle produces exactly one mesh_broadcast delivery named after it."""
    deliveries = _dispatch(_cfg(), _ev(severity="priority"))
    assert len(deliveries) == 1
    only = deliveries[0]
    assert only["delivery_type"] == "mesh_broadcast"
    assert only["name"] == "toggle:weather"
def test_region_empty_allows_all():
    """An empty region list on the toggle lets an event from any region through."""
    unrestricted = _cfg(regions=[])
    delivered = _dispatch(unrestricted, _ev(region="Boise"))
    assert len(delivered) == 1
def test_region_populated_blocks_mismatch():
    """A populated region list blocks other regions and admits a listed one."""
    cfg = _cfg(regions=["Magic Valley"])
    outside = _dispatch(cfg, _ev(region="Boise"))
    assert outside == []
    inside = _dispatch(cfg, _ev(region="Magic Valley"))
    assert len(inside) == 1
def test_region_matches_via_regions_list():
    """The event's ``regions`` list can satisfy the toggle when ``region`` is None."""
    cfg = _cfg(regions=["Magic Valley"])
    multi_region_event = _ev(region=None, regions=["Magic Valley", "X"])
    delivered = _dispatch(cfg, multi_region_event)
    assert len(delivered) == 1
def test_severity_threshold():
    """Events below ``min_severity`` are dropped; those at or above it are delivered."""
    channels_by_severity = {
        level: ["mesh_broadcast"] for level in ("routine", "priority", "immediate")
    }
    cfg = _cfg(min_severity="priority", severity_channels=channels_by_severity)
    assert _dispatch(cfg, _ev(severity="routine")) == []  # below threshold
    for level in ("priority", "immediate"):
        assert len(_dispatch(cfg, _ev(severity=level))) == 1
def test_per_severity_channel_routing():
    """Each severity fans out to the channels configured for that severity."""
    routing = {
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast", "mesh_dm"],
    }
    cfg = _cfg(min_severity="routine", severity_channels=routing)
    priority_hits = _dispatch(cfg, _ev(severity="priority"))
    assert len(priority_hits) == 1
    immediate_hits = _dispatch(cfg, _ev(severity="immediate"))
    seen_types = {hit["delivery_type"] for hit in immediate_hits}
    assert seen_types == {"mesh_broadcast", "mesh_dm"}
def test_digest_channel_skipped_in_live_dispatch():
    """A "digest" channel entry yields no live delivery; only mesh_broadcast fires."""
    cfg = _cfg(severity_channels={"priority": ["digest", "mesh_broadcast"]})
    delivered = _dispatch(cfg, _ev(severity="priority"))
    live_types = [entry["delivery_type"] for entry in delivered]
    assert live_types == ["mesh_broadcast"]  # digest not live-dispatched
def test_quiet_hours_override_immediate_only():
    """With quiet_hours_override on, only immediate deliveries carry override_quiet."""
    routing = {"priority": ["mesh_broadcast"], "immediate": ["mesh_broadcast"]}
    cfg = _cfg(min_severity="routine", severity_channels=routing)
    cfg.notifications.toggles["weather"].quiet_hours_override = True
    priority_hits = _dispatch(cfg, _ev(severity="priority"))
    assert priority_hits[0]["override_quiet"] is False
    immediate_hits = _dispatch(cfg, _ev(severity="immediate"))
    assert immediate_hits[0]["override_quiet"] is True
def test_category_maps_to_correct_family():
    """An earthquake_event is delivered under the seismic toggle's name."""
    # seismic family toggle handles earthquake_event via get_toggle fallback
    cfg = Config()
    cfg.notifications.rules = []
    cfg.notifications.cold_start_grace_seconds = 0  # v0.5.8b: legacy test
    seismic = cfg.notifications.toggles["seismic"]
    seismic.enabled = True
    seismic.severity_channels = {"priority": ["mesh_broadcast"]}
    quake = _ev(severity="priority", category="earthquake_event")
    delivered = _dispatch(cfg, quake)
    assert len(delivered) == 1
    assert delivered[0]["name"] == "toggle:seismic"
def test_rules_and_toggles_both_fire():
    """A legacy rule and an enabled toggle matching the same event both deliver."""
    from meshai.config import NotificationRuleConfig

    cfg = _cfg()
    legacy_rule = NotificationRuleConfig(
        name="legacy",
        enabled=True,
        trigger_type="condition",
        categories=["weather_warning"],
        min_severity="routine",
        delivery_type="mesh_broadcast",
    )
    cfg.notifications.rules = [legacy_rule]
    delivered = _dispatch(cfg, _ev(severity="priority"))
    fired = {entry["name"] for entry in delivered}
    # parallel paths both fire
    assert "legacy" in fired and "toggle:weather" in fired