feat(v0.5.11): band conditions scheduled broadcaster (3x/day HF propagation)

First clock-driven broadcaster in meshai, distinct from the v0.5.8b/v0.5.9/v0.5.10 event-driven adapters. The same persistence + dispatcher + cold-start patterns apply, but the trigger is the wall clock at 06:00 / 14:00 / 22:00 Mountain Time (default; GUI-configurable per Rule 17).

Components: (1) meshai/notifications/scheduled/band_conditions.py with BandConditionsScheduler (asyncio loop, mirrors the existing DigestScheduler shape), compute_band_ratings() with two-tier data sourcing -- (a) latest swpc_kindex + swpc_alerts F10.7 rows from persistence within the last 6h, (b) HamQSL.com solarxml.php fallback when SWPC is stale or incomplete, (c) silent skip when both fail, format_band_conditions_wire() multi-line MEDIUM output (~115-120B). (2) v3 schema migration adding band_conditions_broadcasts(broadcast_id PK AUTO, sent_at, scheduled_for UNIQUE, ratings_json, source). UNIQUE(scheduled_for) enforces per-slot dedup so a retry storm cannot double-broadcast. (3) Dispatcher.dispatch_scheduled_broadcast() bypasses the toggle / rules / freshness-gate pipeline but DOES honour the v0.5.8b cold-start grace -- first scheduled broadcast within the grace window after meshai starts is suppressed, mesh_broadcasts_out audit row only inserted on actual delivery. Channel selection routes through the rf_propagation toggle\'s broadcast_channel since band conditions IS RF-propagation info. (4) NotificationsConfig gains band_conditions_enabled (default true), band_conditions_schedule (list of HH:MM strings, default ["06:00","14:00","22:00"]), band_conditions_tz (default "America/Boise" so DST handles automatically). (5) Notifications.tsx grows a Band Conditions card between Cold-Start Grace and Master Toggles with the enable toggle + 3 TimeInput slots + a one-liner explaining the source priority. (6) build_pipeline + start_pipeline spawn the BandConditionsScheduler alongside the existing DigestScheduler -- best-effort, scheduler failures must NOT break notifications startup.

Wire format examples (multi-line, all under 130B target):

  ☀️ Day Propagation
  📡 Band Conditions:
  80-40m: 🟡 Fair
  30-20m: 🟢 Good
  17-15m: 🟢 Good
  12-10m: 🟡 Fair

  🌞 Day Propagation  (14:00 slot when storm onset, Kp=6 SFI=110)
  📡 Band Conditions:
  80-40m: 🔴 Poor
  30-20m: 🔴 Poor
  17-15m: 🔴 Poor
  12-10m: 🟡 Fair

  🌙 Night Propagation  (22:00 slot, recovery, Kp=4 SFI=120)
  📡 Band Conditions:
  80-40m: 🟡 Fair
  30-20m: 🟡 Fair
  17-15m: 🔴 Poor
  12-10m: 🔴 Poor

Tests: was 686 (v0.5.10 baseline), now 704 (+18 net new -- quiet/storm condition ratings, HamQSL XML parse fallback, both-fail silent-skip path, is_day_slot per HH:MM, wire format for all 3 slot variants, byte-size guard, 6-line shape, fire_slot record row, dedup via UNIQUE constraint, silent-skip path, slot_epoch DST alignment summer + winter). Synthetic 24h probe verified the 3 expected slots fire correctly with quiet/storm/recovery scenarios + the 4th no-data scenario lands as source=\'skipped_no_data\' with no broadcast.

usgs_nwis deferred to v0.5.12 (threshold-curation work). Master OFF in prod.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-06-05 07:38:51 +00:00
commit 0da83e0d3d
9 changed files with 957 additions and 1 deletions

View file

