Merge pull request #59 from zvx-echo6/feat/nwis-site-stats-enricher

feat(nwis): site + stats enrichment — named location + WaterWatch normalcy band (v0.8.0)
This commit is contained in:
malice 2026-05-25 09:35:43 -06:00 committed by GitHub
commit de464a08e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 729 additions and 19 deletions

View file

@ -19,6 +19,12 @@ from tenacity import (
) )
from central.adapter import SourceAdapter from central.adapter import SourceAdapter
from central.adapters import nwis_enrich
from central.adapters.nwis_enrich import (
USGS_SITE_TTL_S,
USGS_STATS_TTL_S,
SiteStatsCache,
)
from central.config_models import AdapterConfig, RegionConfig from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.models import Event, Geo from central.models import Event, Geo
@ -31,6 +37,13 @@ NWIS_LATEST_CONTINUOUS_URL = (
NWIS_MONITORING_LOCATIONS_URL = ( NWIS_MONITORING_LOCATIONS_URL = (
"https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items" "https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items"
) )
# v0.8.0 enrichment endpoints: site metadata via OGC item-by-id; daily stats via
# the legacy RDB stat service (the OGC API exposes no statistics endpoint).
NWIS_SITE_ITEM_URL = NWIS_MONITORING_LOCATIONS_URL
NWIS_STATS_URL = "https://waterservices.usgs.gov/nwis/stat/"
# Site/stats enrichment cache (monkeypatched off the prod path in tests, like
# the supervisor's ENRICHMENT_CACHE_DB_PATH).
NWIS_CACHE_DB_PATH = Path("/var/lib/central/nwis_cache.db")
# Per-render cap for the settings-driven preview (PR G.5). Keep small so the # Per-render cap for the settings-driven preview (PR G.5). Keep small so the
# /adapters/<name> edit page renders quickly. # /adapters/<name> edit page renders quickly.
_PREVIEW_LIMIT = 50 _PREVIEW_LIMIT = 50
@ -140,6 +153,7 @@ class NWISAdapter(SourceAdapter):
self._cursor_db_path = cursor_db_path self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None self._db: sqlite3.Connection | None = None
self._enrich_cache: SiteStatsCache | None = None
self.parameter_codes: list[str] = list( self.parameter_codes: list[str] = list(
config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES) config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES)
) )
@ -167,6 +181,7 @@ class NWISAdapter(SourceAdapter):
ON published_ids (last_seen) ON published_ids (last_seen)
""") """)
self._db.commit() self._db.commit()
self._enrich_cache = SiteStatsCache(NWIS_CACHE_DB_PATH)
if self.region is None: if self.region is None:
logger.warning( logger.warning(
"NWIS started without region bbox — upstream will return CONUS-wide records on every poll. " "NWIS started without region bbox — upstream will return CONUS-wide records on every poll. "
@ -313,6 +328,12 @@ class NWISAdapter(SourceAdapter):
) )
if self.is_published(dedup_key): if self.is_published(dedup_key):
continue continue
# Site + stats enrichment (v0.8.0) on new events only. Sets
# _enriched.usgs_site / usgs_stats in event.data and derives
# severity from the WaterWatch band (None when no stats).
severity = await self._enrich_event(event)
if severity != event.severity:
event = event.model_copy(update={"severity": severity})
yield event yield event
self.mark_published(dedup_key) self.mark_published(dedup_key)
events_yielded += 1 events_yielded += 1
@ -394,6 +415,84 @@ class NWISAdapter(SourceAdapter):
data=data, data=data,
) )
async def _site_bundle(self, site_id: str) -> dict[str, Any]:
    """Return the ``usgs_site`` bundle for ``site_id``.

    Cache-first: a fresh cached bundle is returned without contacting USGS.
    On a miss the OGC monitoring-locations item is fetched and parsed.

    Never raises. A failed fetch/parse yields the all-null bundle, and a
    failing cache read or write is logged and treated as a miss, so the
    event still publishes.
    """
    cache = self._enrich_cache
    if cache is not None:
        try:
            cached = await cache.get("site", site_id, USGS_SITE_TTL_S)
        except Exception as e:
            # A broken cache must not block publishing; fall through to USGS.
            logger.warning(
                "NWIS site cache read failed",
                extra={"site": site_id, "error": str(e)},
            )
            cached = None
        if cached is not None:
            return cached
    try:
        text = await self._fetch(f"{NWIS_SITE_ITEM_URL}/{site_id}?f=json")
        bundle = nwis_enrich.parse_site_feature(json.loads(text))
    except Exception as e:
        logger.warning(
            "NWIS site enrichment failed",
            extra={"site": site_id, "error": str(e)},
        )
        return nwis_enrich.site_null_bundle()
    # An all-null bundle means the response had an unexpected shape; caching
    # it would pin that failure for the whole site TTL, so only store bundles
    # that resolved at least one field.
    if cache is not None and any(v is not None for v in bundle.values()):
        try:
            await cache.set("site", site_id, bundle)
        except Exception as e:
            logger.warning(
                "NWIS site cache write failed",
                extra={"site": site_id, "error": str(e)},
            )
    return bundle
