mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-10 17:04:45 +02:00
Closes Rule-20 dispatcher gap from audit doc v0.6-phase1-audit.md finding #1. Before this commit, the cold-start anchor, 4 drop counters, per-toggle cooldown map, and dedup OrderedDict all lived in Dispatcher instance memory and were lost on every container restart. v5.sql adds three tables: - dispatcher_state (singleton id=1): cold_start_anchor + 4 drop counters - dispatcher_cooldowns ((toggle,category,region) keyed): last_fired_at - dispatcher_dedup ((source,event_id) keyed): seen_at Dispatcher refactor: - __init__ calls _restore_from_db -- counters, cold-start anchor, cooldown map, and dedup LRU (most-recent 10k by seen_at) all rehydrated from the three new tables - write-through on every mutation: _persist_state for counter/anchor, _persist_cooldown for cooldown UPSERT + 2*cooldown_s prune, _persist_dedup for dedup INSERT OR REPLACE + 7-day cleanup - in-memory caches stay authoritative on the fast read path - cumulative-since-install counters (NOT since-boot); LLM will be able to answer "we have dropped 47 stale events this week" after commit #5 (env_reporter) lands - graceful degrade: missing v5 tables / persistence outage falls back to fresh in-memory state without crashing the constructor Tests: - tests/test_dispatcher_persistence.py (17 tests): state restore on init, counter+cooldown+dedup survival across simulated restart, cooldown rearm within 2x window, dedup LRU rebuild caps at 10k, 7-day cleanup on insert, INSERT OR REPLACE on duplicate source+event_id, v5 migration idempotent, synthetic storm (50 events) -> restart -> replay (5 incl 1 duplicate) with the duplicate dedup-rejected and counters NOT resetting - tests/conftest.py (new): autouse MESHAI_DB_PATH redirection to per-test tmp file, so the dispatcher_* tables on production /data don't get polluted by tests that construct Dispatcher() without an explicit fixture - tests/test_notification_toggles.py: _dispatch helper wipes dedup/cooldown/state tables between calls (per-call independence preserved; 
pre-v0.6-2 in-memory-only Dispatcher reset naturally per instance) Test count: 680 -> 697 (+17 new, 0 regressions). Refs audit doc v0.6-phase1-audit.md finding #1.
145 lines
5.6 KiB
Python
145 lines
5.6 KiB
Python
"""v0.5 Section 1: NotificationToggle dispatch routing tests."""
|
|
|
|
import asyncio
|
|
|
|
from meshai.config import Config
|
|
from meshai.notifications.pipeline.dispatcher import Dispatcher
|
|
from meshai.notifications.events import make_event
|
|
|
|
|
|
class RecChannel:
    """Test double for a delivery channel that records what it is asked to send.

    Every ``deliver`` call appends one dict describing the rule to the list
    given at construction, so a test can inspect the routing afterwards.
    """

    def __init__(self, rec):
        # ``rec`` is the caller's list; it is appended to in place.
        self.rec = rec

    async def deliver(self, payload, rule):
        """Record the routing fields of *rule* and report success.

        Args:
            payload: Unused; accepted so the call shape matches a channel.
            rule: Object exposing ``delivery_type``, ``name``,
                ``broadcast_channel``, ``node_ids`` and ``override_quiet``.

        Returns:
            Always ``True``.
        """
        self.rec.append({
            "delivery_type": rule.delivery_type,
            "name": rule.name,
            "broadcast_channel": rule.broadcast_channel,
            # Snapshot as a new list; a rule with ``node_ids=None`` is
            # recorded as ``[]`` instead of raising TypeError.
            "node_ids": list(rule.node_ids or []),
            "override_quiet": rule.override_quiet,
        })
        return True
