central/tests/test_nwis.py
zvx 5d64a8f70d feat(2-G): USGS NWIS adapter (OGC API) + CENTRAL_HYDRO stream
USGS Water Data OGC API v0 (latest-continuous collection) — polls configured
parameter codes within an operator-set bbox and publishes on the new
CENTRAL_HYDRO stream.

- Subject: central.hydro.<parameter_code>.<agency>.<bare_site_no>
  (e.g. central.hydro.00060.usgs.05420500). The agency/site decomposition
  lives in a single _subject_tokens_for_id helper.
- Default parameter codes: 00060 (discharge), 00065 (gage height),
  00010 (water temperature). Operator-tunable; single SoT in
  _DEFAULT_PARAMETER_CODES — no parallel literals.
- Composite dedup: nwis:<monitoring_location_id>:<param>:<time_iso>.
  The agency prefix of monitoring_location_id (e.g. USGS-) is kept in the
  dedup key for cross-agency uniqueness.
- Pagination: follows OGC 'rel=next' link until absent (cursor-based).
- Region bbox is REQUIRED in practice; adapter logs WARN at startup if
  region is None (does not refuse to start).
- New stream CENTRAL_HYDRO added to streams.py registry (one line).
  Retention mirrors CENTRAL_DISASTER (7 days, 1 GiB).
- No removal pattern in v1 — sites are static; missing data is the signal.

Upstream divergences from the original spec brief, caught by a pre-build curl:
- Collection is 'latest-continuous', not 'instantaneous-values'.
- Site filter param is 'monitoring_location_id' (singular), not
  'monitoring_locations_id' (plural).
- Site identifier requires agency prefix in queries (USGS-NNNNN).
- feature.id is a per-record UUID, not stable; dedup uses joint key.

Ships disabled; operator enables via GUI after setting a bbox.
2026-05-19 16:50:21 +00:00

278 lines
10 KiB
Python

