From 5d64a8f70dfa46049942f53cd46d3d7d46097e36 Mon Sep 17 00:00:00 2001 From: zvx Date: Tue, 19 May 2026 16:50:21 +0000 Subject: [PATCH] feat(2-G): USGS NWIS adapter (OGC API) + CENTRAL_HYDRO stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NASA WaterData OGC API v0 (latest-continuous collection) — polls configured parameter codes within an operator-set bbox and publishes on the new CENTRAL_HYDRO stream. - Subject: central.hydro... (e.g. central.hydro.00060.usgs.05420500). The agency/site decomposition lives in a single _subject_tokens_for_id helper. - Default parameter codes: 00060 (discharge), 00065 (gage height), 00010 (water temperature). Operator-tunable; single SoT in _DEFAULT_PARAMETER_CODES — no parallel literals. - Composite dedup: nwis:::. Prefix kept in dedup key for cross-agency uniqueness. - Pagination: follows OGC 'rel=next' link until absent (cursor-based). - Region bbox is REQUIRED in practice; adapter logs WARN at startup if region is None (does not refuse to start). - New stream CENTRAL_HYDRO added to streams.py registry (one line). Retention mirrors CENTRAL_DISASTER (7 days, 1 GiB). - No removal pattern in v1 — sites are static; missing data is the signal. Upstream divergences from the original spec brief, caught by pre-build curl: - Collection is 'latest-continuous', not 'instantaneous-values'. - Site filter param is 'monitoring_location_id' (singular), not 'monitoring_locations_id' (plural). - Site identifier requires agency prefix in queries (USGS-NNNNN). - feature.id is a per-record UUID, not stable; dedup uses joint key. Ships disabled; operator enables via GUI after setting a bbox. --- .../023_add_nwis_adapter_and_hydro_stream.sql | 29 ++ src/central/adapters/nwis.py | 377 ++++++++++++++++++ src/central/streams.py | 1 + tests/fixtures/nwis_latest_sample.json | 105 +++++ tests/test_nwis.py | 278 +++++++++++++ 5 files changed, 790 insertions(+) create mode 100644 sql/migrations/023_add_nwis_adapter_and_hydro_stream.sql create mode 100644 src/central/adapters/nwis.py create mode 100644 tests/fixtures/nwis_latest_sample.json create mode 100644 tests/test_nwis.py diff --git a/sql/migrations/023_add_nwis_adapter_and_hydro_stream.sql b/sql/migrations/023_add_nwis_adapter_and_hydro_stream.sql new file mode 100644 index 0000000..a282885 --- /dev/null +++ b/sql/migrations/023_add_nwis_adapter_and_hydro_stream.sql @@ -0,0 +1,29 @@ +-- Migration: 023_add_nwis_adapter_and_hydro_stream +-- Adds the CENTRAL_HYDRO JetStream stream row AND the NWIS adapter row. +-- Folded into a single migration because the adapter publishes onto +-- central.hydro.> — both rows ship together. +-- +-- Stream retention mirrors CENTRAL_DISASTER (7 days, 1 GiB). +-- Adapter ships disabled; operator enables via GUI after setting a bbox. +-- +-- The settings JSON below is the literal output of NWISSettings().model_dump_json() +-- at migration-author time. Regenerate via: +-- sudo -u central .venv/bin/python -c \ +-- "from central.adapters.nwis import NWISSettings; print(NWISSettings().model_dump_json())" +-- Do NOT hand-edit the parameter_codes here — _DEFAULT_PARAMETER_CODES in +-- src/central/adapters/nwis.py is the single source of truth. +-- +-- Idempotent: both inserts use ON CONFLICT DO NOTHING. + +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_HYDRO', 604800, 1073741824) +ON CONFLICT (name) DO NOTHING; + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'nwis', + false, + 900, + '{"parameter_codes":["00060","00065","00010"],"region":null}'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/nwis.py b/src/central/adapters/nwis.py new file mode 100644 index 0000000..772262f --- /dev/null +++ b/src/central/adapters/nwis.py @@ -0,0 +1,377 @@ +"""USGS NWIS (National Water Information System) adapter — OGC API v0.""" + +import json +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.parse import urlencode + +import aiohttp +from pydantic import BaseModel, Field +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +NWIS_LATEST_CONTINUOUS_URL = ( + "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items" +) + +# Single source of truth for the parameter-code default. Operators tune via +# NWISSettings.parameter_codes; do NOT duplicate this list elsewhere +# (tests, fixtures, migration JSON all derive from NWISSettings defaults). +# Codes are USGS pcodes — see /api/v3/parameter-codes for the registry. +# 00060 = Discharge, cubic feet per second +# 00065 = Gage height, feet +# 00010 = Temperature, water, degrees Celsius +_DEFAULT_PARAMETER_CODES: list[str] = ["00060", "00065", "00010"] + +# Per-request page size cap. Upstream maxes around 10000; we use a +# moderate value to balance pagination overhead vs latency. +_PAGE_LIMIT = 1000 + + +def _subject_tokens_for_id(monitoring_location_id: str) -> tuple[str, str]: + """Decompose an agency-prefixed monitoring_location_id into (agency, bare_site_no). + + Examples: + USGS-05420500 -> ("usgs", "05420500") + MO005-400105... -> ("mo005", "400105...") + no-dash-id -> ("unknown", "no-dash-id"-lowercased; effectively the whole id) + + This is the ONLY place this decomposition lives — subject_for() and + Event.category construction both call through here. + """ + if "-" not in monitoring_location_id: + return ("unknown", monitoring_location_id.lower()) + agency, bare = monitoring_location_id.split("-", 1) + return (agency.lower(), bare) + + +def _parse_iso_utc(raw: str | None) -> datetime | None: + """Parse an ISO 8601 timestamp ('...Z' or with offset) to UTC datetime.""" + if not raw: + return None + try: + dt = datetime.fromisoformat(raw.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _dedup_key(monitoring_location_id: str, parameter_code: str, time_iso: str) -> str: + """Composite dedup: same site+param+measurement-time -> suppress; new time -> re-publish.""" + return f"nwis:{monitoring_location_id}:{parameter_code}:{time_iso}" + + +def _next_link(page: dict) -> str | None: + """Extract OGC API pagination 'next' link href, or None if absent.""" + for link in page.get("links") or []: + if link.get("rel") == "next" and link.get("href"): + return link["href"] + return None + + +class NWISSettings(BaseModel): + """Settings schema for USGS NWIS adapter. + + bbox via RegionConfig is REQUIRED in practice — without a region the + upstream endpoint returns CONUS-wide records (tens of thousands per poll). + Adapter logs WARN at startup if region is None; it does not refuse to + start (operator may be testing). + """ + + parameter_codes: list[str] = Field(default=list(_DEFAULT_PARAMETER_CODES)) + region: RegionConfig | None = None + + +class NWISAdapter(SourceAdapter): + """USGS NWIS adapter via the OGC API v0 `latest-continuous` collection.""" + + name = "nwis" + display_name = "USGS NWIS — Water Data (OGC)" + description = ( + "USGS National Water Information System via the OGC API " + "(latest-continuous collection). Polls the configured parameter codes " + "within the configured bbox. Default params: discharge (00060), " + "gage height (00065), water temperature (00010). Operator opts in to " + "more via parameter_codes. bbox is REQUIRED — without one the endpoint " + "returns the entire US (tens of thousands of records per poll)." + ) + settings_schema = NWISSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 900 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self.parameter_codes: list[str] = list( + config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES) + ) + region_dict = config.settings.get("region") + self.region: RegionConfig | None = ( + RegionConfig(**region_dict) if region_dict else None + ) + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + if self.region is None: + logger.warning( + "NWIS started without region bbox — upstream will return CONUS-wide records on every poll. " + "Set region via the GUI before relying on this adapter." + ) + logger.info( + "NWIS adapter started", + extra={ + "parameter_codes": self.parameter_codes, + "region": self.region.model_dump() if self.region else None, + }, + ) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("NWIS adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + self.parameter_codes = list( + new_config.settings.get("parameter_codes", _DEFAULT_PARAMETER_CODES) + ) + region_dict = new_config.settings.get("region") + self.region = RegionConfig(**region_dict) if region_dict else None + logger.info( + "NWIS config updated", + extra={ + "parameter_codes": self.parameter_codes, + "region": self.region.model_dump() if self.region else None, + }, + ) + + def is_published(self, dedup_key: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, dedup_key), + ) + return cur.fetchone() is not None + + def mark_published(self, dedup_key: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, dedup_key), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("NWIS swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + # event.category is "hydro..." + parts = event.category.split(".") + if len(parts) >= 4: + return f"central.hydro.{parts[1]}.{parts[2]}.{parts[3]}" + return "central.hydro.unknown.unknown.unknown" + + def _initial_url(self, parameter_code: str) -> str: + params: dict[str, str] = { + "parameter_code": parameter_code, + "limit": str(_PAGE_LIMIT), + } + if self.region is not None: + params["bbox"] = ( + f"{self.region.west},{self.region.south}," + f"{self.region.east},{self.region.north}" + ) + return f"{NWIS_LATEST_CONTINUOUS_URL}?{urlencode(params)}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self, url: str) -> str: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + url, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + return await resp.text() + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + events_yielded = 0 + for parameter_code in self.parameter_codes: + url: str | None = self._initial_url(parameter_code) + pages_fetched = 0 + features_seen = 0 + while url: + try: + content = await self._fetch(url) + except Exception as e: + logger.error( + "NWIS fetch failed", + extra={"error": str(e), "parameter_code": parameter_code}, + ) + raise + try: + page = json.loads(content) + except json.JSONDecodeError as e: + logger.error( + "NWIS JSON parse error", + extra={"error": str(e), "parameter_code": parameter_code}, + ) + raise + pages_fetched += 1 + features = page.get("features") or [] + features_seen += len(features) + + for feature in features: + event = self._build_event(feature, parameter_code) + if event is None: + continue + dedup_key = _dedup_key( + event.data["monitoring_location_id"], + parameter_code, + event.data["time"], + ) + if self.is_published(dedup_key): + continue + yield event + self.mark_published(dedup_key) + events_yielded += 1 + + url = _next_link(page) + + logger.info( + "NWIS parameter poll completed", + extra={ + "parameter_code": parameter_code, + "pages_fetched": pages_fetched, + "features_seen": features_seen, + }, + ) + + self.sweep_old_ids() + logger.info( + "NWIS poll completed", + extra={"events_yielded": events_yielded}, + ) + + def _build_event(self, feature: dict, parameter_code: str) -> Event | None: + props = feature.get("properties") or {} + monitoring_location_id = props.get("monitoring_location_id") + if not monitoring_location_id: + return None + + time_iso = props.get("time") + event_time = _parse_iso_utc(time_iso) + if event_time is None or not time_iso: + return None + + value_raw = props.get("value") + try: + value = float(value_raw) if value_raw is not None else None + except (TypeError, ValueError): + value = None + if value is None: + return None + + geom = feature.get("geometry") or {} + coords = geom.get("coordinates") + centroid: tuple[float, float] | None = None + if ( + isinstance(coords, list) + and len(coords) == 2 + and all(isinstance(c, (int, float)) for c in coords) + ): + centroid = (float(coords[0]), float(coords[1])) # GeoJSON (lon, lat) + + agency, bare_site_no = _subject_tokens_for_id(monitoring_location_id) + + data: dict[str, Any] = { + "monitoring_location_id": monitoring_location_id, + "parameter_code": parameter_code, + "time": time_iso, + "value": value, + "unit_of_measure": props.get("unit_of_measure"), + "statistic_id": props.get("statistic_id"), + "approval_status": props.get("approval_status"), + "qualifier": props.get("qualifier"), + "time_series_id": props.get("time_series_id"), + "last_modified": props.get("last_modified"), + } + + return Event( + id=f"{monitoring_location_id}:{parameter_code}:{time_iso}", + adapter=self.name, + category=f"hydro.{parameter_code}.{agency}.{bare_site_no}", + time=event_time, + severity=0, + geo=Geo(centroid=centroid), + data=data, + ) diff --git a/src/central/streams.py b/src/central/streams.py index e9e05f5..e0408f5 100644 --- a/src/central/streams.py +++ b/src/central/streams.py @@ -28,5 +28,6 @@ STREAMS: list[StreamEntry] = [ StreamEntry("CENTRAL_QUAKE", "central.quake.>"), StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), + StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), ] diff --git a/tests/fixtures/nwis_latest_sample.json b/tests/fixtures/nwis_latest_sample.json new file mode 100644 index 0000000..05dd279 --- /dev/null +++ b/tests/fixtures/nwis_latest_sample.json @@ -0,0 +1,105 @@ +{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "b28554ea-25f0-485c-ade2-0a4c73901768", + "geometry": { + "type": "Point", + "coordinates": [-90.2520730305021, 41.7805863501123] + }, + "properties": { + "id": "b28554ea-25f0-485c-ade2-0a4c73901768", + "time_series_id": "fbe039224c874449ae574fe6668e11d8", + "monitoring_location_id": "USGS-05420500", + "parameter_code": "00060", + "statistic_id": "00011", + "time": "2026-05-19T15:30:00+00:00", + "value": "57800", + "unit_of_measure": "ft^3/s", + "approval_status": "Provisional", + "qualifier": null, + "last_modified": "2026-05-19T15:43:28.012704+00:00" + } + }, + { + "type": "Feature", + "id": "8a4eb91f-1d33-4a02-9c4a-92c5b1ac0d12", + "geometry": { + "type": "Point", + "coordinates": [-91.6510, 41.6628] + }, + "properties": { + "id": "8a4eb91f-1d33-4a02-9c4a-92c5b1ac0d12", + "time_series_id": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", + "monitoring_location_id": "USGS-05454500", + "parameter_code": "00060", + "statistic_id": "00011", + "time": "2026-05-19T15:30:00+00:00", + "value": "2110", + "unit_of_measure": "ft^3/s", + "approval_status": "Approved", + "qualifier": null, + "last_modified": "2026-05-19T15:40:00.000000+00:00" + } + }, + { + "type": "Feature", + "id": "synthetic-non-usgs-record", + "geometry": { + "type": "Point", + "coordinates": [-93.98775, 40.01822222222222] + }, + "properties": { + "id": "synthetic-non-usgs-record", + "time_series_id": "synth-mo005-ts-001", + "monitoring_location_id": "MO005-400105093591601", + "parameter_code": "00060", + "statistic_id": "00011", + "time": "2026-05-19T15:00:00+00:00", + "value": "12.5", + "unit_of_measure": "ft^3/s", + "approval_status": "Provisional", + "qualifier": null, + "last_modified": "2026-05-19T15:05:00.000000+00:00" + } + }, + { + "type": "Feature", + "id": "synthetic-no-dash-record", + "geometry": { + "type": "Point", + "coordinates": [-92.0, 42.0] + }, + "properties": { + "id": "synthetic-no-dash-record", + "time_series_id": "synth-nodash-ts", + "monitoring_location_id": "STANDALONE12345", + "parameter_code": "00060", + "statistic_id": "00011", + "time": "2026-05-19T14:45:00+00:00", + "value": "1.0", + "unit_of_measure": "ft^3/s", + "approval_status": "Provisional", + "qualifier": null, + "last_modified": "2026-05-19T14:50:00.000000+00:00" + } + } + ], + "numberReturned": 4, + "links": [ + { + "type": "application/json", + "rel": "self", + "title": "This document as JSON", + "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?f=json¶meter_code=00060&limit=1000" + }, + { + "type": "application/json", + "rel": "collection", + "title": "Parent collection", + "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous" + } + ], + "timeStamp": "2026-05-19T16:00:00.000000+00:00" +} diff --git a/tests/test_nwis.py b/tests/test_nwis.py new file mode 100644 index 0000000..a68d283 --- /dev/null +++ b/tests/test_nwis.py @@ -0,0 +1,278 @@ +"""Tests for USGS NWIS adapter (OGC API).""" + +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.config_models import AdapterConfig +from central.models import Event, Geo + +FIXTURE_PATH = Path(__file__).parent / "fixtures" / "nwis_latest_sample.json" + + +def _fixture_text() -> str: + return FIXTURE_PATH.read_text() + + +def _fixture_json() -> dict: + return json.loads(_fixture_text()) + + +def _config(settings: dict | None = None) -> AdapterConfig: + return AdapterConfig( + name="nwis", + enabled=True, + cadence_s=900, + settings=settings or {}, + updated_at=datetime.now(timezone.utc), + ) + + +class TestNWISHelpers: + def test_subject_decomposes_usgs(self): + """USGS-05420500 -> agency='usgs', bare='05420500'; subject endswith .00060.usgs.05420500.""" + from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id + + agency, bare = _subject_tokens_for_id("USGS-05420500") + assert agency == "usgs" + assert bare == "05420500" + + adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db")) + event = Event( + id="USGS-05420500:00060:2026-05-19T15:30:00+00:00", + adapter="nwis", + category=f"hydro.00060.{agency}.{bare}", + time=datetime(2026, 5, 19, 15, 30, tzinfo=timezone.utc), + severity=0, + geo=Geo(), + data={}, + ) + assert adapter.subject_for(event).endswith(".00060.usgs.05420500") + + def test_subject_decomposes_non_usgs(self): + """MO005-400105093591601 -> agency='mo005', bare='400105093591601'; subject reflects both.""" + from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id + + agency, bare = _subject_tokens_for_id("MO005-400105093591601") + assert agency == "mo005" + assert bare == "400105093591601" + + adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db")) + event = Event( + id="MO005-400105093591601:00060:t", + adapter="nwis", + category=f"hydro.00060.{agency}.{bare}", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(), + data={}, + ) + assert adapter.subject_for(event).endswith(".00060.mo005.400105093591601") + + def test_subject_unprefixed_id_falls_back(self): + """ID with no dash falls back to agency='unknown'.""" + from central.adapters.nwis import NWISAdapter, _subject_tokens_for_id + + bare_input = "STANDALONE12345" + agency, bare = _subject_tokens_for_id(bare_input) + assert agency == "unknown" + assert bare == bare_input.lower() + + adapter = NWISAdapter(_config(), MagicMock(), Path("/tmp/never_used.db")) + event = Event( + id="X", + adapter="nwis", + category=f"hydro.00060.{agency}.{bare}", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(), + data={}, + ) + subj = adapter.subject_for(event) + assert subj.endswith(f".00060.unknown.{bare}") + + def test_dedup_key_composite(self): + """Same id+param+time -> same key; different time -> different key.""" + from central.adapters.nwis import _dedup_key + + a = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00") + b = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:30:00+00:00") + c = _dedup_key("USGS-05420500", "00060", "2026-05-19T15:45:00+00:00") + assert a == b + assert a != c + # All three components are present in the key + assert "USGS-05420500" in a + assert "00060" in a + assert "2026-05-19T15:30:00+00:00" in a + + +class TestNWISSettings: + def test_parameter_codes_default_is_full_set(self): + """Default equals _DEFAULT_PARAMETER_CODES — no parallel literal anywhere.""" + from central.adapters.nwis import NWISSettings, _DEFAULT_PARAMETER_CODES + + assert NWISSettings().parameter_codes == _DEFAULT_PARAMETER_CODES + + +class TestNWISAdapter: + def test_class_attrs_complete(self): + from central.adapters.nwis import NWISAdapter, NWISSettings + + assert NWISAdapter.name == "nwis" + assert isinstance(NWISAdapter.display_name, str) and NWISAdapter.display_name + assert isinstance(NWISAdapter.description, str) and NWISAdapter.description + assert NWISAdapter.settings_schema is NWISSettings + assert NWISAdapter.requires_api_key is None + assert NWISAdapter.api_key_field is None + assert NWISAdapter.wizard_order is None + assert NWISAdapter.default_cadence_s == 900 + + @pytest.mark.asyncio + async def test_lonlat_coordinate_order(self, tmp_path: Path): + """Upstream coords [lon, lat] -> Geo.centroid=(lon, lat); no axis swap.""" + from central.adapters.nwis import NWISAdapter + + fix = _fixture_json() + src = fix["features"][0] + lon_in, lat_in = src["geometry"]["coordinates"] + # Sanity: fixture event 0 is in western/northern hemisphere. + assert lon_in < 0 + assert 0 < lat_in < 90 + + adapter = NWISAdapter( + _config({"parameter_codes": [src["properties"]["parameter_code"]]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + target_site = src["properties"]["monitoring_location_id"] + emitted = next(e for e in events if e.data["monitoring_location_id"] == target_site) + assert emitted.geo.centroid == (lon_in, lat_in) + + @pytest.mark.asyncio + async def test_parameter_allowlist_filters(self, tmp_path: Path): + """parameter_codes setting filters which queries we issue per poll. + + With one allowed code, _fetch should be called exactly once + (per page; fixture has no next link → one call total). + """ + from central.adapters.nwis import NWISAdapter + + fix = _fixture_json() + target = fix["features"][0]["properties"]["parameter_code"] + + adapter = NWISAdapter( + _config({"parameter_codes": [target]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert events + # All emitted events carry the only allowed parameter_code. + for e in events: + assert e.data["parameter_code"] == target + # Per-page fetch invoked once per parameter_code in the allowlist. + assert adapter._fetch.await_count == 1 + + @pytest.mark.asyncio + async def test_pagination_follows_next_link(self, tmp_path: Path): + """Adapter follows OGC 'next' link until absent.""" + from central.adapters.nwis import NWISAdapter + + fix = _fixture_json() + target = fix["features"][0]["properties"]["parameter_code"] + + page1 = { + "type": "FeatureCollection", + "features": fix["features"][:2], + "numberReturned": 2, + "links": [ + {"rel": "next", "href": "https://api.waterdata.usgs.gov/ogcapi/v0/collections/latest-continuous/items?cursor=PAGE2"} + ], + } + page2 = { + "type": "FeatureCollection", + "features": fix["features"][2:], + "numberReturned": len(fix["features"]) - 2, + "links": [], + } + + adapter = NWISAdapter( + _config({"parameter_codes": [target]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(side_effect=[json.dumps(page1), json.dumps(page2)]) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + # Both pages drained, fetch called exactly twice. + assert adapter._fetch.await_count == 2 + # Events from both pages emitted. + sites = {e.data["monitoring_location_id"] for e in events} + expected_sites = {f["properties"]["monitoring_location_id"] for f in fix["features"]} + assert sites == expected_sites + + @pytest.mark.asyncio + async def test_no_removal_pattern(self, tmp_path: Path): + """Sites missing from a later poll do NOT emit a .removed event in v1.""" + from central.adapters.nwis import NWISAdapter + + fix = _fixture_json() + target = fix["features"][0]["properties"]["parameter_code"] + + adapter = NWISAdapter( + _config({"parameter_codes": [target]}), + MagicMock(), + tmp_path / "cursors.db", + ) + # First poll: full fixture. + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + assert first_pass + + # Second poll: empty result (every previously-seen site is now missing). + empty_page = {"type": "FeatureCollection", "features": [], "links": []} + adapter._fetch = AsyncMock(return_value=json.dumps(empty_page)) + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + # No removal events of any flavor. + assert all(".removed" not in e.category for e in second_pass) + assert second_pass == [] + + @pytest.mark.asyncio + async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path): + """Second poll with identical fixture yields no new events (composite dedup hits).""" + from central.adapters.nwis import NWISAdapter + + fix = _fixture_json() + target = fix["features"][0]["properties"]["parameter_code"] + + adapter = NWISAdapter( + _config({"parameter_codes": [target]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert first_pass + assert second_pass == []