async def _stats_bundle(
    self,
    site_id: str,
    bare_site_no: str,
    parameter_code: str,
    value: float | None,
    event_time: datetime,
) -> dict[str, Any]:
    """Return the ``usgs_stats`` bundle for one reading.

    The parsed day-of-year table from the legacy RDB daily-statistics service
    is cached per ``"<site_id>:<parameter_code>"``, so one fetch classifies
    every reading at that site/parameter for the TTL window. The table is
    cached even when it parsed to no rows.

    Never raises. On a fetch/parse failure, or if classification itself
    fails, the all-null bundle is returned with ``value`` echoed. A failing
    cache read or write is logged and treated as a miss.
    """
    key = f"{site_id}:{parameter_code}"
    log_ctx = {"site": site_id, "parameter_code": parameter_code}
    cache = self._enrich_cache
    table = None
    if cache is not None:
        try:
            table = await cache.get("stats", key, USGS_STATS_TTL_S)
        except Exception as e:
            # A broken cache must not block publishing; refetch instead.
            logger.warning(
                "NWIS stats cache read failed",
                extra={**log_ctx, "error": str(e)},
            )
            table = None
    if table is None:
        params = {
            "sites": bare_site_no,
            "statReportType": "daily",
            "statTypeCd": "P10,P25,P50,P75,P90,max",
            "parameterCd": parameter_code,
            "format": "rdb",
        }
        try:
            text = await self._fetch(f"{NWIS_STATS_URL}?{urlencode(params)}")
            table = nwis_enrich.parse_stats_rdb(text)
        except Exception as e:
            logger.warning(
                "NWIS stats enrichment failed",
                extra={"site": site_id, "parameter_code": parameter_code, "error": str(e)},
            )
            return {**nwis_enrich.stats_null_bundle(), "value": value}
        if cache is not None:
            try:
                await cache.set("stats", key, table)
            except Exception as e:
                logger.warning(
                    "NWIS stats cache write failed",
                    extra={**log_ctx, "error": str(e)},
                )
    try:
        return nwis_enrich.build_stats_bundle(
            value, table, event_time.month, event_time.day
        )
    except Exception as e:
        # e.g. a non-numeric reading that cannot be compared to thresholds.
        logger.warning(
            "NWIS stats classification failed",
            extra={**log_ctx, "error": str(e)},
        )
        return {**nwis_enrich.stats_null_bundle(), "value": value}
async def _enrich_event(self, event: Event) -> int | None:
    """Attach ``_enriched.usgs_site`` / ``_enriched.usgs_stats`` in place.

    Mutates ``event.data`` (merging into any existing ``_enriched`` dict) and
    returns the stats-derived severity band, or None when no usable stats
    were found.

    When the event carries no ``monitoring_location_id`` nothing is attached
    and the event's current severity is returned unchanged. When it carries
    no ``parameter_code`` the stats lookup is skipped (there is nothing valid
    to ask the stats service for) and an all-null stats bundle with the value
    echoed is attached instead.
    """
    data = event.data
    site_id = data.get("monitoring_location_id")
    if not site_id:
        return event.severity
    _agency, bare_site_no = _subject_tokens_for_id(site_id)
    parameter_code = data.get("parameter_code")
    value = data.get("value")
    site = await self._site_bundle(site_id)
    if parameter_code:
        stats = await self._stats_bundle(
            site_id, bare_site_no, parameter_code, value, event.time
        )
    else:
        # Avoid a "parameterCd=None" request and a "<site>:None" cache key.
        stats = {**nwis_enrich.stats_null_bundle(), "value": value}
    enriched = data.setdefault("_enriched", {})
    enriched["usgs_site"] = site
    enriched["usgs_stats"] = stats
    return stats.get("severity_band")
async def _fetch_preview_text(self, url: str) -> str: async def _fetch_preview_text(self, url: str) -> str:
"""One-shot GET for the preview render. """One-shot GET for the preview render.

View file

@ -0,0 +1,305 @@
"""USGS site + stats enrichment helpers for the NWIS adapter (v0.8.0).
NWIS-specific (Approach B — the adapter owns its USGS enrichment), producing the
``_enriched.usgs_site`` and ``_enriched.usgs_stats`` bundles. This module holds
the pure parse/classify functions plus a small sqlite cache; the adapter wires
them in (see nwis.py).
- Site metadata: OGC monitoring-locations item-by-id (JSON), same API family the
adapter already speaks.
- Daily stats: the legacy waterservices RDB ``stat`` service — the OGC API has no
statistics endpoint.
USGS percentiles are "percent of days at or below this value", so a HIGHER
percentile means HIGHER flow. WaterWatch bands map to a 0-4 severity (None is
reserved for "no stats available", which is distinct from a normal-flow gauge):
value > historical daily max -> record high severity 4
value > P90 -> much above normal severity 3
P75 < value <= P90 -> above normal severity 2
P25 <= value <= P75 -> normal severity 1
P10 <= value < P25 -> below normal severity 2
value < P10 -> much below normal severity 3
(no usable thresholds) -> None severity None
"""
import asyncio
import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# TTLs (seconds): site metadata is near-static; the daily-percentile table drifts
# slowly (USGS recomputes period-of-record stats infrequently), so one fetch per
# site+parameter covers the whole year of day-of-year rows for a season.
USGS_SITE_TTL_S = 365 * 86400
USGS_STATS_TTL_S = 90 * 86400
# Key set of the usgs_site bundle; site_null_bundle() maps each key to None.
SITE_FIELDS: tuple[str, ...] = ("name", "lat", "lon", "state", "county")
# Key set of the usgs_stats bundle; stats_null_bundle() maps each key to None.
STATS_FIELDS: tuple[str, ...] = (
"value", "percentile", "class_label", "severity_band",
"p10", "p25", "p50", "p75", "p90", "record_max", "count", "period",
)
# WaterWatch band -> severity (0-4). None is NOT in here: it means "no stats".
# Severity is symmetric around "normal": departures in either direction rank
# higher, and only "record high" reaches 4.
SEVERITY_BY_BAND: dict[str, int] = {
"record high": 4,
"much above normal": 3,
"above normal": 2,
"normal": 1,
"below normal": 2,
"much below normal": 3,
}
def site_null_bundle() -> dict[str, Any]:
    """Return a fresh usgs_site bundle with every SITE_FIELDS key set to None."""
    return dict.fromkeys(SITE_FIELDS)
