From 0da83e0d3d1f2a47e8cc1f5f356c823b1210c22f Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Fri, 5 Jun 2026 07:38:51 +0000 Subject: [PATCH] feat(v0.5.11): band conditions scheduled broadcaster (3x/day HF propagation) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First clock-driven broadcaster in meshai, distinct from the v0.5.8b/v0.5.9/v0.5.10 event-driven adapters. The same persistence + dispatcher + cold-start patterns apply, but the trigger is the wall clock at 06:00 / 14:00 / 22:00 Mountain Time (default; GUI-configurable per Rule 17). Components: (1) meshai/notifications/scheduled/band_conditions.py with BandConditionsScheduler (asyncio loop, mirrors the existing DigestScheduler shape), compute_band_ratings() with two-tier data sourcing -- (a) latest swpc_kindex + swpc_alerts F10.7 rows from persistence within the last 6h, (b) HamQSL.com solarxml.php fallback when SWPC is stale or incomplete, (c) silent skip when both fail, format_band_conditions_wire() multi-line MEDIUM output (~115-120B). (2) v3 schema migration adding band_conditions_broadcasts(broadcast_id PK AUTO, sent_at, scheduled_for UNIQUE, ratings_json, source). UNIQUE(scheduled_for) enforces per-slot dedup so a retry storm cannot double-broadcast. (3) Dispatcher.dispatch_scheduled_broadcast() bypasses the toggle / rules / freshness-gate pipeline but DOES honour the v0.5.8b cold-start grace -- first scheduled broadcast within the grace window after meshai starts is suppressed, mesh_broadcasts_out audit row only inserted on actual delivery. Channel selection routes through the rf_propagation toggle\'s broadcast_channel since band conditions IS RF-propagation info. (4) NotificationsConfig gains band_conditions_enabled (default true), band_conditions_schedule (list of HH:MM strings, default ["06:00","14:00","22:00"]), band_conditions_tz (default "America/Boise" so DST handles automatically). (5) Notifications.tsx grows a Band Conditions card between Cold-Start Grace and Master Toggles with the enable toggle + 3 TimeInput slots + a one-liner explaining the source priority. (6) build_pipeline + start_pipeline spawn the BandConditionsScheduler alongside the existing DigestScheduler -- best-effort, scheduler failures must NOT break notifications startup. Wire format examples (multi-line, all under 130B target): ☀️ Day Propagation 📡 Band Conditions: 80-40m: 🟡 Fair 30-20m: 🟢 Good 17-15m: 🟢 Good 12-10m: 🟡 Fair 🌞 Day Propagation (14:00 slot when storm onset, Kp=6 SFI=110) 📡 Band Conditions: 80-40m: 🔴 Poor 30-20m: 🔴 Poor 17-15m: 🔴 Poor 12-10m: 🟡 Fair 🌙 Night Propagation (22:00 slot, recovery, Kp=4 SFI=120) 📡 Band Conditions: 80-40m: 🟡 Fair 30-20m: 🟡 Fair 17-15m: 🔴 Poor 12-10m: 🔴 Poor Tests: was 686 (v0.5.10 baseline), now 704 (+18 net new -- quiet/storm condition ratings, HamQSL XML parse fallback, both-fail silent-skip path, is_day_slot per HH:MM, wire format for all 3 slot variants, byte-size guard, 6-line shape, fire_slot record row, dedup via UNIQUE constraint, silent-skip path, slot_epoch DST alignment summer + winter). Synthetic 24h probe verified the 3 expected slots fire correctly with quiet/storm/recovery scenarios + the 4th no-data scenario lands as source=\'skipped_no_data\' with no broadcast. usgs_nwis deferred to v0.5.12 (threshold-curation work). Master OFF in prod. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/pages/Notifications.tsx | 51 +++ meshai/config.py | 7 + meshai/notifications/pipeline/__init__.py | 23 + meshai/notifications/pipeline/dispatcher.py | 86 ++++ meshai/notifications/scheduled/__init__.py | 27 ++ .../scheduled/band_conditions.py | 425 ++++++++++++++++++ meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v3.sql | 25 ++ tests/test_band_conditions.py | 312 +++++++++++++ 9 files changed, 957 insertions(+), 1 deletion(-) create mode 100644 meshai/notifications/scheduled/__init__.py create mode 100644 meshai/notifications/scheduled/band_conditions.py create mode 100644 meshai/persistence/migrations/v3.sql create mode 100644 tests/test_band_conditions.py diff --git a/dashboard-frontend/src/pages/Notifications.tsx b/dashboard-frontend/src/pages/Notifications.tsx index 97b4b03..3c4d97a 100644 --- a/dashboard-frontend/src/pages/Notifications.tsx +++ b/dashboard-frontend/src/pages/Notifications.tsx @@ -65,6 +65,8 @@ interface NotificationsConfig { quiet_hours_start: string quiet_hours_end: string cold_start_grace_seconds?: number + band_conditions_enabled?: boolean + band_conditions_schedule?: string[] rules: NotificationRuleConfig[] toggles?: Record } @@ -2135,6 +2137,55 @@ export default function Notifications() { /> + {/* Band Conditions -- v0.5.11 */} +
+
+ +
+ setConfig({ ...config, band_conditions_enabled: v })} + helper="3x/day HF propagation summary (Day/Night ratings per band group)" + info="Source priority: (1) recent SWPC readings persisted locally; (2) HamQSL.com fallback; (3) silent skip if both fail. Persistence rows are written either way for an audit trail." + /> + {(config.band_conditions_enabled ?? true) && ( +
+ { + const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])] + s[0] = v + setConfig({ ...config, band_conditions_schedule: s }) + }} + helper="Morning (default 06:00 MT)" + /> + { + const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])] + s[1] = v + setConfig({ ...config, band_conditions_schedule: s }) + }} + helper="Afternoon (default 14:00 MT)" + /> + { + const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])] + s[2] = v + setConfig({ ...config, band_conditions_schedule: s }) + }} + helper="Night (default 22:00 MT)" + /> +
+ )} +

