feat(v0.6-2): dispatcher state persistence -- cold-start, cooldowns, dedup LRU to SQLite

Closes Rule-20 dispatcher gap from audit doc v0.6-phase1-audit.md finding #1.
Pre-this-commit the cold-start anchor, 4 drop counters, per-toggle cooldown
map, and dedup OrderedDict all lived in Dispatcher instance memory and were
lost on every container restart.

v5.sql adds three tables:
  - dispatcher_state (singleton id=1): cold_start_anchor + 4 drop counters
  - dispatcher_cooldowns ((toggle,category,region) keyed): last_fired_at
  - dispatcher_dedup ((source,event_id) keyed): seen_at

Dispatcher refactor:
  - __init__ calls _restore_from_db -- counters, cold-start anchor, cooldown
    map, and dedup LRU (most-recent 10k by seen_at) all rehydrated from the
    three new tables
  - write-through on every mutation: _persist_state for counter/anchor,
    _persist_cooldown for cooldown UPSERT + 2*cooldown_s prune,
    _persist_dedup for dedup INSERT OR REPLACE + 7-day cleanup
  - in-memory caches stay authoritative on the fast read path
  - cumulative-since-install counters (NOT since-boot); LLM will be able
    to answer "we have dropped 47 stale events this week" after commit #5
    (env_reporter) lands
  - graceful degrade: missing v5 tables / persistence outage falls back to
    fresh in-memory state without crashing the constructor

Tests:
  - tests/test_dispatcher_persistence.py (17 tests): state restore on init,
    counter+cooldown+dedup survival across simulated restart, cooldown rearm
    within 2x window, dedup LRU rebuild caps at 10k, 7-day cleanup on insert,
    INSERT OR REPLACE on duplicate source+event_id, v5 migration idempotent,
    synthetic storm (50 events) -> restart -> replay (5 incl 1 duplicate)
    with the duplicate dedup-rejected and counters NOT resetting
  - tests/conftest.py (new): autouse MESHAI_DB_PATH redirection to per-test
    tmp file, so the dispatcher_*  tables on production /data dont get
    polluted by tests that construct Dispatcher() without an explicit fixture
  - tests/test_notification_toggles.py: _dispatch helper wipes dedup/cooldown/
    state tables between calls (per-call independence preserved; pre-v0.6-2
    in-memory-only Dispatcher reset naturally per instance)

Test count: 680 -> 697 (+17 new, 0 regressions).

Refs audit doc v0.6-phase1-audit.md finding #1.
This commit is contained in:
Matt Johnson (via Claude) 2026-06-05 16:35:40 +00:00
commit c333a97344
6 changed files with 815 additions and 7 deletions

View file

