diff --git a/.gitignore b/.gitignore index 7638790..93face6 100644 --- a/.gitignore +++ b/.gitignore @@ -71,3 +71,6 @@ Thumbs.db !.env.example local.yaml !local.yaml.example +data/*.sqlite +data/*.sqlite-wal +data/*.sqlite-shm diff --git a/dashboard-frontend/src/pages/Notifications.tsx b/dashboard-frontend/src/pages/Notifications.tsx index 6edf862..97b4b03 100644 --- a/dashboard-frontend/src/pages/Notifications.tsx +++ b/dashboard-frontend/src/pages/Notifications.tsx @@ -64,6 +64,7 @@ interface NotificationsConfig { quiet_hours_enabled: boolean quiet_hours_start: string quiet_hours_end: string + cold_start_grace_seconds?: number rules: NotificationRuleConfig[] toggles?: Record } @@ -2118,6 +2119,22 @@ export default function Notifications() { )} + {/* Cold-start grace -- v0.5.8b */} +
+
+ +
+ setConfig({ ...config, cold_start_grace_seconds: v })} + min={0} + max={600} + helper="Suppress broadcasts for this many seconds after the first event arrives" + info="When meshai starts seeing events for the first time, suppress mesh broadcasts for this many seconds to absorb any JetStream backlog. Persistence rows still get written; only broadcasts are suppressed." + /> +
logger = logging.getLogger(__name__)

# Broadcast cooldown per fire (8h). Without this, every poll cycle would
# re-broadcast even when nothing changed. Spec: change-detection on acres
# OR containment AND >=28800s since last broadcast.
WFIGS_BROADCAST_COOLDOWN_S = 8 * 60 * 60  # 28800

# `_kind` values this handler accepts; anything else is left to the caller.
_WFIGS_KINDS = ("wfigs_incident", "wfigs_tombstone", "wfigs_perimeter")


def _now() -> int:
    """Return the current wall-clock time as whole epoch seconds."""
    return int(time.time())


# ---------- public entry --------------------------------------------------