|
|
|
|
|
|
def _dispatch(cfg, event):
    """Dispatch *event* through a fresh ``Dispatcher`` and return the deliveries.

    Args:
        cfg: Configuration object handed to ``Dispatcher``.
        event: Event passed to ``Dispatcher.dispatch``.

    Returns:
        The list of dicts appended by ``RecChannel.deliver``, one per
        delivery attempted during this call.
    """
    rec = []
    # v0.6-2: wipe the persisted dedup rows, cooldown rows and the
    # dispatcher_state counters/anchor between calls so each _dispatch is an
    # independent "what happens if this event arrives now?" check.
    # The pre-v0.6-2 in-memory dedup naturally reset per Dispatcher
    # instance; the new persisted state carries across instances unless
    # we clear it here.
    try:
        from meshai.persistence import get_db

        conn = get_db()
        # NOTE(review): no explicit commit is issued here; presumably the
        # Dispatcher reads through the same connection -- confirm.
        conn.execute("DELETE FROM dispatcher_dedup")
        conn.execute("DELETE FROM dispatcher_cooldowns")
        conn.execute(
            "UPDATE dispatcher_state SET cold_start_anchor=NULL, "
            "stale_dropped=0, cooldown_dropped=0, dedup_dropped=0, "
            "cold_start_dropped=0 WHERE id=1"
        )
    except Exception:
        # Still best-effort (the wipe must never fail the test by itself),
        # but leave a trace: a skipped wipe otherwise surfaces later as an
        # unexplained dedup/cooldown drop.
        import logging

        logging.getLogger(__name__).warning(
            "could not reset persisted dispatcher state before dispatch",
            exc_info=True,
        )
    # The channel factory ignores its arguments and always hands back a
    # recorder bound to this call's ``rec`` list.
    d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None)
    asyncio.run(d.dispatch(event))
    return rec
|
|
|
|
|
|
def _cfg(enable="weather", **kw):
    """Build a ``Config`` with no rules and the *enable* toggle switched on.

    Recognised keyword overrides (anything else is ignored):
        min_severity: defaults to ``"priority"``.
        regions: defaults to ``[]``.
        severity_channels: defaults to ``{"priority": ["mesh_broadcast"]}``.
    """
    settings = {
        "min_severity": "priority",
        "regions": [],
        "severity_channels": {"priority": ["mesh_broadcast"]},
        **kw,
    }

    config = Config()
    notifications = config.notifications
    notifications.rules = []
    notifications.cold_start_grace_seconds = 0  # v0.5.8b: legacy tests

    toggle = notifications.toggles[enable]
    toggle.enabled = True
    toggle.min_severity = settings["min_severity"]
    toggle.regions = settings["regions"]
    toggle.severity_channels = settings["severity_channels"]
    return config
|
|
|
|
|
|
def _ev(severity="priority", category="weather_warning", region=None, regions=None):
    """Create an ``nws`` event titled ``"t"`` via ``make_event``.

    A falsy *regions* (including the default ``None``) is passed on as an
    empty list.
    """
    region_list = regions if regions else []
    return make_event(
        source="nws",
        category=category,
        severity=severity,
        region=region,
        regions=region_list,
        title="t",
    )
|
|
|
|
|
|
def test_disabled_toggle_no_dispatch():
    """With no rules and no toggle enabled, nothing is delivered."""
    config = Config()
    config.notifications.rules = []  # weather disabled by default
    config.notifications.cold_start_grace_seconds = 0

    delivered = _dispatch(config, _ev())

    assert delivered == []
|
|
|
|
|
|
def test_enabled_toggle_dispatches():
    """An enabled weather toggle yields one mesh_broadcast delivery."""
    delivered = _dispatch(_cfg(), _ev(severity="priority"))

    assert len(delivered) == 1
    only = delivered[0]
    assert only["delivery_type"] == "mesh_broadcast"
    assert only["name"] == "toggle:weather"
|
|
|
|
|
|
def test_region_empty_allows_all():
    """An empty toggle region list does not filter out a regional event."""
    config = _cfg(regions=[])
    boise_event = _ev(region="Boise")

    delivered = _dispatch(config, boise_event)

    assert len(delivered) == 1
|
|
|
|
|
|
def test_region_populated_blocks_mismatch():
    """A populated region list blocks other regions and admits a listed one."""
    config = _cfg(regions=["Magic Valley"])

    mismatched = _dispatch(config, _ev(region="Boise"))
    assert mismatched == []

    matched = _dispatch(config, _ev(region="Magic Valley"))
    assert len(matched) == 1