"""Tests for USGS NWIS adapter (OGC API)."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event, Geo
# Recorded latest-continuous response, stored next to this test module.
FIXTURE_PATH = Path(__file__).parent.joinpath("fixtures", "nwis_latest_sample.json")
def _fixture_text() -> str:
    """Return the raw text of the recorded NWIS fixture.

    The file is JSON, so it is decoded explicitly as UTF-8 rather than with
    the platform's locale encoding (which ``read_text()`` uses by default and
    which differs between environments, e.g. cp1252 on Windows).
    """
    return FIXTURE_PATH.read_text(encoding="utf-8")
def _fixture_json() -> dict:
    """Parse the recorded NWIS fixture and return the decoded FeatureCollection."""
    raw = _fixture_text()
    decoded: dict = json.loads(raw)
    return decoded
def _config(settings: dict | None = None) -> AdapterConfig:
    """Build an enabled ``nwis`` AdapterConfig with a 900 s cadence for tests.

    Args:
        settings: Adapter settings mapping. A falsy value (``None`` or an
            empty dict) is replaced with a fresh empty dict.

    Returns:
        The AdapterConfig, stamped with the current UTC time as ``updated_at``.
    """
    effective_settings = settings if settings else {}
    stamp = datetime.now(timezone.utc)
    return AdapterConfig(
        name="nwis",
        enabled=True,
        cadence_s=900,
        settings=effective_settings,
        updated_at=stamp,
    )
class TestNWISHelpers:
    """Tests for the pure NWIS helpers: subject-token decomposition and dedup keys."""

    @staticmethod
    def _hydro_event(event_id: str, agency: str, bare: str, when: datetime) -> Event:
        """Return a minimal parameter-00060 Event whose category ends in ``.<agency>.<bare>``.

        The tests only assert on the subject produced for this event; the
        other fields are filler values required by the Event model.
        """
        return Event(
            id=event_id,
            adapter="nwis",
            category=f"hydro.00060.{agency}.{bare}",
            time=when,
            severity=0,
            geo=Geo(),
            data={},
        )

    def test_subject_decomposes_usgs(self, tmp_path: Path):
        """USGS-05420500 -> agency='usgs', bare='05420500'; subject endswith .00060.usgs.05420500."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        agency, bare = _subject_tokens_for_id("USGS-05420500")
        assert agency == "usgs"
        assert bare == "05420500"

        # The cursor DB is not expected to be opened here, but point it at a
        # per-test temp path rather than a shared, POSIX-only /tmp location.
        adapter = NWISAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        event = self._hydro_event(
            "USGS-05420500:00060:2026-05-19T15:30:00+00:00",
            agency,
            bare,
            datetime(2026, 5, 19, 15, 30, tzinfo=timezone.utc),
        )
        assert adapter.subject_for(event).endswith(".00060.usgs.05420500")

    def test_subject_decomposes_non_usgs(self, tmp_path: Path):
        """MO005-400105093591601 -> agency='mo005', bare='400105093591601'; subject reflects both."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        agency, bare = _subject_tokens_for_id("MO005-400105093591601")
        assert agency == "mo005"
        assert bare == "400105093591601"

        adapter = NWISAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        event = self._hydro_event(
            "MO005-400105093591601:00060:t", agency, bare, datetime.now(timezone.utc)
        )
        assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601")

    def test_subject_unprefixed_id_falls_back(self, tmp_path: Path):
        """ID with no dash falls back to agency='unknown'."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        bare_input = "STANDALONE12345"
        agency, bare = _subject_tokens_for_id(bare_input)
        assert agency == "unknown"
        assert bare == bare_input.lower()

        adapter = NWISAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        event = self._hydro_event("X", agency, bare, datetime.now(timezone.utc))
        assert adapter.subject_for(event).endswith(f".00060.unknown.{bare}")

    def test_dedup_key_composite(self):
        """Identical id+param+time -> same key; changing any single component -> different key."""
        from central.adapters.nwis import _dedup_key

        site, param, when = "USGS-05420500", "00060", "2026-05-19T15:30:00+00:00"
        a = _dedup_key(site, param, when)
        b = _dedup_key(site, param, when)
        assert a == b

        # Each component independently distinguishes the key.
        assert a != _dedup_key(site, param, "2026-05-19T15:45:00+00:00")
        assert a != _dedup_key(site, "00065", when)
        # Same bare site number under another agency must not collide.
        assert a != _dedup_key("MO005-05420500", param, when)

        # All three components are present verbatim in the key.
        for part in (site, param, when):
            assert part in a
class TestNWISSettings:
    """Defaults exposed by the NWIS settings schema."""

    def test_parameter_codes_default_is_full_set(self):
        """A bare NWISSettings() uses the module's single source of truth for codes."""
        from central.adapters.nwis import NWISSettings, _DEFAULT_PARAMETER_CODES

        defaults = NWISSettings()
        expected = _DEFAULT_PARAMETER_CODES
        assert defaults.parameter_codes == expected
