diff --git a/meshai/adapter_config/defaults.py b/meshai/adapter_config/defaults.py index 005806a..010daca 100644 --- a/meshai/adapter_config/defaults.py +++ b/meshai/adapter_config/defaults.py @@ -277,8 +277,21 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { }, # ================================================================= - # FIRMS -- 4 settings (confidence floor + FRP floor + spatial bbox + - # dedup quantization distance in METERS) + # FIRES -- 1 setting (default attribution radius for FIRMS -> fire matching) + # ================================================================= + # Per-fire override lives in the fires.spread_radius_mi column; this + # is the global default used when that column is NULL. v0.7-fire-1 + # ships with 5 mi based on the design doc's open question #1 + # ("Spread radius default. Start with 5 mi per fire?"). Tune from + # operations once we have a week of observed attribution rates. + ("fires", "spread_radius_mi_default"): { + "default": 5.0, + "type": "float", + "description": "Default attribution radius for FIRMS hotspot -> fire matching, miles. Per-fire override in fires.spread_radius_mi.", + }, + + # ================================================================= + # FIRMS -- 7 settings (storage floors + dedup + 3 v0.7 cluster knobs) # ================================================================= ("firms", "confidence_floor"): { "default": "low", # firms_handler.py FIRMS_CONFIDENCE_FLOOR @@ -307,6 +320,29 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { "description": "Distance in meters within which two FIRMS pixel observations from the same satellite + acquisition time are considered duplicates.", }, + # ---- v0.7-fire-tracker-1 unattributed-cluster knobs ---- + # On every FIRMS pixel that fails attribution to any known fire, the + # handler asks: "are there enough other unattributed pixels nearby + # right now to suggest a new ignition?" The three knobs below define + # "enough", "nearby", and "right now". Defaults match design doc + # open question #6 ("3 pixels within 1 mi") -- tune from ops once we + # have false-positive data. + ("firms", "cluster_min_pixels"): { + "default": 3, + "type": "int", + "description": "Minimum unattributed pixels within cluster_max_radius_mi over cluster_time_window_minutes to fire an unattributed_hotspot_cluster broadcast.", + }, + ("firms", "cluster_max_radius_mi"): { + "default": 1.0, + "type": "float", + "description": "Spatial radius (miles) defining a candidate hotspot cluster.", + }, + ("firms", "cluster_time_window_minutes"): { + "default": 60, + "type": "int", + "description": "Temporal window (minutes); unattributed pixels older than this don't count toward a new cluster.", + }, + # ================================================================= # PIPELINE (Inhibitor + Grouper) -- 2 settings # ================================================================= @@ -426,6 +462,15 @@ ADAPTER_META: dict[str, dict[str, Any]] = { "reminder_enabled": True, "description": "NIFC-authoritative wildfire registry (named incidents, acres, containment).", }, + # v0.7-fire-tracker-1: "fires" is not a feed; it's the registry table + # populated by WFIGS first-sight + FIRMS attribution. Surfacing it as + # an adapter_meta family lets the GUI show "spread radius default" + # alongside the per-feed knobs. + "fires": { + "display_name": "Fire registry", + "include_in_llm_context": True, + "description": "Cross-feed fire registry: WFIGS declares them; FIRMS pixels grow them. spread_radius_mi_default tunes the attribution gate.", + }, "firms": { "display_name": "FIRMS satellite hotspots", "include_in_llm_context": True, diff --git a/meshai/central/firms_handler.py b/meshai/central/firms_handler.py index 9a7fbdd..f4255f7 100644 --- a/meshai/central/firms_handler.py +++ b/meshai/central/firms_handler.py @@ -1,4 +1,4 @@ -"""v0.6-1 FIRMS hotspot handler -- STORAGE ONLY (no mesh broadcasts). +"""v0.7-fire-tracker-1 FIRMS handler -- storage + attribution + cluster broadcast. Pre-v0.6-1 the v0.5.13 default-deny gate at consumer._normalize() silently dropped every `central.fire.hotspot.>` envelope because no per-adapter handler @@ -63,6 +63,7 @@ from __future__ import annotations from meshai.adapter_config import adapter_config import logging +import math import time from datetime import datetime, timezone from typing import Any, Optional @@ -239,8 +240,20 @@ def handle_firms(envelope: dict, subject: str, table_name="firms_pixels" if stored else None, table_pk=(str(cur.lastrowid) if stored else None)) - # STORAGE-ONLY: never broadcast. - return None + # ---- v0.7-fire-tracker-1: attribution + cluster ------------------- + # Dedup hits skip attribution -- the original insert already had its + # chance. Only newly-stored pixels run through here. + if not stored: + return None + + return _attribute_or_cluster( + conn, + pixel_row_id=int(cur.lastrowid), + lat=lat, lon=lon, + acq_epoch=acq_epoch, + frp=frp, satellite=satellite, + data=data, now=now, + ) # ============================================================================ @@ -319,3 +332,225 @@ def _log_event(conn, *, now, source, category, severity_word, (now, source, category, severity_word, event_id_external, subject, int(bool(handled)), table_name, table_pk), ) + + + +# ============================================================================ +# v0.7-fire-tracker-1: attribution + unattributed-cluster detection +# ============================================================================ +# +# Attribution: a FIRMS pixel is matched to a fire when it lies within the +# fire's spread radius (per-fire override or global default). The match is +# nearest-centroid on ties. Successfully attributed pixels append to +# fire_pixels and update fires.last_hotspot_at + fires.current_centroid_* +# (median of last 24h of pixels for that fire). +# +# Cluster detection: if NO fire matches, the pixel is "unattributed". We +# query firms_pixels for OTHER recent unattributed pixels within a small +# radius (cluster_max_radius_mi over cluster_time_window_minutes). If the +# count (including this pixel) reaches cluster_min_pixels AND none of +# them has cluster_broadcast_at set, fire a single broadcast and stamp +# cluster_broadcast_at on every member. A subsequent pixel arriving in +# the same cluster will find the stamp and stay silent. + + +def _attribute_or_cluster(conn, *, pixel_row_id, lat, lon, acq_epoch, + frp, satellite, data, now): + """Try attribution; on miss, run cluster check. Returns wire str | None.""" + global_default_mi = float(adapter_config.fires.spread_radius_mi_default) + # Conservative bbox prefilter: take the larger of the global default + # and 10 mi so a per-fire override beyond the default doesn't get + # quietly excluded. Real Haversine done after. + search_mi = max(global_default_mi, 10.0) + deg_lat = search_mi / 69.0 + cos_lat = max(0.01, math.cos(math.radians(lat))) + deg_lon = search_mi / (69.0 * cos_lat) + + candidates = conn.execute( + "SELECT irwin_id, lat AS anchor_lat, lon AS anchor_lon, " + "current_centroid_lat, current_centroid_lon, spread_radius_mi " + "FROM fires WHERE tombstoned_at IS NULL AND (" + "COALESCE(current_centroid_lat, lat) BETWEEN ? AND ?) AND (" + "COALESCE(current_centroid_lon, lon) BETWEEN ? AND ?)", + (lat - deg_lat, lat + deg_lat, lon - deg_lon, lon + deg_lon), + ).fetchall() + + attributed: list[tuple[str, float]] = [] + for row in candidates: + fire_lat = (row["current_centroid_lat"] + if row["current_centroid_lat"] is not None + else row["anchor_lat"]) + fire_lon = (row["current_centroid_lon"] + if row["current_centroid_lon"] is not None + else row["anchor_lon"]) + if fire_lat is None or fire_lon is None: + continue + r_mi = (row["spread_radius_mi"] + if row["spread_radius_mi"] is not None + else global_default_mi) + d_mi = _haversine_mi(lat, lon, fire_lat, fire_lon) + if d_mi <= r_mi: + attributed.append((row["irwin_id"], d_mi)) + + if attributed: + # 2+ matches resolve to nearest centroid per design doc Q2. + attributed.sort(key=lambda t: t[1]) + chosen_irwin = attributed[0][0] + conn.execute( + "INSERT INTO fire_pixels(irwin_id, acq_time, lat, lon, frp, " + "satellite, pass_id, attributed_at) VALUES (?,?,?,?,?,?,?,?)", + (chosen_irwin, float(acq_epoch), lat, lon, frp, satellite, + _pass_id(satellite, acq_epoch), float(now)), + ) + conn.execute( + "UPDATE firms_pixels SET attributed_at=? WHERE id=?", + (float(now), pixel_row_id), + ) + _recompute_centroid_and_stamp( + conn, chosen_irwin, acq_epoch=acq_epoch, + ) + # Attribution is a silent operation -- the wire goes out on the + # NEXT WFIGS Update (which Phase 2 will gate on centroid drift). + return None + + # 0 matches -- run cluster detection. + return _maybe_emit_cluster( + conn, lat=lat, lon=lon, acq_epoch=acq_epoch, frp=frp, + data=data, now=now, this_pixel_id=pixel_row_id, + ) + + +def _recompute_centroid_and_stamp(conn, irwin_id: str, *, + acq_epoch) -> None: + """fires.current_centroid_* = median of last 24h of fire_pixels for + this fire. last_hotspot_at = this pixel's acq_time (max of last 24h + is the same thing on insert). Median over mean per design doc Q4 + ("Median (more robust to outliers)").""" + window_start = float(acq_epoch) - 86400.0 + pixels = conn.execute( + "SELECT lat, lon FROM fire_pixels WHERE irwin_id=? " + "AND acq_time >= ?", + (irwin_id, window_start), + ).fetchall() + if not pixels: + return + lats = sorted(r["lat"] for r in pixels) + lons = sorted(r["lon"] for r in pixels) + median_lat = lats[len(lats) // 2] + median_lon = lons[len(lons) // 2] + conn.execute( + "UPDATE fires SET current_centroid_lat=?, current_centroid_lon=?, " + "last_hotspot_at=? WHERE irwin_id=?", + (float(median_lat), float(median_lon), float(acq_epoch), irwin_id), + ) + + +def _maybe_emit_cluster(conn, *, lat, lon, acq_epoch, frp, data, now, + this_pixel_id): + """Return wire string + set data["category"] when a cluster condition + fires; otherwise return None and leave data alone.""" + min_pixels = int(adapter_config.firms.cluster_min_pixels) + radius_mi = float(adapter_config.firms.cluster_max_radius_mi) + window_s = int(adapter_config.firms.cluster_time_window_minutes) * 60 + + window_start = float(acq_epoch) - window_s + # Bbox prefilter again, this time on radius_mi (much tighter than the + # 10 mi attribution search). + deg_lat = radius_mi / 69.0 + cos_lat = max(0.01, math.cos(math.radians(lat))) + deg_lon = radius_mi / (69.0 * cos_lat) + rows = conn.execute( + "SELECT id, lat, lon, frp FROM firms_pixels WHERE " + "attributed_at IS NULL AND cluster_broadcast_at IS NULL " + "AND acq_time >= ? " + "AND lat BETWEEN ? AND ? AND lon BETWEEN ? AND ?", + (window_start, lat - deg_lat, lat + deg_lat, + lon - deg_lon, lon + deg_lon), + ).fetchall() + + # Filter to exact Haversine radius. The query above is a bbox; the + # corners are slightly farther than radius_mi from the center. + members: list[dict] = [] + total_frp = 0.0 + sum_lat = 0.0 + sum_lon = 0.0 + for r in rows: + d_mi = _haversine_mi(lat, lon, r["lat"], r["lon"]) + if d_mi > radius_mi: + continue + members.append({"id": r["id"], "lat": r["lat"], "lon": r["lon"]}) + sum_lat += r["lat"]; sum_lon += r["lon"] + if r["frp"] is not None: + total_frp += float(r["frp"]) + + if len(members) < min_pixels: + return None + + centroid_lat = sum_lat / len(members) + centroid_lon = sum_lon / len(members) + + # Stamp cluster_broadcast_at on every member so a future pixel + # arriving in the same cluster does not re-fire the broadcast. + member_ids = [int(m["id"]) for m in members] + placeholders = ",".join("?" * len(member_ids)) + conn.execute( + f"UPDATE firms_pixels SET cluster_broadcast_at=? " + f"WHERE id IN ({placeholders})", + (float(now), *member_ids), + ) + + # Override the FIRMS source category so the dispatcher routes this + # broadcast under unattributed_hotspot_cluster (priority, fire toggle). + if isinstance(data, dict): + data["category"] = "unattributed_hotspot_cluster" + # Set severity to priority so downstream rules see the right tier. + data["severity"] = "priority" + + return _render_cluster_wire( + n=len(members), radius_mi=radius_mi, + centroid_lat=centroid_lat, centroid_lon=centroid_lon, + total_frp=total_frp, + ) + + +def _render_cluster_wire(*, n, radius_mi, centroid_lat, centroid_lon, + total_frp): + """Wire string per design doc section 4 + user item 6.""" + # Drop the decimal on radius when it's an integer mile for terse output. + radius_str = (f"{int(radius_mi)}" if float(radius_mi).is_integer() + else f"{radius_mi:.1f}") + frp_str = "" + if total_frp > 0: + frp_str = f" (combined {int(round(total_frp))} MW)" + return ( + f"🔥 Possible new fire: {n} hotspots within {radius_str} mi " + f"@ {centroid_lat:.3f},{centroid_lon:.3f}{frp_str}" + ) + + +def _haversine_mi(lat1: float, lon1: float, + lat2: float, lon2: float) -> float: + """Great-circle distance in statute miles.""" + R_MI = 3958.7613 + p1 = math.radians(lat1); p2 = math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlam = math.radians(lon2 - lon1) + a = (math.sin(dphi / 2.0) ** 2 + + math.cos(p1) * math.cos(p2) * math.sin(dlam / 2.0) ** 2) + return 2.0 * R_MI * math.asin(min(1.0, math.sqrt(a))) + + +def _pass_id(satellite, acq_epoch) -> str: + """Coarse satellite-pass bucket: -. + + VIIRS makes ~4 passes/day in Idaho (one every ~6h), so a 90-minute + bucket groups pixels from the same overpass without straddling + boundaries. Phase 2's per-pass centroid logic will use this column. + """ + if not satellite: + satellite = "?" + try: + bucket = int(acq_epoch) // 5400 + except (TypeError, ValueError): + bucket = 0 + return f"{satellite}-{bucket}" diff --git a/meshai/central/wfigs_handler.py b/meshai/central/wfigs_handler.py index 17b3321..1b99b9f 100644 --- a/meshai/central/wfigs_handler.py +++ b/meshai/central/wfigs_handler.py @@ -145,6 +145,11 @@ def handle_wfigs(normalized: dict, envelope: dict, subject: str, ), ) wire = _render(normalized, prefix="New") + # v0.7-fire-tracker-1: tag first-sight broadcasts with the new + # wildfire_declared category so the dispatcher rules them apart + # from acres/containment updates (wildfire_incident). + if isinstance(data, dict): + data["category"] = "wildfire_declared" _attach_commit_handles(data, irwin_id=irwin_id, acres=acres, contained_pct=contained_pct, event_log_row_id=log_id) @@ -160,6 +165,11 @@ def handle_wfigs(normalized: dict, envelope: dict, subject: str, normalized.get("lon"), now, irwin_id), ) wire = _render(normalized, prefix="New") + # v0.7-fire-tracker-1: case-(ii) is also first-sight as far as + # broadcast history goes -- the row exists because some prior + # handler call ran but no actual broadcast went out. + if isinstance(data, dict): + data["category"] = "wildfire_declared" _attach_commit_handles(data, irwin_id=irwin_id, acres=acres, contained_pct=contained_pct, event_log_row_id=log_id) diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index c9684c6..0b66981 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -314,6 +314,32 @@ ALERT_CATEGORIES = { "example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.", "toggle": "fire", }, + # v0.7-fire-tracker-1: WFIGS first-sight of a declared incident. The + # WFIGS handler tags the data dict with this category on cases (i) + # and (ii) -- INSERT or row-exists-but-never-broadcast. Subsequent + # Update broadcasts (case iii) keep the existing wildfire_incident + # category so the dispatcher can rule them separately. + "wildfire_declared": { + "name": "Wildfire Declared (WFIGS first sight)", + "description": "WFIGS published a new IRWIN incident -- the first time meshai has seen this declared fire. Subsequent acreage/containment updates fall under wildfire_incident.", + "default_severity": "priority", + "example_message": "🔥 New: Cache Peak Fire (WF), 3 mi N of Almo: 250 ac, 0% contained, @ 42.118,-113.643", + "toggle": "fire", + }, + # v0.7-fire-tracker-1: 3+ FIRMS pixels cluster within 1 mi over a 60 + # min window with no WFIGS-declared fire to attribute to. This is the + # "possible new fire" early-warning category -- FIRMS sees the heat + # well before NIFC publishes the incident. Knobs in adapter_config: + # firms.cluster_min_pixels (3) + # firms.cluster_max_radius_mi (1.0) + # firms.cluster_time_window_minutes (60) + "unattributed_hotspot_cluster": { + "name": "Unattributed Hotspot Cluster (possible new fire)", + "description": "3+ FIRMS satellite hotspots clustered within a small radius with no matching WFIGS incident. Possible new ignition; the cluster broadcast fires once per cluster (member pixels are marked covered so a 4th arrival does not re-trigger).", + "default_severity": "priority", + "example_message": "🔥 Possible new fire: 3 hotspots within 1 mi @ 42.93,-114.45 (combined 78 MW)", + "toggle": "fire", + }, "wildfire_hotspot": { "name": "Wildfire Hotspot", "description": "Satellite thermal-anomaly detection (NASA FIRMS VIIRS/MODIS pixel) — not necessarily a new ignition", diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index fbe5414..a945f3e 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 12 +SCHEMA_VERSION = 13 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v13.sql b/meshai/persistence/migrations/v13.sql new file mode 100644 index 0000000..3602cad --- /dev/null +++ b/meshai/persistence/migrations/v13.sql @@ -0,0 +1,86 @@ +-- v0.7-fire-tracker-1 -- registry correlation tables + per-fire spread radius. +-- +-- Phase 1 of the FIRMS+WFIGS fusion design (v0.6-design-fire-tracker.md): +-- we make every incoming FIRMS pixel either attributable to a known fire +-- or a candidate for an early-warning "possible new fire" broadcast. This +-- migration adds the persistent state the FIRMS handler needs to do that; +-- the handler-side wiring lives in central/firms_handler.py. + +-- ---- fire_pixels -------------------------------------------------------- +-- Per-(fire, pixel) attribution history. Distinct from firms_pixels (which +-- is the raw inbound-event store -- one row per VIIRS detection regardless +-- of whether we can link it to a fire). fire_pixels exists so the centroid +-- query is a clean indexed (irwin_id, acq_time) range scan instead of a +-- join through firms_pixels. Phase 2's movement analysis will read this +-- table directly. + +CREATE TABLE IF NOT EXISTS fire_pixels ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + irwin_id TEXT NOT NULL REFERENCES fires(irwin_id) ON DELETE CASCADE, + acq_time REAL NOT NULL, + lat REAL NOT NULL, + lon REAL NOT NULL, + frp REAL, + satellite TEXT, + pass_id TEXT, + attributed_at REAL NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_fire_pixels_irwin_acq ON fire_pixels(irwin_id, acq_time); +CREATE INDEX IF NOT EXISTS idx_fire_pixels_acq ON fire_pixels(acq_time); + + +-- ---- fires: per-fire override + running centroid + last hotspot -------- +-- spread_radius_mi: +-- Per-fire attribution radius (statute miles). NULL means +-- "use the global default from adapter_config.fires.spread_radius_mi_default". +-- Per-fire overrides are intended for outliers (mega-fires with active +-- spotting, or contained fires where we want to tighten attribution to +-- avoid false-positive growth signals). +-- +-- current_centroid_lat / current_centroid_lon: +-- The FIRMS-attributed running centroid (median of last 24h of +-- fire_pixels). Distinct from fires.lat/fires.lon (the WFIGS-declared +-- anchor) because a fire can drift miles from its initial declared +-- point as it grows; the centroid is where the fire actually IS +-- right now per satellite passes, while the anchor stays put for +-- reference. NULL until the first FIRMS attribution. +-- +-- last_hotspot_at: +-- UNIX-epoch (REAL) timestamp of the most recent FIRMS pixel +-- attributed to this fire. Used in Phase 1 by the centroid query +-- (filter to last 24h) and by Phase 2's halt detector (no new +-- pixels for >= 2 satellite passes => fire considered halted). + +ALTER TABLE fires ADD COLUMN spread_radius_mi REAL; +ALTER TABLE fires ADD COLUMN current_centroid_lat REAL; +ALTER TABLE fires ADD COLUMN current_centroid_lon REAL; +ALTER TABLE fires ADD COLUMN last_hotspot_at REAL; + + +-- ---- firms_pixels: cluster-tracking columns ---------------------------- +-- attributed_at: +-- UNIX-epoch when this pixel was attributed to a fire via a +-- fire_pixels insertion. NULL means the pixel did NOT match any +-- known fire's spread radius -- it's a candidate for cluster +-- detection (unattributed_hotspot_cluster category). +-- +-- cluster_broadcast_at: +-- UNIX-epoch when this pixel was covered by a fired +-- unattributed_hotspot_cluster broadcast. Stamped on every member +-- pixel of a broadcast so a subsequent pixel arriving in the same +-- cluster does not re-fire the broadcast for the same hotspots. + +ALTER TABLE firms_pixels ADD COLUMN attributed_at REAL; +ALTER TABLE firms_pixels ADD COLUMN cluster_broadcast_at REAL; + +-- Compound index supporting the cluster query: +-- SELECT * FROM firms_pixels +-- WHERE attributed_at IS NULL AND cluster_broadcast_at IS NULL +-- AND acq_time > ? +-- The leading two columns are NULL-discriminator selective; acq_time +-- handles the time-window prune. (lat/lon prefilter is done by the +-- handler in Python after a bounding-box scan -- SQLite has no native +-- Haversine.) +CREATE INDEX IF NOT EXISTS idx_firms_pixels_unattributed + ON firms_pixels(attributed_at, cluster_broadcast_at, acq_time); diff --git a/tests/test_fire_tracker_phase1.py b/tests/test_fire_tracker_phase1.py new file mode 100644 index 0000000..58df590 --- /dev/null +++ b/tests/test_fire_tracker_phase1.py @@ -0,0 +1,424 @@ +"""v0.7-fire-tracker-1 tests. + +Coverage map (vs the user-provided scope item 7): + - Pixel within radius -> attribution + centroid + last_hotspot_at + - Pixel outside radius -> no attribution, stays unattributed + - 3 unattributed within 1 mi -> cluster broadcast fires once + - 4th pixel same cluster -> NO second broadcast + - 5th pixel after 60 min -> can form a NEW cluster + +Plus: + - wfigs first-sight tags data["category"]="wildfire_declared" + - wfigs Update path does NOT tag wildfire_declared + - Haversine sanity +""" +from __future__ import annotations + +import os +import sqlite3 +import time +import uuid + +import pytest + + +@pytest.fixture(autouse=True) +def _isolate_db(tmp_path, monkeypatch): + """Force MESHAI_DB_PATH to a tmp file per test + reset thread cache.""" + db_path = str(tmp_path / f"meshai-{uuid.uuid4().hex}.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + # The persistence module caches a connection on threading.local; + # reset between tests so we get a fresh DB each time. + from meshai.persistence import db as pdb + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + from meshai.persistence import init_db + init_db(db_path) + yield db_path + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + + +def _seed_fire(*, irwin_id, lat, lon, name="Stub Fire", state="ID"): + """Insert a minimal active fire row at known coords.""" + from meshai.persistence import get_db + conn = get_db() + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, lat, lon, " + "last_event_at) VALUES (?,?,?,?,?)", + (irwin_id, name, lat, lon, int(time.time())), + ) + + +def _envelope(*, lat, lon, acq_date="2026-06-06", acq_time="1200", + frp=15.0, satellite="N20", conf="high"): + """Build a Central FIRMS envelope shaped like the real Central feed.""" + return { + "data": { + "adapter": "firms", + "category": "wildfire_hotspot", + "severity": "routine", + "data": { + "latitude": lat, + "longitude": lon, + "frp": frp, + "bright_ti4": 320.5, + "satellite": satellite, + "instrument": "VIIRS", + "confidence": conf, + "acq_date": acq_date, + "acq_time": acq_time, + "daynight": "D", + "version": "2.0NRT", + }, + } + } + + +# --------------------------------------------------------------------------- +# (a) Attribution within radius. +# --------------------------------------------------------------------------- + + +def test_pixel_within_radius_attributes_to_fire(): + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + # Cache Peak Fire stub @ 42.118, -113.643. + _seed_fire(irwin_id="ID-TEST-001", + lat=42.118, lon=-113.643) + + # FIRMS pixel ~0.2 mi NE of the anchor -- well inside default 5 mi. + env = _envelope(lat=42.121, lon=-113.640, frp=18.0) + wire = handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780728000) + # Attribution is silent (return None); the wire only fires on cluster. + assert wire is None + + conn = get_db() + # fire_pixels row created. + rows = conn.execute("SELECT * FROM fire_pixels").fetchall() + assert len(rows) == 1 + assert rows[0]["irwin_id"] == "ID-TEST-001" + assert rows[0]["frp"] == 18.0 + # firms_pixels row has attributed_at stamped. + raw = conn.execute( + "SELECT attributed_at, cluster_broadcast_at FROM firms_pixels" + ).fetchone() + assert raw["attributed_at"] is not None + assert raw["cluster_broadcast_at"] is None + # fires row has centroid + last_hotspot_at updated. + fire = conn.execute( + "SELECT current_centroid_lat, current_centroid_lon, last_hotspot_at " + "FROM fires WHERE irwin_id=?", ("ID-TEST-001",) + ).fetchone() + assert fire["current_centroid_lat"] == 42.121 + assert fire["current_centroid_lon"] == -113.640 + assert fire["last_hotspot_at"] is not None + + +def test_centroid_recomputes_as_median_across_passes(): + """A second attributed pixel updates the centroid to the median, not + just the latest pixel's coords.""" + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + _seed_fire(irwin_id="ID-TEST-002", + lat=42.000, lon=-113.000) + + # 3 pixels within radius at distinct coords. + coords = [(42.001, -113.001), (42.002, -113.002), (42.003, -113.003)] + for i, (la, lo) in enumerate(coords): + env = _envelope(lat=la, lon=lo, + acq_date="2026-06-06", + acq_time=f"{12 + i:02d}00") + handle_firms(env, + subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780728000 + i * 3600) + + fire = get_db().execute( + "SELECT current_centroid_lat, current_centroid_lon " + "FROM fires WHERE irwin_id=?", ("ID-TEST-002",) + ).fetchone() + # Median of 3 sorted lats = middle = 42.002. Same for lons. + assert abs(fire["current_centroid_lat"] - 42.002) < 1e-9 + assert abs(fire["current_centroid_lon"] - -113.002) < 1e-9 + + +# --------------------------------------------------------------------------- +# (b) Pixel outside radius -- no attribution. +# --------------------------------------------------------------------------- + + +def test_pixel_outside_radius_stays_unattributed(): + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + _seed_fire(irwin_id="ID-TEST-003", + lat=42.000, lon=-113.000) + + # Pixel ~50 mi away -- comfortably outside the 5 mi default. + env = _envelope(lat=42.700, lon=-113.000, frp=10.0) + wire = handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780728000) + # No attribution AND below the 3-pixel cluster threshold -> no wire. + assert wire is None + + conn = get_db() + # No fire_pixels row. + assert conn.execute("SELECT COUNT(*) FROM fire_pixels").fetchone()[0] == 0 + # firms_pixels has the row but attributed_at IS NULL. + raw = conn.execute( + "SELECT attributed_at FROM firms_pixels" + ).fetchone() + assert raw["attributed_at"] is None + # fires row centroid unchanged. + fire = conn.execute( + "SELECT current_centroid_lat FROM fires WHERE irwin_id=?", + ("ID-TEST-003",) + ).fetchone() + assert fire["current_centroid_lat"] is None + + +# --------------------------------------------------------------------------- +# (c) Cluster broadcast fires once on the 3rd pixel. +# --------------------------------------------------------------------------- + + +def _hhmm(h, m=0): + return f"{int(h):02d}{int(m):02d}" + + +def test_three_unattributed_pixels_fire_cluster_once(): + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + # No fires seeded -- everything is unattributed. + # Three pixels within ~0.3 mi over 30 minutes. + base_lat, base_lon = 43.500, -114.500 + pixels = [ + (base_lat, base_lon, "1200", 25.0), + (base_lat + 0.001, base_lon + 0.001, "1210", 32.0), + (base_lat - 0.001, base_lon - 0.002, "1220", 21.0), + ] + wires: list[str | None] = [] + for la, lo, t, frp in pixels: + env = _envelope(lat=la, lon=lo, acq_time=t, frp=frp) + data = {} + wires.append(handle_firms( + env, subject="central.fire.hotspot.N20.high.unknown", + data=data, now=1780728000, + )) + if wires[-1] is not None: + # The handler must have tagged data with the cluster category. + assert data.get("category") == "unattributed_hotspot_cluster" + assert data.get("severity") == "priority" + + # Exactly one of the three returned a wire. + fired = [w for w in wires if w is not None] + assert len(fired) == 1, f"expected 1 cluster wire, got {len(fired)}: {wires}" + # Wire content. + w = fired[0] + assert w.startswith("🔥 Possible new fire: 3 hotspots within 1 mi @ ") + # The combined-FRP suffix lists the rounded sum. + assert "(combined 78 MW)" in w + + # All three pixels have cluster_broadcast_at set. + conn = get_db() + stamped = conn.execute( + "SELECT COUNT(*) FROM firms_pixels WHERE cluster_broadcast_at IS NOT NULL" + ).fetchone()[0] + assert stamped == 3 + + +def test_fourth_pixel_in_same_cluster_does_not_refire(): + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + base_lat, base_lon = 43.500, -114.500 + # Seed 3 pixels -> cluster fires once. + for i, (la, lo, t) in enumerate([ + (base_lat, base_lon, "1200"), + (base_lat + 0.001, base_lon + 0.001, "1210"), + (base_lat - 0.001, base_lon - 0.002, "1220"), + ]): + env = _envelope(lat=la, lon=lo, acq_time=t) + handle_firms(env, + subject="central.fire.hotspot.N20.high.unknown", + data={}, now=1780728000 + i) + + # A 4th pixel inside the same cluster footprint. + env = _envelope(lat=base_lat + 0.0005, lon=base_lon - 0.0005, + acq_time="1230") + data4 = {} + wire = handle_firms(env, + subject="central.fire.hotspot.N20.high.unknown", + data=data4, now=1780728100) + # The existing 3 members already have cluster_broadcast_at stamped, + # so they don't count toward the new cluster query (the SQL filter + # is `cluster_broadcast_at IS NULL`). The 4th pixel alone fails + # the min_pixels threshold -- no wire. + assert wire is None + assert "category" not in data4 + + # The 4th pixel itself does NOT get cluster_broadcast_at (since no + # new cluster fired for it). + conn = get_db() + rows = conn.execute( + "SELECT COUNT(*) FROM firms_pixels WHERE cluster_broadcast_at IS NULL" + ).fetchone()[0] + assert rows == 1, "the 4th pixel should remain un-broadcast" + + +def test_fifth_pixel_after_time_window_can_form_new_cluster(): + """The cluster query filters on acq_time > NOW - cluster_time_window. + A pixel arriving 60+ minutes after the prior cluster's members has + no nearby unstamped pixels to count, so it stays silent -- but if + we then ingest TWO more nearby pixels (also outside the original + window), we should fire a NEW cluster.""" + from meshai.central.firms_handler import handle_firms + + base_lat, base_lon = 43.500, -114.500 + # First cluster at 12:00..12:20 -> fires + stamps all 3. + for i, (la, lo, t) in enumerate([ + (base_lat, base_lon, "1200"), + (base_lat + 0.001, base_lon + 0.001, "1210"), + (base_lat - 0.001, base_lon - 0.002, "1220"), + ]): + env = _envelope(lat=la, lon=lo, acq_time=t) + handle_firms(env, + subject="central.fire.hotspot.N20.high.unknown", + data={}, now=1780728000 + i) + + # Three NEW pixels at 14:00..14:20 -- well past the 60 min window + # from the first cluster (which ended at 12:20 acq time). + base2_lat, base2_lon = 43.510, -114.510 + wires2: list[str | None] = [] + for i, (la, lo, t) in enumerate([ + (base2_lat, base2_lon, "1400"), + (base2_lat + 0.001, base2_lon + 0.001, "1410"), + (base2_lat - 0.001, base2_lon - 0.002, "1420"), + ]): + env = _envelope(lat=la, lon=lo, acq_time=t) + wires2.append(handle_firms( + env, subject="central.fire.hotspot.N20.high.unknown", + data={}, now=1780728000 + 7200 + i, + )) + # A second cluster must have fired. + fired = [w for w in wires2 if w is not None] + assert len(fired) == 1, f"expected a second cluster wire, got: {wires2}" + + +# --------------------------------------------------------------------------- +# (d) WFIGS first-sight tags wildfire_declared; Update does not. +# --------------------------------------------------------------------------- + + +def test_wfigs_first_sight_tags_wildfire_declared(): + from meshai.central.wfigs_handler import handle_wfigs + + normalized = { + "_kind": "wfigs_incident", + "irwin_id": "ID-NEW-001", + "incident_name": "Pine Gulch", + "incident_type": "WF", + "acres": 250.0, + "contained_pct": 0, + "lat": 42.93, "lon": -114.45, + "county": "Twin Falls", "state": "ID", + "declared_at_epoch": 1780728000, + } + envelope = { + "data": {"adapter": "wfigs", "category": "wildfire_incident", + "severity": "priority"} + } + data = {} + wire = handle_wfigs(normalized, envelope, + subject="central.fire.incident.id", + data=data, now=1780728000) + assert wire is not None + assert "New:" in wire + assert data.get("category") == "wildfire_declared", \ + f"expected wildfire_declared, got data={data!r}" + + +def test_wfigs_update_does_not_retag_wildfire_declared(): + """After a row exists AND has been broadcast, an acres-grew Update + must NOT carry the wildfire_declared category.""" + from meshai.central.wfigs_handler import handle_wfigs + from meshai.persistence import get_db + + # Pre-existing row that has already been broadcast. + conn = get_db() + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, current_acres, " + "current_contained_pct, lat, lon, last_event_at, " + "last_broadcast_at, last_broadcast_acres, last_broadcast_contained) " + "VALUES (?,?,?,?,?,?,?,?,?,?)", + ("ID-UPD-001", "Pine Gulch", 250.0, 0, 42.93, -114.45, + 1780728000, 1780728000, 250.0, 0), + ) + normalized = { + "_kind": "wfigs_incident", + "irwin_id": "ID-UPD-001", + "incident_name": "Pine Gulch", + "incident_type": "WF", + "acres": 500.0, + "contained_pct": 15, + "lat": 42.93, "lon": -114.45, + "county": "Twin Falls", "state": "ID", + } + envelope = { + "data": {"adapter": "wfigs", "category": "wildfire_incident", + "severity": "priority"} + } + data = {} + # 8h cooldown clear: now > 28800s after last_broadcast_at. + wire = handle_wfigs(normalized, envelope, + subject="central.fire.incident.id", + data=data, now=1780728000 + 30000) + assert wire is not None + assert "Update:" in wire + # Update branch must NOT re-tag with wildfire_declared. + assert data.get("category") != "wildfire_declared" + + +# --------------------------------------------------------------------------- +# (e) Adapter_config rows present after seed. +# --------------------------------------------------------------------------- + + +def test_adapter_config_seeds_new_keys(): + from meshai.persistence import get_db + conn = get_db() + rows = { + (r["adapter"], r["key"]): r["default_json"] + for r in conn.execute( + "SELECT adapter, key, default_json FROM adapter_config " + "WHERE adapter IN ('firms','fires') AND key IN (" + "'spread_radius_mi_default', 'cluster_min_pixels', " + "'cluster_max_radius_mi', 'cluster_time_window_minutes')" + ) + } + assert ("fires", "spread_radius_mi_default") in rows + assert ("firms", "cluster_min_pixels") in rows + assert ("firms", "cluster_max_radius_mi") in rows + assert ("firms", "cluster_time_window_minutes") in rows + # Values are JSON-encoded; spot-check the float. + assert rows[("fires", "spread_radius_mi_default")] == "5.0" + assert rows[("firms", "cluster_min_pixels")] == "3" + + +# --------------------------------------------------------------------------- +# (f) Categories registered. +# --------------------------------------------------------------------------- + + +def test_new_categories_registered(): + from meshai.notifications.categories import ALERT_CATEGORIES + assert "wildfire_declared" in ALERT_CATEGORIES + assert "unattributed_hotspot_cluster" in ALERT_CATEGORIES + for cat in ("wildfire_declared", "unattributed_hotspot_cluster"): + entry = ALERT_CATEGORIES[cat] + assert entry["toggle"] == "fire" + assert entry["default_severity"] == "priority"