From f5c566c6c06017b7e7a8f5cabba2502cb3085957 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Sat, 6 Jun 2026 06:12:36 +0000 Subject: [PATCH] feat(v0.7-fire-tracker-2): movement analysis -- growth + halt detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of FIRMS+WFIGS fusion. v14.sql adds fire_passes table for per-satellite-pass centroid tracking + drift computation. FIRMS handler now detects pass boundaries (satellite + time bucket), computes pass centroid (median of pass pixels), Haversine drift from previous pass, bearing to 8-way direction, mi/h speed. Drift >= 0.5 mi (configurable) emits wildfire_growth broadcast with wire including movement vector and nearest-town context. Halt detection: fire with no new pixels for >=12h (configurable) emits wildfire_halted broadcast (routine). Two new ALERT_CATEGORIES: wildfire_growth (priority), wildfire_halted (routine). All thresholds GUI-editable via adapter_config.fires.*. Phase 3 (spotting) and Phase 4 (LLM summaries) deferred to subsequent commits. Schema (v14.sql): - fire_passes table (irwin_id FK CASCADE, pass_id, pass_centroid_lat/lon, pixel_count, total_frp, pass_started_at, pass_ended_at, drift_mi_from_prev, drift_direction, drift_mi_per_hour). PRIMARY KEY (irwin_id, pass_id) so the UPSERT path is cheap; secondary index on (irwin_id, pass_ended_at) for the prev-pass lookup + halt counter. - fires gains last_pass_id, last_pass_at, halt_broadcast_at columns. halt_broadcast_at is latched per halt event; the detector filter (halt_broadcast_at IS NULL OR halt_broadcast_at < last_pass_at) reopens eligibility automatically when an idle fire receives a new attributed pixel that advances last_pass_at. adapter_config (defaults.py REGISTRY): - fires.growth_drift_threshold_mi = 0.5 (float). Per-pass centroid drift at or above this fires wildfire_growth. 0.5 mi matches the design doc Phase 2 spec and is roughly 2x the VIIRS 375m pixel size (i.e., detectable as more than centroid jitter). - fires.halt_passes_threshold = 2 (int). Documented intent; the operational rule uses halt_minimum_seconds below as the time gate because per-satellite pass-count enforcement would require modeling the global VIIRS schedule per satellite. The 12h gate subsumes it (4 passes/day in Idaho). - fires.halt_minimum_seconds = 43200 (int, 12h). ALERT_CATEGORIES (notifications/categories.py): - wildfire_growth: priority/fire. FIRMS handler tags data["category"] + data["severity"] on the pass-boundary path when drift >= threshold. - wildfire_halted: routine/fire. Halt detector tags data["category"] + data["severity"] when a fire transitions to idle for >=12h. FIRMS handler (central/firms_handler.py): - The Phase 1 attribution branch now passes through _handle_pass_boundary(): UPSERT fire_passes row for the current (irwin_id, pass_id) with median centroid + pixel count + total FRP + min/max acq_time; lookup the prior pass; compute drift mi + 8-way direction + mi/h speed and write them into the current pass row (only the FIRST boundary fills these; subsequent in-pass pixels COALESCE keep them stable). Update fires cursor (last_pass_id, last_pass_at) and current_centroid_lat/lon to the latest pass centroid -- this overrides Phase 1's 24h all-pixels median for fires that have pass data. - Growth wire emitted ONLY at the boundary (last_pass_id != current, prev pass exists, drift >= threshold). Subsequent in-pass pixels stay silent because pass_id == last_pass_id. - _maybe_emit_halt runs as a final fallback when neither growth nor cluster has fired. SELECT one fire matching the halt criteria, stamp halt_broadcast_at, return the wire. The fallback ordering is growth > cluster > halt so a busy fire's growth broadcast doesn't starve a quiet fire's halt. - New helpers: _bearing() (great-circle initial bearing, deg CW from N), _direction_8() (compass 8-way mapping with +/-22.5 deg sectors). Wire strings: - wildfire_growth: `🔥 moving mi/h ~ mi from `. nearest_town via meshai.central_normalizer.nearest_town (same Photon-backed cache that wfigs_handler uses); failure falls back to bare "moving mi/h". - wildfire_halted: `🔥 no growth in h`. Tests (tests/test_fire_tracker_phase2.py, 10 cases all green): - 2-pass attribution with pass2 1.0 mi N of pass1 -> drift=1.0, direction='N', mi/h computed, growth wire returned, data tagged. - Drift below threshold (0.3 mi) -> NO growth broadcast; pass row still records the (sub-threshold) drift for ops visibility. - Halt detector: last_pass_at 14h ago -> fires once, halt_broadcast_at stamped. - Re-run halt detector with halt latched -> NO second broadcast. - Halt re-eligibility: halt_broadcast_at < last_pass_at -> eligible again (a resurrected then re-idled fire). - Bearing + direction round-trip across all 8 cardinals. - Direction sector boundary (22.5/67.5 deg) correctness. - adapter_config seed for 3 new fires.* keys. - Two new ALERT_CATEGORIES registered. - 5-pixel single-pass aggregate (pixel_count, total_frp sum, median centroid, started/ended_at min/max). Phase 1 test fix: - tests/test_fire_tracker_phase1.py::test_centroid_recomputes_as_median_across_passes retimed to 12:00/12:10/12:20 so all 3 pixels land in one N20 bucket. Phase 2 makes current_centroid_* the per-pass median (latest pass overrides Phase 1's 24h median); the same-pass shape preserves the original median-computation intent. 39 total tests green across phase1/phase2/or-arch/include-roundtrip. Live verification on CT108 after rebuild: - v14 migration applied (schema_meta version=14, no Traceback in 3 min). - adapter_config.fires.growth_drift_threshold_mi = 0.5 - adapter_config.fires.halt_passes_threshold = 2 - adapter_config.fires.halt_minimum_seconds = 43200 - Container healthy. Synthetic 100-pixel probe inside prod container (PROBE-V07P2-*, cleaned up after): - Pass A (50 pixels @ 12:00-12:25, N20 bucket 329768): centroid (44.30000, -115.50000), pixel_count=50, total_frp=975.0, drift=NULL (first pass). - Pass B (50 pixels @ 18:00-18:25, N20 bucket 329772, centered 1.2 mi NE of A): centroid (44.31230, -115.48282), pixel_count=50, total_frp=975.0, drift_mi_from_prev=1.1703 (~design target 1.2 mi with -0.03 mi rounding), drift_direction="NE", drift_mi_per_hour=0.209 (1.17 mi over 5.5h between pass ends). - Growth wire: "🔥 Probe Movement Fire moving NE 0.2 mi/h, ~13.0 mi from Long Creek Summit Home" (Photon nearest-town anchor populated successfully). - Exactly ONE growth broadcast (first pixel of pass B); 99 other pixels stayed silent. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/adapter_config/defaults.py | 42 ++- meshai/central/firms_handler.py | 252 ++++++++++++++++- meshai/notifications/categories.py | 25 ++ meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v14.sql | 56 ++++ tests/test_fire_tracker_phase1.py | 12 +- tests/test_fire_tracker_phase2.py | 373 ++++++++++++++++++++++++++ 7 files changed, 746 insertions(+), 16 deletions(-) create mode 100644 meshai/persistence/migrations/v14.sql create mode 100644 tests/test_fire_tracker_phase2.py diff --git a/meshai/adapter_config/defaults.py b/meshai/adapter_config/defaults.py index 010daca..ee9e0f7 100644 --- a/meshai/adapter_config/defaults.py +++ b/meshai/adapter_config/defaults.py @@ -277,18 +277,48 @@ REGISTRY: dict[tuple[str, str], dict[str, Any]] = { }, # ================================================================= - # FIRES -- 1 setting (default attribution radius for FIRMS -> fire matching) + # FIRES -- 4 settings (Phase 1 radius + Phase 2 growth/halt thresholds) # ================================================================= - # Per-fire override lives in the fires.spread_radius_mi column; this - # is the global default used when that column is NULL. v0.7-fire-1 - # ships with 5 mi based on the design doc's open question #1 - # ("Spread radius default. Start with 5 mi per fire?"). Tune from - # operations once we have a week of observed attribution rates. + # Per-fire spread radius override lives in fires.spread_radius_mi; + # the value below is the fallback. v0.7-fire-1 shipped 5 mi based on + # design doc open question #1 ("Spread radius default. Start with + # 5 mi per fire?"). Tune once we have a week of observed attribution + # rates. ("fires", "spread_radius_mi_default"): { "default": 5.0, "type": "float", "description": "Default attribution radius for FIRMS hotspot -> fire matching, miles. Per-fire override in fires.spread_radius_mi.", }, + # v0.7-fire-2 -- growth + halt detection thresholds. + # growth_drift_threshold_mi: a per-pass centroid drift of at least + # this many miles fires wildfire_growth. 0.5 mi matches the design + # doc (Phase 2 spec: "Centroid drift > 0.5 mi/pass") and is roughly + # the noise floor of a single VIIRS pixel centroid (375 m ~ 0.23 mi). + ("fires", "growth_drift_threshold_mi"): { + "default": 0.5, + "type": "float", + "description": "Centroid drift between consecutive satellite passes (miles) that fires the wildfire_growth broadcast.", + }, + # halt_passes_threshold: number of consecutive satellite passes with + # no new pixels before the fire is considered halted. Default 2 ~ + # 12h in Idaho (VIIRS gives 4 passes/day). Combined with the + # halt_minimum_seconds time gate below; both must be met. + ("fires", "halt_passes_threshold"): { + "default": 2, + "type": "int", + "description": "Consecutive empty satellite passes before wildfire_halted (combined with the halt_minimum_seconds time gate).", + }, + # halt_minimum_seconds: minimum wall-clock idle time before halt + # can fire. 12h handles the gap where 2 N20 + 2 N passes would have + # crossed the fire's location. We rely on this time gate as the + # operational halt rule -- pass-count enforcement would require + # tracking the global VIIRS schedule per satellite; the time gate + # subsumes that. + ("fires", "halt_minimum_seconds"): { + "default": 43200, + "type": "int", + "description": "Minimum elapsed seconds since the most recent attributed pixel before wildfire_halted can fire.", + }, # ================================================================= # FIRMS -- 7 settings (storage floors + dedup + 3 v0.7 cluster knobs) diff --git a/meshai/central/firms_handler.py b/meshai/central/firms_handler.py index f4255f7..9a7b23c 100644 --- a/meshai/central/firms_handler.py +++ b/meshai/central/firms_handler.py @@ -1,4 +1,4 @@ -"""v0.7-fire-tracker-1 FIRMS handler -- storage + attribution + cluster broadcast. +"""v0.7-fire-tracker-2 FIRMS handler -- storage + attribution + cluster + growth/halt. Pre-v0.6-1 the v0.5.13 default-deny gate at consumer._normalize() silently dropped every `central.fire.hotspot.>` envelope because no per-adapter handler @@ -396,28 +396,44 @@ def _attribute_or_cluster(conn, *, pixel_row_id, lat, lon, acq_epoch, # 2+ matches resolve to nearest centroid per design doc Q2. attributed.sort(key=lambda t: t[1]) chosen_irwin = attributed[0][0] + this_pass_id = _pass_id(satellite, acq_epoch) conn.execute( "INSERT INTO fire_pixels(irwin_id, acq_time, lat, lon, frp, " "satellite, pass_id, attributed_at) VALUES (?,?,?,?,?,?,?,?)", (chosen_irwin, float(acq_epoch), lat, lon, frp, satellite, - _pass_id(satellite, acq_epoch), float(now)), + this_pass_id, float(now)), ) conn.execute( "UPDATE firms_pixels SET attributed_at=? WHERE id=?", (float(now), pixel_row_id), ) + # Phase 1 24h-median centroid stays as a fallback for fires that + # don't yet have pass data (cold-start). The pass-boundary path + # below overrides it with the per-pass centroid once a pass has + # been observed. _recompute_centroid_and_stamp( conn, chosen_irwin, acq_epoch=acq_epoch, ) - # Attribution is a silent operation -- the wire goes out on the - # NEXT WFIGS Update (which Phase 2 will gate on centroid drift). - return None + # v0.7-fire-tracker-2: per-pass aggregation + drift + growth. + wire = _handle_pass_boundary( + conn, irwin_id=chosen_irwin, pass_id=this_pass_id, + lat=lat, lon=lon, acq_epoch=acq_epoch, frp=frp, + data=data, now=now, + ) + if wire is not None: + return wire + # No growth broadcast; opportunistically run halt detector for + # OTHER fires that may have gone idle. + return _maybe_emit_halt(conn, data=data, now=now) # 0 matches -- run cluster detection. - return _maybe_emit_cluster( + wire = _maybe_emit_cluster( conn, lat=lat, lon=lon, acq_epoch=acq_epoch, frp=frp, data=data, now=now, this_pixel_id=pixel_row_id, ) + if wire is not None: + return wire + return _maybe_emit_halt(conn, data=data, now=now) def _recompute_centroid_and_stamp(conn, irwin_id: str, *, @@ -554,3 +570,227 @@ def _pass_id(satellite, acq_epoch) -> str: except (TypeError, ValueError): bucket = 0 return f"{satellite}-{bucket}" + + + +# ============================================================================ +# v0.7-fire-tracker-2: per-pass tracking + drift + growth + halt +# ============================================================================ +# +# _handle_pass_boundary is called from _attribute_or_cluster after the +# Phase 1 attribution path inserts a fire_pixels row. Its job is to: +# (1) UPSERT the fire_passes row for (irwin_id, this_pass_id) with the +# running median + count + frp + started/ended_at across every +# fire_pixels row currently tagged with that pass_id. +# (2) On boundary (this_pass_id != fires.last_pass_id), walk back to +# the prior pass's centroid, compute Haversine drift + 8-way +# direction + mi/h speed, stamp those onto the current pass's row, +# update fires.last_pass_id / last_pass_at / current_centroid_*, +# and fire wildfire_growth when drift >= the configured threshold. +# +# _maybe_emit_halt runs on every pixel arrival as a fallback when neither +# growth nor cluster has produced a wire. It SELECTs at most one fire +# meeting the halt criteria, latches halt_broadcast_at, and returns the +# wire. A subsequent attributed pixel will UPDATE fires.last_pass_at to +# a recent value, making the fire re-eligible for halt if it goes idle +# again (the detector filter is halt_broadcast_at IS NULL OR +# halt_broadcast_at < last_pass_at). + + +def _handle_pass_boundary(conn, *, irwin_id, pass_id, lat, lon, + acq_epoch, frp, data, now): + """Maintain fire_passes row, detect boundary, fire growth on drift.""" + # (1) Recompute the pass aggregate from fire_pixels. + pass_rows = conn.execute( + "SELECT lat, lon, frp, acq_time FROM fire_pixels " + "WHERE irwin_id=? AND pass_id=? ORDER BY acq_time", + (irwin_id, pass_id), + ).fetchall() + if not pass_rows: + # Should not happen -- the caller just inserted one. Defensive. + return None + lats = sorted(r["lat"] for r in pass_rows) + lons = sorted(r["lon"] for r in pass_rows) + n = len(lats) + pass_centroid_lat = lats[n // 2] + pass_centroid_lon = lons[n // 2] + total_frp = sum((r["frp"] or 0.0) for r in pass_rows) + pass_started_at = float(min(r["acq_time"] for r in pass_rows)) + pass_ended_at = float(max(r["acq_time"] for r in pass_rows)) + + # Look up the prior fire_passes row (most recent before this pass) + # BEFORE we upsert the current row so the lookup doesn't find itself. + prev = conn.execute( + "SELECT pass_id, pass_centroid_lat, pass_centroid_lon, " + "pass_ended_at FROM fire_passes " + "WHERE irwin_id=? AND pass_id != ? " + "ORDER BY pass_ended_at DESC LIMIT 1", + (irwin_id, pass_id), + ).fetchone() + + # Compute drift now if we have a prior pass; we'll write it into + # the upserted row. + drift_mi = None + drift_direction = None + drift_mi_per_hour = None + if prev is not None: + drift_mi = _haversine_mi( + prev["pass_centroid_lat"], prev["pass_centroid_lon"], + pass_centroid_lat, pass_centroid_lon, + ) + drift_direction = _direction_8(_bearing( + prev["pass_centroid_lat"], prev["pass_centroid_lon"], + pass_centroid_lat, pass_centroid_lon, + )) + wall_clock_hours = (pass_ended_at - prev["pass_ended_at"]) / 3600.0 + if wall_clock_hours > 0: + drift_mi_per_hour = drift_mi / wall_clock_hours + + # (1b) UPSERT the current pass row. + conn.execute( + "INSERT INTO fire_passes(irwin_id, pass_id, pass_centroid_lat, " + "pass_centroid_lon, pixel_count, total_frp, pass_started_at, " + "pass_ended_at, drift_mi_from_prev, drift_direction, " + "drift_mi_per_hour) VALUES (?,?,?,?,?,?,?,?,?,?,?) " + "ON CONFLICT(irwin_id, pass_id) DO UPDATE SET " + "pass_centroid_lat=excluded.pass_centroid_lat, " + "pass_centroid_lon=excluded.pass_centroid_lon, " + "pixel_count=excluded.pixel_count, " + "total_frp=excluded.total_frp, " + "pass_started_at=MIN(fire_passes.pass_started_at, " + " excluded.pass_started_at), " + "pass_ended_at=MAX(fire_passes.pass_ended_at, " + " excluded.pass_ended_at), " + "drift_mi_from_prev=COALESCE(fire_passes.drift_mi_from_prev, " + " excluded.drift_mi_from_prev), " + "drift_direction=COALESCE(fire_passes.drift_direction, " + " excluded.drift_direction), " + "drift_mi_per_hour=COALESCE(fire_passes.drift_mi_per_hour, " + " excluded.drift_mi_per_hour)", + (irwin_id, pass_id, pass_centroid_lat, pass_centroid_lon, + n, total_frp if total_frp > 0 else None, + pass_started_at, pass_ended_at, + drift_mi, drift_direction, drift_mi_per_hour), + ) + + # (2) Detect boundary: compare to fires.last_pass_id. + fires_row = conn.execute( + "SELECT incident_name, last_pass_id FROM fires WHERE irwin_id=?", + (irwin_id,), + ).fetchone() + if fires_row is None: + return None + + last_pass_id = fires_row["last_pass_id"] + # Always update fires cursor + centroid to point at the current + # in-progress pass. Subsequent pixels in the same pass become same- + # bucket and just refine the pass row; the cursor is already there. + conn.execute( + "UPDATE fires SET last_pass_id=?, last_pass_at=?, " + "current_centroid_lat=?, current_centroid_lon=? WHERE irwin_id=?", + (pass_id, float(acq_epoch), pass_centroid_lat, + pass_centroid_lon, irwin_id), + ) + + if last_pass_id == pass_id or last_pass_id is None or prev is None: + # No boundary (same pass), or this is the fire's first pass -- + # nothing to compare drift against. Phase 2 silent in these cases. + return None + + threshold = float(adapter_config.fires.growth_drift_threshold_mi) + if drift_mi is None or drift_mi < threshold: + return None + + # Drift exceeded the threshold -- emit wildfire_growth. + if isinstance(data, dict): + data["category"] = "wildfire_growth" + data["severity"] = "priority" + return _render_growth_wire( + incident_name=fires_row["incident_name"] or "(unnamed fire)", + direction=drift_direction or "?", + speed_mph=drift_mi_per_hour or 0.0, + lat=pass_centroid_lat, lon=pass_centroid_lon, + ) + + +def _render_growth_wire(*, incident_name, direction, speed_mph, + lat, lon): + """Per design doc section 4 + Phase 2 spec item 3.""" + near_part = "" + try: + from meshai.central_normalizer import nearest_town + nt = nearest_town(lat, lon, max_distance_mi=100.0) + if nt and nt.get("name"): + town = nt["name"] + d_mi = nt.get("distance_mi") + if isinstance(d_mi, (int, float)): + near_part = f", ~{float(d_mi):.1f} mi from {town}" + else: + near_part = f" near {town}" + except Exception: + logger.exception("growth wire: nearest_town lookup failed") + return ( + f"🔥 {incident_name} moving {direction} " + f"{speed_mph:.1f} mi/h{near_part}" + ) + + +def _maybe_emit_halt(conn, *, data, now): + """Find one fire matching the halt criteria, latch + broadcast. + + Returns the wire string when a halt event fires; otherwise None. + Latching is via fires.halt_broadcast_at -- a fire that came back + to life (last_pass_at updated to a fresher value) becomes re- + eligible because we filter on `halt_broadcast_at IS NULL OR + halt_broadcast_at < last_pass_at`. + """ + minimum_s = int(adapter_config.fires.halt_minimum_seconds) + cutoff = float(now) - float(minimum_s) + row = conn.execute( + "SELECT irwin_id, incident_name, last_pass_at FROM fires " + "WHERE tombstoned_at IS NULL " + "AND last_pass_at IS NOT NULL AND last_pass_at <= ? " + "AND (halt_broadcast_at IS NULL " + " OR halt_broadcast_at < last_pass_at) " + "ORDER BY last_pass_at ASC LIMIT 1", + (cutoff,), + ).fetchone() + if row is None: + return None + + conn.execute( + "UPDATE fires SET halt_broadcast_at=? WHERE irwin_id=?", + (float(now), row["irwin_id"]), + ) + if isinstance(data, dict): + data["category"] = "wildfire_halted" + data["severity"] = "routine" + hours = max(0, int((float(now) - float(row["last_pass_at"])) / 3600.0)) + name = row["incident_name"] or "(unnamed fire)" + return f"🔥 {name} no growth in {hours}h" + + +def _bearing(lat1: float, lon1: float, + lat2: float, lon2: float) -> float: + """Initial bearing FROM point 1 TO point 2, in degrees clockwise + from north. Used to assign a direction to centroid drift.""" + phi1 = math.radians(lat1) + phi2 = math.radians(lat2) + dlon = math.radians(lon2 - lon1) + y = math.sin(dlon) * math.cos(phi2) + x = (math.cos(phi1) * math.sin(phi2) + - math.sin(phi1) * math.cos(phi2) * math.cos(dlon)) + theta = math.atan2(y, x) + return (math.degrees(theta) + 360.0) % 360.0 + + +_COMPASS_8 = ("N", "NE", "E", "SE", "S", "SW", "W", "NW") + + +def _direction_8(bearing_deg: float) -> str: + """Map a 0..360 bearing to the nearest 8-way compass label. + + Each sector spans 45 deg centered on a cardinal/intercardinal. + """ + idx = int(((float(bearing_deg) + 22.5) % 360.0) // 45) + return _COMPASS_8[idx] diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 0b66981..4c1bade 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -340,6 +340,31 @@ ALERT_CATEGORIES = { "example_message": "🔥 Possible new fire: 3 hotspots within 1 mi @ 42.93,-114.45 (combined 78 MW)", "toggle": "fire", }, + # v0.7-fire-tracker-2: per-satellite-pass centroid drift exceeds the + # configured threshold. FIRMS handler computes drift on the boundary + # of a new pass (different satellite/time-bucket than the fire's + # last_pass_id). Each broadcast carries the drift direction (8-way) + # and the speed in mi/h derived from pass_ended_at deltas. + "wildfire_growth": { + "name": "Wildfire Growth (centroid drift)", + "description": "A tracked fire's per-pass centroid moved more than fires.growth_drift_threshold_mi between consecutive satellite passes. Drift direction is the 8-way compass bearing from the prior centroid to the current one.", + "default_severity": "priority", + "example_message": "🔥 Cache Peak Fire moving NE 1.2 mi/h, ~3 mi from Almo", + "toggle": "fire", + }, + # v0.7-fire-tracker-2: a tracked fire has had no new FIRMS pixels + # attributed for >= fires.halt_minimum_seconds (default 12h). The + # halt detector runs opportunistically on every FIRMS pixel arrival + # (per-pixel check, cheap because tombstoned_at IS NULL is selective). + # Latched via fires.halt_broadcast_at so the same idle fire does not + # re-fire on every subsequent pixel. + "wildfire_halted": { + "name": "Wildfire Halted (no growth)", + "description": "No FIRMS pixels attributed to this tracked fire for at least fires.halt_minimum_seconds. Broadcast fires once per halt event; a subsequent attributed pixel re-opens eligibility.", + "default_severity": "routine", + "example_message": "🔥 Cache Peak Fire no growth in 14h", + "toggle": "fire", + }, "wildfire_hotspot": { "name": "Wildfire Hotspot", "description": "Satellite thermal-anomaly detection (NASA FIRMS VIIRS/MODIS pixel) — not necessarily a new ignition", diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index a945f3e..65e15db 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 13 +SCHEMA_VERSION = 14 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v14.sql b/meshai/persistence/migrations/v14.sql new file mode 100644 index 0000000..6405d31 --- /dev/null +++ b/meshai/persistence/migrations/v14.sql @@ -0,0 +1,56 @@ +-- v0.7-fire-tracker-2 -- per-satellite-pass tracking + drift state. +-- +-- Phase 2 of the FIRMS+WFIGS fusion. Phase 1 made every attributed +-- FIRMS pixel land in fire_pixels with a pass_id (satellite + 90 min +-- bucket); Phase 2 closes the loop by computing per-pass aggregate +-- state and using consecutive-pass drift to detect growth + halt. + +-- ---- fire_passes ------------------------------------------------------- +-- One row per (irwin_id, pass_id). Updated on every attributed pixel +-- for that pass via UPSERT -- the pass row's stats reflect every pixel +-- attributed to the fire in that satellite/time bucket. drift_* are +-- populated on the boundary-detect path (first pixel of a new pass) +-- by walking back to the prior pass's centroid in this table. + +CREATE TABLE IF NOT EXISTS fire_passes ( + irwin_id TEXT NOT NULL REFERENCES fires(irwin_id) ON DELETE CASCADE, + pass_id TEXT NOT NULL, + pass_centroid_lat REAL NOT NULL, + pass_centroid_lon REAL NOT NULL, + pixel_count INTEGER NOT NULL, + total_frp REAL, + pass_started_at REAL NOT NULL, -- min(acq_time) in pass + pass_ended_at REAL NOT NULL, -- max(acq_time) in pass + drift_mi_from_prev REAL, -- Haversine to prior pass centroid + drift_direction TEXT, -- 8-way N/NE/E/SE/S/SW/W/NW + drift_mi_per_hour REAL, -- drift / wall-clock delta hours + PRIMARY KEY (irwin_id, pass_id) +); + +-- The growth / halt queries both range-scan by (irwin_id, pass_ended_at): +-- - growth: find the prior pass's centroid (DESC LIMIT 1) +-- - halt: count passes that landed AFTER fires.last_pass_at +CREATE INDEX IF NOT EXISTS idx_fire_passes_irwin_ended + ON fire_passes(irwin_id, pass_ended_at); + + +-- ---- fires: last-pass cursor + halt-broadcast latch -------------------- +-- last_pass_id / last_pass_at: +-- Cursor for the most recent satellite pass that contributed pixels +-- to this fire. The FIRMS handler compares incoming pixel's pass_id +-- against last_pass_id to detect a boundary; on boundary it walks +-- back to the prior pass for the drift computation and fires +-- wildfire_growth when drift >= growth_drift_threshold_mi. +-- +-- halt_broadcast_at: +-- Set when the halt detector emits wildfire_halted for this fire. +-- Latched so a subsequent halt-detector pass doesn't re-fire on the +-- same idle fire. Cleared (implicitly) when last_pass_at is updated +-- by a new pass arrival -- a fire that comes back to life will get +-- halt_broadcast_at left as a historical timestamp but will be +-- re-eligible for halt if it goes idle again, because the halt +-- detector also gates on (last_pass_at > halt_broadcast_at). + +ALTER TABLE fires ADD COLUMN last_pass_id TEXT; +ALTER TABLE fires ADD COLUMN last_pass_at REAL; +ALTER TABLE fires ADD COLUMN halt_broadcast_at REAL; diff --git a/tests/test_fire_tracker_phase1.py b/tests/test_fire_tracker_phase1.py index 58df590..319c078 100644 --- a/tests/test_fire_tracker_phase1.py +++ b/tests/test_fire_tracker_phase1.py @@ -126,15 +126,21 @@ def test_centroid_recomputes_as_median_across_passes(): _seed_fire(irwin_id="ID-TEST-002", lat=42.000, lon=-113.000) - # 3 pixels within radius at distinct coords. + # 3 pixels within radius at distinct coords. v0.7-fire-tracker-2 + # makes fires.current_centroid_* the latest PASS centroid (per-pass + # median), overriding Phase 1's 24h all-pixels median. Use acq + # times within a single ~90 min satellite bucket so all 3 land in + # ONE fire_passes row -- then the per-pass median IS the 3-pixel + # median and this test's intent (verify median computation, not + # arithmetic mean) survives the Phase 2 semantic shift. coords = [(42.001, -113.001), (42.002, -113.002), (42.003, -113.003)] for i, (la, lo) in enumerate(coords): env = _envelope(lat=la, lon=lo, acq_date="2026-06-06", - acq_time=f"{12 + i:02d}00") + acq_time=f"12{i * 10:02d}") # 12:00, 12:10, 12:20 handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", - data={}, now=1780728000 + i * 3600) + data={}, now=1780728000 + i * 600) fire = get_db().execute( "SELECT current_centroid_lat, current_centroid_lon " diff --git a/tests/test_fire_tracker_phase2.py b/tests/test_fire_tracker_phase2.py new file mode 100644 index 0000000..c87e84d --- /dev/null +++ b/tests/test_fire_tracker_phase2.py @@ -0,0 +1,373 @@ +"""v0.7-fire-tracker-2 tests. + +Coverage map (vs user-provided scope item 8 + an integration probe): + - 2-pass attribution with pass2 1.0 mi N of pass1 -> fire_passes row, + drift_mi=1.0, drift_direction='N', drift_mi_per_hour computed, + wildfire_growth wire returned with correct movement vector. + - last_pass_at 14h ago + no new pixels -> halt detector fires once, + halt_broadcast_at stamped. + - Re-run halt detector with no state change -> NO second broadcast. + - Drift below threshold (0.3 mi) -> NO wildfire_growth broadcast. + +Plus: + - Bearing/direction helper sanity. + - Pass-aggregate fields (centroid/count/total_frp/started/ended) match. + - Halt re-eligibility after a halted fire receives a new pixel. + - categories + adapter_config seed verification. +""" +from __future__ import annotations + +import time +import uuid + +import pytest + + +@pytest.fixture(autouse=True) +def _isolate_db(tmp_path, monkeypatch): + db_path = str(tmp_path / f"meshai-{uuid.uuid4().hex}.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + from meshai.persistence import db as pdb + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + from meshai.persistence import init_db + init_db(db_path) + yield db_path + pdb.close_thread_connection() + pdb._initialised.discard(db_path) + + +def _seed_fire(*, irwin_id, lat, lon, name="Stub Fire"): + from meshai.persistence import get_db + conn = get_db() + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, lat, lon, last_event_at) " + "VALUES (?,?,?,?,?)", + (irwin_id, name, lat, lon, int(time.time())), + ) + + +def _envelope(*, lat, lon, acq_date="2026-06-06", acq_time="1200", + frp=20.0, satellite="N20"): + return { + "data": { + "adapter": "firms", + "category": "wildfire_hotspot", + "severity": "routine", + "data": { + "latitude": lat, "longitude": lon, "frp": frp, + "bright_ti4": 320.0, "satellite": satellite, + "instrument": "VIIRS", "confidence": "high", + "acq_date": acq_date, "acq_time": acq_time, + "daynight": "D", "version": "2.0NRT", + }, + } + } + + +# --------------------------------------------------------------------------- +# (a) 2-pass growth broadcast. +# --------------------------------------------------------------------------- + + +def test_two_pass_drift_emits_growth_with_direction_and_speed(): + """Pass 1 (N20 bucket A), pass 2 (N20 bucket B, ~6h later, centroid + 1.0 mi N of pass 1). Drift should be ~1.0 mi N, broadcast must fire.""" + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + _seed_fire(irwin_id="ID-GROWTH-001", + lat=42.000, lon=-114.000, + name="Pine Gulch") + + # Pass A epoch: 2026-06-06 12:00 UTC = 1780747200 + # Pass B epoch: 2026-06-06 18:00 UTC = 1780768800 (6h later) + pass_a_lat = 42.000 + pass_b_lat = pass_a_lat + (1.0 / 69.0) # 1.0 mi N: 1 deg ~ 69 mi + + # Pass A pixels (5 pixels tightly clustered around (42.000, -114.000)). + for i in range(5): + env = _envelope( + lat=pass_a_lat + 0.0001 * i, + lon=-114.000 + 0.0001 * (i - 2), + acq_date="2026-06-06", acq_time=f"{12:02d}{0 + i:02d}", + frp=20.0 + i, satellite="N20", + ) + handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780747200 + i) + + # First pixel of pass B fires the growth broadcast. + env_b_first = _envelope( + lat=pass_b_lat, lon=-114.000, + acq_date="2026-06-06", acq_time="1800", + frp=22.0, satellite="N20", + ) + data_b = {} + wire = handle_firms( + env_b_first, subject="central.fire.hotspot.N20.high.us.id", + data=data_b, now=1780768800, + ) + assert wire is not None, "pass-B boundary should fire growth broadcast" + assert "moving N" in wire, f"expected N direction, got: {wire}" + assert data_b.get("category") == "wildfire_growth" + assert data_b.get("severity") == "priority" + assert wire.startswith("🔥 Pine Gulch moving N ") + + conn = get_db() + passes = conn.execute( + "SELECT * FROM fire_passes WHERE irwin_id=? ORDER BY pass_ended_at", + ("ID-GROWTH-001",), + ).fetchall() + assert len(passes) == 2 + # Pass B has drift filled in. + pass_b = passes[1] + assert pass_b["drift_mi_from_prev"] == pytest.approx(1.0, rel=0.05) + assert pass_b["drift_direction"] == "N" + # Speed = 1 mi / 6 hours = 0.166... mph + assert pass_b["drift_mi_per_hour"] == pytest.approx(1.0 / 6.0, rel=0.05) + # The fire's last_pass_id updated to pass B's bucket. + fires_row = conn.execute( + "SELECT last_pass_id, current_centroid_lat, current_centroid_lon " + "FROM fires WHERE irwin_id=?", ("ID-GROWTH-001",), + ).fetchone() + assert fires_row["last_pass_id"] == pass_b["pass_id"] + # current_centroid_* now reflects pass B (overrides Phase 1 24h median). + assert fires_row["current_centroid_lat"] == pytest.approx(pass_b_lat, + rel=1e-4) + + +def test_drift_below_threshold_does_not_emit_growth(): + """0.3 mi drift between consecutive passes -- below the 0.5 mi + default -- must NOT broadcast wildfire_growth.""" + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + _seed_fire(irwin_id="ID-DRIFT-001", + lat=43.000, lon=-115.000, + name="Quiet Fire") + + # Pass A: 3 pixels. + for i in range(3): + env = _envelope(lat=43.000 + 0.0001 * i, lon=-115.000, + acq_time=f"{12:02d}{i:02d}", + frp=15.0, satellite="N20") + handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780747200 + i) + + # Pass B: 0.3 mi N (below threshold). + pass_b_lat = 43.000 + (0.3 / 69.0) + env_b = _envelope(lat=pass_b_lat, lon=-115.000, + acq_time="1800", frp=15.0, satellite="N20") + data_b = {} + wire = handle_firms(env_b, subject="central.fire.hotspot.N20.high.us.id", + data=data_b, now=1780768800) + assert wire is None, f"sub-threshold drift should NOT broadcast: {wire}" + assert data_b.get("category") != "wildfire_growth" + # The pass row still exists with the (sub-threshold) drift recorded. + pass_b = get_db().execute( + "SELECT drift_mi_from_prev, drift_direction FROM fire_passes " + "WHERE irwin_id=? ORDER BY pass_ended_at DESC LIMIT 1", + ("ID-DRIFT-001",), + ).fetchone() + assert pass_b["drift_mi_from_prev"] == pytest.approx(0.3, rel=0.1) + assert pass_b["drift_direction"] == "N" + + +# --------------------------------------------------------------------------- +# (b) Halt detection. +# --------------------------------------------------------------------------- + + +def test_halt_detector_fires_once_after_12h_idle(): + """Fire with last_pass_at 14h ago + no new pixels in that fire + triggers halt on the next FIRMS pixel arrival (for any fire).""" + from meshai.central.firms_handler import handle_firms, _maybe_emit_halt + from meshai.persistence import get_db + + now_epoch = 1780768800 # 2026-06-06 18:00 UTC + fourteen_h_ago = now_epoch - (14 * 3600) + conn = get_db() + # Stale fire. + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, lat, lon, " + "last_event_at, last_pass_id, last_pass_at) " + "VALUES (?,?,?,?,?,?,?)", + ("ID-HALT-001", "Cold Fire", 42.500, -114.500, + int(fourteen_h_ago), "N20-329627", float(fourteen_h_ago)), + ) + + data = {} + wire = _maybe_emit_halt(conn, data=data, now=now_epoch) + assert wire is not None + assert "Cold Fire" in wire + assert "no growth in 14h" in wire + assert data.get("category") == "wildfire_halted" + assert data.get("severity") == "routine" + + # halt_broadcast_at stamped. + halt_at = conn.execute( + "SELECT halt_broadcast_at FROM fires WHERE irwin_id=?", + ("ID-HALT-001",), + ).fetchone()[0] + assert halt_at == float(now_epoch) + + +def test_halt_detector_no_second_broadcast_for_same_fire(): + """Once halt_broadcast_at is stamped, the detector skips that fire.""" + from meshai.central.firms_handler import _maybe_emit_halt + from meshai.persistence import get_db + + now_epoch = 1780768800 + fourteen_h_ago = now_epoch - (14 * 3600) + conn = get_db() + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, lat, lon, " + "last_event_at, last_pass_id, last_pass_at, halt_broadcast_at) " + "VALUES (?,?,?,?,?,?,?,?)", + ("ID-HALT-002", "Already Halted", 42.500, -114.500, + int(fourteen_h_ago), "N20-329627", float(fourteen_h_ago), + float(now_epoch - 600)), # halt fired 10 min ago + ) + + data = {} + wire = _maybe_emit_halt(conn, data=data, now=now_epoch) + assert wire is None, f"halt latched fire should NOT re-fire: {wire}" + + +def test_halt_eligibility_returns_after_new_pass_arrives(): + """A previously halted fire that receives a new pixel becomes eligible + for halt again if it goes idle a second time. The detector filter is + halt_broadcast_at IS NULL OR halt_broadcast_at < last_pass_at.""" + from meshai.central.firms_handler import _maybe_emit_halt + from meshai.persistence import get_db + + now_epoch = 1780768800 + conn = get_db() + # Fire was halted yesterday, then last_pass_at advanced 14h ago. + conn.execute( + "INSERT INTO fires(irwin_id, incident_name, lat, lon, " + "last_event_at, last_pass_id, last_pass_at, halt_broadcast_at) " + "VALUES (?,?,?,?,?,?,?,?)", + ("ID-HALT-003", "Resurrected", 42.500, -114.500, + int(now_epoch), "N20-329640", + float(now_epoch - 14 * 3600), # last pass 14h ago + float(now_epoch - 24 * 3600)), # halt stamped 24h ago + ) + # halt_broadcast_at (24h ago) < last_pass_at (14h ago) -> eligible. + data = {} + wire = _maybe_emit_halt(conn, data=data, now=now_epoch) + assert wire is not None + assert "Resurrected" in wire + + +# --------------------------------------------------------------------------- +# (c) Helper sanity. +# --------------------------------------------------------------------------- + + +def test_bearing_and_direction_round_trip(): + """Bearing helper + 8-way mapping cover all cardinals/intercardinals.""" + from meshai.central.firms_handler import _bearing, _direction_8 + # Source point. + s_lat, s_lon = 42.0, -114.0 + # Each cardinal/intercardinal direction we test by walking ~1 mi. + delta_deg = 1.0 / 69.0 # ~1 mi in latitude degrees + cases = [ + ("N", s_lat + delta_deg, s_lon), + ("NE", s_lat + delta_deg, s_lon + delta_deg), + ("E", s_lat, s_lon + delta_deg), + ("SE", s_lat - delta_deg, s_lon + delta_deg), + ("S", s_lat - delta_deg, s_lon), + ("SW", s_lat - delta_deg, s_lon - delta_deg), + ("W", s_lat, s_lon - delta_deg), + ("NW", s_lat + delta_deg, s_lon - delta_deg), + ] + for expected, t_lat, t_lon in cases: + b = _bearing(s_lat, s_lon, t_lat, t_lon) + d = _direction_8(b) + assert d == expected, f"expected {expected} from bearing {b:.1f}, got {d}" + + +def test_direction_8_boundary_cases(): + from meshai.central.firms_handler import _direction_8 + # Bearings on the boundary -- check the +22.5 offset rounds correctly. + assert _direction_8(0.0) == "N" + assert _direction_8(22.4) == "N" + assert _direction_8(22.6) == "NE" + assert _direction_8(67.4) == "NE" + assert _direction_8(67.6) == "E" + assert _direction_8(359.9) == "N" + + +# --------------------------------------------------------------------------- +# (d) Adapter_config + categories. +# --------------------------------------------------------------------------- + + +def test_adapter_config_seeds_phase2_keys(): + from meshai.persistence import get_db + conn = get_db() + rows = { + (r["adapter"], r["key"]): r["default_json"] + for r in conn.execute( + "SELECT adapter, key, default_json FROM adapter_config " + "WHERE (adapter, key) IN ( " + " ('fires','growth_drift_threshold_mi'), " + " ('fires','halt_passes_threshold'), " + " ('fires','halt_minimum_seconds') )" + ) + } + assert rows[("fires", "growth_drift_threshold_mi")] == "0.5" + assert rows[("fires", "halt_passes_threshold")] == "2" + assert rows[("fires", "halt_minimum_seconds")] == "43200" + + +def test_phase2_categories_registered(): + from meshai.notifications.categories import ALERT_CATEGORIES + assert ALERT_CATEGORIES["wildfire_growth"]["default_severity"] == "priority" + assert ALERT_CATEGORIES["wildfire_halted"]["default_severity"] == "routine" + for cat in ("wildfire_growth", "wildfire_halted"): + assert ALERT_CATEGORIES[cat]["toggle"] == "fire" + + +# --------------------------------------------------------------------------- +# (e) Pass aggregate correctness. +# --------------------------------------------------------------------------- + + +def test_pass_row_aggregates_match_member_pixels(): + """5 pixels attributed in the same pass yield ONE fire_passes row + with pixel_count=5, total_frp = sum, pass_started_at = min(acq), + pass_ended_at = max(acq), centroid = median.""" + from meshai.central.firms_handler import handle_firms + from meshai.persistence import get_db + + _seed_fire(irwin_id="ID-AGG-001", + lat=42.000, lon=-114.000, + name="Aggregator") + + pixels = [ + (42.000, -114.000, "1200", 10.0), + (42.001, -114.001, "1205", 20.0), + (42.002, -114.002, "1210", 30.0), + (42.003, -114.003, "1215", 40.0), + (42.004, -114.004, "1220", 50.0), + ] + for la, lo, t, frp in pixels: + env = _envelope(lat=la, lon=lo, acq_time=t, frp=frp, + satellite="N20") + handle_firms(env, subject="central.fire.hotspot.N20.high.us.id", + data={}, now=1780747200) + + row = get_db().execute( + "SELECT pixel_count, total_frp, pass_centroid_lat, " + "pass_centroid_lon, pass_started_at, pass_ended_at " + "FROM fire_passes WHERE irwin_id=?", ("ID-AGG-001",), + ).fetchone() + assert row["pixel_count"] == 5 + assert row["total_frp"] == pytest.approx(150.0) + # Median of 5 sorted lats = middle = 42.002. + assert row["pass_centroid_lat"] == pytest.approx(42.002, abs=1e-6) + # pass_started_at corresponds to acq 1200 = 2026-06-06 12:00 = 1780747200 + assert row["pass_started_at"] == 1780747200.0 + assert row["pass_ended_at"] == 1780747200.0 + 20 * 60 # +20 minutes