def stats_null_bundle() -> dict[str, Any]:
    """Return a fresh usgs_stats bundle with every STATS_FIELDS key set to None."""
    return dict.fromkeys(STATS_FIELDS)
def parse_site_feature(feature: dict) -> dict[str, Any]:
    """Convert an OGC monitoring-locations Feature into a usgs_site bundle.

    Returns a dict with the keys ``name``, ``lat``, ``lon``, ``state`` and
    ``county``. Anything that cannot be read from the feature is None, and a
    non-dict ``feature`` yields the all-null bundle.

    Coordinates are taken from the first two elements of
    ``geometry.coordinates`` (GeoJSON order: lon, lat). A third element
    (elevation) is permitted and ignored. Booleans are not accepted as
    coordinates.
    """
    if not isinstance(feature, dict):
        return site_null_bundle()
    props = feature.get("properties")
    if not isinstance(props, dict):
        # Missing or malformed properties: keep going with nothing to read.
        props = {}
    geom = feature.get("geometry")
    coords = geom.get("coordinates") if isinstance(geom, dict) else None
    lat = lon = None
    if isinstance(coords, (list, tuple)) and len(coords) >= 2:
        x, y = coords[0], coords[1]
        # bool is a subclass of int; exclude it explicitly.
        if all(
            isinstance(c, (int, float)) and not isinstance(c, bool)
            for c in (x, y)
        ):
            lon, lat = float(x), float(y)  # GeoJSON (lon, lat)
    return {
        "name": props.get("monitoring_location_name"),
        "lat": lat,
        "lon": lon,
        "state": props.get("state_name"),
        "county": props.get("county_name"),
    }
def _num(cols: list[str], idx: dict[str, int], key: str) -> float | None:
i = idx.get(key)
if i is None or i >= len(cols):
return None
raw = cols[i].strip()
if raw == "":
return None
try:
return float(raw)
except ValueError:
return None
def parse_stats_rdb(text: str) -> dict[str, dict[str, Any]]:
    """Turn a daily-statistics RDB response into a per-day threshold table.

    The result maps ``"<month>-<day>"`` to a dict with the keys ``p10``,
    ``p25``, ``p50``, ``p75``, ``p90``, ``max``, ``count``, ``begin_yr`` and
    ``end_yr``; blank or missing numeric cells become None. String keys keep
    the table JSON-friendly so it can be cached as-is. Returns ``{}`` when the
    input has fewer than three non-comment lines or the header lacks the
    month/day columns.

    Column positions come from the header row, so column reordering is
    tolerated. The line directly after the header is the RDB format row and is
    skipped.
    """
    body = [ln for ln in text.splitlines() if ln and not ln.startswith("#")]
    if len(body) < 3:
        return {}
    columns = {name: pos for pos, name in enumerate(body[0].split("\t"))}
    if not {"month_nu", "day_nu"} <= columns.keys():
        return {}

    def cell(fields: list[str], name: str) -> float | None:
        return _num(fields, columns, name)

    by_day: dict[str, dict[str, Any]] = {}
    # body[0] is the header and body[1] the "5s 15s ..." format row.
    for raw in body[2:]:
        fields = raw.split("\t")
        month = cell(fields, "month_nu")
        day = cell(fields, "day_nu")
        if month is None or day is None:
            continue
        n_years = cell(fields, "count_nu")
        entry = {
            "p10": cell(fields, "p10_va"),
            "p25": cell(fields, "p25_va"),
            "p50": cell(fields, "p50_va"),
            "p75": cell(fields, "p75_va"),
            "p90": cell(fields, "p90_va"),
            "max": cell(fields, "max_va"),
            "count": None if n_years is None else int(n_years),
            "begin_yr": cell(fields, "begin_yr"),
            "end_yr": cell(fields, "end_yr"),
        }
        by_day[f"{int(month)}-{int(day)}"] = entry
    return by_day