def handle_wfigs(normalized: dict, envelope: dict, subject: str,
                 data: Optional[dict] = None,
                 now: Optional[int] = None) -> Optional[str]:
    """Route a normalized WFIGS dict through persistence + change-detection.

    Every accepted event gets an event_log row. Tombstones and perimeters
    stop there. Active incidents then take one of three branches:

      (i)   no `fires` row            -> INSERT, wire prefixed "New"
      (ii)  row, last_broadcast_at IS NULL
                                      -> UPDATE current_*, wire prefixed "New"
      (iii) row, last_broadcast_at set
                                      -> UPDATE current_*, then wire prefixed
                                         "Update" only if acres or containment
                                         grew AND the cooldown has elapsed.

    `data` is the mutable dict the caller is composing into the Event. When
    a wire is returned, `_on_broadcast_committed` and `_broadcast_audit` are
    attached to it; last_broadcast_* is written only when that callback is
    invoked, never here.

    Args:
        normalized: dict carrying `_kind` plus the parsed incident fields.
        envelope: original envelope; `data.category` / `data.severity` are
            read from it when present.
        subject: NATS subject, recorded in event_log.
        data: event-data dict to decorate with commit handles (optional).
        now: epoch seconds override; defaults to the current time.

    Returns:
        The wire string when a broadcast should fire, otherwise None.
    """
    if not isinstance(normalized, dict):
        return None
    kind = normalized.get("_kind")
    if kind not in _WFIGS_KINDS:
        return None

    now = _now() if now is None else now
    inner = envelope.get("data") if isinstance(envelope, dict) else None
    if not isinstance(inner, dict):
        # A non-dict payload carries no category/severity we can read.
        inner = {}
    category = inner.get("category") or ""
    severity_word = _coerce_severity(inner.get("severity"))
    irwin_id = normalized.get("irwin_id")

    try:
        conn = get_db()
    except Exception:
        logger.exception("wfigs_handler: persistence unavailable; "
                         "deferring to default pipeline")
        return None

    if kind in ("wfigs_tombstone", "wfigs_perimeter"):
        source = "wfigs_incidents" if kind == "wfigs_tombstone" else "wfigs_perimeters"
        _log_event(conn, now=now, source=source, category=category,
                   severity_word=severity_word, irwin_id=irwin_id,
                   subject=subject, handled=0,
                   table_name=None, table_pk=irwin_id)
        return None

    # ---- active incident ----
    # Logged as handled=0; the commit callback flips the row to handled=1
    # only when a broadcast is actually delivered.
    log_id = _log_event_returning_id(
        conn, now=now, source="wfigs_incidents", category=category,
        severity_word=severity_word, irwin_id=irwin_id,
        subject=subject, handled=0,
        table_name="fires", table_pk=irwin_id)

    if not irwin_id:
        # Without a key the fires lookup can never match, so every poll
        # would INSERT another keyless row and emit another "New" wire.
        logger.warning("wfigs_handler: active incident without irwin_id; "
                       "event_log row written, no fires row (subject=%s)",
                       subject)
        return None

    row = conn.execute(
        "SELECT current_acres, current_contained_pct, last_broadcast_at, "
        "last_broadcast_acres, last_broadcast_contained "
        "FROM fires WHERE irwin_id = ?", (irwin_id,)).fetchone()

    acres = normalized.get("acres")
    contained_pct = normalized.get("contained_pct")

    def _wire(prefix: str) -> str:
        # Render first, then attach handles -- same order for every branch.
        rendered = _render(normalized, prefix=prefix)
        _attach_commit_handles(data, irwin_id=irwin_id,
                               acres=acres, contained_pct=contained_pct,
                               event_log_row_id=log_id)
        return rendered

    # ---- (i) row missing -- INSERT, mark "New", leave last_broadcast_* NULL
    if row is None:
        conn.execute(
            "INSERT INTO fires(irwin_id, incident_name, incident_type, "
            "current_acres, current_contained_pct, status, lat, lon, "
            "county, state, landclass, declared_at, last_event_at, "
            "last_broadcast_at, last_broadcast_acres, last_broadcast_contained) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (
                irwin_id,
                normalized.get("incident_name"),
                normalized.get("incident_type"),
                acres, contained_pct,
                None,                       # status reserved
                normalized.get("lat"), normalized.get("lon"),
                normalized.get("county"), normalized.get("state"),
                normalized.get("landclass"),
                normalized.get("declared_at_epoch"),
                now,                        # last_event_at
                None, None, None,           # last_broadcast_* explicitly NULL
            ),
        )
        return _wire("New")

    # ---- (ii) + (iii) row exists -- refresh current_* either way
    conn.execute(
        "UPDATE fires SET current_acres=?, current_contained_pct=?, "
        "lat=COALESCE(?, lat), lon=COALESCE(?, lon), last_event_at=? "
        "WHERE irwin_id=?",
        (acres, contained_pct, normalized.get("lat"),
         normalized.get("lon"), now, irwin_id),
    )

    last_bcast_at = row["last_broadcast_at"]
    if last_bcast_at is None:
        # (ii) never broadcast yet -- still "New".
        return _wire("New")

    # ---- (iii) already broadcast -- gate on change + cooldown
    last_bcast_acres = row["last_broadcast_acres"]
    last_bcast_contained = row["last_broadcast_contained"]

    # Forward-only change detection: more acres or higher containment counts.
    # Downward revisions and unchanged values do not warrant re-broadcast.
    changed_acres = (
        acres is not None
        and (last_bcast_acres is None or acres > last_bcast_acres)
    )
    changed_contained = (
        contained_pct is not None
        and (last_bcast_contained is None or contained_pct > last_bcast_contained)
    )
    cooldown_elapsed = now - int(last_bcast_at) >= WFIGS_BROADCAST_COOLDOWN_S

    if (changed_acres or changed_contained) and cooldown_elapsed:
        return _wire("Update")
    return None


# ---------- commit-callback factory ---------------------------------------


def _attach_commit_handles(data: Optional[dict], *, irwin_id: str,
                           acres: Optional[float],
                           contained_pct: Optional[int],
                           event_log_row_id: Optional[int] = None) -> None:
    """Attach `_on_broadcast_committed` + `_broadcast_audit` to `data`.

    The callback closes over the irwin_id, acres and contained_pct that
    triggered this broadcast. When called with a delivery timestamp it
    writes them to the fires row's last_broadcast_* columns and, if an
    event_log row id was supplied, sets that row's handled flag to 1.
    If the callback is never invoked, last_broadcast_at keeps its previous
    value (NULL for a fire that has not been broadcast).

    Does nothing when `data` is not a dict.
    """
    if not isinstance(data, dict):
        return

    def _on_commit(committed_at: float) -> None:
        try:
            conn = get_db()
        except Exception:
            logger.exception(
                "wfigs commit callback: persistence unavailable; "
                "last_broadcast_* not updated for irwin=%s", irwin_id)
            return
        conn.execute(
            "UPDATE fires SET last_broadcast_at=?, last_broadcast_acres=?, "
            "last_broadcast_contained=? WHERE irwin_id=?",
            (int(committed_at), acres, contained_pct, irwin_id),
        )
        # A missing row id is skipped silently -- the broadcast still went out.
        if event_log_row_id is not None:
            conn.execute(
                "UPDATE event_log SET handled=1 WHERE id=?",
                (int(event_log_row_id),),
            )

    data["_on_broadcast_committed"] = _on_commit
    data["_broadcast_audit"] = {"table": "fires", "pk": irwin_id}


