From 8612f0b75d70b4f8806de34dcdd95e270609a814 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Mon, 25 May 2026 15:30:19 +0000 Subject: [PATCH] =?UTF-8?q?feat(nwis):=20site=20+=20stats=20enrichment=20?= =?UTF-8?q?=E2=80=94=20named=20location=20+=20WaterWatch=20normalcy=20band?= =?UTF-8?q?=20(v0.8.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Opens the v0.8.x data-quality cleanup arc. Production code; central-gui AND central-supervisor restart (adapter contract + enrichment behavior change). NWIS events rendered as a bare "Water reading: 111 ft3/s" with an empty Location column -- an operator couldn't tell where the gauge is or whether 111 ft3/s is drought-low, normal, or near-flood. Coordinates were present but the reverse geocoder returns null city/state/county for rural gauge points, and USGS site + percentile data was never fetched. v0.8.0 fetches both. Approach B (adapter-owned, per the proposal decision): the NWIS adapter -- which already owns the USGS APIs -- fetches site metadata and daily stats itself and writes two provenance bundles under event.data["_enriched"]: - usgs_site {name, lat, lon, state, county} from the OGC monitoring-locations item-by-id (the API family the adapter already speaks; JSON, no RDB parser). - usgs_stats {value, percentile, class_label, severity_band, p10..p90, record_max, count, period} from the legacy RDB daily-statistics service (the OGC API has no stats endpoint). USGS percentiles are % of days at-or-below, so higher = higher flow; classified to the WaterWatch bands -> severity 0-4 (record=4, much above/below=3, above/below=2, normal=1; None reserved for "no stats", distinct from a normal-flow gauge). Severity is set on the event, so it drives the v0.7.1 severity chip-picker filter + v0.7.2 map-marker opacity. - new nwis_enrich.py: pure parse/classify/percentile/band helpers + a sqlite SiteStatsCache (site TTL 365d, stats TTL 90d -- one fetch per site+param serves every reading for the window, so a warm cache makes zero USGS calls). USGS down -> cached-if-present else all-null bundle; the event still publishes. Framework: the single agreed generic change -- supervisor apply_enrichment now MERGES into _enriched instead of overwriting, so the still-global geocoder phase doesn't clobber the adapter's bundles. No other adapter writes _enriched, so this is inert for them. GUI: _event_summaries/nwis.html -> " -- (, percentile)", with graceful fallback to " -- " then the bare "Water reading:". _event_rows/nwis.html detail gains site/normalcy/typical/location rows. _events_rows.html Location column falls back generically to any _enriched. carrying state/county when the geocoder is null (works for future enrichers). events.json contract unchanged (additions under _enriched only). conftest isolate_enrichment_cache also redirects NWIS_CACHE_DB_PATH off the prod path (unprivileged-user test isolation). Adds tests/test_nwis_enrichment.py (28 tests: parse, band edges incl P0/P9/P10/P75/P90/record, percentile interpolation, cache hit/miss/expire, adapter enrich + graceful-null + cache-hit-no-refetch, summary rendering per band). Full suite: 710 passed, 1 skipped (central and unprivileged zvx, 3x each). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/adapters/nwis.py | 99 ++++++ src/central/adapters/nwis_enrich.py | 305 ++++++++++++++++++ .../gui/templates/_event_rows/nwis.html | 11 +- .../gui/templates/_event_summaries/nwis.html | 15 +- src/central/gui/templates/_events_rows.html | 24 +- src/central/supervisor.py | 19 +- tests/conftest.py | 10 + tests/test_nwis.py | 5 + tests/test_nwis_enrichment.py | 260 +++++++++++++++ 9 files changed, 729 insertions(+), 19 deletions(-) create mode 100644 src/central/adapters/nwis_enrich.py create mode 100644 tests/test_nwis_enrichment.py diff --git a/src/central/adapters/nwis.py b/src/central/adapters/nwis.py index 5a7db62..431c4bb 100644 --- a/src/central/adapters/nwis.py +++ b/src/central/adapters/nwis.py @@ -19,6 +19,12 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from central.adapters import nwis_enrich +from central.adapters.nwis_enrich import ( + USGS_SITE_TTL_S, + USGS_STATS_TTL_S, + SiteStatsCache, +) from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -31,6 +37,13 @@ NWIS_LATEST_CONTINUOUS_URL = ( NWIS_MONITORING_LOCATIONS_URL = ( "https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items" ) +# v0.8.0 enrichment endpoints: site metadata via OGC item-by-id; daily stats via +# the legacy RDB stat service (the OGC API exposes no statistics endpoint). +NWIS_SITE_ITEM_URL = NWIS_MONITORING_LOCATIONS_URL +NWIS_STATS_URL = "https://waterservices.usgs.gov/nwis/stat/" +# Site/stats enrichment cache (monkeypatched off the prod path in tests, like +# the supervisor's ENRICHMENT_CACHE_DB_PATH). +NWIS_CACHE_DB_PATH = Path("/var/lib/central/nwis_cache.db") # Per-render cap for the settings-driven preview (PR G.5). Keep small so the # /adapters/ edit page renders quickly. _PREVIEW_LIMIT = 50 @@ -140,6 +153,7 @@ class NWISAdapter(SourceAdapter): self._cursor_db_path = cursor_db_path self._session: aiohttp.ClientSession | None = None self._db: sqlite3.Connection | None = None + self._enrich_cache: SiteStatsCache | None = None self.parameter_codes: list[str] = list( config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES) ) @@ -167,6 +181,7 @@ class NWISAdapter(SourceAdapter): ON published_ids (last_seen) """) self._db.commit() + self._enrich_cache = SiteStatsCache(NWIS_CACHE_DB_PATH) if self.region is None: logger.warning( "NWIS started without region bbox — upstream will return CONUS-wide records on every poll. " @@ -313,6 +328,12 @@ class NWISAdapter(SourceAdapter): ) if self.is_published(dedup_key): continue + # Site + stats enrichment (v0.8.0) on new events only. Sets + # _enriched.usgs_site / usgs_stats in event.data and derives + # severity from the WaterWatch band (None when no stats). + severity = await self._enrich_event(event) + if severity != event.severity: + event = event.model_copy(update={"severity": severity}) yield event self.mark_published(dedup_key) events_yielded += 1 @@ -394,6 +415,84 @@ class NWISAdapter(SourceAdapter): data=data, ) + async def _site_bundle(self, site_id: str) -> dict[str, Any]: + """usgs_site bundle from the OGC monitoring-locations item. Cache-first; + all-null (never raises) on lookup failure so the event still publishes.""" + if self._enrich_cache is not None: + cached = await self._enrich_cache.get("site", site_id, USGS_SITE_TTL_S) + if cached is not None: + return cached + try: + text = await self._fetch(f"{NWIS_SITE_ITEM_URL}/{site_id}?f=json") + bundle = nwis_enrich.parse_site_feature(json.loads(text)) + except Exception as e: + logger.warning( + "NWIS site enrichment failed", + extra={"site": site_id, "error": str(e)}, + ) + return nwis_enrich.site_null_bundle() + if self._enrich_cache is not None: + await self._enrich_cache.set("site", site_id, bundle) + return bundle + + async def _stats_bundle( + self, + site_id: str, + bare_site_no: str, + parameter_code: str, + value: float | None, + event_time: datetime, + ) -> dict[str, Any]: + """usgs_stats bundle from the legacy RDB daily-percentile service. + + Caches the parsed day-of-year table per (site, parameter_code) so a + single fetch classifies every reading at that site for the TTL window. + All-null (value echoed; never raises) on failure / no data. + """ + key = f"{site_id}:{parameter_code}" + table = None + if self._enrich_cache is not None: + table = await self._enrich_cache.get("stats", key, USGS_STATS_TTL_S) + if table is None: + params = { + "sites": bare_site_no, + "statReportType": "daily", + "statTypeCd": "P10,P25,P50,P75,P90,max", + "parameterCd": parameter_code, + "format": "rdb", + } + try: + text = await self._fetch(f"{NWIS_STATS_URL}?{urlencode(params)}") + table = nwis_enrich.parse_stats_rdb(text) + except Exception as e: + logger.warning( + "NWIS stats enrichment failed", + extra={"site": site_id, "parameter_code": parameter_code, "error": str(e)}, + ) + return {**nwis_enrich.stats_null_bundle(), "value": value} + if self._enrich_cache is not None: + await self._enrich_cache.set("stats", key, table) + return nwis_enrich.build_stats_bundle( + value, table, event_time.month, event_time.day + ) + + async def _enrich_event(self, event: Event) -> int | None: + """Attach _enriched.usgs_site + _enriched.usgs_stats in place; return the + stats-derived severity (0-4, or None when no usable stats).""" + data = event.data + site_id = data.get("monitoring_location_id") + if not site_id: + return event.severity + _agency, bare_site_no = _subject_tokens_for_id(site_id) + site = await self._site_bundle(site_id) + stats = await self._stats_bundle( + site_id, bare_site_no, data.get("parameter_code"), data.get("value"), event.time + ) + enriched = data.setdefault("_enriched", {}) + enriched["usgs_site"] = site + enriched["usgs_stats"] = stats + return stats.get("severity_band") + async def _fetch_preview_text(self, url: str) -> str: """One-shot GET for the preview render. diff --git a/src/central/adapters/nwis_enrich.py b/src/central/adapters/nwis_enrich.py new file mode 100644 index 0000000..c528feb --- /dev/null +++ b/src/central/adapters/nwis_enrich.py @@ -0,0 +1,305 @@ +"""USGS site + stats enrichment helpers for the NWIS adapter (v0.8.0). + +NWIS-specific (Approach B — the adapter owns its USGS enrichment), producing the +``_enriched.usgs_site`` and ``_enriched.usgs_stats`` bundles. This module holds +the pure parse/classify functions plus a small sqlite cache; the adapter wires +them in (see nwis.py). + +- Site metadata: OGC monitoring-locations item-by-id (JSON), same API family the + adapter already speaks. +- Daily stats: the legacy waterservices RDB ``stat`` service — the OGC API has no + statistics endpoint. + +USGS percentiles are "percent of days at or below this value", so a HIGHER +percentile means HIGHER flow. WaterWatch bands map to a 0-4 severity (None is +reserved for "no stats available", which is distinct from a normal-flow gauge): + + value > historical daily max -> record high severity 4 + value > P90 -> much above normal severity 3 + P75 < value <= P90 -> above normal severity 2 + P25 <= value <= P75 -> normal severity 1 + P10 <= value < P25 -> below normal severity 2 + value < P10 -> much below normal severity 3 + (no usable thresholds) -> None severity None +""" + +import asyncio +import json +import logging +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# TTLs: site metadata is near-static; the daily-percentile table drifts slowly +# (USGS recomputes period-of-record stats infrequently), so one fetch per +# site+parameter covers the whole year of day-of-year rows for a season. +USGS_SITE_TTL_S = 365 * 86400 +USGS_STATS_TTL_S = 90 * 86400 + +SITE_FIELDS: tuple[str, ...] = ("name", "lat", "lon", "state", "county") +STATS_FIELDS: tuple[str, ...] = ( + "value", "percentile", "class_label", "severity_band", + "p10", "p25", "p50", "p75", "p90", "record_max", "count", "period", +) + +# WaterWatch band -> severity (0-4). None is NOT in here: it means "no stats". +SEVERITY_BY_BAND: dict[str, int] = { + "record high": 4, + "much above normal": 3, + "above normal": 2, + "normal": 1, + "below normal": 2, + "much below normal": 3, +} + + +def site_null_bundle() -> dict[str, Any]: + return {f: None for f in SITE_FIELDS} + + +def stats_null_bundle() -> dict[str, Any]: + return {f: None for f in STATS_FIELDS} + + +def parse_site_feature(feature: dict) -> dict[str, Any]: + """OGC monitoring-locations Feature -> usgs_site bundle (all-null on bad shape).""" + if not isinstance(feature, dict): + return site_null_bundle() + props = feature.get("properties") or {} + geom = feature.get("geometry") or {} + coords = geom.get("coordinates") if isinstance(geom, dict) else None + lat = lon = None + if ( + isinstance(coords, list) + and len(coords) == 2 + and all(isinstance(c, (int, float)) for c in coords) + ): + lon, lat = float(coords[0]), float(coords[1]) # GeoJSON (lon, lat) + return { + "name": props.get("monitoring_location_name"), + "lat": lat, + "lon": lon, + "state": props.get("state_name"), + "county": props.get("county_name"), + } + + +def _num(cols: list[str], idx: dict[str, int], key: str) -> float | None: + i = idx.get(key) + if i is None or i >= len(cols): + return None + raw = cols[i].strip() + if raw == "": + return None + try: + return float(raw) + except ValueError: + return None + + +def parse_stats_rdb(text: str) -> dict[str, dict[str, Any]]: + """Parse the daily-statistics RDB into a per-day threshold table. + + Returns ``{"-": {p10, p25, p50, p75, p90, max, count, + begin_yr, end_yr}}`` with blank/missing numeric cells as None. Keys are + JSON-friendly strings so the table caches directly. ``{}`` on bad input. + Column positions are read from the RDB header row (robust to USGS column + reordering); the line after the header is the RDB format row and is skipped. + """ + lines = [ln for ln in text.splitlines() if ln and not ln.startswith("#")] + if len(lines) < 3: + return {} + header = lines[0].split("\t") + idx = {name: i for i, name in enumerate(header)} + if "month_nu" not in idx or "day_nu" not in idx: + return {} + table: dict[str, dict[str, Any]] = {} + for ln in lines[2:]: # lines[1] is the "5s 15s ..." format row + cols = ln.split("\t") + month = _num(cols, idx, "month_nu") + day = _num(cols, idx, "day_nu") + if month is None or day is None: + continue + count = _num(cols, idx, "count_nu") + table[f"{int(month)}-{int(day)}"] = { + "p10": _num(cols, idx, "p10_va"), + "p25": _num(cols, idx, "p25_va"), + "p50": _num(cols, idx, "p50_va"), + "p75": _num(cols, idx, "p75_va"), + "p90": _num(cols, idx, "p90_va"), + "max": _num(cols, idx, "max_va"), + "count": int(count) if count is not None else None, + "begin_yr": _num(cols, idx, "begin_yr"), + "end_yr": _num(cols, idx, "end_yr"), + } + return table + + +def percentile_of(value: float, day: dict[str, Any]) -> int | None: + """Interpolate the value's approximate percentile from a day's thresholds. + + Piecewise-linear over the available (percentile, threshold) points, with an + implicit (0th, 0.0) lower bound (flow/stage are non-negative) and a (100th, + max) upper bound when the daily max is known. None when fewer than two + usable points exist. + """ + pts: list[tuple[float, float]] = [(0.0, 0.0)] + for pct, key in ((10, "p10"), (25, "p25"), (50, "p50"), (75, "p75"), (90, "p90")): + v = day.get(key) + if v is not None: + pts.append((float(pct), float(v))) + mx = day.get("max") + if mx is not None: + pts.append((100.0, float(mx))) + pts = sorted(set(pts), key=lambda t: t[1]) + if len(pts) < 2: + return None + if value <= pts[0][1]: + return int(round(pts[0][0])) + if value >= pts[-1][1]: + return int(round(pts[-1][0])) + for i in range(1, len(pts)): + p0, v0 = pts[i - 1] + p1, v1 = pts[i] + if v0 <= value <= v1: + if v1 == v0: + return int(round(p1)) + return int(round(p0 + (p1 - p0) * (value - v0) / (v1 - v0))) + return None + + +def classify(value: float | None, day: dict[str, Any]) -> tuple[str | None, int | None, int | None]: + """Classify a value against a day's thresholds -> (class_label, percentile, severity). + + Best-effort when some thresholds are missing (e.g. P90 blank -> the top + reachable band without a max is 'above normal'). Returns all-None when no + threshold lets us place the value at all. + """ + if value is None: + return (None, None, None) + p10, p25, p75, p90, mx = ( + day.get("p10"), day.get("p25"), day.get("p75"), day.get("p90"), day.get("max"), + ) + label: str | None = None + if mx is not None and value > mx: + label = "record high" + elif p90 is not None and value > p90: + label = "much above normal" + elif p75 is not None and value > p75: + label = "above normal" + elif p25 is not None and value >= p25: + label = "normal" + elif p10 is not None and value >= p10: + label = "below normal" + elif p10 is not None and value < p10: + label = "much below normal" + if label is None: + return (None, percentile_of(value, day), None) + return (label, percentile_of(value, day), SEVERITY_BY_BAND.get(label)) + + +def build_stats_bundle(value: float | None, table: dict[str, dict[str, Any]], month: int, day: int) -> dict[str, Any]: + """Assemble the usgs_stats bundle for one reading from a parsed day-table. + + The reading's ``value`` is always echoed (useful even with no thresholds); + thresholds/classification fill in when the matching day-of-year row exists. + """ + bundle = stats_null_bundle() + bundle["value"] = value + row = table.get(f"{month}-{day}") if table else None + if not row: + return bundle + for k in ("p10", "p25", "p50", "p75", "p90"): + bundle[k] = row.get(k) + bundle["record_max"] = row.get("max") + bundle["count"] = row.get("count") + by, ey = row.get("begin_yr"), row.get("end_yr") + bundle["period"] = f"{int(by)}–{int(ey)}" if by and ey else None + label, pct, sev = classify(value, row) + bundle["class_label"] = label + bundle["percentile"] = pct + bundle["severity_band"] = sev + return bundle + + +class SiteStatsCache: + """Thread-offloaded sqlite cache for NWIS site bundles + stats day-tables. + + Keyed by (kind, key): kind 'site' key=monitoring_location_id (TTL ~1yr), + kind 'stats' key=':' (TTL ~90d, stores the whole + parsed day-of-year table so one fetch serves every reading at that site). + Mirrors the EnrichmentCache pattern (fresh connection per op, ttl on read). + """ + + _SCHEMA = """ + CREATE TABLE IF NOT EXISTS nwis_cache ( + kind TEXT NOT NULL, + key TEXT NOT NULL, + payload_json TEXT NOT NULL, + cached_at TEXT NOT NULL, + PRIMARY KEY (kind, key) + ) + """ + + def __init__(self, db_path: str | Path) -> None: + self._db_path = Path(db_path) + self._db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._connect() + try: + conn.execute(self._SCHEMA) + conn.commit() + finally: + conn.close() + + def _connect(self) -> sqlite3.Connection: + return sqlite3.connect(self._db_path, timeout=30) + + def _get_sync(self, kind: str, key: str, ttl_s: int) -> Any | None: + conn = self._connect() + try: + cur = conn.execute( + "SELECT payload_json, cached_at FROM nwis_cache WHERE kind = ? AND key = ?", + (kind, key), + ) + row = cur.fetchone() + finally: + conn.close() + if row is None: + return None + payload_json, cached_at_iso = row + try: + cached_at = datetime.fromisoformat(cached_at_iso) + except ValueError: + return None + if cached_at.tzinfo is None: + cached_at = cached_at.replace(tzinfo=timezone.utc) + if (datetime.now(timezone.utc) - cached_at).total_seconds() > ttl_s: + return None + return json.loads(payload_json) + + def _set_sync(self, kind: str, key: str, payload: Any) -> None: + now_iso = datetime.now(timezone.utc).isoformat() + conn = self._connect() + try: + conn.execute( + """ + INSERT INTO nwis_cache (kind, key, payload_json, cached_at) + VALUES (?, ?, ?, ?) + ON CONFLICT (kind, key) DO UPDATE SET + payload_json = excluded.payload_json, + cached_at = excluded.cached_at + """, + (kind, key, json.dumps(payload), now_iso), + ) + conn.commit() + finally: + conn.close() + + async def get(self, kind: str, key: str, ttl_s: int) -> Any | None: + return await asyncio.to_thread(self._get_sync, kind, key, ttl_s) + + async def set(self, kind: str, key: str, payload: Any) -> None: + await asyncio.to_thread(self._set_sync, kind, key, payload) diff --git a/src/central/gui/templates/_event_rows/nwis.html b/src/central/gui/templates/_event_rows/nwis.html index bd6d1c5..79849b0 100644 --- a/src/central/gui/templates/_event_rows/nwis.html +++ b/src/central/gui/templates/_event_rows/nwis.html @@ -1,6 +1,13 @@ -{# USGS NWIS water observations. Fields from payload->data->data. #} +{# USGS NWIS water observations + v0.8.0 site/stats enrichment. payload->data->data. #} {% set d = (event.data.get('data') or {}).get('data') or {} %} +{% set enr = d.get('_enriched') or {} %} +{% set site = enr.get('usgs_site') or {} %} +{% set st = enr.get('usgs_stats') or {} %} +{% if site.get('name') %}
Site name
{{ site.name }}
{% endif %} {% if d.get('parameter_code') is not none %}
Parameter
{{ d.parameter_code }}
{% endif %} {% if d.get('value') is not none %}
Value
{{ d.value }} {{ d.get('unit_of_measure', '') }}
{% endif %} -{% if d.get('monitoring_location_id') is not none %}
Site
{{ d.monitoring_location_id }}
{% endif %} +{% if st.get('class_label') %}
Normalcy
{{ st.class_label }}{% if st.get('percentile') is not none %} (~{{ st.percentile }} percentile){% endif %}
{% endif %} +{% if st.get('p50') is not none %}
Typical (this day)
median {{ st.p50 }} {{ d.get('unit_of_measure', '') }}{% if st.get('period') %} · {{ st.period }}{% if st.get('count') %} ({{ st.count }} yrs){% endif %}{% endif %}
{% endif %} +{% if site.get('county') or site.get('state') %}
Location
{{ [site.county, site.state] | select | join(', ') }}
{% endif %} +{% if d.get('monitoring_location_id') is not none %}
Site ID
{{ d.monitoring_location_id }}
{% endif %} {% if d.get('statistic_id') is not none %}
Statistic
{{ d.statistic_id }}
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/nwis.html b/src/central/gui/templates/_event_summaries/nwis.html index f628d04..4d0180b 100644 --- a/src/central/gui/templates/_event_summaries/nwis.html +++ b/src/central/gui/templates/_event_summaries/nwis.html @@ -1,2 +1,15 @@ +{# USGS NWIS one-line summary (v0.8.0). Prefer site name + value + WaterWatch + band/percentile; fall back to "" when stats are absent but the + site is named; finally to the bare "Water reading: " so we never regress. #} {% set d = (event.data.get('data') or {}).get('data') or {} %} -{%- if d.get('value') is not none %}Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}{% endif -%} +{% set enr = d.get('_enriched') or {} %} +{% set site = enr.get('usgs_site') or {} %} +{% set st = enr.get('usgs_stats') or {} %} +{%- set p = st.get('percentile') -%} +{%- set suf = (('th' if (p % 100) in [11, 12, 13] else {1: 'st', 2: 'nd', 3: 'rd'}.get(p % 10, 'th')) if p is not none else '') -%} +{%- if site.get('name') and d.get('value') is not none -%} +{{ site.name }} — {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %} +{%- if st.get('class_label') %} ({{ st.class_label }}{% if p is not none %}, {{ p }}{{ suf }} percentile{% endif %}){% endif -%} +{%- elif d.get('value') is not none -%} +Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %} +{%- endif -%} diff --git a/src/central/gui/templates/_events_rows.html b/src/central/gui/templates/_events_rows.html index 0ff917b..346fcea 100644 --- a/src/central/gui/templates/_events_rows.html +++ b/src/central/gui/templates/_events_rows.html @@ -20,14 +20,24 @@ fallback (no hardcoded list). Captured once so it serves both the Subject cell and the map popup (via data-subject). #} {% set subject_summary %}{% include ["_event_summaries/" ~ event.adapter ~ ".html", "_event_summaries/_default.html"] %}{% endset %} - {# Location: generic _enriched.geocoder reader, then top-level named - fields, then coordinates. No adapter-specific logic. #} + {# Location: geocoder-first, then top-level named fields. When the + geocoder resolved no named place, fall back generically to ANY other + _enriched. carrying county/state (e.g. usgs_site, v0.8.0) so + the pattern works for future enrichers without per-adapter logic. #} {% set d = (event.data.get('data') or {}).get('data') or {} %} - {% set gc = (d.get('_enriched') or {}).get('geocoder') or {} %} - {% set loc_local = gc.get('city') or d.get('city') or gc.get('county') or d.get('county') %} - {% set loc_state = gc.get('state') or d.get('state') %} - {% set loc_country = gc.get('country') or d.get('country') %} - {% set loc_parts = [loc_local, loc_state, loc_country] | select | list %} + {% set enr = d.get('_enriched') or {} %} + {% set gc = enr.get('geocoder') or {} %} + {% set ns = namespace( + local=(gc.get('city') or gc.get('county') or d.get('city') or d.get('county')), + state=(gc.get('state') or d.get('state')), + country=(gc.get('country') or d.get('country'))) %} + {% if not (ns.local or ns.state) %} + {% for _src, b in enr.items() if _src != 'geocoder' and b is mapping %} + {% if not ns.local %}{% set ns.local = b.get('county') or b.get('city') %}{% endif %} + {% if not ns.state %}{% set ns.state = b.get('state') %}{% endif %} + {% endfor %} + {% endif %} + {% set loc_parts = [ns.local, ns.state, ns.country] | select | list %} P75) + (80.0, "above normal", 2), # P75..P90 + (90.0, "above normal", 2), # == P90 (not > P90) + (120.0, "much above normal", 3), # > P90, <= max + (250.0, "record high", 4), # > historical max + ], +) +def test_classify_bands(value, label, severity): + lbl, pct, sev = ne.classify(value, DAY) + assert lbl == label + assert sev == severity + assert ne.SEVERITY_BY_BAND.get(label) == severity + + +def test_classify_no_thresholds_is_none(): + assert ne.classify(42.0, {}) == (None, None, None) + + +def test_classify_none_value(): + assert ne.classify(None, DAY) == (None, None, None) + + +def test_percentile_interpolates_and_bounds(): + assert ne.percentile_of(50.0, DAY) == 50 # on the P50 point + assert ne.percentile_of(0.0, DAY) == 0 # lower bound + assert ne.percentile_of(200.0, DAY) == 100 # at max + mid = ne.percentile_of(17.5, DAY) # between P10(10) and P25(25) + assert 10 < mid < 25 + + +# ── build_stats_bundle ────────────────────────────────────────────────────── + +def test_build_stats_bundle_classifies_matching_day(): + table = ne.parse_stats_rdb(STATS_RDB) + b = ne.build_stats_bundle(120.0, table, 1, 1) + assert b["value"] == 120.0 + assert b["class_label"] == "much above normal" + assert b["severity_band"] == 3 + assert b["p50"] == 50.0 and b["record_max"] == 315.0 + assert b["period"] == "1968–2026" and b["count"] == 59 + + +def test_build_stats_bundle_no_matching_day_echoes_value_only(): + table = ne.parse_stats_rdb(STATS_RDB) + b = ne.build_stats_bundle(120.0, table, 7, 4) # no 7-4 row + assert b["value"] == 120.0 + assert b["class_label"] is None and b["severity_band"] is None + + +# ── SiteStatsCache: miss / hit / expire ───────────────────────────────────── + +@pytest.mark.asyncio +async def test_cache_miss_then_hit(tmp_path): + cache = ne.SiteStatsCache(tmp_path / "nwis_cache.db") + assert await cache.get("site", "X", 100) is None + await cache.set("site", "X", {"name": "Gauge"}) + assert (await cache.get("site", "X", 100))["name"] == "Gauge" + + +@pytest.mark.asyncio +async def test_cache_expired(tmp_path): + cache = ne.SiteStatsCache(tmp_path / "nwis_cache.db") + await cache.set("stats", "X:00060", {"p10": 1}) + assert await cache.get("stats", "X:00060", -1) is None # ttl already elapsed + + +# ── adapter _enrich_event orchestration ───────────────────────────────────── + +def _make_event(): + return Event( + id="USGS-06903700:00060:2026-01-01T00:00:00+00:00", + adapter="nwis", + category="hydro.00060.usgs.06903700", + time=datetime(2026, 1, 1, tzinfo=timezone.utc), + severity=0, + geo=Geo(centroid=(-93.19, 40.80)), + data={ + "monitoring_location_id": "USGS-06903700", + "parameter_code": "00060", + "value": 120.0, + }, + ) + + +def _adapter_with_fetch(fetch, cache=None): + a = object.__new__(NWISAdapter) + a._enrich_cache = cache + a._fetch = fetch # shadows the @retry method + return a + + +@pytest.mark.asyncio +async def test_enrich_event_populates_bundles_and_severity(tmp_path): + async def fetch(url): + if "monitoring-locations" in url: + return SITE_JSON + if "/stat/" in url or "statReportType" in url: + return STATS_RDB + raise AssertionError(url) + + a = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db")) + ev = _make_event() + sev = await a._enrich_event(ev) + enr = ev.data["_enriched"] + assert enr["usgs_site"]["name"].startswith("South Fork Chariton") + assert enr["usgs_site"]["state"] == "Iowa" + assert enr["usgs_stats"]["class_label"] == "much above normal" + assert enr["usgs_stats"]["severity_band"] == 3 + assert sev == 3 + + +@pytest.mark.asyncio +async def test_enrich_event_graceful_nulls_when_usgs_down(tmp_path): + async def fetch(url): + raise TimeoutError("USGS down") + + a = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db")) + ev = _make_event() + sev = await a._enrich_event(ev) + enr = ev.data["_enriched"] + assert enr["usgs_site"] == ne.site_null_bundle() + assert enr["usgs_stats"]["value"] == 120.0 # value still echoed + assert enr["usgs_stats"]["class_label"] is None + assert sev is None # no stats -> unknown severity + + +@pytest.mark.asyncio +async def test_enrich_event_cache_hit_avoids_refetch(tmp_path): + calls = {"n": 0} + + async def fetch(url): + calls["n"] += 1 + return SITE_JSON if "monitoring-locations" in url else STATS_RDB + + cache = ne.SiteStatsCache(tmp_path / "c.db") + a = _adapter_with_fetch(fetch, cache=cache) + await a._enrich_event(_make_event()) + first = calls["n"] + assert first == 2 # one site + one stats fetch + await a._enrich_event(_make_event()) # same site+param -> both cached + assert calls["n"] == first # no additional USGS calls + + +# ── L-c summary template rendering per band ───────────────────────────────── + +_TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "src/central/gui/templates" + + +def _render_summary(stats=None, site=None, value=120.0, unit="ft3/s"): + src = (_TEMPLATES_DIR / "_event_summaries/nwis.html").read_text() + tmpl = jinja2.Environment(autoescape=True).from_string(src) + inner = {"value": value, "unit_of_measure": unit} + enriched = {} + if site is not None: + enriched["usgs_site"] = site + if stats is not None: + enriched["usgs_stats"] = stats + if enriched: + inner["_enriched"] = enriched + event = type("E", (), {"data": {"data": {"data": inner}}})() + return tmpl.render(event=event).strip() + + +def test_summary_full_band_and_percentile(): + out = _render_summary( + site={"name": "South Fork Grand River near Gallatin, MO"}, + stats={"class_label": "below normal", "percentile": 18}, + ) + assert "South Fork Grand River near Gallatin, MO — 120.0 ft3/s" in out + assert "(below normal, 18th percentile)" in out + + +def test_summary_site_no_stats(): + out = _render_summary(site={"name": "Some Creek near Town, IA"}, stats=None) + assert out == "Some Creek near Town, IA — 120.0 ft3/s" + + +def test_summary_no_enrichment_falls_back(): + out = _render_summary(site=None, stats=None) + assert out == "Water reading: 120.0 ft3/s"