def percentile_of(value: float, day: dict[str, Any]) -> int | None:
"""Interpolate the value's approximate percentile from a day's thresholds.
Piecewise-linear over the available (percentile, threshold) points, with an
implicit (0th, 0.0) lower bound (flow/stage are non-negative) and a (100th,
max) upper bound when the daily max is known. None when fewer than two
usable points exist.
"""
pts: list[tuple[float, float]] = [(0.0, 0.0)]
for pct, key in ((10, "p10"), (25, "p25"), (50, "p50"), (75, "p75"), (90, "p90")):
v = day.get(key)
if v is not None:
pts.append((float(pct), float(v)))
mx = day.get("max")
if mx is not None:
pts.append((100.0, float(mx)))
pts = sorted(set(pts), key=lambda t: t[1])
if len(pts) < 2:
return None
if value <= pts[0][1]:
return int(round(pts[0][0]))
if value >= pts[-1][1]:
return int(round(pts[-1][0]))
for i in range(1, len(pts)):
p0, v0 = pts[i - 1]
p1, v1 = pts[i]
if v0 <= value <= v1:
if v1 == v0:
return int(round(p1))
return int(round(p0 + (p1 - p0) * (value - v0) / (v1 - v0)))
return None
def classify(value: float | None, day: dict[str, Any]) -> tuple[str | None, int | None, int | None]:
"""Classify a value against a day's thresholds -> (class_label, percentile, severity).
Best-effort when some thresholds are missing (e.g. P90 blank -> the top
reachable band without a max is 'above normal'). Returns all-None when no
threshold lets us place the value at all.
"""
if value is None:
return (None, None, None)
p10, p25, p75, p90, mx = (
day.get("p10"), day.get("p25"), day.get("p75"), day.get("p90"), day.get("max"),
)
label: str | None = None
if mx is not None and value > mx:
label = "record high"
elif p90 is not None and value > p90:
label = "much above normal"
elif p75 is not None and value > p75:
label = "above normal"
elif p25 is not None and value >= p25:
label = "normal"
elif p10 is not None and value >= p10:
label = "below normal"
elif p10 is not None and value < p10:
label = "much below normal"
if label is None:
return (None, percentile_of(value, day), None)
return (label, percentile_of(value, day), SEVERITY_BY_BAND.get(label))
def build_stats_bundle(value: float | None, table: dict[str, dict[str, Any]], month: int, day: int) -> dict[str, Any]:
    """Build the usgs_stats bundle for a single reading.

    ``value`` is always echoed into the bundle. The thresholds, count, period
    and classification are filled in only when ``table`` holds a row for
    ``"<month>-<day>"``; otherwise those fields stay None.
    """
    out = stats_null_bundle()
    out["value"] = value
    thresholds = table.get(f"{month}-{day}") if table else None
    if not thresholds:
        return out

    out.update({name: thresholds.get(name) for name in ("p10", "p25", "p50", "p75", "p90")})
    out["record_max"] = thresholds.get("max")
    out["count"] = thresholds.get("count")

    first_yr = thresholds.get("begin_yr")
    last_yr = thresholds.get("end_yr")
    if first_yr and last_yr:
        # NOTE(review): begin and end year are joined with no separator
        # (e.g. "19682026"); presumably a dash was intended. Confirm against
        # the templates and tests that consume ``period``.
        out["period"] = f"{int(first_yr)}{int(last_yr)}"
    else:
        out["period"] = None

    band, approx_pct, severity = classify(value, thresholds)
    out["class_label"] = band
    out["percentile"] = approx_pct
    out["severity_band"] = severity
    return out
class SiteStatsCache:
"""Thread-offloaded sqlite cache for NWIS site bundles + stats day-tables.
Keyed by (kind, key): kind 'site' key=monitoring_location_id (TTL ~1yr),
kind 'stats' key='<site_id>:<parameter_code>' (TTL ~90d, stores the whole
parsed day-of-year table so one fetch serves every reading at that site).
Mirrors the EnrichmentCache pattern (fresh connection per op, ttl on read).
"""
_SCHEMA = """
CREATE TABLE IF NOT EXISTS nwis_cache (
kind TEXT NOT NULL,
key TEXT NOT NULL,
payload_json TEXT NOT NULL,
cached_at TEXT NOT NULL,
PRIMARY KEY (kind, key)
)
"""
def __init__(self, db_path: str | Path) -> None:
self._db_path = Path(db_path)
self._db_path.parent.mkdir(parents=True, exist_ok=True)
conn = self._connect()
try:
conn.execute(self._SCHEMA)
conn.commit()
finally:
conn.close()
def _connect(self) -> sqlite3.Connection:
return sqlite3.connect(self._db_path, timeout=30)
def _get_sync(self, kind: str, key: str, ttl_s: int) -> Any | None:
conn = self._connect()
try:
cur = conn.execute(
"SELECT payload_json, cached_at FROM nwis_cache WHERE kind = ? AND key = ?",
(kind, key),
)
row = cur.fetchone()
finally:
conn.close()
if row is None:
return None
payload_json, cached_at_iso = row
try:
cached_at = datetime.fromisoformat(cached_at_iso)
except ValueError:
return None
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
if (datetime.now(timezone.utc) - cached_at).total_seconds() > ttl_s:
return None
return json.loads(payload_json)
def _set_sync(self, kind: str, key: str, payload: Any) -> None:
now_iso = datetime.now(timezone.utc).isoformat()
conn = self._connect()
try:
conn.execute(
"""
INSERT INTO nwis_cache (kind, key, payload_json, cached_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (kind, key) DO UPDATE SET
payload_json = excluded.payload_json,
cached_at = excluded.cached_at
""",
(kind, key, json.dumps(payload), now_iso),
)
conn.commit()
finally:
conn.close()
async def get(self, kind: str, key: str, ttl_s: int) -> Any | None:
return await asyncio.to_thread(self._get_sync, kind, key, ttl_s)
async def set(self, kind: str, key: str, payload: Any) -> None:
await asyncio.to_thread(self._set_sync, kind, key, payload)

View file

