mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
feat(nwis): site + stats enrichment — named location + WaterWatch normalcy band (v0.8.0)
Opens the v0.8.x data-quality cleanup arc. Production code; requires restarting BOTH
central-gui and central-supervisor (adapter contract + enrichment behavior change).
NWIS events rendered as a bare "Water reading: 111 ft3/s" with an empty Location
column -- an operator couldn't tell where the gauge is or whether 111 ft3/s is
drought-low, normal, or near-flood. Coordinates were present but the reverse
geocoder returns null city/state/county for rural gauge points, and USGS site +
percentile data was never fetched. v0.8.0 fetches both.
Approach B (adapter-owned, per the proposal decision): the NWIS adapter -- which
already owns the USGS APIs -- fetches site metadata and daily stats itself and
writes two provenance bundles under event.data["_enriched"]:
- usgs_site {name, lat, lon, state, county} from the OGC monitoring-locations
item-by-id (the API family the adapter already speaks; JSON, no RDB parser).
- usgs_stats {value, percentile, class_label, severity_band, p10..p90, record_max,
count, period} from the legacy RDB daily-statistics service (the OGC API has no
stats endpoint). USGS percentiles are % of days at-or-below, so higher = higher
flow; classified to the WaterWatch bands -> severity 0-4 (record=4, much
above/below=3, above/below=2, normal=1; None reserved for "no stats", distinct
from a normal-flow gauge). Severity is set on the event, so it drives the v0.7.1
severity chip-picker filter + v0.7.2 map-marker opacity.
- new nwis_enrich.py: pure parse/classify/percentile/band helpers + a sqlite
SiteStatsCache (site TTL 365d, stats TTL 90d -- one fetch per site+param serves
every reading for the window, so a warm cache makes zero USGS calls). USGS down
-> cached-if-present else all-null bundle; the event still publishes.
Framework: the single agreed generic change -- supervisor apply_enrichment now
MERGES into _enriched instead of overwriting, so the still-global geocoder phase
doesn't clobber the adapter's bundles. No other adapter writes _enriched, so this
is inert for them.
GUI: _event_summaries/nwis.html -> "<site> -- <value> <units> (<band>, <Nth>
percentile)", with graceful fallback to "<site> -- <value>" then the bare
"Water reading:". _event_rows/nwis.html detail gains site/normalcy/typical/location
rows. _events_rows.html Location column falls back generically to any
_enriched.<source> carrying state/county when the geocoder is null (works for
future enrichers). events.json contract unchanged (additions under _enriched only).
conftest isolate_enrichment_cache also redirects NWIS_CACHE_DB_PATH off the prod
path (unprivileged-user test isolation). Adds tests/test_nwis_enrichment.py (28
tests: parse, band edges incl P0/P9/P10/P75/P90/record, percentile interpolation,
cache hit/miss/expire, adapter enrich + graceful-null + cache-hit-no-refetch,
summary rendering per band).
Full suite: 710 passed, 1 skipped (central and unprivileged zvx, 3x each).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6546db0144
commit
8612f0b75d
9 changed files with 729 additions and 19 deletions
|
|
@ -19,6 +19,12 @@ from tenacity import (
|
|||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapters import nwis_enrich
|
||||
from central.adapters.nwis_enrich import (
|
||||
USGS_SITE_TTL_S,
|
||||
USGS_STATS_TTL_S,
|
||||
SiteStatsCache,
|
||||
)
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -31,6 +37,13 @@ NWIS_LATEST_CONTINUOUS_URL = (
|
|||
NWIS_MONITORING_LOCATIONS_URL = (
|
||||
"https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items"
|
||||
)
|
||||
# v0.8.0 enrichment endpoints: site metadata via OGC item-by-id; daily stats via
|
||||
# the legacy RDB stat service (the OGC API exposes no statistics endpoint).
|
||||
NWIS_SITE_ITEM_URL = NWIS_MONITORING_LOCATIONS_URL
|
||||
NWIS_STATS_URL = "https://waterservices.usgs.gov/nwis/stat/"
|
||||
# Site/stats enrichment cache (monkeypatched off the prod path in tests, like
|
||||
# the supervisor's ENRICHMENT_CACHE_DB_PATH).
|
||||
NWIS_CACHE_DB_PATH = Path("/var/lib/central/nwis_cache.db")
|
||||
# Per-render cap for the settings-driven preview (PR G.5). Keep small so the
|
||||
# /adapters/<name> edit page renders quickly.
|
||||
_PREVIEW_LIMIT = 50
|
||||
|
|
@ -140,6 +153,7 @@ class NWISAdapter(SourceAdapter):
|
|||
self._cursor_db_path = cursor_db_path
|
||||
self._session: aiohttp.ClientSession | None = None
|
||||
self._db: sqlite3.Connection | None = None
|
||||
self._enrich_cache: SiteStatsCache | None = None
|
||||
self.parameter_codes: list[str] = list(
|
||||
config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES)
|
||||
)
|
||||
|
|
@ -167,6 +181,7 @@ class NWISAdapter(SourceAdapter):
|
|||
ON published_ids (last_seen)
|
||||
""")
|
||||
self._db.commit()
|
||||
self._enrich_cache = SiteStatsCache(NWIS_CACHE_DB_PATH)
|
||||
if self.region is None:
|
||||
logger.warning(
|
||||
"NWIS started without region bbox — upstream will return CONUS-wide records on every poll. "
|
||||
|
|
@ -313,6 +328,12 @@ class NWISAdapter(SourceAdapter):
|
|||
)
|
||||
if self.is_published(dedup_key):
|
||||
continue
|
||||
# Site + stats enrichment (v0.8.0) on new events only. Sets
|
||||
# _enriched.usgs_site / usgs_stats in event.data and derives
|
||||
# severity from the WaterWatch band (None when no stats).
|
||||
severity = await self._enrich_event(event)
|
||||
if severity != event.severity:
|
||||
event = event.model_copy(update={"severity": severity})
|
||||
yield event
|
||||
self.mark_published(dedup_key)
|
||||
events_yielded += 1
|
||||
|
|
@ -394,6 +415,84 @@ class NWISAdapter(SourceAdapter):
|
|||
data=data,
|
||||
)
|
||||
|
||||
async def _site_bundle(self, site_id: str) -> dict[str, Any]:
    """Return the ``usgs_site`` bundle for ``site_id``; never raises.

    Cache-first: a fresh cached bundle is returned without contacting USGS.
    On a miss the OGC monitoring-locations item is fetched by id and parsed.

    Every failure mode degrades instead of propagating, so the event still
    publishes:

    - fetch / JSON decode / parse failure -> all-null bundle;
    - cache read failure -> treated as a miss;
    - cache write failure -> the freshly fetched bundle is still returned.

    Args:
        site_id: The event's ``monitoring_location_id``.

    Returns:
        The parsed site bundle, or the all-null bundle on failure.
    """
    cache = self._enrich_cache
    if cache is not None:
        try:
            cached = await cache.get("site", site_id, USGS_SITE_TTL_S)
        except Exception as e:
            # A broken cache must not stop enrichment; fall through to USGS.
            logger.warning(
                "NWIS site cache read failed",
                extra={"site": site_id, "error": str(e)},
            )
            cached = None
        if cached is not None:
            return cached
    try:
        text = await self._fetch(f"{NWIS_SITE_ITEM_URL}/{site_id}?f=json")
        bundle = nwis_enrich.parse_site_feature(json.loads(text))
    except Exception as e:
        logger.warning(
            "NWIS site enrichment failed",
            extra={"site": site_id, "error": str(e)},
        )
        return nwis_enrich.site_null_bundle()
    # parse_site_feature returns an all-null bundle (without raising) for a
    # response that is valid JSON but not a usable Feature. Caching that
    # would pin the site to "unknown" for the whole site TTL, so only
    # bundles carrying at least one real field are stored.
    has_data = any(field is not None for field in bundle.values())
    if cache is not None and has_data:
        try:
            await cache.set("site", site_id, bundle)
        except Exception as e:
            logger.warning(
                "NWIS site cache write failed",
                extra={"site": site_id, "error": str(e)},
            )
    return bundle
|
||||
|
||||
async def _stats_bundle(
    self,
    site_id: str,
    bare_site_no: str,
    parameter_code: str,
    value: float | None,
    event_time: datetime,
) -> dict[str, Any]:
    """Return the ``usgs_stats`` bundle for one reading; never raises.

    The parsed day-of-year table from the legacy RDB daily-statistics
    service is cached per ``(site, parameter_code)``, so a single fetch
    classifies every reading at that site for the stats TTL window.

    Every failure mode degrades to the all-null bundle with ``value``
    echoed: a missing parameter code, a fetch/parse failure, or an error
    while classifying (for example a malformed cached table). Cache read
    errors are treated as a miss and cache write errors are logged and
    ignored.

    Args:
        site_id: The event's ``monitoring_location_id`` (used in the cache key).
        bare_site_no: Site number sent to the stat service as ``sites``.
        parameter_code: USGS parameter code of the reading.
        value: The reading; echoed into the bundle unchanged.
        event_time: Reading timestamp; its month/day select the table row.

    Returns:
        The populated stats bundle, or the all-null bundle with ``value``.
    """
    null_bundle = {**nwis_enrich.stats_null_bundle(), "value": value}
    if not parameter_code:
        # Without a parameter code the request would carry the literal
        # string "None" and the cache key would be meaningless.
        return null_bundle

    key = f"{site_id}:{parameter_code}"
    log_extra = {"site": site_id, "parameter_code": parameter_code}
    cache = self._enrich_cache
    table = None
    if cache is not None:
        try:
            table = await cache.get("stats", key, USGS_STATS_TTL_S)
        except Exception as e:
            logger.warning(
                "NWIS stats cache read failed",
                extra={**log_extra, "error": str(e)},
            )
            table = None
    if table is None:
        params = {
            "sites": bare_site_no,
            "statReportType": "daily",
            "statTypeCd": "P10,P25,P50,P75,P90,max",
            "parameterCd": parameter_code,
            "format": "rdb",
        }
        try:
            text = await self._fetch(f"{NWIS_STATS_URL}?{urlencode(params)}")
            table = nwis_enrich.parse_stats_rdb(text)
        except Exception as e:
            logger.warning(
                "NWIS stats enrichment failed",
                extra={**log_extra, "error": str(e)},
            )
            return null_bundle
        # NOTE(review): an empty table is cached as well, which avoids a
        # refetch per reading for sites without statistics but also keeps
        # an unparseable response for the whole TTL -- confirm intended.
        if cache is not None:
            try:
                await cache.set("stats", key, table)
            except Exception as e:
                logger.warning(
                    "NWIS stats cache write failed",
                    extra={**log_extra, "error": str(e)},
                )
    try:
        return nwis_enrich.build_stats_bundle(
            value, table, event_time.month, event_time.day
        )
    except Exception as e:
        # Classification sits outside the fetch guard; keep the
        # never-raises contract for bad cached tables or odd values.
        logger.warning(
            "NWIS stats classification failed",
            extra={**log_extra, "error": str(e)},
        )
        return null_bundle
|
||||
|
||||
async def _enrich_event(self, event: Event) -> int | None:
    """Write the USGS site and stats bundles into ``event.data`` in place.

    The bundles are stored under ``event.data["_enriched"]`` as
    ``usgs_site`` and ``usgs_stats``. Returns the ``severity_band`` of the
    stats bundle; when the event has no ``monitoring_location_id`` nothing
    is attached and the event's current severity is returned unchanged.
    """
    payload = event.data
    location_id = payload.get("monitoring_location_id")
    if not location_id:
        return event.severity

    _, site_number = _subject_tokens_for_id(location_id)
    site_bundle = await self._site_bundle(location_id)
    stats_bundle = await self._stats_bundle(
        location_id,
        site_number,
        payload.get("parameter_code"),
        payload.get("value"),
        event.time,
    )
    # setdefault keeps any bundles that are already present.
    payload.setdefault("_enriched", {}).update(
        usgs_site=site_bundle,
        usgs_stats=stats_bundle,
    )
    return stats_bundle.get("severity_band")
|
||||
|
||||
async def _fetch_preview_text(self, url: str) -> str:
|
||||
"""One-shot GET for the preview render.
|
||||
|
||||
|
|
|
|||
305
src/central/adapters/nwis_enrich.py
Normal file
305
src/central/adapters/nwis_enrich.py
Normal file
|
|
@ -0,0 +1,305 @@
|
|||
"""USGS site + stats enrichment helpers for the NWIS adapter (v0.8.0).
|
||||
|
||||
NWIS-specific (Approach B — the adapter owns its USGS enrichment), producing the
|
||||
``_enriched.usgs_site`` and ``_enriched.usgs_stats`` bundles. This module holds
|
||||
the pure parse/classify functions plus a small sqlite cache; the adapter wires
|
||||
them in (see nwis.py).
|
||||
|
||||
- Site metadata: OGC monitoring-locations item-by-id (JSON), same API family the
|
||||
adapter already speaks.
|
||||
- Daily stats: the legacy waterservices RDB ``stat`` service — the OGC API has no
|
||||
statistics endpoint.
|
||||
|
||||
USGS percentiles are "percent of days at or below this value", so a HIGHER
|
||||
percentile means HIGHER flow. WaterWatch bands map to a 0-4 severity (None is
|
||||
reserved for "no stats available", which is distinct from a normal-flow gauge):
|
||||
|
||||
value > historical daily max -> record high severity 4
|
||||
value > P90 -> much above normal severity 3
|
||||
P75 < value <= P90 -> above normal severity 2
|
||||
P25 <= value <= P75 -> normal severity 1
|
||||
P10 <= value < P25 -> below normal severity 2
|
||||
value < P10 -> much below normal severity 3
|
||||
(no usable thresholds) -> None severity None
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# TTLs: site metadata is near-static. The daily-percentile table drifts slowly
# (USGS recomputes period-of-record stats infrequently), and one fetch per
# site+parameter returns every day-of-year row, so it stays usable for a season.
|
||||
USGS_SITE_TTL_S = 365 * 86400
|
||||
USGS_STATS_TTL_S = 90 * 86400
|
||||
|
||||
SITE_FIELDS: tuple[str, ...] = ("name", "lat", "lon", "state", "county")
|
||||
STATS_FIELDS: tuple[str, ...] = (
|
||||
"value", "percentile", "class_label", "severity_band",
|
||||
"p10", "p25", "p50", "p75", "p90", "record_max", "count", "period",
|
||||
)
|
||||
|
||||
# WaterWatch band -> severity on the 0-4 scale (bands map to 1-4; 0 is not assigned here). None is NOT in here: it means "no stats".
|
||||
SEVERITY_BY_BAND: dict[str, int] = {
|
||||
"record high": 4,
|
||||
"much above normal": 3,
|
||||
"above normal": 2,
|
||||
"normal": 1,
|
||||
"below normal": 2,
|
||||
"much below normal": 3,
|
||||
}
|
||||
|
||||
|
||||
def site_null_bundle() -> dict[str, Any]:
    """Return a fresh ``usgs_site`` bundle with every field set to None."""
    return dict.fromkeys(SITE_FIELDS)
|
||||
|
||||
|
||||
def stats_null_bundle() -> dict[str, Any]:
    """Return a fresh ``usgs_stats`` bundle with every field set to None."""
    return dict.fromkeys(STATS_FIELDS)
|
||||
|
||||
|
||||
def parse_site_feature(feature: dict) -> dict[str, Any]:
    """Convert an OGC monitoring-locations Feature into a usgs_site bundle.

    Never raises on a malformed shape: a non-dict ``feature`` yields the
    all-null bundle, and a non-dict ``properties`` or ``geometry`` simply
    leaves the fields it would have supplied as None.

    Coordinates are read as a GeoJSON position ``(lon, lat[, elevation])``;
    only the first two elements are used, so 3-element positions are
    accepted. Booleans are rejected even though ``bool`` subclasses ``int``.

    Args:
        feature: Decoded JSON of a single Feature.

    Returns:
        A dict with keys ``name``, ``lat``, ``lon``, ``state``, ``county``.
    """
    if not isinstance(feature, dict):
        return site_null_bundle()

    props = feature.get("properties")
    if not isinstance(props, dict):
        props = {}

    geom = feature.get("geometry")
    coords = geom.get("coordinates") if isinstance(geom, dict) else None

    lat = lon = None
    if isinstance(coords, (list, tuple)) and len(coords) >= 2:
        first_two = coords[:2]
        if all(
            isinstance(c, (int, float)) and not isinstance(c, bool)
            for c in first_two
        ):
            # GeoJSON order is (lon, lat).
            lon, lat = float(first_two[0]), float(first_two[1])

    return {
        "name": props.get("monitoring_location_name"),
        "lat": lat,
        "lon": lon,
        "state": props.get("state_name"),
        "county": props.get("county_name"),
    }
|
||||
|
||||
|
||||
def _num(cols: list[str], idx: dict[str, int], key: str) -> float | None:
|
||||
i = idx.get(key)
|
||||
if i is None or i >= len(cols):
|
||||
return None
|
||||
raw = cols[i].strip()
|
||||
if raw == "":
|
||||
return None
|
||||
try:
|
||||
return float(raw)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def parse_stats_rdb(text: str) -> dict[str, dict[str, Any]]:
    """Turn a daily-statistics RDB document into a per-day threshold table.

    The result maps ``"<month>-<day>"`` to a dict with the keys ``p10``,
    ``p25``, ``p50``, ``p75``, ``p90``, ``max``, ``count``, ``begin_yr`` and
    ``end_yr``; blank or missing numeric cells become None. String keys keep
    the table directly JSON-serialisable. Unusable input yields ``{}``.

    Columns are located by name from the header row, so the parser does not
    depend on column order. The row right after the header is the RDB
    format row and carries no data.
    """
    rows = [line for line in text.splitlines() if line and not line.startswith("#")]
    if len(rows) < 3:
        return {}

    column_at = {name: pos for pos, name in enumerate(rows[0].split("\t"))}
    if not {"month_nu", "day_nu"} <= column_at.keys():
        return {}

    threshold_columns = (
        ("p10", "p10_va"),
        ("p25", "p25_va"),
        ("p50", "p50_va"),
        ("p75", "p75_va"),
        ("p90", "p90_va"),
        ("max", "max_va"),
    )
    by_day: dict[str, dict[str, Any]] = {}
    # rows[1] is the "5s 15s ..." format row, so data starts at rows[2].
    for line in rows[2:]:
        cells = line.split("\t")
        month = _num(cells, column_at, "month_nu")
        day = _num(cells, column_at, "day_nu")
        if month is None or day is None:
            continue
        sample_count = _num(cells, column_at, "count_nu")
        entry: dict[str, Any] = {
            field: _num(cells, column_at, column)
            for field, column in threshold_columns
        }
        entry["count"] = None if sample_count is None else int(sample_count)
        entry["begin_yr"] = _num(cells, column_at, "begin_yr")
        entry["end_yr"] = _num(cells, column_at, "end_yr")
        by_day[f"{int(month)}-{int(day)}"] = entry
    return by_day
|
||||
|
||||
|
||||
def percentile_of(value: float, day: dict[str, Any]) -> int | None:
|
||||
"""Interpolate the value's approximate percentile from a day's thresholds.
|
||||
|
||||
Piecewise-linear over the available (percentile, threshold) points, with an
|
||||
implicit (0th, 0.0) lower bound (flow/stage are non-negative) and a (100th,
|
||||
max) upper bound when the daily max is known. None when fewer than two
|
||||
usable points exist.
|
||||
"""
|
||||
pts: list[tuple[float, float]] = [(0.0, 0.0)]
|
||||
for pct, key in ((10, "p10"), (25, "p25"), (50, "p50"), (75, "p75"), (90, "p90")):
|
||||
v = day.get(key)
|
||||
if v is not None:
|
||||
pts.append((float(pct), float(v)))
|
||||
mx = day.get("max")
|
||||
if mx is not None:
|
||||
pts.append((100.0, float(mx)))
|
||||
pts = sorted(set(pts), key=lambda t: t[1])
|
||||
if len(pts) < 2:
|
||||
return None
|
||||
if value <= pts[0][1]:
|
||||
return int(round(pts[0][0]))
|
||||
if value >= pts[-1][1]:
|
||||
return int(round(pts[-1][0]))
|
||||
for i in range(1, len(pts)):
|
||||
p0, v0 = pts[i - 1]
|
||||
p1, v1 = pts[i]
|
||||
if v0 <= value <= v1:
|
||||
if v1 == v0:
|
||||
return int(round(p1))
|
||||
return int(round(p0 + (p1 - p0) * (value - v0) / (v1 - v0)))
|
||||
return None
|
||||
|
||||
|
||||
def classify(value: float | None, day: dict[str, Any]) -> tuple[str | None, int | None, int | None]:
|
||||
"""Classify a value against a day's thresholds -> (class_label, percentile, severity).
|
||||
|
||||
Best-effort when some thresholds are missing (e.g. P90 blank -> the top
|
||||
reachable band without a max is 'above normal'). Returns all-None when no
|
||||
threshold lets us place the value at all.
|
||||
"""
|
||||
if value is None:
|
||||
return (None, None, None)
|
||||
p10, p25, p75, p90, mx = (
|
||||
day.get("p10"), day.get("p25"), day.get("p75"), day.get("p90"), day.get("max"),
|
||||
)
|
||||
label: str | None = None
|
||||
if mx is not None and value > mx:
|
||||
label = "record high"
|
||||
elif p90 is not None and value > p90:
|
||||
label = "much above normal"
|
||||
elif p75 is not None and value > p75:
|
||||
label = "above normal"
|
||||
elif p25 is not None and value >= p25:
|
||||
label = "normal"
|
||||
elif p10 is not None and value >= p10:
|
||||
label = "below normal"
|
||||
elif p10 is not None and value < p10:
|
||||
label = "much below normal"
|
||||
if label is None:
|
||||
return (None, percentile_of(value, day), None)
|
||||
return (label, percentile_of(value, day), SEVERITY_BY_BAND.get(label))
|
||||
|
||||
|
||||
def build_stats_bundle(value: float | None, table: dict[str, dict[str, Any]], month: int, day: int) -> dict[str, Any]:
    """Build the usgs_stats bundle for a single reading.

    ``value`` is always echoed into the bundle. The thresholds, record max,
    sample count, period and classification are filled in only when the
    table has a row for ``"<month>-<day>"``; otherwise they stay None.
    """
    result = stats_null_bundle()
    result["value"] = value

    thresholds = table.get(f"{month}-{day}") if table else None
    if not thresholds:
        return result

    result.update({name: thresholds.get(name) for name in ("p10", "p25", "p50", "p75", "p90")})
    result["record_max"] = thresholds.get("max")
    result["count"] = thresholds.get("count")

    first_year = thresholds.get("begin_yr")
    last_year = thresholds.get("end_yr")
    if first_year and last_year:
        result["period"] = f"{int(first_year)}–{int(last_year)}"
    else:
        result["period"] = None

    band, percentile, severity = classify(value, thresholds)
    result["class_label"] = band
    result["percentile"] = percentile
    result["severity_band"] = severity
    return result
|
||||
|
||||
|
||||
class SiteStatsCache:
|
||||
"""Thread-offloaded sqlite cache for NWIS site bundles + stats day-tables.
|
||||
|
||||
Keyed by (kind, key): kind 'site' key=monitoring_location_id (TTL ~1yr),
|
||||
kind 'stats' key='<site_id>:<parameter_code>' (TTL ~90d, stores the whole
|
||||
parsed day-of-year table so one fetch serves every reading at that site).
|
||||
Mirrors the EnrichmentCache pattern (fresh connection per op, ttl on read).
|
||||
"""
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS nwis_cache (
|
||||
kind TEXT NOT NULL,
|
||||
key TEXT NOT NULL,
|
||||
payload_json TEXT NOT NULL,
|
||||
cached_at TEXT NOT NULL,
|
||||
PRIMARY KEY (kind, key)
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str | Path) -> None:
|
||||
self._db_path = Path(db_path)
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = self._connect()
|
||||
try:
|
||||
conn.execute(self._SCHEMA)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
return sqlite3.connect(self._db_path, timeout=30)
|
||||
|
||||
def _get_sync(self, kind: str, key: str, ttl_s: int) -> Any | None:
|
||||
conn = self._connect()
|
||||
try:
|
||||
cur = conn.execute(
|
||||
"SELECT payload_json, cached_at FROM nwis_cache WHERE kind = ? AND key = ?",
|
||||
(kind, key),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
if row is None:
|
||||
return None
|
||||
payload_json, cached_at_iso = row
|
||||
try:
|
||||
cached_at = datetime.fromisoformat(cached_at_iso)
|
||||
except ValueError:
|
||||
return None
|
||||
if cached_at.tzinfo is None:
|
||||
cached_at = cached_at.replace(tzinfo=timezone.utc)
|
||||
if (datetime.now(timezone.utc) - cached_at).total_seconds() > ttl_s:
|
||||
return None
|
||||
return json.loads(payload_json)
|
||||
|
||||
def _set_sync(self, kind: str, key: str, payload: Any) -> None:
|
||||
now_iso = datetime.now(timezone.utc).isoformat()
|
||||
conn = self._connect()
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO nwis_cache (kind, key, payload_json, cached_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT (kind, key) DO UPDATE SET
|
||||
payload_json = excluded.payload_json,
|
||||
cached_at = excluded.cached_at
|
||||
""",
|
||||
(kind, key, json.dumps(payload), now_iso),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
async def get(self, kind: str, key: str, ttl_s: int) -> Any | None:
|
||||
return await asyncio.to_thread(self._get_sync, kind, key, ttl_s)
|
||||
|
||||
async def set(self, kind: str, key: str, payload: Any) -> None:
|
||||
await asyncio.to_thread(self._set_sync, kind, key, payload)
|
||||
|
|
@ -1,6 +1,13 @@
|
|||
{# USGS NWIS water observations. Fields from payload->data->data. #}
|
||||
{# USGS NWIS water observations + v0.8.0 site/stats enrichment. payload->data->data. #}
|
||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||
{% set enr = d.get('_enriched') or {} %}
|
||||
{% set site = enr.get('usgs_site') or {} %}
|
||||
{% set st = enr.get('usgs_stats') or {} %}
|
||||
{% if site.get('name') %}<dt>Site name</dt><dd>{{ site.name }}</dd>{% endif %}
|
||||
{% if d.get('parameter_code') is not none %}<dt>Parameter</dt><dd>{{ d.parameter_code }}</dd>{% endif %}
|
||||
{% if d.get('value') is not none %}<dt>Value</dt><dd>{{ d.value }} {{ d.get('unit_of_measure', '') }}</dd>{% endif %}
|
||||
{% if d.get('monitoring_location_id') is not none %}<dt>Site</dt><dd><code>{{ d.monitoring_location_id }}</code></dd>{% endif %}
|
||||
{% if st.get('class_label') %}<dt>Normalcy</dt><dd>{{ st.class_label }}{% if st.get('percentile') is not none %} (~{{ st.percentile }} percentile){% endif %}</dd>{% endif %}
|
||||
{% if st.get('p50') is not none %}<dt>Typical (this day)</dt><dd>median {{ st.p50 }} {{ d.get('unit_of_measure', '') }}{% if st.get('period') %} · {{ st.period }}{% if st.get('count') %} ({{ st.count }} yrs){% endif %}{% endif %}</dd>{% endif %}
|
||||
{% if site.get('county') or site.get('state') %}<dt>Location</dt><dd>{{ [site.county, site.state] | select | join(', ') }}</dd>{% endif %}
|
||||
{% if d.get('monitoring_location_id') is not none %}<dt>Site ID</dt><dd><code>{{ d.monitoring_location_id }}</code></dd>{% endif %}
|
||||
{% if d.get('statistic_id') is not none %}<dt>Statistic</dt><dd>{{ d.statistic_id }}</dd>{% endif %}
|
||||
|
|
|
|||
|
|
@ -1,2 +1,15 @@
|
|||
{# USGS NWIS one-line summary (v0.8.0). Prefer site name + value + WaterWatch
|
||||
band/percentile; fall back to "<site> — <value>" when stats are absent but the
|
||||
site is named; finally to the bare "Water reading: <value>" so we never regress. #}
|
||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||
{%- if d.get('value') is not none %}Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}{% endif -%}
|
||||
{% set enr = d.get('_enriched') or {} %}
|
||||
{% set site = enr.get('usgs_site') or {} %}
|
||||
{% set st = enr.get('usgs_stats') or {} %}
|
||||
{%- set p = st.get('percentile') -%}
|
||||
{%- set suf = (('th' if (p % 100) in [11, 12, 13] else {1: 'st', 2: 'nd', 3: 'rd'}.get(p % 10, 'th')) if p is not none else '') -%}
|
||||
{%- if site.get('name') and d.get('value') is not none -%}
|
||||
{{ site.name }} — {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}
|
||||
{%- if st.get('class_label') %} ({{ st.class_label }}{% if p is not none %}, {{ p }}{{ suf }} percentile{% endif %}){% endif -%}
|
||||
{%- elif d.get('value') is not none -%}
|
||||
Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}
|
||||
{%- endif -%}
|
||||
|
|
|
|||
|
|
@ -20,14 +20,24 @@
|
|||
fallback (no hardcoded list). Captured once so it serves both the
|
||||
Subject cell and the map popup (via data-subject). #}
|
||||
{% set subject_summary %}{% include ["_event_summaries/" ~ event.adapter ~ ".html", "_event_summaries/_default.html"] %}{% endset %}
|
||||
{# Location: generic _enriched.geocoder reader, then top-level named
|
||||
fields, then coordinates. No adapter-specific logic. #}
|
||||
{# Location: geocoder-first, then top-level named fields. When the
|
||||
geocoder resolved no named place, fall back generically to ANY other
|
||||
_enriched.<source> carrying county/state (e.g. usgs_site, v0.8.0) so
|
||||
the pattern works for future enrichers without per-adapter logic. #}
|
||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||
{% set gc = (d.get('_enriched') or {}).get('geocoder') or {} %}
|
||||
{% set loc_local = gc.get('city') or d.get('city') or gc.get('county') or d.get('county') %}
|
||||
{% set loc_state = gc.get('state') or d.get('state') %}
|
||||
{% set loc_country = gc.get('country') or d.get('country') %}
|
||||
{% set loc_parts = [loc_local, loc_state, loc_country] | select | list %}
|
||||
{% set enr = d.get('_enriched') or {} %}
|
||||
{% set gc = enr.get('geocoder') or {} %}
|
||||
{% set ns = namespace(
|
||||
local=(gc.get('city') or gc.get('county') or d.get('city') or d.get('county')),
|
||||
state=(gc.get('state') or d.get('state')),
|
||||
country=(gc.get('country') or d.get('country'))) %}
|
||||
{% if not (ns.local or ns.state) %}
|
||||
{% for _src, b in enr.items() if _src != 'geocoder' and b is mapping %}
|
||||
{% if not ns.local %}{% set ns.local = b.get('county') or b.get('city') %}{% endif %}
|
||||
{% if not ns.state %}{% set ns.state = b.get('state') %}{% endif %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
{% set loc_parts = [ns.local, ns.state, ns.country] | select | list %}
|
||||
<tr class="event-row" data-row-idx="{{ loop.index0 }}"
|
||||
data-event-id="{{ event.id }}"
|
||||
data-adapter="{{ event.adapter }}"
|
||||
|
|
|
|||
|
|
@ -79,9 +79,12 @@ async def apply_enrichment(
|
|||
a non-null coordinate pair in event.data. If no declared pair resolves to
|
||||
coordinates, still attaches an all-null bundle so that every event from an
|
||||
enriched adapter carries _enriched (consumers get a stable field set).
|
||||
Each enricher's result is keyed by enricher.name. Mutates the data dict in
|
||||
place (Event is frozen, but its data dict is not — this avoids a
|
||||
model_copy on every published event).
|
||||
Each enricher's result is keyed by enricher.name. Results are MERGED into
|
||||
event.data["_enriched"] (not overwritten) so an adapter that populated its
|
||||
own bundles before yielding — e.g. NWIS's usgs_site/usgs_stats (v0.8.0) —
|
||||
keeps them alongside the framework's location-keyed bundles. Mutates the
|
||||
data dict in place (Event is frozen, but its data dict is not — this avoids
|
||||
a model_copy on every published event).
|
||||
"""
|
||||
if not enrichment_locations or not enrichers:
|
||||
return
|
||||
|
|
@ -91,20 +94,18 @@ async def apply_enrichment(
|
|||
if lat is None or lon is None:
|
||||
continue
|
||||
location = {"lat": float(lat), "lon": float(lon)}
|
||||
enriched: dict[str, Any] = {}
|
||||
target = event.data.setdefault("_enriched", {})
|
||||
for enricher in enrichers:
|
||||
enriched[enricher.name] = await enricher.enrich(location)
|
||||
event.data["_enriched"] = enriched
|
||||
target[enricher.name] = await enricher.enrich(location)
|
||||
return
|
||||
# No declared pair resolved to coordinates. Still attach _enriched: each
|
||||
# enricher resolves the null location to its own all-null bundle (per the
|
||||
# never-raise contract), so coordless events (e.g. removal tombstones)
|
||||
# carry the same shape as enriched ones.
|
||||
null_location = {"lat": None, "lon": None}
|
||||
enriched = {}
|
||||
target = event.data.setdefault("_enriched", {})
|
||||
for enricher in enrichers:
|
||||
enriched[enricher.name] = await enricher.enrich(null_location)
|
||||
event.data["_enriched"] = enriched
|
||||
target[enricher.name] = await enricher.enrich(null_location)
|
||||
|
||||
# Stream subject mappings -- derived from the registry; every stream is included
|
||||
# (META too: supervisor must create it in JetStream even though archive skips it).
|
||||
|
|
|
|||
|
|
@ -22,14 +22,24 @@ def isolate_enrichment_cache(tmp_path, monkeypatch):
|
|||
so without this fixture the suite writes to (or, for any user without write
|
||||
access to /var/lib/central, fails on) the live cache. Point it at a
|
||||
per-test temp dir so no test ever touches the production path.
|
||||
|
||||
Also redirects the NWIS adapter's site/stats cache (v0.8.0,
|
||||
`central.adapters.nwis.NWIS_CACHE_DB_PATH`, same /var/lib/central prod
|
||||
default) for the same reason — NWISAdapter.startup() opens it.
|
||||
"""
|
||||
import central.supervisor as supervisor_mod
|
||||
import central.adapters.nwis as nwis_mod
|
||||
|
||||
monkeypatch.setattr(
|
||||
supervisor_mod,
|
||||
"ENRICHMENT_CACHE_DB_PATH",
|
||||
tmp_path / "enrichment_cache.db",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
nwis_mod,
|
||||
"NWIS_CACHE_DB_PATH",
|
||||
tmp_path / "nwis_cache.db",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
|
|
|
|||
|
|
@ -175,6 +175,9 @@ class TestNWISAdapter:
|
|||
tmp_path / "cursors.db",
|
||||
)
|
||||
adapter._fetch = AsyncMock(return_value=_fixture_text())
|
||||
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment
|
||||
# (which also calls _fetch); enrichment is covered in test_nwis_enrichment.
|
||||
adapter._enrich_event = AsyncMock(return_value=None)
|
||||
await adapter.startup()
|
||||
events = [e async for e in adapter.poll()]
|
||||
await adapter.shutdown()
|
||||
|
|
@ -215,6 +218,8 @@ class TestNWISAdapter:
|
|||
tmp_path / "cursors.db",
|
||||
)
|
||||
adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
|
||||
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment.
|
||||
adapter._enrich_event = AsyncMock(return_value=None)
|
||||
await adapter.startup()
|
||||
events = [e async for e in adapter.poll()]
|
||||
await adapter.shutdown()
|
||||
|
|
|
|||
260
tests/test_nwis_enrichment.py
Normal file
260
tests/test_nwis_enrichment.py
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
"""Tests for v0.8.0 NWIS site + stats enrichment.
|
||||
|
||||
Covers the pure parse/classify/band logic, the SiteStatsCache (hit/miss/expire),
|
||||
the adapter's _enrich_event orchestration with mocked USGS responses (incl.
|
||||
graceful nulls when USGS is down and cache-hit avoiding a refetch), and the
|
||||
L-c summary template rendering per WaterWatch band. No live network, no live DB.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import jinja2
|
||||
import pytest
|
||||
|
||||
from central.adapters import nwis_enrich as ne
|
||||
from central.adapters.nwis import NWISAdapter
|
||||
from central.models import Event, Geo
|
||||
|
||||
# ── canned USGS fixtures ────────────────────────────────────────────────────
|
||||
|
||||
SITE_JSON = """
|
||||
{"type":"Feature",
|
||||
"geometry":{"type":"Point","coordinates":[-93.1926861111111,40.8004444444444]},
|
||||
"properties":{"monitoring_location_name":"South Fork Chariton River near Promise City, IA",
|
||||
"state_name":"Iowa","county_name":"Wayne County",
|
||||
"country_name":"United States of America"}}
|
||||
"""
|
||||
|
||||
# header / RDB-format row / one data row for month 1 day 1 (loc_web_ds blank; all percentile columns populated).
|
||||
STATS_RDB = (
|
||||
"# canned\n"
|
||||
"agency_cd\tsite_no\tparameter_cd\tts_id\tloc_web_ds\tmonth_nu\tday_nu\tbegin_yr\tend_yr\tcount_nu\tmax_va_yr\tmax_va\tp10_va\tp25_va\tp50_va\tp75_va\tp90_va\n"
|
||||
"5s\t15s\t5s\t10n\t15s\t3n\t3n\t6n\t6n\t8n\t5n\t12s\t12s\t12s\t12s\t12s\t12s\n"
|
||||
"USGS\t06903700\t00060\t43334\t\t1\t1\t1968\t2026\t59\t2019\t315\t10\t25\t50\t75\t90\n"
|
||||
)
|
||||
|
||||
DAY = {"p10": 10.0, "p25": 25.0, "p50": 50.0, "p75": 75.0, "p90": 90.0, "max": 200.0}
|
||||
|
||||
|
||||
# ── parse_site_feature ──────────────────────────────────────────────────────
|
||||
|
||||
def test_parse_site_feature_full():
    """A well-formed site Feature yields name, state, county and coordinates."""
    import json

    feature = json.loads(SITE_JSON)
    bundle = ne.parse_site_feature(feature)

    assert bundle["name"] == "South Fork Chariton River near Promise City, IA"
    assert bundle["state"] == "Iowa"
    assert bundle["county"] == "Wayne County"
    # Coordinates are compared after rounding to three decimal places.
    assert round(bundle["lat"], 3) == 40.800
    assert round(bundle["lon"], 3) == -93.193
|
||||
|
||||
|
||||
def test_parse_site_feature_bad_shape_is_all_null():
    """An empty dict and None both parse to the all-null site bundle."""
    for malformed in ({}, None):
        assert ne.parse_site_feature(malformed) == ne.site_null_bundle()
|
||||
|
||||
|
||||
# ── parse_stats_rdb ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_parse_stats_rdb_keys_and_values():
    """The canned RDB parses into a table with a "1-1" row of numeric fields."""
    parsed = ne.parse_stats_rdb(STATS_RDB)
    assert "1-1" in parsed

    jan_first = parsed["1-1"]
    # Percentile thresholds and the historical maximum.
    assert jan_first["p10"] == 10.0
    assert jan_first["p75"] == 75.0
    assert jan_first["max"] == 315.0
    # Record metadata: observation count and the period-of-record years.
    assert jan_first["count"] == 59
    assert jan_first["begin_yr"] == 1968.0
    assert jan_first["end_yr"] == 2026.0
|
||||
|
||||
|
||||
def test_parse_stats_rdb_garbage_is_empty():
    """Empty text and non-RDB text both parse to an empty table."""
    for junk in ("", "not\trdb\n"):
        assert ne.parse_stats_rdb(junk) == {}
|
||||
|
||||
|
||||
# ── classify: WaterWatch band edges ─────────────────────────────────────────
|
||||
|
||||
@pytest.mark.parametrize(
    ("value", "label", "severity"),
    [
        (5.0, "much below normal", 3),    # < P10 (P0-P9 region)
        (9.0, "much below normal", 3),    # P9
        (10.0, "below normal", 2),        # == P10 boundary
        (24.0, "below normal", 2),
        (25.0, "normal", 1),              # == P25
        (50.0, "normal", 1),
        (75.0, "normal", 1),              # == P75 (not > P75)
        (80.0, "above normal", 2),        # P75..P90
        (90.0, "above normal", 2),        # == P90 (not > P90)
        (120.0, "much above normal", 3),  # > P90, <= max
        (250.0, "record high", 4),        # > historical max
    ],
)
def test_classify_bands(value, label, severity):
    """Each flow value maps to the expected band label and severity.

    The cases walk the band edges of the DAY thresholds, and the final
    assertion checks that SEVERITY_BY_BAND agrees with the returned severity.
    """
    # classify returns (label, percentile, severity); the percentile is not
    # checked by this test.
    got_label, _pct, got_severity = ne.classify(value, DAY)

    assert got_label == label
    assert got_severity == severity
    assert ne.SEVERITY_BY_BAND.get(label) == severity
|
||||
|
||||
|
||||
def test_classify_no_thresholds_is_none():
    """With an empty threshold row, classify returns an all-None triple."""
    outcome = ne.classify(42.0, {})
    assert outcome == (None, None, None)
|
||||
|
||||
|
||||
def test_classify_none_value():
    """A None reading classifies to an all-None triple even with thresholds."""
    outcome = ne.classify(None, DAY)
    assert outcome == (None, None, None)
|
||||
|
||||
|
||||
def test_percentile_interpolates_and_bounds():
    """percentile_of hits the anchor points exactly and interpolates between."""
    anchors = (
        (50.0, 50),    # on the P50 point
        (0.0, 0),      # lower bound
        (200.0, 100),  # at max
    )
    for flow, expected in anchors:
        assert ne.percentile_of(flow, DAY) == expected

    # 17.5 lies between P10 (10) and P25 (25), so the result must too.
    between = ne.percentile_of(17.5, DAY)
    assert 10 < between < 25
|
||||
|
||||
|
||||
# ── build_stats_bundle ──────────────────────────────────────────────────────
|
||||
|
||||
def test_build_stats_bundle_classifies_matching_day():
    """A reading on a day present in the table gets a fully classified bundle."""
    bundle = ne.build_stats_bundle(120.0, ne.parse_stats_rdb(STATS_RDB), 1, 1)

    expected_fields = {
        "value": 120.0,
        "class_label": "much above normal",
        "severity_band": 3,
        "p50": 50.0,
        "record_max": 315.0,
        "period": "1968–2026",
        "count": 59,
    }
    for field, expected in expected_fields.items():
        assert bundle[field] == expected
|
||||
|
||||
|
||||
def test_build_stats_bundle_no_matching_day_echoes_value_only():
    """With no row for the requested day, only the reading value is filled."""
    parsed = ne.parse_stats_rdb(STATS_RDB)
    # The canned table only has a 1-1 row, so 7-4 has nothing to classify against.
    bundle = ne.build_stats_bundle(120.0, parsed, 7, 4)

    assert bundle["value"] == 120.0
    assert bundle["class_label"] is None
    assert bundle["severity_band"] is None
|
||||
|
||||
|
||||
# ── SiteStatsCache: miss / hit / expire ─────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
async def test_cache_miss_then_hit(tmp_path):
    """A key is absent before set() and returned by get() afterwards."""
    store = ne.SiteStatsCache(tmp_path / "nwis_cache.db")

    before = await store.get("site", "X", 100)
    assert before is None

    await store.set("site", "X", {"name": "Gauge"})
    after = await store.get("site", "X", 100)
    assert after["name"] == "Gauge"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_cache_expired(tmp_path):
    """An entry read back with a negative TTL is treated as expired."""
    store = ne.SiteStatsCache(tmp_path / "nwis_cache.db")
    await store.set("stats", "X:00060", {"p10": 1})

    stale = await store.get("stats", "X:00060", -1)  # ttl already elapsed
    assert stale is None
|
||||
|
||||
|
||||
# ── adapter _enrich_event orchestration ─────────────────────────────────────
|
||||
|
||||
def _make_event():
    """Build the sample NWIS Event shared by the _enrich_event tests.

    The event is for site USGS-06903700, parameter 00060, timestamped
    2026-01-01 UTC (month 1 / day 1), with a reading value of 120.0 and an
    initial severity of 0.
    """
    reading = {
        "monitoring_location_id": "USGS-06903700",
        "parameter_code": "00060",
        "value": 120.0,
    }
    observed_at = datetime(2026, 1, 1, tzinfo=timezone.utc)
    location = Geo(centroid=(-93.19, 40.80))

    return Event(
        id="USGS-06903700:00060:2026-01-01T00:00:00+00:00",
        adapter="nwis",
        category="hydro.00060.usgs.06903700",
        time=observed_at,
        severity=0,
        geo=location,
        data=reading,
    )
|
||||
|
||||
|
||||
def _adapter_with_fetch(fetch, cache=None):
    """Return an NWISAdapter built without __init__, wired to test doubles.

    Args:
        fetch: Async callable taking a URL; installed as the adapter's _fetch.
        cache: Object stored as the adapter's _enrich_cache (default None).
    """
    # object.__new__ allocates the instance without running NWISAdapter.__init__.
    adapter = object.__new__(NWISAdapter)
    adapter._enrich_cache = cache
    # The instance attribute shadows the @retry method.
    adapter._fetch = fetch
    return adapter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_enrich_event_populates_bundles_and_severity(tmp_path):
    """With both USGS responses available, both bundles and severity are set."""

    async def fetch(url):
        # Route by URL: site metadata vs. daily statistics; anything else is a bug.
        if "monitoring-locations" in url:
            return SITE_JSON
        wants_stats = "/stat/" in url or "statReportType" in url
        if wants_stats:
            return STATS_RDB
        raise AssertionError(url)

    adapter = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db"))
    event = _make_event()

    severity = await adapter._enrich_event(event)

    enriched = event.data["_enriched"]
    site_bundle = enriched["usgs_site"]
    stats_bundle = enriched["usgs_stats"]

    assert site_bundle["name"].startswith("South Fork Chariton")
    assert site_bundle["state"] == "Iowa"
    assert stats_bundle["class_label"] == "much above normal"
    assert stats_bundle["severity_band"] == 3
    assert severity == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_enrich_event_graceful_nulls_when_usgs_down(tmp_path):
    """When every fetch fails and the cache is cold, bundles come back null."""

    async def fetch(url):
        raise TimeoutError("USGS down")

    adapter = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db"))
    event = _make_event()

    severity = await adapter._enrich_event(event)

    enriched = event.data["_enriched"]
    stats_bundle = enriched["usgs_stats"]

    assert enriched["usgs_site"] == ne.site_null_bundle()
    assert stats_bundle["value"] == 120.0  # value still echoed
    assert stats_bundle["class_label"] is None
    assert severity is None  # no stats -> unknown severity
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_enrich_event_cache_hit_avoids_refetch(tmp_path):
    """A second enrichment of the same site+param makes no further fetches."""
    requested = []

    async def fetch(url):
        # Record every call so the test can count fetches.
        requested.append(url)
        if "monitoring-locations" in url:
            return SITE_JSON
        return STATS_RDB

    store = ne.SiteStatsCache(tmp_path / "c.db")
    adapter = _adapter_with_fetch(fetch, cache=store)

    await adapter._enrich_event(_make_event())
    cold_calls = len(requested)
    assert cold_calls == 2  # one site + one stats fetch

    await adapter._enrich_event(_make_event())  # same site+param -> both cached
    assert len(requested) == cold_calls  # no additional USGS calls
|
||||
|
||||
|
||||
# ── L-c summary template rendering per band ─────────────────────────────────
|
||||
|
||||
# GUI templates directory, resolved relative to this test file: one level above
# the directory holding this file, then into src/central/gui/templates.
_TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "src/central/gui/templates"


def _render_summary(stats=None, site=None, value=120.0, unit="ft3/s"):
    """Render the NWIS event-summary template and return the stripped text.

    Args:
        stats: Optional dict placed under ``_enriched["usgs_stats"]``.
        site: Optional dict placed under ``_enriched["usgs_site"]``.
        value: Reading value exposed to the template as ``value``.
        unit: Unit string exposed to the template as ``unit_of_measure``.

    Returns:
        The rendered template output with surrounding whitespace removed.
    """
    # Decode explicitly as UTF-8 rather than the locale default: the expected
    # output includes an em dash that the test inputs do not supply, so the
    # read must not depend on the platform's encoding.
    src = (_TEMPLATES_DIR / "_event_summaries/nwis.html").read_text(encoding="utf-8")
    tmpl = jinja2.Environment(autoescape=True).from_string(src)

    inner = {"value": value, "unit_of_measure": unit}

    # Attach only the bundles that were supplied; with neither, the
    # "_enriched" key is omitted entirely.
    enriched = {}
    if site is not None:
        enriched["usgs_site"] = site
    if stats is not None:
        enriched["usgs_stats"] = stats
    if enriched:
        inner["_enriched"] = enriched

    # Minimal stand-in object exposing the nested event.data["data"]["data"]
    # payload that is handed to the template.
    event = type("E", (), {"data": {"data": {"data": inner}}})()
    return tmpl.render(event=event).strip()
|
||||
|
||||
|
||||
def test_summary_full_band_and_percentile():
    """Site plus stats renders the name, the reading, and band/percentile."""
    site = {"name": "South Fork Grand River near Gallatin, MO"}
    stats = {"class_label": "below normal", "percentile": 18}

    rendered = _render_summary(site=site, stats=stats)

    assert "South Fork Grand River near Gallatin, MO — 120.0 ft3/s" in rendered
    assert "(below normal, 18th percentile)" in rendered
|
||||
|
||||
|
||||
def test_summary_site_no_stats():
    """With a site but no stats, the summary is exactly name and reading."""
    site = {"name": "Some Creek near Town, IA"}
    rendered = _render_summary(site=site, stats=None)
    assert rendered == "Some Creek near Town, IA — 120.0 ft3/s"
|
||||
|
||||
|
||||
def test_summary_no_enrichment_falls_back():
    """With no enrichment at all, the generic "Water reading" text is used."""
    rendered = _render_summary(site=None, stats=None)
    assert rendered == "Water reading: 120.0 ft3/s"
|
||||
Loading…
Add table
Add a link
Reference in a new issue