diff --git a/docs/PHASE-1B-NOTES.md b/docs/PHASE-1B-NOTES.md index 878c99f..fbdf93b 100644 --- a/docs/PHASE-1B-NOTES.md +++ b/docs/PHASE-1B-NOTES.md @@ -91,3 +91,22 @@ Per stream, display: - GUI should display warning banner when SNPP is enabled and date approaches - Recommend adding NOAA-21 to satellites list before SNPP EOL - After EOL, adapter will fail to fetch SNPP data (404); GUI should surface this + +## USGS Quake Adapter Configuration + +### Feed Selection +- Default: all_hour (updated every minute, low latency) +- Options: all_hour, all_day, all_week, all_month +- Operators can switch to all_day for less frequent polling with broader window +- Stored in config.adapters.settings.feed + +### Magnitude Tier Color Coding +For GUI display of earthquake events: +- **minor** (M < 3.0): Gray +- **light** (3.0-3.9): Yellow +- **moderate** (4.0-4.9): Orange +- **strong** (5.0-5.9): Red +- **major** (6.0-6.9): Dark Red +- **great** (M >= 7.0): Purple + +Colors follow USGS conventions for earthquake hazard communication. diff --git a/sql/migrations/006_add_usgs_quake_adapter.sql b/sql/migrations/006_add_usgs_quake_adapter.sql new file mode 100644 index 0000000..3bd1f35 --- /dev/null +++ b/sql/migrations/006_add_usgs_quake_adapter.sql @@ -0,0 +1,23 @@ +-- Migration: 006_add_usgs_quake_adapter +-- Seeds USGS earthquake adapter configuration and CENTRAL_QUAKE stream. + +-- Seed USGS quake adapter row +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'usgs_quake', + true, + 60, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.5, + 'south', 31.0, + 'east', -102.0, + 'west', -124.5 + ), + 'feed', 'all_hour' + ) +); + +-- Seed CENTRAL_QUAKE stream row (7d retention) +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_QUAKE', 604800, 1073741824); diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py new file mode 100644 index 0000000..b908323 --- /dev/null +++ b/src/central/adapters/usgs_quake.py @@ -0,0 +1,400 @@ +"""USGS Earthquake Hazards Program adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from shapely.geometry import Point, box as shapely_box +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential_jitter, + retry_if_exception_type, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +# USGS GeoJSON feed base URL +USGS_FEED_BASE = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary" + +# Valid feed options +VALID_FEEDS = {"all_hour", "all_day", "all_week", "all_month"} + + +def magnitude_tier(mag: float) -> str: + """Classify magnitude into USGS-style tier.""" + if mag < 3.0: + return "minor" + if mag < 4.0: + return "light" + if mag < 5.0: + return "moderate" + if mag < 6.0: + return "strong" + if mag < 7.0: + return "major" + return "great" + + +def magnitude_to_severity(mag: float) -> int: + """Map magnitude to severity level (0-5).""" + if mag < 3.0: + return 0 + if mag < 4.0: + return 1 + if mag < 5.0: + return 2 + if mag < 6.0: + return 3 + if mag < 7.0: + return 4 + return 5 + + +class USGSQuakeAdapter(SourceAdapter): + """USGS Earthquake Hazards Program adapter.""" + + name = "usgs_quake" + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, # Unused, accepted for signature uniformity + cursor_db_path: Path, + ) -> None: + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + # Extract settings from config + self._feed: str = config.settings.get("feed", "all_hour") + if self._feed not in VALID_FEEDS: + logger.warning( + "Invalid feed setting, using all_hour", + extra={"feed": self._feed, "valid": list(VALID_FEEDS)}, + ) + self._feed = "all_hour" + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + self._region_box = shapely_box( + self.region.west, + self.region.south, + self.region.east, + self.region.north, + ) + else: + self.region = None + self._region_box = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + # Update feed + new_feed = new_config.settings.get("feed", "all_hour") + if new_feed in VALID_FEEDS: + self._feed = new_feed + else: + logger.warning( + "Invalid feed in new config, keeping current", + extra={"new_feed": new_feed, "current": self._feed}, + ) + + # Update region + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + self._region_box = shapely_box( + self.region.west, + self.region.south, + self.region.east, + self.region.north, + ) + else: + self.region = None + self._region_box = None + + logger.info( + "USGS quake config applied", + extra={ + "region": region_dict, + "feed": self._feed, + }, + ) + + async def startup(self) -> None: + """Initialize HTTP session and dedup tracker.""" + # Initialize HTTP session + self._session = aiohttp.ClientSession( + headers={"User-Agent": "Central/1.0 (earthquake monitoring)"}, + timeout=aiohttp.ClientTimeout(total=30), + ) + + # Initialize dedup tracker (shared sqlite DB) + self._db = sqlite3.connect(str(self._cursor_db_path)) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + + # Sweep old entries on startup (7 days for quakes) + self.sweep_old_ids() + + logger.info( + "USGS quake adapter started", + extra={ + "region": { + "north": self.region.north, + "south": self.region.south, + "east": self.region.east, + "west": self.region.west, + } if self.region else None, + "feed": self._feed, + }, + ) + + async def shutdown(self) -> None: + """Close HTTP session and database.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("USGS quake adapter shut down") + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 7 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-7 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("USGS quake swept old dedup entries", extra={"count": count}) + return count + + def _build_url(self) -> str: + """Build USGS GeoJSON feed URL.""" + return f"{USGS_FEED_BASE}/{self._feed}.geojson" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=15), + retry=retry_if_exception_type((aiohttp.ClientError,)), + reraise=True, + ) + async def _fetch_geojson(self) -> dict[str, Any]: + """Fetch GeoJSON data from USGS.""" + if not self._session: + raise RuntimeError("Session not initialized") + + url = self._build_url() + async with self._session.get(url) as resp: + resp.raise_for_status() + return await resp.json() + + def _point_in_region(self, lon: float, lat: float) -> bool: + """Check if point intersects region bbox using shapely.""" + if self._region_box is None: + return True + point = Point(lon, lat) + return self._region_box.intersects(point) + + def _feature_to_event(self, feature: dict[str, Any]) -> Event | None: + """Convert a GeoJSON feature to an Event.""" + props = feature.get("properties", {}) + geometry = feature.get("geometry", {}) + coords = geometry.get("coordinates", []) + + # Validate required fields + event_id = feature.get("id") + if not event_id: + logger.warning("Feature missing id", extra={"properties": props}) + return None + + # Get magnitude - skip if null/missing (PM decision) + mag = props.get("mag") + if mag is None: + logger.debug( + "Skipping event with null magnitude", + extra={"id": event_id, "place": props.get("place")}, + ) + return None + + try: + mag = float(mag) + except (TypeError, ValueError): + logger.warning( + "Invalid magnitude value", + extra={"id": event_id, "mag": mag}, + ) + return None + + # Get coordinates [lon, lat, depth] + if len(coords) < 2: + logger.warning("Feature missing coordinates", extra={"id": event_id}) + return None + + lon, lat = coords[0], coords[1] + depth = coords[2] if len(coords) > 2 else None + + # Region filter + if not self._point_in_region(lon, lat): + return None + + # Parse event time (milliseconds since epoch) + time_ms = props.get("time") + if time_ms is not None: + try: + event_time = datetime.fromtimestamp(time_ms / 1000, tz=timezone.utc) + except (TypeError, ValueError, OSError): + event_time = datetime.now(timezone.utc) + else: + event_time = datetime.now(timezone.utc) + + # Build tier and severity + tier = magnitude_tier(mag) + severity = magnitude_to_severity(mag) + + # Build geo + geo = Geo( + centroid=(lon, lat), + bbox=(lon, lat, lon, lat), + regions=[], + primary_region=None, + ) + + # Build data payload + data = { + "magnitude": mag, + "place": props.get("place"), + "time_ms": time_ms, + "updated_ms": props.get("updated"), + "tz": props.get("tz"), + "url": props.get("url"), + "detail": props.get("detail"), + "felt": props.get("felt"), + "cdi": props.get("cdi"), + "mmi": props.get("mmi"), + "alert": props.get("alert"), + "status": props.get("status"), + "tsunami": props.get("tsunami"), + "sig": props.get("sig"), + "net": props.get("net"), + "code": props.get("code"), + "ids": props.get("ids"), + "sources": props.get("sources"), + "types": props.get("types"), + "nst": props.get("nst"), + "dmin": props.get("dmin"), + "rms": props.get("rms"), + "gap": props.get("gap"), + "magType": props.get("magType"), + "type": props.get("type"), + "title": props.get("title"), + "longitude": lon, + "latitude": lat, + "depth": depth, + } + + return Event( + id=event_id, + source="central/adapters/usgs_quake", + category=f"quake.event.{tier}", + time=event_time, + expires=None, + severity=severity, + geo=geo, + data=data, + ) + + async def poll(self) -> AsyncIterator[Event]: + """Poll USGS for earthquake data.""" + if not self.region: + logger.warning("USGS quake region not configured, skipping poll") + return + + # Sweep old dedup entries periodically + self.sweep_old_ids() + + try: + data = await self._fetch_geojson() + except Exception as e: + logger.error("Failed to fetch USGS data", extra={"error": str(e)}) + raise + + features = data.get("features", []) + metadata = data.get("metadata", {}) + + logger.info( + "USGS quake poll completed", + extra={ + "feature_count": len(features), + "title": metadata.get("title"), + "generated": metadata.get("generated"), + }, + ) + + new_count = 0 + for feature in features: + event = self._feature_to_event(feature) + if event is None: + continue + + if self.is_published(event.id): + continue + + yield event + self.mark_published(event.id) + new_count += 1 + + logger.info("USGS quake yielded events", extra={"count": new_count}) diff --git a/src/central/models.py b/src/central/models.py index 9671202..5bc00df 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -1,79 +1,87 @@ -"""Data models for Central event processing.""" - -from datetime import datetime -from typing import Any - -from pydantic import BaseModel, ConfigDict - - -class Geo(BaseModel): - """Geographic context for an event.""" - - model_config = ConfigDict(extra="forbid", frozen=True) - - centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order - bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) - regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] - primary_region: str | None = None # alphabetically first region, used for subject - - -class Event(BaseModel): - """Canonical event representation for all adapters.""" - - model_config = ConfigDict(extra="forbid", frozen=True) - - id: str # unique, stable across republish - source: str # adapter identity, e.g. "central/adapters/nws" - category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high" - time: datetime # event-time UTC, not processing-time - expires: datetime | None = None - severity: int | None = None # 0..4 or None for "Unknown" - geo: Geo - data: dict[str, Any] # adapter-specific payload - - -def subject_for_event(ev: Event) -> str: - """ - Compute the NATS subject for an event based on its category. - - Dispatch by category prefix: - - fire.*: returns central. directly - - wx.*: uses weather alert subject logic - - Weather alert subjects: - central.wx.alert.us..county. - or - central.wx.alert.us..zone. - based on whether the primary_region encodes a county or a zone. - - Fire hotspot subjects: - central.fire.hotspot.. - """ - # Fire events: subject is just central. - if ev.category.startswith("fire."): - return f"central.{ev.category}" - - # Weather events: use geo-based subject logic - prefix = "central.wx" - - if ev.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = ev.geo.primary_region - - # Parse US-- format - # County codes are like "Ada", "Canyon" (names) - # Zone codes start with "Z" like "Z033" - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" +"""Data models for Central event processing.""" + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict + + +class Geo(BaseModel): + """Geographic context for an event.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order + bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) + regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] + primary_region: str | None = None # alphabetically first region, used for subject + + +class Event(BaseModel): + """Canonical event representation for all adapters.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + id: str # unique, stable across republish + source: str # adapter identity, e.g. "central/adapters/nws" + category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high" + time: datetime # event-time UTC, not processing-time + expires: datetime | None = None + severity: int | None = None # 0..4 or None for "Unknown" + geo: Geo + data: dict[str, Any] # adapter-specific payload + + +def subject_for_event(ev: Event) -> str: + """ + Compute the NATS subject for an event based on its category. + + Dispatch by category prefix: + - fire.*: returns central. directly + - quake.*: returns central. directly + - wx.*: uses weather alert subject logic + + Weather alert subjects: + central.wx.alert.us..county. + or + central.wx.alert.us..zone. + based on whether the primary_region encodes a county or a zone. + + Fire hotspot subjects: + central.fire.hotspot.. + + Quake event subjects: + central.quake.event. + """ + # Fire events: subject is just central. + if ev.category.startswith("fire."): + return f"central.{ev.category}" + + # Quake events: subject is just central. + if ev.category.startswith("quake."): + return f"central.{ev.category}" + + # Weather events: use geo-based subject logic + prefix = "central.wx" + + if ev.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = ev.geo.primary_region + + # Parse US-- format + # County codes are like "Ada", "Canyon" (names) + # Zone codes start with "Z" like "Z033" + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 7ce0daf..652ce44 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -16,6 +16,7 @@ from nats.js import JetStreamContext from central.adapter import SourceAdapter from central.adapters.nws import NWSAdapter from central.adapters.firms import FIRMSAdapter +from central.adapters.usgs_quake import USGSQuakeAdapter from central.cloudevents_wire import wrap_event from central.config_models import AdapterConfig from central.config_source import ConfigSource, DbConfigSource @@ -28,6 +29,7 @@ from central.stream_manager import StreamManager _ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { "nws": NWSAdapter, "firms": FIRMSAdapter, + "usgs_quake": USGSQuakeAdapter, } CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -37,6 +39,7 @@ STREAM_SUBJECTS = { "CENTRAL_WX": ["central.wx.>"], "CENTRAL_META": ["central.meta.>"], "CENTRAL_FIRE": ["central.fire.>"], + "CENTRAL_QUAKE": ["central.quake.>"], } # Recompute interval for stream max_bytes (1 hour) diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py new file mode 100644 index 0000000..6be5f78 --- /dev/null +++ b/tests/test_usgs_quake.py @@ -0,0 +1,482 @@ +"""Tests for USGS earthquake adapter.""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch +from pathlib import Path +import tempfile + +from central.adapters.usgs_quake import ( + USGSQuakeAdapter, + magnitude_tier, + magnitude_to_severity, +) +from central.config_models import AdapterConfig, RegionConfig +from central.models import Event, Geo + + +# Sample USGS GeoJSON response +SAMPLE_GEOJSON = { + "type": "FeatureCollection", + "metadata": { + "generated": 1715878800000, + "url": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson", + "title": "USGS All Earthquakes, Past Hour", + "status": 200, + "api": "1.10.3", + "count": 3 + }, + "features": [ + { + "type": "Feature", + "properties": { + "mag": 2.5, + "place": "10km N of Boise, Idaho", + "time": 1715878500000, + "updated": 1715878600000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us1234", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us1234.geojson", + "felt": None, + "cdi": None, + "mmi": None, + "alert": None, + "status": "automatic", + "tsunami": 0, + "sig": 100, + "net": "us", + "code": "1234", + "ids": ",us1234,", + "sources": ",us,", + "types": ",origin,", + "nst": 10, + "dmin": 0.5, + "rms": 0.3, + "gap": 100, + "magType": "ml", + "type": "earthquake", + "title": "M 2.5 - 10km N of Boise, Idaho" + }, + "geometry": { + "type": "Point", + "coordinates": [-116.2, 43.7, 10.5] + }, + "id": "us1234" + }, + { + "type": "Feature", + "properties": { + "mag": 4.5, + "place": "20km S of Portland, Oregon", + "time": 1715878400000, + "updated": 1715878500000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us5678", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us5678.geojson", + "felt": 50, + "cdi": 4.0, + "mmi": 3.5, + "alert": "green", + "status": "reviewed", + "tsunami": 0, + "sig": 300, + "net": "us", + "code": "5678", + "ids": ",us5678,", + "sources": ",us,", + "types": ",origin,shakemap,", + "nst": 25, + "dmin": 0.2, + "rms": 0.2, + "gap": 50, + "magType": "mw", + "type": "earthquake", + "title": "M 4.5 - 20km S of Portland, Oregon" + }, + "geometry": { + "type": "Point", + "coordinates": [-122.6, 45.3, 15.0] + }, + "id": "us5678" + }, + { + "type": "Feature", + "properties": { + "mag": 3.0, + "place": "50km E of San Francisco, California", + "time": 1715878300000, + "updated": 1715878400000, + "tz": None, + "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us9999", + "detail": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/us9999.geojson", + "felt": None, + "cdi": None, + "mmi": None, + "alert": None, + "status": "automatic", + "tsunami": 0, + "sig": 150, + "net": "us", + "code": "9999", + "ids": ",us9999,", + "sources": ",us,", + "types": ",origin,", + "nst": 15, + "dmin": 0.3, + "rms": 0.25, + "gap": 80, + "magType": "ml", + "type": "earthquake", + "title": "M 3.0 - 50km E of San Francisco, California" + }, + "geometry": { + "type": "Point", + "coordinates": [-121.5, 37.8, 8.0] + }, + "id": "us9999" + } + ] +} + +# Sample with null magnitude +SAMPLE_NULL_MAG = { + "type": "FeatureCollection", + "metadata": {"count": 1}, + "features": [ + { + "type": "Feature", + "properties": { + "mag": None, + "place": "Quarry blast", + "time": 1715878500000, + "type": "quarry blast" + }, + "geometry": { + "type": "Point", + "coordinates": [-116.0, 44.0, 0.0] + }, + "id": "usquarry1" + } + ] +} + + +def make_adapter_config( + region: dict | None = None, + feed: str = "all_hour", +) -> AdapterConfig: + """Create an AdapterConfig for testing.""" + settings = {"feed": feed} + if region: + settings["region"] = region + else: + settings["region"] = { + "north": 49.5, + "south": 40.0, + "east": -110.0, + "west": -125.0, + } + + return AdapterConfig( + name="usgs_quake", + enabled=True, + cadence_s=60, + settings=settings, + updated_at=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def temp_db_path(): + """Create a temporary database path for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + yield Path(f.name) + + +@pytest.fixture +def mock_config_store(): + """Create a mock ConfigStore.""" + return MagicMock() + + +class TestMagnitudeTier: + """Test magnitude tier classification.""" + + def test_minor(self): + assert magnitude_tier(0.5) == "minor" + assert magnitude_tier(2.9) == "minor" + + def test_light(self): + assert magnitude_tier(3.0) == "light" + assert magnitude_tier(3.9) == "light" + + def test_moderate(self): + assert magnitude_tier(4.0) == "moderate" + assert magnitude_tier(4.9) == "moderate" + + def test_strong(self): + assert magnitude_tier(5.0) == "strong" + assert magnitude_tier(5.9) == "strong" + + def test_major(self): + assert magnitude_tier(6.0) == "major" + assert magnitude_tier(6.9) == "major" + + def test_great(self): + assert magnitude_tier(7.0) == "great" + assert magnitude_tier(9.5) == "great" + + +class TestMagnitudeToSeverity: + """Test magnitude to severity mapping.""" + + def test_severity_levels(self): + assert magnitude_to_severity(2.0) == 0 + assert magnitude_to_severity(3.5) == 1 + assert magnitude_to_severity(4.5) == 2 + assert magnitude_to_severity(5.5) == 3 + assert magnitude_to_severity(6.5) == 4 + assert magnitude_to_severity(7.5) == 5 + + +class TestRegionFiltering: + """Test region/bbox filtering.""" + + @pytest.mark.asyncio + async def test_filters_out_of_bbox(self, temp_db_path, mock_config_store): + """Test that quakes outside bbox are filtered.""" + # Region covers PNW only (north of 40, west of -110) + config = make_adapter_config( + region={"north": 49.5, "south": 40.0, "east": -110.0, "west": -125.0} + ) + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_GEOJSON + + events = [] + async for event in adapter.poll(): + events.append(event) + + # us1234 (Boise) and us5678 (Portland) are in bbox + # us9999 (SF, lat 37.8) is outside bbox (south < 40) + assert len(events) == 2 + event_ids = {e.id for e in events} + assert "us1234" in event_ids + assert "us5678" in event_ids + assert "us9999" not in event_ids + + await adapter.shutdown() + + +class TestDeduplication: + """Test deduplication logic.""" + + @pytest.mark.asyncio + async def test_dedup_marks_published(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + event_id = "us1234" + + assert not adapter.is_published(event_id) + adapter.mark_published(event_id) + assert adapter.is_published(event_id) + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_second_poll_no_duplicates(self, temp_db_path, mock_config_store): + """Test that second poll with same events yields nothing.""" + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_GEOJSON + + # First poll + events1 = [] + async for event in adapter.poll(): + events1.append(event) + + # Second poll - same data + events2 = [] + async for event in adapter.poll(): + events2.append(event) + + # First poll should have events (2 in bbox) + assert len(events1) == 2 + # Second poll should have 0 (all deduped) + assert len(events2) == 0 + + await adapter.shutdown() + + +class TestNullMagnitude: + """Test handling of null magnitude events.""" + + @pytest.mark.asyncio + async def test_skips_null_magnitude(self, temp_db_path, mock_config_store): + """Test that events with null magnitude are skipped.""" + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_NULL_MAG + + events = [] + async for event in adapter.poll(): + events.append(event) + + # Should skip the null-magnitude event + assert len(events) == 0 + + await adapter.shutdown() + + +class TestEventGeneration: + """Test Event generation from features.""" + + @pytest.mark.asyncio + async def test_event_category(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_GEOJSON + + events = [] + async for event in adapter.poll(): + events.append(event) + + # Check categories + categories = {e.category for e in events} + # us1234 is M2.5 -> minor, us5678 is M4.5 -> moderate + assert "quake.event.minor" in categories + assert "quake.event.moderate" in categories + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_event_severity(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_GEOJSON + + events = [] + async for event in adapter.poll(): + events.append(event) + + # Find events by ID + events_by_id = {e.id: e for e in events} + + # M2.5 -> severity 0 + assert events_by_id["us1234"].severity == 0 + # M4.5 -> severity 2 + assert events_by_id["us5678"].severity == 2 + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_event_geo(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + with patch.object(adapter, "_fetch_geojson", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_GEOJSON + + events = [] + async for event in adapter.poll(): + events.append(event) + + events_by_id = {e.id: e for e in events} + + # Check Boise quake coordinates + boise = events_by_id["us1234"] + assert boise.geo.centroid == (-116.2, 43.7) + + await adapter.shutdown() + + +class TestApplyConfig: + """Test hot-reload configuration application.""" + + @pytest.mark.asyncio + async def test_apply_config_updates_region(self, temp_db_path, mock_config_store): + config = make_adapter_config( + region={"north": 49.5, "south": 40.0, "east": -110.0, "west": -125.0} + ) + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + assert adapter.region.north == 49.5 + + new_config = make_adapter_config( + region={"north": 48.0, "south": 45.0, "east": -115.0, "west": -125.0} + ) + await adapter.apply_config(new_config) + + assert adapter.region.north == 48.0 + assert adapter.region.south == 45.0 + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_apply_config_updates_feed(self, temp_db_path, mock_config_store): + config = make_adapter_config(feed="all_hour") + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + assert adapter._feed == "all_hour" + + new_config = make_adapter_config(feed="all_day") + await adapter.apply_config(new_config) + + assert adapter._feed == "all_day" + + await adapter.shutdown()