class TestNWISAdapter:
    """Adapter behavior against the recorded latest-continuous fixture.

    Every poll test replaces ``_fetch`` with an AsyncMock, so the canned page
    text is returned instead of fetched. The cursor DB lives under pytest's
    ``tmp_path``. ``shutdown()`` is always run in a ``finally`` so that a
    failing poll or assertion cannot leak the adapter's resources.
    """

    @staticmethod
    def _make_adapter(tmp_path: Path, parameter_code: str):
        """Construct an NWISAdapter whose allowlist is the single ``parameter_code``."""
        from central.adapters.nwis import NWISAdapter

        return NWISAdapter(
            _config({"parameter_codes": [parameter_code]}),
            MagicMock(),
            tmp_path / "cursors.db",
        )

    @staticmethod
    async def _drain(adapter) -> list[Event]:
        """Collect every event yielded by one ``adapter.poll()`` pass."""
        return [e async for e in adapter.poll()]

    def test_class_attrs_complete(self):
        from central.adapters.nwis import NWISAdapter, NWISSettings

        assert NWISAdapter.name == "nwis"
        assert isinstance(NWISAdapter.display_name, str) and NWISAdapter.display_name
        assert isinstance(NWISAdapter.description, str) and NWISAdapter.description
        assert NWISAdapter.settings_schema is NWISSettings
        assert NWISAdapter.requires_api_key is None
        assert NWISAdapter.api_key_field is None
        assert NWISAdapter.wizard_order is None
        assert NWISAdapter.default_cadence_s == 900

    @pytest.mark.asyncio
    async def test_lonlat_coordinate_order(self, tmp_path: Path):
        """Upstream coords [lon, lat] -> Geo.centroid=(lon, lat); no axis swap."""
        fix = _fixture_json()
        src = fix["features"][0]
        # GeoJSON positions may carry an optional third (altitude) element;
        # only the horizontal pair is relevant to the centroid.
        lon_in, lat_in = src["geometry"]["coordinates"][:2]
        # Sanity: fixture event 0 is in western/northern hemisphere.
        assert lon_in < 0
        assert 0 < lat_in < 90

        adapter = self._make_adapter(tmp_path, src["properties"]["parameter_code"])
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            events = await self._drain(adapter)
        finally:
            await adapter.shutdown()

        target_site = src["properties"]["monitoring_location_id"]
        # Supply a default: a StopIteration escaping a coroutine surfaces as an
        # opaque "coroutine raised StopIteration" RuntimeError.
        emitted = next(
            (e for e in events if e.data["monitoring_location_id"] == target_site),
            None,
        )
        assert emitted is not None, f"no event emitted for {target_site}"
        assert emitted.geo.centroid == (lon_in, lat_in)

    @pytest.mark.asyncio
    async def test_parameter_allowlist_filters(self, tmp_path: Path):
        """parameter_codes setting filters which queries we issue per poll.
        With one allowed code, _fetch should be called exactly once
        (per page; fixture has no next link → one call total).
        """
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]
        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            events = await self._drain(adapter)
        finally:
            await adapter.shutdown()

        assert events
        # All emitted events carry the only allowed parameter_code.
        for e in events:
            assert e.data["parameter_code"] == target
        # Per-page fetch invoked once per parameter_code in the allowlist.
        assert adapter._fetch.await_count == 1

    @pytest.mark.asyncio
    async def test_pagination_follows_next_link(self, tmp_path: Path):
        """Adapter follows OGC 'next' link until absent."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]
        next_href = (
            "https://api.waterdata.usgs.gov/ogcapi/v0/collections/"
            "latest-continuous/items?cursor=PAGE2"
        )
        page1 = {
            "type": "FeatureCollection",
            "features": fix["features"][:2],
            "numberReturned": 2,
            "links": [{"rel": "next", "href": next_href}],
        }
        page2 = {
            "type": "FeatureCollection",
            "features": fix["features"][2:],
            "numberReturned": len(fix["features"]) - 2,
            "links": [],
        }
        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
        await adapter.startup()
        try:
            events = await self._drain(adapter)
        finally:
            await adapter.shutdown()

        # Both pages drained, fetch called exactly twice.
        assert adapter._fetch.await_count == 2
        # The second request must carry the cursor from page 1's 'next' link.
        assert "PAGE2" in str(adapter._fetch.await_args_list[1])
        # Events from both pages emitted.
        sites = {e.data["monitoring_location_id"] for e in events}
        expected_sites = {f["properties"]["monitoring_location_id"] for f in fix["features"]}
        assert sites == expected_sites

    @pytest.mark.asyncio
    async def test_no_removal_pattern(self, tmp_path: Path):
        """Sites missing from a later poll do NOT emit a .removed event in v1."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]
        adapter = self._make_adapter(tmp_path, target)
        # First poll: full fixture.
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            first_pass = await self._drain(adapter)
            assert first_pass
            # Second poll: empty result (every previously-seen site is now missing).
            empty_page = {"type": "FeatureCollection", "features": [], "links": []}
            adapter._fetch = AsyncMock(return_value=json.dumps(empty_page))
            second_pass = await self._drain(adapter)
        finally:
            await adapter.shutdown()

        # No removal events of any flavor.
        assert all(".removed" not in e.category for e in second_pass)
        assert second_pass == []

    @pytest.mark.asyncio
    async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path):
        """Second poll with identical fixture yields no new events (composite dedup hits)."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]
        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            first_pass = await self._drain(adapter)
            second_pass = await self._drain(adapter)
        finally:
            await adapter.shutdown()

        assert first_pass
        assert second_pass == []