meshai/meshai/persistence/db.py
Matt Johnson (via Claude) 3a410d5087 feat(v0.6-phase3): reminder system + schema split + NWS dedup relaxation
Third broadcast type, Active: clock-driven re-broadcasts of still-live
events at human-scale cadences. WFIGS fires every 8h, itd_511 work
zones daily at 8 AM Mountain, SWPC G-storms every 8h. NWS is NOT a
clock reminder -- instead, the per-CAP-id dedup is relaxed to allow a
re-broadcast if >3h have passed since the last one.
Schema split: first_broadcast_at + last_broadcast_at on all
reminder-eligible tables. Wire prefixes: New (first sight), Update
(WFIGS material change), Active (clock reminder). All cadences,
channels, day-of-week patterns, timezones, and termination conditions
are GUI-editable from day one via the existing adapter_config editor.
Termination: tombstone OR containment_100 OR end_date_passed (no
max-count). Quiet hours are not respected -- they were ripped out in
Phase 2.
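The termination rule is an OR over per-row predicates, and there is no max-count. Below is a minimal Python sketch. It assumes a hypothetical `ReminderRow` shape, because the real rows live in fires/swpc_events/traffic_events and are not shown here. It also assumes a tombstone always ends reminders regardless of terminate_when. last_event_age_24h is the extra wfigs condition listed under ReminderScheduler.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

DAY_S = 24 * 3600


@dataclass
class ReminderRow:
    # Hypothetical row shape for illustration; field names are assumptions.
    tombstoned: bool = False
    contained_pct: Optional[float] = None   # wfigs current_contained_pct
    last_event_at: Optional[float] = None   # epoch seconds
    end_at: Optional[float] = None          # epoch seconds


def should_terminate(row: ReminderRow, now: float,
                     terminate_when: Iterable[str]) -> bool:
    """OR of the enabled conditions; deliberately no max-count check."""
    conds = set(terminate_when)
    if row.tombstoned:  # assumption: a tombstone always ends reminders
        return True
    if ("containment_100" in conds and row.contained_pct is not None
            and row.contained_pct >= 100):
        return True
    if ("last_event_age_24h" in conds and row.last_event_at is not None
            and now - row.last_event_at > DAY_S):
        return True
    if ("end_date_passed" in conds and row.end_at is not None
            and row.end_at < now):
        return True
    return False
```

A condition only applies if it is listed in terminate_when. For example, a fully contained fire keeps reminding unless containment_100 is enabled for that adapter.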

Schema (v11.sql):
  - ALTER TABLE fires|nws_alerts|traffic_events|quake_events|swpc_events|
    gauge_readings ADD COLUMN first_broadcast_at REAL
  - Backfill: UPDATE ... SET first_broadcast_at = last_broadcast_at
    WHERE last_broadcast_at IS NOT NULL
  - ALTER TABLE adapter_meta ADD COLUMN reminder_enabled INTEGER NOT NULL
    DEFAULT 0
  - UPDATE adapter_meta SET reminder_enabled=1 WHERE adapter IN
    ('wfigs', 'swpc') -- itd_511_work_zone is a new meta row seeded
    with reminder_enabled=1
  - SCHEMA_VERSION 10 -> 11
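The backfill keeps pre-v11 rows consistent. Rows that were already broadcast get first_broadcast_at = last_broadcast_at, and never-broadcast rows stay NULL. Below is a minimal sketch against an in-memory database. It covers only the fires table and uses stripped-down, hypothetical table shapes; the real v11.sql covers all six tables.

```python
import sqlite3

# Hypothetical pre-v11 shapes, reduced to the columns this sketch touches.
PRE_V11 = """
CREATE TABLE fires(id TEXT PRIMARY KEY, last_broadcast_at REAL);
CREATE TABLE adapter_meta(adapter TEXT PRIMARY KEY);
"""

# Mirrors the v11 steps listed above, for one table only.
V11_SKETCH = """
ALTER TABLE fires ADD COLUMN first_broadcast_at REAL;
UPDATE fires SET first_broadcast_at = last_broadcast_at
    WHERE last_broadcast_at IS NOT NULL;
ALTER TABLE adapter_meta ADD COLUMN reminder_enabled INTEGER NOT NULL DEFAULT 0;
UPDATE adapter_meta SET reminder_enabled = 1 WHERE adapter IN ('wfigs', 'swpc');
"""


def demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.executescript(PRE_V11)
    # One previously broadcast fire and one that never went out.
    conn.executemany("INSERT INTO fires VALUES (?, ?)",
                     [("f1", 1000.0), ("f2", None)])
    conn.executemany("INSERT INTO adapter_meta VALUES (?)",
                     [("wfigs",), ("swpc",), ("nws",)])
    conn.executescript(V11_SKETCH)
    return conn
```

The NOT NULL DEFAULT 0 on reminder_enabled is what lets SQLite add the column to an already-populated table. Every existing row starts disabled until the UPDATE flips wfigs and swpc.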

Handler commit-callbacks (wfigs/nws/quake/swpc/incident):
  - UPDATE ... SET last_broadcast_at=?, first_broadcast_at=COALESCE(
    first_broadcast_at, ?) -- first_broadcast_at stamped once, never
    overwritten
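COALESCE returns its first non-NULL argument. The first commit therefore writes the timestamp into first_broadcast_at, and every later commit leaves it alone while still advancing last_broadcast_at. Below is a minimal sketch on the fires table. The `stamp_broadcast` helper and the `id` key column are hypothetical.

```python
import sqlite3


def stamp_broadcast(conn: sqlite3.Connection, row_id: str, now: float) -> None:
    # last_broadcast_at always advances; first_broadcast_at is written once.
    conn.execute(
        "UPDATE fires SET last_broadcast_at = ?, "
        "first_broadcast_at = COALESCE(first_broadcast_at, ?) WHERE id = ?",
        (now, now, row_id),
    )


def make_fires_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE fires(id TEXT PRIMARY KEY, "
                 "first_broadcast_at REAL, last_broadcast_at REAL)")
    conn.execute("INSERT INTO fires(id) VALUES ('f1')")
    return conn
```

Stamping `f1` at 1000.0 and again at 4600.0 leaves first_broadcast_at at 1000.0 and last_broadcast_at at 4600.0.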

NWS handler (meshai/central/nws_handler.py):
  - _render() gains a prefix kwarg
  - After-first-broadcast branch: when (now - last_broadcast_at) >=
    adapter_config.nws.duplicate_allowed_after_seconds (default 10800
    = 3h), allow the re-broadcast with prefix=Active. Under the
    window, suppress as before. The commit callback continues to
    update last_broadcast_at.
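The relaxed dedup reduces to a three-way decision per CAP id. Below is a minimal sketch. The function name and return labels are hypothetical; only the 10800 s default and the >= comparison come from the description above.

```python
from typing import Optional

DEFAULT_DUPLICATE_ALLOWED_AFTER_S = 10800  # 3h: nws.duplicate_allowed_after_seconds


def nws_dedup_decision(now: float, last_broadcast_at: Optional[float],
                       window_s: float = DEFAULT_DUPLICATE_ALLOWED_AFTER_S) -> str:
    """Return "first", "active" (re-broadcast with Active: prefix) or "suppress"."""
    if last_broadcast_at is None:
        return "first"        # never broadcast: render without Active:
    if now - last_broadcast_at >= window_s:
        return "active"       # window elapsed: allow, prefix=Active
    return "suppress"         # still inside the window
```

The commit callback keeps updating last_broadcast_at, so each "active" re-broadcast restarts the window. A still-live alert therefore repeats at most once per window.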

ReminderScheduler (meshai/notifications/reminders/__init__.py):
  - Async loop, ticks every 60s
  - Each tick: SELECT adapter FROM adapter_meta WHERE reminder_enabled=1
  - Per adapter, load reminders_<adapter> config from adapter_config
    (cadence_kind, cadence_value, channels, terminate_when, dow_mask,
    timezone)
  - Interval cadence: rows where last_broadcast_at <= now - cadence_value
  - Clock cadence: localizes now to configured tz, finds slots that
    just passed in the last tick window, gated by dow_mask
  - Termination conditions checked per adapter:
      wfigs.containment_100      -> current_contained_pct >= 100
      wfigs.last_event_age_24h   -> last_event_at older than 24h
      swpc.end_date_passed       -> payload_json end_time in past
      itd_511_work_zone.end_date_passed -> traffic_events.end_at in past
  - Active: prefix on every emitted wire;
    dispatcher.dispatch_scheduled_broadcast() honors cold-start grace and
    bypasses the toggle path
  - On success, last_broadcast_at = now; first_broadcast_at preserved
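The two cadence kinds can be checked with small pure functions. Below is a minimal sketch. The function names are hypothetical, and so is the dow_mask encoding: it assumes bit i set means weekday i is enabled, with Monday = 0 as in datetime.weekday(). The scheduler's real encoding is not shown here.

```python
from datetime import datetime
from zoneinfo import ZoneInfo

TICK_S = 60  # scheduler tick, per the description above


def interval_due(now: float, last_broadcast_at: float, cadence_s: float) -> bool:
    """Interval cadence: due once cadence_s has elapsed since the last send."""
    return last_broadcast_at <= now - cadence_s


def clock_slot_just_passed(now: float, slot_hhmm: str, tz_name: str,
                           dow_mask: int, tick_s: float = TICK_S) -> bool:
    """Clock cadence: True if today's local slot fell in (now - tick_s, now]
    and the slot's weekday is enabled in dow_mask (hypothetical encoding:
    bit i = weekday i, Monday = 0)."""
    local_now = datetime.fromtimestamp(now, ZoneInfo(tz_name))
    hour, minute = (int(part) for part in slot_hhmm.split(":"))
    slot = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if not (dow_mask >> slot.weekday()) & 1:
        return False
    # Compare absolute timestamps so DST shifts are handled by zoneinfo.
    return now - tick_s < slot.timestamp() <= now
```

Matching against a half-open window one tick wide means each slot fires on exactly one tick. The trade-off is that a tick delayed by more than TICK_S would miss that day's slot.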

Launched from notifications/pipeline/__init__.py:start_pipeline()
alongside BandConditionsScheduler.
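The internals of ReminderScheduler and BandConditionsScheduler are not shown here. Below is a generic asyncio sketch of the "async loop, ticks every 60s" pattern run side by side as tasks; `run_every` is a hypothetical helper name.

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger("scheduler-sketch")


async def run_every(interval_s: float, tick: Callable[[], Awaitable[None]],
                    stop: asyncio.Event) -> None:
    """Await tick() every interval_s seconds until stop is set.

    A failing tick is logged and the loop keeps going, so one bad row
    cannot silently kill the scheduler.
    """
    while not stop.is_set():
        try:
            await tick()
        except Exception:
            log.exception("scheduler tick failed")
        try:
            # Sleep for interval_s, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass
```

A pipeline would start one `asyncio.create_task(run_every(60, ..., stop))` per scheduler. Waiting on the stop event instead of a bare `asyncio.sleep` lets shutdown interrupt a 60s sleep immediately.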

adapter_config registry (+15 new keys, 43 -> 58):
  - reminders_wfigs.cadence_kind/cadence_value/channels/terminate_when
  - reminders_swpc.cadence_kind/cadence_value/channels/terminate_when
  - reminders_itd_511_work_zone.cadence_kind/cadence_value/channels/
    dow_mask/timezone/terminate_when
  - nws.duplicate_allowed_after_seconds
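The +15 figure follows from the key list above: four keys each for wfigs and swpc, six for the work-zone clock reminder, and one NWS key. Below is a quick arithmetic check using the key names from the list. The dict layout is illustrative only, not the REGISTRY format.

```python
REMINDER_KEYS = {
    "reminders_wfigs": ["cadence_kind", "cadence_value", "channels", "terminate_when"],
    "reminders_swpc": ["cadence_kind", "cadence_value", "channels", "terminate_when"],
    "reminders_itd_511_work_zone": ["cadence_kind", "cadence_value", "channels",
                                    "dow_mask", "timezone", "terminate_when"],
}

# Flatten to dotted adapter_config keys, plus the single NWS key.
NEW_KEYS = [f"{adapter}.{field}"
            for adapter, fields in REMINDER_KEYS.items()
            for field in fields] + ["nws.duplicate_allowed_after_seconds"]

REGISTRY_BEFORE = 43
REGISTRY_AFTER = REGISTRY_BEFORE + len(NEW_KEYS)
```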

adapter_meta (+4 rows, 15 -> 19):
  - reminders_wfigs, reminders_swpc, reminders_itd_511_work_zone
    (pseudo-adapters carrying the reminder config)
  - itd_511_work_zone (reminder target row; reminder_enabled=1)
  - reminder_enabled flag added to wfigs/swpc (existing rows updated by
    v11.sql) and to itd_511_work_zone seed.

Tests (tests/test_reminders.py, 10 cases):
  - wfigs reminder fires past 8h cadence, stamps last_broadcast_at,
    preserves first_broadcast_at
  - reminder skipped within cadence
  - reminder skipped when containment_100, last_event_age_24h
  - swpc reminder fires (interval)
  - work_zone clock reminder fires at 08:00 Mountain on enabled DOW
  - work_zone reminder skipped when end_date_passed
  - work_zone reminder skipped outside slot window
  - reminder_enabled=0 suppresses all reminders for that adapter

tests/test_nws_dedup_relaxation.py (5 cases):
  - First sighting renders without Active: prefix
  - Re-broadcast within 3h suppressed
  - Re-broadcast after 3h allowed with Active: prefix
  - adapter_config.nws.duplicate_allowed_after_seconds override takes
    effect (1h window verified)
  - First sighting stamps first_broadcast_at=committed_at,
    last_broadcast_at=committed_at; 4h later broadcast stamps
    last_broadcast_at only, first_broadcast_at preserved

Test count: 829 -> 844 (+15 new, 0 regressions). Foundation tests
updated for new counts (REGISTRY=58, ADAPTER_META=19, schema=v11).
2026-06-05 21:11:32 +00:00


"""SQLite persistence connection management + migration runner.

Single-writer SQLite pattern with WAL journal mode for reader concurrency.
One connection per thread (threading.local) -- callers should not share
connections across threads.

Path resolution:
1. MESHAI_DB_PATH env var (explicit override)
2. DEFAULT_DB_PATH = /data/meshai.sqlite (prod container mount)

The path ":memory:", any path containing "mode=memory", or any path
starting with "file::memory:" routes to a shared-cache in-memory SQLite
database for tests.

Migrations live in meshai/persistence/migrations/v*.sql. The runner
applies them in numeric version order, recording the applied version in
schema_meta. Idempotent re-run is a no-op.
"""
from __future__ import annotations

import logging
import os
import sqlite3
import threading
from pathlib import Path
from typing import Iterable, Optional

logger = logging.getLogger(__name__)

DEFAULT_DB_PATH = "/data/meshai.sqlite"
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
SCHEMA_VERSION = 11
SCHEMA_META_TABLE = "schema_meta"
MIGRATIONS_DIR = Path(__file__).parent / "migrations"

# Per-thread connection pool. Each thread that calls get_db() gets its
# own sqlite3.Connection cached on threading.local. Tests can clear
# via close_thread_connection() between cases.
_local = threading.local()

# Module-level lock guards init_db() so concurrent first-callers don't
# race on migration application.
_init_lock = threading.Lock()

# Cache of initialised database paths in this process so init_db() is
# idempotent without re-reading migration files on every call.
_initialised: set[str] = set()


def MESHAI_DB_PATH() -> str:
    """Resolve the active SQLite path (env var override or default)."""
    return os.environ.get(MESHAI_DB_PATH_ENV) or DEFAULT_DB_PATH


def _is_memory_path(path: str) -> bool:
    """True if `path` should route to the shared in-memory test database."""
    return path == ":memory:" or "mode=memory" in path or path.startswith("file::memory:")


def _connect(path: str) -> sqlite3.Connection:
    """Open a SQLite connection with sane defaults for this project."""
    if _is_memory_path(path):
        # For in-memory tests, use a shared cache so every connection in
        # this process (any thread) sees the same DB. Tests that want
        # isolation call close_thread_connection() between cases.
        uri = "file::memory:?cache=shared"
        conn = sqlite3.connect(uri, uri=True, isolation_level=None,
                               check_same_thread=False)
    else:
        # Ensure parent dir exists for file-backed DBs.
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(path, isolation_level=None,
                               check_same_thread=False, timeout=30.0)
    conn.row_factory = sqlite3.Row
    # Enable foreign keys (off by default in SQLite); WAL mode for reader
    # concurrency; reasonable busy timeout for the single-writer pattern.
    conn.execute("PRAGMA foreign_keys = ON")
    if not _is_memory_path(path):
        conn.execute("PRAGMA journal_mode = WAL")
        conn.execute("PRAGMA synchronous = NORMAL")
    conn.execute("PRAGMA busy_timeout = 30000")
    return conn


def get_db(path: Optional[str] = None) -> sqlite3.Connection:
    """Return a SQLite connection for the current thread (cached).

    The first call for a given path in this process applies pending
    migrations. Subsequent calls in the same thread return the cached
    connection.
    """
    target = path or MESHAI_DB_PATH()
    cached = getattr(_local, "conn", None)
    cached_path = getattr(_local, "path", None)
    if cached is not None and cached_path == target:
        return cached
    # Different path requested or no cached conn -- (re)open.
    if cached is not None:
        try:
            cached.close()
        except Exception:
            pass
    conn = _connect(target)
    _local.conn = conn
    _local.path = target
    if target not in _initialised:
        # Double-checked locking: only one thread applies migrations.
        with _init_lock:
            if target not in _initialised:
                _apply_migrations(conn)
                _initialised.add(target)
    return conn
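

def close_thread_connection() -> None:
    """Close + drop the cached connection for the current thread.

    Tests call this between cases to ensure a clean slate. The shared-cache
    in-memory database is destroyed when the LAST connection to it in the
    process closes.
    """
    conn = getattr(_local, "conn", None)
    path = getattr(_local, "path", None)
    if conn is not None:
        try:
            conn.close()
        except Exception:
            pass
    if hasattr(_local, "conn"):
        del _local.conn
    if hasattr(_local, "path"):
        del _local.path
    # If this close destroyed the shared in-memory DB, the next get_db()
    # must re-run migrations. Forgetting the path is safe either way
    # because _apply_migrations() is idempotent.
    if path is not None and _is_memory_path(path):
        with _init_lock:
            _initialised.discard(path)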


def init_db(path: Optional[str] = None) -> sqlite3.Connection:
    """Explicit init entry point (idempotent). Opens the connection via
    get_db() (which applies pending migrations) and documents intent at
    startup. Returns the connection.

    v0.6-3a: after migrations apply, seed adapter_config + adapter_meta
    from meshai/adapter_config/defaults.py:REGISTRY and call
    prune_orphans(); then seed the curation tables (gauge sites, town
    anchors). The adapter_config seed is idempotent via INSERT OR IGNORE,
    so a re-run is a clean no-op. Seed failures are logged, not raised."""
    conn = get_db(path)
    try:
        from meshai.adapter_config import seed_defaults, prune_orphans
        seed_defaults(conn)
        prune_orphans(conn)
    except Exception:
        logger.exception("init_db: adapter_config seed/prune failed")
    try:
        from meshai.persistence.curation import seed_gauge_sites, seed_town_anchors
        seed_gauge_sites(conn)
        seed_town_anchors(conn)
    except Exception:
        logger.exception("init_db: curation seed failed")
    return conn


def _read_migration_files() -> list[tuple[int, str, str]]:
    """Return [(version_int, filename, sql_text), ...] sorted by numeric
    version (NOT by filename -- v10 must follow v9, not v1)."""
    if not MIGRATIONS_DIR.is_dir():
        return []
    out: list[tuple[int, str, str]] = []
    for p in MIGRATIONS_DIR.iterdir():
        if not p.is_file() or p.suffix.lower() != ".sql":
            continue
        stem = p.stem
        if not stem.startswith("v"):
            continue
        n_str = stem[1:].split("_", 1)[0]
        try:
            n = int(n_str)
        except ValueError:
            continue
        out.append((n, p.name, p.read_text()))
    out.sort(key=lambda x: x[0])  # v0.6-6: numeric sort, not alphabetical
    return out


def _current_version(conn: sqlite3.Connection) -> int:
    """Return the highest applied migration version, or 0 if none."""
    # Does the schema_meta table exist?
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (SCHEMA_META_TABLE,),
    ).fetchone()
    if row is None:
        return 0
    row = conn.execute(
        f"SELECT value FROM {SCHEMA_META_TABLE} WHERE key='version'",
    ).fetchone()
    if row is None:
        return 0
    try:
        return int(row["value"])
    except (TypeError, ValueError):
        return 0


def _apply_migrations(conn: sqlite3.Connection) -> None:
    """Apply any pending migrations in version order. Idempotent."""
    migrations = _read_migration_files()
    if not migrations:
        logger.warning("persistence: no migration files found in %s", MIGRATIONS_DIR)
        return
    current = _current_version(conn)
    pending = [(n, name, sql) for n, name, sql in migrations if n > current]
    if not pending:
        logger.debug("persistence: schema up to date at v%d", current)
        return
    for version, filename, sql in pending:
        logger.info("persistence: applying migration %s (v%d)", filename, version)
        # sqlite3.Connection.executescript() first COMMITs any pending
        # transaction, then runs the script exactly as written: it opens
        # no transaction of its own and issues no COMMIT at the end. So
        # wrapping it in an explicit BEGIN/COMMIT from Python is broken
        # (the leading COMMIT ends our BEGIN immediately, and a ROLLBACK
        # in the except clause would have nothing to undo). With
        # isolation_level=None every statement autocommits, so a migration
        # is atomic only if its .sql file contains its own BEGIN/COMMIT;
        # the version stamp below is a separate autocommitted statement.
        try:
            conn.executescript(sql)
            conn.execute(
                f"INSERT OR REPLACE INTO {SCHEMA_META_TABLE}(key, value) VALUES('version', ?)",
                (str(version),),
            )
        except Exception:
            logger.exception("persistence: migration %s failed", filename)
            raise
    logger.info("persistence: schema now at v%d", pending[-1][0])
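executescript() opens no transaction of its own, so a migration that fails halfway stays half-applied unless the script carries its own BEGIN/COMMIT. One way to make each step atomic is to wrap the script text and the schema_meta version stamp together in BEGIN/COMMIT, and roll back if executescript() raises. Below is a sketch of that as a hypothetical helper, not part of this module. It assumes the migration files contain no transaction statements of their own, since a nested BEGIN would fail.

```python
import sqlite3


def apply_migration_atomically(conn: sqlite3.Connection, version: int,
                               sql: str) -> None:
    """Hypothetical helper: run one migration script plus its version stamp
    in a single transaction, so a failure leaves neither applied."""
    script = (
        "BEGIN;\n"
        f"{sql}\n"
        "INSERT OR REPLACE INTO schema_meta(key, value) "
        f"VALUES('version', '{int(version)}');\n"
        "COMMIT;"
    )
    try:
        conn.executescript(script)
    except Exception:
        # executescript() stops at the failing statement, leaving our
        # BEGIN open; undo everything the script did so far.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
```

SQLite DDL is transactional, so the rollback also discards any CREATE TABLE or ALTER TABLE that ran before the failure.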