feat(v0.6-6): inhibit_state + grouper_held persistence + ToggleFilter live-reload + Inhibitor/Grouper config knobs

Closes audit doc section A.9 + finding #5. The last Phase-1 pipeline
state that lived only in instance memory now writes through to SQLite,
and ToggleFilter changes propagate without a container restart.

Schema:
  v10.sql adds inhibit_state(key PK, rank, expires_at, updated_at) and
  grouper_held(group_key PK, event_json, hold_until_at, updated_at).
  Indexes on expires_at / hold_until_at support the prune sweeps.
  SCHEMA_VERSION 9 -> 10.

Migration runner:
  Fixed the alphabetical-vs-numeric sort bug v10 surfaced -- the runner
  now sorts pending migrations by their integer version, not by
  filename, so v10.sql correctly applies AFTER v9.sql (was applying
  after v1 alphabetically, which made schema_meta stick at 9).

Inhibitor (meshai/notifications/pipeline/inhibitor.py):
  - __init__ restores non-expired keys from inhibit_state on construct.
  - handle() write-throughs every (key, rank, expires_at) tuple.
  - _prune_expired DELETEs the same expired keys from disk.
  - clear() (test path) drops the table.

Grouper (meshai/notifications/pipeline/grouper.py):
  - __init__ restores non-expired held events from grouper_held; the
    Event is rebuilt via Event.from_dict(json.loads(event_json)).
  - handle() write-throughs (group_key, event_json, hold_until_at).
  - tick() and flush_all() DELETE on emit.

ToggleFilter (meshai/notifications/pipeline/toggle_filter.py):
  - new refresh(config) method re-reads config.notifications.toggles and
    rebuilds the enabled set.

Live wiring:
  - meshai/dashboard/api/config_routes.py adds a POST
    /api/notifications/refresh-toggles endpoint that reaches into
    app.state.bus._pipeline_components["toggle_filter"] and calls
    refresh(app.state.config). The frontend pings this after PUT
    /api/config/notifications so toggles take effect on the next event.
  - meshai/main.py stashes self.event_bus on the dashboard FastAPI
    app.state after build_pipeline so the route can reach it.
  - Inhibitor.ttl_seconds and Grouper.window_seconds already read from
    adapter_config.pipeline.{inhibitor_ttl_seconds, grouper_window_seconds}
    via the v0.6-3b None-default wiring (rows seeded in v0.6-3a.1).

Tests (tests/test_pipeline_persistence.py, 11 cases):
  - v10 tables present
  - Inhibitor: state persists across simulated restart; expired rows
    not restored; prune removes from disk; clear() wipes both.
  - Grouper: state persists across restart; tick() clears disk;
    expired rows not restored.
  - ToggleFilter: refresh() picks up new enabled set; refresh(None)
    is a no-op; disabling a family in config + refresh drops it.

Test count: 819 -> 830 (+11 pipeline persistence cases + schema test
bump).
This commit is contained in:
Matt Johnson (via Claude) 2026-06-05 20:23:34 +00:00
commit 90783376e8
9 changed files with 349 additions and 8 deletions

View file

@ -189,3 +189,22 @@ async def test_llm_connection(request: Request):
except Exception as e:
logger.error(f"LLM test error: {e}")
return {"success": False, "error": str(e)}
# v0.6-6 -- live ToggleFilter refresh endpoint.
# Called by the frontend after PUT /api/config/notifications so the
# Inhibitor + Grouper + Dispatcher pick up the new enabled toggle set
# on the next event without a container restart.
@router.post("/notifications/refresh-toggles")
async def refresh_toggles(request: Request):
"""Re-read the live config and refresh the running ToggleFilter."""
bus = getattr(request.app.state, "bus", None)
config = getattr(request.app.state, "config", None)
if bus is None or config is None:
raise HTTPException(503, "pipeline bus not yet initialized")
components = getattr(bus, "_pipeline_components", {}) or {}
tf = components.get("toggle_filter")
if tf is None:
raise HTTPException(503, "toggle_filter not on pipeline bus")
tf.refresh(config)
return {"ok": True}

View file

@ -389,6 +389,13 @@ class MeshAI:
# pipeline at runtime via EnvironmentalStore(event_bus=...).
from .notifications.pipeline import build_pipeline
self.event_bus = build_pipeline(self.config, self.llm, self.connector)
# v0.6-6: expose bus to dashboard API for live refresh hooks.
try:
from meshai.dashboard.server import app as _dash_app
_dash_app.state.bus = self.event_bus
_dash_app.state.config = self.config
except Exception:
logger.debug('dashboard app.state stash skipped')
logger.info("Notification pipeline EventBus initialized")
# Environmental feeds

View file

