2026-05-19 16:50:21 +00:00
|
|
|
"""Tests for USGS NWIS adapter (OGC API)."""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from central.config_models import AdapterConfig
|
|
|
|
|
from central.models import Event, Geo
|
|
|
|
|
|
|
|
|
|
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "nwis_latest_sample.json"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fixture_text() -> str:
    """Return the raw text of the recorded NWIS sample fixture.

    The file at ``FIXTURE_PATH`` is decoded explicitly as UTF-8 rather than
    with the platform's locale default, so the tests see the same text on
    every machine regardless of locale settings.
    """
    return FIXTURE_PATH.read_text(encoding="utf-8")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fixture_json() -> dict:
    """Return the sample fixture decoded from JSON text into Python objects."""
    raw = _fixture_text()
    return json.loads(raw)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _config(settings: dict | None = None) -> AdapterConfig:
    """Build an enabled ``nwis`` AdapterConfig with a 900-second cadence.

    ``settings`` is passed through as the adapter settings; ``None`` (or any
    other falsy value) is replaced by an empty dict. ``updated_at`` is set to
    the current UTC time.
    """
    effective_settings = settings if settings else {}
    stamped_at = datetime.now(timezone.utc)
    return AdapterConfig(
        name="nwis",
        enabled=True,
        cadence_s=900,
        settings=effective_settings,
        updated_at=stamped_at,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestNWISHelpers:
    """Checks for the module-level helpers exposed by central.adapters.nwis."""

    def test_subject_decomposes_usgs(self):
        """'USGS-05420500' yields ('usgs', '05420500'); the subject ends with .00060.usgs.05420500."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        agency_token, site_token = _subject_tokens_for_id("USGS-05420500")
        assert agency_token == "usgs"
        assert site_token == "05420500"

        observed_at = datetime(2026, 5, 19, 15, 30, tzinfo=timezone.utc)
        sample_event = Event(
            id="USGS-05420500:00060:2026-05-19T15:30:00+00:00",
            adapter="nwis",
            category=f"hydro.00060.{agency_token}.{site_token}",
            time=observed_at,
            severity=0,
            geo=Geo(),
            data={},
        )
        # The DB path is a placeholder; this test never starts the adapter.
        nwis = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
        subject = nwis.subject_for(sample_event)
        assert subject.endswith(".00060.usgs.05420500")

    def test_subject_decomposes_non_usgs(self):
        """'MO005-400105093591601' yields ('mo005', '400105093591601'); the subject carries both."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        agency_token, site_token = _subject_tokens_for_id("MO005-400105093591601")
        assert agency_token == "mo005"
        assert site_token == "400105093591601"

        # The DB path is a placeholder; this test never starts the adapter.
        nwis = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
        sample_event = Event(
            id="MO005-400105093591601:00060:t",
            adapter="nwis",
            category=f"hydro.00060.{agency_token}.{site_token}",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(),
            data={},
        )
        subject = nwis.subject_for(sample_event)
        assert subject.endswith(".00060.mo005.400105093591601")

    def test_subject_unprefixed_id_falls_back(self):
        """An ID without a dash is reported with agency 'unknown' and a lower-cased site token."""
        from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id

        bare_input = "STANDALONE12345"
        agency_token, site_token = _subject_tokens_for_id(bare_input)
        assert agency_token == "unknown"
        assert site_token == bare_input.lower()

        # The DB path is a placeholder; this test never starts the adapter.
        nwis = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db"))
        sample_event = Event(
            id="X",
            adapter="nwis",
            category=f"hydro.00060.{agency_token}.{site_token}",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(),
            data={},
        )
        subject = nwis.subject_for(sample_event)
        assert subject.endswith(f".00060.unknown.{site_token}")

    def test_dedup_key_composite(self):
        """Identical id/param/time give equal keys; changing the time gives a different key."""
        from central.adapters.nwis import _dedup_key

        original = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
        repeated = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00")
        later = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:45:00+00:00")

        assert original == repeated
        assert original != later

        # Each of the three inputs must appear verbatim inside the key.
        for component in ("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00"):
            assert component in original
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestNWISSettings:
    """Checks for the defaults of the NWISSettings schema."""

    def test_parameter_codes_default_is_full_set(self):
        """A default-constructed NWISSettings carries _DEFAULT_PARAMETER_CODES.

        The expectation is compared against the module constant itself so the
        test holds no parallel literal copy of the code list.
        """
        from central.adapters.nwis import NWISSettings, _DEFAULT_PARAMETER_CODES

        default_settings = NWISSettings()
        assert default_settings.parameter_codes == _DEFAULT_PARAMETER_CODES
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestNWISAdapter:
    """Behavioural tests for NWISAdapter, driven by the recorded fixture.

    Every async test that calls ``startup()`` releases the adapter in a
    ``finally`` clause so a failing ``poll()`` or assertion cannot leave a
    started adapter (and its cursor DB under ``tmp_path``) behind.
    """

    @staticmethod
    def _make_adapter(tmp_path: Path, parameter_code):
        """Return an NWISAdapter limited to ``parameter_code`` with a per-test cursor DB."""
        from central.adapters.nwis import NWISAdapter

        return NWISAdapter(
            _config({"parameter_codes": [parameter_code]}),
            MagicMock(),
            tmp_path / "cursors.db",
        )

    def test_class_attrs_complete(self):
        """Class-level metadata on NWISAdapter has the expected declared values."""
        from central.adapters.nwis import NWISAdapter, NWISSettings

        assert NWISAdapter.name == "nwis"
        assert isinstance(NWISAdapter.display_name, str) and NWISAdapter.display_name
        assert isinstance(NWISAdapter.description, str) and NWISAdapter.description
        assert NWISAdapter.settings_schema is NWISSettings
        assert NWISAdapter.requires_api_key is None
        assert NWISAdapter.api_key_field is None
        assert NWISAdapter.wizard_order is None
        assert NWISAdapter.default_cadence_s == 900

    @pytest.mark.asyncio
    async def test_lonlat_coordinate_order(self, tmp_path: Path):
        """Upstream coords [lon, lat] -> Geo.centroid=(lon, lat); no axis swap."""
        fix = _fixture_json()
        src = fix["features"][0]
        lon_in, lat_in = src["geometry"]["coordinates"]
        # Sanity: fixture event 0 is in western/northern hemisphere, so a
        # swapped axis order would be detectable.
        assert lon_in < 0
        assert 0 < lat_in < 90

        adapter = self._make_adapter(tmp_path, src["properties"]["parameter_code"])
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            events = [e async for e in adapter.poll()]
        finally:
            await adapter.shutdown()

        target_site = src["properties"]["monitoring_location_id"]
        emitted = next(e for e in events if e.data["monitoring_location_id"] == target_site)
        assert emitted.geo.centroid == (lon_in, lat_in)

    @pytest.mark.asyncio
    async def test_parameter_allowlist_filters(self, tmp_path: Path):
        """parameter_codes setting filters which queries we issue per poll.

        With one allowed code, _fetch should be called exactly once
        (per page; fixture has no next link → one call total).
        """
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]

        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            events = [e async for e in adapter.poll()]
        finally:
            await adapter.shutdown()

        assert events
        # All emitted events carry the only allowed parameter_code.
        for e in events:
            assert e.data["parameter_code"] == target
        # Per-page fetch invoked once per parameter_code in the allowlist.
        assert adapter._fetch.await_count == 1

    @pytest.mark.asyncio
    async def test_pagination_follows_next_link(self, tmp_path: Path):
        """Adapter follows OGC 'next' link until absent."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]

        # First page advertises a 'next' link; second page has none.
        page1 = {
            "type": "FeatureCollection",
            "features": fix["features"][:2],
            "numberReturned": 2,
            "links": [
                {"rel": "next", "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?cursor=PAGE2"}
            ],
        }
        page2 = {
            "type": "FeatureCollection",
            "features": fix["features"][2:],
            "numberReturned": len(fix["features"]) - 2,
            "links": [],
        }

        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)])
        await adapter.startup()
        try:
            events = [e async for e in adapter.poll()]
        finally:
            await adapter.shutdown()

        # Both pages drained, fetch called exactly twice.
        assert adapter._fetch.await_count == 2
        # Events from both pages emitted.
        sites = {e.data["monitoring_location_id"] for e in events}
        expected_sites = {f["properties"]["monitoring_location_id"] for f in fix["features"]}
        assert sites == expected_sites

    @pytest.mark.asyncio
    async def test_no_removal_pattern(self, tmp_path: Path):
        """Sites missing from a later poll do NOT emit a .removed event in v1."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]

        adapter = self._make_adapter(tmp_path, target)
        # First poll: full fixture.
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            first_pass = [e async for e in adapter.poll()]
            assert first_pass

            # Second poll: empty result (every previously-seen site is now missing).
            empty_page = {"type": "FeatureCollection", "features": [], "links": []}
            adapter._fetch = AsyncMock(return_value=json.dumps(empty_page))
            second_pass = [e async for e in adapter.poll()]
        finally:
            await adapter.shutdown()

        # No removal events of any flavor.
        assert all(".removed" not in e.category for e in second_pass)
        assert second_pass == []

    @pytest.mark.asyncio
    async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path):
        """Second poll with identical fixture yields no new events (composite dedup hits)."""
        fix = _fixture_json()
        target = fix["features"][0]["properties"]["parameter_code"]

        adapter = self._make_adapter(tmp_path, target)
        adapter._fetch = AsyncMock(return_value=_fixture_text())
        await adapter.startup()
        try:
            first_pass = [e async for e in adapter.poll()]
            second_pass = [e async for e in adapter.poll()]
        finally:
            await adapter.shutdown()

        assert first_pass
        assert second_pass == []
