feat(v0.6-tail): close 5 v0.6-phase1-complete.md follow-ups

(1) Auto-call refresh-toggles on PUT /api/config/notifications
    meshai/dashboard/api/config_routes.py adds register_config_routes_hooks(app)
    which registers a FastAPI HTTP middleware: on any 2xx PUT whose path
    matches /api/config/notifications or /api/config, the middleware
    invokes _refresh_toggle_filter(app) which reaches into app.state.bus._
    pipeline_components["toggle_filter"] and calls .refresh(app.state.config).
    The dashboard no longer has to remember to ping POST /api/notifications/
    refresh-toggles after a toggle change. The explicit endpoint stays for
    backwards-compat.

(2) env_reporter block-size cap moved to adapter_config
    New registry row pipeline.env_reporter_block_chars (int, default 3000).
    meshai/notifications/env_reporter.py replaces the hardcoded
    _BLOCK_MAX_CHARS = 3000 with _DEFAULT_BLOCK_MAX_CHARS (the fallback) +
    a _block_cap() helper that reads from adapter_config on every slice.
    Mutating the row via PUT /api/adapter-config takes effect on the next
    env_reporter call -- no restart.

(3) Bulk-import endpoint for gauge_sites
    meshai/dashboard/api/gauge_sites_import.py adds
    POST /api/gauge-sites/import with two paths:
      format=csv      -- expects "data" (CSV text with header row matching
                          gauge_sites columns: site_id, gauge_name, lat, lon,
                          and optionally action_ft/flood_minor_ft/
                          flood_moderate_ft/flood_major_ft/enabled). UPSERT
                          via ON CONFLICT(site_id) DO UPDATE. Returns
                          {inserted, updated, skipped}.
      format=nws-ahps -- expects "wfo" (list of WFO codes). Fetches
                          water.weather.gov/ahps2/index.php?wfo=<WFO> for each,
                          regex-parses gauge links, then fetches up to 50
                          gauge detail pages per request and regex-parses
                          lat/lon + four threshold values. Best-effort; rows
                          stored under "AHPS-<gauge_id>" so they dont collide
                          with USGS-* ids. Returns the same shape plus
                          detail_fetched + errors list.
    Frontend (dashboard-frontend/src/pages/GaugeSites.tsx) gains a
    Import button + modal with two tabs (Paste CSV / Scrape NWS-AHPS)
    rendered via an ImportModal component. CSV tab has a 48-row textarea
    with the column-header hint inline; AHPS tab has a comma-separated WFO
    input defaulting to BOI. Both submit via fetch() and show the JSON
    response inline. Invalidates the curation cache server-side on any
    successful insert/update so nwis_handler sees the new gauges on its
    next call.

(4) WFIGS tombstone column -- CORRECTNESS
    v12.sql adds fires.tombstoned_at REAL (nullable) + idx_fires_tombstoned_at.
    meshai/central/wfigs_handler.py: the tombstone branch
    (kind=="wfigs_tombstone") UPDATE fires SET tombstoned_at=COALESCE(
    tombstoned_at, ?) so the first tombstone-time wins (idempotent against
    repeated tombstone envelopes).
    meshai/notifications/reminders/__init__.py: the wfigs tombstone
    termination condition now checks row["tombstoned_at"] IS NOT NULL.
    Reminders correctly STOP for closed fires -- before this change the
    8h cadence would have kept Active: broadcasts going indefinitely past
    a WFIGS removal.
    SCHEMA_VERSION 11 -> 12.

(5) Delete INCIDENT_BROADCAST_HEARTBEAT_S
    meshai/central/incident_handler.py: removed the dead constant
    (v0.5.9 REVISED dropped the heartbeat path but left the constant
    imported-but-never-read).
    tests/test_incident_handler.py: removed the orphan
    test_i_8h_heartbeat_triggers_update test (asserted None, used the
    deleted constant for time arithmetic) and the stray import line.

Tests (tests/test_tail_followups.py, 16 cases):
  - middleware fires refresh on PUT /api/config/notifications (200), does
    NOT fire on PUT /api/config/llm
  - env_reporter _block_cap() default 3000; mutate via PUT, invalidate,
    next read returns the new cap
  - CSV import inserts new rows, updates existing, skips bad rows,
    rejects missing required columns, rejects bad format
  - AHPS index parser extracts (gauge_id, name) from realistic HTML
  - AHPS detail parser extracts lat/lon + four thresholds from realistic
    HTML
  - fires has tombstoned_at column after migrations
  - wfigs tombstone branch stamps tombstoned_at
  - ReminderScheduler skips a fire whose tombstoned_at is NOT NULL
  - ReminderScheduler still fires for a fire whose tombstoned_at IS NULL
  - INCIDENT_BROADCAST_HEARTBEAT_S no longer importable