@ -1,6 +1,13 @@
{# USGS NWIS water observations. Fields from payload->data->data. #} {# USGS NWIS water observations + v0.8.0 site/stats enrichment. payload->data->data. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %} {% set d = (event.data.get('data') or {}).get('data') or {} %}
{% set enr = d.get('_enriched') or {} %}
{% set site = enr.get('usgs_site') or {} %}
{% set st = enr.get('usgs_stats') or {} %}
{% if site.get('name') %}<dt>Site name</dt><dd>{{ site.name }}</dd>{% endif %}
{% if d.get('parameter_code') is not none %}<dt>Parameter</dt><dd>{{ d.parameter_code }}</dd>{% endif %} {% if d.get('parameter_code') is not none %}<dt>Parameter</dt><dd>{{ d.parameter_code }}</dd>{% endif %}
{% if d.get('value') is not none %}<dt>Value</dt><dd>{{ d.value }} {{ d.get('unit_of_measure', '') }}</dd>{% endif %} {% if d.get('value') is not none %}<dt>Value</dt><dd>{{ d.value }} {{ d.get('unit_of_measure', '') }}</dd>{% endif %}
{% if d.get('monitoring_location_id') is not none %}<dt>Site</dt><dd><code>{{ d.monitoring_location_id }}</code></dd>{% endif %} {% if st.get('class_label') %}<dt>Normalcy</dt><dd>{{ st.class_label }}{% if st.get('percentile') is not none %} (~{{ st.percentile }} percentile){% endif %}</dd>{% endif %}
{% if st.get('p50') is not none %}<dt>Typical (this day)</dt><dd>median {{ st.p50 }} {{ d.get('unit_of_measure', '') }}{% if st.get('period') %} · {{ st.period }}{% if st.get('count') %} ({{ st.count }} yrs){% endif %}{% endif %}</dd>{% endif %}
{% if site.get('county') or site.get('state') %}<dt>Location</dt><dd>{{ [site.county, site.state] | select | join(', ') }}</dd>{% endif %}
{% if d.get('monitoring_location_id') is not none %}<dt>Site ID</dt><dd><code>{{ d.monitoring_location_id }}</code></dd>{% endif %}
{% if d.get('statistic_id') is not none %}<dt>Statistic</dt><dd>{{ d.statistic_id }}</dd>{% endif %} {% if d.get('statistic_id') is not none %}<dt>Statistic</dt><dd>{{ d.statistic_id }}</dd>{% endif %}

View file

@ -1,2 +1,15 @@
{# USGS NWIS one-line summary (v0.8.0). Prefer site name + value + WaterWatch
   band/percentile; fall back to "<site> — <value>" when stats are absent but the
site is named; finally to the bare "Water reading: <value>" so we never regress. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %} {% set d = (event.data.get('data') or {}).get('data') or {} %}
{%- if d.get('value') is not none %}Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}{% endif -%} {% set enr = d.get('_enriched') or {} %}
{% set site = enr.get('usgs_site') or {} %}
{% set st = enr.get('usgs_stats') or {} %}
{%- set p = st.get('percentile') -%}
{%- set suf = (('th' if (p % 100) in [11, 12, 13] else {1: 'st', 2: 'nd', 3: 'rd'}.get(p % 10, 'th')) if p is not none else '') -%}
{%- if site.get('name') and d.get('value') is not none -%}
{{ site.name }} — {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}
{%- if st.get('class_label') %} ({{ st.class_label }}{% if p is not none %}, {{ p }}{{ suf }} percentile{% endif %}){% endif -%}
{%- elif d.get('value') is not none -%}
Water reading: {{ d.value }}{% if d.get('unit_of_measure') %} {{ d.unit_of_measure }}{% endif %}
{%- endif -%}

View file

@ -20,14 +20,24 @@
fallback (no hardcoded list). Captured once so it serves both the fallback (no hardcoded list). Captured once so it serves both the
Subject cell and the map popup (via data-subject). #} Subject cell and the map popup (via data-subject). #}
{% set subject_summary %}{% include ["_event_summaries/" ~ event.adapter ~ ".html", "_event_summaries/_default.html"] %}{% endset %} {% set subject_summary %}{% include ["_event_summaries/" ~ event.adapter ~ ".html", "_event_summaries/_default.html"] %}{% endset %}
{# Location: generic _enriched.geocoder reader, then top-level named {# Location: geocoder-first, then top-level named fields. When the
fields, then coordinates. No adapter-specific logic. #} geocoder resolved no named place, fall back generically to ANY other
_enriched.<source> carrying county/state (e.g. usgs_site, v0.8.0) so
the pattern works for future enrichers without per-adapter logic. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %} {% set d = (event.data.get('data') or {}).get('data') or {} %}
{% set gc = (d.get('_enriched') or {}).get('geocoder') or {} %} {% set enr = d.get('_enriched') or {} %}
{% set loc_local = gc.get('city') or d.get('city') or gc.get('county') or d.get('county') %} {% set gc = enr.get('geocoder') or {} %}
{% set loc_state = gc.get('state') or d.get('state') %} {% set ns = namespace(
{% set loc_country = gc.get('country') or d.get('country') %} local=(gc.get('city') or gc.get('county') or d.get('city') or d.get('county')),
{% set loc_parts = [loc_local, loc_state, loc_country] | select | list %} state=(gc.get('state') or d.get('state')),
country=(gc.get('country') or d.get('country'))) %}
{% if not (ns.local or ns.state) %}
{% for _src, b in enr.items() if _src != 'geocoder' and b is mapping %}
{% if not ns.local %}{% set ns.local = b.get('county') or b.get('city') %}{% endif %}
{% if not ns.state %}{% set ns.state = b.get('state') %}{% endif %}
{% endfor %}
{% endif %}
{% set loc_parts = [ns.local, ns.state, ns.country] | select | list %}
<tr class="event-row" data-row-idx="{{ loop.index0 }}" <tr class="event-row" data-row-idx="{{ loop.index0 }}"
data-event-id="{{ event.id }}" data-event-id="{{ event.id }}"
data-adapter="{{ event.adapter }}" data-adapter="{{ event.adapter }}"

View file