|
feat(2-G.5): preview_for_settings framework hook + NWIS opt-in
Adds an optional async hook on SourceAdapter so any adapter can surface a
settings-driven preview on its /adapters/<name> edit page. The framework
renders the result generically as a table — no adapter-name branches in
GUI templates or route code.
Framework changes:
- src/central/adapter.py: new async preview_for_settings(self, settings)
on the base class, default returns None. Adapters opt in by overriding;
non-overriding adapters render unchanged.
- src/central/gui/routes.py: GET /adapters/{name} instantiates the adapter
with a no-op _PreviewConfigStore stub and a /dev/null cursor path (GUI
has no live ConfigStore), constructs settings_obj via the schema, and
calls preview_for_settings inside a try/except. Result lands in template
context as preview_rows / preview_error.
- src/central/gui/templates/_adapter_preview.html: new partial. Generic
table with columns derived from the first dict's keys; error banner
mirrors the existing last_error article style.
- src/central/gui/templates/adapters_edit.html: one-line include between
the Region fieldset and Save/Cancel.
NWIS opt-in:
- New NWIS_MONITORING_LOCATIONS_URL constant and _PREVIEW_LIMIT cap of 50.
- preview_for_settings returns None when region is None, otherwise one-shot
fetches monitoring-locations within the bbox via a fresh aiohttp session.
Must work even when adapter is not started -- the GUI process never calls
startup(). Returns list[dict] with the contract column order: site_id,
name, site_type, state. Errors propagate so the framework can render the
operator-visible banner.
- HTTP call factored into _fetch_preview_text so tests mock cleanly.
Tests (7 new):
- tests/test_preview_hook.py: default returns None; partial renders list
with correct headers/rows/count; partial renders error banner; partial
renders empty when both context values are None.
- tests/test_nwis.py adds TestNWISPreview: returns None without region,
returns rows with correct column order, propagates HTTP errors.
Verification:
- 457/457 full suite green (was 450; +7 new tests).
- Live /adapters/nwis preview returns 50 rows with the contract keys
against the current production Iowa bbox.
- /adapters/eonet preview_for_settings returns None via base default --
proves framework is duck-typed, no NWIS-specific code in framework.
2026-05-19 17:34:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestNWISPreview:
    """Tests for the preview hook (PR G.5); none of them call startup()."""

    @staticmethod
    def _region_adapter(tmp_path: Path):
        """Return an NWISAdapter whose config settings carry a fixed bbox region."""
        from central.adapters.nwis import NWISAdapter

        bbox = {"west": -94.0, "south": 40.0, "east": -93.0, "north": 41.0}
        return NWISAdapter(
            _config({"region": bbox}),
            MagicMock(),
            tmp_path / "cursors.db",
        )

    @pytest.mark.asyncio
    async def test_preview_returns_none_without_region(self, tmp_path: Path):
        """Default settings (no region supplied) produce no preview at all."""
        from central.adapters.nwis import NWISAdapter, NWISSettings

        nwis = NWISAdapter(_config({}), MagicMock(), tmp_path / "cursors.db")
        outcome = await nwis.preview_for_settings(NWISSettings())
        assert outcome is None

    @pytest.mark.asyncio
    async def test_preview_returns_rows_with_region(self, tmp_path: Path):
        """With a bbox, preview yields one dict per monitoring-locations feature,
        keyed in the contract column order: site_id, name, site_type, state."""
        from central.adapters.nwis import NWISSettings

        def feature(site_id, lon, lat, location_name):
            # One monitoring-locations feature in the shape the stubbed fetch returns.
            return {
                "type": "Feature",
                "id": site_id,
                "geometry": {"type": "Point", "coordinates": [lon, lat]},
                "properties": {
                    "monitoring_location_name": location_name,
                    "site_type_code": "ST",
                    "state_name": "Iowa",
                },
            }

        collection = {
            "type": "FeatureCollection",
            "features": [
                feature("USGS-05420500", -90.25, 41.78, "MISSISSIPPI RIVER AT CLINTON, IA"),
                feature("USGS-05454500", -91.65, 41.66, "IOWA RIVER AT IOWA CITY, IA"),
            ],
        }
        sample_response = json.dumps(collection)

        nwis = self._region_adapter(tmp_path)
        # Stub the HTTP seam so no network request is made.
        nwis._fetch_preview_text = AsyncMock(return_value=sample_response)
        preview_settings = NWISSettings(region=nwis.region)
        rows = await nwis.preview_for_settings(preview_settings)

        assert rows is not None
        assert len(rows) == 2
        first_row, second_row = rows[0], rows[1]

        # Column order is part of the contract, so compare keys as an ordered list.
        assert list(first_row.keys()) == ["site_id", "name", "site_type", "state"]

        # Row content mirrors the stubbed response.
        expected_first = {
            "site_id": "USGS-05420500",
            "name": "MISSISSIPPI RIVER AT CLINTON, IA",
            "site_type": "ST",
            "state": "Iowa",
        }
        for column, value in expected_first.items():
            assert first_row[column] == value
        assert second_row["site_id"] == "USGS-05454500"

    @pytest.mark.asyncio
    async def test_preview_propagates_http_error(self, tmp_path: Path):
        """Upstream failures must surface from preview rather than be swallowed,
        so the framework can show its operator-visible 'Preview unavailable: …' banner."""
        from central.adapters.nwis import NWISSettings

        nwis = self._region_adapter(tmp_path)
        nwis._fetch_preview_text = AsyncMock(side_effect=RuntimeError("upstream 503"))
        preview_settings = NWISSettings(region=nwis.region)

        with pytest.raises(RuntimeError, match="upstream 503"):
            await nwis.preview_for_settings(preview_settings)
|