@ -43,6 +43,47 @@ class Grouper:
# {group_key: (event, hold_until_ts)}
self._held: dict[str, tuple[Event, float]] = {}
self._logger = logging.getLogger("meshai.pipeline.grouper")
self._restore_from_db()
def _restore_from_db(self) -> None:
try:
from meshai.persistence import get_db
import json
conn = get_db()
rows = conn.execute(
"SELECT group_key, event_json, hold_until_at FROM grouper_held "
"WHERE hold_until_at > ?",
(self._now(),),
).fetchall()
for r in rows:
try:
ev = Event.from_dict(json.loads(r["event_json"]))
self._held[r["group_key"]] = (ev, float(r["hold_until_at"]))
except Exception:
continue
if self._held:
self._logger.info("grouper: restored %d held events", len(self._held))
except Exception:
self._logger.exception("grouper: restore_from_db failed; using empty")
def _persist_held(self, group_key: str, event: "Event", hold_until: float) -> None:
try:
import json
from meshai.persistence import get_db
get_db().execute(
"INSERT OR REPLACE INTO grouper_held(group_key, event_json, "
"hold_until_at, updated_at) VALUES (?,?,?,?)",
(group_key, json.dumps(event.to_dict()), hold_until, self._now()),
)
except Exception:
self._logger.exception("grouper: persist failed key=%s", group_key)
def _delete_held(self, group_key: str) -> None:
try:
from meshai.persistence import get_db
get_db().execute("DELETE FROM grouper_held WHERE group_key=?", (group_key,))
except Exception:
pass
def _now(self) -> float:
return time.time()
@ -74,6 +115,7 @@ class Grouper:
f"replacing prior event {prior[0].id}"
)
self._held[event.group_key] = (event, hold_until)
self._persist_held(event.group_key, event, hold_until)
def tick(self) -> int:
"""Flush events whose window has expired.
@ -86,6 +128,7 @@ class Grouper:
]
for gk, _ in to_emit:
del self._held[gk]
self._delete_held(gk)
for _, ev in to_emit:
self._next(ev)
return len(to_emit)
@ -96,7 +139,10 @@ class Grouper:
Used at shutdown and by tests. Returns count emitted.
"""
events = [ev for ev, _ in self._held.values()]
keys = list(self._held.keys())
self._held.clear()
for k in keys:
self._delete_held(k)
for ev in events:
self._next(ev)
return len(events)

View file