@ -79,9 +79,12 @@ async def apply_enrichment(
a non-null coordinate pair in event.data. If no declared pair resolves to a non-null coordinate pair in event.data. If no declared pair resolves to
coordinates, still attaches an all-null bundle so that every event from an coordinates, still attaches an all-null bundle so that every event from an
enriched adapter carries _enriched (consumers get a stable field set). enriched adapter carries _enriched (consumers get a stable field set).
Each enricher's result is keyed by enricher.name. Mutates the data dict in Each enricher's result is keyed by enricher.name. Results are MERGED into
place (Event is frozen, but its data dict is not this avoids a event.data["_enriched"] (not overwritten) so an adapter that populated its
model_copy on every published event). own bundles before yielding e.g. NWIS's usgs_site/usgs_stats (v0.8.0) —
keeps them alongside the framework's location-keyed bundles. Mutates the
data dict in place (Event is frozen, but its data dict is not this avoids
a model_copy on every published event).
""" """
if not enrichment_locations or not enrichers: if not enrichment_locations or not enrichers:
return return
@ -91,20 +94,18 @@ async def apply_enrichment(
if lat is None or lon is None: if lat is None or lon is None:
continue continue
location = {"lat": float(lat), "lon": float(lon)} location = {"lat": float(lat), "lon": float(lon)}
enriched: dict[str, Any] = {} target = event.data.setdefault("_enriched", {})
for enricher in enrichers: for enricher in enrichers:
enriched[enricher.name] = await enricher.enrich(location) target[enricher.name] = await enricher.enrich(location)
event.data["_enriched"] = enriched
return return
# No declared pair resolved to coordinates. Still attach _enriched: each # No declared pair resolved to coordinates. Still attach _enriched: each
# enricher resolves the null location to its own all-null bundle (per the # enricher resolves the null location to its own all-null bundle (per the
# never-raise contract), so coordless events (e.g. removal tombstones) # never-raise contract), so coordless events (e.g. removal tombstones)
# carry the same shape as enriched ones. # carry the same shape as enriched ones.
null_location = {"lat": None, "lon": None} null_location = {"lat": None, "lon": None}
enriched = {} target = event.data.setdefault("_enriched", {})
for enricher in enrichers: for enricher in enrichers:
enriched[enricher.name] = await enricher.enrich(null_location) target[enricher.name] = await enricher.enrich(null_location)
event.data["_enriched"] = enriched
# Stream subject mappings -- derived from the registry; every stream is included # Stream subject mappings -- derived from the registry; every stream is included
# (META too: supervisor must create it in JetStream even though archive skips it). # (META too: supervisor must create it in JetStream even though archive skips it).

View file

@ -22,14 +22,24 @@ def isolate_enrichment_cache(tmp_path, monkeypatch):
so without this fixture the suite writes to (or, for any user without write so without this fixture the suite writes to (or, for any user without write
access to /var/lib/central, fails on) the live cache. Point it at a access to /var/lib/central, fails on) the live cache. Point it at a
per-test temp dir so no test ever touches the production path. per-test temp dir so no test ever touches the production path.
Also redirects the NWIS adapter's site/stats cache (v0.8.0,
`central.adapters.nwis.NWIS_CACHE_DB_PATH`, same /var/lib/central prod
default) for the same reason NWISAdapter.startup() opens it.
""" """
import central.supervisor as supervisor_mod import central.supervisor as supervisor_mod
import central.adapters.nwis as nwis_mod
monkeypatch.setattr( monkeypatch.setattr(
supervisor_mod, supervisor_mod,
"ENRICHMENT_CACHE_DB_PATH", "ENRICHMENT_CACHE_DB_PATH",
tmp_path / "enrichment_cache.db", tmp_path / "enrichment_cache.db",
) )
monkeypatch.setattr(
nwis_mod,
"NWIS_CACHE_DB_PATH",
tmp_path / "nwis_cache.db",
)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")

View file

@ -175,6 +175,9 @@ class TestNWISAdapter:
tmp_path / "cursors.db", tmp_path / "cursors.db",
) )
adapter._fetch = AsyncMock(return_value=_fixture_text()) adapter._fetch = AsyncMock(return_value=_fixture_text())
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment
# (which also calls _fetch); enrichment is covered in test_nwis_enrichment.
adapter._enrich_event = AsyncMock(return_value=None)
await adapter.startup() await adapter.startup()
events = [e async for e in adapter.poll()] events = [e async for e in adapter.poll()]
await adapter.shutdown() await adapter.shutdown()
@ -215,6 +218,8 @@ class TestNWISAdapter:
tmp_path / "cursors.db", tmp_path / "cursors.db",
) )
adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)]) adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment.
adapter._enrich_event = AsyncMock(return_value=None)
await adapter.startup() await adapter.startup()
events = [e async for e in adapter.poll()] events = [e async for e in adapter.poll()]
await adapter.shutdown() await adapter.shutdown()

View file

@ -0,0 +1,260 @@
"""Tests for v0.8.0 NWIS site + stats enrichment.
Covers the pure parse/classify/band logic, the SiteStatsCache (hit/miss/expire),
the adapter's _enrich_event orchestration with mocked USGS responses (incl.
graceful nulls when USGS is down and cache-hit avoiding a refetch), and the
L-c summary template rendering per WaterWatch band. No live network, no live DB.
"""
from datetime import datetime, timezone
from pathlib import Path
import jinja2
import pytest
from central.adapters import nwis_enrich as ne
from central.adapters.nwis import NWISAdapter
from central.models import Event, Geo
# ── canned USGS fixtures ────────────────────────────────────────────────────
SITE_JSON = """
{"type":"Feature",
"geometry":{"type":"Point","coordinates":[-93.1926861111111,40.8004444444444]},
"properties":{"monitoring_location_name":"South Fork Chariton River near Promise City, IA",
"state_name":"Iowa","county_name":"Wayne County",
"country_name":"United States of America"}}
"""
# header / RDB-format row / one data row for month 1 day 1 (all thresholds present; loc_web_ds blank).
STATS_RDB = (
"# canned\n"
"agency_cd\tsite_no\tparameter_cd\tts_id\tloc_web_ds\tmonth_nu\tday_nu\tbegin_yr\tend_yr\tcount_nu\tmax_va_yr\tmax_va\tp10_va\tp25_va\tp50_va\tp75_va\tp90_va\n"
"5s\t15s\t5s\t10n\t15s\t3n\t3n\t6n\t6n\t8n\t5n\t12s\t12s\t12s\t12s\t12s\t12s\n"
"USGS\t06903700\t00060\t43334\t\t1\t1\t1968\t2026\t59\t2019\t315\t10\t25\t50\t75\t90\n"
)
DAY = {"p10": 10.0, "p25": 25.0, "p50": 50.0, "p75": 75.0, "p90": 90.0, "max": 200.0}
# ── parse_site_feature ──────────────────────────────────────────────────────
def test_parse_site_feature_full():
    """A complete Feature yields name, state, county and (lat, lon)."""
    import json

    bundle = ne.parse_site_feature(json.loads(SITE_JSON))
    assert bundle["name"] == "South Fork Chariton River near Promise City, IA"
    assert bundle["state"] == "Iowa"
    assert bundle["county"] == "Wayne County"
    assert round(bundle["lat"], 3) == 40.800
    assert round(bundle["lon"], 3) == -93.193