# ---------- helpers -------------------------------------------------------


def _coerce_severity(sev: Any) -> Optional[str]:
    """Return severity as a string, or None for None / empty string.

    Values that convert with int() are rendered as that integer's string;
    anything else falls back to str().
    """
    if sev is None:
        return None
    if isinstance(sev, str):
        return sev or None
    try:
        return str(int(sev))
    except (TypeError, ValueError):
        return str(sev)


def _log_event_returning_id(conn, *, now, source, category, severity_word,
                            irwin_id, subject, handled, table_name,
                            table_pk) -> int:
    """Insert an event_log row and return its primary key id.

    Used for active-incident logging, where the commit callback later
    updates the same row to handled=1.
    """
    cur = conn.execute(
        "INSERT INTO event_log(received_at, source, category, severity_word, "
        "event_id_external, nats_subject, handled, table_name, table_pk) "
        "VALUES (?,?,?,?,?,?,?,?,?)",
        (now, source, category, severity_word, irwin_id, subject,
         int(bool(handled)), table_name, table_pk),
    )
    return int(cur.lastrowid)


def _log_event(conn, *, now, source, category, severity_word, irwin_id,
               subject, handled, table_name, table_pk) -> None:
    """Insert an event_log row and discard the row id.

    Same INSERT as `_log_event_returning_id`; used for tombstones and
    perimeters, whose handled flag is fixed at write time.
    """
    _log_event_returning_id(
        conn, now=now, source=source, category=category,
        severity_word=severity_word, irwin_id=irwin_id, subject=subject,
        handled=handled, table_name=table_name, table_pk=table_pk)


# ---------- renderer ------------------------------------------------------