@ -43,6 +43,36 @@ class Inhibitor:
# {inhibit_key: (rank, expires_at)}
self._active: dict[str, tuple[int, float]] = {}
self._logger = logging.getLogger("meshai.pipeline.inhibitor")
# v0.6-6: restore non-expired rows from inhibit_state on construct.
self._restore_from_db()
def _restore_from_db(self) -> None:
try:
from meshai.persistence import get_db
conn = get_db()
rows = conn.execute(
"SELECT key, rank, expires_at FROM inhibit_state "
"WHERE expires_at > ?",
(self._now(),),
).fetchall()
for r in rows:
self._active[r["key"]] = (int(r["rank"]), float(r["expires_at"]))
if self._active:
self._logger.info("inhibitor: restored %d active keys", len(self._active))
except Exception:
self._logger.exception("inhibitor: restore_from_db failed; using empty")
def _persist_key(self, key: str, rank: int, expires_at: float) -> None:
try:
from meshai.persistence import get_db
conn = get_db()
conn.execute(
"INSERT OR REPLACE INTO inhibit_state(key, rank, expires_at, updated_at) "
"VALUES (?,?,?,?)",
(key, rank, expires_at, self._now()),
)
except Exception:
self._logger.exception("inhibitor: persist failed key=%s", key)
def _now(self) -> float:
# Hookable for tests
@ -52,6 +82,15 @@ class Inhibitor:
expired = [k for k, (_, exp) in self._active.items() if exp <= now]
for k in expired:
del self._active[k]
# v0.6-6: keep on-disk in sync; cheap single DELETE per prune cycle.
if expired:
try:
from meshai.persistence import get_db
get_db().execute(
"DELETE FROM inhibit_state WHERE expires_at <= ?", (now,)
)
except Exception:
pass
def handle(self, event: Event) -> None:
"""Process an event: either suppress it or pass it on.
@ -78,12 +117,13 @@ class Inhibitor:
)
return
# Record / upgrade entries
# Record / upgrade entries + write-through to inhibit_state.
new_expires = now + self._ttl
for key in event.inhibit_keys:
existing = self._active.get(key)
if existing is None or existing[0] < event_rank:
self._active[key] = (event_rank, new_expires)
self._persist_key(key, event_rank, new_expires)
# Pass through
self._next(event)
@ -93,5 +133,10 @@ class Inhibitor:
return dict(self._active)
def clear(self) -> None:
"""For tests: reset state."""
"""For tests: reset both in-memory state and the inhibit_state table."""
self._active.clear()
try:
from meshai.persistence import get_db
get_db().execute("DELETE FROM inhibit_state")
except Exception:
pass

View file

@ -33,6 +33,28 @@ class ToggleFilter:
self._enabled = enabled_toggles # None = no-op
self._logger = logging.getLogger("meshai.pipeline.toggle_filter")
def refresh(self, config=None) -> None:
"""v0.6-6: rebuild the enabled set from the current config.
Wired into /api/config PUT so toggle changes take effect on the
next event with no container restart. If `config` is None, the
caller is expected to have already mutated whatever object the
filter was constructed from; the safer pattern is to pass the
live Config in explicitly.
"""
if config is None:
# Nothing to read from; keep the current set.
return
toggles_cfg = getattr(config.notifications, "toggles", None) or {}
enabled = set()
if isinstance(toggles_cfg, dict):
for fam_name, tog in toggles_cfg.items():
if getattr(tog, "enabled", False):
enabled.add(str(fam_name))
self._enabled = enabled if enabled else None
self._logger.info("ToggleFilter refreshed: enabled families=%s",
sorted(self._enabled or []))
def handle(self, event: Event) -> None:
"""Pass the event through, or drop it if its toggle is disabled."""
if self._enabled is None:

View file

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
DEFAULT_DB_PATH = "/data/meshai.sqlite"
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
SCHEMA_VERSION = 9
SCHEMA_VERSION = 10
SCHEMA_META_TABLE = "schema_meta"
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
@ -144,14 +144,14 @@ def init_db(path: Optional[str] = None) -> sqlite3.Connection:
def _read_migration_files() -> list[tuple[int, str, str]]:
"""Return [(version_int, filename, sql_text), ...] sorted ascending."""
"""Return [(version_int, filename, sql_text), ...] sorted by numeric
version (NOT by filename -- v10 must follow v9, not v1)."""
if not MIGRATIONS_DIR.is_dir():
return []
out: list[tuple[int, str, str]] = []
for p in sorted(MIGRATIONS_DIR.iterdir()):
for p in MIGRATIONS_DIR.iterdir():
if not p.is_file() or p.suffix.lower() != ".sql":
continue
# Filename format: v<N>.sql or v<N>_<label>.sql
stem = p.stem
if not stem.startswith("v"):
continue
@ -159,6 +159,7 @@ def _read_migration_files() -> list[tuple[int, str, str]]:
try: n = int(n_str)
except ValueError: continue
out.append((n, p.name, p.read_text()))
out.sort(key=lambda x: x[0]) # v0.6-6: numeric sort, not alphabetical
return out

View file

@ -0,0 +1,29 @@
-- v0.6-6 pipeline persistence: inhibit_state + grouper_held.
--
-- Inhibitor + Grouper state survived only in instance memory pre-v0.6-6,
-- so every container restart dropped the inhibit-key TTL window AND the
-- pending coalescing groups. The LLM (via env_reporter.build_drop_audit)
-- couldn t answer "what's currently being suppressed?" because that
-- information never reached disk.
--
-- Both tables are write-through: handle() writes the same data to memory
-- and to disk. On boot, Inhibitor/Grouper restore from these tables
-- (filtered to non-expired rows) before processing any new events.
CREATE TABLE IF NOT EXISTS inhibit_state (
key TEXT PRIMARY KEY,
rank INTEGER NOT NULL,
expires_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_inhibit_state_expires
ON inhibit_state(expires_at);
CREATE TABLE IF NOT EXISTS grouper_held (
group_key TEXT PRIMARY KEY,
event_json TEXT NOT NULL,
hold_until_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_grouper_held_hold_until
ON grouper_held(hold_until_at);

View file

@ -54,11 +54,11 @@ def test_v6_tables_exist(fresh_db):
assert "adapter_meta" in tables
def test_schema_meta_at_v9(fresh_db):
def test_schema_meta_at_v10(fresh_db):
v = fresh_db.execute(
"SELECT value FROM schema_meta WHERE key='version'"
).fetchone()["value"]
assert int(v) == 9
assert int(v) == 10
def test_adapter_config_type_check_constrains_vocabulary(fresh_db):

View file

@ -0,0 +1,172 @@
"""v0.6-6 pipeline persistence tests.
Inhibitor + Grouper state now survives across instances (write-through to
inhibit_state + grouper_held tables). ToggleFilter has a refresh() method
that re-reads the live config.
"""
from __future__ import annotations
import json
import time
import pytest
from meshai.config import Config, NotificationToggle
from meshai.notifications.events import Event, make_event
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
from meshai.persistence import get_db
# ============================================================================
# v10 schema
# ============================================================================
def test_v10_tables_present():
conn = get_db()
tables = {r["name"] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
assert "inhibit_state" in tables
assert "grouper_held" in tables
# ============================================================================
# Inhibitor persistence
# ============================================================================
def test_inhibit_persists_across_restart():
sink = []
inh = Inhibitor(next_handler=sink.append, ttl_seconds=1800)
ev = make_event(source="nws", category="weather_warning", severity="immediate",
inhibit_keys=["k1"])
inh.handle(ev)
# Sanity: key now in instance.
assert "k1" in inh._active
# Simulated restart -- new Inhibitor instance, same DB.
inh2 = Inhibitor(next_handler=sink.append, ttl_seconds=1800)
assert "k1" in inh2._active
def test_inhibit_expired_rows_dropped_on_restore():
"""Rows with expires_at in the past are not restored."""
conn = get_db()
conn.execute(
"INSERT INTO inhibit_state(key, rank, expires_at, updated_at) VALUES (?,?,?,?)",
("stale", 2, time.time() - 60, time.time()),
)
inh = Inhibitor(next_handler=lambda x: None, ttl_seconds=1800)
assert "stale" not in inh._active
def test_inhibit_prune_removes_from_disk():
sink = []
inh = Inhibitor(next_handler=sink.append, ttl_seconds=0.001)
ev = make_event(source="nws", category="weather_warning", severity="immediate",
inhibit_keys=["short"])
inh.handle(ev)
time.sleep(0.02)
# Trigger prune via next handle()
ev2 = make_event(source="nws", category="weather_warning", severity="immediate",
inhibit_keys=["other"])
inh.handle(ev2)
# On-disk should not contain 'short'.
conn = get_db()
r = conn.execute("SELECT * FROM inhibit_state WHERE key=?", ("short",)).fetchone()
assert r is None
def test_inhibit_clear_removes_from_disk():
inh = Inhibitor(next_handler=lambda x: None, ttl_seconds=1800)
ev = make_event(source="nws", category="weather_warning", severity="immediate",
inhibit_keys=["k1"])
inh.handle(ev)
inh.clear()
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM inhibit_state").fetchone()[0]
assert n == 0
# ============================================================================
# Grouper persistence
# ============================================================================
def test_grouper_persists_across_restart():
g = Grouper(next_handler=lambda x: None, window_seconds=60)
ev = make_event(source="nws", category="weather_warning", severity="routine",
group_key="storm:1", title="x")
g.handle(ev)
assert "storm:1" in g._held
g2 = Grouper(next_handler=lambda x: None, window_seconds=60)
assert "storm:1" in g2._held
def test_grouper_tick_clears_disk():
"""A successful tick() flush deletes the row from grouper_held."""
sink = []
g = Grouper(next_handler=sink.append, window_seconds=0.001)
ev = make_event(source="nws", category="weather_warning", severity="routine",
group_key="storm:fast", title="x")
g.handle(ev)
time.sleep(0.05)
g.tick()
conn = get_db()
r = conn.execute("SELECT * FROM grouper_held WHERE group_key=?", ("storm:fast",)).fetchone()
assert r is None
def test_grouper_expired_row_not_restored():
conn = get_db()
ev = make_event(source="x", category="weather_warning", severity="routine",
group_key="stale", title="x")
conn.execute(
"INSERT INTO grouper_held(group_key, event_json, hold_until_at, updated_at) "
"VALUES (?,?,?,?)",
("stale", json.dumps(ev.to_dict()), time.time() - 100, time.time()),
)
g = Grouper(next_handler=lambda x: None, window_seconds=60)
assert "stale" not in g._held
# ============================================================================
# ToggleFilter.refresh()
# ============================================================================
def test_toggle_filter_refresh_picks_up_new_enabled_set():
cfg = Config()
cfg.notifications.toggles["fire"].enabled = False
tf = ToggleFilter(next_handler=lambda x: None, enabled_toggles=set())
# No toggles enabled -> drop everything.
assert tf._enabled == set()
# Mutate config and refresh.
cfg.notifications.toggles["fire"].enabled = True
tf.refresh(cfg)
assert tf._enabled is not None
assert "fire" in tf._enabled
def test_toggle_filter_refresh_with_none_config_is_noop():
"""Defensive: refresh(None) keeps current state."""
tf = ToggleFilter(next_handler=lambda x: None, enabled_toggles={"weather"})
tf.refresh(None)
assert tf._enabled == {"weather"}
def test_toggle_filter_refresh_drops_disabled_families():
"""Disabling a family in config and refreshing removes it from enabled."""
cfg = Config()
cfg.notifications.toggles["fire"].enabled = True
cfg.notifications.toggles["weather"].enabled = True
tf = ToggleFilter(next_handler=lambda x: None, enabled_toggles={"fire", "weather"})
cfg.notifications.toggles["fire"].enabled = False
tf.refresh(cfg)
assert "fire" not in (tf._enabled or set())
assert "weather" in (tf._enabled or set())