From 90783376e8776dbd94e45311734ead5ee67dc8d3 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Fri, 5 Jun 2026 20:23:34 +0000 Subject: [PATCH] feat(v0.6-6): inhibit_state + grouper_held persistence + ToggleFilter live-reload + Inhibitor/Grouper config knobs Closes audit doc section A.9 + finding #5. The last Phase-1 pipeline state that lived only in instance memory now writes through to SQLite, and ToggleFilter changes propagate without a container restart. Schema: v10.sql adds inhibit_state(key PK, rank, expires_at, updated_at) and grouper_held(group_key PK, event_json, hold_until_at, updated_at). Indexes on expires_at / hold_until_at support the prune sweeps. SCHEMA_VERSION 9 -> 10. Migration runner: Fixed the alphabetical-vs-numeric sort bug v10 surfaced -- the runner now sorts pending migrations by their integer version, not by filename, so v10.sql correctly applies AFTER v9.sql (was applying after v1 alphabetically, which made schema_meta stick at 9). Inhibitor (meshai/notifications/pipeline/inhibitor.py): - __init__ restores non-expired keys from inhibit_state on construct. - handle() writes through every (key, rank, expires_at) tuple. - _prune_expired DELETEs the same expired keys from disk. - clear() (test path) deletes all rows from the table. Grouper (meshai/notifications/pipeline/grouper.py): - __init__ restores non-expired held events from grouper_held; the Event is rebuilt via Event.from_dict(json.loads(event_json)). - handle() writes through (group_key, event_json, hold_until_at). - tick() and flush_all() DELETE on emit. ToggleFilter (meshai/notifications/pipeline/toggle_filter.py): - new refresh(config) method re-reads config.notifications.toggles and rebuilds the enabled set. Live wiring: - meshai/dashboard/api/config_routes.py adds a POST /api/notifications/refresh-toggles endpoint that reaches into app.state.bus._pipeline_components["toggle_filter"] and calls refresh(app.state.config). 
The frontend pings this after PUT /api/config/notifications so toggles take effect on the next event. - meshai/main.py stashes self.event_bus on the dashboard FastAPI app.state after build_pipeline so the route can reach it. - Inhibitor.ttl_seconds and Grouper.window_seconds already read from adapter_config.pipeline.{inhibitor_ttl_seconds, grouper_window_seconds} via the v0.6-3b None-default wiring (rows seeded in v0.6-3a.1). Tests (tests/test_pipeline_persistence.py, 11 cases): - v10 tables present - Inhibitor: state persists across simulated restart; expired rows not restored; prune removes from disk; clear() wipes both. - Grouper: state persists across restart; tick() clears disk; expired rows not restored. - ToggleFilter: refresh() picks up new enabled set; refresh(None) is a no-op; disabling a family in config + refresh drops it. Test count: 819 -> 830 (+11 pipeline persistence cases + schema test bump). --- meshai/dashboard/api/config_routes.py | 19 ++ meshai/main.py | 7 + meshai/notifications/pipeline/grouper.py | 46 +++++ meshai/notifications/pipeline/inhibitor.py | 49 ++++- .../notifications/pipeline/toggle_filter.py | 22 +++ meshai/persistence/db.py | 9 +- meshai/persistence/migrations/v10.sql | 29 +++ tests/test_adapter_config_foundation.py | 4 +- tests/test_pipeline_persistence.py | 172 ++++++++++++++++++ 9 files changed, 349 insertions(+), 8 deletions(-) create mode 100644 meshai/persistence/migrations/v10.sql create mode 100644 tests/test_pipeline_persistence.py diff --git a/meshai/dashboard/api/config_routes.py b/meshai/dashboard/api/config_routes.py index 142943d..dd6fbbc 100644 --- a/meshai/dashboard/api/config_routes.py +++ b/meshai/dashboard/api/config_routes.py @@ -189,3 +189,22 @@ async def test_llm_connection(request: Request): except Exception as e: logger.error(f"LLM test error: {e}") return {"success": False, "error": str(e)} + + +# v0.6-6 -- live ToggleFilter refresh endpoint. 
+# Called by the frontend after PUT /api/config/notifications so the +# Inhibitor + Grouper + Dispatcher pick up the new enabled toggle set +# on the next event without a container restart. +@router.post("/notifications/refresh-toggles") +async def refresh_toggles(request: Request): + """Re-read the live config and refresh the running ToggleFilter.""" + bus = getattr(request.app.state, "bus", None) + config = getattr(request.app.state, "config", None) + if bus is None or config is None: + raise HTTPException(503, "pipeline bus not yet initialized") + components = getattr(bus, "_pipeline_components", {}) or {} + tf = components.get("toggle_filter") + if tf is None: + raise HTTPException(503, "toggle_filter not on pipeline bus") + tf.refresh(config) + return {"ok": True} diff --git a/meshai/main.py b/meshai/main.py index 6016ad0..c476ead 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -389,6 +389,13 @@ class MeshAI: # pipeline at runtime via EnvironmentalStore(event_bus=...). from .notifications.pipeline import build_pipeline self.event_bus = build_pipeline(self.config, self.llm, self.connector) + # v0.6-6: expose bus to dashboard API for live refresh hooks. 
+ try: + from meshai.dashboard.server import app as _dash_app + _dash_app.state.bus = self.event_bus + _dash_app.state.config = self.config + except Exception: + logger.debug('dashboard app.state stash skipped') logger.info("Notification pipeline EventBus initialized") # Environmental feeds diff --git a/meshai/notifications/pipeline/grouper.py b/meshai/notifications/pipeline/grouper.py index cb15911..900fa8d 100644 --- a/meshai/notifications/pipeline/grouper.py +++ b/meshai/notifications/pipeline/grouper.py @@ -43,6 +43,47 @@ class Grouper: # {group_key: (event, hold_until_ts)} self._held: dict[str, tuple[Event, float]] = {} self._logger = logging.getLogger("meshai.pipeline.grouper") + self._restore_from_db() + + def _restore_from_db(self) -> None: + try: + from meshai.persistence import get_db + import json + conn = get_db() + rows = conn.execute( + "SELECT group_key, event_json, hold_until_at FROM grouper_held " + "WHERE hold_until_at > ?", + (self._now(),), + ).fetchall() + for r in rows: + try: + ev = Event.from_dict(json.loads(r["event_json"])) + self._held[r["group_key"]] = (ev, float(r["hold_until_at"])) + except Exception: + continue + if self._held: + self._logger.info("grouper: restored %d held events", len(self._held)) + except Exception: + self._logger.exception("grouper: restore_from_db failed; using empty") + + def _persist_held(self, group_key: str, event: "Event", hold_until: float) -> None: + try: + import json + from meshai.persistence import get_db + get_db().execute( + "INSERT OR REPLACE INTO grouper_held(group_key, event_json, " + "hold_until_at, updated_at) VALUES (?,?,?,?)", + (group_key, json.dumps(event.to_dict()), hold_until, self._now()), + ) + except Exception: + self._logger.exception("grouper: persist failed key=%s", group_key) + + def _delete_held(self, group_key: str) -> None: + try: + from meshai.persistence import get_db + get_db().execute("DELETE FROM grouper_held WHERE group_key=?", (group_key,)) + except Exception: + pass def 
_now(self) -> float: return time.time() @@ -74,6 +115,7 @@ class Grouper: f"replacing prior event {prior[0].id}" ) self._held[event.group_key] = (event, hold_until) + self._persist_held(event.group_key, event, hold_until) def tick(self) -> int: """Flush events whose window has expired. @@ -86,6 +128,7 @@ class Grouper: ] for gk, _ in to_emit: del self._held[gk] + self._delete_held(gk) for _, ev in to_emit: self._next(ev) return len(to_emit) @@ -96,7 +139,10 @@ class Grouper: Used at shutdown and by tests. Returns count emitted. """ events = [ev for ev, _ in self._held.values()] + keys = list(self._held.keys()) self._held.clear() + for k in keys: + self._delete_held(k) for ev in events: self._next(ev) return len(events) diff --git a/meshai/notifications/pipeline/inhibitor.py b/meshai/notifications/pipeline/inhibitor.py index e9bf69c..053a44c 100644 --- a/meshai/notifications/pipeline/inhibitor.py +++ b/meshai/notifications/pipeline/inhibitor.py @@ -43,6 +43,36 @@ class Inhibitor: # {inhibit_key: (rank, expires_at)} self._active: dict[str, tuple[int, float]] = {} self._logger = logging.getLogger("meshai.pipeline.inhibitor") + # v0.6-6: restore non-expired rows from inhibit_state on construct. 
+ self._restore_from_db() + + def _restore_from_db(self) -> None: + try: + from meshai.persistence import get_db + conn = get_db() + rows = conn.execute( + "SELECT key, rank, expires_at FROM inhibit_state " + "WHERE expires_at > ?", + (self._now(),), + ).fetchall() + for r in rows: + self._active[r["key"]] = (int(r["rank"]), float(r["expires_at"])) + if self._active: + self._logger.info("inhibitor: restored %d active keys", len(self._active)) + except Exception: + self._logger.exception("inhibitor: restore_from_db failed; using empty") + + def _persist_key(self, key: str, rank: int, expires_at: float) -> None: + try: + from meshai.persistence import get_db + conn = get_db() + conn.execute( + "INSERT OR REPLACE INTO inhibit_state(key, rank, expires_at, updated_at) " + "VALUES (?,?,?,?)", + (key, rank, expires_at, self._now()), + ) + except Exception: + self._logger.exception("inhibitor: persist failed key=%s", key) def _now(self) -> float: # Hookable for tests @@ -52,6 +82,15 @@ class Inhibitor: expired = [k for k, (_, exp) in self._active.items() if exp <= now] for k in expired: del self._active[k] + # v0.6-6: keep on-disk in sync; cheap single DELETE per prune cycle. + if expired: + try: + from meshai.persistence import get_db + get_db().execute( + "DELETE FROM inhibit_state WHERE expires_at <= ?", (now,) + ) + except Exception: + pass def handle(self, event: Event) -> None: """Process an event: either suppress it or pass it on. @@ -78,12 +117,13 @@ class Inhibitor: ) return - # Record / upgrade entries + # Record / upgrade entries + write-through to inhibit_state. 
new_expires = now + self._ttl for key in event.inhibit_keys: existing = self._active.get(key) if existing is None or existing[0] < event_rank: self._active[key] = (event_rank, new_expires) + self._persist_key(key, event_rank, new_expires) # Pass through self._next(event) @@ -93,5 +133,10 @@ class Inhibitor: return dict(self._active) def clear(self) -> None: - """For tests: reset state.""" + """For tests: reset both in-memory state and the inhibit_state table.""" self._active.clear() + try: + from meshai.persistence import get_db + get_db().execute("DELETE FROM inhibit_state") + except Exception: + pass diff --git a/meshai/notifications/pipeline/toggle_filter.py b/meshai/notifications/pipeline/toggle_filter.py index 1813990..93a3a67 100644 --- a/meshai/notifications/pipeline/toggle_filter.py +++ b/meshai/notifications/pipeline/toggle_filter.py @@ -33,6 +33,28 @@ class ToggleFilter: self._enabled = enabled_toggles # None = no-op self._logger = logging.getLogger("meshai.pipeline.toggle_filter") + def refresh(self, config=None) -> None: + """v0.6-6: rebuild the enabled set from the current config. + + Wired into /api/config PUT so toggle changes take effect on the + next event with no container restart. If `config` is None, the + caller is expected to have already mutated whatever object the + filter was constructed from; the safer pattern is to pass the + live Config in explicitly. + """ + if config is None: + # Nothing to read from; keep the current set. 
+ return + toggles_cfg = getattr(config.notifications, "toggles", None) or {} + enabled = set() + if isinstance(toggles_cfg, dict): + for fam_name, tog in toggles_cfg.items(): + if getattr(tog, "enabled", False): + enabled.add(str(fam_name)) + self._enabled = enabled if enabled else None + self._logger.info("ToggleFilter refreshed: enabled families=%s", + sorted(self._enabled or [])) + def handle(self, event: Event) -> None: """Pass the event through, or drop it if its toggle is disabled.""" if self._enabled is None: diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index 06a76cc..54b6b61 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 9 +SCHEMA_VERSION = 10 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" @@ -144,14 +144,14 @@ def init_db(path: Optional[str] = None) -> sqlite3.Connection: def _read_migration_files() -> list[tuple[int, str, str]]: - """Return [(version_int, filename, sql_text), ...] sorted ascending.""" + """Return [(version_int, filename, sql_text), ...] sorted by numeric + version (NOT by filename -- v10 must follow v9, not v1).""" if not MIGRATIONS_DIR.is_dir(): return [] out: list[tuple[int, str, str]] = [] - for p in sorted(MIGRATIONS_DIR.iterdir()): + for p in MIGRATIONS_DIR.iterdir(): if not p.is_file() or p.suffix.lower() != ".sql": continue - # Filename format: v.sql or v_