@ -12,6 +12,19 @@ v0.5.2: toggle path gains three guards at the entrance (staleness, per-toggle
cooldown, (source,id) LRU dedup) plus the friendly mesh-broadcast composer so
the toggle path stops emitting raw `[Family] central.category` debug strings.
The legacy rules path is intentionally left untouched (no regression risk).
v0.6-2: state persistence (audit doc finding #1). The cold-start anchor,
the four drop counters, the per-(toggle,category,region) cooldown map,
and the (source,event_id) dedup OrderedDict now write through to SQLite
on every mutation. The dispatcher restores them on __init__ so they
survive container restart. In-memory caches stay authoritative on the
fast path; SQLite is the durability layer + the LLM's "what's been
suppressed?" query target (commit #5: env_reporter).
Cumulative-since-install counters: the four `_*_dropped` ints are NOT
reset on boot. They carry forward from the dispatcher_state singleton
row. A `dispatch_stats()` call returns the in-memory (=most-recent)
values, which mirror the on-disk values exactly.
"""
import logging
@ -25,8 +38,20 @@ from meshai.notifications.renderers.composer import compose_mesh_message
# Bounded (source, event.id) LRU set — see _dispatch_toggles Section 3.
# Holds the in-memory fast-path cap; SQLite dispatcher_dedup retains a
# 7-day window which can exceed this. On boot we restore the most-recent
# _DEDUP_LRU_MAX rows into this OrderedDict.
_DEDUP_LRU_MAX = 10_000
# v0.6-2 SQLite dedup retention. Anything older than this is deleted on
# every dispatcher_dedup insert.
_DEDUP_DB_RETENTION_S = 7 * 86400 # 604_800
# In-memory cooldown map prune threshold (entries). When the map grows past
# this we re-apply the 2*cooldown_s cutoff so it stays bounded. The SQLite
# prune fires on every cooldown write regardless.
_COOLDOWN_INMEM_PRUNE_THRESHOLD = 1024
class Dispatcher:
"""Dispatches immediate events to channels matching configured rules."""
@ -48,6 +73,7 @@ class Dispatcher:
self._connector = connector
self._logger = logging.getLogger("meshai.pipeline.dispatcher")
# v0.5.2 — toggle-path guards (ops counters exposed via dispatch_stats()):
# v0.6-2: restored from dispatcher_state on __init__ via _restore_from_db.
self._stale_dropped = 0
self._cooldown_dropped = 0
self._dedup_dropped = 0
@ -60,6 +86,156 @@ class Dispatcher:
self._toggle_cooldown: dict[tuple[str, str, str], float] = {}
# Insertion-ordered (source, event.id) -> sentinel; evict oldest at cap.
self._dedup_lru: "OrderedDict[tuple[str, str], bool]" = OrderedDict()
# v0.6-2: hydrate from SQLite. Graceful no-op if persistence is
# unavailable -- the dispatcher still works, just without
# cross-restart durability.
self._restore_from_db()
# ---------- v0.6-2 persistence -----------------------------------------
def _restore_from_db(self) -> None:
"""Hydrate in-memory state from dispatcher_state + dispatcher_cooldowns
+ dispatcher_dedup on dispatcher construction. Idempotent.
Defensive against missing tables: if the v5 migration hasn't run yet
(e.g. fresh DB created by a test fixture before migrations apply),
any sqlite OperationalError is caught and the dispatcher falls back
to fresh in-memory state. The fast path is unaffected."""
try:
from meshai.persistence import get_db
conn = get_db()
except Exception:
self._logger.exception(
"dispatcher: persistence unavailable on init; using fresh "
"in-memory state (counters reset, no cooldown/dedup carryover)"
)
return
# State singleton. Wrap in try/except so a missing dispatcher_state
# table (pre-v5 schema) degrades gracefully to fresh state instead
# of raising into the Dispatcher constructor.
try:
row = conn.execute(
"SELECT cold_start_anchor, stale_dropped, cooldown_dropped, "
"dedup_dropped, cold_start_dropped FROM dispatcher_state WHERE id=1"
).fetchone()
except Exception:
self._logger.debug(
"dispatcher: v5 tables not present yet; using fresh state"
)
return
if row is not None:
self._first_event_at = row["cold_start_anchor"]
self._stale_dropped = int(row["stale_dropped"] or 0)
self._cooldown_dropped = int(row["cooldown_dropped"] or 0)
self._dedup_dropped = int(row["dedup_dropped"] or 0)
self._cold_start_dropped = int(row["cold_start_dropped"] or 0)
self._logger.info(
"dispatcher state restored: cold_start_anchor=%s "
"stale=%d cooldown=%d dedup=%d cold_start=%d",
self._first_event_at, self._stale_dropped,
self._cooldown_dropped, self._dedup_dropped,
self._cold_start_dropped,
)
# Cooldowns: every row restored verbatim (the in-memory prune
# threshold of 1024 will fire on the first cooldown write if the
# restored set is bigger, so even pathological histories self-bound).
for r in conn.execute(
"SELECT toggle, category, region, last_fired_at "
"FROM dispatcher_cooldowns"
).fetchall():
self._toggle_cooldown[(r["toggle"], r["category"], r["region"])] = \
float(r["last_fired_at"])
# Dedup LRU: restore the most-recent _DEDUP_LRU_MAX rows in
# newest-first order, then re-add into the OrderedDict oldest-first
# so the natural insertion order matches the OrderedDict-as-LRU
# contract (oldest = first-evicted on overflow). On-disk retains a
# 7-day window which may exceed the in-memory cap; the LLM still
# sees the full window via direct SELECT.
rows = conn.execute(
"SELECT source, event_id FROM dispatcher_dedup "
"ORDER BY seen_at DESC LIMIT ?",
(_DEDUP_LRU_MAX,),
).fetchall()
for r in reversed(rows):
self._dedup_lru[(r["source"], r["event_id"])] = True
if self._toggle_cooldown or self._dedup_lru:
self._logger.info(
"dispatcher caches restored: cooldowns=%d dedup_lru=%d",
len(self._toggle_cooldown), len(self._dedup_lru),
)
def _persist_state(self) -> None:
"""Write the current counters + cold_start_anchor to dispatcher_state.
Called whenever any of those values change."""
try:
from meshai.persistence import get_db
conn = get_db()
conn.execute(
"UPDATE dispatcher_state SET cold_start_anchor=?, "
"stale_dropped=?, cooldown_dropped=?, dedup_dropped=?, "
"cold_start_dropped=?, updated_at=? WHERE id=1",
(self._first_event_at, self._stale_dropped,
self._cooldown_dropped, self._dedup_dropped,
self._cold_start_dropped, time.time()),
)
except Exception:
self._logger.exception(
"dispatcher: state write-through failed; in-memory still ok"
)
def _persist_cooldown(self, key: tuple[str, str, str],
now: float, cooldown_s: int) -> None:
"""UPSERT a single (toggle, category, region) cooldown row + prune
rows older than (2 * cooldown_s). Mirrors the in-memory prune
semantics moved off the per-1024-grow check."""
try:
from meshai.persistence import get_db
conn = get_db()
toggle, category, region = key
conn.execute(
"INSERT OR REPLACE INTO dispatcher_cooldowns("
"toggle, category, region, last_fired_at, updated_at) "
"VALUES (?,?,?,?,?)",
(toggle, category, region, now, now),
)
if cooldown_s > 0:
cutoff = now - (2 * cooldown_s)
conn.execute(
"DELETE FROM dispatcher_cooldowns WHERE last_fired_at < ?",
(cutoff,),
)
except Exception:
self._logger.exception(
"dispatcher: cooldown write-through failed for %s", key
)
def _persist_dedup(self, key: tuple[str, str], now: float) -> None:
"""INSERT a single (source, event_id) dedup row + prune rows older
than _DEDUP_DB_RETENTION_S. Same key arriving twice updates seen_at."""
try:
from meshai.persistence import get_db
conn = get_db()
source, event_id = key
conn.execute(
"INSERT OR REPLACE INTO dispatcher_dedup("
"source, event_id, seen_at) VALUES (?,?,?)",
(source, event_id, now),
)
cutoff = now - _DEDUP_DB_RETENTION_S
conn.execute(
"DELETE FROM dispatcher_dedup WHERE seen_at < ?",
(cutoff,),
)
except Exception:
self._logger.exception(
"dispatcher: dedup write-through failed for %s", key
)
# ---------- core dispatch ----------------------------------------------
async def dispatch(self, event: Event) -> None:
"""Deliver via matching rules AND enabled family toggles (parallel, v0.5)."""
@ -68,7 +244,7 @@ class Dispatcher:
async def _dispatch_rules(self, event: Event) -> None:
"""Deliver an immediate-severity event to all matching channels.
This method is async and awaits each channel.deliver() call.
"""
rules = self._matching_rules(event)
@ -109,6 +285,10 @@ class Dispatcher:
re-delivery during reconnect.
Then composes a friendly mesh string instead of the prior raw
`[Family] central.category` debug format.
v0.6-2: every mutation of the four drop counters, the cold-start
anchor, the cooldown map, and the dedup LRU writes through to
SQLite via the _persist_* helpers. Read fast-path stays in-memory.
"""
toggles = getattr(self._config.notifications, "toggles", None)
if not isinstance(toggles, dict) or not toggles:
@ -131,12 +311,14 @@ class Dispatcher:
now_anchor = time.time()
if self._first_event_at is None:
self._first_event_at = now_anchor
self._persist_state() # anchor armed -- durable
self._logger.info(
"cold-start grace anchor set: t0=%.3f window=%ds",
now_anchor, grace_s,
)
if (now_anchor - self._first_event_at) < grace_s:
self._cold_start_dropped += 1
self._persist_state()
self._logger.info(
"cold-start grace: dropping broadcast source=%s category=%s "
"elapsed=%.1fs window=%ds",
@ -156,6 +338,7 @@ class Dispatcher:
age = time.time() - event.timestamp
if age > freshness_s:
self._stale_dropped += 1
self._persist_state()
self._logger.debug(
"dispatcher: dropping stale event source=%s category=%s "
"age=%.0fs > freshness=%ds",
@ -175,12 +358,14 @@ class Dispatcher:
last_fired = self._toggle_cooldown.get(ck)
if last_fired is not None and (now - last_fired) < cooldown_s:
self._cooldown_dropped += 1
self._persist_state()
return # silent throttle — no log spam
self._toggle_cooldown[ck] = now
# Lazy prune: keep map bounded at ~2x the largest cooldown by
# discarding entries older than 2 * cooldown_s. Cheap; runs only
# when the map grows past a threshold so it's not per-event work.
if len(self._toggle_cooldown) > 1024:
self._persist_cooldown(ck, now, cooldown_s)
# In-memory prune: mirror the SQLite cutoff when the map grows
# past the threshold. The SQLite prune already ran inside
# _persist_cooldown.
if len(self._toggle_cooldown) > _COOLDOWN_INMEM_PRUNE_THRESHOLD:
cutoff = now - (2 * cooldown_s)
self._toggle_cooldown = {
k: t for k, t in self._toggle_cooldown.items() if t >= cutoff
@ -192,8 +377,13 @@ class Dispatcher:
# Touch to keep recent.
self._dedup_lru.move_to_end(dk)
self._dedup_dropped += 1
self._persist_state()
# Refresh seen_at on disk too -- a repeat sighting is fresh
# evidence we're still seeing this id.
self._persist_dedup(dk, time.time())
return
self._dedup_lru[dk] = True
self._persist_dedup(dk, time.time())
while len(self._dedup_lru) > _DEDUP_LRU_MAX:
self._dedup_lru.popitem(last=False) # evict oldest
@ -241,7 +431,12 @@ class Dispatcher:
self._logger.exception(f"Toggle channel delivery failed for {fam}/{ch_type}")
def dispatch_stats(self) -> dict:
"""Expose v0.5.2 toggle-path guard counters for ops/health endpoints."""
"""Expose v0.5.2 toggle-path guard counters for ops/health endpoints.
Returns the in-memory (= write-through current) values. Equivalent to
SELECT from dispatcher_state but avoids the DB round-trip on every
call. The numbers are cumulative-since-install (NOT since-boot).
"""
return {
"stale_dropped": self._stale_dropped,
"cooldown_dropped": self._cooldown_dropped,
@ -279,8 +474,10 @@ class Dispatcher:
now_anchor = time.time()
if self._first_event_at is None:
self._first_event_at = now_anchor
self._persist_state()
if (now_anchor - self._first_event_at) < grace_s:
self._cold_start_dropped += 1
self._persist_state()
self._logger.info(
"cold-start grace: dropping scheduled broadcast "
"(table=%s pk=%s)",

View file

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
DEFAULT_DB_PATH = "/data/meshai.sqlite"
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
SCHEMA_VERSION = 4
SCHEMA_VERSION = 5
SCHEMA_META_TABLE = "schema_meta"
MIGRATIONS_DIR = Path(__file__).parent / "migrations"

View file

@ -0,0 +1,69 @@
-- v0.6-2 dispatcher state persistence (audit doc finding #1).
--
-- Pre-this-migration the dispatcher held its cold-start anchor, four drop
-- counters, per-(toggle,category,region) cooldown map, and (source,event_id)
-- dedup OrderedDict in instance memory only -- every container restart
-- silently reset them. The LLM (commit #5: env_reporter) needs cumulative
-- "we've dropped 47 stale events this week" semantics, which requires
-- write-through to SQLite.
--
-- All three tables are independent; the dispatcher's in-memory caches
-- remain authoritative on the fast path. Writes go to both. On boot the
-- in-memory caches are rebuilt from SQLite.
-- --------------------------------------------------------------------------
-- dispatcher_state: singleton row keyed on id=1. The CHECK constraint plus
-- the seed-INSERT-OR-IGNORE below guarantees exactly one row ever exists.
-- All counters are cumulative-since-install (NOT since-boot); the dispatcher
-- read-restores them on every __init__ and write-increments per drop.
-- --------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS dispatcher_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
cold_start_anchor REAL, -- epoch seconds; NULL until first event-to-enabled-toggle
stale_dropped INTEGER NOT NULL DEFAULT 0,
cooldown_dropped INTEGER NOT NULL DEFAULT 0,
dedup_dropped INTEGER NOT NULL DEFAULT 0,
cold_start_dropped INTEGER NOT NULL DEFAULT 0,
updated_at REAL
);
-- Seed the singleton row. INSERT OR IGNORE so a re-run is a clean no-op.
INSERT OR IGNORE INTO dispatcher_state(id, cold_start_anchor,
stale_dropped, cooldown_dropped, dedup_dropped, cold_start_dropped,
updated_at)
VALUES (1, NULL, 0, 0, 0, 0, NULL);
-- --------------------------------------------------------------------------
-- dispatcher_cooldowns: one row per (toggle, category, region) triple that
-- has fired at least once. `last_fired_at` powers the "in cooldown?" check.
-- Pruned on every insert: rows older than (2 * current_cooldown_s) are
-- deleted (matches the existing in-memory prune semantics at
-- pipeline/dispatcher.py:183-187).
-- --------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS dispatcher_cooldowns (
toggle TEXT NOT NULL,
category TEXT NOT NULL,
region TEXT NOT NULL,
last_fired_at REAL NOT NULL,
updated_at REAL NOT NULL,
PRIMARY KEY (toggle, category, region)
);
CREATE INDEX IF NOT EXISTS idx_dispatcher_cooldowns_last_fired
ON dispatcher_cooldowns(last_fired_at);
-- --------------------------------------------------------------------------
-- dispatcher_dedup: bounded LRU on disk -- 7-day window (deleted on every
-- insert). The in-memory OrderedDict still caps at 10k for the fast path;
-- on boot we rebuild it from the most-recent 10k rows ordered by seen_at.
-- This means the DB retains MORE history than the in-memory cap, which is
-- useful for the LLM ("did we already see this event ID?") even if it has
-- been evicted from the in-memory LRU.
-- --------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS dispatcher_dedup (
source TEXT NOT NULL,
event_id TEXT NOT NULL,
seen_at REAL NOT NULL,
PRIMARY KEY (source, event_id)
);
CREATE INDEX IF NOT EXISTS idx_dispatcher_dedup_seen_at
ON dispatcher_dedup(seen_at);

31
tests/conftest.py Normal file
View file

@ -0,0 +1,31 @@
"""Pytest fixture isolation for meshai persistence (v0.6-2).
Before v0.6-2 the dispatcher held all state in instance memory, so tests
that constructed `Dispatcher(...)` were inert w.r.t. SQLite. v0.6-2 made
`Dispatcher.__init__` read/restore from the persistence layer, which by
default points at `/data/meshai.sqlite`. Without isolation every test
would now read+write production state, polluting across tests and across
pytest invocations.
This autouse fixture redirects `MESHAI_DB_PATH` to a per-test tmp file
and clears the persistence-layer threading.local caches around each test.
Existing tests that don't reference any fixture get isolation for free;
tests that explicitly use a `db_path` (or similar) fixture can still
override the env var inside their own fixture body -- last setenv wins.
"""
import pytest
from meshai.persistence import close_thread_connection
from meshai.persistence import db as _persistence_db
@pytest.fixture(autouse=True)
def _isolate_meshai_db(tmp_path, monkeypatch):
"""Point MESHAI_DB_PATH at a tmp file per test."""
p = str(tmp_path / "meshai-test-isolated.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", p)
_persistence_db._initialised.clear()
close_thread_connection()
yield p
close_thread_connection()
_persistence_db._initialised.discard(p)

View file

@ -0,0 +1,494 @@
"""v0.6-2 dispatcher state persistence tests (audit doc finding #1).
Covers:
- state row restored on dispatcher __init__
- counter increments survive restart
- cold-start anchor survives restart
- cooldown map rebuilt from dispatcher_cooldowns on init
- dedup LRU rebuilt from dispatcher_dedup (most-recent 10k) on init
- 7-day dedup cleanup runs on insert
- 2*cooldown_s cooldown prune runs on insert
- v5 migration is idempotent
- synthetic storm + restart + replay probe (commit spec step 5)
"""
from __future__ import annotations
import asyncio
import time as _time
from unittest.mock import MagicMock, AsyncMock
import pytest
from meshai.config import Config, NotificationToggle
from meshai.notifications.events import make_event
from meshai.notifications.pipeline.dispatcher import (
Dispatcher, _DEDUP_LRU_MAX, _DEDUP_DB_RETENTION_S,
)
from meshai.persistence import close_thread_connection, init_db
from meshai.persistence import db as persistence_db
# ---------- fixtures --------------------------------------------------------
@pytest.fixture
def db_path(tmp_path, monkeypatch):
p = str(tmp_path / "disp-test.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", p)
persistence_db._initialised.clear()
close_thread_connection()
init_db()
yield p
close_thread_connection()
persistence_db._initialised.discard(p)
def _build_config(*, cold_start_grace=60,
fire_enabled=True, fire_cooldown=300,
fire_freshness=600,
fire_severity_channels=None):
"""Minimal Config with the `fire` family toggle enabled. The toggle uses
a fake `broadcast_channel=1` + mesh_broadcast severity-channel mapping so
dispatch() reaches the channel.deliver() path."""
cfg = Config()
cfg.notifications.cold_start_grace_seconds = cold_start_grace
cfg.notifications.toggles = {
"fire": NotificationToggle(
name="fire", enabled=fire_enabled,
freshness_seconds=fire_freshness,
cooldown_seconds=fire_cooldown,
severity_channels=(fire_severity_channels or
{"routine": ["mesh_broadcast"],
"priority": ["mesh_broadcast"],
"immediate": ["mesh_broadcast"]}),
broadcast_channel=1,
),
}
return cfg
def _mk_channel_factory(deliver_outcome=True):
"""Returns (factory, channel_mock_list). Each call appends one mock and
returns it so tests can assert call counts."""
created = []
def _factory(rule, connector):
ch = MagicMock()
ch.deliver = AsyncMock(return_value=deliver_outcome)
created.append(ch)
return ch
return _factory, created
def _fire_event(*, source="fires", category="wildfire_incident",
event_id=None, severity="priority", region="US-ID",
timestamp=None):
"""Build a wildfire Event whose category maps to the `fire` toggle."""
e = make_event(
source=source, category=category, severity=severity,
region=region,
title="🔥 Test fire", lat=43.6, lon=-116.2,
)
if event_id is not None:
e.id = event_id
if timestamp is not None:
e.timestamp = timestamp
return e
# ============================================================================
# Schema + table baseline
# ============================================================================
def test_v5_tables_present_after_migration(db_path):
"""v5.sql ran during init_db. All three tables + the singleton row."""
conn = persistence_db.get_db(db_path)
tables = {r["name"] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
assert "dispatcher_state" in tables
assert "dispatcher_cooldowns" in tables
assert "dispatcher_dedup" in tables
row = conn.execute("SELECT * FROM dispatcher_state").fetchone()
assert row is not None
assert row["id"] == 1
assert row["cold_start_anchor"] is None
assert row["stale_dropped"] == 0
assert row["cooldown_dropped"] == 0
assert row["dedup_dropped"] == 0
assert row["cold_start_dropped"] == 0
def test_v5_migration_idempotent(db_path):
"""Re-running migrations is a no-op (only one singleton row, no errors)."""
# Force a second migration pass via the public path.
persistence_db._initialised.discard(db_path)
persistence_db._apply_migrations(persistence_db.get_db(db_path))
conn = persistence_db.get_db(db_path)
n = conn.execute("SELECT COUNT(*) FROM dispatcher_state").fetchone()[0]
assert n == 1
# ============================================================================
# Construction: restore from empty DB = clean defaults
# ============================================================================
def test_init_restores_empty_state(db_path):
cfg = _build_config()
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
assert d._stale_dropped == 0
assert d._cooldown_dropped == 0
assert d._dedup_dropped == 0
assert d._cold_start_dropped == 0
assert d._first_event_at is None
assert d._toggle_cooldown == {}
assert len(d._dedup_lru) == 0
# ============================================================================
# Counters survive restart
# ============================================================================
def test_counter_increments_persist_across_restart(db_path):
cfg = _build_config()
factory, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory)
# Forcibly bump each counter via the persistence helper. Mirrors what
# _dispatch_toggles does on a drop.
d1._stale_dropped = 5
d1._cooldown_dropped = 3
d1._dedup_dropped = 7
d1._cold_start_dropped = 2
d1._first_event_at = 12345.678
d1._persist_state()
# Fresh dispatcher, same DB.
d2 = Dispatcher(cfg, factory)
assert d2._stale_dropped == 5
assert d2._cooldown_dropped == 3
assert d2._dedup_dropped == 7
assert d2._cold_start_dropped == 2
assert d2._first_event_at == pytest.approx(12345.678)
def test_dispatch_stats_reflects_persisted_state(db_path):
cfg = _build_config()
factory, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory)
d1._stale_dropped = 99
d1._persist_state()
d2 = Dispatcher(cfg, factory)
stats = d2.dispatch_stats()
assert stats["stale_dropped"] == 99
# ============================================================================
# Cooldown survives restart, re-arms on boot
# ============================================================================
def test_cooldown_persists_across_restart(db_path):
cfg = _build_config(fire_cooldown=300)
factory, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory)
key = ("fire", "wildfire_incident", "US-ID")
now = _time.time()
d1._toggle_cooldown[key] = now
d1._persist_cooldown(key, now, 300)
d2 = Dispatcher(cfg, factory)
assert key in d2._toggle_cooldown
assert d2._toggle_cooldown[key] == pytest.approx(now)
def test_cooldown_rearms_on_boot_within_window(db_path):
"""Cooldown was set 100s before restart; check 100s after restart
(so 200s after the fire) with cooldown_s=300 => still in cooldown
(200 < 300)."""
cfg = _build_config(fire_cooldown=300)
factory, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory)
fire_t = 10_000.0
key = ("fire", "wildfire_incident", "US-ID")
d1._toggle_cooldown[key] = fire_t
d1._persist_cooldown(key, fire_t, 300)
# Restart -- fresh dispatcher reads from DB.
d2 = Dispatcher(cfg, factory)
assert key in d2._toggle_cooldown
last = d2._toggle_cooldown[key]
now = fire_t + 200 # 200s elapsed
assert (now - last) < 300, "must still be in cooldown window"
def test_cooldown_prune_2x_window(db_path):
"""Inserting a cooldown deletes rows whose last_fired_at < (now - 2*cooldown_s)."""
cfg = _build_config(fire_cooldown=300)
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
now = _time.time()
very_old_t = now - 1000.0 # > 2*300=600s old
fresh_t = now - 100.0
d._persist_cooldown(("oldfam", "cat", "*"), very_old_t, 300)
d._persist_cooldown(("freshfam", "cat", "*"), fresh_t, 300)
# The 2nd call's prune deletes the old row.
# Issue one more recent insert to make sure prune runs again with `now`.
d._persist_cooldown(("nowfam", "cat", "*"), now, 300)
conn = persistence_db.get_db(db_path)
rows = conn.execute(
"SELECT toggle FROM dispatcher_cooldowns ORDER BY toggle"
).fetchall()
toggles = [r["toggle"] for r in rows]
assert "oldfam" not in toggles, "stale cooldown should be pruned"
assert "freshfam" in toggles
assert "nowfam" in toggles
# ============================================================================
# Dedup survives restart
# ============================================================================
def test_dedup_lru_rebuilt_on_restart(db_path):
cfg = _build_config()
factory, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory)
now = _time.time()
for i in range(5):
key = ("firms", f"event_{i}")
d1._dedup_lru[key] = True
d1._persist_dedup(key, now + i)
d2 = Dispatcher(cfg, factory)
assert len(d2._dedup_lru) == 5
for i in range(5):
assert ("firms", f"event_{i}") in d2._dedup_lru
def test_dedup_lru_rebuild_caps_at_10k(db_path):
"""If on-disk has > 10k rows, in-memory restores only the most recent 10k."""
cfg = _build_config()
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
conn = persistence_db.get_db(db_path)
base = _time.time() - 10000
rows = [("firms", f"e{i}", base + i)
for i in range(_DEDUP_LRU_MAX + 50)]
conn.executemany(
"INSERT INTO dispatcher_dedup(source, event_id, seen_at) VALUES (?,?,?)",
rows,
)
d2 = Dispatcher(cfg, factory)
assert len(d2._dedup_lru) == _DEDUP_LRU_MAX
# Most recent IDs survived (e.g. e_(_DEDUP_LRU_MAX+49)) but oldest didn't (e_0).
assert ("firms", "e0") not in d2._dedup_lru
assert ("firms", f"e{_DEDUP_LRU_MAX + 49}") in d2._dedup_lru
def test_dedup_7_day_cleanup(db_path):
cfg = _build_config()
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
now = _time.time()
# Need ancient < cutoff strictly. cutoff = now - retention; ancient =
# now - retention - 1000 leaves 1000s of headroom.
too_old = now - (_DEDUP_DB_RETENTION_S + 1000)
# Seed an ancient row directly to bypass the prune-on-insert guard.
conn = persistence_db.get_db(db_path)
conn.execute(
"INSERT INTO dispatcher_dedup(source, event_id, seen_at) VALUES (?,?,?)",
("firms", "ancient", too_old),
)
# Now an insert via the helper triggers the cleanup: cutoff = now - retention.
d._persist_dedup(("firms", "fresh"), now)
rows = conn.execute(
"SELECT event_id FROM dispatcher_dedup ORDER BY event_id"
).fetchall()
ids = [r["event_id"] for r in rows]
assert "ancient" not in ids
assert "fresh" in ids
def test_dedup_same_key_updates_seen_at(db_path):
"""Re-seeing the same (source, event_id) refreshes its seen_at without
creating a second row -- the INSERT OR REPLACE on the composite PK."""
cfg = _build_config()
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
t1 = _time.time()
t2 = t1 + 50
d._persist_dedup(("firms", "abc"), t1)
d._persist_dedup(("firms", "abc"), t2)
conn = persistence_db.get_db(db_path)
rows = conn.execute("SELECT seen_at FROM dispatcher_dedup WHERE event_id='abc'").fetchall()
assert len(rows) == 1
assert rows[0]["seen_at"] == pytest.approx(t2)
# ============================================================================
# End-to-end: counters via real _dispatch_toggles
# ============================================================================
def test_real_dispatch_arms_cold_start_anchor(db_path):
"""First event to reach the toggle anchors the grace window and the
anchor lands in dispatcher_state."""
cfg = _build_config(cold_start_grace=60)
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
ev = _fire_event(event_id="fire_anchor_001", timestamp=_time.time())
asyncio.run(d._dispatch_toggles(ev))
assert d._first_event_at is not None
# And in SQLite.
conn = persistence_db.get_db(db_path)
row = conn.execute("SELECT cold_start_anchor FROM dispatcher_state").fetchone()
assert row["cold_start_anchor"] is not None
def test_real_dispatch_stale_drop_persists(db_path):
"""A stale event increments stale_dropped on disk."""
cfg = _build_config(cold_start_grace=0, fire_freshness=60)
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
ev = _fire_event(event_id="fire_stale_001",
timestamp=_time.time() - 600) # 10 min old vs 60s window
asyncio.run(d._dispatch_toggles(ev))
assert d._stale_dropped == 1
conn = persistence_db.get_db(db_path)
row = conn.execute("SELECT stale_dropped FROM dispatcher_state").fetchone()
assert row["stale_dropped"] == 1
def test_real_dispatch_dedup_writes_through(db_path):
"""A successful first-sighting writes a dedup row; a second sighting
drops at dedup AND refreshes seen_at."""
cfg = _build_config(cold_start_grace=0, fire_freshness=86400, fire_cooldown=0)
factory, _ = _mk_channel_factory()
d = Dispatcher(cfg, factory)
ev1 = _fire_event(event_id="fire_dup_001", timestamp=_time.time())
asyncio.run(d._dispatch_toggles(ev1))
conn = persistence_db.get_db(db_path)
rows = conn.execute("SELECT source, event_id FROM dispatcher_dedup").fetchall()
assert ("fires", "fire_dup_001") in [(r["source"], r["event_id"]) for r in rows]
# second sighting
ev2 = _fire_event(event_id="fire_dup_001", timestamp=_time.time())
asyncio.run(d._dispatch_toggles(ev2))
assert d._dedup_dropped == 1
row = conn.execute("SELECT dedup_dropped FROM dispatcher_state").fetchone()
assert row["dedup_dropped"] == 1
# ============================================================================
# Synthetic storm -> restart -> replay (commit spec step 5)
# ============================================================================
def test_synthetic_storm_restart_replay(db_path):
"""Phase 1: 50 distinct events. Phase 2: new Dispatcher, same DB. Phase 3:
5 more events including one duplicate from phase 1. Assertions:
- Phase 3 duplicate is dropped by dedup
- Counters in phase 2/3 inherit phase 1 values (not reset)
- Dedup LRU + cooldown carries across
"""
cfg = _build_config(cold_start_grace=0, fire_freshness=86400, fire_cooldown=0)
factory_a, _ = _mk_channel_factory()
d1 = Dispatcher(cfg, factory_a)
# ---- Phase 1: 50 distinct events ----
base_ts = _time.time()
for i in range(50):
ev = _fire_event(event_id=f"storm_{i}", timestamp=base_ts)
asyncio.run(d1._dispatch_toggles(ev))
p1_dedup_rows = persistence_db.get_db(db_path).execute(
"SELECT COUNT(*) FROM dispatcher_dedup"
).fetchone()[0]
p1_dedup_dropped = d1._dedup_dropped
p1_stale_dropped = d1._stale_dropped
assert p1_dedup_rows == 50
# All 50 were first-sightings -> no dedup_dropped increments.
assert p1_dedup_dropped == 0
# ---- Phase 2: simulated restart, fresh Dispatcher, same DB ----
factory_b, _ = _mk_channel_factory()
d2 = Dispatcher(cfg, factory_b)
# Carry-over from phase 1:
assert d2._dedup_dropped == p1_dedup_dropped
assert d2._stale_dropped == p1_stale_dropped
assert len(d2._dedup_lru) == 50
# ---- Phase 3: 5 new events incl one phase-1 duplicate ----
for i in range(50, 54):
asyncio.run(d2._dispatch_toggles(
_fire_event(event_id=f"storm_{i}", timestamp=base_ts)
))
# The duplicate:
dup_ev = _fire_event(event_id="storm_3", timestamp=base_ts)
asyncio.run(d2._dispatch_toggles(dup_ev))
# Now: dedup_dropped must have incremented by exactly 1 (the duplicate).
assert d2._dedup_dropped == p1_dedup_dropped + 1, (
f"expected +1 dedup drop, got {d2._dedup_dropped - p1_dedup_dropped}"
)
# 50 phase-1 + 4 phase-3-new = 54 distinct rows on disk.
final_rows = persistence_db.get_db(db_path).execute(
"SELECT COUNT(*) FROM dispatcher_dedup"
).fetchone()[0]
assert final_rows == 54
# ============================================================================
# Graceful degrade: dispatcher works even when persistence is broken
# ============================================================================
def test_dispatcher_construct_when_db_unavailable(monkeypatch, tmp_path):
"""If get_db() raises on construct, the dispatcher must still init
with fresh in-memory state (no crash, no abort)."""
cfg = _build_config()
factory, _ = _mk_channel_factory()
def _broken_get_db(*a, **kw):
raise RuntimeError("persistence layer down")
monkeypatch.setattr(
"meshai.notifications.pipeline.dispatcher.__name__", "<patched>"
)
# Patch the import target: persistence.get_db.
import meshai.persistence as p
monkeypatch.setattr(p, "get_db", _broken_get_db)
d = Dispatcher(cfg, factory) # must not raise
assert d._stale_dropped == 0
assert d._first_event_at is None

View file

@ -24,6 +24,23 @@ class RecChannel:
def _dispatch(cfg, event):
rec = []
# v0.6-2: wipe dedup state between calls so each _dispatch is an
# independent "what happens if this event arrives now?" check.
# The pre-v0.6-2 in-memory dedup naturally reset per Dispatcher
# instance; the new persisted dedup carries across instances unless
# we clear it here.
try:
from meshai.persistence import get_db
conn = get_db()
conn.execute("DELETE FROM dispatcher_dedup")
conn.execute("DELETE FROM dispatcher_cooldowns")
conn.execute(
"UPDATE dispatcher_state SET cold_start_anchor=NULL, "
"stale_dropped=0, cooldown_dropped=0, dedup_dropped=0, "
"cold_start_dropped=0 WHERE id=1"
)
except Exception:
pass
d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None)
asyncio.run(d.dispatch(event))
return rec