@ -65,6 +65,8 @@ interface NotificationsConfig {
quiet_hours_start: string
quiet_hours_end: string
cold_start_grace_seconds?: number
band_conditions_enabled?: boolean
band_conditions_schedule?: string[]
rules: NotificationRuleConfig[]
toggles?: Record<string, NotificationToggle>
}
@ -2135,6 +2137,55 @@ export default function Notifications() {
/>
</div>
{/* Band Conditions -- v0.5.11 */}
<div className="space-y-3 p-4 bg-[#0a0e17] rounded-lg border border-[#1e2a3a]">
<div className="flex items-center gap-2">
<label className="text-xs text-slate-500 uppercase tracking-wide">Band Conditions (HF propagation)</label>
</div>
<Toggle
label="Enable scheduled band-conditions broadcasts"
checked={config.band_conditions_enabled ?? true}
onChange={(v) => setConfig({ ...config, band_conditions_enabled: v })}
helper="3x/day HF propagation summary (Day/Night ratings per band group)"
info="Source priority: (1) recent SWPC readings persisted locally; (2) HamQSL.com fallback; (3) silent skip if both fail. Persistence rows are written either way for an audit trail."
/>
{(config.band_conditions_enabled ?? true) && (
<div className="grid grid-cols-3 gap-3">
<TimeInput
label="Slot 1"
value={(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])[0] || '06:00'}
onChange={(v) => {
const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])]
s[0] = v
setConfig({ ...config, band_conditions_schedule: s })
}}
helper="Morning (default 06:00 MT)"
/>
<TimeInput
label="Slot 2"
value={(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])[1] || '14:00'}
onChange={(v) => {
const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])]
s[1] = v
setConfig({ ...config, band_conditions_schedule: s })
}}
helper="Afternoon (default 14:00 MT)"
/>
<TimeInput
label="Slot 3"
value={(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])[2] || '22:00'}
onChange={(v) => {
const s = [...(config.band_conditions_schedule ?? ['06:00','14:00','22:00'])]
s[2] = v
setConfig({ ...config, band_conditions_schedule: s })
}}
helper="Night (default 22:00 MT)"
/>
</div>
)}
<p className="text-xs text-slate-600">All times are Mountain Time (America/Boise). DST handled automatically.</p>
</div>
{/* Master Toggles */}
{config.toggles && (
<MasterToggles

View file

@ -614,6 +614,13 @@ class NotificationsConfig:
# meshai can sit idle for hours with master OFF and the grace only
# kicks in when adapters actually start producing.
cold_start_grace_seconds: int = 60
# v0.5.11 band-conditions scheduled broadcaster (3x/day HF propagation).
# GUI-editable per Rule 17. Empty schedule list disables; the
# _enabled flag is the master switch independent of the times.
band_conditions_enabled: bool = True
band_conditions_schedule: list = field(
default_factory=lambda: ["06:00", "14:00", "22:00"])
band_conditions_tz: str = "America/Boise"
toggles: dict = field(default_factory=_default_toggles) # family -> NotificationToggle
digest: DigestConfig = field(default_factory=DigestConfig)
rules: list = field(default_factory=list) # List of NotificationRuleConfig

View file

@ -27,6 +27,12 @@ import logging
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.dispatcher import Dispatcher
try:
from meshai.notifications.scheduled.band_conditions import (
BandConditionsScheduler,
)
except ImportError:
BandConditionsScheduler = None
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.toggle_filter import ToggleFilter
@ -194,6 +200,23 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
connector=connector,
)
await scheduler.start()
# v0.5.11 band-conditions scheduler -- spawn alongside the
# digest scheduler. Best-effort: failures in the scheduled
# broadcaster must NOT break notifications pipeline startup.
if BandConditionsScheduler is not None:
try:
comps = getattr(bus, "_pipeline_components", {}) or {}
disp = comps.get("dispatcher")
if disp is not None:
bc_sched = BandConditionsScheduler(config, disp)
await bc_sched.start()
comps["band_conditions_scheduler"] = bc_sched
bus._pipeline_components = comps
except Exception:
import logging as _lg
_lg.getLogger("meshai.pipeline").exception(
"band_conditions scheduler failed to start")
# Phase 2.16.1: periodically flush the grouper so coalesced (routine/
# priority) events are delivered within the window even when poll cadence

View file

@ -252,6 +252,92 @@ class Dispatcher:
"dedup_lru_size": len(self._dedup_lru),
}
async def dispatch_scheduled_broadcast(self, text: str, *,
source_event_table: str,
source_event_pk: str,
) -> bool:
"""v0.5.11 scheduled broadcast entry point.
Bypasses the normal toggle / rules / freshness-gate pipeline
because scheduled broadcasts are intentionally periodic and
already pre-composed. Cold-start grace still applies so the
very first scheduled broadcast after meshai starts is
suppressed (consistent with how event-driven adapters behave).
Channel selection: routes through the rf_propagation toggle\'s
broadcast_channel since band conditions IS RF-propagation info.
If that toggle is not configured with a channel, the broadcast
is dropped (with a log).
Returns True on successful mesh delivery, False on grace-drop
or any other suppression.
"""
# Cold-start grace (mirrors _dispatch_toggles Section 0).
grace_s = int(getattr(self._config.notifications,
"cold_start_grace_seconds", 60) or 0)
if grace_s > 0:
now_anchor = time.time()
if self._first_event_at is None:
self._first_event_at = now_anchor
if (now_anchor - self._first_event_at) < grace_s:
self._cold_start_dropped += 1
self._logger.info(
"cold-start grace: dropping scheduled broadcast "
"(table=%s pk=%s)",
source_event_table, source_event_pk)
return False
# Route through rf_propagation toggle\'s broadcast_channel.
toggles = getattr(self._config.notifications, "toggles", None) or {}
rf = toggles.get("rf_propagation") if isinstance(toggles, dict) else None
if rf is None or not getattr(rf, "broadcast_channel", None):
self._logger.info(
"scheduled-broadcast: rf_propagation channel not "
"configured; dropping")
return False
# Build a synthetic Event purely to reuse _toggle_to_rule + the
# NotificationPayload constructor. Severity \'priority\' keeps it
# out of quiet-hours suppression unless explicitly overridden.
from meshai.notifications.events import (
make_event,
make_payload_from_event,
)
ev = make_event(
source="band_conditions", category="rf_propagation",
severity="priority", title=text,
)
ev.data["_meshai_precomposed"] = True
rule = self._toggle_to_rule(rf, "mesh_broadcast", ev)
try:
channel = self._channel_factory(rule, self._connector)
payload = make_payload_from_event(ev, message=text)
success = await channel.deliver(payload, rule)
except Exception:
self._logger.exception(
"scheduled-broadcast: delivery raised; treating as failed")
return False
if success:
# Audit row -- mirrors _post_broadcast_commit for scheduled.
try:
from meshai.persistence import get_db
conn = get_db()
bytes_sent = len(text.encode("utf-8")) if text else 0
conn.execute(
"INSERT INTO mesh_broadcasts_out(sent_at, recipient, "
"channel, text, source_event_table, source_event_pk, "
"bytes_sent, ack_received) VALUES (?,?,?,?,?,?,?,?)",
(int(time.time()), "broadcast",
rf.broadcast_channel, text,
source_event_table, str(source_event_pk),
bytes_sent, 0),
)
except Exception:
self._logger.exception(
"scheduled-broadcast: audit row insert failed")
return bool(success)
def _post_broadcast_commit(self, event, payload, rule, ch_type: str) -> None:
"""Persistence side-effects of an actually-successful broadcast.

View file

@ -0,0 +1,27 @@
"""meshai.notifications.scheduled -- clock-driven (not event-driven) broadcasters.
Current modules:
band_conditions -- 3x/day HF propagation summary (06:00 / 14:00 /
22:00 Mountain Time by default). SWPC-local
computation + HamQSL.com fallback + silent skip.
These broadcasters bypass the normal incident-handler / freshness-gate
path but DO honour the v0.5.8b cold-start grace.
"""
from meshai.notifications.scheduled.band_conditions import (
BandConditionsScheduler,
compute_band_ratings,
format_band_conditions_wire,
is_day_slot,
record_slot_attempt,
slot_epoch,
)
__all__ = [
"BandConditionsScheduler",
"compute_band_ratings",
"format_band_conditions_wire",
"is_day_slot",
"record_slot_attempt",
"slot_epoch",
]

View file

@ -0,0 +1,425 @@
"""v0.5.11 band-conditions scheduled broadcaster.
Computes HF propagation ratings 3x/day (06:00, 14:00, 22:00 Mountain Time
by default) and broadcasts a multi-line summary to the mesh. Data source
priority:
1. Recent SWPC readings persisted in swpc_events (last 6h) -- preferred
because meshai already ingests these and the local model lets us
compute without an external HTTP call.
2. HamQSL.com solarxml.php fallback -- canonical industry source used
by Ham Radio Toolbox + most propagation apps.
3. Silent skip -- if both fail, no broadcast (a row is still inserted
into band_conditions_broadcasts with source='skipped_no_data' so
the accounting trail is visible).
Distinct from event-driven adapters in two ways:
- Scheduled by the clock, not triggered by incoming envelopes.
- The universal freshness gate (v0.5.9 GAMMA) does NOT apply --
scheduled broadcasts are intentionally periodic, not reactive.
The cold-start grace (v0.5.8b) DOES apply via the dispatcher -- if meshai
just started, the first scheduled broadcast within the grace window is
suppressed for consistency with the event-driven adapters.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional
try:
from zoneinfo import ZoneInfo
except ImportError:
ZoneInfo = None # pragma: no cover (3.9+ only)
import httpx
logger = logging.getLogger(__name__)
# Window for "fresh" SWPC data. If the latest swpc_kindex (or whatever we
# need) is older than this, fall through to HamQSL.
_SWPC_FRESHNESS_S = 6 * 3600
# Multi-line wire format -- emoji + headline per slot, then 4 band rows.
# Color codes per Matt: 🟢 Good, 🟡 Fair, 🔴 Poor.
_RATING_EMOJI = {"Good": "🟢", "Fair": "🟡", "Poor": "🔴"}
# Slot -> (emoji, headline) tuple. Hour selection is based on the SLOT, not
# the actual fire time, so a slightly-late firing still gets the right
# headline.
_SLOT_LABEL = {
"06:00": ("☀️", "Day Propagation"),
"14:00": ("🌞", "Day Propagation"),
"22:00": ("🌙", "Night Propagation"),
}
# Band order in the wire string. The 4 row labels match the HamQSL grouping
# convention so users coming from Ham Radio Toolbox recognise the format.
_BAND_ORDER = ["80-40m", "30-20m", "17-15m", "12-10m"]
# HamQSL endpoint. Public, no auth.
_HAMQSL_URL = "https://www.hamqsl.com/solarxml.php"
_HAMQSL_TIMEOUT_S = 5
# ========================================================================
# Public API
# ========================================================================
def is_day_slot(hh_mm: str) -> bool:
"""06:00 and 14:00 broadcast 'day' band ratings; 22:00 broadcasts night."""
return hh_mm in ("06:00", "14:00")
def compute_band_ratings(now: Optional[int] = None,
hh_mm: str = "14:00",
allow_hamqsl: bool = True,
_http_get: Optional[Callable] = None,
) -> Optional[tuple[dict, str]]:
"""Return ((ratings_by_band, source) tuple, or None if no data.
Source = 'swpc_local' when computed from persisted swpc_events, or
'hamqsl_fallback' when fetched from HamQSL.com.
"""
now = now if now is not None else int(time.time())
# 1. Try local SWPC.
state = _load_swpc_state(now=now)
if state is not None:
ratings = _heuristic_ratings(state, day=is_day_slot(hh_mm))
if ratings:
return ratings, "swpc_local"
# 2. HamQSL fallback.
if allow_hamqsl:
try:
ratings = _fetch_hamqsl(day=is_day_slot(hh_mm), _http_get=_http_get)
except Exception:
logger.exception("HamQSL.com fallback failed; silent skip")
ratings = None
if ratings:
return ratings, "hamqsl_fallback"
return None
def format_band_conditions_wire(ratings: dict, hh_mm: str) -> str:
"""Multi-line wire string -- headline + 4 band rows."""
emoji, headline = _SLOT_LABEL.get(hh_mm, ("🌞", "Day Propagation"))
lines = [f"{emoji} {headline}", "📡 Band Conditions:"]
for band in _BAND_ORDER:
rating = ratings.get(band, "Poor")
rating_emoji = _RATING_EMOJI.get(rating, "🔴")
lines.append(f"{band}: {rating_emoji} {rating}")
return "\n".join(lines)
def slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str = "America/Boise") -> int:
"""Return the epoch-second timestamp of `hh_mm` on the date of `now_dt`,
interpreted in the requested timezone. Used as the dedup key for
band_conditions_broadcasts(scheduled_for).
"""
h, m = (int(x) for x in hh_mm.split(":"))
if ZoneInfo is not None:
tz = ZoneInfo(tz_name)
local = now_dt.astimezone(tz)
slot = local.replace(hour=h, minute=m, second=0, microsecond=0)
return int(slot.timestamp())
# Fallback: assume UTC (tests can monkeypatch).
slot = now_dt.replace(hour=h, minute=m, second=0, microsecond=0)
return int(slot.timestamp())
# ========================================================================
# Internal: SWPC reading -> ratings
# ========================================================================
def _load_swpc_state(now: int) -> Optional[dict]:
"""Pull the latest Kp + SFI from swpc_events. Returns None when readings
are missing or older than _SWPC_FRESHNESS_S."""
try:
from meshai.persistence import get_db
conn = get_db()
except Exception:
return None
cutoff = now - _SWPC_FRESHNESS_S
state = {}
# Latest Kp from swpc_kindex.
row = conn.execute(
"SELECT payload_json, occurred_at FROM swpc_events "
"WHERE event_type='swpc_kindex' AND occurred_at >= ? "
"ORDER BY occurred_at DESC LIMIT 1",
(cutoff,)).fetchone()
if row and row["payload_json"]:
try:
payload = json.loads(row["payload_json"])
kp = payload.get("kp_index") or payload.get("kp") or payload.get("value")
if isinstance(kp, (int, float)):
state["kp"] = float(kp)
except Exception: pass
# Latest SFI from swpc_alerts. Look for F10.7 in the payload.
row = conn.execute(
"SELECT payload_json, occurred_at FROM swpc_events "
"WHERE event_type='swpc_alerts' AND occurred_at >= ? "
" AND (payload_json LIKE '%F10.7%' OR payload_json LIKE '%solar_flux%' "
" OR payload_json LIKE '%SFI%' OR payload_json LIKE '%sfi%') "
"ORDER BY occurred_at DESC LIMIT 1",
(cutoff,)).fetchone()
if row and row["payload_json"]:
try:
payload = json.loads(row["payload_json"])
for k in ("F10.7", "f10_7", "f107", "solar_flux", "sfi", "SFI"):
v = payload.get(k)
if isinstance(v, (int, float)):
state["sfi"] = float(v); break
except Exception: pass
# Need at least Kp to do anything useful locally. If SFI is missing,
# caller still falls back; we return None to signal "incomplete".
if "kp" not in state or "sfi" not in state:
return None
return state
def _heuristic_ratings(state: dict, day: bool) -> dict:
"""Per Matt's spec: per-band rating from Kp + SFI."""
kp = state.get("kp", 9.0)
sfi = state.get("sfi", 50.0)
out = {}
# 80-40m: night band primarily.
if not day:
if kp < 4 and sfi > 70: out["80-40m"] = "Good"
elif kp < 5: out["80-40m"] = "Fair"
else: out["80-40m"] = "Poor"
else:
# Daytime 80-40m is usually Fair to Poor (absorption).
if kp < 4: out["80-40m"] = "Fair"
else: out["80-40m"] = "Poor"
# 30-20m: day-strong band; tolerates higher Kp than upper bands.
if day:
if sfi > 120 and kp < 4: out["30-20m"] = "Good"
elif 80 <= sfi <= 120 and kp < 6: out["30-20m"] = "Fair"
elif sfi < 80 or kp >= 6: out["30-20m"] = "Poor"
else: out["30-20m"] = "Fair"
else:
# Night 20m can be Good if SFI high + Kp low (gray-line).
if sfi > 110 and kp < 4: out["30-20m"] = "Good"
elif kp < 5: out["30-20m"] = "Fair"
else: out["30-20m"] = "Poor"
# 17-15m: day-only band most of the time.
if day:
if sfi > 120: out["17-15m"] = "Good"
elif 90 <= sfi <= 120 and kp < 5: out["17-15m"] = "Fair"
else: out["17-15m"] = "Poor"
else:
# Night upper bands typically Poor unless aurora/Es event.
out["17-15m"] = "Poor"
# 12-10m: day-only solar-max band.
if day:
if sfi > 140: out["12-10m"] = "Good"
elif 110 <= sfi <= 140: out["12-10m"] = "Fair"
else: out["12-10m"] = "Poor"
else:
out["12-10m"] = "Poor"
return out
# ========================================================================
# Internal: HamQSL.com fallback
# ========================================================================
def _fetch_hamqsl(day: bool, _http_get: Optional[Callable] = None) -> Optional[dict]:
"""Pull HamQSL solarxml.php and parse calculatedconditions."""
if _http_get is None:
def _http_get(url, timeout):
with httpx.Client(timeout=timeout) as c:
return c.get(url)
try:
resp = _http_get(_HAMQSL_URL, _HAMQSL_TIMEOUT_S)
except Exception:
return None
if getattr(resp, "status_code", 0) != 200:
return None
try:
root = ET.fromstring(resp.text)
except ET.ParseError:
return None
cond = root.find(".//calculatedconditions")
if cond is None: return None
want_time = "day" if day else "night"
out = {}
# HamQSL uses names like "80m-40m", "30m-20m", "17m-15m", "12m-10m".
for band in cond.findall("band"):
name = (band.get("name") or "").lower().replace("m", "")
# Normalise to our 4-row labels.
key = None
if "80" in name and "40" in name: key = "80-40m"
elif "30" in name and "20" in name: key = "30-20m"
elif "17" in name and "15" in name: key = "17-15m"
elif "12" in name and "10" in name: key = "12-10m"
if not key: continue
t = (band.get("time") or "").lower()
if t != want_time: continue
text = (band.text or "").strip()
if text not in _RATING_EMOJI: continue
out[key] = text
return out if out else None
# ========================================================================
# Persistence helpers (band_conditions_broadcasts rows)
# ========================================================================
def record_slot_attempt(slot_epoch_s: int, *, source: str,
ratings: Optional[dict] = None,
sent_at: Optional[int] = None) -> Optional[int]:
"""Insert (or ignore) a row in band_conditions_broadcasts. Returns the
broadcast_id on success, None when the UNIQUE(scheduled_for) constraint
blocks a duplicate firing (the slot already ran)."""
try:
from meshai.persistence import get_db
conn = get_db()
except Exception:
return None
rj = json.dumps(ratings) if ratings else None
cur = conn.execute(
"INSERT OR IGNORE INTO band_conditions_broadcasts(sent_at, "
"scheduled_for, ratings_json, source) VALUES (?,?,?,?)",
(sent_at, slot_epoch_s, rj, source),
)
return int(cur.lastrowid) if cur.rowcount > 0 else None
# ========================================================================
# Scheduler -- async loop firing once per slot
# ========================================================================
class BandConditionsScheduler:
"""Fires band-conditions broadcasts at configured local times."""
def __init__(self, config, dispatcher, *,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], Any]] = None,
tz_name: Optional[str] = None):
self._config = config
self._dispatcher = dispatcher
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
self._stop_event: Optional[asyncio.Event] = None
self._tz_name = tz_name or getattr(
config.notifications, "band_conditions_tz", "America/Boise")
self._logger = logging.getLogger("meshai.scheduled.band_conditions")
def _enabled(self) -> bool:
return bool(getattr(self._config.notifications,
"band_conditions_enabled", True))
def _schedule(self) -> list:
sched = getattr(self._config.notifications,
"band_conditions_schedule",
["06:00", "14:00", "22:00"])
return [s for s in sched if isinstance(s, str) and ":" in s]
async def start(self) -> None:
if self._task is not None and not self._task.done():
raise RuntimeError("BandConditionsScheduler already running")
self._stop_event = asyncio.Event()
self._task = asyncio.create_task(self._run(),
name="band-conditions-scheduler")
self._logger.info(
"Band conditions scheduler started: enabled=%s schedule=%s tz=%s",
self._enabled(), self._schedule(), self._tz_name)
async def stop(self) -> None:
if self._stop_event: self._stop_event.set()
if self._task: await self._task
async def _run(self) -> None:
while not (self._stop_event and self._stop_event.is_set()):
if not self._enabled():
await self._sleep(60); continue
now = self._clock()
now_dt = datetime.fromtimestamp(now, tz=timezone.utc)
target_epoch, target_hh_mm = self._next_slot(now_dt)
wait_s = max(1, target_epoch - int(now))
try: await self._sleep(min(wait_s, 3600))
except asyncio.CancelledError: break
now2 = int(self._clock())
if now2 >= target_epoch:
await self.fire_slot(target_epoch, target_hh_mm)
def _next_slot(self, now_dt: datetime) -> tuple[int, str]:
"""Return the (epoch, hh_mm) of the next future slot. Searches today's
slots first; if all past, picks tomorrow's first slot."""
schedule = sorted(set(self._schedule()))
if not schedule:
# Fall back to noon tomorrow if config is broken.
tomorrow = now_dt + timedelta(days=1)
return slot_epoch(tomorrow, "12:00", self._tz_name), "12:00"
today_now = int(now_dt.timestamp())
for hh_mm in schedule:
ep = slot_epoch(now_dt, hh_mm, self._tz_name)
if ep > today_now: return ep, hh_mm
# All today's slots are past; pick tomorrow's first.
tomorrow = now_dt + timedelta(days=1)
return slot_epoch(tomorrow, schedule[0], self._tz_name), schedule[0]
async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool:
"""Compute + broadcast for this slot. Returns True on broadcast,
False on suppression (grace), None on silent skip (no data)."""
result = compute_band_ratings(now=int(self._clock()), hh_mm=hh_mm)
if result is None:
record_slot_attempt(slot_epoch_s, source="skipped_no_data")
self._logger.info(
"band-conditions: silent skip for %s (no SWPC + HamQSL down)",
hh_mm)
return False
ratings, source = result
# Pre-insert the slot row (UNIQUE constraint handles dedup).
bcast_id = record_slot_attempt(slot_epoch_s, source=source,
ratings=ratings,
sent_at=int(self._clock()))
if bcast_id is None:
# Slot already broadcast (UNIQUE constraint blocked the INSERT).
self._logger.info(
"band-conditions: slot %s already broadcast; skipping dup",
hh_mm)
return False
wire = format_band_conditions_wire(ratings, hh_mm)
try:
success = await self._dispatcher.dispatch_scheduled_broadcast(
text=wire,
source_event_table="band_conditions_broadcasts",
source_event_pk=str(bcast_id),
)
except Exception:
self._logger.exception(
"band-conditions: dispatcher raised; row stays in table")
success = False
return bool(success)

