From c333a97344a07022539c6144bd15ee9f3f1cf3aa Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Fri, 5 Jun 2026 16:35:40 +0000 Subject: [PATCH] feat(v0.6-2): dispatcher state persistence -- cold-start, cooldowns, dedup LRU to SQLite Closes Rule-20 dispatcher gap from audit doc v0.6-phase1-audit.md finding #1. Pre-this-commit the cold-start anchor, 4 drop counters, per-toggle cooldown map, and dedup OrderedDict all lived in Dispatcher instance memory and were lost on every container restart. v5.sql adds three tables: - dispatcher_state (singleton id=1): cold_start_anchor + 4 drop counters - dispatcher_cooldowns ((toggle,category,region) keyed): last_fired_at - dispatcher_dedup ((source,event_id) keyed): seen_at Dispatcher refactor: - __init__ calls _restore_from_db -- counters, cold-start anchor, cooldown map, and dedup LRU (most-recent 10k by seen_at) all rehydrated from the three new tables - write-through on every mutation: _persist_state for counter/anchor, _persist_cooldown for cooldown UPSERT + 2*cooldown_s prune, _persist_dedup for dedup INSERT OR REPLACE + 7-day cleanup - in-memory caches stay authoritative on the fast read path - cumulative-since-install counters (NOT since-boot); LLM will be able to answer "we have dropped 47 stale events this week" after commit #5 (env_reporter) lands - graceful degrade: missing v5 tables / persistence outage falls back to fresh in-memory state without crashing the constructor Tests: - tests/test_dispatcher_persistence.py (17 tests): state restore on init, counter+cooldown+dedup survival across simulated restart, cooldown rearm within 2x window, dedup LRU rebuild caps at 10k, 7-day cleanup on insert, INSERT OR REPLACE on duplicate source+event_id, v5 migration idempotent, synthetic storm (50 events) -> restart -> replay (5 incl 1 duplicate) with the duplicate dedup-rejected and counters NOT resetting - tests/conftest.py (new): autouse MESHAI_DB_PATH redirection to per-test tmp file, so the dispatcher_* tables on production /data dont get polluted by tests that construct Dispatcher() without an explicit fixture - tests/test_notification_toggles.py: _dispatch helper wipes dedup/cooldown/ state tables between calls (per-call independence preserved; pre-v0.6-2 in-memory-only Dispatcher reset naturally per instance) Test count: 680 -> 697 (+17 new, 0 regressions). Refs audit doc v0.6-phase1-audit.md finding #1. --- meshai/notifications/pipeline/dispatcher.py | 209 ++++++++- meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v5.sql | 69 +++ tests/conftest.py | 31 ++ tests/test_dispatcher_persistence.py | 494 ++++++++++++++++++++ tests/test_notification_toggles.py | 17 + 6 files changed, 815 insertions(+), 7 deletions(-) create mode 100644 meshai/persistence/migrations/v5.sql create mode 100644 tests/conftest.py create mode 100644 tests/test_dispatcher_persistence.py diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index 04ad3ac..c75e4c4 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -12,6 +12,19 @@ v0.5.2: toggle path gains three guards at the entrance (staleness, per-toggle cooldown, (source,id) LRU dedup) plus the friendly mesh-broadcast composer so the toggle path stops emitting raw `[Family] central.category` debug strings. The legacy rules path is intentionally left untouched (no regression risk). + +v0.6-2: state persistence (audit doc finding #1). The cold-start anchor, +the four drop counters, the per-(toggle,category,region) cooldown map, +and the (source,event_id) dedup OrderedDict now write through to SQLite +on every mutation. The dispatcher restores them on __init__ so they +survive container restart. In-memory caches stay authoritative on the +fast path; SQLite is the durability layer + the LLM's "what's been +suppressed?" query target (commit #5: env_reporter). + +Cumulative-since-install counters: the four `_*_dropped` ints are NOT +reset on boot. They carry forward from the dispatcher_state singleton +row. A `dispatch_stats()` call returns the in-memory (=most-recent) +values, which mirror the on-disk values exactly. """ import logging @@ -25,8 +38,20 @@ from meshai.notifications.renderers.composer import compose_mesh_message # Bounded (source, event.id) LRU set — see _dispatch_toggles Section 3. +# Holds the in-memory fast-path cap; SQLite dispatcher_dedup retains a +# 7-day window which can exceed this. On boot we restore the most-recent +# _DEDUP_LRU_MAX rows into this OrderedDict. _DEDUP_LRU_MAX = 10_000 +# v0.6-2 SQLite dedup retention. Anything older than this is deleted on +# every dispatcher_dedup insert. +_DEDUP_DB_RETENTION_S = 7 * 86400 # 604_800 + +# In-memory cooldown map prune threshold (entries). When the map grows past +# this we re-apply the 2*cooldown_s cutoff so it stays bounded. The SQLite +# prune fires on every cooldown write regardless. +_COOLDOWN_INMEM_PRUNE_THRESHOLD = 1024 + class Dispatcher: """Dispatches immediate events to channels matching configured rules.""" @@ -48,6 +73,7 @@ class Dispatcher: self._connector = connector self._logger = logging.getLogger("meshai.pipeline.dispatcher") # v0.5.2 — toggle-path guards (ops counters exposed via dispatch_stats()): + # v0.6-2: restored from dispatcher_state on __init__ via _restore_from_db. self._stale_dropped = 0 self._cooldown_dropped = 0 self._dedup_dropped = 0 @@ -60,6 +86,156 @@ class Dispatcher: self._toggle_cooldown: dict[tuple[str, str, str], float] = {} # Insertion-ordered (source, event.id) -> sentinel; evict oldest at cap. self._dedup_lru: "OrderedDict[tuple[str, str], bool]" = OrderedDict() + # v0.6-2: hydrate from SQLite. Graceful no-op if persistence is + # unavailable -- the dispatcher still works, just without + # cross-restart durability. + self._restore_from_db() + + # ---------- v0.6-2 persistence ----------------------------------------- + + def _restore_from_db(self) -> None: + """Hydrate in-memory state from dispatcher_state + dispatcher_cooldowns + + dispatcher_dedup on dispatcher construction. Idempotent. + + Defensive against missing tables: if the v5 migration hasn't run yet + (e.g. fresh DB created by a test fixture before migrations apply), + any sqlite OperationalError is caught and the dispatcher falls back + to fresh in-memory state. The fast path is unaffected.""" + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + self._logger.exception( + "dispatcher: persistence unavailable on init; using fresh " + "in-memory state (counters reset, no cooldown/dedup carryover)" + ) + return + + # State singleton. Wrap in try/except so a missing dispatcher_state + # table (pre-v5 schema) degrades gracefully to fresh state instead + # of raising into the Dispatcher constructor. + try: + row = conn.execute( + "SELECT cold_start_anchor, stale_dropped, cooldown_dropped, " + "dedup_dropped, cold_start_dropped FROM dispatcher_state WHERE id=1" + ).fetchone() + except Exception: + self._logger.debug( + "dispatcher: v5 tables not present yet; using fresh state" + ) + return + if row is not None: + self._first_event_at = row["cold_start_anchor"] + self._stale_dropped = int(row["stale_dropped"] or 0) + self._cooldown_dropped = int(row["cooldown_dropped"] or 0) + self._dedup_dropped = int(row["dedup_dropped"] or 0) + self._cold_start_dropped = int(row["cold_start_dropped"] or 0) + self._logger.info( + "dispatcher state restored: cold_start_anchor=%s " + "stale=%d cooldown=%d dedup=%d cold_start=%d", + self._first_event_at, self._stale_dropped, + self._cooldown_dropped, self._dedup_dropped, + self._cold_start_dropped, + ) + + # Cooldowns: every row restored verbatim (the in-memory prune + # threshold of 1024 will fire on the first cooldown write if the + # restored set is bigger, so even pathological histories self-bound). + for r in conn.execute( + "SELECT toggle, category, region, last_fired_at " + "FROM dispatcher_cooldowns" + ).fetchall(): + self._toggle_cooldown[(r["toggle"], r["category"], r["region"])] = \ + float(r["last_fired_at"]) + + # Dedup LRU: restore the most-recent _DEDUP_LRU_MAX rows in + # newest-first order, then re-add into the OrderedDict oldest-first + # so the natural insertion order matches the OrderedDict-as-LRU + # contract (oldest = first-evicted on overflow). On-disk retains a + # 7-day window which may exceed the in-memory cap; the LLM still + # sees the full window via direct SELECT. + rows = conn.execute( + "SELECT source, event_id FROM dispatcher_dedup " + "ORDER BY seen_at DESC LIMIT ?", + (_DEDUP_LRU_MAX,), + ).fetchall() + for r in reversed(rows): + self._dedup_lru[(r["source"], r["event_id"])] = True + + if self._toggle_cooldown or self._dedup_lru: + self._logger.info( + "dispatcher caches restored: cooldowns=%d dedup_lru=%d", + len(self._toggle_cooldown), len(self._dedup_lru), + ) + + def _persist_state(self) -> None: + """Write the current counters + cold_start_anchor to dispatcher_state. + Called whenever any of those values change.""" + try: + from meshai.persistence import get_db + conn = get_db() + conn.execute( + "UPDATE dispatcher_state SET cold_start_anchor=?, " + "stale_dropped=?, cooldown_dropped=?, dedup_dropped=?, " + "cold_start_dropped=?, updated_at=? WHERE id=1", + (self._first_event_at, self._stale_dropped, + self._cooldown_dropped, self._dedup_dropped, + self._cold_start_dropped, time.time()), + ) + except Exception: + self._logger.exception( + "dispatcher: state write-through failed; in-memory still ok" + ) + + def _persist_cooldown(self, key: tuple[str, str, str], + now: float, cooldown_s: int) -> None: + """UPSERT a single (toggle, category, region) cooldown row + prune + rows older than (2 * cooldown_s). Mirrors the in-memory prune + semantics moved off the per-1024-grow check.""" + try: + from meshai.persistence import get_db + conn = get_db() + toggle, category, region = key + conn.execute( + "INSERT OR REPLACE INTO dispatcher_cooldowns(" + "toggle, category, region, last_fired_at, updated_at) " + "VALUES (?,?,?,?,?)", + (toggle, category, region, now, now), + ) + if cooldown_s > 0: + cutoff = now - (2 * cooldown_s) + conn.execute( + "DELETE FROM dispatcher_cooldowns WHERE last_fired_at < ?", + (cutoff,), + ) + except Exception: + self._logger.exception( + "dispatcher: cooldown write-through failed for %s", key + ) + + def _persist_dedup(self, key: tuple[str, str], now: float) -> None: + """INSERT a single (source, event_id) dedup row + prune rows older + than _DEDUP_DB_RETENTION_S. Same key arriving twice updates seen_at.""" + try: + from meshai.persistence import get_db + conn = get_db() + source, event_id = key + conn.execute( + "INSERT OR REPLACE INTO dispatcher_dedup(" + "source, event_id, seen_at) VALUES (?,?,?)", + (source, event_id, now), + ) + cutoff = now - _DEDUP_DB_RETENTION_S + conn.execute( + "DELETE FROM dispatcher_dedup WHERE seen_at < ?", + (cutoff,), + ) + except Exception: + self._logger.exception( + "dispatcher: dedup write-through failed for %s", key + ) + + # ---------- core dispatch ---------------------------------------------- async def dispatch(self, event: Event) -> None: """Deliver via matching rules AND enabled family toggles (parallel, v0.5).""" @@ -68,7 +244,7 @@ class Dispatcher: async def _dispatch_rules(self, event: Event) -> None: """Deliver an immediate-severity event to all matching channels. - + This method is async and awaits each channel.deliver() call. """ rules = self._matching_rules(event) @@ -109,6 +285,10 @@ class Dispatcher: re-delivery during reconnect. Then composes a friendly mesh string instead of the prior raw `[Family] central.category` debug format. + + v0.6-2: every mutation of the four drop counters, the cold-start + anchor, the cooldown map, and the dedup LRU writes through to + SQLite via the _persist_* helpers. Read fast-path stays in-memory. """ toggles = getattr(self._config.notifications, "toggles", None) if not isinstance(toggles, dict) or not toggles: @@ -131,12 +311,14 @@ class Dispatcher: now_anchor = time.time() if self._first_event_at is None: self._first_event_at = now_anchor + self._persist_state() # anchor armed -- durable self._logger.info( "cold-start grace anchor set: t0=%.3f window=%ds", now_anchor, grace_s, ) if (now_anchor - self._first_event_at) < grace_s: self._cold_start_dropped += 1 + self._persist_state() self._logger.info( "cold-start grace: dropping broadcast source=%s category=%s " "elapsed=%.1fs window=%ds", @@ -156,6 +338,7 @@ class Dispatcher: age = time.time() - event.timestamp if age > freshness_s: self._stale_dropped += 1 + self._persist_state() self._logger.debug( "dispatcher: dropping stale event source=%s category=%s " "age=%.0fs > freshness=%ds", @@ -175,12 +358,14 @@ class Dispatcher: last_fired = self._toggle_cooldown.get(ck) if last_fired is not None and (now - last_fired) < cooldown_s: self._cooldown_dropped += 1 + self._persist_state() return # silent throttle — no log spam self._toggle_cooldown[ck] = now - # Lazy prune: keep map bounded at ~2x the largest cooldown by - # discarding entries older than 2 * cooldown_s. Cheap; runs only - # when the map grows past a threshold so it's not per-event work. - if len(self._toggle_cooldown) > 1024: + self._persist_cooldown(ck, now, cooldown_s) + # In-memory prune: mirror the SQLite cutoff when the map grows + # past the threshold. The SQLite prune already ran inside + # _persist_cooldown. + if len(self._toggle_cooldown) > _COOLDOWN_INMEM_PRUNE_THRESHOLD: cutoff = now - (2 * cooldown_s) self._toggle_cooldown = { k: t for k, t in self._toggle_cooldown.items() if t >= cutoff @@ -192,8 +377,13 @@ class Dispatcher: # Touch to keep recent. self._dedup_lru.move_to_end(dk) self._dedup_dropped += 1 + self._persist_state() + # Refresh seen_at on disk too -- a repeat sighting is fresh + # evidence we're still seeing this id. + self._persist_dedup(dk, time.time()) return self._dedup_lru[dk] = True + self._persist_dedup(dk, time.time()) while len(self._dedup_lru) > _DEDUP_LRU_MAX: self._dedup_lru.popitem(last=False) # evict oldest @@ -241,7 +431,12 @@ class Dispatcher: self._logger.exception(f"Toggle channel delivery failed for {fam}/{ch_type}") def dispatch_stats(self) -> dict: - """Expose v0.5.2 toggle-path guard counters for ops/health endpoints.""" + """Expose v0.5.2 toggle-path guard counters for ops/health endpoints. + + Returns the in-memory (= write-through current) values. Equivalent to + SELECT from dispatcher_state but avoids the DB round-trip on every + call. The numbers are cumulative-since-install (NOT since-boot). + """ return { "stale_dropped": self._stale_dropped, "cooldown_dropped": self._cooldown_dropped, @@ -279,8 +474,10 @@ class Dispatcher: now_anchor = time.time() if self._first_event_at is None: self._first_event_at = now_anchor + self._persist_state() if (now_anchor - self._first_event_at) < grace_s: self._cold_start_dropped += 1 + self._persist_state() self._logger.info( "cold-start grace: dropping scheduled broadcast " "(table=%s pk=%s)", diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index 9bab216..30d3985 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 4 +SCHEMA_VERSION = 5 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v5.sql b/meshai/persistence/migrations/v5.sql new file mode 100644 index 0000000..175898a --- /dev/null +++ b/meshai/persistence/migrations/v5.sql @@ -0,0 +1,69 @@ +-- v0.6-2 dispatcher state persistence (audit doc finding #1). +-- +-- Pre-this-migration the dispatcher held its cold-start anchor, four drop +-- counters, per-(toggle,category,region) cooldown map, and (source,event_id) +-- dedup OrderedDict in instance memory only -- every container restart +-- silently reset them. The LLM (commit #5: env_reporter) needs cumulative +-- "we've dropped 47 stale events this week" semantics, which requires +-- write-through to SQLite. +-- +-- All three tables are independent; the dispatcher's in-memory caches +-- remain authoritative on the fast path. Writes go to both. On boot the +-- in-memory caches are rebuilt from SQLite. + +-- -------------------------------------------------------------------------- +-- dispatcher_state: singleton row keyed on id=1. The CHECK constraint plus +-- the seed-INSERT-OR-IGNORE below guarantees exactly one row ever exists. +-- All counters are cumulative-since-install (NOT since-boot); the dispatcher +-- read-restores them on every __init__ and write-increments per drop. +-- -------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS dispatcher_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + cold_start_anchor REAL, -- epoch seconds; NULL until first event-to-enabled-toggle + stale_dropped INTEGER NOT NULL DEFAULT 0, + cooldown_dropped INTEGER NOT NULL DEFAULT 0, + dedup_dropped INTEGER NOT NULL DEFAULT 0, + cold_start_dropped INTEGER NOT NULL DEFAULT 0, + updated_at REAL +); + +-- Seed the singleton row. INSERT OR IGNORE so a re-run is a clean no-op. +INSERT OR IGNORE INTO dispatcher_state(id, cold_start_anchor, + stale_dropped, cooldown_dropped, dedup_dropped, cold_start_dropped, + updated_at) +VALUES (1, NULL, 0, 0, 0, 0, NULL); + +-- -------------------------------------------------------------------------- +-- dispatcher_cooldowns: one row per (toggle, category, region) triple that +-- has fired at least once. `last_fired_at` powers the "in cooldown?" check. +-- Pruned on every insert: rows older than (2 * current_cooldown_s) are +-- deleted (matches the existing in-memory prune semantics at +-- pipeline/dispatcher.py:183-187). +-- -------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS dispatcher_cooldowns ( + toggle TEXT NOT NULL, + category TEXT NOT NULL, + region TEXT NOT NULL, + last_fired_at REAL NOT NULL, + updated_at REAL NOT NULL, + PRIMARY KEY (toggle, category, region) +); +CREATE INDEX IF NOT EXISTS idx_dispatcher_cooldowns_last_fired + ON dispatcher_cooldowns(last_fired_at); + +-- -------------------------------------------------------------------------- +-- dispatcher_dedup: bounded LRU on disk -- 7-day window (deleted on every +-- insert). The in-memory OrderedDict still caps at 10k for the fast path; +-- on boot we rebuild it from the most-recent 10k rows ordered by seen_at. +-- This means the DB retains MORE history than the in-memory cap, which is +-- useful for the LLM ("did we already see this event ID?") even if it has +-- been evicted from the in-memory LRU. +-- -------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS dispatcher_dedup ( + source TEXT NOT NULL, + event_id TEXT NOT NULL, + seen_at REAL NOT NULL, + PRIMARY KEY (source, event_id) +); +CREATE INDEX IF NOT EXISTS idx_dispatcher_dedup_seen_at + ON dispatcher_dedup(seen_at); diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0997378 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,31 @@ +"""Pytest fixture isolation for meshai persistence (v0.6-2). + +Before v0.6-2 the dispatcher held all state in instance memory, so tests +that constructed `Dispatcher(...)` were inert w.r.t. SQLite. v0.6-2 made +`Dispatcher.__init__` read/restore from the persistence layer, which by +default points at `/data/meshai.sqlite`. Without isolation every test +would now read+write production state, polluting across tests and across +pytest invocations. + +This autouse fixture redirects `MESHAI_DB_PATH` to a per-test tmp file +and clears the persistence-layer threading.local caches around each test. +Existing tests that don't reference any fixture get isolation for free; +tests that explicitly use a `db_path` (or similar) fixture can still +override the env var inside their own fixture body -- last setenv wins. +""" +import pytest + +from meshai.persistence import close_thread_connection +from meshai.persistence import db as _persistence_db + + +@pytest.fixture(autouse=True) +def _isolate_meshai_db(tmp_path, monkeypatch): + """Point MESHAI_DB_PATH at a tmp file per test.""" + p = str(tmp_path / "meshai-test-isolated.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", p) + _persistence_db._initialised.clear() + close_thread_connection() + yield p + close_thread_connection() + _persistence_db._initialised.discard(p) diff --git a/tests/test_dispatcher_persistence.py b/tests/test_dispatcher_persistence.py new file mode 100644 index 0000000..db14807 --- /dev/null +++ b/tests/test_dispatcher_persistence.py @@ -0,0 +1,494 @@ +"""v0.6-2 dispatcher state persistence tests (audit doc finding #1). + +Covers: + - state row restored on dispatcher __init__ + - counter increments survive restart + - cold-start anchor survives restart + - cooldown map rebuilt from dispatcher_cooldowns on init + - dedup LRU rebuilt from dispatcher_dedup (most-recent 10k) on init + - 7-day dedup cleanup runs on insert + - 2*cooldown_s cooldown prune runs on insert + - v5 migration is idempotent + - synthetic storm + restart + replay probe (commit spec step 5) +""" +from __future__ import annotations + +import asyncio +import time as _time +from unittest.mock import MagicMock, AsyncMock + +import pytest + +from meshai.config import Config, NotificationToggle +from meshai.notifications.events import make_event +from meshai.notifications.pipeline.dispatcher import ( + Dispatcher, _DEDUP_LRU_MAX, _DEDUP_DB_RETENTION_S, +) +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +# ---------- fixtures -------------------------------------------------------- + + +@pytest.fixture +def db_path(tmp_path, monkeypatch): + p = str(tmp_path / "disp-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", p) + persistence_db._initialised.clear() + close_thread_connection() + init_db() + yield p + close_thread_connection() + persistence_db._initialised.discard(p) + + +def _build_config(*, cold_start_grace=60, + fire_enabled=True, fire_cooldown=300, + fire_freshness=600, + fire_severity_channels=None): + """Minimal Config with the `fire` family toggle enabled. The toggle uses + a fake `broadcast_channel=1` + mesh_broadcast severity-channel mapping so + dispatch() reaches the channel.deliver() path.""" + cfg = Config() + cfg.notifications.cold_start_grace_seconds = cold_start_grace + cfg.notifications.toggles = { + "fire": NotificationToggle( + name="fire", enabled=fire_enabled, + freshness_seconds=fire_freshness, + cooldown_seconds=fire_cooldown, + severity_channels=(fire_severity_channels or + {"routine": ["mesh_broadcast"], + "priority": ["mesh_broadcast"], + "immediate": ["mesh_broadcast"]}), + broadcast_channel=1, + ), + } + return cfg + + +def _mk_channel_factory(deliver_outcome=True): + """Returns (factory, channel_mock_list). Each call appends one mock and + returns it so tests can assert call counts.""" + created = [] + + def _factory(rule, connector): + ch = MagicMock() + ch.deliver = AsyncMock(return_value=deliver_outcome) + created.append(ch) + return ch + return _factory, created + + +def _fire_event(*, source="fires", category="wildfire_incident", + event_id=None, severity="priority", region="US-ID", + timestamp=None): + """Build a wildfire Event whose category maps to the `fire` toggle.""" + e = make_event( + source=source, category=category, severity=severity, + region=region, + title="🔥 Test fire", lat=43.6, lon=-116.2, + ) + if event_id is not None: + e.id = event_id + if timestamp is not None: + e.timestamp = timestamp + return e + + +# ============================================================================ +# Schema + table baseline +# ============================================================================ + + +def test_v5_tables_present_after_migration(db_path): + """v5.sql ran during init_db. All three tables + the singleton row.""" + conn = persistence_db.get_db(db_path) + tables = {r["name"] for r in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall()} + assert "dispatcher_state" in tables + assert "dispatcher_cooldowns" in tables + assert "dispatcher_dedup" in tables + + row = conn.execute("SELECT * FROM dispatcher_state").fetchone() + assert row is not None + assert row["id"] == 1 + assert row["cold_start_anchor"] is None + assert row["stale_dropped"] == 0 + assert row["cooldown_dropped"] == 0 + assert row["dedup_dropped"] == 0 + assert row["cold_start_dropped"] == 0 + + +def test_v5_migration_idempotent(db_path): + """Re-running migrations is a no-op (only one singleton row, no errors).""" + # Force a second migration pass via the public path. + persistence_db._initialised.discard(db_path) + persistence_db._apply_migrations(persistence_db.get_db(db_path)) + conn = persistence_db.get_db(db_path) + n = conn.execute("SELECT COUNT(*) FROM dispatcher_state").fetchone()[0] + assert n == 1 + + +# ============================================================================ +# Construction: restore from empty DB = clean defaults +# ============================================================================ + + +def test_init_restores_empty_state(db_path): + cfg = _build_config() + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + assert d._stale_dropped == 0 + assert d._cooldown_dropped == 0 + assert d._dedup_dropped == 0 + assert d._cold_start_dropped == 0 + assert d._first_event_at is None + assert d._toggle_cooldown == {} + assert len(d._dedup_lru) == 0 + + +# ============================================================================ +# Counters survive restart +# ============================================================================ + + +def test_counter_increments_persist_across_restart(db_path): + cfg = _build_config() + factory, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory) + + # Forcibly bump each counter via the persistence helper. Mirrors what + # _dispatch_toggles does on a drop. + d1._stale_dropped = 5 + d1._cooldown_dropped = 3 + d1._dedup_dropped = 7 + d1._cold_start_dropped = 2 + d1._first_event_at = 12345.678 + d1._persist_state() + + # Fresh dispatcher, same DB. + d2 = Dispatcher(cfg, factory) + assert d2._stale_dropped == 5 + assert d2._cooldown_dropped == 3 + assert d2._dedup_dropped == 7 + assert d2._cold_start_dropped == 2 + assert d2._first_event_at == pytest.approx(12345.678) + + +def test_dispatch_stats_reflects_persisted_state(db_path): + cfg = _build_config() + factory, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory) + d1._stale_dropped = 99 + d1._persist_state() + + d2 = Dispatcher(cfg, factory) + stats = d2.dispatch_stats() + assert stats["stale_dropped"] == 99 + + +# ============================================================================ +# Cooldown survives restart, re-arms on boot +# ============================================================================ + + +def test_cooldown_persists_across_restart(db_path): + cfg = _build_config(fire_cooldown=300) + factory, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory) + + key = ("fire", "wildfire_incident", "US-ID") + now = _time.time() + d1._toggle_cooldown[key] = now + d1._persist_cooldown(key, now, 300) + + d2 = Dispatcher(cfg, factory) + assert key in d2._toggle_cooldown + assert d2._toggle_cooldown[key] == pytest.approx(now) + + +def test_cooldown_rearms_on_boot_within_window(db_path): + """Cooldown was set 100s before restart; check 100s after restart + (so 200s after the fire) with cooldown_s=300 => still in cooldown + (200 < 300).""" + cfg = _build_config(fire_cooldown=300) + factory, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory) + + fire_t = 10_000.0 + key = ("fire", "wildfire_incident", "US-ID") + d1._toggle_cooldown[key] = fire_t + d1._persist_cooldown(key, fire_t, 300) + + # Restart -- fresh dispatcher reads from DB. + d2 = Dispatcher(cfg, factory) + assert key in d2._toggle_cooldown + last = d2._toggle_cooldown[key] + now = fire_t + 200 # 200s elapsed + assert (now - last) < 300, "must still be in cooldown window" + + +def test_cooldown_prune_2x_window(db_path): + """Inserting a cooldown deletes rows whose last_fired_at < (now - 2*cooldown_s).""" + cfg = _build_config(fire_cooldown=300) + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + now = _time.time() + very_old_t = now - 1000.0 # > 2*300=600s old + fresh_t = now - 100.0 + + d._persist_cooldown(("oldfam", "cat", "*"), very_old_t, 300) + d._persist_cooldown(("freshfam", "cat", "*"), fresh_t, 300) + # The 2nd call's prune deletes the old row. + # Issue one more recent insert to make sure prune runs again with `now`. + d._persist_cooldown(("nowfam", "cat", "*"), now, 300) + + conn = persistence_db.get_db(db_path) + rows = conn.execute( + "SELECT toggle FROM dispatcher_cooldowns ORDER BY toggle" + ).fetchall() + toggles = [r["toggle"] for r in rows] + assert "oldfam" not in toggles, "stale cooldown should be pruned" + assert "freshfam" in toggles + assert "nowfam" in toggles + + +# ============================================================================ +# Dedup survives restart +# ============================================================================ + + +def test_dedup_lru_rebuilt_on_restart(db_path): + cfg = _build_config() + factory, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory) + + now = _time.time() + for i in range(5): + key = ("firms", f"event_{i}") + d1._dedup_lru[key] = True + d1._persist_dedup(key, now + i) + + d2 = Dispatcher(cfg, factory) + assert len(d2._dedup_lru) == 5 + for i in range(5): + assert ("firms", f"event_{i}") in d2._dedup_lru + + +def test_dedup_lru_rebuild_caps_at_10k(db_path): + """If on-disk has > 10k rows, in-memory restores only the most recent 10k.""" + cfg = _build_config() + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + conn = persistence_db.get_db(db_path) + base = _time.time() - 10000 + rows = [("firms", f"e{i}", base + i) + for i in range(_DEDUP_LRU_MAX + 50)] + conn.executemany( + "INSERT INTO dispatcher_dedup(source, event_id, seen_at) VALUES (?,?,?)", + rows, + ) + + d2 = Dispatcher(cfg, factory) + assert len(d2._dedup_lru) == _DEDUP_LRU_MAX + # Most recent IDs survived (e.g. e_(_DEDUP_LRU_MAX+49)) but oldest didn't (e_0). + assert ("firms", "e0") not in d2._dedup_lru + assert ("firms", f"e{_DEDUP_LRU_MAX + 49}") in d2._dedup_lru + + +def test_dedup_7_day_cleanup(db_path): + cfg = _build_config() + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + now = _time.time() + # Need ancient < cutoff strictly. cutoff = now - retention; ancient = + # now - retention - 1000 leaves 1000s of headroom. + too_old = now - (_DEDUP_DB_RETENTION_S + 1000) + + # Seed an ancient row directly to bypass the prune-on-insert guard. + conn = persistence_db.get_db(db_path) + conn.execute( + "INSERT INTO dispatcher_dedup(source, event_id, seen_at) VALUES (?,?,?)", + ("firms", "ancient", too_old), + ) + # Now an insert via the helper triggers the cleanup: cutoff = now - retention. + d._persist_dedup(("firms", "fresh"), now) + + rows = conn.execute( + "SELECT event_id FROM dispatcher_dedup ORDER BY event_id" + ).fetchall() + ids = [r["event_id"] for r in rows] + assert "ancient" not in ids + assert "fresh" in ids + + +def test_dedup_same_key_updates_seen_at(db_path): + """Re-seeing the same (source, event_id) refreshes its seen_at without + creating a second row -- the INSERT OR REPLACE on the composite PK.""" + cfg = _build_config() + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + t1 = _time.time() + t2 = t1 + 50 + + d._persist_dedup(("firms", "abc"), t1) + d._persist_dedup(("firms", "abc"), t2) + + conn = persistence_db.get_db(db_path) + rows = conn.execute("SELECT seen_at FROM dispatcher_dedup WHERE event_id='abc'").fetchall() + assert len(rows) == 1 + assert rows[0]["seen_at"] == pytest.approx(t2) + + +# ============================================================================ +# End-to-end: counters via real _dispatch_toggles +# ============================================================================ + + +def test_real_dispatch_arms_cold_start_anchor(db_path): + """First event to reach the toggle anchors the grace window and the + anchor lands in dispatcher_state.""" + cfg = _build_config(cold_start_grace=60) + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + ev = _fire_event(event_id="fire_anchor_001", timestamp=_time.time()) + asyncio.run(d._dispatch_toggles(ev)) + + assert d._first_event_at is not None + # And in SQLite. + conn = persistence_db.get_db(db_path) + row = conn.execute("SELECT cold_start_anchor FROM dispatcher_state").fetchone() + assert row["cold_start_anchor"] is not None + + +def test_real_dispatch_stale_drop_persists(db_path): + """A stale event increments stale_dropped on disk.""" + cfg = _build_config(cold_start_grace=0, fire_freshness=60) + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + ev = _fire_event(event_id="fire_stale_001", + timestamp=_time.time() - 600) # 10 min old vs 60s window + asyncio.run(d._dispatch_toggles(ev)) + + assert d._stale_dropped == 1 + conn = persistence_db.get_db(db_path) + row = conn.execute("SELECT stale_dropped FROM dispatcher_state").fetchone() + assert row["stale_dropped"] == 1 + + +def test_real_dispatch_dedup_writes_through(db_path): + """A successful first-sighting writes a dedup row; a second sighting + drops at dedup AND refreshes seen_at.""" + cfg = _build_config(cold_start_grace=0, fire_freshness=86400, fire_cooldown=0) + factory, _ = _mk_channel_factory() + d = Dispatcher(cfg, factory) + + ev1 = _fire_event(event_id="fire_dup_001", timestamp=_time.time()) + asyncio.run(d._dispatch_toggles(ev1)) + + conn = persistence_db.get_db(db_path) + rows = conn.execute("SELECT source, event_id FROM dispatcher_dedup").fetchall() + assert ("fires", "fire_dup_001") in [(r["source"], r["event_id"]) for r in rows] + + # second sighting + ev2 = _fire_event(event_id="fire_dup_001", timestamp=_time.time()) + asyncio.run(d._dispatch_toggles(ev2)) + + assert d._dedup_dropped == 1 + row = conn.execute("SELECT dedup_dropped FROM dispatcher_state").fetchone() + assert row["dedup_dropped"] == 1 + + +# ============================================================================ +# Synthetic storm -> restart -> replay (commit spec step 5) +# ============================================================================ + + +def test_synthetic_storm_restart_replay(db_path): + """Phase 1: 50 distinct events. Phase 2: new Dispatcher, same DB. Phase 3: + 5 more events including one duplicate from phase 1. Assertions: + - Phase 3 duplicate is dropped by dedup + - Counters in phase 2/3 inherit phase 1 values (not reset) + - Dedup LRU + cooldown carries across + """ + cfg = _build_config(cold_start_grace=0, fire_freshness=86400, fire_cooldown=0) + factory_a, _ = _mk_channel_factory() + d1 = Dispatcher(cfg, factory_a) + + # ---- Phase 1: 50 distinct events ---- + base_ts = _time.time() + for i in range(50): + ev = _fire_event(event_id=f"storm_{i}", timestamp=base_ts) + asyncio.run(d1._dispatch_toggles(ev)) + + p1_dedup_rows = persistence_db.get_db(db_path).execute( + "SELECT COUNT(*) FROM dispatcher_dedup" + ).fetchone()[0] + p1_dedup_dropped = d1._dedup_dropped + p1_stale_dropped = d1._stale_dropped + + assert p1_dedup_rows == 50 + # All 50 were first-sightings -> no dedup_dropped increments. + assert p1_dedup_dropped == 0 + + # ---- Phase 2: simulated restart, fresh Dispatcher, same DB ---- + factory_b, _ = _mk_channel_factory() + d2 = Dispatcher(cfg, factory_b) + # Carry-over from phase 1: + assert d2._dedup_dropped == p1_dedup_dropped + assert d2._stale_dropped == p1_stale_dropped + assert len(d2._dedup_lru) == 50 + + # ---- Phase 3: 5 new events incl one phase-1 duplicate ---- + for i in range(50, 54): + asyncio.run(d2._dispatch_toggles( + _fire_event(event_id=f"storm_{i}", timestamp=base_ts) + )) + # The duplicate: + dup_ev = _fire_event(event_id="storm_3", timestamp=base_ts) + asyncio.run(d2._dispatch_toggles(dup_ev)) + + # Now: dedup_dropped must have incremented by exactly 1 (the duplicate). + assert d2._dedup_dropped == p1_dedup_dropped + 1, ( + f"expected +1 dedup drop, got {d2._dedup_dropped - p1_dedup_dropped}" + ) + + # 50 phase-1 + 4 phase-3-new = 54 distinct rows on disk. + final_rows = persistence_db.get_db(db_path).execute( + "SELECT COUNT(*) FROM dispatcher_dedup" + ).fetchone()[0] + assert final_rows == 54 + + +# ============================================================================ +# Graceful degrade: dispatcher works even when persistence is broken +# ============================================================================ + + +def test_dispatcher_construct_when_db_unavailable(monkeypatch, tmp_path): + """If get_db() raises on construct, the dispatcher must still init + with fresh in-memory state (no crash, no abort).""" + cfg = _build_config() + factory, _ = _mk_channel_factory() + + def _broken_get_db(*a, **kw): + raise RuntimeError("persistence layer down") + + monkeypatch.setattr( + "meshai.notifications.pipeline.dispatcher.__name__", "" + ) + # Patch the import target: persistence.get_db. + import meshai.persistence as p + monkeypatch.setattr(p, "get_db", _broken_get_db) + + d = Dispatcher(cfg, factory) # must not raise + assert d._stale_dropped == 0 + assert d._first_event_at is None diff --git a/tests/test_notification_toggles.py b/tests/test_notification_toggles.py index f08d0cc..2c08713 100644 --- a/tests/test_notification_toggles.py +++ b/tests/test_notification_toggles.py @@ -24,6 +24,23 @@ class RecChannel: def _dispatch(cfg, event): rec = [] + # v0.6-2: wipe dedup state between calls so each _dispatch is an + # independent "what happens if this event arrives now?" check. + # The pre-v0.6-2 in-memory dedup naturally reset per Dispatcher + # instance; the new persisted dedup carries across instances unless + # we clear it here. + try: + from meshai.persistence import get_db + conn = get_db() + conn.execute("DELETE FROM dispatcher_dedup") + conn.execute("DELETE FROM dispatcher_cooldowns") + conn.execute( + "UPDATE dispatcher_state SET cold_start_anchor=NULL, " + "stale_dropped=0, cooldown_dropped=0, dedup_dropped=0, " + "cold_start_dropped=0 WHERE id=1" + ) + except Exception: + pass d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None) asyncio.run(d.dispatch(event)) return rec