central/tests/test_nwis.py
Matt Johnson 8612f0b75d feat(nwis): site + stats enrichment — named location + WaterWatch normalcy band (v0.8.0)
Opens the v0.8.x data-quality cleanup arc. Production code; central-gui AND
central-supervisor restart (adapter contract + enrichment behavior change).

NWIS events rendered as a bare "Water reading: 111 ft3/s" with an empty Location
column -- an operator couldn't tell where the gauge is or whether 111 ft3/s is
drought-low, normal, or near-flood. Coordinates were present but the reverse
geocoder returns null city/state/county for rural gauge points, and USGS site +
percentile data was never fetched. v0.8.0 fetches both.

Approach B (adapter-owned, per the proposal decision): the NWIS adapter -- which
already owns the USGS APIs -- fetches site metadata and daily stats itself and
writes two provenance bundles under event.data["_enriched"]:
- usgs_site {name, lat, lon, state, county} from the OGC monitoring-locations
  item-by-id (the API family the adapter already speaks; JSON, no RDB parser).
- usgs_stats {value, percentile, class_label, severity_band, p10..p90, record_max,
  count, period} from the legacy RDB daily-statistics service (the OGC API has no
  stats endpoint). USGS percentiles are % of days at-or-below, so higher = higher
  flow; classified to the WaterWatch bands -> severity 0-4 (record=4, much
  above/below=3, above/below=2, normal=1; None reserved for "no stats", distinct
  from a normal-flow gauge). Severity is set on the event, so it drives the v0.7.1
  severity chip-picker filter + v0.7.2 map-marker opacity.
- new nwis_enrich.py: pure parse/classify/percentile/band helpers + a sqlite
  SiteStatsCache (site TTL 365d, stats TTL 90d -- one fetch per site+param serves
  every reading for the window, so a warm cache makes zero USGS calls). USGS down
  -> cached-if-present else all-null bundle; the event still publishes.

Framework: the single agreed generic change -- supervisor apply_enrichment now
MERGES into _enriched instead of overwriting, so the still-global geocoder phase
doesn't clobber the adapter's bundles. No other adapter writes _enriched, so this
is inert for them.

GUI: _event_summaries/nwis.html -> "<site> -- <value> <units> (<band>, <Nth>
percentile)", with graceful fallback to "<site> -- <value>" then the bare
"Water reading:". _event_rows/nwis.html detail gains site/normalcy/typical/location
rows. _events_rows.html Location column falls back generically to any
_enriched.<source> carrying state/county when the geocoder is null (works for
future enrichers). events.json contract unchanged (additions under _enriched only).

conftest isolate_enrichment_cache also redirects NWIS_CACHE_DB_PATH off the prod
path (unprivileged-user test isolation). Adds tests/test_nwis_enrichment.py (28
tests: parse, band edges incl P0/P9/P10/P75/P90/record, percentile interpolation,
cache hit/miss/expire, adapter enrich + graceful-null + cache-hit-no-refetch,
summary rendering per band).

Full suite: 710 passed, 1 skipped (central and unprivileged zvx, 3x each).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 15:30:19 +00:00

364 lines
14 KiB
Python

