diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 40416db..f08e81f 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -498,6 +498,19 @@ class CentralConsumer: if n is not None and str(n.get("_kind", "")).startswith("wfigs"): from meshai.central.wfigs_handler import handle_wfigs synthesized = handle_wfigs(n, envelope, subject, data=data) or None + # v0.5.10 nws + usgs_quake + swpc handlers. Adapter-specific + # filters: NWS severity gate, quake magnitude+Idaho-distance, + # SWPC G3+/R3+/S1+ NOAA scales. Universal freshness gate above + # already dropped stale envelopes per central_normalizer. + elif inner.get("adapter") == "nws": + from meshai.central.nws_handler import handle_nws + synthesized = handle_nws(envelope, subject, data=data) or None + elif inner.get("adapter") == "usgs_quake": + from meshai.central.quake_handler import handle_quake + synthesized = handle_quake(envelope, subject, data=data) or None + elif inner.get("adapter") in ("swpc_alerts", "swpc_kindex", "swpc_protons"): + from meshai.central.swpc_handler import handle_swpc + synthesized = handle_swpc(envelope, subject, data=data) or None elif n is not None and category in ("work_zone", "road_closure", "road_incident"): synthesized = format_work_zone_mesh(n) or None except Exception: diff --git a/meshai/central/nws_handler.py b/meshai/central/nws_handler.py new file mode 100644 index 0000000..eb2e4f9 --- /dev/null +++ b/meshai/central/nws_handler.py @@ -0,0 +1,293 @@ +"""v0.5.10 NWS weather-alerts handler. + +Severity floor: broadcast only when CAP severity in {Extreme, Severe}. Watch / +Advisory / Statement (Moderate, Minor, Unknown) get logged to event_log +handled=0 and silently skipped. + +Tombstone handling: msgType in {Cancel, Expire} -> log handled=0, no +broadcast. + +Per-CAP-id dedup: nws_alerts table keyed on CAP `event_id` (the urn-style +identifier). First sighting fires `New:`; re-issues UPSERT current_* but +don't re-broadcast (v0.5.9-incident no-Update rule). + +Wire format (MEDIUM, ~80-90 B): + {emoji} {event_type}: {area_desc}, until {expires_short}, @ {lat:.3f},{lon:.3f} + +Emoji by event_type prefix (substring match, case-insensitive): + Tornado Warning -> đŸŒĒī¸ + Severe Thunderstorm War.. -> đŸŒŠī¸ + Flash Flood / Flood -> 🌊 + Winter Storm / Blizzard / Ice -> â„ī¸ + Heat / Excessive Heat -> đŸŒĄī¸ + High Wind / Wind -> đŸŒŦī¸ + Fire Weather / Red Flag -> đŸ”Ĩ + Air Quality -> 😷 + Frost / Freeze -> đŸĨļ + default -> âš ī¸ +""" +from __future__ import annotations + +import logging +import re +import time +from datetime import datetime, timezone +from typing import Any, Optional + +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# CAP severity strings that pass the gate. +_BROADCAST_SEVERITIES = {"Extreme", "Severe"} + +# Tombstone msgType values. +_TOMBSTONE_MSGTYPES = {"Cancel", "Expire"} + +# Ordered (substring, emoji) checks; first match wins. +_EVENT_EMOJI = [ + ("tornado", "đŸŒĒī¸"), + ("severe thunderstorm", "đŸŒŠī¸"), + ("thunderstorm", "đŸŒŠī¸"), + ("flash flood", "🌊"), + ("flood", "🌊"), + ("winter storm", "â„ī¸"), + ("blizzard", "â„ī¸"), + ("ice storm", "â„ī¸"), + ("ice", "â„ī¸"), + ("excessive heat", "đŸŒĄī¸"), + ("heat", "đŸŒĄī¸"), + ("high wind", "đŸŒŦī¸"), + ("wind", "đŸŒŦī¸"), + ("fire weather", "đŸ”Ĩ"), + ("red flag", "đŸ”Ĩ"), + ("air quality", "😷"), + ("freeze", "đŸĨļ"), + ("frost", "đŸĨļ"), +] + + +def _now() -> int: return int(time.time()) + + +def _parse_iso(s: Optional[str]) -> Optional[int]: + if not s: return None + try: return int(datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()) + except Exception: return None + + +def _emoji_for_event(event_type: Optional[str]) -> str: + if not event_type: return "âš ī¸" + s = event_type.lower() + for substr, emoji in _EVENT_EMOJI: + if substr in s: + return emoji + return "âš ī¸" + + +def _format_expires_short(epoch: Optional[int], now: Optional[int] = None) -> str: + """Renders 'until 8:15pm' / 'until Mon 3am' / 'until 6/12 8pm' depending on + how far away the expiry is. now defaults to current time so the relative + rendering is correct in tests too.""" + if not epoch: return "expires unknown" + now = now or _now() + diff = epoch - now + try: + dt = datetime.fromtimestamp(epoch, tz=timezone.utc).astimezone() + except Exception: + return "expires unknown" + + hour = dt.strftime("%-I").lstrip("0") or "0" + minute = dt.minute + ampm = "am" if dt.hour < 12 else "pm" + if minute: + time_str = f"{hour}:{minute:02d}{ampm}" + else: + time_str = f"{hour}{ampm}" + + if diff < 6 * 3600: + return f"until {time_str}" + if diff < 7 * 86400: + return f"until {dt.strftime('%a')} {time_str}" + return f"until {dt.strftime('%-m/%-d')} {time_str}" + + +def _location_anchor(area_desc: Optional[str], geocoder_city: Optional[str], + county: Optional[str], state: Optional[str]) -> str: + """Priority: geocoder.city > areaDesc (first 30 chars) > county+state > state.""" + if geocoder_city: + return str(geocoder_city) + if area_desc: + # NWS areaDesc is often semicolon-delimited list of zones; trim to first. + head = area_desc.split(";")[0].strip() + if len(head) > 30: head = head[:27] + "..." + return head + if county and state: + return f"{county} Co {state}" + if state: + return str(state) + return "(location unknown)" + + +def handle_nws(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + if not isinstance(envelope, dict): return None + inner = envelope.get("data") or {} + if (inner.get("adapter") or "") != "nws": return None + + d = inner.get("data") or {} + geo = inner.get("geo") or {} + ge = (d.get("_enriched") or {}).get("geocoder") or {} + now = now if now is not None else _now() + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + + try: + conn = get_db() + except Exception: + logger.exception("nws_handler: persistence unavailable") + return None + + cap_id = d.get("id") or inner.get("id") + if not cap_id: + return None + + # Tombstone: msgType in {Cancel, Expire} -> log handled=0, no broadcast. + msg_type = d.get("msgType") + if msg_type in _TOMBSTONE_MSGTYPES: + _log_event(conn, now=now, source="nws", category=category_raw, + severity_word=severity_word, event_id_external=cap_id, + subject=subject, handled=0, + table_name="nws_alerts", table_pk=cap_id) + return None + + # Severity gate (CAP string from data.severity, fall back to category + # heuristic for envelopes that lack the field). + cap_sev = d.get("severity") + if cap_sev not in _BROADCAST_SEVERITIES: + # Heuristic: category like wx.alert.severe_thunderstorm_warning -> + # treat as Severe even when CAP severity field is missing. + if not (category_raw.endswith("_warning") or category_raw.endswith(".warning")): + _log_event(conn, now=now, source="nws", category=category_raw, + severity_word=severity_word, event_id_external=cap_id, + subject=subject, handled=0, + table_name="nws_alerts", table_pk=cap_id) + return None + + # Per-CAP-id dedup. + log_id = _log_event_returning_id( + conn, now=now, source="nws", category=category_raw, + severity_word=severity_word, event_id_external=cap_id, + subject=subject, handled=0, + table_name="nws_alerts", table_pk=cap_id) + + row = conn.execute( + "SELECT last_broadcast_at FROM nws_alerts WHERE event_id=?", + (cap_id,)).fetchone() + + event_type = d.get("event") or _category_to_event_type(category_raw) + area_desc = d.get("areaDesc") + headline = d.get("headline") + description = d.get("description") + cap_severity = d.get("severity") + county = d.get("areaDesc") or ge.get("county") + state = ge.get("state") or d.get("state") + expires_epoch = _parse_iso(d.get("expires")) + + lat = lon = None + cent = geo.get("centroid") or [] + if isinstance(cent, list) and len(cent) >= 2: + lon, lat = cent[0], cent[1] + + if row is None: + conn.execute( + "INSERT INTO nws_alerts(event_id, alert_type, severity, county, " + "state, headline, description, expires_at, first_seen_at, " + "last_broadcast_at) VALUES (?,?,?,?,?,?,?,?,?,?)", + (cap_id, event_type, cap_severity, county, state, + headline, description, expires_epoch, now, None), + ) + wire = _render(event_type=event_type, area_desc=area_desc, + geocoder_city=ge.get("city"), county=county, state=state, + expires_epoch=expires_epoch, lat=lat, lon=lon, now=now) + _attach_commit(data, cap_id=cap_id, event_log_row_id=log_id) + return wire + + if row["last_broadcast_at"] is None: + # Cold-start race: row exists but broadcast was previously dropped. + wire = _render(event_type=event_type, area_desc=area_desc, + geocoder_city=ge.get("city"), county=county, state=state, + expires_epoch=expires_epoch, lat=lat, lon=lon, now=now) + _attach_commit(data, cap_id=cap_id, event_log_row_id=log_id) + return wire + + # Already broadcast -- no Update for v0.5.10 (mirrors v0.5.9 incident rule). + return None + + +def _render(*, event_type, area_desc, geocoder_city, county, state, + expires_epoch, lat, lon, now) -> str: + emoji = _emoji_for_event(event_type) + anchor = _location_anchor(area_desc, geocoder_city, county, state) + expires_seg = _format_expires_short(expires_epoch, now=now) + coords = "" + if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): + coords = f", @ {lat:.3f},{lon:.3f}" + return f"{emoji} {event_type or 'Weather Alert'}: {anchor}, {expires_seg}{coords}" + + +def _category_to_event_type(category_raw: str) -> str: + """Best-effort friendly-name derivation when data.event is missing. + Turns 'wx.alert.severe_thunderstorm_warning' -> 'Severe Thunderstorm Warning'.""" + if not category_raw: return "Weather Alert" + tail = category_raw.split(".")[-1] if "." in category_raw else category_raw + return tail.replace("_", " ").title() + + +def _attach_commit(data: Optional[dict], *, cap_id: str, + event_log_row_id: Optional[int]) -> None: + if not isinstance(data, dict): return + + def _on_commit(committed_at: float) -> None: + try: conn = get_db() + except Exception: + logger.exception("nws commit: persistence unavailable"); return + conn.execute("UPDATE nws_alerts SET last_broadcast_at=? WHERE event_id=?", + (int(committed_at), cap_id)) + if event_log_row_id is not None: + conn.execute("UPDATE event_log SET handled=1 WHERE id=?", + (int(event_log_row_id),)) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = {"table": "nws_alerts", "pk": cap_id} + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, table_name, table_pk) -> None: + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + + +def _log_event_returning_id(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> int: + cur = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + return int(cur.lastrowid) diff --git a/meshai/central/quake_handler.py b/meshai/central/quake_handler.py new file mode 100644 index 0000000..c228ff8 --- /dev/null +++ b/meshai/central/quake_handler.py @@ -0,0 +1,227 @@ +"""v0.5.10 USGS earthquakes handler. + +Broadcast gate (any of these triggers): + (a) magnitude >= 3.0 globally + (b) magnitude >= 2.5 within 250 mi of Idaho centroid + (c) tsunami_warning at any magnitude + (d) PAGER alert level in {orange, red} + +Wire format: + {emoji} Magnitude {mag:.1f} earthquake {place_string}, {depth}km depth, @ {lat:.3f},{lon:.3f} + +Emoji: + Routine -> 🌐 + M5+ -> âš ī¸ + tsunami warning -> 🚨 + +place_string: prefer data.place (USGS curated, e.g. "11 km SSW of Snowville, +Utah"); fall back to nearest_town anchor when missing. + +Persistence: UPSERT into quake_events using USGS event_id. First sighting +fires New:; revisions UPSERT but don't re-broadcast (v0.5.9 no-Update rule). +""" +from __future__ import annotations + +import logging +import math +import time +from typing import Any, Optional + +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# Idaho centroid + radius for the M2.5 regional broadcast tier. +_IDAHO_CENTROID_LAT = 44.36 +_IDAHO_CENTROID_LON = -114.61 +_IDAHO_RADIUS_MI = 250 + +_PAGER_BROADCAST_LEVELS = {"orange", "red"} + + +def _now() -> int: return int(time.time()) + + +def _haversine_mi(lat1, lon1, lat2, lon2) -> float: + R_mi = 3958.8 + p1 = math.radians(lat1); p2 = math.radians(lat2) + dp = math.radians(lat2 - lat1); dl = math.radians(lon2 - lon1) + a = math.sin(dp/2)**2 + math.cos(p1)*math.cos(p2)*math.sin(dl/2)**2 + return 2 * R_mi * math.atan2(math.sqrt(a), math.sqrt(1-a)) + + +def within_250mi_of_idaho(lat: float, lon: float) -> bool: + """Return True if (lat, lon) is within 250 mi of Idaho's centroid. + Public so tests can verify the boundary directly.""" + if not (isinstance(lat, (int, float)) and isinstance(lon, (int, float))): + return False + return _haversine_mi(lat, lon, _IDAHO_CENTROID_LAT, _IDAHO_CENTROID_LON) <= _IDAHO_RADIUS_MI + + +def _should_broadcast(mag: Optional[float], lat: Optional[float], + lon: Optional[float], tsunami: bool, + pager_alert: Optional[str]) -> bool: + if tsunami: return True + if pager_alert and pager_alert.lower() in _PAGER_BROADCAST_LEVELS: + return True + if not isinstance(mag, (int, float)): return False + if mag >= 3.0: return True + if mag >= 2.5 and within_250mi_of_idaho(lat, lon): return True + return False + + +def _emoji_for(mag: Optional[float], tsunami: bool) -> str: + if tsunami: return "🚨" + if isinstance(mag, (int, float)) and mag >= 5.0: return "âš ī¸" + return "🌐" + + +def handle_quake(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + if not isinstance(envelope, dict): return None + inner = envelope.get("data") or {} + if (inner.get("adapter") or "") != "usgs_quake": return None + + d = inner.get("data") or {} + geo = inner.get("geo") or {} + now = now if now is not None else _now() + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + + try: + conn = get_db() + except Exception: + logger.exception("quake_handler: persistence unavailable") + return None + + event_id = d.get("id") or inner.get("id") + if not event_id: + return None + + mag = d.get("magnitude") or d.get("mag") + if isinstance(mag, str): + try: mag = float(mag) + except ValueError: mag = None + elif isinstance(mag, (int, float)): + mag = float(mag) + + depth_km = d.get("depth_km") or d.get("depth") + place = d.get("place") + tsunami = bool(d.get("tsunami") or d.get("tsunami_warning")) + pager_alert = d.get("alert") + + cent = geo.get("centroid") or [] + if isinstance(cent, list) and len(cent) >= 2: + lon, lat = cent[0], cent[1] + else: + lat = lon = None + + occurred_at = None + tms = d.get("time_ms") + if isinstance(tms, (int, float)) and tms > 1e12: + occurred_at = int(tms / 1000) + elif tms and isinstance(tms, (int, float)): + occurred_at = int(tms) + + # Filter -- gate check. + if not _should_broadcast(mag, lat, lon, tsunami, pager_alert): + _log_event(conn, now=now, source="usgs_quake", category=category_raw, + severity_word=severity_word, event_id_external=event_id, + subject=subject, handled=0, + table_name="quake_events", table_pk=event_id) + return None + + log_id = _log_event_returning_id( + conn, now=now, source="usgs_quake", category=category_raw, + severity_word=severity_word, event_id_external=event_id, + subject=subject, handled=0, + table_name="quake_events", table_pk=event_id) + + row = conn.execute( + "SELECT last_broadcast_at FROM quake_events WHERE event_id=?", + (event_id,)).fetchone() + + if row is None: + conn.execute( + "INSERT INTO quake_events(event_id, magnitude, depth_km, place, lat, lon, " + "occurred_at, tsunami_warning, first_seen_at, last_broadcast_at) " + "VALUES (?,?,?,?,?,?,?,?,?,?)", + (event_id, mag, depth_km, place, lat, lon, occurred_at, + 1 if tsunami else 0, now, None), + ) + wire = _render(mag=mag, place=place, depth_km=depth_km, lat=lat, lon=lon, + tsunami=tsunami) + _attach_commit(data, event_id=event_id, event_log_row_id=log_id) + return wire + + if row["last_broadcast_at"] is None: + wire = _render(mag=mag, place=place, depth_km=depth_km, lat=lat, lon=lon, + tsunami=tsunami) + _attach_commit(data, event_id=event_id, event_log_row_id=log_id) + return wire + + return None + + +def _render(*, mag, place, depth_km, lat, lon, tsunami) -> str: + emoji = _emoji_for(mag, tsunami) + mag_str = f"{mag:.1f}" if isinstance(mag, (int, float)) else "?" + place_str = place if place else "(location unknown)" + if isinstance(depth_km, (int, float)): + depth_seg = f", {int(round(depth_km))}km depth" + else: + depth_seg = "" + coords = "" + if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): + coords = f", @ {lat:.3f},{lon:.3f}" + tsunami_seg = " -- TSUNAMI WARNING" if tsunami else "" + return f"{emoji} Magnitude {mag_str} earthquake {place_str}{depth_seg}{coords}{tsunami_seg}" + + +def _attach_commit(data: Optional[dict], *, event_id: str, + event_log_row_id: Optional[int]) -> None: + if not isinstance(data, dict): return + + def _on_commit(committed_at: float) -> None: + try: conn = get_db() + except Exception: + logger.exception("quake commit: persistence unavailable"); return + conn.execute("UPDATE quake_events SET last_broadcast_at=? WHERE event_id=?", + (int(committed_at), event_id)) + if event_log_row_id is not None: + conn.execute("UPDATE event_log SET handled=1 WHERE id=?", + (int(event_log_row_id),)) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = {"table": "quake_events", "pk": event_id} + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, table_name, table_pk) -> None: + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + + +def _log_event_returning_id(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> int: + cur = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + return int(cur.lastrowid) diff --git a/meshai/central/swpc_handler.py b/meshai/central/swpc_handler.py new file mode 100644 index 0000000..2de0529 --- /dev/null +++ b/meshai/central/swpc_handler.py @@ -0,0 +1,351 @@ +"""v0.5.10 SWPC space-weather handler. + +Aggressive filter -- broadcast ONLY when: + (a) Geomagnetic storm Kp >= 7 (G3 strong or higher) + (b) Solar flare X1+ (R3 strong radio blackout or higher) + (c) Solar proton event >= 10 pfu @ >= 10 MeV (S1 minor radiation storm + or higher) + +All else (Kp < 7, M-class flares, S0 protons) -> swpc_events table for +history + event_log handled=0, NO broadcast. + +Three Central sub-adapters all route here: + swpc_kindex -> check Kp threshold + swpc_alerts -> parse alert payload (flare class, geomag, proton scale) + swpc_protons -> check >=10 MeV proton flux threshold + +Wire format (Matt's approved option C): + 🌌 Strong geomagnetic storm (G3/Kp7) -- HF degraded, aurora possible + 🔆 Major solar flare (R3/X1.2) -- HF radio fading ~30 min, GPS may glitch + â˜ĸī¸ Solar radiation storm (S1) -- polar HF radio affected +""" +from __future__ import annotations + +import json +import logging +import re +import time +from typing import Any, Optional + +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# Kp -> G-scale mapping. Broadcast only G3 and above (Kp >= 7). +_G_SCALE = {5: ("G1", "minor"), 6: ("G2", "moderate"), 7: ("G3", "strong"), + 8: ("G4", "severe"), 9: ("G5", "extreme")} + +# Flare class -> R-scale. Broadcast only R3 and above (X1+). +_FLARE_R_THRESHOLD = "X1" # minimum class to broadcast + +# Proton flux -> S-scale. >= 10 pfu @ >=10 MeV is S1. +_S_SCALE_THRESHOLDS = [ + (1e5, "S5", "extreme"), + (1e4, "S4", "severe"), + (1e3, "S3", "strong"), + (1e2, "S2", "moderate"), + (10, "S1", "minor"), +] + + +def _now() -> int: return int(time.time()) + + +def _coerce_float(v) -> Optional[float]: + if v is None: return None + if isinstance(v, (int, float)): return float(v) + try: return float(v) + except (TypeError, ValueError): return None + + +def _kp_g_scale(kp: float) -> Optional[tuple]: + """Returns (G-code, label) for Kp values that should broadcast (Kp >= 7).""" + k = int(kp) if kp == int(kp) else int(kp) + 1 if kp - int(kp) > 0.5 else int(kp) + # Be liberal -- treat Kp 7.0 and above as G3+ exactly. + if kp >= 9: return _G_SCALE[9] + if kp >= 8: return _G_SCALE[8] + if kp >= 7: return _G_SCALE[7] + return None + + +def _flare_r_scale(flare_class: Optional[str]) -> Optional[tuple]: + """Parse 'X1.2', 'M5.5', 'C3.1' etc. Return (R-code, label, class_str) + for X1+ classes only.""" + if not flare_class: return None + s = str(flare_class).strip().upper() + m = re.match(r"^([ABCMX])([0-9.]+)?", s) + if not m: return None + cls, magnitude = m.group(1), m.group(2) + try: mag = float(magnitude) if magnitude else 1.0 + except ValueError: mag = 1.0 + if cls == "X": + if mag >= 20: return ("R5", "extreme", s) + if mag >= 10: return ("R4", "severe", s) + return ("R3", "strong", s) # X1-X9.9 + # M-class and below: skip. + return None + + +def _proton_s_scale(pfu: float) -> Optional[tuple]: + """Return (S-code, label, pfu_value) for proton flux at >= 10 pfu @ >=10 MeV.""" + for thr, code, label in _S_SCALE_THRESHOLDS: + if pfu >= thr: + return (code, label, pfu) + return None + + +def _extract_kp(d: dict) -> Optional[float]: + for k in ("kp_index", "kp", "k_index", "kindex", "value", "estimated_kp"): + v = d.get(k) + f = _coerce_float(v) + if f is not None: return f + return None + + +# S1+ NOAA scale is calibrated for the >=10 MeV proton channel. Lower- +# energy channels (>=1 MeV, >=5 MeV) have much higher baseline flux and +# would trigger spurious 'storm' events. Only honor these energy labels. +_S_SCALE_RELEVANT_ENERGIES = ("10", ">=10", ">10", ">=10 MeV", ">=10MeV", + "30", ">=30", ">=30 MeV", ">=50 MeV", + ">=100 MeV", ">=100") + + +def _is_relevant_proton_energy(energy) -> bool: + if energy is None: + return False # missing energy label -> can't validate; safer to skip + if isinstance(energy, (int, float)): + return energy >= 10 + s = str(energy).strip() + return s in _S_SCALE_RELEVANT_ENERGIES + + +def _extract_proton_flux(d: dict) -> Optional[float]: + """Match the 'flux at >=10 MeV' channel (or higher). Field names vary; + explicit channel labels win. Envelopes with `energy='>=1 MeV'` or + `'>=5 MeV'` are ALWAYS rejected -- different background floor.""" + # Explicit per-channel field names (already named after the energy). + for k in ("p10mev", "proton_flux_10mev", "flux_10mev", "p_geq_10MeV"): + v = d.get(k) + f = _coerce_float(v) + if f is not None: return f + # Generic flux/value -- require the `energy` field to validate channel. + energy = d.get("energy_mev") or d.get("energy") + if _is_relevant_proton_energy(energy): + for k in ("flux", "value", "proton_flux"): + v = d.get(k) + f = _coerce_float(v) + if f is not None: return f + return None + + +def _extract_flare_class(d: dict) -> Optional[str]: + for k in ("flare_class", "class", "magnitude_class", "x_ray_class"): + v = d.get(k) + if v: return str(v) + # The product_id sometimes encodes the class (e.g. "X1.2 FLARE"). + pid = d.get("product_id") or d.get("message") or "" + m = re.search(r"\b([MX][0-9.]+)\b", str(pid).upper()) + return m.group(0) if m else None + + +def handle_swpc(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + if not isinstance(envelope, dict): return None + inner = envelope.get("data") or {} + adapter = inner.get("adapter") or "" + if adapter not in ("swpc_alerts", "swpc_kindex", "swpc_protons"): + return None + + d = inner.get("data") or {} + now = now if now is not None else _now() + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + + try: + conn = get_db() + except Exception: + logger.exception("swpc_handler: persistence unavailable") + return None + + event_id = d.get("id") or inner.get("id") or d.get("product_id") + if not event_id: + return None + + # Classify the event + decide. + event_kind = None # "geomag" | "flare" | "proton" + scale_code = None + label = None + scalar_str = None + + if adapter == "swpc_kindex": + kp = _extract_kp(d) + if kp is not None: + g = _kp_g_scale(kp) + if g: + event_kind = "geomag" + scale_code, label = g + scalar_str = f"Kp{int(round(kp))}" + + elif adapter == "swpc_protons": + pfu = _extract_proton_flux(d) + if pfu is not None: + s = _proton_s_scale(pfu) + if s: + event_kind = "proton" + scale_code, label, val = s + scalar_str = f"{int(val) if float(val) >= 1 else val:.0f} pfu" if val >= 1 else f"{val:.1f} pfu" + + elif adapter == "swpc_alerts": + # swpc_alerts can carry any kind. Try Kp first, flare next, proton last. + kp = _extract_kp(d) + if kp is not None: + g = _kp_g_scale(kp) + if g: + event_kind = "geomag"; scale_code, label = g + scalar_str = f"Kp{int(round(kp))}" + if event_kind is None: + fcls = _extract_flare_class(d) + r = _flare_r_scale(fcls) + if r: + event_kind = "flare"; scale_code, label, cls_str = r + scalar_str = cls_str + if event_kind is None: + pfu = _extract_proton_flux(d) + if pfu is not None: + s = _proton_s_scale(pfu) + if s: + event_kind = "proton"; scale_code, label, val = s + scalar_str = f"{int(val)} pfu" if val >= 1 else f"{val:.1f} pfu" + + # Persist + filter. + payload_json = None + try: payload_json = json.dumps(d, default=str)[:8000] + except Exception: payload_json = None + occurred_at = None + t = d.get("time") or d.get("issued_at") or d.get("issue_time") + if isinstance(t, str): + try: + from datetime import datetime as _dt + occurred_at = int(_dt.fromisoformat(t.replace("Z", "+00:00")).timestamp()) + except Exception: pass + elif isinstance(t, (int, float)): + occurred_at = int(t / 1000) if t > 1e12 else int(t) + + if event_kind is None: + # Below threshold (routine Kp, M-class flare, S0 protons, etc). + # Persist for history; log handled=0; no broadcast. + _upsert_swpc(conn, event_id=event_id, adapter=adapter, + payload_json=payload_json, occurred_at=occurred_at or now, + first_seen_at=now, set_last_broadcast=False) + _log_event(conn, now=now, source="swpc", category=category_raw, + severity_word=severity_word, event_id_external=event_id, + subject=subject, handled=0, + table_name="swpc_events", table_pk=event_id) + return None + + # Broadcast-worthy. Per-event dedup + commit pattern. + log_id = _log_event_returning_id( + conn, now=now, source="swpc", category=category_raw, + severity_word=severity_word, event_id_external=event_id, + subject=subject, handled=0, + table_name="swpc_events", table_pk=event_id) + + row = conn.execute( + "SELECT last_broadcast_at FROM swpc_events WHERE event_id=?", + (event_id,)).fetchone() + + if row is None: + _upsert_swpc(conn, event_id=event_id, adapter=adapter, + payload_json=payload_json, occurred_at=occurred_at or now, + first_seen_at=now, set_last_broadcast=False) + wire = _render(event_kind, scale_code, label, scalar_str) + _attach_commit(data, event_id=event_id, event_log_row_id=log_id) + return wire + + if row["last_broadcast_at"] is None: + wire = _render(event_kind, scale_code, label, scalar_str) + _attach_commit(data, event_id=event_id, event_log_row_id=log_id) + return wire + + return None + + +def _render(event_kind, scale_code, label, scalar_str) -> str: + if event_kind == "geomag": + return (f"🌌 {label.title()} geomagnetic storm ({scale_code}/{scalar_str}) " + f"-- HF degraded, aurora possible") + if event_kind == "flare": + return (f"🔆 Major solar flare ({scale_code}/{scalar_str}) " + f"-- HF radio fading ~30 min, GPS may glitch") + if event_kind == "proton": + return (f"â˜ĸī¸ Solar radiation storm ({scale_code}/{scalar_str}) " + f"-- polar HF radio affected") + return f"âš ī¸ Space weather event ({scale_code or '?'})" + + +def _upsert_swpc(conn, *, event_id, adapter, payload_json, occurred_at, + first_seen_at, set_last_broadcast=False, broadcast_at=None) -> None: + existing = conn.execute( + "SELECT 1 FROM swpc_events WHERE event_id=?", (event_id,)).fetchone() + if existing is None: + conn.execute( + "INSERT INTO swpc_events(event_id, event_type, severity_int, " + "payload_json, occurred_at, first_seen_at, last_broadcast_at) " + "VALUES (?,?,?,?,?,?,?)", + (event_id, adapter, None, payload_json, occurred_at, + first_seen_at, broadcast_at if set_last_broadcast else None)) + else: + conn.execute( + "UPDATE swpc_events SET event_type=?, payload_json=?, occurred_at=? " + "WHERE event_id=?", + (adapter, payload_json, occurred_at, event_id)) + + +def _attach_commit(data: Optional[dict], *, event_id: str, + event_log_row_id: Optional[int]) -> None: + if not isinstance(data, dict): return + + def _on_commit(committed_at: float) -> None: + try: conn = get_db() + except Exception: + logger.exception("swpc commit: persistence unavailable"); return + conn.execute( + "UPDATE swpc_events SET last_broadcast_at=? WHERE event_id=?", + (int(committed_at), event_id)) + if event_log_row_id is not None: + conn.execute("UPDATE event_log SET handled=1 WHERE id=?", + (int(event_log_row_id),)) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = {"table": "swpc_events", "pk": event_id} + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, table_name, table_pk) -> None: + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + + +def _log_event_returning_id(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> int: + cur = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + return int(cur.lastrowid) diff --git a/meshai/central_normalizer.py b/meshai/central_normalizer.py index e98a26d..9d3d364 100644 --- a/meshai/central_normalizer.py +++ b/meshai/central_normalizer.py @@ -948,6 +948,24 @@ def is_incident_envelope_stale(envelope: dict, now: int, val = d.get("start_epoch") if isinstance(val, (int, float)) and val > 0: se = int(val) + elif adapter == "nws": + # NWS CAP: prefer `sent` (issuance), fall back to `effective`. + se = (_parse_iso_epoch_freshness(d.get("sent")) + or _parse_iso_epoch_freshness(d.get("effective"))) + elif adapter == "usgs_quake": + val = d.get("time_ms") + if isinstance(val, (int, float)) and val > 0: + se = int(val / 1000) if val > 1e12 else int(val) + elif adapter in ("swpc_alerts", "swpc_kindex", "swpc_protons"): + # Generic time / issued_at field. + for k in ("time", "issued_at", "issue_time"): + v = d.get(k) + if isinstance(v, str): + se = _parse_iso_epoch_freshness(v) + if se is not None: break + elif isinstance(v, (int, float)) and v > 0: + se = int(v / 1000) if v > 1e12 else int(v) + break else: return False # adapter not in scope of this gate diff --git a/tests/test_nws_handler.py b/tests/test_nws_handler.py new file mode 100644 index 0000000..77b9731 --- /dev/null +++ b/tests/test_nws_handler.py @@ -0,0 +1,201 @@ +"""Tests for v0.5.10 NWS handler.""" +import pytest + +from meshai.central.nws_handler import handle_nws, _emoji_for_event +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "nws-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _nws_env(*, cap_id="urn:oid:test.001", + event="Severe Thunderstorm Warning", + severity_str="Severe", + area_desc="Twin Falls County", + county="Twin Falls", state="ID", + expires="2026-06-05T03:00:00Z", + msg_type=None, + lat=42.500, lon=-114.460, + geocoder_city=None, + category="wx.alert.severe_thunderstorm_warning"): + return { + "id": cap_id, "subject": "central.wx.alert.us.id", + "data": { + "id": cap_id, "adapter": "nws", "category": category, + "severity": 2, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "id": cap_id, "@type": "wx:Alert", + "event": event, "severity": severity_str, + "areaDesc": area_desc, "msgType": msg_type or "Alert", + "headline": f"{event} for {area_desc}", + "description": "Storm details.", + "expires": expires, + "_enriched": {"geocoder": {"city": geocoder_city, + "county": county, "state": state}}, + }, + }, + } + + +def _commit(data, t): + data["_on_broadcast_committed"](float(t)) + + +# ---- severity gate ---- + + +def test_severe_thunderstorm_warning_broadcasts(mem_db): + env = _nws_env(severity_str="Severe", event="Severe Thunderstorm Warning") + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("đŸŒŠī¸") + assert "Severe Thunderstorm Warning" in wire + + +def test_extreme_emergency_broadcasts(mem_db): + env = _nws_env(severity_str="Extreme", event="Tornado Warning", + category="wx.alert.tornado_warning") + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("đŸŒĒī¸") + + +def test_special_weather_statement_skipped(mem_db): + env = _nws_env(severity_str="Minor", event="Special Weather Statement", + category="wx.alert.special_weather_statement") + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is None + n_rows = mem_db.execute("SELECT COUNT(*) AS n FROM nws_alerts").fetchone()["n"] + assert n_rows == 0 + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE source='nws' AND handled=0" + ).fetchone()["n"] + assert n_log == 1 + + +def test_watch_severity_moderate_skipped(mem_db): + env = _nws_env(severity_str="Moderate", event="Severe Thunderstorm Watch", + category="wx.alert.severe_thunderstorm_watch") + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is None + + +# ---- emoji map ---- + + +@pytest.mark.parametrize("event_type, expected_emoji", [ + ("Severe Thunderstorm Warning", "đŸŒŠī¸"), + ("Tornado Warning", "đŸŒĒī¸"), + ("Flash Flood Warning", "🌊"), + ("Flood Warning", "🌊"), + ("Winter Storm Warning", "â„ī¸"), + ("Blizzard Warning", "â„ī¸"), + ("Excessive Heat Warning", "đŸŒĄī¸"), + ("High Wind Warning", "đŸŒŦī¸"), + ("Red Flag Warning", "đŸ”Ĩ"), + ("Fire Weather Watch", "đŸ”Ĩ"), + ("Air Quality Alert", "😷"), + ("Freeze Warning", "đŸĨļ"), + ("Coastal Flood Warning", "🌊"), + ("(some other warning)", "âš ī¸"), +]) +def test_emoji_map(event_type, expected_emoji): + assert _emoji_for_event(event_type) == expected_emoji + + +# ---- tombstone ---- + + +def test_cancel_msgType_tombstone_skipped(mem_db): + env = _nws_env(severity_str="Severe", event="Severe Thunderstorm Warning", + msg_type="Cancel") + data = {} + wire = handle_nws(env, env["subject"], data=data, now=1_000_000) + assert wire is None + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE source='nws' AND handled=0" + ).fetchone()["n"] + assert n_log == 1 + + +def test_expire_msgType_tombstone_skipped(mem_db): + env = _nws_env(severity_str="Severe", event="Tornado Warning", + msg_type="Expire") + wire = handle_nws(env, env["subject"], data={}, now=1_000_000) + assert wire is None + + +# ---- per-CAP-id dedup ---- + + +def test_per_cap_id_dedup_no_reissue(mem_db): + env = _nws_env(severity_str="Severe") + data1 = {} + wire1 = handle_nws(env, env["subject"], data=data1, now=1_000_000) + assert wire1 is not None + _commit(data1, 1_000_001) + + # Same CAP id republishes (e.g. headline update). Should NOT re-broadcast. + data2 = {} + wire2 = handle_nws(env, env["subject"], data=data2, now=1_000_300) + assert wire2 is None + + +# ---- area_desc fallback ---- + + +def test_area_desc_used_when_geocoder_city_missing(mem_db): + env = _nws_env(severity_str="Severe", area_desc="Twin Falls County", + geocoder_city=None) + wire = handle_nws(env, env["subject"], data={}, now=1_000_000) + assert "Twin Falls" in wire + + +def test_geocoder_city_preferred_over_area_desc(mem_db): + env = _nws_env(severity_str="Severe", area_desc="Twin Falls County", + geocoder_city="Twin Falls") + wire = handle_nws(env, env["subject"], data={}, now=1_000_000) + assert "Twin Falls" in wire # either source serves the same anchor + + +# ---- commit callback ---- + + +def test_commit_callback_updates_last_broadcast(mem_db): + env = _nws_env(severity_str="Severe") + data = {} + handle_nws(env, env["subject"], data=data, now=1_000_000) + fr_pre = mem_db.execute( + "SELECT last_broadcast_at FROM nws_alerts").fetchone() + assert fr_pre["last_broadcast_at"] is None + _commit(data, 1_000_001) + fr_post = mem_db.execute( + "SELECT last_broadcast_at FROM nws_alerts").fetchone() + assert fr_post["last_broadcast_at"] == 1_000_001 + # event_log row flipped to handled=1. + el = mem_db.execute( + "SELECT handled FROM event_log WHERE source='nws' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert el["handled"] == 1 + + +def test_wire_includes_coords_and_expires(mem_db): + env = _nws_env(severity_str="Severe", lat=42.500, lon=-114.460) + wire = handle_nws(env, env["subject"], data={}, now=1_000_000) + assert "@ 42.500,-114.460" in wire + assert "until" in wire.lower() diff --git a/tests/test_quake_handler.py b/tests/test_quake_handler.py new file mode 100644 index 0000000..d90a611 --- /dev/null +++ b/tests/test_quake_handler.py @@ -0,0 +1,175 @@ +"""Tests for v0.5.10 USGS earthquakes handler.""" +import pytest + +from meshai.central.quake_handler import ( + handle_quake, + within_250mi_of_idaho, +) +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "quake-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _quake_env(*, event_id="uu80141266", mag=3.5, depth_km=9.0, + place="9 km SW of Stanley, Idaho", + lat=44.094, lon=-115.962, + tsunami=0, alert=None, + time_ms=1780006952030, + category="quake.event.minor"): + return { + "id": event_id, "subject": "central.quake.event.minor.unknown", + "data": { + "id": event_id, "adapter": "usgs_quake", "category": category, + "severity": 0, + "geo": {"centroid": [lon, lat], "primary_region": None}, + "data": { + "id": event_id, "magnitude": mag, "place": place, + "depth_km": depth_km, "time_ms": time_ms, + "tsunami": tsunami, "alert": alert, "status": "reviewed", + }, + }, + } + + +def _commit(data, t): + data["_on_broadcast_committed"](float(t)) + + +# ---- magnitude floor ---- + + +def test_m3_anywhere_broadcasts(mem_db): + env = _quake_env(mag=3.5, lat=37.0, lon=-122.0) # SF Bay area, outside Idaho + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "Magnitude 3.5" in wire + + +def test_m25_inside_idaho_broadcasts(mem_db): + env = _quake_env(mag=2.7, lat=44.094, lon=-115.962, event_id="uu1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "Magnitude 2.7" in wire + + +def test_m25_outside_idaho_skipped(mem_db): + # San Francisco -- well outside 250mi of Idaho centroid. + env = _quake_env(mag=2.7, lat=37.0, lon=-122.0, event_id="uu2") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is None + + +def test_below_25_skipped(mem_db): + env = _quake_env(mag=1.01, lat=44.0, lon=-114.0, event_id="uu3") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is None + + +# ---- tsunami special ---- + + +def test_tsunami_any_magnitude_broadcasts(mem_db): + env = _quake_env(mag=4.5, lat=10.0, lon=140.0, tsunami=1, event_id="japan1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "TSUNAMI WARNING" in wire + assert wire.startswith("🚨") + + +# ---- PAGER alert ---- + + +def test_pager_orange_broadcasts(mem_db): + env = _quake_env(mag=2.0, lat=37.0, lon=-122.0, alert="orange", + event_id="pager1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + + +def test_pager_red_broadcasts(mem_db): + env = _quake_env(mag=2.0, lat=37.0, lon=-122.0, alert="red", + event_id="pager2") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + + +# ---- wire format ---- + + +def test_uses_usgs_place_string(mem_db): + env = _quake_env(mag=4.1, place="9 km SW of Stanley, Idaho", + event_id="usgs1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert "9 km SW of Stanley, Idaho" in wire + + +def test_m5_uses_warning_emoji(mem_db): + env = _quake_env(mag=5.2, lat=44.0, lon=-114.0, event_id="big1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert wire.startswith("âš ī¸") + + +def test_wire_includes_depth_and_coords(mem_db): + env = _quake_env(mag=4.1, depth_km=9.0, lat=44.094, lon=-115.962, + event_id="d1") + wire = handle_quake(env, env["subject"], data={}, now=1_000_000) + assert "9km depth" in wire + assert "@ 44.094,-115.962" in wire + + +# ---- per-event dedup ---- + + +def test_per_event_id_dedup_no_reissue(mem_db): + env = _quake_env(mag=4.0, event_id="dedup1") + data1 = {} + handle_quake(env, env["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + + # Same event_id republishes (magnitude revision). Should NOT re-broadcast. + env_rev = _quake_env(mag=4.2, event_id="dedup1") # higher mag, same id + wire2 = handle_quake(env_rev, env_rev["subject"], data={}, now=1_000_300) + assert wire2 is None + + +# ---- distance helper ---- + + +def test_within_250mi_of_idaho_boundary(): + # Boise, ID -- inside + assert within_250mi_of_idaho(43.6, -116.2) is True + # San Francisco -- outside + assert within_250mi_of_idaho(37.0, -122.0) is False + # Seattle -- inside (250mi from Idaho centroid; verify the boundary) + assert within_250mi_of_idaho(47.6, -122.3) is False + # Boundary edge case (Idaho center) + assert within_250mi_of_idaho(44.36, -114.61) is True + + +# ---- commit callback ---- + + +def test_commit_callback_updates_last_broadcast(mem_db): + env = _quake_env(mag=4.0, event_id="cb1") + data = {} + handle_quake(env, env["subject"], data=data, now=1_000_000) + pre = mem_db.execute( + "SELECT last_broadcast_at FROM quake_events WHERE event_id='cb1'" + ).fetchone() + assert pre["last_broadcast_at"] is None + _commit(data, 1_000_001) + post = mem_db.execute( + "SELECT last_broadcast_at FROM quake_events WHERE event_id='cb1'" + ).fetchone() + assert post["last_broadcast_at"] == 1_000_001 diff --git a/tests/test_swpc_handler.py b/tests/test_swpc_handler.py new file mode 100644 index 0000000..c317381 --- /dev/null +++ b/tests/test_swpc_handler.py @@ -0,0 +1,218 @@ +"""Tests for v0.5.10 SWPC space-weather handler.""" +import pytest + +from meshai.central.swpc_handler import handle_swpc +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "swpc-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _kindex_env(*, kp=3.0, event_id="kp_2026_06_05_15Z"): + return { + "id": event_id, "subject": "central.space.kindex", + "data": { + "id": event_id, "adapter": "swpc_kindex", + "category": "space.kindex", "severity": 0, + "geo": {}, + "data": {"id": event_id, "kp_index": kp, + "time": "2026-06-05T15:00:00Z"}, + }, + } + + +def _protons_env(*, flux=1.0, event_id="p_2026_06_05_15Z"): + return { + "id": event_id, "subject": "central.space.proton_flux", + "data": { + "id": event_id, "adapter": "swpc_protons", + "category": "space.proton_flux", "severity": 0, + "geo": {}, + "data": {"id": event_id, "p10mev": flux, + "time": "2026-06-05T15:00:00Z"}, + }, + } + + +def _alert_env(*, flare_class=None, kp=None, pfu=None, + event_id="alert_001", product_id="ALTPRO"): + d = {"id": event_id, "product_id": product_id, + "time": "2026-06-05T15:00:00Z"} + if flare_class: d["flare_class"] = flare_class + if kp: d["kp_index"] = kp + if pfu: d["p10mev"] = pfu + return { + "id": event_id, "subject": "central.space.alert.xrayflare", + "data": { + "id": event_id, "adapter": "swpc_alerts", + "category": "space.alert", "severity": 1, + "geo": {}, "data": d, + }, + } + + +def _commit(data, t): + data["_on_broadcast_committed"](float(t)) + + +# ---- geomagnetic storm ---- + + +def test_kp_below_7_skipped(mem_db): + env = _kindex_env(kp=4.0, event_id="kp_low") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is None + # Row persisted for trending, not broadcast. + row = mem_db.execute( + "SELECT last_broadcast_at FROM swpc_events WHERE event_id='kp_low'" + ).fetchone() + assert row is not None + assert row["last_broadcast_at"] is None + + +def test_kp7_g3_broadcasts(mem_db): + env = _kindex_env(kp=7.0, event_id="kp_g3") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert wire.startswith("🌌") + assert "G3" in wire + assert "Kp7" in wire + assert "geomagnetic storm" in wire.lower() + + +def test_kp9_g5_broadcasts_with_extreme_label(mem_db): + env = _kindex_env(kp=9.0, event_id="kp_g5") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "G5" in wire + assert "extreme" in wire.lower() + + +# ---- solar flares ---- + + +def test_m_class_flare_skipped(mem_db): + env = _alert_env(flare_class="M5.5", event_id="m55_flare") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is None + + +def test_x1_flare_r3_broadcasts(mem_db): + env = _alert_env(flare_class="X1.2", event_id="x1_flare") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert wire.startswith("🔆") + assert "R3" in wire + assert "X1.2" in wire + + +def test_x10_flare_r4_broadcasts(mem_db): + env = _alert_env(flare_class="X10", event_id="x10_flare") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "R4" in wire or "R5" in wire + + +def test_flare_class_in_product_id(mem_db): + """Some swpc_alerts encode the class in product_id rather than flare_class.""" + env = _alert_env(event_id="prod_id_flare", product_id="X2.1 FLARE EVENT") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "R3" in wire + + +# ---- proton events ---- + + +def test_proton_below_threshold_skipped(mem_db): + env = _protons_env(flux=0.5, event_id="p_low") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is None + row = mem_db.execute( + "SELECT last_broadcast_at FROM swpc_events WHERE event_id='p_low'" + ).fetchone() + assert row is not None + assert row["last_broadcast_at"] is None + + +def test_proton_s1_threshold_broadcasts(mem_db): + env = _protons_env(flux=15, event_id="p_s1") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert wire.startswith("â˜ĸī¸") + assert "S1" in wire + + +def test_proton_s2_broadcasts(mem_db): + env = _protons_env(flux=200, event_id="p_s2") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "S2" in wire + + +# ---- wire format ---- + + +def test_wire_has_scale_code_and_scalar_tail(mem_db): + env = _kindex_env(kp=7.0, event_id="fmt1") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + # Matt's format: "🌌 Strong geomagnetic storm (G3/Kp7) -- HF degraded, ..." + assert "(G3/Kp7)" in wire + assert "--" in wire + + +# ---- per-event dedup ---- + + +def test_per_event_dedup_no_reissue(mem_db): + env = _kindex_env(kp=7.0, event_id="dedup_kp") + data1 = {} + handle_swpc(env, env["subject"], data=data1, now=1_000_000) + _commit(data1, 1_000_001) + # Re-publish with same id and same Kp -- should not re-broadcast. + wire2 = handle_swpc(env, env["subject"], data={}, now=1_000_300) + assert wire2 is None + + +# ---- commit callback ---- + + +def test_commit_callback_updates_last_broadcast(mem_db): + env = _kindex_env(kp=7.0, event_id="cb_swpc") + data = {} + handle_swpc(env, env["subject"], data=data, now=1_000_000) + pre = mem_db.execute( + "SELECT last_broadcast_at FROM swpc_events WHERE event_id='cb_swpc'" + ).fetchone() + assert pre["last_broadcast_at"] is None + _commit(data, 1_000_001) + post = mem_db.execute( + "SELECT last_broadcast_at FROM swpc_events WHERE event_id='cb_swpc'" + ).fetchone() + assert post["last_broadcast_at"] == 1_000_001 + + +# ---- routine readings persist but never broadcast ---- + + +def test_routine_kp_reading_persists_no_broadcast(mem_db): + """Sub-G3 Kp must still be saved for trending queries.""" + env = _kindex_env(kp=4.5, event_id="routine_kp") + wire = handle_swpc(env, env["subject"], data={}, now=1_000_000) + assert wire is None + row = mem_db.execute( + "SELECT event_type, payload_json FROM swpc_events " + "WHERE event_id='routine_kp'").fetchone() + assert row is not None + assert row["event_type"] == "swpc_kindex" + assert "kp_index" in row["payload_json"]