# Exclusive upper magnitude bound, tier label, severity level. A magnitude
# that is not below any bound falls through to "great" / severity 5.
_MAGNITUDE_LADDER = (
    (3.0, "minor", 0),
    (4.0, "light", 1),
    (5.0, "moderate", 2),
    (6.0, "strong", 3),
    (7.0, "major", 4),
)


def magnitude_tier(mag: float) -> str:
    """Return the tier label for an earthquake magnitude.

    Tiers are half-open intervals: below 3.0 is "minor", [3.0, 4.0) is
    "light", [4.0, 5.0) is "moderate", [5.0, 6.0) is "strong", [6.0, 7.0)
    is "major" and everything else is "great".
    """
    for upper_bound, label, _level in _MAGNITUDE_LADDER:
        if mag < upper_bound:
            return label
    return "great"


def magnitude_to_severity(mag: float) -> int:
    """Return the severity level (0-5) for an earthquake magnitude.

    Uses the same magnitude intervals as ``magnitude_tier``: 0 for
    magnitudes below 3.0, rising by one per whole magnitude up to 5 for
    anything not below 7.0.
    """
    for upper_bound, _label, level in _MAGNITUDE_LADDER:
        if mag < upper_bound:
            return level
    return 5
async def apply_config(self, new_config: AdapterConfig) -> None:
    """Apply new configuration from hot-reload.

    Feed: read from ``settings["feed"]`` (default ``"all_hour"``). A value
    that is not in ``VALID_FEEDS`` is rejected with a warning and the
    current feed is kept.

    Region: a truthy ``settings["region"]`` is turned into a
    ``RegionConfig`` and a matching shapely box; otherwise the region and
    its box are cleared to ``None``.

    The new region objects are built *before* any attribute is changed, so
    if ``RegionConfig(...)`` or ``shapely_box(...)`` raises, the exception
    propagates and the adapter keeps its previous feed and region instead
    of ending up half-reconfigured.
    """
    settings = new_config.settings

    # Validate / build the region first; nothing on self is touched yet.
    region_dict = settings.get("region")
    if region_dict:
        new_region: RegionConfig | None = RegionConfig(**region_dict)
        new_region_box = shapely_box(
            new_region.west,
            new_region.south,
            new_region.east,
            new_region.north,
        )
    else:
        new_region = None
        new_region_box = None

    # Update feed
    new_feed = settings.get("feed", "all_hour")
    if new_feed in VALID_FEEDS:
        self._feed = new_feed
    else:
        logger.warning(
            "Invalid feed in new config, keeping current",
            extra={"new_feed": new_feed, "current": self._feed},
        )

    # Commit region and box together so they can never disagree.
    self.region = new_region
    self._region_box = new_region_box

    logger.info(
        "USGS quake config applied",
        extra={
            "region": region_dict,
            "feed": self._feed,
        },
    )
async def shutdown(self) -> None:
    """Close the HTTP session and the dedup database.

    Both handles are reset to ``None`` afterwards, so calling this again
    is a no-op apart from the log line. The database is closed in a
    ``finally`` block so the sqlite connection is released even when
    closing the HTTP session raises or the task is cancelled while
    awaiting it; in that case the exception still propagates.
    """
    try:
        if self._session:
            await self._session.close()
    finally:
        self._session = None
        if self._db:
            self._db.close()
            self._db = None
    logger.info("USGS quake adapter shut down")