|
|
|
|
|
|
def test_region_matches_via_regions_list():
    """An event with no ``region`` still matches through its ``regions`` list."""
    config = _cfg(regions=["Magic Valley"])
    multi_region_event = _ev(region=None, regions=["Magic Valley", "X"])

    delivered = _dispatch(config, multi_region_event)

    assert len(delivered) == 1
|
|
|
|
|
|
def test_severity_threshold():
    """Events below the toggle's ``min_severity`` are not delivered."""
    every_level_broadcasts = {
        "routine": ["mesh_broadcast"],
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast"],
    }
    config = _cfg(min_severity="priority", severity_channels=every_level_broadcasts)

    expected_counts = (
        ("routine", 0),  # below threshold
        ("priority", 1),
        ("immediate", 1),
    )
    for level, expected in expected_counts:
        delivered = _dispatch(config, _ev(severity=level))
        assert len(delivered) == expected
|
|
|
|
|
|
def test_per_severity_channel_routing():
    """Each severity is delivered on the channels configured for it."""
    channel_map = {
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast", "mesh_dm"],
    }
    config = _cfg(min_severity="routine", severity_channels=channel_map)

    priority_deliveries = _dispatch(config, _ev(severity="priority"))
    assert len(priority_deliveries) == 1

    immediate_deliveries = _dispatch(config, _ev(severity="immediate"))
    seen_types = {entry["delivery_type"] for entry in immediate_deliveries}
    assert seen_types == {"mesh_broadcast", "mesh_dm"}
|
|
|
|
|
|
def test_digest_channel_skipped_in_live_dispatch():
    """A ``digest`` channel entry produces no live delivery."""
    config = _cfg(severity_channels={"priority": ["digest", "mesh_broadcast"]})

    delivered = _dispatch(config, _ev(severity="priority"))

    # digest not live-dispatched
    delivered_types = [entry["delivery_type"] for entry in delivered]
    assert delivered_types == ["mesh_broadcast"]
|
|
|
|
|
|
def test_quiet_hours_override_immediate_only():
    """``quiet_hours_override`` sets ``override_quiet`` only for immediate events."""
    channel_map = {
        "priority": ["mesh_broadcast"],
        "immediate": ["mesh_broadcast"],
    }
    config = _cfg(min_severity="routine", severity_channels=channel_map)
    config.notifications.toggles["weather"].quiet_hours_override = True

    first_priority = _dispatch(config, _ev(severity="priority"))[0]
    assert first_priority["override_quiet"] is False

    first_immediate = _dispatch(config, _ev(severity="immediate"))[0]
    assert first_immediate["override_quiet"] is True
|
|
|
|
|
|
def test_category_maps_to_correct_family():
    """An ``earthquake_event`` is delivered under the ``seismic`` toggle."""
    # seismic family toggle handles earthquake_event via get_toggle fallback
    config = Config()
    config.notifications.rules = []
    config.notifications.cold_start_grace_seconds = 0  # v0.5.8b: legacy test
    config.notifications.toggles["seismic"].enabled = True
    config.notifications.toggles["seismic"].severity_channels = {
        "priority": ["mesh_broadcast"],
    }

    quake = _ev(severity="priority", category="earthquake_event")
    delivered = _dispatch(config, quake)

    assert len(delivered) == 1
    assert delivered[0]["name"] == "toggle:seismic"
|
|
|
|
|
|
def test_rules_and_toggles_both_fire():
    """A matching legacy rule and an enabled toggle both deliver the event."""
    from meshai.config import NotificationRuleConfig

    config = _cfg()
    legacy_rule = NotificationRuleConfig(
        name="legacy",
        enabled=True,
        trigger_type="condition",
        categories=["weather_warning"],
        min_severity="routine",
        delivery_type="mesh_broadcast",
    )
    config.notifications.rules = [legacy_rule]

    delivered = _dispatch(config, _ev(severity="priority"))

    # parallel paths both fire
    fired = {entry["name"] for entry in delivered}
    assert "legacy" in fired
    assert "toggle:weather" in fired
|