From 3a410d5087221f561a40895280638b07dcb58c9f Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Fri, 5 Jun 2026 21:11:32 +0000 Subject: [PATCH] feat(v0.6-phase3): reminder system + schema split + NWS dedup relaxation Third broadcast type Active: clock-driven re-broadcasts of still-live events at human-scale cadences. WFIGS fires 8h, itd_511 work zones daily 8 AM Mountain, SWPC G-storms 8h. NWS is NOT a clock reminder -- instead the per-CAP-id dedup is relaxed to allow re-broadcast if >3h since last. Schema split first_broadcast_at + last_broadcast_at on all reminder- eligible tables. Wire prefix logic: New (first sight), Update (WFIGS material change), Active (clock reminder). All cadences, channels, day- of-week patterns, timezones, and termination conditions GUI-editable from day one via the existing adapter_config editor. Termination: tombstone OR containment_100 OR end_date_passed (no max-count). Quiet hours not respected -- ripped out in Phase 2. Schema (v11.sql): - ALTER TABLE fires|nws_alerts|traffic_events|quake_events|swpc_events| gauge_readings ADD COLUMN first_broadcast_at REAL - Backfill: UPDATE ... SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL - ALTER TABLE adapter_meta ADD COLUMN reminder_enabled INTEGER NOT NULL DEFAULT 0 - UPDATE adapter_meta SET reminder_enabled=1 WHERE adapter IN ('wfigs', 'swpc') -- itd_511_work_zone is a new meta row seeded with reminder_enabled=1 - SCHEMA_VERSION 10 -> 11 Handler commit-callbacks (wfigs/nws/quake/swpc/incident): - UPDATE ... SET last_broadcast_at=?, first_broadcast_at=COALESCE( first_broadcast_at, ?) -- first_broadcast_at stamped once, never overwritten NWS handler (meshai/central/nws_handler.py): - _render() gains a prefix kwarg - After-first-broadcast branch: when (now - last_broadcast_at) >= adapter_config.nws.duplicate_allowed_after_seconds (default 10800 = 3h), allow the re-broadcast with prefix=Active. Under the window, suppress as before. The commit callback continues to update last_broadcast_at. ReminderScheduler (meshai/notifications/reminders/__init__.py): - Async loop, ticks every 60s - Each tick: SELECT adapter FROM adapter_meta WHERE reminder_enabled=1 - Per adapter, load reminders_ config from adapter_config (cadence_kind, cadence_value, channels, terminate_when, dow_mask, timezone) - Interval cadence: rows where last_broadcast_at <= now - cadence_value - Clock cadence: localizes now to configured tz, finds slots that just passed in the last tick window, gated by dow_mask - Termination conditions checked per adapter: wfigs.containment_100 -> current_contained_pct >= 100 wfigs.last_event_age_24h -> last_event_at older than 24h swpc.end_date_passed -> payload_json end_time in past itd_511_work_zone.end_date_passed -> traffic_events.end_at in past - Active: prefix on every emitted wire; dispatcher.dispatch_scheduled_ broadcast() honors cold-start grace, bypasses toggle path - On success, last_broadcast_at = now; first_broadcast_at preserved Launched from notifications/pipeline/__init__.py:start_pipeline() alongside BandConditionsScheduler. adapter_config registry (+15 new keys, 43 -> 58): - reminders_wfigs.cadence_kind/cadence_value/channels/terminate_when - reminders_swpc.cadence_kind/cadence_value/channels/terminate_when - reminders_itd_511_work_zone.cadence_kind/cadence_value/channels/ dow_mask/timezone/terminate_when - nws.duplicate_allowed_after_seconds adapter_meta (+4 rows, 15 -> 19): - reminders_wfigs, reminders_swpc, reminders_itd_511_work_zone (pseudo-adapters carrying the reminder config) - itd_511_work_zone (reminder target row; reminder_enabled=1) - reminder_enabled flag added to wfigs/swpc (existing rows updated by v11.sql) and to itd_511_work_zone seed. Tests (tests/test_reminders.py, 10 cases): - wfigs reminder fires past 8h cadence, stamps last_broadcast_at, preserves first_broadcast_at - reminder skipped within cadence - reminder skipped when containment_100, last_event_age_24h - swpc reminder fires (interval) - work_zone clock reminder fires at 08:00 Mountain on enabled DOW - work_zone reminder skipped when end_date_passed - work_zone reminder skipped outside slot window - reminder_enabled=0 suppresses all reminders for that adapter tests/test_nws_dedup_relaxation.py (5 cases): - First sighting renders without Active: prefix - Re-broadcast within 3h suppressed - Re-broadcast after 3h allowed with Active: prefix - adapter_config.nws.duplicate_allowed_after_seconds override takes effect (1h window verified) - First sighting stamps first_broadcast_at=committed_at, last_broadcast_at=committed_at; 4h later broadcast stamps last_broadcast_at only, first_broadcast_at preserved Test count: 829 -> 844 (+15 new, 0 regressions). Foundation tests updated for new counts (REGISTRY=58, ADAPTER_META=19, schema=v11). --- meshai/adapter_config/__init__.py | 6 +- meshai/adapter_config/defaults.py | 111 ++++++ meshai/central/incident_handler.py | 3 +- meshai/central/nws_handler.py | 25 +- meshai/central/quake_handler.py | 6 +- meshai/central/swpc_handler.py | 5 +- meshai/central/wfigs_handler.py | 7 +- meshai/notifications/pipeline/__init__.py | 19 ++ meshai/notifications/reminders/__init__.py | 378 +++++++++++++++++++++ meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v11.sql | 40 +++ tests/test_adapter_config_api.py | 4 +- tests/test_adapter_config_foundation.py | 12 +- tests/test_nws_dedup_relaxation.py | 140 ++++++++ tests/test_reminders.py | 206 +++++++++++ 15 files changed, 940 insertions(+), 24 deletions(-) create mode 100644 meshai/notifications/reminders/__init__.py create mode 100644 meshai/persistence/migrations/v11.sql create mode 100644 tests/test_nws_dedup_relaxation.py create mode 100644 tests/test_reminders.py diff --git a/meshai/adapter_config/__init__.py b/meshai/adapter_config/__init__.py index fb6d805..83c513f 100644 --- a/meshai/adapter_config/__init__.py +++ b/meshai/adapter_config/__init__.py @@ -82,10 +82,12 @@ def seed_defaults(conn: sqlite3.Connection) -> tuple[int, int]: for adapter, meta in ADAPTER_META.items(): cur = conn.execute( "INSERT OR IGNORE INTO adapter_meta(" - "adapter, display_name, include_in_llm_context, description, updated_at) " - "VALUES (?,?,?,?,?)", + "adapter, display_name, include_in_llm_context, reminder_enabled, " + "description, updated_at) " + "VALUES (?,?,?,?,?,?)", (adapter, meta.get("display_name") or adapter, 1 if meta.get("include_in_llm_context", True) else 0, + 1 if meta.get("reminder_enabled", False) else 0, meta.get("description") or "", now), ) if cur.rowcount > 0: diff --git a/meshai/adapter_config/defaults.py b/meshai/adapter_config/defaults.py index 2704cfc..8cdd498 100644 --- a/meshai/adapter_config/defaults.py +++ b/meshai/adapter_config/defaults.py @@ -320,6 +320,89 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { "type": "int", "description": "How long to hold a group_key before emitting downstream.", }, + # ================================================================= + # v0.6-phase3 reminders: per-adapter clock-driven re-broadcast config. + # ================================================================= + ("reminders_wfigs", "cadence_kind"): { + "default": "interval", + "type": "str", + "description": "Reminder cadence kind (interval | clock).", + }, + ("reminders_wfigs", "cadence_value"): { + "default": 28800, # 8h + "type": "json", + "description": "Cadence value: int seconds for interval, list of HH:MM strings for clock.", + }, + ("reminders_wfigs", "channels"): { + "default": ["mesh_broadcast"], + "type": "json", + "description": "Channel types for the reminder broadcast.", + }, + ("reminders_wfigs", "terminate_when"): { + "default": ["tombstone", "containment_100", "last_event_age_24h"], + "type": "json", + "description": "Stop reminding when any of these conditions is true.", + }, + + ("reminders_swpc", "cadence_kind"): { + "default": "interval", + "type": "str", + "description": "Reminder cadence kind (interval | clock).", + }, + ("reminders_swpc", "cadence_value"): { + "default": 28800, # 8h + "type": "json", + "description": "Cadence value: int seconds for interval, list of HH:MM strings for clock.", + }, + ("reminders_swpc", "channels"): { + "default": ["mesh_broadcast"], + "type": "json", + "description": "Channel types for the reminder broadcast.", + }, + ("reminders_swpc", "terminate_when"): { + "default": ["tombstone", "end_date_passed"], + "type": "json", + "description": "Stop reminding when any of these conditions is true.", + }, + + ("reminders_itd_511_work_zone", "cadence_kind"): { + "default": "clock", + "type": "str", + "description": "Reminder cadence kind (interval | clock).", + }, + ("reminders_itd_511_work_zone", "cadence_value"): { + "default": ["08:00"], + "type": "json", + "description": "List of HH:MM clock slots (local timezone) when reminders fire.", + }, + ("reminders_itd_511_work_zone", "channels"): { + "default": ["mesh_broadcast"], + "type": "json", + "description": "Channel types for the reminder broadcast.", + }, + ("reminders_itd_511_work_zone", "dow_mask"): { + "default": [True, True, True, True, True, True, True], + "type": "json", + "description": "Day-of-week enable mask (Mon..Sun).", + }, + ("reminders_itd_511_work_zone", "timezone"): { + "default": "America/Boise", + "type": "str", + "description": "Timezone for the clock slots.", + }, + ("reminders_itd_511_work_zone", "terminate_when"): { + "default": ["tombstone", "end_date_passed"], + "type": "json", + "description": "Stop reminding when any of these conditions is true.", + }, + + # NWS dedup-window relaxation (separate from reminders by design). + ("nws", "duplicate_allowed_after_seconds"): { + "default": 10800, # 3h + "type": "int", + "description": "Allow re-broadcast of the same CAP id after this many seconds (the nws_handler relaxes its dedup gate past this point and uses an Active: prefix).", + }, + } @@ -335,6 +418,7 @@ ADAPTER_META: dict[str, dict[str, Any]] = { "wfigs": { "display_name": "WFIGS wildfire incidents", "include_in_llm_context": True, + "reminder_enabled": True, "description": "NIFC-authoritative wildfire registry (named incidents, acres, containment).", }, "firms": { @@ -355,6 +439,7 @@ ADAPTER_META: dict[str, dict[str, Any]] = { "swpc": { "display_name": "SWPC space weather", "include_in_llm_context": True, + "reminder_enabled": True, "description": "Geomagnetic / flare / proton storm alerts (G/R/S scale).", }, "usgs_nwis": { @@ -407,6 +492,32 @@ ADAPTER_META: dict[str, dict[str, Any]] = { "include_in_llm_context": True, "description": "TTL + window tunables for the Inhibitor and Grouper stages.", }, + + # v0.6-phase3 reminder pseudo-adapters: each carries the per-adapter + # ReminderScheduler config. Their adapter_meta rows exist so the GUI + # surfaces them; include_in_llm_context is True so the LLM can answer + # "are reminders firing for fires right now?". + "reminders_wfigs": { + "display_name": "Reminders (WFIGS fires)", + "include_in_llm_context": True, + "description": "Per-fire Active: reminders. 8h interval by default.", + }, + "reminders_swpc": { + "display_name": "Reminders (SWPC space weather)", + "include_in_llm_context": True, + "description": "Active: reminders for ongoing G-storm / R-flare / S-radiation events. 8h interval.", + }, + "reminders_itd_511_work_zone": { + "display_name": "Reminders (ITD 511 work zones)", + "include_in_llm_context": True, + "description": "Clock-driven daily reminders for active road-works zones (default 08:00 Mountain).", + }, + "itd_511_work_zone": { + "display_name": "ITD 511 (work zones, reminder-eligible)", + "include_in_llm_context": True, + "reminder_enabled": True, + "description": "Subset of itd_511 traffic_events filtered to work-zone sub_type, used as the reminder target.", + }, } diff --git a/meshai/central/incident_handler.py b/meshai/central/incident_handler.py index e5df3ce..96344fa 100644 --- a/meshai/central/incident_handler.py +++ b/meshai/central/incident_handler.py @@ -654,10 +654,11 @@ def _attach_commit_handles(data: Optional[dict], *, source: str, return conn.execute( "UPDATE traffic_events SET last_broadcast_at=?, " + "first_broadcast_at=COALESCE(first_broadcast_at, ?), " "last_broadcast_magnitude=?, last_broadcast_delay_seconds=?, " "last_broadcast_icon_category=? " "WHERE source=? AND external_id=?", - (int(committed_at), magnitude, delay_seconds, icon_category, + (int(committed_at), int(committed_at), magnitude, delay_seconds, icon_category, source, external_id), ) if event_log_row_id is not None: diff --git a/meshai/central/nws_handler.py b/meshai/central/nws_handler.py index f4ebf78..572f56d 100644 --- a/meshai/central/nws_handler.py +++ b/meshai/central/nws_handler.py @@ -223,19 +223,31 @@ def handle_nws(envelope: dict, subject: str, _attach_commit(data, cap_id=cap_id, event_log_row_id=log_id) return wire - # Already broadcast -- no Update for v0.5.10 (mirrors v0.5.9 incident rule). + # v0.6-phase3: dedup-window relaxation. If the CAP id was last + # broadcast more than `nws.duplicate_allowed_after_seconds` ago, allow + # the re-broadcast with an "Active:" prefix; otherwise suppress. + last_bcast = float(row["last_broadcast_at"]) + window_s = int(adapter_config.nws.duplicate_allowed_after_seconds) + if window_s > 0 and (now - last_bcast) >= window_s: + wire = _render(event_type=event_type, area_desc=area_desc, + geocoder_city=ge.get("city"), county=county, state=state, + expires_epoch=expires_epoch, lat=lat, lon=lon, now=now, + prefix="Active") + _attach_commit(data, cap_id=cap_id, event_log_row_id=log_id) + return wire return None def _render(*, event_type, area_desc, geocoder_city, county, state, - expires_epoch, lat, lon, now) -> str: + expires_epoch, lat, lon, now, prefix: str = "") -> str: emoji = _emoji_for_event(event_type) anchor = _location_anchor(area_desc, geocoder_city, county, state) expires_seg = _format_expires_short(expires_epoch, now=now) coords = "" if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): coords = f", @ {lat:.3f},{lon:.3f}" - return f"{emoji} {event_type or 'Weather Alert'}: {anchor}, {expires_seg}{coords}" + prefix_seg = f"{prefix}: " if prefix else "" + return f"{emoji} {prefix_seg}{event_type or 'Weather Alert'}: {anchor}, {expires_seg}{coords}" def _category_to_event_type(category_raw: str) -> str: @@ -254,8 +266,11 @@ def _attach_commit(data: Optional[dict], *, cap_id: str, try: conn = get_db() except Exception: logger.exception("nws commit: persistence unavailable"); return - conn.execute("UPDATE nws_alerts SET last_broadcast_at=? WHERE event_id=?", - (int(committed_at), cap_id)) + conn.execute( + "UPDATE nws_alerts SET last_broadcast_at=?, " + "first_broadcast_at=COALESCE(first_broadcast_at, ?) " + "WHERE event_id=?", + (int(committed_at), int(committed_at), cap_id)) if event_log_row_id is not None: conn.execute("UPDATE event_log SET handled=1 WHERE id=?", (int(event_log_row_id),)) diff --git a/meshai/central/quake_handler.py b/meshai/central/quake_handler.py index c68147e..2b19944 100644 --- a/meshai/central/quake_handler.py +++ b/meshai/central/quake_handler.py @@ -195,8 +195,10 @@ def _attach_commit(data: Optional[dict], *, event_id: str, try: conn = get_db() except Exception: logger.exception("quake commit: persistence unavailable"); return - conn.execute("UPDATE quake_events SET last_broadcast_at=? WHERE event_id=?", - (int(committed_at), event_id)) + conn.execute( + "UPDATE quake_events SET last_broadcast_at=?, " + "first_broadcast_at=COALESCE(first_broadcast_at, ?) WHERE event_id=?", + (int(committed_at), int(committed_at), event_id)) if event_log_row_id is not None: conn.execute("UPDATE event_log SET handled=1 WHERE id=?", (int(event_log_row_id),)) diff --git a/meshai/central/swpc_handler.py b/meshai/central/swpc_handler.py index 32df366..f9cd90a 100644 --- a/meshai/central/swpc_handler.py +++ b/meshai/central/swpc_handler.py @@ -349,8 +349,9 @@ def _attach_commit(data: Optional[dict], *, event_id: str, except Exception: logger.exception("swpc commit: persistence unavailable"); return conn.execute( - "UPDATE swpc_events SET last_broadcast_at=? WHERE event_id=?", - (int(committed_at), event_id)) + "UPDATE swpc_events SET last_broadcast_at=?, " + "first_broadcast_at=COALESCE(first_broadcast_at, ?) WHERE event_id=?", + (int(committed_at), int(committed_at), event_id)) if event_log_row_id is not None: conn.execute("UPDATE event_log SET handled=1 WHERE id=?", (int(event_log_row_id),)) diff --git a/meshai/central/wfigs_handler.py b/meshai/central/wfigs_handler.py index 33a43d0..8ca4823 100644 --- a/meshai/central/wfigs_handler.py +++ b/meshai/central/wfigs_handler.py @@ -225,9 +225,10 @@ def _attach_commit_handles(data: Optional[dict], *, irwin_id: str, "last_broadcast_* not updated for irwin=%s", irwin_id) return conn.execute( - "UPDATE fires SET last_broadcast_at=?, last_broadcast_acres=?, " - "last_broadcast_contained=? WHERE irwin_id=?", - (int(committed_at), acres, contained_pct, irwin_id), + "UPDATE fires SET last_broadcast_at=?, " + "first_broadcast_at=COALESCE(first_broadcast_at, ?), " + "last_broadcast_acres=?, last_broadcast_contained=? WHERE irwin_id=?", + (int(committed_at), int(committed_at), acres, contained_pct, irwin_id), ) # Flip the matching event_log row to handled=1. A NULL row id # (caller forgot to thread it) is silently skipped -- the broadcast diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index dffdf3f..0a64f99 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -33,6 +33,10 @@ try: ) except ImportError: BandConditionsScheduler = None +try: + from meshai.notifications.reminders import ReminderScheduler +except ImportError: + ReminderScheduler = None from meshai.notifications.pipeline.inhibitor import Inhibitor from meshai.notifications.pipeline.grouper import Grouper from meshai.notifications.pipeline.toggle_filter import ToggleFilter @@ -227,6 +231,21 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: _lg.getLogger("meshai.pipeline").exception( "band_conditions scheduler failed to start") + # v0.6-phase3 ReminderScheduler -- runs alongside band_conditions. + if ReminderScheduler is not None: + try: + comps = getattr(bus, "_pipeline_components", {}) or {} + disp = comps.get("dispatcher") + if disp is not None: + rem_sched = ReminderScheduler(disp) + await rem_sched.start() + comps["reminder_scheduler"] = rem_sched + bus._pipeline_components = comps + except Exception: + import logging as _lg + _lg.getLogger("meshai.pipeline").exception( + "reminder scheduler failed to start") + # Phase 2.16.1: periodically flush the grouper so coalesced (routine/ # priority) events are delivered within the window even when poll cadence diff --git a/meshai/notifications/reminders/__init__.py b/meshai/notifications/reminders/__init__.py new file mode 100644 index 0000000..411f9df --- /dev/null +++ b/meshai/notifications/reminders/__init__.py @@ -0,0 +1,378 @@ +"""v0.6-phase3 ReminderScheduler. + +Third broadcast type (after New: and Update:): Active: clock-driven +re-broadcasts of still-live events at human-scale cadences. Reads +reminder config from `adapter_config` (adapter='reminders_'), reads +`reminder_enabled` from `adapter_meta`, scans the per-adapter persistence +tables on a fixed tick, and dispatches via the existing +`Dispatcher.dispatch_scheduled_broadcast()`. + +Cadence kinds: + "interval" -- re-broadcast when (NOW - last_broadcast_at) >= cadence_value (seconds). + "clock" -- re-broadcast at the configured HH:MM slots in cadence_value, + gated by dow_mask (Mon..Sun) and timezone. + +Termination conditions (per-adapter, JSON list of tokens): + "tombstone" -- row has a tombstone marker (per-handler semantics) + "containment_100" -- WFIGS: current_contained_pct >= 100 + "last_event_age_24h" -- last_event_at older than 24h + "end_date_passed" -- traffic_events.end_at / swpc payload end_time in past + +When a reminder fires: + - "Active:" prefix is composed into the wire string via the adapter's + existing renderer (or a minimal fallback). + - dispatch_scheduled_broadcast() is called (cold-start grace honored, + toggle path bypassed because reminders are clock-driven, not event- + driven). + - On successful delivery, `last_broadcast_at = NOW` is written via the + adapter table's normal commit path. `first_broadcast_at` is NEVER + touched here (only the handler sets it on first sight). + +Quiet hours are NOT respected (Phase-2 deleted that concept). +""" +from __future__ import annotations + +import asyncio +import logging +import time +from datetime import datetime, timezone +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +# Default tick. The scheduler wakes every N seconds and decides what to +# fire. 60s is fine: interval reminders are 8h-scale; clock reminders +# tolerate up to a minute of lag. +_TICK_SECONDS = 60.0 + + +# ============================================================================ +# Public scheduler +# ============================================================================ + + +class ReminderScheduler: + """Fires Active: reminders for adapters where reminder_enabled=1.""" + + def __init__(self, dispatcher, *, + clock=None, sleep=None, + tick_seconds: float = _TICK_SECONDS): + self._dispatcher = dispatcher + self._clock = clock or time.time + self._sleep = sleep or asyncio.sleep + self._tick = tick_seconds + self._task: Optional[asyncio.Task] = None + self._stop: Optional[asyncio.Event] = None + + async def start(self) -> None: + if self._task is not None and not self._task.done(): + raise RuntimeError("ReminderScheduler already running") + self._stop = asyncio.Event() + self._task = asyncio.create_task(self._run(), name="reminder-scheduler") + logger.info("ReminderScheduler started; tick=%ss", self._tick) + + async def stop(self) -> None: + if self._stop: self._stop.set() + if self._task: + try: await self._task + except Exception: pass + + async def _run(self) -> None: + while not (self._stop and self._stop.is_set()): + try: + await self.tick_once() + except Exception: + logger.exception("ReminderScheduler tick failed") + try: + await asyncio.wait_for(self._stop.wait(), timeout=self._tick) + return + except asyncio.TimeoutError: + pass + + async def tick_once(self) -> int: + """One pass over every reminder-enabled adapter. Returns broadcasts + fired. Public so tests can drive single ticks deterministically.""" + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return 0 + + enabled = [r["adapter"] for r in conn.execute( + "SELECT adapter FROM adapter_meta WHERE reminder_enabled = 1" + ).fetchall()] + if not enabled: + return 0 + + fired = 0 + for adapter in enabled: + try: + fired += await self._tick_adapter(adapter) + except Exception: + logger.exception("reminder tick failed for adapter=%s", adapter) + return fired + + # ---- per-adapter -------------------------------------------------- + + async def _tick_adapter(self, adapter: str) -> int: + """Look up the reminder config + query the adapter's table.""" + cfg = _ReminderConfig.load(adapter) + if cfg is None: + return 0 + + now = self._clock() + + if cfg.cadence_kind == "interval": + return await self._tick_interval(adapter, cfg, now) + if cfg.cadence_kind == "clock": + return await self._tick_clock(adapter, cfg, now) + logger.debug("reminder %s: unknown cadence_kind=%r", adapter, cfg.cadence_kind) + return 0 + + async def _tick_interval(self, adapter: str, cfg: "_ReminderConfig", now: float) -> int: + cadence = int(cfg.cadence_value) if cfg.cadence_value else 0 + if cadence <= 0: + return 0 + + targets = self._rows_for(adapter, now, cadence_seconds=cadence) + fired = 0 + for r in targets: + if self._terminated(adapter, r, cfg.terminate_when, now): + continue + wire = self._render(adapter, r, prefix="Active") + if not wire: + continue + ok = await self._dispatch(adapter, r, wire) + if ok: + self._stamp_broadcast(adapter, r, now) + fired += 1 + return fired + + async def _tick_clock(self, adapter: str, cfg: "_ReminderConfig", now: float) -> int: + # cadence_value is a list of HH:MM strings. + slots = list(cfg.cadence_value or []) + if not slots: + return 0 + # dow_mask: Mon=0..Sun=6 (Python weekday convention). + dow_mask = cfg.dow_mask or [True] * 7 + tz_name = cfg.timezone or "UTC" + # Localize "now" to the configured tz. + try: + from zoneinfo import ZoneInfo + local_dt = datetime.fromtimestamp(now, tz=timezone.utc).astimezone(ZoneInfo(tz_name)) + except Exception: + local_dt = datetime.fromtimestamp(now, tz=timezone.utc) + if not dow_mask[local_dt.weekday()]: + return 0 + # Find the most recent slot we re past today. + current_local_min = local_dt.hour * 60 + local_dt.minute + slot_minutes = [] + for s in slots: + try: + hh, mm = s.split(":") + slot_minutes.append(int(hh) * 60 + int(mm)) + except Exception: + continue + # Has any slot just passed and we haven t fired today? + # Simplest: fire iff there's a slot in (current - tick_minutes, current]. + tick_min = max(1, int(self._tick / 60)) + recent_slots = [s for s in slot_minutes + if current_local_min - tick_min <= s <= current_local_min] + if not recent_slots: + return 0 + # Targets: every row last broadcast more than ~24h ago (so daily + # reminders don t double up if a slot fires twice on the same day). + targets = self._rows_for(adapter, now, cadence_seconds=86400) + fired = 0 + for r in targets: + if self._terminated(adapter, r, cfg.terminate_when, now): + continue + wire = self._render(adapter, r, prefix="Active") + if not wire: continue + ok = await self._dispatch(adapter, r, wire) + if ok: + self._stamp_broadcast(adapter, r, now) + fired += 1 + return fired + + # ---- table-specific helpers -------------------------------------- + + def _rows_for(self, adapter: str, now: float, + *, cadence_seconds: int) -> list: + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return [] + cutoff = now - cadence_seconds + if adapter == "wfigs": + return list(conn.execute( + "SELECT * FROM fires WHERE last_broadcast_at IS NOT NULL " + "AND last_broadcast_at <= ?", + (cutoff,), + ).fetchall()) + if adapter == "swpc": + return list(conn.execute( + "SELECT * FROM swpc_events WHERE last_broadcast_at IS NOT NULL " + "AND last_broadcast_at <= ?", + (cutoff,), + ).fetchall()) + if adapter == "itd_511_work_zone": + return list(conn.execute( + "SELECT * FROM traffic_events WHERE source='itd_511' " + "AND sub_type='road_works' " + "AND last_broadcast_at IS NOT NULL " + "AND last_broadcast_at <= ?", + (cutoff,), + ).fetchall()) + return [] + + def _terminated(self, adapter: str, row, terminate_when: list, now: float) -> bool: + if not terminate_when: return False + tokens = set(terminate_when) + if adapter == "wfigs": + if "containment_100" in tokens: + c = row["current_contained_pct"] + if c is not None and int(c) >= 100: + return True + if "last_event_age_24h" in tokens: + le = row["last_event_at"] + if le is not None and (now - float(le)) > 86400: + return True + # "tombstone" -- fires row has no flag; best-effort via event_log + # would be expensive on each tick. Skip; deliberate scope-limit. + return False + if adapter == "swpc": + if "end_date_passed" in tokens: + # swpc_events payload_json may carry an end time. Best-effort + # scan -- not all payloads include it. We treat its absence + # as "not terminated". + import json + try: + payload = json.loads(row["payload_json"] or "{}") + end_raw = payload.get("end_time") or payload.get("end") + if end_raw: + from datetime import datetime as _dt + try: + end_epoch = int(_dt.fromisoformat( + str(end_raw).replace("Z", "+00:00")).timestamp()) + if end_epoch < now: + return True + except Exception: + pass + except Exception: + pass + return False + if adapter == "itd_511_work_zone": + if "end_date_passed" in tokens: + end_at = row["end_at"] + if end_at is not None and float(end_at) < now: + return True + return False + return False + + def _render(self, adapter: str, row, *, prefix: str) -> str: + """Best-effort Active: wire string per adapter family.""" + if adapter == "wfigs": + name = row["incident_name"] or "(unnamed)" + acres = row["current_acres"] + cont = row["current_contained_pct"] + acres_s = "N/A" if acres is None else f"{int(acres):,} ac" + cont_s = "?" if cont is None else f"{int(cont)}%" + anchor = " / ".join(p for p in (row["county"], row["state"]) if p) or "?" + return f"🔥 {prefix}: {name}, {anchor}: {acres_s}, {cont_s} contained" + if adapter == "swpc": + return f"🌌 {prefix}: ongoing space weather event ({row['event_type']})" + if adapter == "itd_511_work_zone": + road = row["road"] or "road?" + county = row["county"] or "?" + return f"🚧 {prefix}: ongoing work zone on {road} ({county})" + return "" + + async def _dispatch(self, adapter: str, row, wire: str) -> bool: + """Send via dispatcher.dispatch_scheduled_broadcast (cold-start + grace honored, no toggle-path freshness gating).""" + table, pk = self._row_pk(adapter, row) + try: + return bool(await self._dispatcher.dispatch_scheduled_broadcast( + text=wire, source_event_table=table, source_event_pk=pk, + )) + except Exception: + logger.exception("reminder dispatch failed adapter=%s pk=%s", adapter, pk) + return False + + def _row_pk(self, adapter: str, row) -> tuple[str, str]: + if adapter == "wfigs": return ("fires", str(row["irwin_id"])) + if adapter == "swpc": return ("swpc_events", str(row["event_id"])) + if adapter == "itd_511_work_zone": + return ("traffic_events", f"{row['source']}|{row['external_id']}") + return ("?", "?") + + def _stamp_broadcast(self, adapter: str, row, now: float) -> None: + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return + if adapter == "wfigs": + conn.execute( + "UPDATE fires SET last_broadcast_at=? WHERE irwin_id=?", + (now, row["irwin_id"]), + ) + elif adapter == "swpc": + conn.execute( + "UPDATE swpc_events SET last_broadcast_at=? WHERE event_id=?", + (now, row["event_id"]), + ) + elif adapter == "itd_511_work_zone": + conn.execute( + "UPDATE traffic_events SET last_broadcast_at=? " + "WHERE source=? AND external_id=?", + (now, row["source"], row["external_id"]), + ) + + +# ============================================================================ +# Config helper +# ============================================================================ + + +class _ReminderConfig: + """Snapshot of the reminders_ config rows.""" + __slots__ = ("cadence_kind", "cadence_value", "channels", "terminate_when", + "dow_mask", "timezone") + + def __init__(self, **kw): + for k in self.__slots__: + setattr(self, k, kw.get(k)) + + @classmethod + def load(cls, adapter: str) -> Optional["_ReminderConfig"]: + """Pull reminders_ rows from adapter_config.""" + try: + from meshai.adapter_config import adapter_config + ac_adapter = f"reminders_{adapter}" + ck = _safe_get(adapter_config, ac_adapter, "cadence_kind") + if ck is None: + return None + return cls( + cadence_kind=ck, + cadence_value=_safe_get(adapter_config, ac_adapter, "cadence_value"), + channels=_safe_get(adapter_config, ac_adapter, "channels") or [], + terminate_when=_safe_get(adapter_config, ac_adapter, "terminate_when") or [], + dow_mask=_safe_get(adapter_config, ac_adapter, "dow_mask"), + timezone=_safe_get(adapter_config, ac_adapter, "timezone"), + ) + except Exception: + logger.exception("reminders: config load failed for %s", adapter) + return None + + +def _safe_get(adapter_config, adapter: str, key: str) -> Any: + try: + return getattr(getattr(adapter_config, adapter), key) + except AttributeError: + return None + except Exception: + return None diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index 54b6b61..2c2bf27 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 10 +SCHEMA_VERSION = 11 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v11.sql b/meshai/persistence/migrations/v11.sql new file mode 100644 index 0000000..9dc2a03 --- /dev/null +++ b/meshai/persistence/migrations/v11.sql @@ -0,0 +1,40 @@ +-- v0.6-phase3 schema split: first_broadcast_at vs last_broadcast_at + +-- adapter_meta.reminder_enabled. +-- +-- Pre-this-migration every adapter table tracked only last_broadcast_at, +-- so the wire-string prefix logic couldn t tell "this is the first time +-- we ve ever announced this row" from "this is the Nth Active: reminder". +-- The split lets the handler / ReminderScheduler stamp: +-- New: first_broadcast_at IS NULL (first sight) +-- Update: WFIGS material change (handler-level) +-- Active: ReminderScheduler clock fire (first_broadcast_at IS NOT NULL) +-- +-- Backfill: any existing row whose last_broadcast_at IS NOT NULL gets its +-- first_broadcast_at = last_broadcast_at -- we don t know the actual first +-- time but treating the existing broadcast time as both is the conservative +-- read ("first observable broadcast we have a record of"). + +ALTER TABLE fires ADD COLUMN first_broadcast_at REAL; +ALTER TABLE nws_alerts ADD COLUMN first_broadcast_at REAL; +ALTER TABLE traffic_events ADD COLUMN first_broadcast_at REAL; +ALTER TABLE quake_events ADD COLUMN first_broadcast_at REAL; +ALTER TABLE swpc_events ADD COLUMN first_broadcast_at REAL; +ALTER TABLE gauge_readings ADD COLUMN first_broadcast_at REAL; + +UPDATE fires SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL; +UPDATE nws_alerts SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL; +UPDATE traffic_events SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL; +UPDATE quake_events SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL; +UPDATE swpc_events SET first_broadcast_at = last_broadcast_at WHERE last_broadcast_at IS NOT NULL; +-- gauge_readings has no last_broadcast_at column -- the time-series table +-- inserts a new row per reading. The first_broadcast_at column is added +-- for schema uniformity but the ReminderScheduler doesn t scan it today. + +-- adapter_meta: which adapters PRODUCE clock-driven reminders. +ALTER TABLE adapter_meta ADD COLUMN reminder_enabled INTEGER NOT NULL DEFAULT 0; + +-- Default reminder_enabled=1 for the three adapter families that opt in. +-- The itd_511_work_zone row itself is seeded by adapter_config.defaults in +-- the same boot via INSERT OR IGNORE; this UPDATE just ensures the flag +-- holds when the row already exists. +UPDATE adapter_meta SET reminder_enabled = 1 WHERE adapter IN ('wfigs', 'swpc'); diff --git a/tests/test_adapter_config_api.py b/tests/test_adapter_config_api.py index 9b1f9db..be65fb3 100644 --- a/tests/test_adapter_config_api.py +++ b/tests/test_adapter_config_api.py @@ -29,14 +29,14 @@ def client(): # ============================================================================ -def test_list_returns_all_43_keys(client): +def test_list_returns_all_58_keys(client): r = client.get("/api/adapter-config") assert r.status_code == 200 body = r.json() # 14 adapters with at least one key (itd_511 has zero -- not in the # grouped dict because the SQL only returns rows that exist). total = sum(len(v) for v in body.values()) - assert total == 43 + assert total == 58 def test_list_grouped_by_adapter(client): diff --git a/tests/test_adapter_config_foundation.py b/tests/test_adapter_config_foundation.py index f137072..f8e0cec 100644 --- a/tests/test_adapter_config_foundation.py +++ b/tests/test_adapter_config_foundation.py @@ -54,11 +54,11 @@ def test_v6_tables_exist(fresh_db): assert "adapter_meta" in tables -def test_schema_meta_at_v10(fresh_db): +def test_schema_meta_at_v11(fresh_db): v = fresh_db.execute( "SELECT value FROM schema_meta WHERE key='version'" ).fetchone()["value"] - assert int(v) == 10 + assert int(v) == 11 def test_adapter_config_type_check_constrains_vocabulary(fresh_db): @@ -73,16 +73,16 @@ def test_adapter_config_type_check_constrains_vocabulary(fresh_db): # ---------- registry shape ----------------------------------------------- -def test_registry_at_43_entries(): +def test_registry_at_58_entries(): """v0.6-3a.1 trim: 43 CONFIG-only keys (was 77 in v0.6-3a draft).""" - assert len(REGISTRY) == 43, ( + assert len(REGISTRY) == 58, ( f"REGISTRY should have 43 entries after CONFIG-vs-CODE trim; got {len(REGISTRY)}. " f"If a sentence template / emoji / heuristic snuck in, it belongs in CODE not config." ) -def test_adapter_meta_at_15(fresh_db): - assert len(ADAPTER_META) == 15 +def test_adapter_meta_at_19(fresh_db): + assert len(ADAPTER_META) == 19 # ---------- seed ---------------------------------------------------------- diff --git a/tests/test_nws_dedup_relaxation.py b/tests/test_nws_dedup_relaxation.py new file mode 100644 index 0000000..b60da18 --- /dev/null +++ b/tests/test_nws_dedup_relaxation.py @@ -0,0 +1,140 @@ +"""v0.6-phase3 NWS dedup-window relaxation tests. + +The same CAP id is now re-broadcast (with Active: prefix) when more than +`nws.duplicate_allowed_after_seconds` (default 10800 = 3h) have elapsed +since the last broadcast. +""" +from __future__ import annotations + +import time + +import pytest + +from meshai.central.nws_handler import handle_nws +from meshai.persistence import get_db + + +def _env(*, cap_id="urn:oid:dedup.001", event="Severe Thunderstorm Warning", + severity="Severe", area="Ada County", county="Ada", state="ID", + expires="2026-06-05T03:00:00Z", lat=43.6, lon=-116.2, + category="wx.alert.severe_thunderstorm_warning"): + return { + "id": cap_id, "subject": "central.wx.alert.us.id", + "data": { + "id": cap_id, "adapter": "nws", "category": category, + "severity": 2, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "id": cap_id, "event": event, "severity": severity, + "areaDesc": area, "msgType": "Alert", + "headline": f"{event} for {area}", + "description": "X", "expires": expires, + "_enriched": {"geocoder": {"city": None, + "county": county, "state": state}}, + }, + }, + } + + +def _commit(data, ts): + cb = data["_on_broadcast_committed"] + cb(float(ts)) + + +def test_first_broadcast_no_active_prefix(): + """A first sighting renders without Active: prefix.""" + env = _env() + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert "Active:" not in wire + + +def test_repeat_within_3h_suppressed(): + env = _env(cap_id="urn:oid:rep1") + data = {} + # First broadcast at t=0. + wire1 = handle_nws(env, env["subject"], data=data, now=0) + assert wire1 is not None + _commit(data, 0) + + # Same CAP id again 2h later -- inside 3h window -> suppressed. + env2 = _env(cap_id="urn:oid:rep1") + data2 = {} + wire2 = handle_nws(env2, env2["subject"], data=data2, now=2 * 3600) + assert wire2 is None + + +def test_repeat_after_3h_allowed_with_active_prefix(): + env = _env(cap_id="urn:oid:rep2") + data = {} + wire1 = handle_nws(env, env["subject"], data=data, now=0) + assert wire1 is not None + _commit(data, 0) + + # Same CAP id 4h later -> allowed, Active: prefix. + env2 = _env(cap_id="urn:oid:rep2") + data2 = {} + wire2 = handle_nws(env2, env2["subject"], data=data2, now=4 * 3600) + assert wire2 is not None + assert "Active:" in wire2 + + +def test_dedup_window_respects_config_override(): + """Changing nws.duplicate_allowed_after_seconds via adapter_config takes effect.""" + from meshai.adapter_config import invalidate_cache + conn = get_db() + conn.execute( + "UPDATE adapter_config SET value_json='3600' " + "WHERE adapter='nws' AND key='duplicate_allowed_after_seconds'" + ) + invalidate_cache() + + env = _env(cap_id="urn:oid:tunable") + data = {} + handle_nws(env, env["subject"], data=data, now=0) + _commit(data, 0) + + # 90 min later -> still suppressed (under new 1h window). + # Wait, 90 min > 60 min so it would BE allowed. Use 30 min instead. + env2 = _env(cap_id="urn:oid:tunable") + data2 = {} + wire = handle_nws(env2, env2["subject"], data=data2, now=30 * 60) + assert wire is None # 30 min < 60 min window + + # 2h later -> allowed (over 1h window now). + env3 = _env(cap_id="urn:oid:tunable") + data3 = {} + wire3 = handle_nws(env3, env3["subject"], data=data3, now=2 * 3600) + assert wire3 is not None + assert "Active:" in wire3 + + +def test_handler_stamps_first_broadcast_at(): + """The commit callback writes first_broadcast_at via COALESCE -- only on + the first commit, never overwriting it.""" + env = _env(cap_id="urn:oid:stamp") + data = {} + handle_nws(env, env["subject"], data=data, now=0) + _commit(data, 100.0) + + conn = get_db() + row = conn.execute( + "SELECT first_broadcast_at, last_broadcast_at FROM nws_alerts " + "WHERE event_id='urn:oid:stamp'" + ).fetchone() + assert row["first_broadcast_at"] == 100.0 + assert row["last_broadcast_at"] == 100.0 + + # Second broadcast 4h later -> last_broadcast_at updates, first_broadcast_at preserved. + env2 = _env(cap_id="urn:oid:stamp") + data2 = {} + wire2 = handle_nws(env2, env2["subject"], data=data2, now=4 * 3600) + assert wire2 is not None + _commit(data2, 4 * 3600.0) + row2 = conn.execute( + "SELECT first_broadcast_at, last_broadcast_at FROM nws_alerts " + "WHERE event_id='urn:oid:stamp'" + ).fetchone() + assert row2["first_broadcast_at"] == 100.0 # unchanged + assert row2["last_broadcast_at"] == 4 * 3600.0 diff --git a/tests/test_reminders.py b/tests/test_reminders.py new file mode 100644 index 0000000..5d07228 --- /dev/null +++ b/tests/test_reminders.py @@ -0,0 +1,206 @@ +"""v0.6-phase3 ReminderScheduler tests.""" +from __future__ import annotations + +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from meshai.notifications.reminders import ReminderScheduler +from meshai.persistence import get_db + + +# ---------- helpers -------------------------------------------------------- + + +def _seed_fire(conn, *, irwin_id, last_broadcast_at, current_contained_pct=10, + last_event_at=None, name="Test Fire"): + if last_event_at is None: last_event_at = int(time.time()) + conn.execute( + "INSERT OR REPLACE INTO fires(irwin_id, incident_name, incident_type, " + "current_acres, current_contained_pct, lat, lon, county, state, " + "declared_at, last_event_at, first_broadcast_at, last_broadcast_at) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", + (irwin_id, name, "WF", 500, current_contained_pct, + 42.5, -114.5, "Cassia", "ID", + last_broadcast_at, last_event_at, last_broadcast_at, last_broadcast_at), + ) + + +def _seed_swpc(conn, *, event_id, last_broadcast_at, event_type="swpc_kindex"): + conn.execute( + "INSERT OR REPLACE INTO swpc_events(event_id, event_type, severity_int, " + "payload_json, occurred_at, first_seen_at, first_broadcast_at, " + "last_broadcast_at) VALUES (?,?,?,?,?,?,?,?)", + (event_id, event_type, 7, "{}", last_broadcast_at, last_broadcast_at, + last_broadcast_at, last_broadcast_at), + ) + + +def _seed_work_zone(conn, *, external_id, last_broadcast_at, end_at=None, + sub_type="road_works"): + conn.execute( + "INSERT OR REPLACE INTO traffic_events(source, external_id, road, " + "direction, county, state, sub_type, impact, first_seen_at, " + "last_seen_at, first_broadcast_at, last_broadcast_at, end_at) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", + ("itd_511", external_id, "I-84", "E", "Ada", "ID", sub_type, + None, last_broadcast_at, last_broadcast_at, + last_broadcast_at, last_broadcast_at, end_at), + ) + + +@pytest.fixture +def mock_dispatcher(): + d = MagicMock() + d.dispatch_scheduled_broadcast = AsyncMock(return_value=True) + return d + + +# ============================================================================ +# Interval cadence (wfigs) +# ============================================================================ + + +def test_wfigs_reminder_fires_past_cadence(mock_dispatcher): + """A fire whose last_broadcast_at is past the 8h cadence emits Active:.""" + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 9 * 3600) + + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 1 + mock_dispatcher.dispatch_scheduled_broadcast.assert_called_once() + args = mock_dispatcher.dispatch_scheduled_broadcast.call_args.kwargs + assert "Active" in args["text"] + assert args["source_event_table"] == "fires" + assert args["source_event_pk"] == "F1" + + +def test_wfigs_reminder_skipped_within_cadence(mock_dispatcher): + """A fire broadcast 1h ago is within the 8h cadence -> no reminder.""" + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 3600) + + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 0 + mock_dispatcher.dispatch_scheduled_broadcast.assert_not_called() + + +def test_wfigs_reminder_skipped_when_containment_100(mock_dispatcher): + """containment_100 termination overrides the cadence.""" + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 9 * 3600, + current_contained_pct=100) + + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 0 + mock_dispatcher.dispatch_scheduled_broadcast.assert_not_called() + + +def test_wfigs_reminder_skipped_when_last_event_age_24h(mock_dispatcher): + """If last_event_at > 24h ago, treat the fire as gone -> no reminder.""" + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 9 * 3600, + last_event_at=now - 48 * 3600) + + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 0 + + +def test_wfigs_reminder_stamps_last_broadcast_at(mock_dispatcher): + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 9 * 3600) + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + asyncio.run(sch.tick_once()) + row = conn.execute("SELECT last_broadcast_at, first_broadcast_at FROM fires WHERE irwin_id='F1'").fetchone() + assert row["last_broadcast_at"] == now + # first_broadcast_at preserved + assert row["first_broadcast_at"] == now - 9 * 3600 + + +def test_reminder_disabled_adapter_does_nothing(mock_dispatcher): + """When adapter_meta.reminder_enabled=0 for wfigs, no reminders fire.""" + now = 1_780_000_000 + conn = get_db() + _seed_fire(conn, irwin_id="F1", last_broadcast_at=now - 9 * 3600) + conn.execute("UPDATE adapter_meta SET reminder_enabled=0 WHERE adapter='wfigs'") + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 0 + + +# ============================================================================ +# SWPC interval cadence +# ============================================================================ + + +def test_swpc_reminder_fires(mock_dispatcher): + now = 1_780_000_000 + conn = get_db() + _seed_swpc(conn, event_id="S1", last_broadcast_at=now - 10 * 3600) + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now) + fired = asyncio.run(sch.tick_once()) + assert fired == 1 + args = mock_dispatcher.dispatch_scheduled_broadcast.call_args.kwargs + assert "Active" in args["text"] + assert args["source_event_table"] == "swpc_events" + + +# ============================================================================ +# itd_511_work_zone clock cadence +# ============================================================================ + + +def test_work_zone_reminder_fires_at_clock_slot(mock_dispatcher): + """At the 08:00 Mountain slot, an active work zone broadcasts Active:.""" + # 08:00 America/Boise on a Monday. Compute UTC equivalent. + from datetime import datetime + from zoneinfo import ZoneInfo + # Pick a Monday at exactly 08:00 Mountain. + local = datetime(2026, 6, 8, 8, 0, 0, tzinfo=ZoneInfo("America/Boise")) + now = int(local.timestamp()) + conn = get_db() + _seed_work_zone(conn, external_id="WZ1", last_broadcast_at=now - 48 * 3600) + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now, tick_seconds=60) + fired = asyncio.run(sch.tick_once()) + assert fired == 1 + args = mock_dispatcher.dispatch_scheduled_broadcast.call_args.kwargs + assert "Active" in args["text"] + assert args["source_event_table"] == "traffic_events" + + +def test_work_zone_reminder_skipped_when_end_date_passed(mock_dispatcher): + from datetime import datetime + from zoneinfo import ZoneInfo + local = datetime(2026, 6, 8, 8, 0, 0, tzinfo=ZoneInfo("America/Boise")) + now = int(local.timestamp()) + conn = get_db() + _seed_work_zone(conn, external_id="WZ1", + last_broadcast_at=now - 48 * 3600, + end_at=now - 3600) # end_at in the past + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now, tick_seconds=60) + fired = asyncio.run(sch.tick_once()) + assert fired == 0 + + +def test_work_zone_reminder_skipped_outside_slot(mock_dispatcher): + """At 14:00 Mountain (no slot), no reminder fires.""" + from datetime import datetime + from zoneinfo import ZoneInfo + local = datetime(2026, 6, 8, 14, 0, 0, tzinfo=ZoneInfo("America/Boise")) + now = int(local.timestamp()) + conn = get_db() + _seed_work_zone(conn, external_id="WZ1", last_broadcast_at=now - 48 * 3600) + sch = ReminderScheduler(mock_dispatcher, clock=lambda: now, tick_seconds=60) + fired = asyncio.run(sch.tick_once()) + assert fired == 0