View file

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
DEFAULT_DB_PATH = "/data/meshai.sqlite"
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
SCHEMA_VERSION = 2
SCHEMA_VERSION = 3
SCHEMA_META_TABLE = "schema_meta"
MIGRATIONS_DIR = Path(__file__).parent / "migrations"

View file

@ -0,0 +1,25 @@
-- v0.5.11 schema migration: band_conditions_broadcasts.
--
-- One row per band-conditions broadcast attempt (3x/day). Inserted
-- whether or not the broadcast actually went out -- source='skipped_no_data'
-- accounts for the cases where neither local SWPC data nor HamQSL.com
-- fallback yielded usable ratings, so the scheduler skipped silently.
--
-- UNIQUE(scheduled_for) enforces per-slot dedup: if the scheduler fires
-- multiple times for the same target hour (e.g. retry loop, clock drift),
-- only the first INSERT wins. Caller checks the constraint via
-- INSERT OR IGNORE so a duplicate firing is a clean no-op.
CREATE TABLE IF NOT EXISTS band_conditions_broadcasts (
broadcast_id INTEGER PRIMARY KEY AUTOINCREMENT,
sent_at INTEGER,
scheduled_for INTEGER NOT NULL,
ratings_json TEXT,
source TEXT NOT NULL,
UNIQUE(scheduled_for)
);
CREATE INDEX IF NOT EXISTS idx_band_conditions_sent
ON band_conditions_broadcasts(sent_at);
CREATE INDEX IF NOT EXISTS idx_band_conditions_scheduled
ON band_conditions_broadcasts(scheduled_for);