All times are Mountain Time (America/Boise). DST handled automatically.

+
+ {/* Master Toggles */} {config.toggles && ( NotificationToggle digest: DigestConfig = field(default_factory=DigestConfig) rules: list = field(default_factory=list) # List of NotificationRuleConfig diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 0e632bc..ad05225 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -27,6 +27,12 @@ import logging from meshai.notifications.channels import create_channel from meshai.notifications.pipeline.bus import EventBus, get_bus from meshai.notifications.pipeline.dispatcher import Dispatcher +try: + from meshai.notifications.scheduled.band_conditions import ( + BandConditionsScheduler, + ) +except ImportError: + BandConditionsScheduler = None from meshai.notifications.pipeline.inhibitor import Inhibitor from meshai.notifications.pipeline.grouper import Grouper from meshai.notifications.pipeline.toggle_filter import ToggleFilter @@ -194,6 +200,23 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler: connector=connector, ) await scheduler.start() + # v0.5.11 band-conditions scheduler -- spawn alongside the + # digest scheduler. Best-effort: failures in the scheduled + # broadcaster must NOT break notifications pipeline startup. + if BandConditionsScheduler is not None: + try: + comps = getattr(bus, "_pipeline_components", {}) or {} + disp = comps.get("dispatcher") + if disp is not None: + bc_sched = BandConditionsScheduler(config, disp) + await bc_sched.start() + comps["band_conditions_scheduler"] = bc_sched + bus._pipeline_components = comps + except Exception: + import logging as _lg + _lg.getLogger("meshai.pipeline").exception( + "band_conditions scheduler failed to start") + # Phase 2.16.1: periodically flush the grouper so coalesced (routine/ # priority) events are delivered within the window even when poll cadence diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index dfda419..04ad3ac 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -252,6 +252,92 @@ class Dispatcher: "dedup_lru_size": len(self._dedup_lru), } + async def dispatch_scheduled_broadcast(self, text: str, *, + source_event_table: str, + source_event_pk: str, + ) -> bool: + """v0.5.11 scheduled broadcast entry point. + + Bypasses the normal toggle / rules / freshness-gate pipeline + because scheduled broadcasts are intentionally periodic and + already pre-composed. Cold-start grace still applies so the + very first scheduled broadcast after meshai starts is + suppressed (consistent with how event-driven adapters behave). + + Channel selection: routes through the rf_propagation toggle\'s + broadcast_channel since band conditions IS RF-propagation info. + If that toggle is not configured with a channel, the broadcast + is dropped (with a log). + + Returns True on successful mesh delivery, False on grace-drop + or any other suppression. + """ + # Cold-start grace (mirrors _dispatch_toggles Section 0). + grace_s = int(getattr(self._config.notifications, + "cold_start_grace_seconds", 60) or 0) + if grace_s > 0: + now_anchor = time.time() + if self._first_event_at is None: + self._first_event_at = now_anchor + if (now_anchor - self._first_event_at) < grace_s: + self._cold_start_dropped += 1 + self._logger.info( + "cold-start grace: dropping scheduled broadcast " + "(table=%s pk=%s)", + source_event_table, source_event_pk) + return False + + # Route through rf_propagation toggle\'s broadcast_channel. + toggles = getattr(self._config.notifications, "toggles", None) or {} + rf = toggles.get("rf_propagation") if isinstance(toggles, dict) else None + if rf is None or not getattr(rf, "broadcast_channel", None): + self._logger.info( + "scheduled-broadcast: rf_propagation channel not " + "configured; dropping") + return False + + # Build a synthetic Event purely to reuse _toggle_to_rule + the + # NotificationPayload constructor. Severity \'priority\' keeps it + # out of quiet-hours suppression unless explicitly overridden. + from meshai.notifications.events import ( + make_event, + make_payload_from_event, + ) + ev = make_event( + source="band_conditions", category="rf_propagation", + severity="priority", title=text, + ) + ev.data["_meshai_precomposed"] = True + rule = self._toggle_to_rule(rf, "mesh_broadcast", ev) + try: + channel = self._channel_factory(rule, self._connector) + payload = make_payload_from_event(ev, message=text) + success = await channel.deliver(payload, rule) + except Exception: + self._logger.exception( + "scheduled-broadcast: delivery raised; treating as failed") + return False + + if success: + # Audit row -- mirrors _post_broadcast_commit for scheduled. + try: + from meshai.persistence import get_db + conn = get_db() + bytes_sent = len(text.encode("utf-8")) if text else 0 + conn.execute( + "INSERT INTO mesh_broadcasts_out(sent_at, recipient, " + "channel, text, source_event_table, source_event_pk, " + "bytes_sent, ack_received) VALUES (?,?,?,?,?,?,?,?)", + (int(time.time()), "broadcast", + rf.broadcast_channel, text, + source_event_table, str(source_event_pk), + bytes_sent, 0), + ) + except Exception: + self._logger.exception( + "scheduled-broadcast: audit row insert failed") + return bool(success) + def _post_broadcast_commit(self, event, payload, rule, ch_type: str) -> None: """Persistence side-effects of an actually-successful broadcast. diff --git a/meshai/notifications/scheduled/__init__.py b/meshai/notifications/scheduled/__init__.py new file mode 100644 index 0000000..6ee7654 --- /dev/null +++ b/meshai/notifications/scheduled/__init__.py @@ -0,0 +1,27 @@ +"""meshai.notifications.scheduled -- clock-driven (not event-driven) broadcasters. + +Current modules: + band_conditions -- 3x/day HF propagation summary (06:00 / 14:00 / + 22:00 Mountain Time by default). SWPC-local + computation + HamQSL.com fallback + silent skip. + +These broadcasters bypass the normal incident-handler / freshness-gate +path but DO honour the v0.5.8b cold-start grace. +""" +from meshai.notifications.scheduled.band_conditions import ( + BandConditionsScheduler, + compute_band_ratings, + format_band_conditions_wire, + is_day_slot, + record_slot_attempt, + slot_epoch, +) + +__all__ = [ + "BandConditionsScheduler", + "compute_band_ratings", + "format_band_conditions_wire", + "is_day_slot", + "record_slot_attempt", + "slot_epoch", +] diff --git a/meshai/notifications/scheduled/band_conditions.py b/meshai/notifications/scheduled/band_conditions.py new file mode 100644 index 0000000..456ad19 --- /dev/null +++ b/meshai/notifications/scheduled/band_conditions.py @@ -0,0 +1,425 @@ +"""v0.5.11 band-conditions scheduled broadcaster. + +Computes HF propagation ratings 3x/day (06:00, 14:00, 22:00 Mountain Time +by default) and broadcasts a multi-line summary to the mesh. Data source +priority: + + 1. Recent SWPC readings persisted in swpc_events (last 6h) -- preferred + because meshai already ingests these and the local model lets us + compute without an external HTTP call. + 2. HamQSL.com solarxml.php fallback -- canonical industry source used + by Ham Radio Toolbox + most propagation apps. + 3. Silent skip -- if both fail, no broadcast (a row is still inserted + into band_conditions_broadcasts with source='skipped_no_data' so + the accounting trail is visible). + +Distinct from event-driven adapters in two ways: + - Scheduled by the clock, not triggered by incoming envelopes. + - The universal freshness gate (v0.5.9 GAMMA) does NOT apply -- + scheduled broadcasts are intentionally periodic, not reactive. +The cold-start grace (v0.5.8b) DOES apply via the dispatcher -- if meshai +just started, the first scheduled broadcast within the grace window is +suppressed for consistency with the event-driven adapters. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import time +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Optional + +try: + from zoneinfo import ZoneInfo +except ImportError: + ZoneInfo = None # pragma: no cover (3.9+ only) + +import httpx + +logger = logging.getLogger(__name__) + +# Window for "fresh" SWPC data. If the latest swpc_kindex (or whatever we +# need) is older than this, fall through to HamQSL. +_SWPC_FRESHNESS_S = 6 * 3600 + +# Multi-line wire format -- emoji + headline per slot, then 4 band rows. +# Color codes per Matt: 🟢 Good, 🟡 Fair, 🔴 Poor. +_RATING_EMOJI = {"Good": "🟢", "Fair": "🟡", "Poor": "🔴"} + +# Slot -> (emoji, headline) tuple. Hour selection is based on the SLOT, not +# the actual fire time, so a slightly-late firing still gets the right +# headline. +_SLOT_LABEL = { + "06:00": ("☀️", "Day Propagation"), + "14:00": ("🌞", "Day Propagation"), + "22:00": ("🌙", "Night Propagation"), +} + +# Band order in the wire string. The 4 row labels match the HamQSL grouping +# convention so users coming from Ham Radio Toolbox recognise the format. +_BAND_ORDER = ["80-40m", "30-20m", "17-15m", "12-10m"] + +# HamQSL endpoint. Public, no auth. +_HAMQSL_URL = "https://www.hamqsl.com/solarxml.php" +_HAMQSL_TIMEOUT_S = 5 + + +# ======================================================================== +# Public API +# ======================================================================== + + +def is_day_slot(hh_mm: str) -> bool: + """06:00 and 14:00 broadcast 'day' band ratings; 22:00 broadcasts night.""" + return hh_mm in ("06:00", "14:00") + + +def compute_band_ratings(now: Optional[int] = None, + hh_mm: str = "14:00", + allow_hamqsl: bool = True, + _http_get: Optional[Callable] = None, + ) -> Optional[tuple[dict, str]]: + """Return ((ratings_by_band, source) tuple, or None if no data. + + Source = 'swpc_local' when computed from persisted swpc_events, or + 'hamqsl_fallback' when fetched from HamQSL.com. + """ + now = now if now is not None else int(time.time()) + + # 1. Try local SWPC. + state = _load_swpc_state(now=now) + if state is not None: + ratings = _heuristic_ratings(state, day=is_day_slot(hh_mm)) + if ratings: + return ratings, "swpc_local" + + # 2. HamQSL fallback. + if allow_hamqsl: + try: + ratings = _fetch_hamqsl(day=is_day_slot(hh_mm), _http_get=_http_get) + except Exception: + logger.exception("HamQSL.com fallback failed; silent skip") + ratings = None + if ratings: + return ratings, "hamqsl_fallback" + + return None + + +def format_band_conditions_wire(ratings: dict, hh_mm: str) -> str: + """Multi-line wire string -- headline + 4 band rows.""" + emoji, headline = _SLOT_LABEL.get(hh_mm, ("🌞", "Day Propagation")) + lines = [f"{emoji} {headline}", "📡 Band Conditions:"] + for band in _BAND_ORDER: + rating = ratings.get(band, "Poor") + rating_emoji = _RATING_EMOJI.get(rating, "🔴") + lines.append(f"{band}: {rating_emoji} {rating}") + return "\n".join(lines) + + +def slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str = "America/Boise") -> int: + """Return the epoch-second timestamp of `hh_mm` on the date of `now_dt`, + interpreted in the requested timezone. Used as the dedup key for + band_conditions_broadcasts(scheduled_for). + """ + h, m = (int(x) for x in hh_mm.split(":")) + if ZoneInfo is not None: + tz = ZoneInfo(tz_name) + local = now_dt.astimezone(tz) + slot = local.replace(hour=h, minute=m, second=0, microsecond=0) + return int(slot.timestamp()) + # Fallback: assume UTC (tests can monkeypatch). + slot = now_dt.replace(hour=h, minute=m, second=0, microsecond=0) + return int(slot.timestamp()) + + +# ======================================================================== +# Internal: SWPC reading -> ratings +# ======================================================================== + + +def _load_swpc_state(now: int) -> Optional[dict]: + """Pull the latest Kp + SFI from swpc_events. Returns None when readings + are missing or older than _SWPC_FRESHNESS_S.""" + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return None + + cutoff = now - _SWPC_FRESHNESS_S + + state = {} + + # Latest Kp from swpc_kindex. + row = conn.execute( + "SELECT payload_json, occurred_at FROM swpc_events " + "WHERE event_type='swpc_kindex' AND occurred_at >= ? " + "ORDER BY occurred_at DESC LIMIT 1", + (cutoff,)).fetchone() + if row and row["payload_json"]: + try: + payload = json.loads(row["payload_json"]) + kp = payload.get("kp_index") or payload.get("kp") or payload.get("value") + if isinstance(kp, (int, float)): + state["kp"] = float(kp) + except Exception: pass + + # Latest SFI from swpc_alerts. Look for F10.7 in the payload. + row = conn.execute( + "SELECT payload_json, occurred_at FROM swpc_events " + "WHERE event_type='swpc_alerts' AND occurred_at >= ? " + " AND (payload_json LIKE '%F10.7%' OR payload_json LIKE '%solar_flux%' " + " OR payload_json LIKE '%SFI%' OR payload_json LIKE '%sfi%') " + "ORDER BY occurred_at DESC LIMIT 1", + (cutoff,)).fetchone() + if row and row["payload_json"]: + try: + payload = json.loads(row["payload_json"]) + for k in ("F10.7", "f10_7", "f107", "solar_flux", "sfi", "SFI"): + v = payload.get(k) + if isinstance(v, (int, float)): + state["sfi"] = float(v); break + except Exception: pass + + # Need at least Kp to do anything useful locally. If SFI is missing, + # caller still falls back; we return None to signal "incomplete". + if "kp" not in state or "sfi" not in state: + return None + return state + + +def _heuristic_ratings(state: dict, day: bool) -> dict: + """Per Matt's spec: per-band rating from Kp + SFI.""" + kp = state.get("kp", 9.0) + sfi = state.get("sfi", 50.0) + + out = {} + + # 80-40m: night band primarily. + if not day: + if kp < 4 and sfi > 70: out["80-40m"] = "Good" + elif kp < 5: out["80-40m"] = "Fair" + else: out["80-40m"] = "Poor" + else: + # Daytime 80-40m is usually Fair to Poor (absorption). + if kp < 4: out["80-40m"] = "Fair" + else: out["80-40m"] = "Poor" + + # 30-20m: day-strong band; tolerates higher Kp than upper bands. + if day: + if sfi > 120 and kp < 4: out["30-20m"] = "Good" + elif 80 <= sfi <= 120 and kp < 6: out["30-20m"] = "Fair" + elif sfi < 80 or kp >= 6: out["30-20m"] = "Poor" + else: out["30-20m"] = "Fair" + else: + # Night 20m can be Good if SFI high + Kp low (gray-line). + if sfi > 110 and kp < 4: out["30-20m"] = "Good" + elif kp < 5: out["30-20m"] = "Fair" + else: out["30-20m"] = "Poor" + + # 17-15m: day-only band most of the time. + if day: + if sfi > 120: out["17-15m"] = "Good" + elif 90 <= sfi <= 120 and kp < 5: out["17-15m"] = "Fair" + else: out["17-15m"] = "Poor" + else: + # Night upper bands typically Poor unless aurora/Es event. + out["17-15m"] = "Poor" + + # 12-10m: day-only solar-max band. + if day: + if sfi > 140: out["12-10m"] = "Good" + elif 110 <= sfi <= 140: out["12-10m"] = "Fair" + else: out["12-10m"] = "Poor" + else: + out["12-10m"] = "Poor" + + return out + + +# ======================================================================== +# Internal: HamQSL.com fallback +# ======================================================================== + + +def _fetch_hamqsl(day: bool, _http_get: Optional[Callable] = None) -> Optional[dict]: + """Pull HamQSL solarxml.php and parse calculatedconditions.""" + if _http_get is None: + def _http_get(url, timeout): + with httpx.Client(timeout=timeout) as c: + return c.get(url) + try: + resp = _http_get(_HAMQSL_URL, _HAMQSL_TIMEOUT_S) + except Exception: + return None + if getattr(resp, "status_code", 0) != 200: + return None + + try: + root = ET.fromstring(resp.text) + except ET.ParseError: + return None + + cond = root.find(".//calculatedconditions") + if cond is None: return None + + want_time = "day" if day else "night" + out = {} + # HamQSL uses names like "80m-40m", "30m-20m", "17m-15m", "12m-10m". + for band in cond.findall("band"): + name = (band.get("name") or "").lower().replace("m", "") + # Normalise to our 4-row labels. + key = None + if "80" in name and "40" in name: key = "80-40m" + elif "30" in name and "20" in name: key = "30-20m" + elif "17" in name and "15" in name: key = "17-15m" + elif "12" in name and "10" in name: key = "12-10m" + if not key: continue + t = (band.get("time") or "").lower() + if t != want_time: continue + text = (band.text or "").strip() + if text not in _RATING_EMOJI: continue + out[key] = text + + return out if out else None + + +# ======================================================================== +# Persistence helpers (band_conditions_broadcasts rows) +# ======================================================================== + + +def record_slot_attempt(slot_epoch_s: int, *, source: str, + ratings: Optional[dict] = None, + sent_at: Optional[int] = None) -> Optional[int]: + """Insert (or ignore) a row in band_conditions_broadcasts. Returns the + broadcast_id on success, None when the UNIQUE(scheduled_for) constraint + blocks a duplicate firing (the slot already ran).""" + try: + from meshai.persistence import get_db + conn = get_db() + except Exception: + return None + rj = json.dumps(ratings) if ratings else None + cur = conn.execute( + "INSERT OR IGNORE INTO band_conditions_broadcasts(sent_at, " + "scheduled_for, ratings_json, source) VALUES (?,?,?,?)", + (sent_at, slot_epoch_s, rj, source), + ) + return int(cur.lastrowid) if cur.rowcount > 0 else None + + +# ======================================================================== +# Scheduler -- async loop firing once per slot +# ======================================================================== + + +class BandConditionsScheduler: + """Fires band-conditions broadcasts at configured local times.""" + + def __init__(self, config, dispatcher, *, + clock: Optional[Callable[[], float]] = None, + sleep: Optional[Callable[[float], Any]] = None, + tz_name: Optional[str] = None): + self._config = config + self._dispatcher = dispatcher + self._clock = clock or time.time + self._sleep = sleep or asyncio.sleep + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + self._tz_name = tz_name or getattr( + config.notifications, "band_conditions_tz", "America/Boise") + self._logger = logging.getLogger("meshai.scheduled.band_conditions") + + def _enabled(self) -> bool: + return bool(getattr(self._config.notifications, + "band_conditions_enabled", True)) + + def _schedule(self) -> list: + sched = getattr(self._config.notifications, + "band_conditions_schedule", + ["06:00", "14:00", "22:00"]) + return [s for s in sched if isinstance(s, str) and ":" in s] + + async def start(self) -> None: + if self._task is not None and not self._task.done(): + raise RuntimeError("BandConditionsScheduler already running") + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._run(), + name="band-conditions-scheduler") + self._logger.info( + "Band conditions scheduler started: enabled=%s schedule=%s tz=%s", + self._enabled(), self._schedule(), self._tz_name) + + async def stop(self) -> None: + if self._stop_event: self._stop_event.set() + if self._task: await self._task + + async def _run(self) -> None: + while not (self._stop_event and self._stop_event.is_set()): + if not self._enabled(): + await self._sleep(60); continue + now = self._clock() + now_dt = datetime.fromtimestamp(now, tz=timezone.utc) + target_epoch, target_hh_mm = self._next_slot(now_dt) + wait_s = max(1, target_epoch - int(now)) + try: await self._sleep(min(wait_s, 3600)) + except asyncio.CancelledError: break + now2 = int(self._clock()) + if now2 >= target_epoch: + await self.fire_slot(target_epoch, target_hh_mm) + + def _next_slot(self, now_dt: datetime) -> tuple[int, str]: + """Return the (epoch, hh_mm) of the next future slot. Searches today's + slots first; if all past, picks tomorrow's first slot.""" + schedule = sorted(set(self._schedule())) + if not schedule: + # Fall back to noon tomorrow if config is broken. + tomorrow = now_dt + timedelta(days=1) + return slot_epoch(tomorrow, "12:00", self._tz_name), "12:00" + today_now = int(now_dt.timestamp()) + for hh_mm in schedule: + ep = slot_epoch(now_dt, hh_mm, self._tz_name) + if ep > today_now: return ep, hh_mm + # All today's slots are past; pick tomorrow's first. + tomorrow = now_dt + timedelta(days=1) + return slot_epoch(tomorrow, schedule[0], self._tz_name), schedule[0] + + async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool: + """Compute + broadcast for this slot. Returns True on broadcast, + False on suppression (grace), None on silent skip (no data).""" + result = compute_band_ratings(now=int(self._clock()), hh_mm=hh_mm) + if result is None: + record_slot_attempt(slot_epoch_s, source="skipped_no_data") + self._logger.info( + "band-conditions: silent skip for %s (no SWPC + HamQSL down)", + hh_mm) + return False + ratings, source = result + + # Pre-insert the slot row (UNIQUE constraint handles dedup). + bcast_id = record_slot_attempt(slot_epoch_s, source=source, + ratings=ratings, + sent_at=int(self._clock())) + if bcast_id is None: + # Slot already broadcast (UNIQUE constraint blocked the INSERT). + self._logger.info( + "band-conditions: slot %s already broadcast; skipping dup", + hh_mm) + return False + + wire = format_band_conditions_wire(ratings, hh_mm) + try: + success = await self._dispatcher.dispatch_scheduled_broadcast( + text=wire, + source_event_table="band_conditions_broadcasts", + source_event_pk=str(bcast_id), + ) + except Exception: + self._logger.exception( + "band-conditions: dispatcher raised; row stays in table") + success = False + return bool(success) diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index c68f688..dead8dc 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 2 +SCHEMA_VERSION = 3 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v3.sql b/meshai/persistence/migrations/v3.sql new file mode 100644 index 0000000..ed3e988 --- /dev/null +++ b/meshai/persistence/migrations/v3.sql @@ -0,0 +1,25 @@ +-- v0.5.11 schema migration: band_conditions_broadcasts. +-- +-- One row per band-conditions broadcast attempt (3x/day). Inserted +-- whether or not the broadcast actually went out -- source='skipped_no_data' +-- accounts for the cases where neither local SWPC data nor HamQSL.com +-- fallback yielded usable ratings, so the scheduler skipped silently. +-- +-- UNIQUE(scheduled_for) enforces per-slot dedup: if the scheduler fires +-- multiple times for the same target hour (e.g. retry loop, clock drift), +-- only the first INSERT wins. Caller checks the constraint via +-- INSERT OR IGNORE so a duplicate firing is a clean no-op. + +CREATE TABLE IF NOT EXISTS band_conditions_broadcasts ( + broadcast_id INTEGER PRIMARY KEY AUTOINCREMENT, + sent_at INTEGER, + scheduled_for INTEGER NOT NULL, + ratings_json TEXT, + source TEXT NOT NULL, + UNIQUE(scheduled_for) +); + +CREATE INDEX IF NOT EXISTS idx_band_conditions_sent + ON band_conditions_broadcasts(sent_at); +CREATE INDEX IF NOT EXISTS idx_band_conditions_scheduled + ON band_conditions_broadcasts(scheduled_for); diff --git a/tests/test_band_conditions.py b/tests/test_band_conditions.py new file mode 100644 index 0000000..7b5ea39 --- /dev/null +++ b/tests/test_band_conditions.py @@ -0,0 +1,312 @@ +"""Tests for v0.5.11 band-conditions scheduled broadcaster.""" +import asyncio +import json +import time +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest + +from meshai.notifications.scheduled.band_conditions import ( + BandConditionsScheduler, + _heuristic_ratings, + compute_band_ratings, + format_band_conditions_wire, + is_day_slot, + record_slot_attempt, + slot_epoch, +) +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "band-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _insert_swpc_kindex(conn, *, kp, when=None): + when = when or int(time.time()) + conn.execute( + "INSERT INTO swpc_events(event_id, event_type, payload_json, " + "occurred_at, first_seen_at, last_broadcast_at) VALUES (?,?,?,?,?,?)", + (f"kp_{when}", "swpc_kindex", + json.dumps({"kp_index": kp, "time": "2026-06-05T15:00:00Z"}), + when, when, None), + ) + + +def _insert_swpc_alert(conn, *, sfi, when=None): + when = when or int(time.time()) + conn.execute( + "INSERT INTO swpc_events(event_id, event_type, payload_json, " + "occurred_at, first_seen_at, last_broadcast_at) VALUES (?,?,?,?,?,?)", + (f"sfi_{when}", "swpc_alerts", + json.dumps({"F10.7": sfi, "product_id": "F10.7-Daily"}), + when, when, None), + ) + + +# ---- (a) compute with fresh SWPC quiet conditions ---------------------- + + +def test_compute_quiet_conditions_all_good_to_fair(mem_db): + """Kp=2 SFI=140 -> all bands Good/Fair on day; 80-40m Good at night.""" + now = int(time.time()) + _insert_swpc_kindex(mem_db, kp=2.0, when=now - 600) + _insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200) + + result = compute_band_ratings(now=now, hh_mm="14:00", allow_hamqsl=False) + assert result is not None + ratings, source = result + assert source == "swpc_local" + # High SFI + low Kp on day -> 30-20m and 17-15m and 12-10m all Good. + assert ratings["30-20m"] == "Good" + assert ratings["17-15m"] == "Good" + assert ratings["12-10m"] == "Fair" # SFI 140 right at the edge of Fair/Good + # 80-40m on day is usually Fair at best. + assert ratings["80-40m"] in ("Fair", "Good") + + +# ---- (b) compute with storm conditions --------------------------------- + + +def test_compute_storm_conditions_mostly_poor(mem_db): + """Kp=7 (G3) crushes ratings to Poor on upper bands.""" + now = int(time.time()) + _insert_swpc_kindex(mem_db, kp=7.0, when=now - 600) + _insert_swpc_alert(mem_db, sfi=90.0, when=now - 1200) + + result = compute_band_ratings(now=now, hh_mm="14:00", allow_hamqsl=False) + assert result is not None + ratings, _ = result + assert ratings["17-15m"] == "Poor" + assert ratings["12-10m"] == "Poor" + assert ratings["30-20m"] == "Poor" + assert ratings["80-40m"] == "Poor" + + +# ---- (c) HamQSL fallback when SWPC missing ----------------------------- + + +def test_compute_falls_back_to_hamqsl_when_no_swpc(mem_db): + """No SWPC rows -> HamQSL fallback. Mock the HTTP call.""" + XML = ''' + + + Fair + Good + Good + Fair + Good + Poor + Fair + Poor + + +''' + class Resp: + status_code = 200 + text = XML + def mock_get(url, timeout): + return Resp() + + result = compute_band_ratings(now=int(time.time()), hh_mm="14:00", + _http_get=mock_get) + assert result is not None + ratings, source = result + assert source == "hamqsl_fallback" + assert ratings["80-40m"] == "Fair" + assert ratings["12-10m"] == "Fair" + + +# ---- (d) both fail -> None silent skip --------------------------------- + + +def test_compute_returns_none_when_both_sources_fail(mem_db): + def mock_get(url, timeout): + raise httpx_TimeoutError("simulated") + class httpx_TimeoutError(Exception): pass + result = compute_band_ratings(now=int(time.time()), hh_mm="14:00", + _http_get=lambda u,t: (_ for _ in ()).throw(Exception("timeout"))) + assert result is None + + +# ---- (e) time-of-day selection ----------------------------------------- + + +@pytest.mark.parametrize("hh_mm, expected_day", [ + ("06:00", True), + ("14:00", True), + ("22:00", False), + ("23:30", False), +]) +def test_is_day_slot(hh_mm, expected_day): + assert is_day_slot(hh_mm) is expected_day + + +# ---- (f) wire format --------------------------------------------------- + + +def test_wire_format_day_slot(): + ratings = {"80-40m": "Fair", "30-20m": "Good", + "17-15m": "Fair", "12-10m": "Poor"} + wire = format_band_conditions_wire(ratings, "14:00") + assert wire.startswith("🌞 Day Propagation") + assert "📡 Band Conditions:" in wire + assert "80-40m: 🟡 Fair" in wire + assert "30-20m: 🟢 Good" in wire + assert "17-15m: 🟡 Fair" in wire + assert "12-10m: 🔴 Poor" in wire + + +def test_wire_format_night_slot(): + ratings = {"80-40m": "Good", "30-20m": "Fair", + "17-15m": "Poor", "12-10m": "Poor"} + wire = format_band_conditions_wire(ratings, "22:00") + assert wire.startswith("🌙 Night Propagation") + assert "80-40m: 🟢 Good" in wire + + +def test_wire_format_morning_slot(): + ratings = {"80-40m": "Fair", "30-20m": "Good", + "17-15m": "Fair", "12-10m": "Poor"} + wire = format_band_conditions_wire(ratings, "06:00") + assert wire.startswith("☀️ Day Propagation") + + +# ---- (g) byte size under target ---------------------------------------- + + +def test_wire_byte_size_under_target(): + ratings = {"80-40m": "Good", "30-20m": "Good", + "17-15m": "Good", "12-10m": "Good"} + wire = format_band_conditions_wire(ratings, "14:00") + nb = len(wire.encode("utf-8")) + assert nb < 130, f"wire {nb}B exceeds soft target -- {wire!r}" + + +def test_wire_has_4_band_rows_and_2_header_lines(): + ratings = {"80-40m": "Good", "30-20m": "Fair", + "17-15m": "Fair", "12-10m": "Poor"} + wire = format_band_conditions_wire(ratings, "06:00") + lines = wire.split("\n") + assert len(lines) == 6 # emoji headline + 📡 header + 4 band rows + + +# ---- (h) cold-start grace + dedup (i) via scheduler.fire_slot ---------- + + +class _MockDispatcher: + def __init__(self): + self.calls = [] + + async def dispatch_scheduled_broadcast(self, *, text, source_event_table, + source_event_pk): + self.calls.append((text, source_event_table, source_event_pk)) + return True + + +def _build_cfg(): + from meshai.config import Config + cfg = Config() + cfg.notifications.rules = [] + cfg.notifications.cold_start_grace_seconds = 0 + cfg.notifications.band_conditions_enabled = True + cfg.notifications.band_conditions_schedule = ["06:00", "14:00", "22:00"] + rf = cfg.notifications.toggles.get("rf_propagation") + if rf is not None: + rf.enabled = True + rf.broadcast_channel = 1 + return cfg + + +def test_fire_slot_records_broadcast_row(mem_db): + now = int(time.time()) + _insert_swpc_kindex(mem_db, kp=2.0, when=now - 600) + _insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200) + sched = BandConditionsScheduler(_build_cfg(), _MockDispatcher()) + slot = now + 60 # arbitrary epoch + asyncio.run(sched.fire_slot(slot, "14:00")) + row = mem_db.execute( + "SELECT source, ratings_json FROM band_conditions_broadcasts " + "WHERE scheduled_for=?", (slot,)).fetchone() + assert row is not None + assert row["source"] == "swpc_local" + assert json.loads(row["ratings_json"])["30-20m"] == "Good" + + +def test_fire_slot_dedup_via_unique_constraint(mem_db): + """Same scheduled_for fired twice -> only one row, only one broadcast.""" + now = int(time.time()) + _insert_swpc_kindex(mem_db, kp=2.0, when=now - 600) + _insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200) + disp = _MockDispatcher() + sched = BandConditionsScheduler(_build_cfg(), disp) + slot = now + 60 + asyncio.run(sched.fire_slot(slot, "14:00")) + asyncio.run(sched.fire_slot(slot, "14:00")) # dup firing + + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM band_conditions_broadcasts " + "WHERE scheduled_for=?", (slot,)).fetchone()["n"] + assert n_rows == 1 + assert len(disp.calls) == 1 # second firing aborted before dispatch + + +def test_fire_slot_silent_skip_when_no_data(mem_db): + """No SWPC + (mocked) HamQSL failure -> source='skipped_no_data'.""" + # Patch the HamQSL fetcher inside the module to simulate the HTTP error. + from meshai.notifications.scheduled import band_conditions as bc_mod + original = bc_mod._fetch_hamqsl + bc_mod._fetch_hamqsl = lambda day, _http_get=None: None + try: + sched = BandConditionsScheduler(_build_cfg(), _MockDispatcher()) + slot = int(time.time()) + 60 + asyncio.run(sched.fire_slot(slot, "14:00")) + row = mem_db.execute( + "SELECT source FROM band_conditions_broadcasts " + "WHERE scheduled_for=?", (slot,)).fetchone() + assert row is not None + assert row["source"] == "skipped_no_data" + finally: + bc_mod._fetch_hamqsl = original + + +# ---- record_slot_attempt direct unit test ------------------------------ + + +def test_record_slot_attempt_returns_id_first_then_none(mem_db): + """First insert returns a row id; duplicate returns None.""" + bid1 = record_slot_attempt(1_000_000, source="swpc_local", + ratings={"80-40m": "Good"}) + assert isinstance(bid1, int) + bid2 = record_slot_attempt(1_000_000, source="swpc_local", + ratings={"80-40m": "Good"}) + assert bid2 is None + + +# ---- slot_epoch alignment ---------------------------------------------- + + +def test_slot_epoch_returns_local_time_alignment(): + """slot_epoch('06:00', America/Boise) should be the same wall-clock + hour regardless of UTC offset (DST handled automatically).""" + now_dt = datetime(2026, 6, 5, 12, 0, tzinfo=timezone.utc) + ep = slot_epoch(now_dt, "06:00", "America/Boise") + # Mountain Daylight Time in June -> UTC-6 -> 06:00 MDT == 12:00 UTC. + dt = datetime.fromtimestamp(ep, tz=timezone.utc) + assert dt.hour == 12 and dt.minute == 0 # 06:00 MDT in June + + # Winter date -> UTC-7 -> 06:00 MST == 13:00 UTC. + winter_dt = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc) + ep = slot_epoch(winter_dt, "06:00", "America/Boise") + dt = datetime.fromtimestamp(ep, tz=timezone.utc) + assert dt.hour == 13 and dt.minute == 0 # 06:00 MST in January