Foundation/API test counts bumped:
  REGISTRY 58 -> 59 (+ env_reporter_block_chars)
  schema_meta v11 -> v12

Test count: 844 -> 859 (+16 new, -1 deleted dead test). 0 regressions.
This commit is contained in:
Matt Johnson (via Claude) 2026-06-05 21:37:05 +00:00
commit 566b06de06
13 changed files with 704 additions and 47 deletions

View file

@ -195,9 +195,30 @@ async def test_llm_connection(request: Request):
# Called by the frontend after PUT /api/config/notifications so the
# Inhibitor + Grouper + Dispatcher pick up the new enabled toggle set
# on the next event without a container restart.
def _refresh_toggle_filter(app) -> bool:
"""Best-effort live refresh of the running ToggleFilter. Returns True
when the refresh actually fired, False if the pipeline isn t up yet
(typical during tests / early startup). Never raises."""
try:
bus = getattr(app.state, "bus", None)
config = getattr(app.state, "config", None)
if bus is None or config is None:
return False
components = getattr(bus, "_pipeline_components", {}) or {}
tf = components.get("toggle_filter")
if tf is None:
return False
tf.refresh(config)
return True
except Exception:
logger.exception("toggle_filter refresh failed")
return False
@router.post("/notifications/refresh-toggles")
async def refresh_toggles(request: Request):
"""Re-read the live config and refresh the running ToggleFilter."""
"""Explicit refresh endpoint (kept for backwards-compat with the
dashboard's manual ping path)."""
bus = getattr(request.app.state, "bus", None)
config = getattr(request.app.state, "config", None)
if bus is None or config is None:
@ -208,3 +229,24 @@ async def refresh_toggles(request: Request):
raise HTTPException(503, "toggle_filter not on pipeline bus")
tf.refresh(config)
return {"ok": True}
# v0.6-tail item 1: auto-refresh the ToggleFilter after any successful
# config PUT that touches notifications. Registered from server.py at
# startup via register_config_routes_hooks(app).
def register_config_routes_hooks(app):
@app.middleware("http")
async def _auto_refresh_toggle_filter(request, call_next):
response = await call_next(request)
try:
method = request.method.upper()
path = request.url.path
if (method == "PUT"
and 200 <= response.status_code < 300
and ("/api/config/notifications" in path
or path.rstrip("/").endswith("/api/config"))):
_refresh_toggle_filter(request.app)
except Exception:
logger.exception("auto-refresh middleware failed")
return response

View file