def test_parse_site_feature_bad_shape_is_all_null():
    """An empty dict and a non-dict both map to the all-null site bundle."""
    expected = ne.site_null_bundle()
    for malformed in ({}, None):
        assert ne.parse_site_feature(malformed) == expected
# ── parse_stats_rdb ─────────────────────────────────────────────────────────
def test_parse_stats_rdb_keys_and_values():
    """The canned RDB parses to a single '1-1' row with the expected cells."""
    parsed = ne.parse_stats_rdb(STATS_RDB)
    assert "1-1" in parsed
    jan_first = parsed["1-1"]
    assert jan_first["p10"] == 10.0
    assert jan_first["p75"] == 75.0
    assert jan_first["max"] == 315.0
    assert jan_first["count"] == 59
    assert jan_first["begin_yr"] == 1968.0
    assert jan_first["end_yr"] == 2026.0
def test_parse_stats_rdb_garbage_is_empty():
    """Empty or non-RDB text parses to an empty table."""
    for junk in ("", "not\trdb\n"):
        assert ne.parse_stats_rdb(junk) == {}
# ── classify: WaterWatch band edges ─────────────────────────────────────────
@pytest.mark.parametrize(
    "value,label,severity",
    [
        (5.0, "much below normal", 3),    # < P10 (P0-P9 region)
        (9.0, "much below normal", 3),    # P9
        (10.0, "below normal", 2),        # == P10 boundary
        (24.0, "below normal", 2),
        (25.0, "normal", 1),              # == P25
        (50.0, "normal", 1),
        (75.0, "normal", 1),              # == P75 (not > P75)
        (80.0, "above normal", 2),        # P75..P90
        (90.0, "above normal", 2),        # == P90 (not > P90)
        (120.0, "much above normal", 3),  # > P90, <= max
        (250.0, "record high", 4),        # > historical max
    ],
)
def test_classify_bands(value, label, severity):
    """Every WaterWatch band edge maps to the expected label and severity."""
    got_label, _pct, got_severity = ne.classify(value, DAY)
    assert got_label == label
    assert got_severity == severity
    # The label -> severity table must agree with what classify returned.
    assert ne.SEVERITY_BY_BAND.get(label) == severity
def test_classify_no_thresholds_is_none():
    """With no thresholds at all, nothing can be classified or interpolated."""
    result = ne.classify(42.0, {})
    assert result == (None, None, None)
def test_classify_none_value():
    """A missing reading is never classified, even with full thresholds."""
    result = ne.classify(None, DAY)
    assert result == (None, None, None)
def test_percentile_interpolates_and_bounds():
    """percentile_of hits the known points exactly and interpolates between them."""
    exact_points = {50.0: 50, 0.0: 0, 200.0: 100}  # P50 point, lower bound, max
    for reading, expected in exact_points.items():
        assert ne.percentile_of(reading, DAY) == expected
    between = ne.percentile_of(17.5, DAY)  # between P10(10) and P25(25)
    assert 10 < between < 25
# ── build_stats_bundle ──────────────────────────────────────────────────────
def test_build_stats_bundle_classifies_matching_day():
    """A reading on a day present in the table gets thresholds and a band."""
    day_table = ne.parse_stats_rdb(STATS_RDB)
    bundle = ne.build_stats_bundle(120.0, day_table, 1, 1)
    assert bundle["value"] == 120.0
    assert bundle["class_label"] == "much above normal"
    assert bundle["severity_band"] == 3
    assert bundle["p50"] == 50.0
    assert bundle["record_max"] == 315.0
    assert bundle["period"] == "19682026"
    assert bundle["count"] == 59
def test_build_stats_bundle_no_matching_day_echoes_value_only():
    """With no row for the day, only the reading itself is populated."""
    day_table = ne.parse_stats_rdb(STATS_RDB)
    bundle = ne.build_stats_bundle(120.0, day_table, 7, 4)  # no 7-4 row
    assert bundle["value"] == 120.0
    assert bundle["class_label"] is None
    assert bundle["severity_band"] is None
# ── SiteStatsCache: miss / hit / expire ─────────────────────────────────────
@pytest.mark.asyncio
async def test_cache_miss_then_hit(tmp_path):
    """A key is a miss until it is set, then returns the stored payload."""
    store = ne.SiteStatsCache(tmp_path / "nwis_cache.db")
    before = await store.get("site", "X", 100)
    assert before is None
    await store.set("site", "X", {"name": "Gauge"})
    after = await store.get("site", "X", 100)
    assert after["name"] == "Gauge"
