From 4573bf6ee2c47607f06e75adcebb6e32523cd012 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Mon, 18 May 2026 22:14:12 +0000 Subject: [PATCH 1/2] refactor(adapters): self-describing adapter pattern with auto-discovery - Add stream_name, subject_for(), and settings_schema() to SourceAdapter ABC - Implement all three methods in NWSAdapter, FIRMSAdapter, USGSQuakeAdapter - Replace manual _ADAPTER_REGISTRY with pkgutil.iter_modules auto-discovery - Remove subject_for_event from models.py (each adapter owns its subject logic) - Update supervisor to use adapter.subject_for(event) instead of helper - Fix quake events going to wrong stream (was publishing to CENTRAL_WX) - Update test files to use adapter methods This fixes the quake stream bug where events were published to central.wx.alert.us.unknown instead of central.quake.event.. Co-Authored-By: Claude Opus 4.5 --- src/central/adapter.py | 35 ++++++++++++++++++++++- src/central/adapters/firms.py | 41 +++++++++++++++++++-------- src/central/adapters/nws.py | 45 ++++++++++++++++++++++++++++++ src/central/adapters/usgs_quake.py | 32 +++++++++++++++++++++ src/central/models.py | 45 ------------------------------ src/central/supervisor.py | 34 ++++++++++++++-------- tests/test_firms.py | 23 +++++++++++---- tests/test_models.py | 43 +--------------------------- tests/test_nws_normalization.py | 5 ++-- 9 files changed, 185 insertions(+), 118 deletions(-) diff --git a/src/central/adapter.py b/src/central/adapter.py index 2037eef..f7659fe 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -16,9 +16,14 @@ class SourceAdapter(ABC): Adapters yield Events. The supervisor handles scheduling, CloudEvents wrapping, publish, and metadata heartbeats. + + Class attributes that subclasses must define: + name: Short identifier, e.g. "nws" + stream_name: Target JetStream stream, e.g. "CENTRAL_WX" """ name: str # short identifier, e.g. "nws" + stream_name: str # target JetStream stream, e.g. "CENTRAL_WX" @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -40,6 +45,34 @@ class SourceAdapter(ABC): """ ... + @abstractmethod + def subject_for(self, event: Event) -> str: + """ + Compute the NATS subject for an event. + + Each adapter knows its own subject hierarchy. The supervisor + calls this to determine where to publish each event. + """ + ... + + @classmethod + @abstractmethod + def settings_schema(cls) -> dict[str, Any]: + """ + Return the JSON-serializable schema for this adapter's settings. + + Used by the GUI to render adapter configuration forms. + Returns a dict with keys like: + { + "contact_email": {"type": "str", "default": "", "description": "..."}, + "region": {"type": "RegionConfig", "default": None, "description": "..."}, + } + + Note: If a second nested type beyond RegionConfig appears, + refactor this to use generic recursion for nested schemas. + """ + ... + async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 3f746fa..11ba054 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -53,6 +53,7 @@ class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" + stream_name = "CENTRAL_FIRE" def __init__( self, @@ -116,6 +117,35 @@ class FIRMSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a fire hotspot event. + + Subject format: central.fire.hotspot.. + The category already contains this structure. + """ + return f"central.{event.category}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return schema for FIRMS adapter settings.""" + return { + "api_key_alias": { + "type": "str", + "default": "firms", + "description": "Alias for the FIRMS API key in config.api_keys", + }, + "satellites": { + "type": "list[str]", + "default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"], + "description": "List of satellite feeds to poll", + }, + "region": { + "type": "RegionConfig", + "default": None, + "description": "Geographic bounding box to filter hotspots", + }, + } + async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" # Fetch API key @@ -417,14 +447,3 @@ class FIRMSAdapter(SourceAdapter): }, ) - -def subject_for_fire_hotspot(ev: Event) -> str: - """Compute the NATS subject for a fire hotspot event. - - Subject format: central.fire.hotspot.. - - The category already contains the satellite and confidence info, - so we just prefix with 'central.'. - """ - # category is "fire.hotspot.." - return f"central.{ev.category}" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 129584f..ce2bf73 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -193,6 +193,7 @@ class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" + stream_name = "CENTRAL_WX" def __init__( self, @@ -234,6 +235,50 @@ class NWSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a weather alert. + + Subject format: central.wx.alert.us... + where type is 'county' or 'zone' based on primary_region format. + """ + prefix = "central.wx" + + if event.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = event.geo.primary_region + + # Parse US-- format + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return schema for NWS adapter settings.""" + return { + "contact_email": { + "type": "str", + "default": "", + "description": "Contact email for NWS API User-Agent header", + }, + "region": { + "type": "RegionConfig", + "default": None, + "description": "Geographic bounding box to filter alerts", + }, + } + def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 601c52b..2f7e4c0 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -64,6 +64,7 @@ class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" + stream_name = "CENTRAL_QUAKE" def __init__( self, @@ -398,3 +399,34 @@ class USGSQuakeAdapter(SourceAdapter): new_count += 1 logger.info("USGS quake yielded events", extra={"count": new_count}) + + def subject_for(self, event: Event) -> str: + """Return NATS subject for quake event.""" + return f"central.{event.category}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return JSON Schema for USGS quake adapter settings.""" + return { + "type": "object", + "properties": { + "feed": { + "type": "string", + "enum": ["all_hour", "all_day", "all_week", "all_month"], + "default": "all_hour", + "description": "USGS feed type", + }, + "region": { + "type": "object", + "properties": { + "north": {"type": "number"}, + "south": {"type": "number"}, + "east": {"type": "number"}, + "west": {"type": "number"}, + }, + "required": ["north", "south", "east", "west"], + "description": "Bounding box for earthquake monitoring", + }, + }, + "required": ["region"], + } diff --git a/src/central/models.py b/src/central/models.py index 17145ad..c7a2159 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -32,48 +32,3 @@ class Event(BaseModel): data: dict[str, Any] # adapter-specific payload -def subject_for_event(ev: Event) -> str: - """ - Compute the NATS subject for an event based on its category. - - Dispatch by category prefix: - - fire.*: returns central. directly - - wx.*: uses weather alert subject logic - - Weather alert subjects: - central.wx.alert.us..county. - or - central.wx.alert.us..zone. - based on whether the primary_region encodes a county or a zone. - - Fire hotspot subjects: - central.fire.hotspot.. - """ - # Fire events: subject is just central. - if ev.category.startswith("fire."): - return f"central.{ev.category}" - - # Weather events: use geo-based subject logic - prefix = "central.wx" - - if ev.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = ev.geo.primary_region - - # Parse US-- format - # County codes are like "Ada", "Canyon" (names) - # Zone codes start with "Z" like "Z033" - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 652ce44..f78ff41 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -13,24 +13,36 @@ from typing import Any import nats from nats.js import JetStreamContext +import importlib +import pkgutil + from central.adapter import SourceAdapter -from central.adapters.nws import NWSAdapter -from central.adapters.firms import FIRMSAdapter -from central.adapters.usgs_quake import USGSQuakeAdapter from central.cloudevents_wire import wrap_event from central.config_models import AdapterConfig from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings -from central.models import subject_for_event from central.stream_manager import StreamManager +import central.adapters -# Adapter registry - add new adapters here -_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { - "nws": NWSAdapter, - "firms": FIRMSAdapter, - "usgs_quake": USGSQuakeAdapter, -} +def _discover_adapters() -> dict[str, type[SourceAdapter]]: + """Auto-discover adapter classes from central.adapters package.""" + registry: dict[str, type[SourceAdapter]] = {} + for module_info in pkgutil.iter_modules(central.adapters.__path__): + module = importlib.import_module(f"central.adapters.{module_info.name}") + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, SourceAdapter) + and attr is not SourceAdapter + and hasattr(attr, "name") + ): + registry[attr.name] = attr + return registry + + +_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters() CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -232,7 +244,7 @@ class Supervisor: # Build CloudEvent (uses defaults if no config provided) envelope, msg_id = wrap_event(event, self._cloudevents_config) - subject = subject_for_event(event) + subject = state.adapter.subject_for(event) # Publish await self._publish_event(subject, envelope, msg_id) diff --git a/tests/test_firms.py b/tests/test_firms.py index 9e51331..bfe629a 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -10,7 +10,6 @@ from central.adapters.firms import ( FIRMSAdapter, CONFIDENCE_MAP, SATELLITE_SHORT, - subject_for_fire_hotspot, ) from central.config_models import AdapterConfig, RegionConfig from central.models import Event, Geo @@ -285,7 +284,14 @@ class TestDeduplication: class TestSubjectGeneration: """Test subject generation for fire hotspots.""" - def test_subject_format(self): + @pytest.mark.asyncio + async def test_subject_format(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -296,10 +302,17 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_snpp.high" - def test_subject_nominal_confidence(self): + @pytest.mark.asyncio + async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -310,7 +323,7 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_noaa20.nominal" diff --git a/tests/test_models.py b/tests/test_models.py index c010f39..7ed5c54 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import pytest -from central.models import Event, Geo, subject_for_event +from central.models import Event, Geo from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config from central.cloudevents_wire import wrap_event @@ -57,47 +57,6 @@ def sample_config() -> Config: ) -class TestSubjectForEvent: - """Tests for subject_for_event helper.""" - - def test_county_subject(self, sample_event: Event) -> None: - """County codes produce county subject.""" - subject = subject_for_event(sample_event) - assert subject == "central.wx.alert.us.id.county.ada" - - def test_zone_subject(self, sample_geo: Geo) -> None: - """Zone codes produce zone subject.""" - geo = Geo( - centroid=sample_geo.centroid, - bbox=sample_geo.bbox, - regions=["US-ID-Z033"], - primary_region="US-ID-Z033", - ) - event = Event( - id="test-zone", - adapter="nws", - category="wx.alert.winter_storm_warning", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.id.zone.z033" - - def test_unknown_subject(self, sample_event: Event) -> None: - """Missing primary_region produces unknown subject.""" - geo = Geo(regions=[], primary_region=None) - event = Event( - id="test-unknown", - adapter="nws", - category="wx.alert.test", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.unknown" - class TestCloudEventsWire: """Tests for CloudEvents wire format.""" diff --git a/tests/test_nws_normalization.py b/tests/test_nws_normalization.py index 53a7e49..706ce1d 100644 --- a/tests/test_nws_normalization.py +++ b/tests/test_nws_normalization.py @@ -17,7 +17,6 @@ from central.adapters.nws import ( SEVERITY_MAP, ) from central.config_models import AdapterConfig -from central.models import subject_for_event # Sample NWS GeoJSON features for testing @@ -272,7 +271,7 @@ class TestSubjectDerivation: def test_county_subject(self, adapter: NWSAdapter) -> None: event = adapter._normalize_feature(SAMPLE_FEATURE_ID) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) # Primary region should be alphabetically first # Could be county or zone depending on sort order assert subject.startswith("central.wx.alert.us.id.") @@ -294,7 +293,7 @@ class TestSubjectDerivation: } event = adapter._normalize_feature(feature) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) assert "zone" in subject From 4ee3d8bd14e7c2179fbec12d7dd37d68e118385b Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Mon, 18 May 2026 22:33:19 +0000 Subject: [PATCH 2/2] fix(adapters): complete self-describing adapter attributes - Replace settings_schema classmethod with Pydantic model class attribute - Add display_name, description, requires_api_key, wizard_order, default_cadence_s - Remove stream_name from adapters (JetStream routes by subject filter) - Define NWSSettings, FIRMSSettings, USGSQuakeSettings Pydantic models - Make discover_adapters() public with error handling - Move adapter registry to Supervisor instance (self._adapters) - Add subject_for tests for all 6 quake magnitude tiers - Fix test_supervisor_integration to use injected mock adapters Co-Authored-By: Claude Opus 4.5 --- src/central/adapter.py | 38 ++- src/central/adapters/firms.py | 36 +-- src/central/adapters/nws.py | 30 +- src/central/adapters/usgs_quake.py | 42 +-- src/central/supervisor.py | 17 +- tests/test_supervisor_integration.py | 399 ++++++++++++++------------- tests/test_usgs_quake.py | 119 ++++++++ 7 files changed, 392 insertions(+), 289 deletions(-) diff --git a/src/central/adapter.py b/src/central/adapter.py index f7659fe..276a9cf 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -2,7 +2,9 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING + +from pydantic import BaseModel if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -19,11 +21,21 @@ class SourceAdapter(ABC): Class attributes that subclasses must define: name: Short identifier, e.g. "nws" - stream_name: Target JetStream stream, e.g. "CENTRAL_WX" + display_name: Human-readable name for GUI + description: Short description of the adapter + settings_schema: Pydantic model class for adapter settings + requires_api_key: Key alias if API key required, else None + wizard_order: Order in setup wizard (None = not in wizard) + default_cadence_s: Default polling interval in seconds """ - name: str # short identifier, e.g. "nws" - stream_name: str # target JetStream stream, e.g. "CENTRAL_WX" + name: str + display_name: str + description: str + settings_schema: type[BaseModel] + requires_api_key: str | None = None + wizard_order: int | None = None + default_cadence_s: int @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -55,24 +67,6 @@ class SourceAdapter(ABC): """ ... - @classmethod - @abstractmethod - def settings_schema(cls) -> dict[str, Any]: - """ - Return the JSON-serializable schema for this adapter's settings. - - Used by the GUI to render adapter configuration forms. - Returns a dict with keys like: - { - "contact_email": {"type": "str", "default": "", "description": "..."}, - "region": {"type": "RegionConfig", "default": None, "description": "..."}, - } - - Note: If a second nested type beyond RegionConfig appears, - refactor this to use generic recursion for nested schemas. - """ - ... - async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 11ba054..7538d96 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -18,6 +18,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -49,11 +51,23 @@ SEVERITY_MAP = { } +class FIRMSSettings(BaseModel): + """Settings schema for FIRMS adapter.""" + api_key_alias: str = "firms" + satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + region: RegionConfig | None = None + + class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" - stream_name = "CENTRAL_FIRE" + display_name = "NASA FIRMS Fire Hotspots" + description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." + settings_schema = FIRMSSettings + requires_api_key = "firms" + wizard_order = 2 + default_cadence_s = 300 def __init__( self, @@ -125,26 +139,6 @@ class FIRMSAdapter(SourceAdapter): """ return f"central.{event.category}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return schema for FIRMS adapter settings.""" - return { - "api_key_alias": { - "type": "str", - "default": "firms", - "description": "Alias for the FIRMS API key in config.api_keys", - }, - "satellites": { - "type": "list[str]", - "default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"], - "description": "List of satellite feeds to poll", - }, - "region": { - "type": "RegionConfig", - "default": None, - "description": "Geographic bounding box to filter hotspots", - }, - } async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index ce2bf73..ce95d3a 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,6 +19,8 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -189,11 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: return sorted(regions) +class NWSSettings(BaseModel): + """Settings schema for NWS adapter.""" + contact_email: str = "" + region: RegionConfig | None = None + + class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" - stream_name = "CENTRAL_WX" + display_name = "NWS Weather Alerts" + description = "National Weather Service active alerts via api.weather.gov." + settings_schema = NWSSettings + requires_api_key = None + wizard_order = 1 + default_cadence_s = 60 def __init__( self, @@ -263,21 +276,6 @@ class NWSAdapter(SourceAdapter): # County name return f"{prefix}.alert.us.{state}.county.{code.lower()}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return schema for NWS adapter settings.""" - return { - "contact_email": { - "type": "str", - "default": "", - "description": "Contact email for NWS API User-Agent header", - }, - "region": { - "type": "RegionConfig", - "default": None, - "description": "Geographic bounding box to filter alerts", - }, - } def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 2f7e4c0..e73148f 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -17,6 +17,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -60,11 +62,22 @@ def magnitude_to_severity(mag: float) -> int: return 5 +class USGSQuakeSettings(BaseModel): + """Settings schema for USGS quake adapter.""" + feed: str = "all_hour" + region: RegionConfig | None = None + + class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" - stream_name = "CENTRAL_QUAKE" + display_name = "USGS Earthquakes" + description = "USGS earthquake feed (configurable window)." + settings_schema = USGSQuakeSettings + requires_api_key = None + wizard_order = 3 + default_cadence_s = 60 def __init__( self, @@ -404,29 +417,4 @@ class USGSQuakeAdapter(SourceAdapter): """Return NATS subject for quake event.""" return f"central.{event.category}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return JSON Schema for USGS quake adapter settings.""" - return { - "type": "object", - "properties": { - "feed": { - "type": "string", - "enum": ["all_hour", "all_day", "all_week", "all_month"], - "default": "all_hour", - "description": "USGS feed type", - }, - "region": { - "type": "object", - "properties": { - "north": {"type": "number"}, - "south": {"type": "number"}, - "east": {"type": "number"}, - "west": {"type": "number"}, - }, - "required": ["north", "south", "east", "west"], - "description": "Bounding box for earthquake monitoring", - }, - }, - "required": ["region"], - } + diff --git a/src/central/supervisor.py b/src/central/supervisor.py index f78ff41..da00f0e 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -25,11 +25,18 @@ from central.bootstrap_config import get_settings from central.stream_manager import StreamManager import central.adapters -def _discover_adapters() -> dict[str, type[SourceAdapter]]: +def discover_adapters() -> dict[str, type[SourceAdapter]]: """Auto-discover adapter classes from central.adapters package.""" registry: dict[str, type[SourceAdapter]] = {} for module_info in pkgutil.iter_modules(central.adapters.__path__): - module = importlib.import_module(f"central.adapters.{module_info.name}") + try: + module = importlib.import_module(f"central.adapters.{module_info.name}") + except Exception as e: + logger.error( + "Failed to import adapter module", + extra={"module": module_info.name, "error": str(e)}, + ) + continue for attr_name in dir(module): attr = getattr(module, attr_name) if ( @@ -41,9 +48,6 @@ def _discover_adapters() -> dict[str, type[SourceAdapter]]: registry[attr.name] = attr return registry - -_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters() - CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") # Stream subject mappings @@ -126,6 +130,7 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config + self._adapters = discover_adapters() self._nc: nats.NATS | None = None self._js: JetStreamContext | None = None self._stream_manager: StreamManager | None = None @@ -173,7 +178,7 @@ class Supervisor: def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: """Create an adapter instance based on config name.""" - cls = _ADAPTER_REGISTRY.get(config.name) + cls = self._adapters.get(config.name) if cls is None: raise ValueError(f"Unknown adapter type: {config.name}") return cls( diff --git a/tests/test_supervisor_integration.py b/tests/test_supervisor_integration.py index cf1c65b..e318752 100644 --- a/tests/test_supervisor_integration.py +++ b/tests/test_supervisor_integration.py @@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration: supervisor._js = mock_nats.jetstream() # Patch NWSAdapter to use our mock - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start supervisor (starts adapter) - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start supervisor (starts adapter) + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) - # Simulate completed poll 5 minutes ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 5 minutes ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is in the past - # and poll immediately. Let's verify by checking the wait time logic. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is in the past + # and poll immediately. Let's verify by checking the wait time logic. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was 5 minutes ago, cadence is 60s - # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago - # wait_time should be 0 (poll immediately) - assert wait_time == 0, ( - f"Expected immediate poll (wait=0), got wait={wait_time}s. " - f"last_poll was {saved_last_poll}, now is {now}" - ) + # last_poll was 5 minutes ago, cadence is 60s + # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago + # wait_time should be 0 (poll immediately) + assert wait_time == 0, ( + f"Expected immediate poll (wait=0), got wait={wait_time}s. " + f"last_poll was {saved_last_poll}, now is {now}" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_enable_gap_shorter_than_cadence( @@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter (simulate 20 seconds later, but we're just - # checking the rate limit logic) - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter (simulate 20 seconds later, but we're just + # checking the rate limit logic) + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is still in the future - # and wait until then. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is still in the future + # and wait until then. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was ~10 seconds ago, cadence is 60s - # wait_time should be ~50s (60 - 10 = 50) - assert 45 < wait_time < 55, ( - f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " - f"Rate limit violated: poll would happen before last_poll + cadence" - ) + # last_poll was ~10 seconds ago, cadence is 60s + # wait_time should be ~50s (60 - 10 = 50) + assert 45 < wait_time < 55, ( + f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " + f"Rate limit violated: poll would happen before last_poll + cadence" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_delete_readd_fresh_state( @@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # DELETE adapter from DB (remove from config source) - config_source.set_adapter(None, name="nws") - await supervisor._on_config_change("adapters", "nws") + # DELETE adapter from DB (remove from config source) + config_source.set_adapter(None, name="nws") + await supervisor._on_config_change("adapters", "nws") - # Verify adapter fully removed - assert "nws" not in supervisor._adapter_states + # Verify adapter fully removed + assert "nws" not in supervisor._adapter_states - # Re-add adapter with same name - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - await supervisor._on_config_change("adapters", "nws") + # Re-add adapter with same name + new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + await supervisor._on_config_change("adapters", "nws") - # Verify new adapter started fresh - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - # last_completed_poll should be None (fresh adapter) - assert state.last_completed_poll is None, ( - f"Expected None (fresh adapter), got {state.last_completed_poll}. " - f"Preserved state not cleared on delete." - ) + # Verify new adapter started fresh + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + # last_completed_poll should be None (fresh adapter) + assert state.last_completed_poll is None, ( + f"Expected None (fresh adapter), got {state.last_completed_poll}. " + f"Preserved state not cleared on delete." + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_stop_preserves_state_start_reuses_it( @@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) - saved_poll = state.last_completed_poll + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) + saved_poll = state.last_completed_poll - # Stop adapter - await supervisor._stop_adapter("nws") + # Stop adapter + await supervisor._stop_adapter("nws") - # State should still exist - assert "nws" in supervisor._adapter_states - state = supervisor._adapter_states["nws"] - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # State should still exist + assert "nws" in supervisor._adapter_states + state = supervisor._adapter_states["nws"] + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Restart adapter - await supervisor._start_adapter(config) + # Restart adapter + await supervisor._start_adapter(config) - # Should reuse existing state - state = supervisor._adapter_states.get("nws") - assert adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # Should reuse existing state + state = supervisor._adapter_states.get("nws") + assert adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_remove_adapter_clears_state( @@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - # Remove adapter - await cleanup_adapter(supervisor, "nws") + # Remove adapter + await cleanup_adapter(supervisor, "nws") - # State should be gone - assert "nws" not in supervisor._adapter_states + # State should be gone + assert "nws" not in supervisor._adapter_states diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py index 24c6f73..690f7da 100644 --- a/tests/test_usgs_quake.py +++ b/tests/test_usgs_quake.py @@ -480,3 +480,122 @@ class TestApplyConfig: assert adapter._feed == "all_day" await adapter.shutdown() + + + +class TestSubjectFor: + """Test subject_for method for all magnitude tiers.""" + + @pytest.mark.asyncio + async def test_subject_minor(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-minor", + adapter="usgs_quake", + category="quake.event.minor", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.minor" + + @pytest.mark.asyncio + async def test_subject_light(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-light", + adapter="usgs_quake", + category="quake.event.light", + time=datetime.now(timezone.utc), + severity=1, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.light" + + @pytest.mark.asyncio + async def test_subject_moderate(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-moderate", + adapter="usgs_quake", + category="quake.event.moderate", + time=datetime.now(timezone.utc), + severity=2, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.moderate" + + @pytest.mark.asyncio + async def test_subject_strong(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-strong", + adapter="usgs_quake", + category="quake.event.strong", + time=datetime.now(timezone.utc), + severity=3, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.strong" + + @pytest.mark.asyncio + async def test_subject_major(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-major", + adapter="usgs_quake", + category="quake.event.major", + time=datetime.now(timezone.utc), + severity=4, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.major" + + @pytest.mark.asyncio + async def test_subject_great(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-great", + adapter="usgs_quake", + category="quake.event.great", + time=datetime.now(timezone.utc), + severity=5, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.great"