def _feature_to_event(self, feature: dict[str, Any]) -> Event | None:
    """Convert one GeoJSON feature into an ``Event``.

    A feature is skipped (``None`` is returned) when it has no id, when
    its magnitude is null, non-numeric or non-finite, when its coordinates
    are missing or non-numeric, or when it lies outside the configured
    region. A missing or unparseable ``time`` property does not skip the
    feature; the current UTC time is used instead.

    Args:
        feature: A single entry of the feed's ``features`` array.

    Returns:
        The populated ``Event``, or ``None`` if the feature is skipped.
    """
    # Function-scope import keeps this method self-contained.
    import math

    # A key that is present with an explicit JSON null makes
    # ``.get(key, default)`` return None, not the default, so fall back
    # with ``or`` to avoid AttributeError / TypeError on such features.
    props = feature.get("properties") or {}
    geometry = feature.get("geometry") or {}
    coords = geometry.get("coordinates") or []

    # Validate required fields
    event_id = feature.get("id")
    if not event_id:
        logger.warning("Feature missing id", extra={"properties": props})
        return None

    # Get magnitude - skip if null/missing (PM decision)
    raw_mag = props.get("mag")
    if raw_mag is None:
        logger.debug(
            "Skipping event with null magnitude",
            extra={"id": event_id, "place": props.get("place")},
        )
        return None

    try:
        mag = float(raw_mag)
    except (TypeError, ValueError, OverflowError):
        mag = math.nan
    # NaN compares False against every tier threshold and would fall
    # through to the highest tier, so non-finite values are rejected here.
    if not math.isfinite(mag):
        logger.warning(
            "Invalid magnitude value",
            extra={"id": event_id, "mag": raw_mag},
        )
        return None

    # Get coordinates [lon, lat, depth]
    if not isinstance(coords, (list, tuple)) or len(coords) < 2:
        logger.warning("Feature missing coordinates", extra={"id": event_id})
        return None

    lon, lat = coords[0], coords[1]
    depth = coords[2] if len(coords) > 2 else None

    # Reject null / non-numeric / non-finite positions before they reach
    # the shapely region test.
    try:
        position_ok = math.isfinite(lon) and math.isfinite(lat)
    except (TypeError, OverflowError):
        position_ok = False
    if not position_ok:
        logger.warning(
            "Feature has invalid coordinates",
            extra={"id": event_id, "coordinates": coords},
        )
        return None

    # Region filter
    if not self._point_in_region(lon, lat):
        return None

    # Parse event time (milliseconds since epoch); fall back to "now" when
    # the value is absent or cannot be converted.
    time_ms = props.get("time")
    event_time = None
    if time_ms is not None:
        try:
            event_time = datetime.fromtimestamp(time_ms / 1000, tz=timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            event_time = None
    if event_time is None:
        event_time = datetime.now(timezone.utc)

    # Build tier and severity
    tier = magnitude_tier(mag)
    severity = magnitude_to_severity(mag)

    # Build geo: a point event, so the bbox degenerates to the point.
    geo = Geo(
        centroid=(lon, lat),
        bbox=(lon, lat, lon, lat),
        regions=[],
        primary_region=None,
    )

    # Build data payload
    data = {
        "magnitude": mag,
        "place": props.get("place"),
        "time_ms": time_ms,
        "updated_ms": props.get("updated"),
        "tz": props.get("tz"),
        "url": props.get("url"),
        "detail": props.get("detail"),
        "felt": props.get("felt"),
        "cdi": props.get("cdi"),
        "mmi": props.get("mmi"),
        "alert": props.get("alert"),
        "status": props.get("status"),
        "tsunami": props.get("tsunami"),
        "sig": props.get("sig"),
        "net": props.get("net"),
        "code": props.get("code"),
        "ids": props.get("ids"),
        "sources": props.get("sources"),
        "types": props.get("types"),
        "nst": props.get("nst"),
        "dmin": props.get("dmin"),
        "rms": props.get("rms"),
        "gap": props.get("gap"),
        "magType": props.get("magType"),
        "type": props.get("type"),
        "title": props.get("title"),
        "longitude": lon,
        "latitude": lat,
        "depth": depth,
    }

    return Event(
        id=event_id,
        source="central/adapters/usgs_quake",
        category=f"quake.event.{tier}",
        time=event_time,
        expires=None,
        severity=severity,
        geo=geo,
        data=data,
    )
data = await self._fetch_geojson() + except Exception as e: + logger.error("Failed to fetch USGS data", extra={"error": str(e)}) + raise + + features = data.get("features", []) + metadata = data.get("metadata", {}) + + logger.info( + "USGS quake poll completed", + extra={ + "feature_count": len(features), + "title": metadata.get("title"), + "generated": metadata.get("generated"), + }, + ) + + new_count = 0 + for feature in features: + event = self._feature_to_event(feature) + if event is None: + continue + + if self.is_published(event.id): + continue + + yield event + self.mark_published(event.id) + new_count += 1 + + logger.info("USGS quake yielded events", extra={"count": new_count}) diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py new file mode 100644 index 0000000..6be5f78 --- /dev/null +++ b/tests/test_usgs_quake.py @@ -0,0 +1,482 @@ +"""Tests for USGS earthquake adapter.""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch +from pathlib import Path +import tempfile + +from central.adapters.usgs_quake import ( + USGSQuakeAdapter, + magnitude_tier, + magnitude_to_severity, +) +from central.config_models import AdapterConfig, RegionConfig +from central.models import Event, Geo + + +# Sample USGS GeoJSON response +SAMPLE_GEOJSON = { + "type": "FeatureCollection", + "metadata": { + "generated": 1715878800000, + "url": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson", + "title": "USGS All Earthquakes, Past Hour", + "status": 200, + "api": "1.10.3", + "count": 3 + }, + "features": [ + { + "type": "Feature", + "properties": { + "mag": 2.5, + "place": "10km N of Boise, Idaho", + "time": 1715878500000, + "updated": 1715878600000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us1234", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us1234.geojson", + "felt": None, + "cdi": None, + "mmi": None, + "alert": None, + 
"status": "automatic", + "tsunami": 0, + "sig": 100, + "net": "us", + "code": "1234", + "ids": ",us1234,", + "sources": ",us,", + "types": ",origin,", + "nst": 10, + "dmin": 0.5, + "rms": 0.3, + "gap": 100, + "magType": "ml", + "type": "earthquake", + "title": "M 2.5 - 10km N of Boise, Idaho" + }, + "geometry": { + "type": "Point", + "coordinates": [-116.2, 43.7, 10.5] + }, + "id": "us1234" + }, + { + "type": "Feature", + "properties": { + "mag": 4.5, + "place": "20km S of Portland, Oregon", + "time": 1715878400000, + "updated": 1715878500000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us5678", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us5678.geojson", + "felt": 50, + "cdi": 4.0, + "mmi": 3.5, + "alert": "green", + "status": "reviewed", + "tsunami": 0, + "sig": 300, + "net": "us", + "code": "5678", + "ids": ",us5678,", + "sources": ",us,", + "types": ",origin,shakemap,", + "nst": 25, + "dmin": 0.2, + "rms": 0.2, + "gap": 50, + "magType": "mw", + "type": "earthquake", + "title": "M 4.5 - 20km S of Portland, Oregon" + }, + "geometry": { + "type": "Point", + "coordinates": [-122.6, 45.3, 15.0] + }, + "id": "us5678" + }, + { + "type": "Feature", + "properties": { + "mag": 3.0, + "place": "50km E of San Francisco, California", + "time": 1715878300000, + "updated": 1715878400000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us9999", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us9999.geojson", + "felt": None, + "cdi": None, + "mmi": None, + "alert": None, + "status": "automatic", + "tsunami": 0, + "sig": 150, + "net": "us", + "code": "9999", + "ids": ",us9999,", + "sources": ",us,", + "types": ",origin,", + "nst": 15, + "dmin": 0.3, + "rms": 0.25, + "gap": 80, + "magType": "ml", + "type": "earthquake", + "title": "M 3.0 - 50km E of San Francisco, California" + }, + "geometry": { + "type": "Point", + "coordinates": [-121.5, 37.8, 8.0] + }, + "id": "us9999" + } 
@pytest.fixture
def temp_db_path(tmp_path):
    """Return a per-test database path inside pytest's temporary directory.

    The file is not created here; ``sqlite3.connect`` creates it on first
    use. Using the built-in ``tmp_path`` fixture means pytest owns the
    directory and cleans it up, so no stray ``.db`` files are left in the
    system temp directory after each test.
    """
    return tmp_path / "usgs_quake_test.db"
class TestRegionFiltering:
    """Test region/bbox filtering."""

    @pytest.mark.asyncio
    async def test_filters_out_of_bbox(self, temp_db_path, mock_config_store):
        """Test that quakes outside bbox are filtered."""
        # Region covers PNW only (north of 40, west of -110)
        config = make_adapter_config(
            region={"north": 49.5, "south": 40.0, "east": -110.0, "west": -125.0}
        )
        adapter = USGSQuakeAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        await adapter.startup()
        # Shut down in ``finally`` so a failing assertion does not leak the
        # HTTP session and sqlite connection opened by startup().
        try:
            with patch.object(
                adapter, "_fetch_geojson", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = SAMPLE_GEOJSON
                events = [event async for event in adapter.poll()]

            # us1234 (Boise) and us5678 (Portland) are in bbox
            # us9999 (SF, lat 37.8) is outside bbox (south < 40)
            assert len(events) == 2
            event_ids = {e.id for e in events}
            assert "us1234" in event_ids
            assert "us5678" in event_ids
            assert "us9999" not in event_ids
        finally:
            await adapter.shutdown()
class TestNullMagnitude:
    """Test handling of null magnitude events."""

    @pytest.mark.asyncio
    async def test_skips_null_magnitude(self, temp_db_path, mock_config_store):
        """Test that events with null magnitude are skipped."""
        config = make_adapter_config()
        adapter = USGSQuakeAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        await adapter.startup()
        # Shut down in ``finally`` so a failing assertion does not leak the
        # HTTP session and sqlite connection opened by startup().
        try:
            with patch.object(
                adapter, "_fetch_geojson", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = SAMPLE_NULL_MAG
                events = [event async for event in adapter.poll()]

            # Should skip the null-magnitude event
            assert len(events) == 0
        finally:
            await adapter.shutdown()
class TestApplyConfig:
    """Test hot-reload configuration application."""

    @pytest.mark.asyncio
    async def test_apply_config_updates_region(self, temp_db_path, mock_config_store):
        """apply_config replaces the adapter's region with the new one."""
        config = make_adapter_config(
            region={"north": 49.5, "south": 40.0, "east": -110.0, "west": -125.0}
        )
        adapter = USGSQuakeAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        await adapter.startup()
        # Shut down in ``finally`` so a failing assertion does not leak the
        # HTTP session and sqlite connection opened by startup().
        try:
            assert adapter.region.north == 49.5

            new_config = make_adapter_config(
                region={"north": 48.0, "south": 45.0, "east": -115.0, "west": -125.0}
            )
            await adapter.apply_config(new_config)

            assert adapter.region.north == 48.0
            assert adapter.region.south == 45.0
        finally:
            await adapter.shutdown()

    @pytest.mark.asyncio
    async def test_apply_config_updates_feed(self, temp_db_path, mock_config_store):
        """apply_config switches the adapter to a new valid feed."""
        config = make_adapter_config(feed="all_hour")
        adapter = USGSQuakeAdapter(
            config=config,
            config_store=mock_config_store,
            cursor_db_path=temp_db_path,
        )
        await adapter.startup()
        # Same teardown guarantee as above.
        try:
            assert adapter._feed == "all_hour"

            new_config = make_adapter_config(feed="all_day")
            await adapter.apply_config(new_config)

            assert adapter._feed == "all_day"
        finally:
            await adapter.shutdown()