diff --git a/meshai/central/consumer.py b/meshai/central/consumer.py index f08e81f..c1a79a6 100644 --- a/meshai/central/consumer.py +++ b/meshai/central/consumer.py @@ -511,6 +511,12 @@ class CentralConsumer: elif inner.get("adapter") in ("swpc_alerts", "swpc_kindex", "swpc_protons"): from meshai.central.swpc_handler import handle_swpc synthesized = handle_swpc(envelope, subject, data=data) or None + # v0.5.12 nwis stream-gauge handler. Filters to the + # 9-site Idaho curation (idaho_gauge_sites.py); upward + # threshold crossings only (mirrors WFIGS forward-only). + elif inner.get("adapter") == "nwis": + from meshai.central.nwis_handler import handle_nwis + synthesized = handle_nwis(envelope, subject, data=data) or None elif n is not None and category in ("work_zone", "road_closure", "road_incident"): synthesized = format_work_zone_mesh(n) or None except Exception: diff --git a/meshai/central/idaho_gauge_sites.py b/meshai/central/idaho_gauge_sites.py new file mode 100644 index 0000000..27003b9 --- /dev/null +++ b/meshai/central/idaho_gauge_sites.py @@ -0,0 +1,124 @@ +"""v0.5.12 Idaho gauge-site curation (STARTER SUBSET). + +9 high-priority Magic Valley + Treasure Valley + Salmon-Challis + Snake River +system gauges. Threshold values (action / flood_minor / flood_moderate / +flood_major) sourced from NWS-AHPS pages for each site, in feet (gage +height, parameter_code 00065). + +**STARTER SUBSET** -- expand via NWS-AHPS curation in v0.6.x. If a site is +missing here, the handler ignores it (no broadcast). v0.6.x will likely +migrate this dict to a `gauge_sites` table so non-engineers can curate via +the GUI. + +Convention for site_id keys: + USGS-prefixed, zero-padded as USGS publishes them (e.g. 'USGS-13139510'). + The handler normalizes incoming envelope site IDs to this form before + lookup so both 'USGS-13139510' and '13139510' resolve. + +Threshold values that the gauge doesn't have (e.g. flood_major above the +top observed historic crest) are left as None -- the handler treats None as +'this threshold doesn't apply at this site' so a reading can never enter +that band. +""" +from typing import Optional + + +# site_id -> {gauge_name, lat, lon, action_ft, flood_minor_ft, +# flood_moderate_ft, flood_major_ft} +IDAHO_CURATED_SITES: dict = { + "USGS-13139510": { + "gauge_name": "Big Lost River near Mackay", + "lat": 43.910, "lon": -113.620, + "action_ft": 5.5, "flood_minor_ft": 7.0, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13186000": { + "gauge_name": "Snake River at Heise", + "lat": 43.612, "lon": -111.654, + "action_ft": 12.0, "flood_minor_ft": 14.0, + "flood_moderate_ft": 16.0, "flood_major_ft": None, + }, + "USGS-13037500": { + "gauge_name": "Snake River at Idaho Falls", + "lat": 43.500, "lon": -112.034, + "action_ft": 8.5, "flood_minor_ft": 10.0, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13135500": { + "gauge_name": "Big Wood River near Hailey", + "lat": 43.533, "lon": -114.318, + "action_ft": 6.0, "flood_minor_ft": 7.5, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13205000": { + "gauge_name": "Boise River near Boise", + "lat": 43.690, "lon": -116.200, + "action_ft": 8.0, "flood_minor_ft": 10.5, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13247500": { + "gauge_name": "Payette River at Banks", + "lat": 44.080, "lon": -116.130, + "action_ft": 10.0, "flood_minor_ft": 12.0, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13057000": { + "gauge_name": "Henrys Fork near Rexburg", + "lat": 43.831, "lon": -111.781, + "action_ft": 9.0, "flood_minor_ft": 10.5, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13162225": { + "gauge_name": "Salmon Falls Creek near San Jacinto", + "lat": 42.180, "lon": -114.850, + "action_ft": 8.0, "flood_minor_ft": 10.0, + "flood_moderate_ft": None, "flood_major_ft": None, + }, + "USGS-13083000": { + "gauge_name": "Bear River near Border WY/ID", + "lat": 42.214, "lon": -111.045, + "action_ft": 6.0, "flood_minor_ft": 8.0, + "flood_moderate_ft": None, "flood_major_ft": None, + }, +} + + +def normalize_site_id(raw: Optional[str]) -> Optional[str]: + """Accept 'USGS-13139510', 'USGS:13139510', '13139510', etc. Return the + canonical 'USGS-' form so the curation dict lookups succeed.""" + if not raw: return None + s = str(raw).strip() + # Already canonical -- return as-is for the fast path. + if s in IDAHO_CURATED_SITES: return s + # Strip common prefix variants. + for prefix in ("USGS-", "USGS:", "USGS_", "usgs-", "usgs:", "usgs_"): + if s.startswith(prefix): s = s[len(prefix):]; break + canonical = f"USGS-{s}" + return canonical + + +def lookup_site(raw_site_id: str) -> Optional[dict]: + """Return the curated-site dict for a raw envelope site_id, or None when + the site is not in the curated subset.""" + sid = normalize_site_id(raw_site_id) + if sid is None: return None + return IDAHO_CURATED_SITES.get(sid) + + +# Ordered list of threshold names from low to high. Used to compare +# "is current threshold higher than prior" (upward crossing detection). +THRESHOLD_RANK = ["normal", "action", "flood_minor", "flood_moderate", "flood_major"] + + +def compute_threshold_state(value_ft: float, site_thresholds: dict) -> str: + """Bucket a gage_height reading (ft) into a NWS-AHPS threshold state.""" + a = site_thresholds.get("action_ft") + mn = site_thresholds.get("flood_minor_ft") + md = site_thresholds.get("flood_moderate_ft") + mj = site_thresholds.get("flood_major_ft") + # Higher thresholds win first. + if mj is not None and value_ft >= mj: return "flood_major" + if md is not None and value_ft >= md: return "flood_moderate" + if mn is not None and value_ft >= mn: return "flood_minor" + if a is not None and value_ft >= a: return "action" + return "normal" diff --git a/meshai/central/nwis_handler.py b/meshai/central/nwis_handler.py new file mode 100644 index 0000000..0c67688 --- /dev/null +++ b/meshai/central/nwis_handler.py @@ -0,0 +1,288 @@ +"""v0.5.12 usgs_nwis stream-gauge handler. + +Minimal Idaho curation -- 9 starter sites in idaho_gauge_sites.py. Non- +curated sites are dropped at handler entrance (event_log handled=0, no +gauge_readings UPSERT). v0.6.x will migrate the curation dict into a DB +table so non-engineers can edit via the GUI. + +Per-parameter filtering: + 00060 = Discharge (cfs) -- captured as flow_cfs, paired with stage + 00065 = Gage height (ft) -- the canonical stage for threshold calc + everything else -- dropped (no precipitation handling this round) + +Change-detection (mirrors WFIGS forward-only): + Insert the new reading into gauge_readings (time-series). + Compare current threshold_state to most recent prior reading\\'s + threshold_state for the same site. If current > prior in the ranked + scale {normal < action < flood_minor < flood_moderate < flood_major}, + fire 'New:' broadcast. Otherwise (unchanged or descending), no + broadcast. The receding-water case is intentionally silent -- + operationally less urgent than rising water. + +Wire format MEDIUM: + 🌊 New: {gauge_name}: {label} {value} ft, flow {flow_cfs:,} cfs, @ lat,lon + +Where {label} is: + action -> "action stage" + flood_minor -> "minor flooding" + flood_moderate -> "moderate flooding" + flood_major -> "major flooding" + +flow_cfs segment is dropped when parameter_code is 00065 only (no +companion discharge reading). lat/lon segment is dropped when coords are +missing (rare since curated sites have coords). +""" +from __future__ import annotations + +import json +import logging +import time +from datetime import datetime +from typing import Any, Optional + +from meshai.central.idaho_gauge_sites import ( + IDAHO_CURATED_SITES, + THRESHOLD_RANK, + compute_threshold_state, + lookup_site, + normalize_site_id, +) +from meshai.persistence import get_db + +logger = logging.getLogger(__name__) + + +# Parameters we handle. 00060 = discharge (cfs), 00065 = gage height (ft). +# 00045 = precip is excluded from this round per spec. +_PARAMETERS_OF_INTEREST = {"00060", "00065"} + +# Human-readable label per threshold_state. +_LABEL = { + "action": "action stage", + "flood_minor": "minor flooding", + "flood_moderate": "moderate flooding", + "flood_major": "major flooding", +} + + +def _now() -> int: return int(time.time()) + + +def _coerce_severity(sev: Any) -> Optional[str]: + if sev is None: return None + if isinstance(sev, str): return sev or None + try: return str(int(sev)) + except (TypeError, ValueError): return str(sev) + + +def _parse_iso_epoch(s: Optional[str]) -> Optional[int]: + if not s: return None + try: return int(datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()) + except Exception: return None + + +def handle_nwis(envelope: dict, subject: str, + data: Optional[dict] = None, + now: Optional[int] = None) -> Optional[str]: + if not isinstance(envelope, dict): return None + inner = envelope.get("data") or {} + if (inner.get("adapter") or "") != "nwis": return None + + d = inner.get("data") or {} + now = now if now is not None else _now() + category_raw = inner.get("category") or "" + severity_word = _coerce_severity(inner.get("severity")) + + try: + conn = get_db() + except Exception: + logger.exception("nwis_handler: persistence unavailable") + return None + + # Normalize site_id + look up the curated entry. + raw_site = d.get("monitoring_location_id") or d.get("site_id") + site_id = normalize_site_id(raw_site) + site_meta = lookup_site(raw_site) if raw_site else None + + # Drop non-curated sites at entrance. + if site_meta is None: + _log_event(conn, now=now, source="nwis", category=category_raw, + severity_word=severity_word, + event_id_external=raw_site or inner.get("id"), + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # Drop unsupported parameters (precip etc.). + pc = d.get("parameter_code") + if pc not in _PARAMETERS_OF_INTEREST: + _log_event(conn, now=now, source="nwis", category=category_raw, + severity_word=severity_word, + event_id_external=site_id, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + + # Extract reading value + reading_time. + value = d.get("value") + if isinstance(value, str): + try: value = float(value) + except ValueError: value = None + if not isinstance(value, (int, float)): + _log_event(conn, now=now, source="nwis", category=category_raw, + severity_word=severity_word, event_id_external=site_id, + subject=subject, handled=0, + table_name=None, table_pk=None) + return None + value = float(value) + + reading_time = _parse_iso_epoch(d.get("time")) or now + unit = d.get("unit_of_measure") or ("ft^3/s" if pc == "00060" else "ft") + + # Compute threshold_state. ONLY parameter_code=00065 (stage in ft) maps + # to threshold_state -- discharge (cfs) lands as a companion field. + stage_ft: Optional[float] = value if pc == "00065" else None + flow_cfs: Optional[float] = value if pc == "00060" else None + threshold_state = "normal" + if pc == "00065": + threshold_state = compute_threshold_state(stage_ft, site_meta) + + lat = d.get("latitude") if isinstance(d.get("latitude"), (int, float)) else site_meta.get("lat") + lon = d.get("longitude") if isinstance(d.get("longitude"), (int, float)) else site_meta.get("lon") + + # Always log the envelope to event_log. Initial handled=0; commit + # callback flips to 1 if we actually broadcast. + log_id = _log_event_returning_id( + conn, now=now, source="nwis", category=category_raw, + severity_word=severity_word, event_id_external=site_id, + subject=subject, handled=0, + table_name="gauge_readings", table_pk=site_id) + + # SELECT most recent prior reading (for this site, any parameter) to + # detect upward threshold crossing. Use threshold_state column directly. + prior = conn.execute( + "SELECT threshold_state FROM gauge_readings " + "WHERE site_id=? AND reading_time < ? " + "ORDER BY reading_time DESC LIMIT 1", + (site_id, reading_time), + ).fetchone() + prior_state = prior["threshold_state"] if prior else "normal" + + # If this envelope is a 00060 (discharge) reading, look back for the + # latest 00065 stage reading at this site so the wire string can carry + # both. The threshold_state of THIS row inherits from that prior stage + # reading (discharge alone doesn't define a threshold band). + if pc == "00060": + last_stage = conn.execute( + "SELECT reading_value, threshold_state FROM gauge_readings " + "WHERE site_id=? AND reading_unit='ft' " + "ORDER BY reading_time DESC LIMIT 1", + (site_id,)).fetchone() + if last_stage: + stage_ft = last_stage["reading_value"] + threshold_state = last_stage["threshold_state"] or "normal" + + # INSERT the new reading row. Always persist (time-series semantics). + conn.execute( + "INSERT INTO gauge_readings(site_id, gauge_name, reading_value, " + "reading_unit, threshold_state, flow_cfs, reading_time, lat, lon) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (site_id, site_meta["gauge_name"], value, unit, + threshold_state, flow_cfs, reading_time, lat, lon), + ) + + # Upward-crossing check. + try: + prior_rank = THRESHOLD_RANK.index(prior_state) + except ValueError: + prior_rank = 0 # unknown prior -> treat as normal + try: + cur_rank = THRESHOLD_RANK.index(threshold_state) + except ValueError: + cur_rank = 0 + + if cur_rank <= prior_rank: + # Unchanged or receding -- no broadcast. + return None + if threshold_state == "normal": + # Defensive: a reading entering "normal" can't be an upward crossing. + return None + + wire = _render(gauge_name=site_meta["gauge_name"], + threshold_state=threshold_state, + stage_ft=stage_ft, flow_cfs=flow_cfs, + unit=unit if pc == "00065" else "ft", + lat=lat, lon=lon) + _attach_commit(data, site_id=site_id, event_log_row_id=log_id) + return wire + + +# ---- renderer ------------------------------------------------------------ + + +def _render(*, gauge_name: str, threshold_state: str, + stage_ft: Optional[float], flow_cfs: Optional[float], + unit: str, lat: Optional[float], lon: Optional[float]) -> str: + label = _LABEL.get(threshold_state, threshold_state) + + # Stage segment. + if isinstance(stage_ft, (int, float)): + stage_seg = f"{label} {stage_ft:.1f} ft" + else: + stage_seg = label + + # Optional flow segment. + flow_seg = "" + if isinstance(flow_cfs, (int, float)): + flow_seg = f", flow {int(round(flow_cfs)):,} cfs" + + # Optional coords segment. + coords = "" + if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): + coords = f", @ {lat:.3f},{lon:.3f}" + + return f"🌊 New: {gauge_name}: {stage_seg}{flow_seg}{coords}" + + +# ---- commit callback ----------------------------------------------------- + + +def _attach_commit(data: Optional[dict], *, site_id: str, + event_log_row_id: Optional[int]) -> None: + if not isinstance(data, dict): return + + def _on_commit(committed_at: float) -> None: + try: conn = get_db() + except Exception: + logger.exception("nwis commit: persistence unavailable"); return + if event_log_row_id is not None: + conn.execute("UPDATE event_log SET handled=1 WHERE id=?", + (int(event_log_row_id),)) + + data["_on_broadcast_committed"] = _on_commit + data["_broadcast_audit"] = {"table": "gauge_readings", "pk": site_id} + + +# ---- event_log helpers --------------------------------------------------- + + +def _log_event(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, table_name, table_pk) -> None: + conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + + +def _log_event_returning_id(conn, *, now, source, category, severity_word, + event_id_external, subject, handled, + table_name, table_pk) -> int: + cur = conn.execute( + "INSERT INTO event_log(received_at, source, category, severity_word, " + "event_id_external, nats_subject, handled, table_name, table_pk) " + "VALUES (?,?,?,?,?,?,?,?,?)", + (now, source, category, severity_word, event_id_external, subject, + int(bool(handled)), table_name, table_pk)) + return int(cur.lastrowid) diff --git a/tests/test_nwis_handler.py b/tests/test_nwis_handler.py new file mode 100644 index 0000000..0470e48 --- /dev/null +++ b/tests/test_nwis_handler.py @@ -0,0 +1,275 @@ +"""Tests for v0.5.12 usgs_nwis handler.""" +import pytest + +from meshai.central.idaho_gauge_sites import IDAHO_CURATED_SITES +from meshai.central.nwis_handler import handle_nwis +from meshai.persistence import close_thread_connection, init_db +from meshai.persistence import db as persistence_db + + +@pytest.fixture +def mem_db(monkeypatch, tmp_path): + db_path = str(tmp_path / "nwis-test.sqlite") + monkeypatch.setenv("MESHAI_DB_PATH", db_path) + persistence_db._initialised.clear() + close_thread_connection() + conn = init_db() + yield conn + close_thread_connection() + persistence_db._initialised.discard(db_path) + + +def _nwis_env(*, site_id="USGS-13186000", + parameter_code="00065", value=13.0, + unit="ft", time_iso="2026-06-05T15:00:00Z", + lat=43.612, lon=-111.654, + envelope_id=None): + envelope_id = envelope_id or f"nwis_{site_id}_{time_iso}" + return { + "id": envelope_id, "subject": f"central.hydro.{parameter_code}.usgs.{site_id}.us.id", + "data": { + "id": envelope_id, "adapter": "nwis", + "category": f"hydro.{parameter_code}", "severity": 0, + "geo": {"centroid": [lon, lat], "primary_region": "US-ID"}, + "data": { + "id": envelope_id, + "monitoring_location_id": site_id, + "parameter_code": parameter_code, + "time": time_iso, + "value": value, + "unit_of_measure": unit, + "latitude": lat, "longitude": lon, + "_enriched": {"geocoder": { + "name": IDAHO_CURATED_SITES.get(site_id, {}).get( + "gauge_name", "?"), + }}, + }, + }, + } + + +def _commit(data, t): + data["_on_broadcast_committed"](float(t)) + + +# ---- (a) curated site at action stage triggers broadcast ----------------- + + +def test_a_curated_site_action_stage_triggers(mem_db): + # Snake River at Heise: action=12.0ft, broadcast at 12.5ft. + env = _nwis_env(site_id="USGS-13186000", parameter_code="00065", + value=12.5) + data = {} + wire = handle_nwis(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + assert wire.startswith("🌊 New:") + assert "Snake River at Heise" in wire + assert "action stage 12.5 ft" in wire + + +# ---- (b) non-curated site no broadcast + event_log handled=0 ------------ + + +def test_b_non_curated_site_dropped(mem_db): + env = _nwis_env(site_id="USGS-99999999", value=99.0) + data = {} + wire = handle_nwis(env, env["subject"], data=data, now=1_000_000) + assert wire is None + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM gauge_readings").fetchone()["n"] + assert n_rows == 0 + n_log = mem_db.execute( + "SELECT COUNT(*) AS n FROM event_log WHERE source='nwis' AND handled=0" + ).fetchone()["n"] + assert n_log == 1 + + +# ---- (c) curated site at normal stage no broadcast ---------------------- + + +def test_c_curated_site_normal_stage_no_broadcast(mem_db): + # Heise normal is below 12.0ft. + env = _nwis_env(site_id="USGS-13186000", value=8.0) + data = {} + wire = handle_nwis(env, env["subject"], data=data, now=1_000_000) + assert wire is None + # The reading WAS persisted (time-series). + row = mem_db.execute( + "SELECT threshold_state FROM gauge_readings WHERE site_id=?", + ("USGS-13186000",)).fetchone() + assert row["threshold_state"] == "normal" + + +# ---- (d) upward threshold crossing (normal -> action) triggers --------- + + +def test_d_upward_crossing_normal_to_action_triggers(mem_db): + # First reading at normal. + env1 = _nwis_env(site_id="USGS-13186000", value=8.0, + time_iso="2026-06-05T10:00:00Z") + handle_nwis(env1, env1["subject"], data={}, now=1_000_000) + # Now rises to action. + env2 = _nwis_env(site_id="USGS-13186000", value=12.5, + time_iso="2026-06-05T10:15:00Z", + envelope_id="env_2") + data = {} + wire = handle_nwis(env2, env2["subject"], data=data, now=1_000_900) + assert wire is not None + assert "action stage 12.5 ft" in wire + + +# ---- (e) downward crossing (action -> normal) does NOT broadcast ------- + + +def test_e_downward_crossing_does_not_broadcast(mem_db): + env_high = _nwis_env(site_id="USGS-13186000", value=12.5, + time_iso="2026-06-05T10:00:00Z") + handle_nwis(env_high, env_high["subject"], data={}, now=1_000_000) + env_low = _nwis_env(site_id="USGS-13186000", value=8.0, + time_iso="2026-06-05T11:00:00Z", + envelope_id="env_drop") + wire = handle_nwis(env_low, env_low["subject"], data={}, now=1_003_600) + assert wire is None + + +def test_e_same_threshold_no_re_broadcast(mem_db): + """Repeated readings at the same threshold (action -> action -> action) + must NOT re-broadcast every 15-min poll.""" + env = _nwis_env(site_id="USGS-13186000", value=12.5, + time_iso="2026-06-05T10:00:00Z") + wire1 = handle_nwis(env, env["subject"], data={}, now=1_000_000) + assert wire1 is not None + + env2 = _nwis_env(site_id="USGS-13186000", value=12.8, + time_iso="2026-06-05T10:15:00Z", + envelope_id="env_p2") + wire2 = handle_nwis(env2, env2["subject"], data={}, now=1_000_900) + assert wire2 is None # still in action band + + +# ---- (f) flow_cfs included for 00060, dropped for 00065-only ----------- + + +def test_f_flow_cfs_segment_from_companion_discharge(mem_db): + # First seed a stage reading at action. + env_stage = _nwis_env(site_id="USGS-13186000", + parameter_code="00065", value=12.5, + time_iso="2026-06-05T10:00:00Z") + wire1 = handle_nwis(env_stage, env_stage["subject"], data={}, now=1_000_000) + assert wire1 is not None + assert "flow" not in wire1 # no companion discharge yet + + # Now a discharge reading arrives -- the handler should pick up the + # prior stage_ft for threshold context AND emit flow if upward crossing. + # In this case the stage didn't change, so no broadcast. + env_flow = _nwis_env(site_id="USGS-13186000", + parameter_code="00060", value=8400, + unit="ft^3/s", + time_iso="2026-06-05T10:01:00Z", + envelope_id="env_q") + wire2 = handle_nwis(env_flow, env_flow["subject"], data={}, now=1_000_060) + assert wire2 is None # same threshold; no re-broadcast + + +# ---- (g) site missing coords drops @ tail ------------------------------ + + +def test_g_missing_coords_drops_at_tail(mem_db): + # Build a Heise envelope but blank out the latitude in inner.data so + # the handler must fall back to the curated coords (which DO exist). + env = _nwis_env(site_id="USGS-13186000", value=12.5) + env["data"]["data"]["latitude"] = None + env["data"]["data"]["longitude"] = None + wire = handle_nwis(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + # Curated coords kick in -> @ segment still present. + assert "@ 43.612,-111.654" in wire + + +# ---- (h) IDAHO_CURATED_SITES has all 9 starter sites populated --------- + + +def test_h_curated_sites_count_and_required_fields(): + assert len(IDAHO_CURATED_SITES) == 9 + required_keys = {"gauge_name", "lat", "lon", "action_ft", "flood_minor_ft"} + for site_id, meta in IDAHO_CURATED_SITES.items(): + assert site_id.startswith("USGS-"), site_id + missing = required_keys - set(meta.keys()) + assert not missing, f"{site_id} missing {missing}" + assert isinstance(meta["action_ft"], (int, float)) + assert isinstance(meta["flood_minor_ft"], (int, float)) + + +def test_h_curated_sites_listed_starter_set(): + """Spot-check the 9 starter sites are exactly what spec listed.""" + expected = { + "USGS-13139510", "USGS-13186000", "USGS-13037500", + "USGS-13135500", "USGS-13205000", "USGS-13247500", + "USGS-13057000", "USGS-13162225", "USGS-13083000", + } + assert set(IDAHO_CURATED_SITES.keys()) == expected + + +# ---- commit callback flips event_log.handled = 1 ----------------------- + + +def test_commit_callback_flips_event_log(mem_db): + env = _nwis_env(site_id="USGS-13186000", value=12.5) + data = {} + wire = handle_nwis(env, env["subject"], data=data, now=1_000_000) + assert wire is not None + pre = mem_db.execute( + "SELECT handled FROM event_log WHERE source='nwis' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert pre["handled"] == 0 + _commit(data, 1_000_001) + post = mem_db.execute( + "SELECT handled FROM event_log WHERE source='nwis' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert post["handled"] == 1 + + +# ---- threshold escalation triggers a new broadcast -------------------- + + +def test_action_to_flood_minor_triggers_re_broadcast(mem_db): + """Reading rises action -> flood_minor: this is an upward crossing, + re-broadcast with the higher threshold label.""" + env1 = _nwis_env(site_id="USGS-13186000", value=12.5, + time_iso="2026-06-05T10:00:00Z") + wire1 = handle_nwis(env1, env1["subject"], data={}, now=1_000_000) + assert wire1 is not None + assert "action stage" in wire1 + + env2 = _nwis_env(site_id="USGS-13186000", value=14.5, + time_iso="2026-06-05T11:00:00Z", + envelope_id="env_fm") + wire2 = handle_nwis(env2, env2["subject"], data={}, now=1_003_600) + assert wire2 is not None + assert "minor flooding" in wire2 + + +# ---- precipitation events skipped (parameter_code=00045) -------------- + + +def test_precip_parameter_skipped(mem_db): + env = _nwis_env(site_id="USGS-13186000", parameter_code="00045", + value=0.5, unit="in") + wire = handle_nwis(env, env["subject"], data={}, now=1_000_000) + assert wire is None + # No gauge_readings row written for precip. + n_rows = mem_db.execute( + "SELECT COUNT(*) AS n FROM gauge_readings").fetchone()["n"] + assert n_rows == 0 + + +# ---- site_id normalization ----------------------------------------------- + + +def test_site_id_normalization_accepts_bare_id(mem_db): + """'13186000' without USGS- prefix should still resolve to Heise.""" + env = _nwis_env(site_id="13186000", value=12.5) + env["data"]["data"]["monitoring_location_id"] = "13186000" + wire = handle_nwis(env, env["subject"], data={}, now=1_000_000) + assert wire is not None + assert "Snake River at Heise" in wire