"""Tests for USGS NWIS adapter (OGC API)."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event, Geo
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "nwis_latest_sample.json"
def _fixture_text() -> str:
return FIXTURE_PATH.read_text()
def _fixture_json() -> dict:
return json.loads(_fixture_text())
def _config(settings: dict | None = None) -> AdapterConfig:
return AdapterConfig(
name="nwis",
enabled=True,
cadence_s=900,
settings=settings or {},
updated_at=datetime.now(timezone.utc),
)
class TestNWISHelpers:
def test_subject_decomposes_usgs(self):
"""USGS-05420500 -> agency='usgs', bare='05420500'; subject endswith .00060.usgs.05420500."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
agency, bare = _subject_tokens_for_id("USGS-05420500")
assert agency == "usgs"
assert bare == "05420500"
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="USGS-05420500:00060:2026-05-19T15:30:00+00:00",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime(2026, 5, 19, 15, 30, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={},
)
assert adapter.subject_for(event).endswith(".00060.usgs.05420500")
def test_subject_decomposes_non_usgs(self):
"""MO005-400105093591601 -> agency='mo005', bare='400105093591601'; subject reflects both."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
agency, bare = _subject_tokens_for_id("MO005-400105093591601")
assert agency == "mo005"
assert bare == "400105093591601"
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="MO005-400105093591601:00060:t",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={},
)
assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601")
def test_subject_unprefixed_id_falls_back(self):
"""ID with no dash falls back to agency='unknown'."""
from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id
bare_input = "STANDALONE12345"
agency, bare = _subject_tokens_for_id(bare_input)
assert agency == "unknown"
assert bare == bare_input.lower()
adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
event = Event(
id="X",
adapter="nwis",
category=f"hydro.00060.{agency}.{bare}",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={},
)
subj = adapter.subject_for(event)
assert subj.endswith(f".00060.unknown.{bare}")
def test_dedup_key_composite(self):
"""Same id+param+time -> same key; different time -> different key."""
from central.adapters.nwis import _dedup_key
a = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
b = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
c = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:45:00+00:00")
assert a == b
assert a != c
# All three components are present in the key
assert "USGS-05420500" in a
assert "00060" in a
assert "2026-05-19T15:30:00+00:00" in a
class TestNWISSettings:
def test_parameter_codes_default_is_full_set(self):
"""Default equals _DEFAULT_PARAMETER_CODES — no parallel literal anywhere."""
from central.adapters.nwis import NWISSettings, _DEFAULT_PARAMETER_CODES
assert NWISSettings().parameter_codes == _DEFAULT_PARAMETER_CODES
class TestNWISAdapter:
def test_class_attrs_complete(self):
from central.adapters.nwis import NWISAdapter, NWISSettings
assert NWISAdapter.name == "nwis"
assert isinstance(NWISAdapter.display_name, str) and NWISAdapter.display_name
assert isinstance(NWISAdapter.description, str) and NWISAdapter.description
assert NWISAdapter.settings_schema is NWISSettings
assert NWISAdapter.requires_api_key is None
assert NWISAdapter.api_key_field is None
assert NWISAdapter.wizard_order is None
assert NWISAdapter.default_cadence_s == 900
@pytest.mark.asyncio
async def test_lonlat_coordinate_order(self, tmp_path: Path):
"""Upstream coords [lon, lat] -> Geo.centroid=(lon, lat); no axis swap."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
src = fix["features"][0]
lon_in, lat_in = src["geometry"]["coordinates"]
# Sanity: fixture event 0 is in western/northern hemisphere.
assert lon_in < 0
assert 0 < lat_in < 90
adapter = NWISAdapter(
_config({"parameter_codes": [src["properties"]["parameter_code"]]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
target_site = src["properties"]["monitoring_location_id"]
emitted = next(e for e in events if e.data["monitoring_location_id"] == target_site)
assert emitted.geo.centroid == (lon_in, lat_in)
@pytest.mark.asyncio
async def test_parameter_allowlist_filters(self, tmp_path: Path):
"""parameter_codes setting filters which queries we issue per poll.
With one allowed code, _fetch should be called exactly once
(per page; fixture has no next link → one call total).
"""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment
# (which also calls _fetch); enrichment is covered in test_nwis_enrichment.
adapter._enrich_event = AsyncMock(return_value=None)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert events
# All emitted events carry the only allowed parameter_code.
for e in events:
assert e.data["parameter_code"] == target
# Per-page fetch invoked once per parameter_code in the allowlist.
assert adapter._fetch.await_count == 1
@pytest.mark.asyncio
async def test_pagination_follows_next_link(self, tmp_path: Path):
"""Adapter follows OGC 'next' link until absent."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
page1 = {
"type": "FeatureCollection",
"features": fix["features"][:2],
"numberReturned": 2,
"links": [
{"rel": "next", "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?cursor=PAGE2"}
],
}
page2 = {
"type": "FeatureCollection",
"features": fix["features"][2:],
"numberReturned": len(fix["features"]) - 2,
"links": [],
}
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
# Isolate polling fetches from v0.8.0 per-event site/stats enrichment.
adapter._enrich_event = AsyncMock(return_value=None)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
# Both pages drained, fetch called exactly twice.
assert adapter._fetch.await_count == 2
# Events from both pages emitted.
sites = {e.data["monitoring_location_id"] for e in events}
expected_sites = {f["properties"]["monitoring_location_id"] for f in fix["features"]}
assert sites == expected_sites
@pytest.mark.asyncio
async def test_no_removal_pattern(self, tmp_path: Path):
"""Sites missing from a later poll do NOT emit a .removed event in v1."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
# First poll: full fixture.
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
assert first_pass
# Second poll: empty result (every previously-seen site is now missing).
empty_page = {"type": "FeatureCollection", "features": [], "links": []}
adapter._fetch = AsyncMock(return_value=json.dumps(empty_page))
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
# No removal events of any flavor.
assert all(".removed" not in e.category for e in second_pass)
assert second_pass == []
@pytest.mark.asyncio
async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path):
"""Second poll with identical fixture yields no new events (composite dedup hits)."""
from central.adapters.nwis import NWISAdapter
fix = _fixture_json()
target = fix["features"][0]["properties"]["parameter_code"]
adapter = NWISAdapter(
_config({"parameter_codes": [target]}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch = AsyncMock(return_value=_fixture_text())
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert first_pass
assert second_pass == []
class TestNWISPreview:
"""Preview hook (PR G.5) — exercised without starting the adapter."""
@pytest.mark.asyncio
async def test_preview_returns_none_without_region(self, tmp_path: Path):
from central.adapters.nwis import NWISAdapter, NWISSettings
adapter = NWISAdapter(_config({}), MagicMock(), tmp_path / "cursors.db")
result = await adapter.preview_for_settings(NWISSettings())
assert result is None
@pytest.mark.asyncio
async def test_preview_returns_rows_with_region(self, tmp_path: Path):
"""Given a bbox, preview returns one dict per monitoring-locations feature
with the contract column order: site_id, name, site_type, state."""
from central.adapters.nwis import NWISAdapter, NWISSettings
sample_response = json.dumps({
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"id": "USGS-05420500",
"geometry": {"type": "Point", "coordinates": [-90.25, 41.78]},
"properties": {
"monitoring_location_name": "MISSISSIPPI RIVER AT CLINTON, IA",
"site_type_code": "ST",
"state_name": "Iowa",
},
},
{
"type": "Feature",
"id": "USGS-05454500",
"geometry": {"type": "Point", "coordinates": [-91.65, 41.66]},
"properties": {
"monitoring_location_name": "IOWA RIVER AT IOWA CITY, IA",
"site_type_code": "ST",
"state_name": "Iowa",
},
},
],
})
adapter = NWISAdapter(
_config({"region": {"west": -94.0, "south": 40.0, "east": -93.0, "north": 41.0}}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch_preview_text = AsyncMock(return_value=sample_response)
settings = NWISSettings(region=adapter.region)
rows = await adapter.preview_for_settings(settings)
assert rows is not None
assert len(rows) == 2
# Column order is part of the contract — first row's keys must match exactly.
expected_keys = ["site_id", "name", "site_type", "state"]
assert list(rows[0].keys()) == expected_keys
# Row content reflects fixture data.
assert rows[0]["site_id"] == "USGS-05420500"
assert rows[0]["name"] == "MISSISSIPPI RIVER AT CLINTON, IA"
assert rows[0]["site_type"] == "ST"
assert rows[0]["state"] == "Iowa"
assert rows[1]["site_id"] == "USGS-05454500"
@pytest.mark.asyncio
async def test_preview_propagates_http_error(self, tmp_path: Path):
"""Preview must not swallow upstream errors — the framework needs them
to render the operator-visible 'Preview unavailable: …' banner."""
from central.adapters.nwis import NWISAdapter, NWISSettings
adapter = NWISAdapter(
_config({"region": {"west": -94.0, "south": 40.0, "east": -93.0, "north": 41.0}}),
MagicMock(),
tmp_path / "cursors.db",
)
adapter._fetch_preview_text = AsyncMock(side_effect=RuntimeError("upstream 503"))
settings = NWISSettings(region=adapter.region)
with pytest.raises(RuntimeError, match="upstream 503"):
await adapter.preview_for_settings(settings)