@pytest.mark.asyncio
async def test_cache_expired(tmp_path):
    """An entry older than the requested TTL reads back as a miss."""
    store = ne.SiteStatsCache(tmp_path / "nwis_cache.db")
    await store.set("stats", "X:00060", {"p10": 1})
    stale = await store.get("stats", "X:00060", -1)  # ttl already elapsed
    assert stale is None
# ── adapter _enrich_event orchestration ─────────────────────────────────────
def _make_event():
    """Build a fresh discharge (00060) event for site USGS-06903700 on 2026-01-01 UTC."""
    payload = {
        "monitoring_location_id": "USGS-06903700",
        "parameter_code": "00060",
        "value": 120.0,
    }
    observed_at = datetime(2026, 1, 1, tzinfo=timezone.utc)
    return Event(
        id="USGS-06903700:00060:2026-01-01T00:00:00+00:00",
        adapter="nwis",
        category="hydro.00060.usgs.06903700",
        time=observed_at,
        severity=0,
        geo=Geo(centroid=(-93.19, 40.80)),
        data=payload,
    )
def _adapter_with_fetch(fetch, cache=None):
    """Return an NWISAdapter built without __init__, wired to ``fetch`` and ``cache``."""
    adapter = object.__new__(NWISAdapter)
    adapter._fetch = fetch  # shadows the @retry method
    adapter._enrich_cache = cache
    return adapter
@pytest.mark.asyncio
async def test_enrich_event_populates_bundles_and_severity(tmp_path):
    """With both USGS endpoints answering, both bundles and the severity are set."""

    async def fetch(url):
        if "monitoring-locations" in url:
            return SITE_JSON
        if "/stat/" in url or "statReportType" in url:
            return STATS_RDB
        raise AssertionError(url)

    adapter = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db"))
    event = _make_event()
    severity = await adapter._enrich_event(event)

    site = event.data["_enriched"]["usgs_site"]
    stats = event.data["_enriched"]["usgs_stats"]
    assert site["name"].startswith("South Fork Chariton")
    assert site["state"] == "Iowa"
    assert stats["class_label"] == "much above normal"
    assert stats["severity_band"] == 3
    assert severity == 3
@pytest.mark.asyncio
async def test_enrich_event_graceful_nulls_when_usgs_down(tmp_path):
    """When every fetch fails, the event still gets null bundles and no severity."""

    async def fetch(url):
        raise TimeoutError("USGS down")

    adapter = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db"))
    event = _make_event()
    severity = await adapter._enrich_event(event)

    enriched = event.data["_enriched"]
    assert enriched["usgs_site"] == ne.site_null_bundle()
    assert enriched["usgs_stats"]["value"] == 120.0  # value still echoed
    assert enriched["usgs_stats"]["class_label"] is None
    assert severity is None  # no stats -> unknown severity
@pytest.mark.asyncio
async def test_enrich_event_cache_hit_avoids_refetch(tmp_path):
    """A second event for the same site+parameter is served entirely from cache."""
    fetched_urls = []

    async def fetch(url):
        fetched_urls.append(url)
        return SITE_JSON if "monitoring-locations" in url else STATS_RDB

    adapter = _adapter_with_fetch(fetch, cache=ne.SiteStatsCache(tmp_path / "c.db"))

    await adapter._enrich_event(_make_event())
    after_first = len(fetched_urls)
    assert after_first == 2  # one site + one stats fetch

    await adapter._enrich_event(_make_event())  # same site+param -> both cached
    assert len(fetched_urls) == after_first  # no additional USGS calls
# ── L-c summary template rendering per band ─────────────────────────────────
_TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "src/central/gui/templates"
def _render_summary(stats=None, site=None, value=120.0, unit="ft3/s"):
    """Render the NWIS one-line summary template for a synthetic event.

    ``site`` / ``stats`` become ``_enriched.usgs_site`` / ``usgs_stats`` when
    given; ``_enriched`` is omitted entirely when neither is supplied.
    """
    template_text = (_TEMPLATES_DIR / "_event_summaries/nwis.html").read_text()
    template = jinja2.Environment(autoescape=True).from_string(template_text)

    bundles = {}
    if site is not None:
        bundles["usgs_site"] = site
    if stats is not None:
        bundles["usgs_stats"] = stats

    reading = {"value": value, "unit_of_measure": unit}
    if bundles:
        reading["_enriched"] = bundles

    # Minimal stand-in exposing only the nested ``data`` the template reads.
    event = type("E", (), {"data": {"data": {"data": reading}}})()
    return template.render(event=event).strip()
def test_summary_full_band_and_percentile():
    """Named site plus stats renders name, value, band and ordinal percentile."""
    rendered = _render_summary(
        site={"name": "South Fork Grand River near Gallatin, MO"},
        stats={"class_label": "below normal", "percentile": 18},
    )
    expected_fragments = (
        "South Fork Grand River near Gallatin, MO — 120.0 ft3/s",
        "(below normal, 18th percentile)",
    )
    for fragment in expected_fragments:
        assert fragment in rendered
def test_summary_site_no_stats():
    """A named site without stats renders just the name and the reading."""
    rendered = _render_summary(site={"name": "Some Creek near Town, IA"}, stats=None)
    assert rendered == "Some Creek near Town, IA — 120.0 ft3/s"
def test_summary_no_enrichment_falls_back():
    """Without any enrichment the summary is the bare 'Water reading' line."""
    rendered = _render_summary(site=None, stats=None)
    assert rendered == "Water reading: 120.0 ft3/s"