@ -0,0 +1,283 @@
"""v0.6-tail item 3: gauge_sites bulk-import endpoint.
POST /api/gauge-sites/import
body:
{"format": "csv", "data": "<CSV text with header row>"}
OR {"format": "nws-ahps", "wfo": ["BOI", "PIH", ...]}
CSV path:
Header must include site_id, gauge_name, lat, lon. Optional:
action_ft, flood_minor_ft, flood_moderate_ft, flood_major_ft, enabled.
Each row is UPSERTed by site_id. Returns count of inserted, updated,
skipped (parse-failure) rows.
NWS-AHPS path:
Fetches https://water.weather.gov/ahps2/index.php?wfo=<WFO> for each
requested WFO, extracts gauge links + names + thresholds via regex
on the gauge-detail pages, UPSERTs each. Hard cap on detail-page
fetches per call to keep the request bounded.
Both paths call invalidate_curation_cache() on success so the next
nwis_handler.lookup_site() call sees the new rows.
"""
from __future__ import annotations
import csv
import io
import logging
import re
import time
from typing import Any, Optional
import httpx
from fastapi import APIRouter, HTTPException, Request
from meshai.persistence.curation import invalidate_curation_cache
logger = logging.getLogger(__name__)
router = APIRouter(tags=["curation"])
_AHPS_DETAIL_FETCH_CAP = 50 # hard cap on gauge detail pages per call
_AHPS_TIMEOUT_S = 8.0
_AHPS_BASE = "https://water.weather.gov/ahps2"
# ============================================================================
# CSV
# ============================================================================
_NUMERIC_FIELDS = ("lat", "lon", "action_ft", "flood_minor_ft",
"flood_moderate_ft", "flood_major_ft")
_REQUIRED = ("site_id", "gauge_name", "lat", "lon")
def _csv_upsert(conn, text: str) -> dict[str, int]:
inserted = updated = skipped = 0
reader = csv.DictReader(io.StringIO(text))
if reader.fieldnames is None:
raise HTTPException(400, "CSV must have a header row")
missing = [c for c in _REQUIRED if c not in reader.fieldnames]
if missing:
raise HTTPException(400, f"CSV missing required columns: {missing}")
now = time.time()
for row in reader:
try:
site_id = (row.get("site_id") or "").strip()
gauge_name = (row.get("gauge_name") or "").strip()
if not site_id or not gauge_name:
skipped += 1; continue
lat = float(row["lat"]); lon = float(row["lon"])
kwargs = {"action_ft": None, "flood_minor_ft": None,
"flood_moderate_ft": None, "flood_major_ft": None}
for f in ("action_ft", "flood_minor_ft", "flood_moderate_ft", "flood_major_ft"):
v = row.get(f)
if v is not None and str(v).strip() != "":
kwargs[f] = float(v)
enabled_raw = row.get("enabled")
enabled = 1
if enabled_raw is not None and str(enabled_raw).strip().lower() in ("false", "0", "no"):
enabled = 0
except (TypeError, ValueError) as e:
skipped += 1
logger.debug("csv import: row skipped %s: %s", row, e)
continue
existing = conn.execute(
"SELECT 1 FROM gauge_sites WHERE site_id=?", (site_id,)
).fetchone()
conn.execute(
"INSERT INTO gauge_sites(site_id, gauge_name, lat, lon, "
"action_ft, flood_minor_ft, flood_moderate_ft, flood_major_ft, "
"enabled, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?) "
"ON CONFLICT(site_id) DO UPDATE SET "
"gauge_name=excluded.gauge_name, lat=excluded.lat, lon=excluded.lon, "
"action_ft=excluded.action_ft, flood_minor_ft=excluded.flood_minor_ft, "
"flood_moderate_ft=excluded.flood_moderate_ft, flood_major_ft=excluded.flood_major_ft, "
"enabled=excluded.enabled, updated_at=excluded.updated_at",
(site_id, gauge_name, lat, lon,
kwargs["action_ft"], kwargs["flood_minor_ft"],
kwargs["flood_moderate_ft"], kwargs["flood_major_ft"],
enabled, now),
)
if existing: updated += 1
else: inserted += 1
return {"inserted": inserted, "updated": updated, "skipped": skipped}
# ============================================================================
# NWS-AHPS scrape (best-effort)
# ============================================================================
# Gauge id rows in the WFO index appear as
# <a href="hydrograph.php?gage=GAUGE&...">GAUGE Name</a>
# Format varies by WFO; we use a forgiving pattern.
_GAUGE_LINK_RE = re.compile(
r'href="hydrograph\.php\?gage=([a-z0-9]+)[^"]*"[^>]*>([^<]+)</a>',
re.IGNORECASE,
)
# On the detail page (hydrograph.php?gage=X) thresholds appear in a tabular
# format like "Action Stage ... 5.5 ft". We grep on common patterns.
_THRESHOLD_PATTERNS = {
"action_ft": re.compile(r"Action(?:\s*Stage)?[^0-9]{1,60}([0-9.]+)\s*ft", re.IGNORECASE),
"flood_minor_ft": re.compile(r"Minor(?:\s*Flood(?:\s*Stage)?)?[^0-9]{1,60}([0-9.]+)\s*ft", re.IGNORECASE),
"flood_moderate_ft": re.compile(r"Moderate(?:\s*Flood(?:\s*Stage)?)?[^0-9]{1,60}([0-9.]+)\s*ft", re.IGNORECASE),
"flood_major_ft": re.compile(r"Major(?:\s*Flood(?:\s*Stage)?)?[^0-9]{1,60}([0-9.]+)\s*ft", re.IGNORECASE),
}
_LATLON_RE = re.compile(
r"Latitude[^0-9\-]{1,40}([0-9.\-]+)[\s\S]{1,200}?Longitude[^0-9\-]{1,40}([0-9.\-]+)",
re.IGNORECASE,
)
def _ahps_parse_index(html: str) -> list[tuple[str, str]]:
"""Return [(gauge_id, gauge_name), ...] from the WFO index page."""
out: list[tuple[str, str]] = []
seen: set[str] = set()
for m in _GAUGE_LINK_RE.finditer(html or ""):
gid = m.group(1).strip().lower()
name = re.sub(r"\s+", " ", m.group(2)).strip()
if gid and gid not in seen:
seen.add(gid)
out.append((gid, name))
return out
def _ahps_parse_detail(html: str) -> dict[str, Any]:
"""Pull lat/lon + thresholds from the gauge detail HTML."""
out: dict[str, Any] = {
"lat": None, "lon": None,
"action_ft": None, "flood_minor_ft": None,
"flood_moderate_ft": None, "flood_major_ft": None,
}
if not html: return out
m = _LATLON_RE.search(html)
if m:
try:
out["lat"] = float(m.group(1)); out["lon"] = float(m.group(2))
except ValueError:
pass
for key, pat in _THRESHOLD_PATTERNS.items():
m = pat.search(html)
if m:
try: out[key] = float(m.group(1))
except ValueError: pass
return out
def _ahps_upsert(conn, wfos: list[str]) -> dict[str, Any]:
"""Best-effort: fetch each WFO index, then up to _AHPS_DETAIL_FETCH_CAP
gauge detail pages per call across all WFOs. Returns a summary."""
inserted = updated = skipped = 0
fetched_detail = 0
errors: list[str] = []
with httpx.Client(timeout=_AHPS_TIMEOUT_S, follow_redirects=True) as cli:
for wfo in wfos:
wfo = (wfo or "").strip().upper()
if not wfo or not wfo.isalnum():
errors.append(f"WFO {wfo!r}: invalid"); continue
url = f"{_AHPS_BASE}/index.php?wfo={wfo}"
try:
r = cli.get(url)
if r.status_code != 200:
errors.append(f"WFO {wfo}: index status {r.status_code}")
continue
gauges = _ahps_parse_index(r.text)
except Exception as e:
errors.append(f"WFO {wfo}: index fetch {e}"); continue
for gid, gname in gauges:
if fetched_detail >= _AHPS_DETAIL_FETCH_CAP:
errors.append(
f"detail-fetch cap ({_AHPS_DETAIL_FETCH_CAP}) "
f"reached; remaining gauges skipped"
)
break
fetched_detail += 1
detail_url = f"{_AHPS_BASE}/hydrograph.php?gage={gid}"
try:
r2 = cli.get(detail_url)
if r2.status_code != 200:
skipped += 1; continue
parsed = _ahps_parse_detail(r2.text)
except Exception:
skipped += 1; continue
if parsed["lat"] is None or parsed["lon"] is None:
skipped += 1; continue
# AHPS gauge ids are the NWS id; we store as 'AHPS-<id>' so
# they don't collide with USGS-* ids managed elsewhere.
site_id = f"AHPS-{gid.upper()}"
now = time.time()
existing = conn.execute(
"SELECT 1 FROM gauge_sites WHERE site_id=?", (site_id,)
).fetchone()
conn.execute(
"INSERT INTO gauge_sites(site_id, gauge_name, lat, lon, "
"action_ft, flood_minor_ft, flood_moderate_ft, flood_major_ft, "
"enabled, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?) "
"ON CONFLICT(site_id) DO UPDATE SET "
"gauge_name=excluded.gauge_name, lat=excluded.lat, lon=excluded.lon, "
"action_ft=COALESCE(excluded.action_ft, action_ft), "
"flood_minor_ft=COALESCE(excluded.flood_minor_ft, flood_minor_ft), "
"flood_moderate_ft=COALESCE(excluded.flood_moderate_ft, flood_moderate_ft), "
"flood_major_ft=COALESCE(excluded.flood_major_ft, flood_major_ft), "
"updated_at=excluded.updated_at",
(site_id, gname, parsed["lat"], parsed["lon"],
parsed["action_ft"], parsed["flood_minor_ft"],
parsed["flood_moderate_ft"], parsed["flood_major_ft"],
1, now),
)
if existing: updated += 1
else: inserted += 1
return {
"inserted": inserted,
"updated": updated,
"skipped": skipped,
"detail_fetched": fetched_detail,
"errors": errors,
}
# ============================================================================
# Route
# ============================================================================
@router.post("/gauge-sites/import")
async def gauge_sites_import(request: Request) -> dict:
body = await request.json()
if not isinstance(body, dict):
raise HTTPException(400, "body must be a JSON object")
fmt = (body.get("format") or "").lower().strip()
if fmt not in ("csv", "nws-ahps"):
raise HTTPException(400, "format must be 'csv' or 'nws-ahps'")
from meshai.persistence import get_db
conn = get_db()
if fmt == "csv":
text = body.get("data")
if not isinstance(text, str) or not text.strip():
raise HTTPException(400, "csv body must include 'data' (CSV text)")
result = _csv_upsert(conn, text)
else: # nws-ahps
wfos = body.get("wfo") or []
if isinstance(wfos, str):
wfos = [wfos]
if not isinstance(wfos, list) or not wfos:
raise HTTPException(400, "nws-ahps body must include 'wfo' (list or string)")
result = _ahps_upsert(conn, wfos)
if result.get("inserted", 0) or result.get("updated", 0):
invalidate_curation_cache()
logger.info("gauge_sites import (%s): %s", fmt, result)
return result