From 5624a0bfdbd691b051c98411d126b7f91dd5dc6b Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Tue, 9 Jun 2026 05:18:29 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20wire=20avalanche=20to=20CENTRAL=5FAVY?= =?UTF-8?q?=20=E2=80=94=20central=20handler=20+=20consumer=20routing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 --- meshai/central/avy_handler.py | 170 ++++++++++++++++++++++++++++++++++ meshai/central/consumer.py | 13 +++ 2 files changed, 183 insertions(+) create mode 100644 meshai/central/avy_handler.py diff --git a/meshai/central/avy_handler.py b/meshai/central/avy_handler.py new file mode 100644 index 0000000..50d569a --- /dev/null +++ b/meshai/central/avy_handler.py @@ -0,0 +1,170 @@ +"""Central avalanche advisory handler (avalanche_org adapter). + +Subscribes to CENTRAL_AVY stream via consumer.py routing. +Adapter: avalanche_org +Subjects: central.avy.advisory.> (active + tombstones in one consumer) + +Wire format: multi-line, _meshai_precomposed=True (bypasses composer +whitespace-collapse). Same pattern as nws_handler / quake_handler. + +Severity gate: uses danger_level (0-5) from data.data directly. +Do NOT use centralseverity as a gate — Central's scale is higher=more +severe (4=Extreme, 3=High, 2=Considerable), which is the inverse of +meshai's broadcast priority convention. Gate on danger_level only. + +Off-season note: CENTRAL_AVY is empty June–September. Handler will +receive no envelopes during off-season — this is correct and expected. +The consumer sits idle; no action needed. + +Tombstones (central.avy.advisory.removed.*): handler returns None +(no broadcast). The env_store's native-path change-detection handles +zone retraction on the native path; on the central path, tombstones +are consumed and acked silently so they don't pile up in the stream. +Future: retraction broadcast ("AVY advisory lifted") could be added here. +""" + +import logging +import time +from typing import Any, Optional + +from meshai.adapter_config import adapter_config +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: + return None + if isinstance(sev, str): + return sev or None + try: + return str(int(sev)) + except (TypeError, ValueError): + return str(sev) + + +def _now() -> int: + return int(time.time()) + + +def handle_avy(envelope: dict, subject: str, + data: Optional[dict] = None) -> Optional[str]: + """Handle a single CENTRAL_AVY envelope. + + Returns the wire string when a broadcast should fire, None otherwise. + """ + if not isinstance(envelope, dict): + return None + + inner = envelope.get("data") or {} + if (inner.get("adapter") or "") != "avalanche_org": + return None + + category = inner.get("category") or "" + + # Tombstone — consume silently, no broadcast. + if "removed" in category: + logger.debug("avy_handler: tombstone for %s — acking silently", category) + return None + + d = inner.get("data") or {} + severity_word = _coerce_severity(inner.get("severity")) + + # Danger level gate — read from data.data, NOT centralseverity. + danger_level = d.get("danger_level") + if not isinstance(danger_level, (int, float)): + return None + + min_level = int(adapter_config.avalanche.min_danger_level) + if danger_level < min_level: + return None + + # Field extraction. + zone_name = d.get("zone_name") or "Unknown Zone" + danger_name = d.get("danger_name") or str(danger_level) + center_id = d.get("center_id") or "" + travel = (d.get("travel_advice") or "").strip() + lat = d.get("latitude") + lon = d.get("longitude") + + # Category → broadcast category for event_log. + category_raw = category + + # Persist to event_log (store-only, no change-detection needed — + # Central deduplicates upstream; we log every envelope we receive). + conn = get_db() + if conn is None: + logger.warning("avy_handler: persistence unavailable, skipping") + return None + + log_id = _log_event_returning_id( + conn, now=_now(), source="avalanche_org", + category=category_raw, severity_word=severity_word, + event_id_external=f"{center_id}:{zone_name}", + subject=subject, handled=0, + table_name="event_log", table_pk=None, + ) + + # Render multi-line wire string. + wire = _render( + danger_level=int(danger_level), + danger_name=danger_name, + zone_name=zone_name, + center_id=center_id, + travel=travel, + ) + + _attach_commit(data, log_id=log_id) + return wire + + +def _render(*, danger_level: int, danger_name: str, zone_name: str, + center_id: str, travel: str) -> str: + emoji = "\u26f7" + # Warning for High/Extreme (4-5), Watch for Considerable (3). + prefix = "WARNING:" if danger_level >= 4 else "Watch:" + + line1 = f"{emoji} AVY {prefix} {zone_name} \u2014 {danger_name} ({danger_level})" + line2 = travel[:120] if travel else None + line3 = f"{center_id} \u00b7 valid today" if center_id else "valid today" + + return "\n".join(l for l in [line1, line2, line3] if l) + + +def _attach_commit(data: Optional[dict], *, log_id: Optional[int]) -> None: + if not isinstance(data, dict): + return + + def _on_commit(committed_at: float) -> None: + try: + conn = get_db() + except Exception: + logger.exception("avy commit: persistence unavailable") + return + if log_id is not None: + conn.execute( + "UPDATE event_log SET handled=1 WHERE id=?", + (int(log_id),), + ) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = { + "table": "event_log", + "pk": log_id, + } + + +def _log_event_returning_id( + conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk, +) -> Optional[int]: + cursor = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, + subject, int(bool(handled)), table_name, table_pk), + ) + return cursor.lastrowid diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 42ef4de..82799a3 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -49,6 +49,7 @@ _SUBJECTS_BARE: dict[str, list[str]] = { "swpc": ["central.space.>"], "traffic": ["central.traffic.>"], "roads511": ["central.traffic.>"], # shared with traffic; sub-adapter routing + "avalanche": ["central.avy.advisory.>"], } # Backwards-compat: keep ADAPTER_SUBJECTS importable for legacy readers/tests. @@ -173,6 +174,14 @@ def _subjects_for(adapter: str, region: Optional[str]) -> list[str]: # shared bare-state subject scoped to both source names. "roads511": [f"central.traffic.*.{state}", f"central.traffic.*.{region}"], + # Avalanche (avalanche_org): Central publishes on CENTRAL_AVY stream. + # Active advisories: central.avy.advisory.us. + # Tombstones (v0.10.11+): central.avy.advisory.removed.us. + # Wide filter covers both in one consumer. Client-side: gate on + # danger_level from data.data, not centralseverity (higher=more severe + # on Central's scale, inverse of what the handler uses). + # Off-season: June–Sep, CENTRAL_AVY will be empty — expected, not broken. + "avalanche": [f"central.avy.advisory.>"], } return list(table.get(adapter, [])) @@ -196,6 +205,7 @@ CENTRAL_ADAPTER_TO_SOURCE: dict[str, str] = { # ALERT_CATEGORIES roads-family rules cover both 511 feeds. A future # v0.6 may split them; for now collapsed for UX simplicity. "itd_511": "roads511", + "avalanche_org": "avalanche", "firms": "firms", } @@ -515,6 +525,9 @@ class CentralConsumer: elif inner.get("adapter") == "nwis": from meshai.central.nwis_handler import handle_nwis synthesized = handle_nwis(envelope, subject, data=data) or None + elif inner.get("adapter") == "avalanche_org": + from meshai.central.avy_handler import handle_avy + synthesized = handle_avy(envelope, subject, data=data) or None # v0.6-1 firms_handler -- STORAGE-ONLY. handle_firms # writes to firms_pixels (with dedup) and returns None # so the default-deny clause below keeps mesh