From b2c4d53b14df7214ec4a36c1834b4345a1b0b480 Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Fri, 5 Jun 2026 15:50:33 +0000 Subject: [PATCH] feat(v0.6-1): FIRMS handler -- storage-only, closes silent-drop on central.fire.hotspot.> v0.5.13 default-deny was silently dropping every FIRMS hotspot because no per-adapter handler existed. firms_pixels table has been empty since v0.5.8b. This commit adds central/firms_handler.py which stores every passing pixel that clears the (currently hardcoded, future GUI-driven) confidence + FRP floors, with dedup on (round(lat,5), round(lon,5), acq_time, satellite) via a unique partial index added in v4.sql. NO mesh broadcasts emitted by this handler -- FIRMS data is for LLM context only and will become queryable when commit #5 (env_reporter) lands. Defaults baked in: FIRMS_CONFIDENCE_FLOOR = "low" -- store every confidence level FIRMS_FRP_FLOOR = 0.0 -- store every FRP value FIRMS_BBOX_OPTIONAL = None -- no spatial filter These become adapter_config GUI rows in commit #3 (per Matt's v0.6 Phase 1 refinement: hardcoded values become GUI default values so first-deploy behavior is unchanged). Wiring: - meshai/central/firms_handler.py (new, 270 lines) - meshai/persistence/migrations/v4.sql (new, unique dedup index) - meshai/persistence/db.py (SCHEMA_VERSION 3 -> 4) - meshai/central/consumer.py (dispatch ladder gets firms branch before default-deny clause; pattern matches handle_swpc / handle_nwis) - tests/test_firms_handler.py (new, 22 tests covering confidence floor, FRP floor, bbox, dedup, missing fields, end-to-end through consumer) - tests/test_consumer_default_deny.py (swap firms -> avalanche for the "no handler" examples since FIRMS now has one) Test count: 658 -> 680 (+22 firms_handler tests, 0 regressions). Refs audit doc v0.6-phase1-audit.md finding #2. --- meshai/central/consumer.py | 10 + meshai/central/firms_handler.py | 306 +++++++++++++++++++++ meshai/persistence/db.py | 2 +- meshai/persistence/migrations/v4.sql | 18 ++ tests/test_consumer_default_deny.py | 22 +- tests/test_firms_handler.py | 384 +++++++++++++++++++++++++++ 6 files changed, 731 insertions(+), 11 deletions(-) create mode 100644 meshai/central/firms_handler.py create mode 100644 meshai/persistence/migrations/v4.sql create mode 100644 tests/test_firms_handler.py diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index 216fe2e..5310598 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -518,6 +518,16 @@ class CentralConsumer: elif inner.get("adapter") == "nwis": from meshai.central.nwis_handler import handle_nwis synthesized = handle_nwis(envelope, subject, data=data) or None + # v0.6-1 firms_handler -- STORAGE-ONLY. handle_firms + # writes to firms_pixels (with dedup) and returns None + # so the default-deny clause below keeps mesh + # broadcasts suppressed. LLM visibility lands in + # commit #5 (env_reporter). Closes the v0.5.13 + # silent-drop on central.fire.hotspot.> (audit doc + # finding #2). + elif inner.get("adapter") == "firms": + from meshai.central.firms_handler import handle_firms + synthesized = handle_firms(envelope, subject, data=data) or None elif n is not None and category in ("work_zone", "road_closure", "road_incident"): synthesized = format_work_zone_mesh(n) or None except Exception: diff --git a/meshai/central/firms_handler.py b/meshai/central/firms_handler.py new file mode 100644 index 0000000..82ebfd5 --- /dev/null +++ b/meshai/central/firms_handler.py @@ -0,0 +1,306 @@ +"""v0.6-1 FIRMS hotspot handler -- STORAGE ONLY (no mesh broadcasts). + +Pre-v0.6-1 the v0.5.13 default-deny gate at consumer._normalize() silently +dropped every `central.fire.hotspot.>` envelope because no per-adapter handler +existed (audit doc v0.6-phase1-audit.md finding #2). The `firms_pixels` +table was created in v0.5.8b (v1.sql:98-111) and has been empty ever since. + +This handler closes that gap: every passing FIRMS pixel lands in +`firms_pixels`. No mesh broadcasts are emitted -- FIRMS data is for the +LLM context (commit #5: env_reporter) and for the v0.6 fire-tracker +fusion (per v0.6-design-fire-tracker.md). Returning None from the handler +tells the consumer's default-deny clause "no broadcast", which is exactly +the v0.6-1 contract (memory rule 19). + +Subject pattern (Central v0.10.0): + central.fire.hotspot... + where is `us.` or `unknown`. + +Envelope shape (from firms-investigation.md, 250 envelopes 2026-05-28..06-04): + envelope["data"]["adapter"] == "firms" + envelope["data"]["data"]: + latitude (REAL) + longitude (REAL) + frp (REAL, MW -- fire radiative power) + bright_ti4 (REAL, K -- VIIRS brightness temperature) + bright_ti5 (REAL, K, optional) + satellite ("N" Suomi-NPP | "N20" NOAA-20) + instrument ("VIIRS" so far; MODIS would extend this) + confidence ("nominal" | "high" | "low") + acq_date ("YYYY-MM-DD" UTC) + acq_time ("HHMM" UTC, 4-digit) + daynight ("D" | "N") + version (str) + _enriched.geocoder.city/state/county/landclass/elevation_m + +Filtering (hardcoded defaults; commit #3 migrates these to adapter_config +GUI rows per Rule 17. Per Matt's lock: defaults become GUI default values +with no behavior change on first deploy.): + + FIRMS_CONFIDENCE_FLOOR = "low" -- rank-based; "low" = store every conf + FIRMS_FRP_FLOOR = 0.0 -- 0 = store every FRP value + FIRMS_BBOX_OPTIONAL = None -- None = no spatial filter + +Permissive defaults are intentional: storage is cheap and v0.6 fire-tracker +fusion (FIRMS + WFIGS) needs the full pixel stream to detect unattributed +clusters early. Query-time filtering happens in env_reporter (commit #5). + +Dedup: + Unique partial index added in v4.sql on + (round(lat,5), round(lon,5), acq_time, satellite) + Same satellite pixel observation re-published via NATS reconnect / + JetStream replay is a no-op INSERT OR IGNORE. 5 decimals on lat/lon + is ~1.1 m precision -- well inside VIIRS' 375 m pixel. + +event_log accounting: + handled=1 -> row inserted into firms_pixels (or dedup-hit -- still + "successfully handled" semantically: we know about it) + handled=0 -> dropped (missing coords / outside bbox / below conf + floor / below FRP floor / missing acq timestamp). + Category is suffixed with "|" for grep. +""" +from __future__ import annotations + +import logging +import time +from datetime import datetime, timezone +from typing import Any, Optional + +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# v0.6-1 hardcoded defaults. Commit #3 will lift these to `adapter_config` +# rows + a /api/adapter-config/firms GUI editor. Per Matt's v0.6 Phase 1 +# refinement #3, the hardcoded values below become the GUI default values +# so behavior does not change on first deploy. +# ============================================================================ + +# Confidence rank. Anything >= floor stores. "low" = store every confidence +# level (nominal/high/low). VIIRS-FIRMS publishes string-valued confidence; +# MODIS would publish 0-100 ints (not in scope this round -- per investigation +# doc ยง2, all 250 sampled envelopes were VIIRS). +_CONFIDENCE_RANK = {"low": 0, "nominal": 1, "high": 2} +FIRMS_CONFIDENCE_FLOOR = "low" + +# FRP floor in MW. 0 stores every detection; setting >0 drops below-floor +# pixels with event_log "|below_frp_floor" for accounting. +FIRMS_FRP_FLOOR = 0.0 + +# Optional bbox (min_lat, min_lon, max_lat, max_lon) in degrees. None = no +# spatial filter. The Central feed already region-filters at the subject +# level (us.id / unknown), so most ops will leave this None. +FIRMS_BBOX_OPTIONAL: Optional[tuple[float, float, float, float]] = None + +# Dedup-key lat/lon rounding precision. 5 decimals = ~1.1 m, well inside +# the 375 m VIIRS pixel. Same pixel republished via NATS reconnect collapses. +_DEDUP_LAT_LON_DECIMALS = 5 + + +# ============================================================================ +# Public entry point +# ============================================================================ + + +def handle_firms(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + """Storage-only FIRMS handler. ALWAYS returns None. + + Args: + envelope: CloudEvents envelope from the Central consumer. + subject: NATS subject (`central.fire.hotspot...`). + data: Mutable Event.data dict (unused -- no broadcast attached). + now: Override current epoch for tests. + + Returns: + None unconditionally. The v0.5.13 default-deny clause at + consumer._normalize() interprets None as "no broadcast", which is + the desired contract for storage-only adapters. + """ + if not isinstance(envelope, dict): + return None + inner = envelope.get("data") or {} + if (inner.get("adapter") or "") != "firms": + return None + + d = inner.get("data") or {} + now = now if now is not None else int(time.time()) + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + event_id_external = inner.get("id") + + try: + conn = get_db() + except Exception: + logger.exception("firms_handler: persistence unavailable; dropping") + return None + + # ---- field extraction + validation ----------------------------------- + + lat = d.get("latitude") + lon = d.get("longitude") + if not (isinstance(lat, (int, float)) and isinstance(lon, (int, float))): + _log_event(conn, now=now, source="firms", + category=category_raw + "|missing_coords", + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + lat = float(lat); lon = float(lon) + + # ---- filter: bbox (optional) ----------------------------------------- + + if not _in_bbox(lat, lon): + _log_event(conn, now=now, source="firms", + category=category_raw + "|outside_bbox", + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # ---- filter: confidence floor ---------------------------------------- + + conf = d.get("confidence") + if not _confidence_passes(conf): + _log_event(conn, now=now, source="firms", + category=category_raw + "|below_confidence_floor", + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # ---- filter: FRP floor (no-op when FIRMS_FRP_FLOOR <= 0) ------------- + + frp_raw = d.get("frp") + try: + frp = float(frp_raw) if frp_raw is not None else None + except (TypeError, ValueError): + frp = None + if FIRMS_FRP_FLOOR > 0: + if frp is None or frp < FIRMS_FRP_FLOOR: + _log_event(conn, now=now, source="firms", + category=category_raw + "|below_frp_floor", + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # ---- acquisition timestamp (required for dedup key) ------------------ + + acq_epoch = _parse_acq_epoch(d.get("acq_date"), d.get("acq_time")) + if acq_epoch is None: + _log_event(conn, now=now, source="firms", + category=category_raw + "|missing_acq_time", + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # ---- persist (INSERT OR IGNORE via v4.sql unique partial index) ------ + + satellite = d.get("satellite") or "" + brightness_raw = d.get("bright_ti4") if d.get("bright_ti4") is not None \ + else d.get("brightness") + try: + brightness = float(brightness_raw) if brightness_raw is not None else None + except (TypeError, ValueError): + brightness = None + + cur = conn.execute( + "INSERT OR IGNORE INTO firms_pixels(irwin_id, lat, lon, acq_time, " + "frp, confidence, satellite, brightness) " + "VALUES (?,?,?,?,?,?,?,?)", + (None, lat, lon, acq_epoch, frp, + (str(conf) if conf is not None else None), + satellite, brightness), + ) + stored = cur.rowcount > 0 + + # event_log row regardless of dedup outcome -- both "stored" and + # "dedup-hit" count as "handled" for accounting; the suffix tells them + # apart for ops grep. + handled = 1 if stored else 1 # dedup hit is still handled-success + cat_tag = category_raw if stored else category_raw + "|dedup_hit" + _log_event(conn, now=now, source="firms", category=cat_tag, + severity_word=severity_word, + event_id_external=event_id_external, + subject=subject, handled=handled, + table_name="firms_pixels" if stored else None, + table_pk=(str(cur.lastrowid) if stored else None)) + + # STORAGE-ONLY: never broadcast. + return None + + +# ============================================================================ +# Helpers +# ============================================================================ + + +def _confidence_passes(conf: Optional[str]) -> bool: + """Return True iff `conf` is at or above FIRMS_CONFIDENCE_FLOOR. + + Unknown / unparseable confidence values fail closed (drop) so the + accounting trail flags upstream schema drift instead of silently + storing under a degraded label. + """ + if conf is None: + return False + rank = _CONFIDENCE_RANK.get(str(conf).lower()) + if rank is None: + return False + floor = _CONFIDENCE_RANK.get(FIRMS_CONFIDENCE_FLOOR, 0) + return rank >= floor + + +def _in_bbox(lat: float, lon: float) -> bool: + if FIRMS_BBOX_OPTIONAL is None: + return True + min_lat, min_lon, max_lat, max_lon = FIRMS_BBOX_OPTIONAL + return (min_lat <= lat <= max_lat) and (min_lon <= lon <= max_lon) + + +def _parse_acq_epoch(date_s: Optional[str], + time_s: Optional[Any]) -> Optional[int]: + """FIRMS publishes acq_date 'YYYY-MM-DD' + acq_time HHMM (UTC). + + acq_time is sometimes a 4-digit string ("2013") and sometimes an int + (2013). Both supported. Zero-padded to 4 chars before parsing. + """ + if not date_s or time_s is None: + return None + try: + t_str = str(time_s).zfill(4) + dt = datetime.strptime(f"{date_s} {t_str}", "%Y-%m-%d %H%M") \ + .replace(tzinfo=timezone.utc) + return int(dt.timestamp()) + except Exception: + return None + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> None: + """event_log writer -- shape matches sibling handlers exactly.""" + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk), + ) diff --git a/meshai/persistence/db.py b/meshai/persistence/db.py index dead8dc..9bab216 100644 --- a/meshai/persistence/db.py +++ b/meshai/persistence/db.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_DB_PATH = "/data/meshai.sqlite" MESHAI_DB_PATH_ENV = "MESHAI_DB_PATH" -SCHEMA_VERSION = 3 +SCHEMA_VERSION = 4 SCHEMA_META_TABLE = "schema_meta" MIGRATIONS_DIR = Path(__file__).parent / "migrations" diff --git a/meshai/persistence/migrations/v4.sql b/meshai/persistence/migrations/v4.sql new file mode 100644 index 0000000..395576d --- /dev/null +++ b/meshai/persistence/migrations/v4.sql @@ -0,0 +1,18 @@ +-- v0.6-1 firms_pixels dedup index. +-- +-- The firms_pixels table was created in v1.sql (v0.5.8b) but had no writer +-- until v0.6-1 (firms_handler.py). FIRMS publishes the same satellite pixel +-- multiple times via NATS reconnect / JetStream replay; without a dedup +-- key, every restart would re-store the entire 7-day retention window. +-- +-- Round lat/lon to 5 decimals (~1.1 m precision, well inside the 375 m +-- VIIRS pixel) so float-noise variants of the same coord collapse onto +-- the same key. `acq_time` is epoch seconds (UTC) parsed from FIRMS' +-- acq_date + acq_time fields. `satellite` is "N" (Suomi-NPP) or "N20" +-- (NOAA-20); future MODIS adds "Terra"/"Aqua". +-- +-- The handler uses INSERT OR IGNORE so a duplicate is a clean no-op and +-- the caller can distinguish via cur.rowcount. + +CREATE UNIQUE INDEX IF NOT EXISTS idx_firms_pixels_dedup + ON firms_pixels(round(lat, 5), round(lon, 5), acq_time, satellite); diff --git a/tests/test_consumer_default_deny.py b/tests/test_consumer_default_deny.py index 671658f..32a0014 100644 --- a/tests/test_consumer_default_deny.py +++ b/tests/test_consumer_default_deny.py @@ -7,8 +7,8 @@ return an Event with that exact title + _meshai_precomposed=True so the composer bypass kicks in. Covers four cases: - (a) envelope with NO matching handler (adapter='firms' has no handler) - -> _normalize returns None + (a) envelope with NO matching handler (adapter='avalanche' has no + Central adapter wired) -> _normalize returns None (b) envelope hits handler, handler returns None (e.g. sub-G3 swpc, stale tomtom) -> _normalize returns None (c) envelope hits handler, handler returns wire string @@ -82,9 +82,10 @@ def _make_envelope(adapter, category, *, inner_id="test_001", def test_no_handler_match_returns_none(consumer, mem_db): - """FIRMS has no handler; envelope must drop at consumer._normalize.""" - env = _make_envelope("firms", "fire.hotspot.viirs", - inner_id="firms_001") + """Avalanche has no handler (no Central adapter). envelope must + drop at consumer._normalize as the default-deny baseline.""" + env = _make_envelope("avalanche", "avalanche.forecast", + inner_id="aval_001") out = consumer._normalize(env["subject"], env) assert out is None @@ -193,11 +194,12 @@ def test_handler_returns_wire_event_emitted(consumer, mem_db, monkeypatch): def test_envelope_with_title_still_drops_without_handler(consumer, mem_db): """Regression guard: the v0.5.7-fallback path (data.title -> headline ->\ - friendly_name -> cat_raw) is GONE in v0.5.13.""" - env = _make_envelope("firms", "fire.hotspot.viirs", - inner_id="firms_with_title", - title="Wildfire Hotspot", - headline="VIIRS Hotspot Detected") + friendly_name -> cat_raw) is GONE in v0.5.13. Uses an unhandled adapter + (avalanche) since v0.6-1 added the FIRMS handler.""" + env = _make_envelope("avalanche", "avalanche.forecast", + inner_id="aval_with_title", + title="Avalanche Warning", + headline="Backcountry advisory") out = consumer._normalize(env["subject"], env) assert out is None, ( "v0.5.13 default-deny: data.title and data.headline must NOT rescue\n" diff --git a/tests/test_firms_handler.py b/tests/test_firms_handler.py new file mode 100644 index 0000000..8543114 --- /dev/null +++ b/tests/test_firms_handler.py @@ -0,0 +1,384 @@ +"""Tests for v0.6-1 FIRMS handler (storage-only). + +The handler never returns a wire string (storage-only contract). All +assertions check side effects on `firms_pixels` + `event_log`. Per the +audit doc finding #2 the handler must close the v0.5.13 silent-drop on +`central.fire.hotspot.>` envelopes. + +Envelope shape sourced from firms-investigation.md sampling (250 envelopes +2026-05-28..06-04, all VIIRS). +""" +import pytest + +from meshai.central import firms_handler +from meshai.central.firms_handler import handle_firms +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +# ---------- fixtures -------------------------------------------------------- + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "firms-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _firms_env(*, + lat=42.19664, lon=-113.70981, frp=135.93, + confidence="high", satellite="N", + acq_date="2026-05-28", acq_time="1949", + bright_ti4=367.0, daynight="D", + envelope_id="firms_001", + category="fire.hotspot.viirs", + severity=3, + region="unknown", + extras=None): + """Build a FIRMS CloudEvents envelope matching the live wire shape. + + Default fixture is SAMPLE B from firms-investigation.md (Cache Peak + high-confidence 135 MW fire). Override individual fields for filter + coverage. + """ + payload = { + "id": envelope_id, + "latitude": lat, + "longitude": lon, + "frp": frp, + "confidence": confidence, + "satellite": satellite, + "instrument": "VIIRS", + "acq_date": acq_date, + "acq_time": acq_time, + "bright_ti4": bright_ti4, + "daynight": daynight, + "version": "2.0NRT", + "_enriched": {"geocoder": {"landclass": "Cache Peak Roadless Area", + "elevation_m": 2151.9}}, + } + if extras: payload.update(extras) + return { + "subject": f"central.fire.hotspot.viirs_snpp.{confidence}.{region}", + "id": envelope_id, + "data": { + "id": envelope_id, + "adapter": "firms", + "category": category, + "severity": severity, + "time": "2026-05-28T19:49:00Z", + "geo": {"centroid": [lon, lat]}, + "data": payload, + }, + } + + +def _row_count(mem_db, table): + return mem_db.execute(f"SELECT COUNT(*) AS n FROM {table}").fetchone()["n"] + + +def _last_event_log(mem_db): + r = mem_db.execute( + "SELECT * FROM event_log ORDER BY id DESC LIMIT 1" + ).fetchone() + return dict(r) if r else None + + +# ============================================================================ +# Happy path: high-confidence pixel stored +# ============================================================================ + + +def test_high_confidence_pixel_persisted(mem_db): + env = _firms_env() + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + + assert out is None, "storage-only handler must never return a wire string" + assert _row_count(mem_db, "firms_pixels") == 1 + + row = mem_db.execute("SELECT * FROM firms_pixels").fetchone() + assert row["lat"] == pytest.approx(42.19664) + assert row["lon"] == pytest.approx(-113.70981) + assert row["frp"] == pytest.approx(135.93) + assert row["confidence"] == "high" + assert row["satellite"] == "N" + assert row["brightness"] == pytest.approx(367.0) + assert row["irwin_id"] is None # unattached; v0.6 fire-tracker fills later + + # acq_time should be the parsed UTC epoch of 2026-05-28 19:49Z. + import datetime as _dt + expected = int(_dt.datetime(2026, 5, 28, 19, 49, + tzinfo=_dt.timezone.utc).timestamp()) + assert row["acq_time"] == expected + + # event_log: handled=1, table_name set, table_pk = inserted rowid. + log = _last_event_log(mem_db) + assert log["source"] == "firms" + assert log["handled"] == 1 + assert log["table_name"] == "firms_pixels" + assert log["table_pk"] == str(row["id"]) + + +def test_nominal_confidence_pixel_persisted_under_default_floor(mem_db): + """Default FIRMS_CONFIDENCE_FLOOR='low' must accept nominal/high/low.""" + env = _firms_env(confidence="nominal", envelope_id="firms_002") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +def test_low_confidence_pixel_persisted_under_default_floor(mem_db): + env = _firms_env(confidence="low", envelope_id="firms_003") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +# ============================================================================ +# Confidence floor when bumped up +# ============================================================================ + + +def test_low_confidence_dropped_when_floor_is_nominal(mem_db, monkeypatch): + monkeypatch.setattr(firms_handler, "FIRMS_CONFIDENCE_FLOOR", "nominal") + env = _firms_env(confidence="low", envelope_id="firms_lo") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + + log = _last_event_log(mem_db) + assert log["handled"] == 0 + assert log["category"].endswith("|below_confidence_floor") + + +def test_unknown_confidence_value_dropped(mem_db): + env = _firms_env(confidence="bogus", envelope_id="firms_bog") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + log = _last_event_log(mem_db) + assert log["category"].endswith("|below_confidence_floor") + + +# ============================================================================ +# FRP floor +# ============================================================================ + + +def test_low_frp_dropped_when_floor_set(mem_db, monkeypatch): + monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0) + env = _firms_env(frp=1.4, confidence="high", envelope_id="firms_frp_low") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + log = _last_event_log(mem_db) + assert log["category"].endswith("|below_frp_floor") + + +def test_frp_at_floor_stored(mem_db, monkeypatch): + monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0) + env = _firms_env(frp=5.0, confidence="high", envelope_id="firms_frp_at") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +def test_missing_frp_dropped_when_floor_set(mem_db, monkeypatch): + monkeypatch.setattr(firms_handler, "FIRMS_FRP_FLOOR", 5.0) + env = _firms_env(confidence="high", envelope_id="firms_no_frp", + extras={"frp": None}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + + +def test_missing_frp_stored_when_floor_zero(mem_db): + """Default FIRMS_FRP_FLOOR=0: missing FRP still stores (null in column).""" + env = _firms_env(confidence="high", envelope_id="firms_no_frp_2", + extras={"frp": None}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + row = mem_db.execute("SELECT * FROM firms_pixels").fetchone() + assert row["frp"] is None + + +# ============================================================================ +# Bbox filter (default None = pass-through; explicit bbox tested both sides) +# ============================================================================ + + +def test_bbox_none_passes_through(mem_db): + env = _firms_env(lat=51.0, lon=10.0, # Germany + envelope_id="firms_eu") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +def test_bbox_drops_outside(mem_db, monkeypatch): + # Idaho-ish bbox. + monkeypatch.setattr(firms_handler, "FIRMS_BBOX_OPTIONAL", + (42.0, -117.5, 49.0, -111.0)) + env = _firms_env(lat=51.0, lon=10.0, envelope_id="firms_out") + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + log = _last_event_log(mem_db) + assert log["category"].endswith("|outside_bbox") + + +def test_bbox_keeps_inside(mem_db, monkeypatch): + monkeypatch.setattr(firms_handler, "FIRMS_BBOX_OPTIONAL", + (42.0, -117.5, 49.0, -111.0)) + env = _firms_env(envelope_id="firms_in") # Cache Peak is inside + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +# ============================================================================ +# Dedup: same satellite pixel observation arriving twice = no-op +# ============================================================================ + + +def test_dedup_same_pixel_idempotent(mem_db): + env = _firms_env(envelope_id="dup_001") + handle_firms(env, env["subject"], data={}, now=1_780_660_000) + handle_firms(env, env["subject"], data={}, now=1_780_660_001) + + assert _row_count(mem_db, "firms_pixels") == 1, "OR IGNORE collapses dup" + # Two event_log rows; second is dedup_hit. + rows = mem_db.execute( + "SELECT category, table_name, handled FROM event_log " + "WHERE source='firms' ORDER BY id" + ).fetchall() + assert len(rows) == 2 + assert rows[0]["table_name"] == "firms_pixels" + assert rows[1]["table_name"] is None + assert rows[1]["category"].endswith("|dedup_hit") + + +def test_dedup_collapses_lat_lon_float_noise(mem_db): + """Same coord with sub-1m float noise must hit the same dedup key. + 5-decimal rounding => differences in the 6th+ decimal are absorbed.""" + e1 = _firms_env(lat=42.196641234567, lon=-113.709810000001, + envelope_id="dup_a") + e2 = _firms_env(lat=42.196641111111, lon=-113.709810999999, + envelope_id="dup_b") + handle_firms(e1, e1["subject"], data={}, now=1_780_660_000) + handle_firms(e2, e2["subject"], data={}, now=1_780_660_001) + assert _row_count(mem_db, "firms_pixels") == 1 + + +def test_dedup_different_satellite_stored_separately(mem_db): + """Same coord + acq_time but different satellite is 2 distinct observations.""" + e1 = _firms_env(satellite="N", envelope_id="sat_n") + e2 = _firms_env(satellite="N20", envelope_id="sat_n20") + handle_firms(e1, e1["subject"], data={}, now=1_780_660_000) + handle_firms(e2, e2["subject"], data={}, now=1_780_660_001) + assert _row_count(mem_db, "firms_pixels") == 2 + + +def test_dedup_different_acq_time_stored_separately(mem_db): + """Same pixel observed on two passes 12h apart -> 2 rows.""" + e1 = _firms_env(acq_time="0700", envelope_id="t1") + e2 = _firms_env(acq_time="1900", envelope_id="t2") + handle_firms(e1, e1["subject"], data={}, now=1_780_660_000) + handle_firms(e2, e2["subject"], data={}, now=1_780_660_001) + assert _row_count(mem_db, "firms_pixels") == 2 + + +# ============================================================================ +# Bad / missing inputs +# ============================================================================ + + +def test_missing_coords_dropped(mem_db): + env = _firms_env(envelope_id="no_coords", + extras={"latitude": None, "longitude": None}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + log = _last_event_log(mem_db) + assert log["category"].endswith("|missing_coords") + + +def test_missing_acq_time_dropped(mem_db): + env = _firms_env(envelope_id="no_acq", + extras={"acq_date": None, "acq_time": None}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + log = _last_event_log(mem_db) + assert log["category"].endswith("|missing_acq_time") + + +def test_non_firms_adapter_passes_through(mem_db): + """Defense in depth: handler must early-return on non-firms envelopes.""" + env = _firms_env(envelope_id="wrong_adapter") + env["data"]["adapter"] = "nws" + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 0 + assert _row_count(mem_db, "event_log") == 0 + + +def test_acq_time_int_accepted(mem_db): + """FIRMS sometimes publishes acq_time as int 2013 rather than '2013'.""" + env = _firms_env(envelope_id="int_acq", + extras={"acq_time": 1949}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +def test_short_acq_time_zero_padded(mem_db): + """acq_time '49' (early-morning pass) must zero-pad to '0049'.""" + env = _firms_env(envelope_id="short_acq", + extras={"acq_time": "49"}) + out = handle_firms(env, env["subject"], data={}, now=1_780_660_000) + assert out is None + assert _row_count(mem_db, "firms_pixels") == 1 + + +# ============================================================================ +# Integration: envelope through the full consumer -> handler -> SQLite path +# ============================================================================ + + +def test_end_to_end_envelope_through_consumer(mem_db, monkeypatch): + """Confirm: envelope enters consumer._normalize, handle_firms is invoked, + firms_pixels row is inserted, and consumer returns None (default-deny + keeps the broadcast suppressed). mesh_broadcasts_out MUST stay empty.""" + from unittest.mock import MagicMock + from meshai.config import Config + from meshai.central.consumer import CentralConsumer + + cfg = Config() + cfg.notifications.cold_start_grace_seconds = 0 + bus = MagicMock() + consumer = CentralConsumer(cfg.environmental, bus) + + env = _firms_env(envelope_id="e2e_001") + out = consumer._normalize(env["subject"], env) + + # consumer.normalize returns None -> Event never reaches bus. + assert out is None + bus.emit.assert_not_called() + + # firms_pixels MUST have the row; mesh_broadcasts_out MUST be empty. + assert _row_count(mem_db, "firms_pixels") == 1 + assert _row_count(mem_db, "mesh_broadcasts_out") == 0 + + # event_log records the storage. + log = _last_event_log(mem_db) + assert log["source"] == "firms" + assert log["handled"] == 1 + assert log["table_name"] == "firms_pixels"