def _whole(value: Any) -> Optional[int]:
    """Truncate `value` to an int; None when missing or not convertible
    (including NaN and infinity)."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _render(n: dict, *, prefix: str = "") -> str:
    """Build the mesh wire string for one incident.

    Shape: "🔥 [<prefix>: ]<name> (<type>), <anchor>: <acres>, <containment>"
    followed by ", @ <lat>,<lon>" when both coordinates are numeric.
    """
    name = n.get("incident_name") or "(unnamed)"
    itype = n.get("incident_type") or "incident"
    lat = n.get("lat")
    lon = n.get("lon")

    anchor = _location_anchor(n)
    acres = _whole(n.get("acres"))
    contained = _whole(n.get("contained_pct"))

    acres_str = "N/A" if acres is None else f"{acres:,} ac"
    contained_str = (
        "containment unknown" if contained is None
        else f"{contained}% contained"
    )

    prefix_str = f"{prefix}: " if prefix else ""
    head = f"🔥 {prefix_str}{name} ({itype}), {anchor}"
    body = f"{acres_str}, {contained_str}"

    coords = ""
    if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
        coords = f", @ {lat:.3f},{lon:.3f}"

    return f"{head}: {body}{coords}"


def _format_town_anchor(nt: Any) -> Optional[str]:
    """Turn a nearest-town result into an anchor phrase, or None.

    Reads `name`, `distance_mi` and `bearing`. A missing bearing is left
    out rather than leaving a double space in the phrase.
    """
    if not nt or not nt.get("name"):
        return None
    town = nt["name"]
    d = nt.get("distance_mi")
    if not isinstance(d, (int, float)) or d < 1:
        return f"near {town}"
    bearing = nt.get("bearing")
    pieces = (f"{int(round(d))} mi", str(bearing) if bearing else "", f"of {town}")
    return " ".join(p for p in pieces if p)


def _location_anchor(n: dict) -> str:
    """Anchor priority: geocoder city > nearest town > landclass > county/state."""
    city = n.get("geocoder_city")
    if city:
        return str(city)

    lat = n.get("lat")
    lon = n.get("lon")
    if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
        try:
            from meshai.central_normalizer import nearest_town
            nt = nearest_town(lat, lon, max_distance_mi=100.0)
        except Exception:
            logger.exception("nearest_town failed; falling through")
            nt = None
        town_anchor = _format_town_anchor(nt)
        if town_anchor:
            return town_anchor

    landclass = n.get("landclass")
    if landclass:
        return str(landclass)

    county = n.get("county")
    state = n.get("state")
    if county and state:
        return f"{county} Co {state}"
    if state:
        return str(state)
    return "(location unknown)"
# ---------- wzdx federal vocabulary maps ----------------------------------

# FHWA WZDx v4 + custom-feed vocabulary observed in the wild. Unknown values
# fall through to lowercased + hyphens→spaces (see _norm_wzdx_sub_type).
_WZDX_WORK_TYPE_MAP: dict[str, Optional[str]] = {
    # WZDx v4 spec types_of_work.type_name enum:
    "maintenance": "maintenance",
    "minor-road-defect-repair": "minor repair",
    "roadside-work": "roadside work",
    "overhead-work": "overhead work",
    "below-road-work": "subsurface work",
    "barrier-work": "barrier work",
    "surface-work": "surface work",
    "painting": "painting",
    "roadway-relocation": "roadway relocation",
    "roadway-creation": "new construction",
    # Common informal values seen in upstream feeds (ID, WA):
    "road-work": "road work",
    "paving": "paving",
    "bridge-construction": "bridge construction",
    "bridge-maintenance": "bridge maintenance",
    "utility-work": "utility work",
    "road-construction": "road construction",
    "construction": "construction",
    "emergency-repairs": "emergency repairs",
    # event_type values (drop the too-generic ones):
    "work-zone": None,
    "detour": "detour",
}


# vehicle_impact taxonomy (WZDx v4). Maps to mesh-friendly phrase.
# None means the renderer should drop the value entirely.
_WZDX_IMPACT_MAP: dict[str, Optional[str]] = {
    "all-lanes-closed": "all lanes closed",
    "some-lanes-closed": "lanes reduced",
    "alternating-one-way": "one-way alternating",
    "unknown": None,
    "all-lanes-open": None,  # informational only; nothing to do
}


def _norm_wzdx_sub_type(raw) -> Optional[str]:
    """Map a raw work-type / event-type value to its mesh phrase.

    Known values go through _WZDX_WORK_TYPE_MAP (which may map to None to
    drop the value). Unknown values are lowercased, hyphens become spaces
    and whitespace runs collapse. Falsy or blank input returns None.
    """
    if not raw:
        return None
    s = str(raw).strip().lower()
    if not s:
        return None
    if s in _WZDX_WORK_TYPE_MAP:
        return _WZDX_WORK_TYPE_MAP[s]
    # Unknown value — keep lowercased, hyphens → spaces, single-line.
    return re.sub(r"\s+", " ", s.replace("-", " ")).strip() or None


def _coerce_milepost(value) -> Optional[int]:
    """Truncate a structured mile-post value to an int, or None.

    Goes through float() first so decimal strings such as "12.7" are
    accepted; a bare int() call rejects them.
    """
    if value is None:
        return None
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return None


# ---------- per-adapter parser: wzdx federal ------------------------------

def _parse_wzdx_federal(inner_data: dict, geo: dict) -> dict:
    """Normalize a wzdx-adapter envelope (FHWA WZDx federal spec).

    Most fields are looked up in `core_details` first and fall back to the
    top level of `inner_data`, so both nested and flattened payloads work.

    sub_type uses types_of_work[0] (its `type_name`, or the string itself)
    when present, else event_type, each normalized via
    _norm_wzdx_sub_type. The vehicle-impact phrase is folded into the same
    slot ("lanes reduced, paving"), except for all-lanes-closed, which is
    reported as impact="full_closure" instead so it is not printed twice.

    Returns:
        dict with keys source, road, direction, mile_start, mile_end,
        description, sub_type, impact, ends_at, town, distance_mi, bearing.
    """
    cd = inner_data.get("core_details")
    if not isinstance(cd, dict):
        cd = {}
    if not isinstance(geo, dict):
        geo = {}

    def field(key):
        # core_details wins unless it is missing or blank.
        v = cd.get(key)
        if v is None or (isinstance(v, str) and not v.strip()):
            v = inner_data.get(key)
        return v

    # --- road (raw, verbatim) ---------------------------------------------
    road_names = field("road_names")
    road = None
    if isinstance(road_names, list) and road_names:
        road = str(road_names[0]).strip() or None
    elif isinstance(road_names, str) and road_names.strip():
        road = road_names.strip()
    if _is_uninformative_road(road):
        road = None

    # --- direction --------------------------------------------------------
    direction = _norm_direction(field("direction"))

    # --- sub_type (types_of_work[0] | event_type) -------------------------
    work_type: Optional[str] = None
    tow = field("types_of_work")
    if isinstance(tow, list) and tow:
        first = tow[0]
        if isinstance(first, dict):
            work_type = _norm_wzdx_sub_type(first.get("type_name"))
        elif isinstance(first, str):
            work_type = _norm_wzdx_sub_type(first)
    if not work_type:
        work_type = _norm_wzdx_sub_type(field("event_type"))

    # --- vehicle_impact ---------------------------------------------------
    vi_raw = (inner_data.get("vehicle_impact") or cd.get("vehicle_impact") or "")
    vi_key = str(vi_raw).strip().lower()
    impact_phrase: Optional[str] = _WZDX_IMPACT_MAP.get(vi_key)
    is_full_closure = vi_key == "all-lanes-closed"

    # Full closure is carried by `impact`, so its phrase stays out of sub_type.
    parts: list[str] = []
    if impact_phrase and not is_full_closure:
        parts.append(impact_phrase)
    if work_type:
        parts.append(work_type)
    sub_type = ", ".join(parts) if parts else None
    impact = "full_closure" if is_full_closure else "partial"

    # --- ends_at: structured end_date ISO-8601 ----------------------------
    ends_at: Optional[datetime] = None
    end_date = inner_data.get("end_date") or cd.get("end_date")
    if end_date:
        try:
            ends_at = datetime.fromisoformat(str(end_date).replace("Z", "+00:00"))
            # Aware values are converted to local time and made naive so
            # later comparisons are naive-to-naive.
            if ends_at.tzinfo is not None:
                ends_at = ends_at.astimezone().replace(tzinfo=None)
        except (ValueError, OverflowError, OSError):
            ends_at = None

    # --- mile_start/_end: regex on description, fall back to structured ---
    desc = _clean_description(field("description"))
    mile_start, mile_end = _parse_mile_posts(desc or "")
    if mile_start is None:
        mile_start = _coerce_milepost(inner_data.get("road_mile_post_start"))
    if mile_end is None:
        mile_end = _coerce_milepost(inner_data.get("road_mile_post_end"))

    # --- coordinates ------------------------------------------------------
    event_lat = inner_data.get("latitude")
    event_lon = inner_data.get("longitude")
    if event_lat is None and geo.get("centroid"):
        try:
            # centroid is read as (lon, lat).
            event_lon, event_lat = geo["centroid"][0], geo["centroid"][1]
        except (IndexError, TypeError):
            pass

    # --- town fallback chain ----------------------------------------------
    enriched = (inner_data.get("_enriched") or {}).get("geocoder") or {}
    town = (enriched.get("city") or "").strip() or None
    distance_mi: Optional[int] = None
    bearing: Optional[str] = None
    if town:
        distance_mi, bearing = _compute_distance_bearing(event_lat, event_lon, town)
    elif event_lat is not None and event_lon is not None:
        # Only search when both coordinates are known.
        nt = nearest_town(event_lat, event_lon)
        if nt:
            town = nt.get("name")
            distance_mi = nt.get("distance_mi")
            bearing = nt.get("bearing")

    return {
        "source": "wzdx",
        "road": road,
        "direction": direction,
        "mile_start": mile_start,
        "mile_end": mile_end,
        "description": desc,
        "sub_type": sub_type,
        "impact": impact,
        "ends_at": ends_at,
        "town": town,
        "distance_mi": distance_mi,
        "bearing": bearing,
    }
# Key priority for the WFIGS numeric fields: top-level keys are tried first,
# then the same concepts inside the nested `raw` dict.
_WFIGS_ACRES_KEYS = ("DailyAcres", "IncidentSize")
_WFIGS_ACRES_RAW_KEYS = ("DiscoveryAcres", "FinalAcres")
_WFIGS_CONTAINED_KEYS = ("PercentContained",)
_WFIGS_CONTAINED_RAW_KEYS = ("PercentContained",)


def _first_non_null(d: dict, keys) -> Any:
    """Return d[k] for the first k in keys with a non-null value, else None.

    Both None and the empty string count as null.
    """
    for k in keys:
        v = d.get(k)
        if v is not None and v != "":
            return v
    return None


def _wfigs_finite_float(val: Any) -> Optional[float]:
    """Convert `val` to a finite float; None for null, junk, NaN or infinity."""
    import math  # local: this block adds no file-level dependency

    if val is None or val == "":
        return None
    try:
        number = float(val)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _wfigs_first_number(inner_data: dict, top_keys, raw_keys) -> Optional[float]:
    """Return the first usable number from top-level keys, then `raw` keys.

    A value that is present but unparseable does not end the search; the
    next candidate in the chain is tried instead.
    """
    candidates = [inner_data.get(k) for k in top_keys]
    raw = inner_data.get("raw")
    if isinstance(raw, dict):
        candidates.extend(raw.get(k) for k in raw_keys)
    for candidate in candidates:
        number = _wfigs_finite_float(candidate)
        if number is not None:
            return number
    return None


def _parse_wfigs_acres(inner_data: dict) -> Optional[float]:
    """Acres fallback chain: top-level DailyAcres/IncidentSize -> raw.* -> None."""
    return _wfigs_first_number(inner_data, _WFIGS_ACRES_KEYS, _WFIGS_ACRES_RAW_KEYS)


def _parse_wfigs_contained(inner_data: dict) -> Optional[int]:
    """Containment fallback chain: top-level PercentContained -> raw.* -> None.

    The value is rounded to the nearest whole percent.
    """
    pct = _wfigs_first_number(
        inner_data, _WFIGS_CONTAINED_KEYS, _WFIGS_CONTAINED_RAW_KEYS)
    return None if pct is None else int(round(pct))


def _parse_wfigs_incidents(inner_data: dict, geo: dict) -> dict:
    """Normalize a WFIGS-incidents payload into a flat render-ready dict.

    Reads from `inner_data`: IrwinID (or irwin_id), IncidentName,
    IncidentTypeCategory, latitude, longitude, POOCounty, POOState,
    FireDiscoveryDateTime, plus the acres / containment chains above.
    Reads `city` and `landclass` from `geo["geocoder"]`.

    FireDiscoveryDateTime is only used when numeric; values above 1e12 are
    treated as epoch-milliseconds and converted to seconds.

    Returns the normalized dict. The caller adds "_kind".
    """
    geocoder = geo.get("geocoder") if isinstance(geo, dict) else None
    if not isinstance(geocoder, dict):
        geocoder = {}

    declared_at_epoch = None
    fdt = inner_data.get("FireDiscoveryDateTime")
    if isinstance(fdt, (int, float)) and not isinstance(fdt, bool):
        finite = _wfigs_finite_float(fdt)
        if finite is not None:
            # Heuristic: anything >1e12 is milliseconds.
            declared_at_epoch = int(fdt / 1000) if fdt > 1e12 else int(fdt)

    return {
        "irwin_id": inner_data.get("IrwinID") or inner_data.get("irwin_id"),
        "incident_name": inner_data.get("IncidentName"),
        "incident_type": inner_data.get("IncidentTypeCategory"),
        "acres": _parse_wfigs_acres(inner_data),
        "contained_pct": _parse_wfigs_contained(inner_data),
        "lat": inner_data.get("latitude"),
        "lon": inner_data.get("longitude"),
        "county": inner_data.get("POOCounty"),
        "state": inner_data.get("POOState"),
        "landclass": geocoder.get("landclass"),
        # Geocoder-side anchor enrichment for the renderer.
        "geocoder_city": geocoder.get("city"),
        "declared_at_epoch": declared_at_epoch,
    }
+ # The handler downstream uses _kind to route to change-detection + # (active incidents) or to event_log-only logging (tombstones, + # perimeters). Tombstones carry only irwin_id + state + county; + # perimeters share the IrwinID with their parent incident. + category_raw = inner.get("category") or "" + if adapter == "wfigs_incidents": + if category_raw.startswith("fire.incident.removed"): + return { + "_kind": "wfigs_tombstone", + "irwin_id": inner_data.get("irwin_id") or inner_data.get("IrwinID"), + "state": inner_data.get("state") or inner_data.get("POOState"), + "county": inner_data.get("county") or inner_data.get("POOCounty"), + } + if category_raw.startswith("fire.incident"): + n = _parse_wfigs_incidents(inner_data, geo) + n["_kind"] = "wfigs_incident" + return n + if adapter == "wfigs_perimeters": + return { + "_kind": "wfigs_perimeter", + "irwin_id": inner_data.get("irwin_id") or inner_data.get("IrwinID"), + "state": inner_data.get("state") or inner_data.get("POOState"), + "county": inner_data.get("county") or inner_data.get("POOCounty"), + } # Other adapters await per-adapter parsers; return None to defer. return None diff --git a/meshai/config.py b/meshai/config.py index d9db27e..7abbd70 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -607,6 +607,13 @@ class NotificationsConfig: quiet_hours_enabled: bool = True # Master toggle for quiet hours quiet_hours_start: str = "22:00" quiet_hours_end: str = "06:00" + # v0.5.8b cold-start grace: after the first event the dispatcher sees, + # suppress mesh broadcasts for N seconds to absorb any JetStream + # backlog. Persistence rows still get written -- only broadcasts are + # suppressed. Anchor is "first-event-seen" (not container-boot) so + # meshai can sit idle for hours with master OFF and the grace only + # kicks in when adapters actually start producing. 
+ cold_start_grace_seconds: int = 60 toggles: dict = field(default_factory=_default_toggles) # family -> NotificationToggle digest: DigestConfig = field(default_factory=DigestConfig) rules: list = field(default_factory=list) # List of NotificationRuleConfig diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index bc0a793..dfda419 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -51,6 +51,11 @@ class Dispatcher: self._stale_dropped = 0 self._cooldown_dropped = 0 self._dedup_dropped = 0 + # v0.5.8b cold-start grace: anchor lazily on FIRST event the + # dispatcher sees through an enabled toggle. Grace window read + # from config so it can be tuned at runtime via /api/config PUT. + self._first_event_at: Optional[float] = None + self._cold_start_dropped = 0 # (toggle.name, category, region) -> last-fire wall-clock seconds self._toggle_cooldown: dict[tuple[str, str, str], float] = {} # Insertion-ordered (source, event.id) -> sentinel; evict oldest at cap. @@ -115,6 +120,31 @@ class Dispatcher: if tog is None or not getattr(tog, "enabled", False): return + # ---------- Section 0 — cold-start grace (v0.5.8b) ---------- + # First event ever to reach an enabled toggle anchors the grace + # window. Any broadcast attempt inside the window is dropped, but + # the event still flowed through the consumer -> handler chain + # before us, so persistence rows have already been written. Only + # the broadcast is suppressed. 
def _post_broadcast_commit(self, event, payload, rule, ch_type: str) -> None:
    """Run the persistence side-effects of a successful broadcast.

    When `event.data["_broadcast_audit"]` is a dict, inserts one
    mesh_broadcasts_out row describing what was sent. Then, when
    `event.data["_on_broadcast_committed"]` is callable, invokes it with
    the commit timestamp. Each step is wrapped separately: a failure is
    logged and never propagates, so bookkeeping cannot mark a delivered
    broadcast as failed or stop dispatch for sibling toggles.

    Args:
        event: the dispatched event; only `data` and `title` are read.
        payload: delivered payload; `message` is the audit text when set.
        rule: rule used for delivery; `node_ids` and `broadcast_channel`
            are read when present.
        ch_type: channel type; "mesh_dm" records the node ids as the
            recipient, anything else records "broadcast".
    """
    data = getattr(event, "data", None)
    # Anything other than a non-empty dict carries no handles to act on.
    if not isinstance(data, dict) or not data:
        return
    committed_at = time.time()

    audit = data.get("_broadcast_audit")
    if isinstance(audit, dict):
        try:
            from meshai.persistence import get_db
            conn = get_db()
            text = payload.message if payload is not None else (event.title or "")
            bytes_sent = len(text.encode("utf-8")) if text else 0
            if ch_type == "mesh_dm":
                node_ids = list(getattr(rule, "node_ids", []) or [])
                recipient = ",".join(map(str, node_ids)) or "dm"
            else:
                recipient = "broadcast"
            channel = getattr(rule, "broadcast_channel", None)
            conn.execute(
                "INSERT INTO mesh_broadcasts_out(sent_at, recipient, channel, "
                "text, source_event_table, source_event_pk, bytes_sent, "
                "ack_received) VALUES (?,?,?,?,?,?,?,?)",
                (
                    int(committed_at), recipient, channel, text,
                    audit.get("table"), audit.get("pk"),
                    bytes_sent, 0,
                ),
            )
        except Exception:
            self._logger.exception(
                "post-broadcast: mesh_broadcasts_out insert failed "
                "(table=%s pk=%s)",
                audit.get("table"), audit.get("pk"),
            )

    cb = data.get("_on_broadcast_committed")
    if callable(cb):
        try:
            cb(committed_at)
        except Exception:
            self._logger.exception(
                "post-broadcast: handler commit-callback raised"
            )
+ +SQLite-backed adapter event store + mesh telemetry. Single-writer pattern +with WAL mode; threading.local connection pool. v1 schema covers 13 data +tables + event_log + schema_meta. + +Public API: + from meshai.persistence import get_db, init_db, MESHAI_DB_PATH + +The db.py module handles connection pooling and runs pending migrations +on first init_db() call. Adapter handlers are responsible for inserting +into the right table (none wired yet -- foundation only). +""" + +from meshai.persistence.db import ( + get_db, + init_db, + close_thread_connection, + MESHAI_DB_PATH, + DEFAULT_DB_PATH, + SCHEMA_VERSION, +) + +__all__ = [ + "get_db", + "init_db", + "close_thread_connection", + "MESHAI_DB_PATH", + "DEFAULT_DB_PATH", + "SCHEMA_VERSION", +] diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py new file mode 100644 index 0000000..23b86d6 --- /dev/null +++ b/meshai/persistence/db.py @@ -0,0 +1,194 @@ +"""SQLite persistence connection management + migration runner. + +Single-writer SQLite pattern with WAL journal mode for reader concurrency. +One connection per thread (threading.local) -- callers should not share +connections across threads. + +Path resolution: + 1. MESHAI_DB_PATH env var (explicit override) + 2. DEFAULT_DB_PATH = /data/meshai.sqlite (prod container mount) + +Special value ":memory:" or any path containing "memory" routes to an +in-memory SQLite for tests. + +Migrations live in meshai/persistence/migrations/v*.sql. The runner +applies them in version order, recording the applied version in +schema_meta. Idempotent re-run is a no-op. 
+""" + +from __future__ import annotations + +import logging +import os +import sqlite3 +import threading +from pathlib import Path +from typing import Iterable, Optional + +logger = logging.getLogger(__name__) + + +DEFAULT_DB_PATH = "/data/meshai.sqlite" +MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" +SCHEMA_VERSION = 1 +SCHEMA_META_TABLE = "schema_meta" +MIGRATIONS_DIR = Path(__file__).parent / "migrations" + +# Per-thread connection pool. Each thread that calls get_db() gets its +# own sqlite3.Connection cached on threading.local. Tests can clear +# via close_thread_connection() between cases. +_local = threading.local() +# Module-level lock guards init_db() so concurrent first-callers don't +# race on migration application. +_init_lock = threading.Lock() +# Cache of initialised database paths in this process so init_db() is +# idempotent without re-reading migration files on every call. +_initialised: set[str] = set() + + +def MESHAI_DB_PATH() -> str: + """Resolve the active SQLite path (env var override or default).""" + return os.environ.get(MESHAI_DB_PATH_ENV) or DEFAULT_DB_PATH + + +def _is_memory_path(path: str) -> bool: + return path == ":memory:" or "mode=memory" in path or path.startswith("file::memory:") + + +def _connect(path: str) -> sqlite3.Connection: + """Open a SQLite connection with sane defaults for this project.""" + if _is_memory_path(path): + # For in-memory tests, use a shared cache so multiple connections + # in the same thread can see the same DB. Tests that want isolation + # call close_thread_connection() between cases. + uri = "file::memory:?cache=shared" + conn = sqlite3.connect(uri, uri=True, isolation_level=None, + check_same_thread=False) + else: + # Ensure parent dir exists for file-backed DBs. 
+ Path(path).parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(path, isolation_level=None, + check_same_thread=False, timeout=30.0) + conn.row_factory = sqlite3.Row + # Enable foreign keys (off by default in SQLite); WAL mode for reader + # concurrency; reasonable busy timeout for the single-writer pattern. + conn.execute("PRAGMA foreign_keys = ON") + if not _is_memory_path(path): + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA synchronous = NORMAL") + conn.execute("PRAGMA busy_timeout = 30000") + return conn + + +def get_db(path: Optional[str] = None) -> sqlite3.Connection: + """Return a SQLite connection for the current thread (cached). + + First call initialises the database (runs pending migrations). + Subsequent calls in the same thread return the cached connection. + """ + target = path or MESHAI_DB_PATH() + cached = getattr(_local, "conn", None) + cached_path = getattr(_local, "path", None) + if cached is not None and cached_path == target: + return cached + # Different path requested or no cached conn -- (re)open. + if cached is not None: + try: cached.close() + except Exception: pass + conn = _connect(target) + _local.conn = conn + _local.path = target + if target not in _initialised: + with _init_lock: + if target not in _initialised: + _apply_migrations(conn) + _initialised.add(target) + return conn + + +def close_thread_connection() -> None: + """Close + drop the cached connection for the current thread. + + Tests call this between cases to ensure a clean slate. The shared-cache + in-memory database is reset on the LAST close in the process. + """ + conn = getattr(_local, "conn", None) + if conn is not None: + try: conn.close() + except Exception: pass + if hasattr(_local, "conn"): del _local.conn + if hasattr(_local, "path"): del _local.path + + +def init_db(path: Optional[str] = None) -> sqlite3.Connection: + """Explicit init entry point (idempotent). 
Equivalent to get_db() + semantically but documents intent at startup. Returns the connection.""" + return get_db(path) + + +def _read_migration_files() -> list[tuple[int, str, str]]: + """Return [(version_int, filename, sql_text), ...] sorted ascending.""" + if not MIGRATIONS_DIR.is_dir(): + return [] + out: list[tuple[int, str, str]] = [] + for p in sorted(MIGRATIONS_DIR.iterdir()): + if not p.is_file() or p.suffix.lower() != ".sql": + continue + # Filename format: v.sql or v_