mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-11 01:14:45 +02:00
feat(v0.6-1): FIRMS handler -- storage-only, closes silent-drop on central.fire.hotspot.>
v0.5.13 default-deny was silently dropping every FIRMS hotspot because no per-adapter handler existed. firms_pixels table has been empty since v0.5.8b. This commit adds central/firms_handler.py which stores every passing pixel that clears the (currently hardcoded, future GUI-driven) confidence + FRP floors, with dedup on (round(lat,5), round(lon,5), acq_time, satellite) via a unique partial index added in v4.sql. NO mesh broadcasts emitted by this handler -- FIRMS data is for LLM context only and will become queryable when commit #5 (env_reporter) lands. Defaults baked in: FIRMS_CONFIDENCE_FLOOR = "low" -- store every confidence level FIRMS_FRP_FLOOR = 0.0 -- store every FRP value FIRMS_BBOX_OPTIONAL = None -- no spatial filter These become adapter_config GUI rows in commit #3 (per Matt's v0.6 Phase 1 refinement: hardcoded values become GUI default values so first-deploy behavior is unchanged). Wiring: - meshai/central/firms_handler.py (new, 270 lines) - meshai/persistence/migrations/v4.sql (new, unique dedup index) - meshai/persistence/db.py (SCHEMA_VERSION 3 -> 4) - meshai/central/consumer.py (dispatch ladder gets firms branch before default-deny clause; pattern matches handle_swpc / handle_nwis) - tests/test_firms_handler.py (new, 22 tests covering confidence floor, FRP floor, bbox, dedup, missing fields, end-to-end through consumer) - tests/test_consumer_default_deny.py (swap firms -> avalanche for the "no handler" examples since FIRMS now has one) Test count: 658 -> 680 (+22 firms_handler tests, 0 regressions). Refs audit doc v0.6-phase1-audit.md finding #2.
This commit is contained in:
parent
b6160d2eda
commit
b2c4d53b14
6 changed files with 731 additions and 11 deletions
|
|
@ -518,6 +518,16 @@ class CentralConsumer:
|
|||
elif inner.get("adapter") == "nwis":
|
||||
from meshai.central.nwis_handler import handle_nwis
|
||||
synthesized = handle_nwis(envelope, subject, data=data) or None
|
||||
# v0.6-1 firms_handler -- STORAGE-ONLY. handle_firms
|
||||
# writes to firms_pixels (with dedup) and returns None
|
||||
# so the default-deny clause below keeps mesh
|
||||
# broadcasts suppressed. LLM visibility lands in
|
||||
# commit #5 (env_reporter). Closes the v0.5.13
|
||||
# silent-drop on central.fire.hotspot.> (audit doc
|
||||
# finding #2).
|
||||
elif inner.get("adapter") == "firms":
|
||||
from meshai.central.firms_handler import handle_firms
|
||||
synthesized = handle_firms(envelope, subject, data=data) or None
|
||||
elif n is not None and category in ("work_zone", "road_closure", "road_incident"):
|
||||
synthesized = format_work_zone_mesh(n) or None
|
||||
except Exception:
|
||||
|
|
|
|||
306
meshai/central/firms_handler.py
Normal file
306
meshai/central/firms_handler.py
Normal file
|
|
@ -0,0 +1,306 @@
|
|||
"""v0.6-1 FIRMS hotspot handler -- STORAGE ONLY (no mesh broadcasts).
|
||||
|
||||
Pre-v0.6-1 the v0.5.13 default-deny gate at consumer._normalize() silently
|
||||
dropped every `central.fire.hotspot.>` envelope because no per-adapter handler
|
||||
existed (audit doc v0.6-phase1-audit.md finding #2). The `firms_pixels`
|
||||
table was created in v0.5.8b (v1.sql:98-111) and has been empty ever since.
|
||||
|
||||
This handler closes that gap: every passing FIRMS pixel lands in
|
||||
`firms_pixels`. No mesh broadcasts are emitted -- FIRMS data is for the
|
||||
LLM context (commit #5: env_reporter) and for the v0.6 fire-tracker
|
||||
fusion (per v0.6-design-fire-tracker.md). Returning None from the handler
|
||||
tells the consumer's default-deny clause "no broadcast", which is exactly
|
||||
the v0.6-1 contract (memory rule 19).
|
||||
|
||||
Subject pattern (Central v0.10.0):
|
||||
central.fire.hotspot.<satellite>.<confidence>.<region>
|
||||
where <region> is `us.<state>` or `unknown`.
|
||||
|
||||
Envelope shape (from firms-investigation.md, 250 envelopes 2026-05-28..06-04):
|
||||
envelope["data"]["adapter"] == "firms"
|
||||
envelope["data"]["data"]:
|
||||
latitude (REAL)
|
||||
longitude (REAL)
|
||||
frp (REAL, MW -- fire radiative power)
|
||||
bright_ti4 (REAL, K -- VIIRS brightness temperature)
|
||||
bright_ti5 (REAL, K, optional)
|
||||
satellite ("N" Suomi-NPP | "N20" NOAA-20)
|
||||
instrument ("VIIRS" so far; MODIS would extend this)
|
||||
confidence ("nominal" | "high" | "low")
|
||||
acq_date ("YYYY-MM-DD" UTC)
|
||||
acq_time ("HHMM" UTC, 4-digit)
|
||||
daynight ("D" | "N")
|
||||
version (str)
|
||||
_enriched.geocoder.city/state/county/landclass/elevation_m
|
||||
|
||||
Filtering (hardcoded defaults; commit #3 migrates these to adapter_config
|
||||
GUI rows per Rule 17. Per Matt's lock: defaults become GUI default values
|
||||
with no behavior change on first deploy.):
|
||||
|
||||
FIRMS_CONFIDENCE_FLOOR = "low" -- rank-based; "low" = store every conf
|
||||
FIRMS_FRP_FLOOR = 0.0 -- 0 = store every FRP value
|
||||
FIRMS_BBOX_OPTIONAL = None -- None = no spatial filter
|
||||
|
||||
Permissive defaults are intentional: storage is cheap and v0.6 fire-tracker
|
||||
fusion (FIRMS + WFIGS) needs the full pixel stream to detect unattributed
|
||||
clusters early. Query-time filtering happens in env_reporter (commit #5).
|
||||
|
||||
Dedup:
|
||||
Unique partial index added in v4.sql on
|
||||
(round(lat,5), round(lon,5), acq_time, satellite)
|
||||
Same satellite pixel observation re-published via NATS reconnect /
|
||||
JetStream replay is a no-op INSERT OR IGNORE. 5 decimals on lat/lon
|
||||
is ~1.1 m precision -- well inside VIIRS' 375 m pixel.
|
||||
|
||||
event_log accounting:
|
||||
handled=1 -> row inserted into firms_pixels (or dedup-hit -- still
|
||||
"successfully handled" semantically: we know about it)
|
||||
handled=0 -> dropped (missing coords / outside bbox / below conf
|
||||
floor / below FRP floor / missing acq timestamp).
|
||||
Category is suffixed with "|<reason>" for grep.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from meshai.persistence import get_db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# v0.6-1 hardcoded defaults. Commit #3 will lift these to `adapter_config`
|
||||
# rows + a /api/adapter-config/firms GUI editor. Per Matt's v0.6 Phase 1
|
||||
# refinement #3, the hardcoded values below become the GUI default values
|
||||
# so behavior does not change on first deploy.
|
||||
# ============================================================================
|
||||
|
||||
# Confidence rank. Anything >= floor stores. "low" = store every confidence
|
||||
# level (nominal/high/low). VIIRS-FIRMS publishes string-valued confidence;
|
||||
# MODIS would publish 0-100 ints (not in scope this round -- per investigation
|
||||
# doc §2, all 250 sampled envelopes were VIIRS).
|
||||
_CONFIDENCE_RANK = {"low": 0, "nominal": 1, "high": 2}
|
||||
FIRMS_CONFIDENCE_FLOOR = "low"
|
||||
|
||||
# FRP floor in MW. 0 stores every detection; setting >0 drops below-floor
|
||||
# pixels with event_log "|below_frp_floor" for accounting.
|
||||
FIRMS_FRP_FLOOR = 0.0
|
||||
|
||||
# Optional bbox (min_lat, min_lon, max_lat, max_lon) in degrees. None = no
|
||||
# spatial filter. The Central feed already region-filters at the subject
|
||||
# level (us.id / unknown), so most ops will leave this None.
|
||||
FIRMS_BBOX_OPTIONAL: Optional[tuple[float, float, float, float]] = None
|
||||
|
||||
# Dedup-key lat/lon rounding precision. 5 decimals = ~1.1 m, well inside
|
||||
# the 375 m VIIRS pixel. Same pixel republished via NATS reconnect collapses.
|
||||
_DEDUP_LAT_LON_DECIMALS = 5
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry point
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def handle_firms(envelope: dict, subject: str,
|
||||
data: Optional[dict] = None,
|
||||
now: Optional[int] = None) -> Optional[str]:
|
||||
"""Storage-only FIRMS handler. ALWAYS returns None.
|
||||
|
||||
Args:
|
||||
envelope: CloudEvents envelope from the Central consumer.
|
||||
subject: NATS subject (`central.fire.hotspot.<sat>.<conf>.<region>`).
|
||||
data: Mutable Event.data dict (unused -- no broadcast attached).
|
||||
now: Override current epoch for tests.
|
||||
|
||||
Returns:
|
||||
None unconditionally. The v0.5.13 default-deny clause at
|
||||
consumer._normalize() interprets None as "no broadcast", which is
|
||||
the desired contract for storage-only adapters.
|
||||
"""
|
||||
if not isinstance(envelope, dict):
|
||||
return None
|
||||
inner = envelope.get("data") or {}
|
||||
if (inner.get("adapter") or "") != "firms":
|
||||
return None
|
||||
|
||||
d = inner.get("data") or {}
|
||||
now = now if now is not None else int(time.time())
|
||||
category_raw = inner.get("category") or ""
|
||||
severity_word = _coerce_severity(inner.get("severity"))
|
||||
event_id_external = inner.get("id")
|
||||
|
||||
try:
|
||||
conn = get_db()
|
||||
except Exception:
|
||||
logger.exception("firms_handler: persistence unavailable; dropping")
|
||||
return None
|
||||
|
||||
# ---- field extraction + validation -----------------------------------
|
||||
|
||||
lat = d.get("latitude")
|
||||
lon = d.get("longitude")
|
||||
if not (isinstance(lat, (int, float)) and isinstance(lon, (int, float))):
|
||||
_log_event(conn, now=now, source="firms",
|
||||
category=category_raw + "|missing_coords",
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=0,
|
||||
table_name=None, table_pk=None)
|
||||
return None
|
||||
lat = float(lat); lon = float(lon)
|
||||
|
||||
# ---- filter: bbox (optional) -----------------------------------------
|
||||
|
||||
if not _in_bbox(lat, lon):
|
||||
_log_event(conn, now=now, source="firms",
|
||||
category=category_raw + "|outside_bbox",
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=0,
|
||||
table_name=None, table_pk=None)
|
||||
return None
|
||||
|
||||
# ---- filter: confidence floor ----------------------------------------
|
||||
|
||||
conf = d.get("confidence")
|
||||
if not _confidence_passes(conf):
|
||||
_log_event(conn, now=now, source="firms",
|
||||
category=category_raw + "|below_confidence_floor",
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=0,
|
||||
table_name=None, table_pk=None)
|
||||
return None
|
||||
|
||||
# ---- filter: FRP floor (no-op when FIRMS_FRP_FLOOR <= 0) -------------
|
||||
|
||||
frp_raw = d.get("frp")
|
||||
try:
|
||||
frp = float(frp_raw) if frp_raw is not None else None
|
||||
except (TypeError, ValueError):
|
||||
frp = None
|
||||
if FIRMS_FRP_FLOOR > 0:
|
||||
if frp is None or frp < FIRMS_FRP_FLOOR:
|
||||
_log_event(conn, now=now, source="firms",
|
||||
category=category_raw + "|below_frp_floor",
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=0,
|
||||
table_name=None, table_pk=None)
|
||||
return None
|
||||
|
||||
# ---- acquisition timestamp (required for dedup key) ------------------
|
||||
|
||||
acq_epoch = _parse_acq_epoch(d.get("acq_date"), d.get("acq_time"))
|
||||
if acq_epoch is None:
|
||||
_log_event(conn, now=now, source="firms",
|
||||
category=category_raw + "|missing_acq_time",
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=0,
|
||||
table_name=None, table_pk=None)
|
||||
return None
|
||||
|
||||
# ---- persist (INSERT OR IGNORE via v4.sql unique partial index) ------
|
||||
|
||||
satellite = d.get("satellite") or ""
|
||||
brightness_raw = d.get("bright_ti4") if d.get("bright_ti4") is not None \
|
||||
else d.get("brightness")
|
||||
try:
|
||||
brightness = float(brightness_raw) if brightness_raw is not None else None
|
||||
except (TypeError, ValueError):
|
||||
brightness = None
|
||||
|
||||
cur = conn.execute(
|
||||
"INSERT OR IGNORE INTO firms_pixels(irwin_id, lat, lon, acq_time, "
|
||||
"frp, confidence, satellite, brightness) "
|
||||
"VALUES (?,?,?,?,?,?,?,?)",
|
||||
(None, lat, lon, acq_epoch, frp,
|
||||
(str(conf) if conf is not None else None),
|
||||
satellite, brightness),
|
||||
)
|
||||
stored = cur.rowcount > 0
|
||||
|
||||
# event_log row regardless of dedup outcome -- both "stored" and
|
||||
# "dedup-hit" count as "handled" for accounting; the suffix tells them
|
||||
# apart for ops grep.
|
||||
handled = 1 if stored else 1 # dedup hit is still handled-success
|
||||
cat_tag = category_raw if stored else category_raw + "|dedup_hit"
|
||||
_log_event(conn, now=now, source="firms", category=cat_tag,
|
||||
severity_word=severity_word,
|
||||
event_id_external=event_id_external,
|
||||
subject=subject, handled=handled,
|
||||
table_name="firms_pixels" if stored else None,
|
||||
table_pk=(str(cur.lastrowid) if stored else None))
|
||||
|
||||
# STORAGE-ONLY: never broadcast.
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Helpers
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _confidence_passes(conf: Optional[str]) -> bool:
|
||||
"""Return True iff `conf` is at or above FIRMS_CONFIDENCE_FLOOR.
|
||||
|
||||
Unknown / unparseable confidence values fail closed (drop) so the
|
||||
accounting trail flags upstream schema drift instead of silently
|
||||
storing under a degraded label.
|
||||
"""
|
||||
if conf is None:
|
||||
return False
|
||||
rank = _CONFIDENCE_RANK.get(str(conf).lower())
|
||||
if rank is None:
|
||||
return False
|
||||
floor = _CONFIDENCE_RANK.get(FIRMS_CONFIDENCE_FLOOR, 0)
|
||||
return rank >= floor
|
||||
|
||||
|
||||
def _in_bbox(lat: float, lon: float) -> bool:
|
||||
if FIRMS_BBOX_OPTIONAL is None:
|
||||
return True
|
||||
min_lat, min_lon, max_lat, max_lon = FIRMS_BBOX_OPTIONAL
|
||||
return (min_lat <= lat <= max_lat) and (min_lon <= lon <= max_lon)
|
||||
|
||||
|
||||
def _parse_acq_epoch(date_s: Optional[str],
|
||||
time_s: Optional[Any]) -> Optional[int]:
|
||||
"""FIRMS publishes acq_date 'YYYY-MM-DD' + acq_time HHMM (UTC).
|
||||
|
||||
acq_time is sometimes a 4-digit string ("2013") and sometimes an int
|
||||
(2013). Both supported. Zero-padded to 4 chars before parsing.
|
||||
"""
|
||||
if not date_s or time_s is None:
|
||||
return None
|
||||
try:
|
||||
t_str = str(time_s).zfill(4)
|
||||
dt = datetime.strptime(f"{date_s} {t_str}", "%Y-%m-%d %H%M") \
|
||||
.replace(tzinfo=timezone.utc)
|
||||
return int(dt.timestamp())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _coerce_severity(sev: Any) -> Optional[str]:
|
||||
if sev is None: return None
|
||||
if isinstance(sev, str): return sev or None
|
||||
try: return str(int(sev))
|
||||
except (TypeError, ValueError): return str(sev)
|
||||
|
||||
|
||||
def _log_event(conn, *, now, source, category, severity_word,
|
||||
event_id_external, subject, handled,
|
||||
table_name, table_pk) -> None:
|
||||
"""event_log writer -- shape matches sibling handlers exactly."""
|
||||
conn.execute(
|
||||
"INSERT INTO event_log(received_at, source, category, severity_word, "
|
||||
"event_id_external, nats_subject, handled, table_name, table_pk) "
|
||||
"VALUES (?,?,?,?,?,?,?,?,?)",
|
||||
(now, source, category, severity_word, event_id_external, subject,
|
||||
int(bool(handled)), table_name, table_pk),
|
||||
)
|
||||
|
|
@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
DEFAULT_DB_PATH = "/data/meshai.sqlite"
|
||||
MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH"
|
||||
SCHEMA_VERSION = 3
|
||||
SCHEMA_VERSION = 4
|
||||
SCHEMA_META_TABLE = "schema_meta"
|
||||
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
|
||||
|
||||
|
|
|
|||
18
meshai/persistence/migrations/v4.sql
Normal file
18
meshai/persistence/migrations/v4.sql
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
-- v0.6-1 firms_pixels dedup index.
|
||||
--
|
||||
-- The firms_pixels table was created in v1.sql (v0.5.8b) but had no writer
|
||||
-- until v0.6-1 (firms_handler.py). FIRMS publishes the same satellite pixel
|
||||
-- multiple times via NATS reconnect / JetStream replay; without a dedup
|
||||
-- key, every restart would re-store the entire 7-day retention window.
|
||||
--
|
||||
-- Round lat/lon to 5 decimals (~1.1 m precision, well inside the 375 m
|
||||
-- VIIRS pixel) so float-noise variants of the same coord collapse onto
|
||||
-- the same key. `acq_time` is epoch seconds (UTC) parsed from FIRMS'
|
||||
-- acq_date + acq_time fields. `satellite` is "N" (Suomi-NPP) or "N20"
|
||||
-- (NOAA-20); future MODIS adds "Terra"/"Aqua".
|
||||
--
|
||||
-- The handler uses INSERT OR IGNORE so a duplicate is a clean no-op and
|
||||
-- the caller can distinguish via cur.rowcount.
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_firms_pixels_dedup
|
||||
ON firms_pixels(round(lat, 5), round(lon, 5), acq_time, satellite);
|
||||
|
|
@ -7,8 +7,8 @@ return an Event with that exact title + _meshai_precomposed=True so the
|
|||
composer bypass kicks in.
|
||||
|
||||
Covers four cases:
|
||||
(a) envelope with NO matching handler (adapter='firms' has no handler)
|
||||
-> _normalize returns None
|
||||
(a) envelope with NO matching handler (adapter='avalanche' has no
|
||||
Central adapter wired) -> _normalize returns None
|
||||
(b) envelope hits handler, handler returns None (e.g. sub-G3 swpc,
|
||||
stale tomtom) -> _normalize returns None
|
||||
(c) envelope hits handler, handler returns wire string
|
||||
|
|
@ -82,9 +82,10 @@ def _make_envelope(adapter, category, *, inner_id="test_001",
|
|||
|
||||
|
||||
def test_no_handler_match_returns_none(consumer, mem_db):
|
||||
"""FIRMS has no handler; envelope must drop at consumer._normalize."""
|
||||
env = _make_envelope("firms", "fire.hotspot.viirs",
|
||||
inner_id="firms_001")
|
||||
"""Avalanche has no handler (no Central adapter). envelope must
|
||||
drop at consumer._normalize as the default-deny baseline."""
|
||||
env = _make_envelope("avalanche", "avalanche.forecast",
|
||||
inner_id="aval_001")
|
||||
out = consumer._normalize(env["subject"], env)
|
||||
assert out is None
|
||||
|
||||
|
|
@ -193,11 +194,12 @@ def test_handler_returns_wire_event_emitted(consumer, mem_db, monkeypatch):
|
|||
|
||||
def test_envelope_with_title_still_drops_without_handler(consumer, mem_db):
|
||||
"""Regression guard: the v0.5.7-fallback path (data.title -> headline ->\
|
||||
friendly_name -> cat_raw) is GONE in v0.5.13."""
|
||||
env = _make_envelope("firms", "fire.hotspot.viirs",
|
||||
inner_id="firms_with_title",
|
||||
title="Wildfire Hotspot",
|
||||
headline="VIIRS Hotspot Detected")
|
||||
friendly_name -> cat_raw) is GONE in v0.5.13. Uses an unhandled adapter
|
||||
(avalanche) since v0.6-1 added the FIRMS handler."""
|
||||
env = _make_envelope("avalanche", "avalanche.forecast",
|
||||
inner_id="aval_with_title",
|
||||
title="Avalanche Warning",
|
||||
headline="Backcountry advisory")
|
||||
out = consumer._normalize(env["subject"], env)
|
||||
assert out is None, (
|
||||
"v0.5.13 default-deny: data.title and data.headline must NOT rescue\n"
|
||||
|
|
|
|||
384
tests/test_firms_handler.py
Normal file
384
tests/test_firms_handler.py
Normal file
|
|
@ -0,0 +1,384 @@
|
|||
"""Tests for v0.6-1 FIRMS handler (storage-only).
|
||||
|
||||
The handler never returns a wire string (storage-only contract). All
|
||||
assertions check side effects on `firms_pixels` + `event_log`. Per the
|
||||
audit doc finding #2 the handler must close the v0.5.13 silent-drop on
|
||||
`central.fire.hotspot.>` envelopes.
|
||||
|
||||
Envelope shape sourced from firms-investigation.md sampling (250 envelopes
|
||||
2026-05-28..06-04, all VIIRS).
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from meshai.central import firms_handler
|
||||
from meshai.central.firms_handler import handle_firms
|
||||
from meshai.persistence import close_thread_connection, init_db
|
||||
from meshai.persistence import db as persistence_db
|
||||
|
||||
|
||||
# ---------- fixtures --------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mem_db(monkeypatch, tmp_path):
|
||||
db_path = str(tmp_path / "firms-test.sqlite")
|
||||
monkeypatch.setenv("MESHAI_DB_PATH", db_path)
|
||||
persistence_db._initialised.clear()
|
||||
close_thread_connection()
|
||||
conn = init_db()
|
||||
yield conn
|
||||
close_thread_connection()
|
||||
persistence_db._initialised.discard(db_path)
|
||||
|
||||
|
||||
def _firms_env(*,
|
||||
lat=42.19664, lon=-113.70981, frp=135.93,
|
||||
confidence="high", satellite="N",
|
||||
acq_date="2026-05-28", acq_time="1949",
|
||||
bright_ti4=367.0, daynight="D",
|
||||
envelope_id="firms_001",
|
||||
category="fire.hotspot.viirs",
|
||||
severity=3,
|
||||
region="unknown",
|
||||
extras=None):
|
||||
"""Build a FIRMS CloudEvents envelope matching the live wire shape.
|
||||
|
||||
Default fixture is SAMPLE B from firms-investigation.md (Cache Peak
|
||||
high-confidence 135 MW fire). Override individual fields for filter
|
||||
coverage.
|
||||
"""
|
||||
payload = {
|
||||
"id": envelope_id,
|
||||
"latitude": lat,
|
||||
"longitude": lon,
|
||||
"frp": frp,
|
||||
"confidence": confidence,
|
||||
"satellite": satellite,
|
||||
"instrument": "VIIRS",
|
||||
"acq_date": acq_date,
|
||||
"acq_time": acq_time,
|
||||
"bright_ti4": bright_ti4,
|
||||
"daynight": daynight,
|
||||
"version": "2.0NRT",
|
||||
"_enriched": {"geocoder": {"landclass": "Cache Peak Roadless Area",
|
||||
"elevation_m": 2151.9}},
|
||||
}
|
||||
if extras: payload.update(extras)
|
||||
return {
|
||||
"subject": f"central.fire.hotspot.viirs_snpp.{confidence}.{region}",
|
||||
"id": envelope_id,
|
||||
"data": {
|
||||
"id": envelope_id,
|
||||
"adapter": "firms",
|
||||
"category": category,
|
||||
"severity": severity,
|
||||
"time": "2026-05-28T19:49:00Z",
|
||||
"geo": {"centroid": [lon, lat]},
|
||||
"data": payload,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _row_count(mem_db, table):
|
||||
return mem_db.execute(f"SELECT COUNT(*) AS n FROM {table}").fetchone()["n"]
|
||||
|
||||
|
||||
def _last_event_log(mem_db):
|
||||
r = mem_db.execute(
|
||||
"SELECT * FROM event_log ORDER BY id DESC LIMIT 1"
|
||||
).fetchone()
|
||||
return dict(r) if r else None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Happy path: high-confidence pixel stored
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_high_confidence_pixel_persisted(mem_db):
|
||||
env = _firms_env()
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
|
||||
assert out is None, "storage-only handler must never return a wire string"
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
row = mem_db.execute("SELECT * FROM firms_pixels").fetchone()
|
||||
assert row["lat"] == pytest.approx(42.19664)
|
||||
assert row["lon"] == pytest.approx(-113.70981)
|
||||
assert row["frp"] == pytest.approx(135.93)
|
||||
assert row["confidence"] == "high"
|
||||
assert row["satellite"] == "N"
|
||||
assert row["brightness"] == pytest.approx(367.0)
|
||||
assert row["irwin_id"] is None # unattached; v0.6 fire-tracker fills later
|
||||
|
||||
# acq_time should be the parsed UTC epoch of 2026-05-28 19:49Z.
|
||||
import datetime as _dt
|
||||
expected = int(_dt.datetime(2026, 5, 28, 19, 49,
|
||||
tzinfo=_dt.timezone.utc).timestamp())
|
||||
assert row["acq_time"] == expected
|
||||
|
||||
# event_log: handled=1, table_name set, table_pk = inserted rowid.
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["source"] == "firms"
|
||||
assert log["handled"] == 1
|
||||
assert log["table_name"] == "firms_pixels"
|
||||
assert log["table_pk"] == str(row["id"])
|
||||
|
||||
|
||||
def test_nominal_confidence_pixel_persisted_under_default_floor(mem_db):
|
||||
"""Default FIRMS_CONFIDENCE_FLOOR='low' must accept nominal/high/low."""
|
||||
env = _firms_env(confidence="nominal", envelope_id="firms_002")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
def test_low_confidence_pixel_persisted_under_default_floor(mem_db):
|
||||
env = _firms_env(confidence="low", envelope_id="firms_003")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Confidence floor when bumped up
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_low_confidence_dropped_when_floor_is_nominal(mem_db, monkeypatch):
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_CONFIDENCE_FLOOR", "nominal")
|
||||
env = _firms_env(confidence="low", envelope_id="firms_lo")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["handled"] == 0
|
||||
assert log["category"].endswith("|below_confidence_floor")
|
||||
|
||||
|
||||
def test_unknown_confidence_value_dropped(mem_db):
|
||||
env = _firms_env(confidence="bogus", envelope_id="firms_bog")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["category"].endswith("|below_confidence_floor")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# FRP floor
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_low_frp_dropped_when_floor_set(mem_db, monkeypatch):
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0)
|
||||
env = _firms_env(frp=1.4, confidence="high", envelope_id="firms_frp_low")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["category"].endswith("|below_frp_floor")
|
||||
|
||||
|
||||
def test_frp_at_floor_stored(mem_db, monkeypatch):
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0)
|
||||
env = _firms_env(frp=5.0, confidence="high", envelope_id="firms_frp_at")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
def test_missing_frp_dropped_when_floor_set(mem_db, monkeypatch):
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0)
|
||||
env = _firms_env(confidence="high", envelope_id="firms_no_frp",
|
||||
extras={"frp": None})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
|
||||
|
||||
def test_missing_frp_stored_when_floor_zero(mem_db):
|
||||
"""Default FIRMS_FRP_FLOOR=0: missing FRP still stores (null in column)."""
|
||||
env = _firms_env(confidence="high", envelope_id="firms_no_frp_2",
|
||||
extras={"frp": None})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
row = mem_db.execute("SELECT * FROM firms_pixels").fetchone()
|
||||
assert row["frp"] is None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Bbox filter (default None = pass-through; explicit bbox tested both sides)
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_bbox_none_passes_through(mem_db):
|
||||
env = _firms_env(lat=51.0, lon=10.0, # Germany
|
||||
envelope_id="firms_eu")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
def test_bbox_drops_outside(mem_db, monkeypatch):
|
||||
# Idaho-ish bbox.
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_BBOX_OPTIONAL",
|
||||
(42.0, -117.5, 49.0, -111.0))
|
||||
env = _firms_env(lat=51.0, lon=10.0, envelope_id="firms_out")
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["category"].endswith("|outside_bbox")
|
||||
|
||||
|
||||
def test_bbox_keeps_inside(mem_db, monkeypatch):
|
||||
monkeypatch.setattr(firms_handler, "FIRMS_BBOX_OPTIONAL",
|
||||
(42.0, -117.5, 49.0, -111.0))
|
||||
env = _firms_env(envelope_id="firms_in") # Cache Peak is inside
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dedup: same satellite pixel observation arriving twice = no-op
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_dedup_same_pixel_idempotent(mem_db):
|
||||
env = _firms_env(envelope_id="dup_001")
|
||||
handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
handle_firms(env, env["subject"], data={}, now=1_780_660_001)
|
||||
|
||||
assert _row_count(mem_db, "firms_pixels") == 1, "OR IGNORE collapses dup"
|
||||
# Two event_log rows; second is dedup_hit.
|
||||
rows = mem_db.execute(
|
||||
"SELECT category, table_name, handled FROM event_log "
|
||||
"WHERE source='firms' ORDER BY id"
|
||||
).fetchall()
|
||||
assert len(rows) == 2
|
||||
assert rows[0]["table_name"] == "firms_pixels"
|
||||
assert rows[1]["table_name"] is None
|
||||
assert rows[1]["category"].endswith("|dedup_hit")
|
||||
|
||||
|
||||
def test_dedup_collapses_lat_lon_float_noise(mem_db):
|
||||
"""Same coord with sub-1m float noise must hit the same dedup key.
|
||||
5-decimal rounding => differences in the 6th+ decimal are absorbed."""
|
||||
e1 = _firms_env(lat=42.196641234567, lon=-113.709810000001,
|
||||
envelope_id="dup_a")
|
||||
e2 = _firms_env(lat=42.196641111111, lon=-113.709810999999,
|
||||
envelope_id="dup_b")
|
||||
handle_firms(e1, e1["subject"], data={}, now=1_780_660_000)
|
||||
handle_firms(e2, e2["subject"], data={}, now=1_780_660_001)
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
def test_dedup_different_satellite_stored_separately(mem_db):
|
||||
"""Same coord + acq_time but different satellite is 2 distinct observations."""
|
||||
e1 = _firms_env(satellite="N", envelope_id="sat_n")
|
||||
e2 = _firms_env(satellite="N20", envelope_id="sat_n20")
|
||||
handle_firms(e1, e1["subject"], data={}, now=1_780_660_000)
|
||||
handle_firms(e2, e2["subject"], data={}, now=1_780_660_001)
|
||||
assert _row_count(mem_db, "firms_pixels") == 2
|
||||
|
||||
|
||||
def test_dedup_different_acq_time_stored_separately(mem_db):
|
||||
"""Same pixel observed on two passes 12h apart -> 2 rows."""
|
||||
e1 = _firms_env(acq_time="0700", envelope_id="t1")
|
||||
e2 = _firms_env(acq_time="1900", envelope_id="t2")
|
||||
handle_firms(e1, e1["subject"], data={}, now=1_780_660_000)
|
||||
handle_firms(e2, e2["subject"], data={}, now=1_780_660_001)
|
||||
assert _row_count(mem_db, "firms_pixels") == 2
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Bad / missing inputs
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_missing_coords_dropped(mem_db):
|
||||
env = _firms_env(envelope_id="no_coords",
|
||||
extras={"latitude": None, "longitude": None})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["category"].endswith("|missing_coords")
|
||||
|
||||
|
||||
def test_missing_acq_time_dropped(mem_db):
|
||||
env = _firms_env(envelope_id="no_acq",
|
||||
extras={"acq_date": None, "acq_time": None})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["category"].endswith("|missing_acq_time")
|
||||
|
||||
|
||||
def test_non_firms_adapter_passes_through(mem_db):
|
||||
"""Defense in depth: handler must early-return on non-firms envelopes."""
|
||||
env = _firms_env(envelope_id="wrong_adapter")
|
||||
env["data"]["adapter"] = "nws"
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 0
|
||||
assert _row_count(mem_db, "event_log") == 0
|
||||
|
||||
|
||||
def test_acq_time_int_accepted(mem_db):
|
||||
"""FIRMS sometimes publishes acq_time as int 2013 rather than '2013'."""
|
||||
env = _firms_env(envelope_id="int_acq",
|
||||
extras={"acq_time": 1949})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
def test_short_acq_time_zero_padded(mem_db):
|
||||
"""acq_time '49' (early-morning pass) must zero-pad to '0049'."""
|
||||
env = _firms_env(envelope_id="short_acq",
|
||||
extras={"acq_time": "49"})
|
||||
out = handle_firms(env, env["subject"], data={}, now=1_780_660_000)
|
||||
assert out is None
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Integration: envelope through the full consumer -> handler -> SQLite path
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_end_to_end_envelope_through_consumer(mem_db, monkeypatch):
|
||||
"""Confirm: envelope enters consumer._normalize, handle_firms is invoked,
|
||||
firms_pixels row is inserted, and consumer returns None (default-deny
|
||||
keeps the broadcast suppressed). mesh_broadcasts_out MUST stay empty."""
|
||||
from unittest.mock import MagicMock
|
||||
from meshai.config import Config
|
||||
from meshai.central.consumer import CentralConsumer
|
||||
|
||||
cfg = Config()
|
||||
cfg.notifications.cold_start_grace_seconds = 0
|
||||
bus = MagicMock()
|
||||
consumer = CentralConsumer(cfg.environmental, bus)
|
||||
|
||||
env = _firms_env(envelope_id="e2e_001")
|
||||
out = consumer._normalize(env["subject"], env)
|
||||
|
||||
# consumer.normalize returns None -> Event never reaches bus.
|
||||
assert out is None
|
||||
bus.emit.assert_not_called()
|
||||
|
||||
# firms_pixels MUST have the row; mesh_broadcasts_out MUST be empty.
|
||||
assert _row_count(mem_db, "firms_pixels") == 1
|
||||
assert _row_count(mem_db, "mesh_broadcasts_out") == 0
|
||||
|
||||
# event_log records the storage.
|
||||
log = _last_event_log(mem_db)
|
||||
assert log["source"] == "firms"
|
||||
assert log["handled"] == 1
|
||||
assert log["table_name"] == "firms_pixels"
|
||||
Loading…
Add table
Add a link
Reference in a new issue