View file

@ -0,0 +1,312 @@
"""Tests for v0.5.11 band-conditions scheduled broadcaster."""
import asyncio
import json
import time
from datetime import datetime, timezone
from unittest.mock import patch
import pytest
from meshai.notifications.scheduled.band_conditions import (
BandConditionsScheduler,
_heuristic_ratings,
compute_band_ratings,
format_band_conditions_wire,
is_day_slot,
record_slot_attempt,
slot_epoch,
)
from meshai.persistence import close_thread_connection, init_db
from meshai.persistence import db as persistence_db
@pytest.fixture
def mem_db(monkeypatch, tmp_path):
db_path = str(tmp_path / "band-test.sqlite")
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
persistence_db._initialised.clear()
close_thread_connection()
conn = init_db()
yield conn
close_thread_connection()
persistence_db._initialised.discard(db_path)
def _insert_swpc_kindex(conn, *, kp, when=None):
when = when or int(time.time())
conn.execute(
"INSERT INTO swpc_events(event_id, event_type, payload_json, "
"occurred_at, first_seen_at, last_broadcast_at) VALUES (?,?,?,?,?,?)",
(f"kp_{when}", "swpc_kindex",
json.dumps({"kp_index": kp, "time": "2026-06-05T15:00:00Z"}),
when, when, None),
)
def _insert_swpc_alert(conn, *, sfi, when=None):
when = when or int(time.time())
conn.execute(
"INSERT INTO swpc_events(event_id, event_type, payload_json, "
"occurred_at, first_seen_at, last_broadcast_at) VALUES (?,?,?,?,?,?)",
(f"sfi_{when}", "swpc_alerts",
json.dumps({"F10.7": sfi, "product_id": "F10.7-Daily"}),
when, when, None),
)
# ---- (a) compute with fresh SWPC quiet conditions ----------------------
def test_compute_quiet_conditions_all_good_to_fair(mem_db):
"""Kp=2 SFI=140 -> all bands Good/Fair on day; 80-40m Good at night."""
now = int(time.time())
_insert_swpc_kindex(mem_db, kp=2.0, when=now - 600)
_insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200)
result = compute_band_ratings(now=now, hh_mm="14:00", allow_hamqsl=False)
assert result is not None
ratings, source = result
assert source == "swpc_local"
# High SFI + low Kp on day -> 30-20m and 17-15m and 12-10m all Good.
assert ratings["30-20m"] == "Good"
assert ratings["17-15m"] == "Good"
assert ratings["12-10m"] == "Fair" # SFI 140 right at the edge of Fair/Good
# 80-40m on day is usually Fair at best.
assert ratings["80-40m"] in ("Fair", "Good")
# ---- (b) compute with storm conditions ---------------------------------
def test_compute_storm_conditions_mostly_poor(mem_db):
"""Kp=7 (G3) crushes ratings to Poor on upper bands."""
now = int(time.time())
_insert_swpc_kindex(mem_db, kp=7.0, when=now - 600)
_insert_swpc_alert(mem_db, sfi=90.0, when=now - 1200)
result = compute_band_ratings(now=now, hh_mm="14:00", allow_hamqsl=False)
assert result is not None
ratings, _ = result
assert ratings["17-15m"] == "Poor"
assert ratings["12-10m"] == "Poor"
assert ratings["30-20m"] == "Poor"
assert ratings["80-40m"] == "Poor"
# ---- (c) HamQSL fallback when SWPC missing -----------------------------
def test_compute_falls_back_to_hamqsl_when_no_swpc(mem_db):
"""No SWPC rows -> HamQSL fallback. Mock the HTTP call."""
XML = '''<?xml version="1.0"?>
<solar><solardata>
<calculatedconditions>
<band name="80m-40m" time="day">Fair</band>
<band name="80m-40m" time="night">Good</band>
<band name="30m-20m" time="day">Good</band>
<band name="30m-20m" time="night">Fair</band>
<band name="17m-15m" time="day">Good</band>
<band name="17m-15m" time="night">Poor</band>
<band name="12m-10m" time="day">Fair</band>
<band name="12m-10m" time="night">Poor</band>
</calculatedconditions>
</solardata></solar>
'''
class Resp:
status_code = 200
text = XML
def mock_get(url, timeout):
return Resp()
result = compute_band_ratings(now=int(time.time()), hh_mm="14:00",
_http_get=mock_get)
assert result is not None
ratings, source = result
assert source == "hamqsl_fallback"
assert ratings["80-40m"] == "Fair"
assert ratings["12-10m"] == "Fair"
# ---- (d) both fail -> None silent skip ---------------------------------
def test_compute_returns_none_when_both_sources_fail(mem_db):
def mock_get(url, timeout):
raise httpx_TimeoutError("simulated")
class httpx_TimeoutError(Exception): pass
result = compute_band_ratings(now=int(time.time()), hh_mm="14:00",
_http_get=lambda u,t: (_ for _ in ()).throw(Exception("timeout")))
assert result is None
# ---- (e) time-of-day selection -----------------------------------------
@pytest.mark.parametrize("hh_mm, expected_day", [
("06:00", True),
("14:00", True),
("22:00", False),
("23:30", False),
])
def test_is_day_slot(hh_mm, expected_day):
assert is_day_slot(hh_mm) is expected_day
# ---- (f) wire format ---------------------------------------------------
def test_wire_format_day_slot():
ratings = {"80-40m": "Fair", "30-20m": "Good",
"17-15m": "Fair", "12-10m": "Poor"}
wire = format_band_conditions_wire(ratings, "14:00")
assert wire.startswith("🌞 Day Propagation")
assert "📡 Band Conditions:" in wire
assert "80-40m: 🟡 Fair" in wire
assert "30-20m: 🟢 Good" in wire
assert "17-15m: 🟡 Fair" in wire
assert "12-10m: 🔴 Poor" in wire
def test_wire_format_night_slot():
ratings = {"80-40m": "Good", "30-20m": "Fair",
"17-15m": "Poor", "12-10m": "Poor"}
wire = format_band_conditions_wire(ratings, "22:00")
assert wire.startswith("🌙 Night Propagation")
assert "80-40m: 🟢 Good" in wire
def test_wire_format_morning_slot():
ratings = {"80-40m": "Fair", "30-20m": "Good",
"17-15m": "Fair", "12-10m": "Poor"}
wire = format_band_conditions_wire(ratings, "06:00")
assert wire.startswith("☀️ Day Propagation")
# ---- (g) byte size under target ----------------------------------------
def test_wire_byte_size_under_target():
ratings = {"80-40m": "Good", "30-20m": "Good",
"17-15m": "Good", "12-10m": "Good"}
wire = format_band_conditions_wire(ratings, "14:00")
nb = len(wire.encode("utf-8"))
assert nb < 130, f"wire {nb}B exceeds soft target -- {wire!r}"
def test_wire_has_4_band_rows_and_2_header_lines():
ratings = {"80-40m": "Good", "30-20m": "Fair",
"17-15m": "Fair", "12-10m": "Poor"}
wire = format_band_conditions_wire(ratings, "06:00")
lines = wire.split("\n")
assert len(lines) == 6 # emoji headline + 📡 header + 4 band rows
# ---- (h) cold-start grace + dedup (i) via scheduler.fire_slot ----------
class _MockDispatcher:
def __init__(self):
self.calls = []
async def dispatch_scheduled_broadcast(self, *, text, source_event_table,
source_event_pk):
self.calls.append((text, source_event_table, source_event_pk))
return True
def _build_cfg():
from meshai.config import Config
cfg = Config()
cfg.notifications.rules = []
cfg.notifications.cold_start_grace_seconds = 0
cfg.notifications.band_conditions_enabled = True
cfg.notifications.band_conditions_schedule = ["06:00", "14:00", "22:00"]
rf = cfg.notifications.toggles.get("rf_propagation")
if rf is not None:
rf.enabled = True
rf.broadcast_channel = 1
return cfg
def test_fire_slot_records_broadcast_row(mem_db):
now = int(time.time())
_insert_swpc_kindex(mem_db, kp=2.0, when=now - 600)
_insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200)
sched = BandConditionsScheduler(_build_cfg(), _MockDispatcher())
slot = now + 60 # arbitrary epoch
asyncio.run(sched.fire_slot(slot, "14:00"))
row = mem_db.execute(
"SELECT source, ratings_json FROM band_conditions_broadcasts "
"WHERE scheduled_for=?", (slot,)).fetchone()
assert row is not None
assert row["source"] == "swpc_local"
assert json.loads(row["ratings_json"])["30-20m"] == "Good"
def test_fire_slot_dedup_via_unique_constraint(mem_db):
"""Same scheduled_for fired twice -> only one row, only one broadcast."""
now = int(time.time())
_insert_swpc_kindex(mem_db, kp=2.0, when=now - 600)
_insert_swpc_alert(mem_db, sfi=140.0, when=now - 1200)
disp = _MockDispatcher()
sched = BandConditionsScheduler(_build_cfg(), disp)
slot = now + 60
asyncio.run(sched.fire_slot(slot, "14:00"))
asyncio.run(sched.fire_slot(slot, "14:00")) # dup firing
n_rows = mem_db.execute(
"SELECT COUNT(*) AS n FROM band_conditions_broadcasts "
"WHERE scheduled_for=?", (slot,)).fetchone()["n"]
assert n_rows == 1
assert len(disp.calls) == 1 # second firing aborted before dispatch
def test_fire_slot_silent_skip_when_no_data(mem_db):
"""No SWPC + (mocked) HamQSL failure -> source='skipped_no_data'."""
# Patch the HamQSL fetcher inside the module to simulate the HTTP error.
from meshai.notifications.scheduled import band_conditions as bc_mod
original = bc_mod._fetch_hamqsl
bc_mod._fetch_hamqsl = lambda day, _http_get=None: None
try:
sched = BandConditionsScheduler(_build_cfg(), _MockDispatcher())
slot = int(time.time()) + 60
asyncio.run(sched.fire_slot(slot, "14:00"))
row = mem_db.execute(
"SELECT source FROM band_conditions_broadcasts "
"WHERE scheduled_for=?", (slot,)).fetchone()
assert row is not None
assert row["source"] == "skipped_no_data"
finally:
bc_mod._fetch_hamqsl = original
# ---- record_slot_attempt direct unit test ------------------------------
def test_record_slot_attempt_returns_id_first_then_none(mem_db):
"""First insert returns a row id; duplicate returns None."""
bid1 = record_slot_attempt(1_000_000, source="swpc_local",
ratings={"80-40m": "Good"})
assert isinstance(bid1, int)
bid2 = record_slot_attempt(1_000_000, source="swpc_local",
ratings={"80-40m": "Good"})
assert bid2 is None
# ---- slot_epoch alignment ----------------------------------------------
def test_slot_epoch_returns_local_time_alignment():
"""slot_epoch('06:00', America/Boise) should be the same wall-clock
hour regardless of UTC offset (DST handled automatically)."""
now_dt = datetime(2026, 6, 5, 12, 0, tzinfo=timezone.utc)
ep = slot_epoch(now_dt, "06:00", "America/Boise")
# Mountain Daylight Time in June -> UTC-6 -> 06:00 MDT == 12:00 UTC.
dt = datetime.fromtimestamp(ep, tz=timezone.utc)
assert dt.hour == 12 and dt.minute == 0 # 06:00 MDT in June
# Winter date -> UTC-7 -> 06:00 MST == 13:00 UTC.
winter_dt = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc)
ep = slot_epoch(winter_dt, "06:00", "America/Boise")
dt = datetime.fromtimestamp(ep, tz=timezone.utc)
assert dt.hour == 13 and dt.minute == 0 # 06:00 MST in January