diff --git a/src/central/adapter.py b/src/central/adapter.py index f7659fe..276a9cf 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -2,7 +2,9 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING + +from pydantic import BaseModel if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -19,11 +21,21 @@ class SourceAdapter(ABC): Class attributes that subclasses must define: name: Short identifier, e.g. "nws" - stream_name: Target JetStream stream, e.g. "CENTRAL_WX" + display_name: Human-readable name for GUI + description: Short description of the adapter + settings_schema: Pydantic model class for adapter settings + requires_api_key: Key alias if API key required, else None + wizard_order: Order in setup wizard (None = not in wizard) + default_cadence_s: Default polling interval in seconds """ - name: str # short identifier, e.g. "nws" - stream_name: str # target JetStream stream, e.g. "CENTRAL_WX" + name: str + display_name: str + description: str + settings_schema: type[BaseModel] + requires_api_key: str | None = None + wizard_order: int | None = None + default_cadence_s: int @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -55,24 +67,6 @@ class SourceAdapter(ABC): """ ... - @classmethod - @abstractmethod - def settings_schema(cls) -> dict[str, Any]: - """ - Return the JSON-serializable schema for this adapter's settings. - - Used by the GUI to render adapter configuration forms. - Returns a dict with keys like: - { - "contact_email": {"type": "str", "default": "", "description": "..."}, - "region": {"type": "RegionConfig", "default": None, "description": "..."}, - } - - Note: If a second nested type beyond RegionConfig appears, - refactor this to use generic recursion for nested schemas. - """ - ... - async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 11ba054..7538d96 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -18,6 +18,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -49,11 +51,23 @@ SEVERITY_MAP = { } +class FIRMSSettings(BaseModel): + """Settings schema for FIRMS adapter.""" + api_key_alias: str = "firms" + satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + region: RegionConfig | None = None + + class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" - stream_name = "CENTRAL_FIRE" + display_name = "NASA FIRMS Fire Hotspots" + description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." + settings_schema = FIRMSSettings + requires_api_key = "firms" + wizard_order = 2 + default_cadence_s = 300 def __init__( self, @@ -125,26 +139,6 @@ class FIRMSAdapter(SourceAdapter): """ return f"central.{event.category}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return schema for FIRMS adapter settings.""" - return { - "api_key_alias": { - "type": "str", - "default": "firms", - "description": "Alias for the FIRMS API key in config.api_keys", - }, - "satellites": { - "type": "list[str]", - "default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"], - "description": "List of satellite feeds to poll", - }, - "region": { - "type": "RegionConfig", - "default": None, - "description": "Geographic bounding box to filter hotspots", - }, - } async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index ce2bf73..ce95d3a 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,6 +19,8 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -189,11 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: return sorted(regions) +class NWSSettings(BaseModel): + """Settings schema for NWS adapter.""" + contact_email: str = "" + region: RegionConfig | None = None + + class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" - stream_name = "CENTRAL_WX" + display_name = "NWS Weather Alerts" + description = "National Weather Service active alerts via api.weather.gov." + settings_schema = NWSSettings + requires_api_key = None + wizard_order = 1 + default_cadence_s = 60 def __init__( self, @@ -263,21 +276,6 @@ class NWSAdapter(SourceAdapter): # County name return f"{prefix}.alert.us.{state}.county.{code.lower()}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return schema for NWS adapter settings.""" - return { - "contact_email": { - "type": "str", - "default": "", - "description": "Contact email for NWS API User-Agent header", - }, - "region": { - "type": "RegionConfig", - "default": None, - "description": "Geographic bounding box to filter alerts", - }, - } def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 2f7e4c0..e73148f 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -17,6 +17,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -60,11 +62,22 @@ def magnitude_to_severity(mag: float) -> int: return 5 +class USGSQuakeSettings(BaseModel): + """Settings schema for USGS quake adapter.""" + feed: str = "all_hour" + region: RegionConfig | None = None + + class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" - stream_name = "CENTRAL_QUAKE" + display_name = "USGS Earthquakes" + description = "USGS earthquake feed (configurable window)." + settings_schema = USGSQuakeSettings + requires_api_key = None + wizard_order = 3 + default_cadence_s = 60 def __init__( self, @@ -404,29 +417,4 @@ class USGSQuakeAdapter(SourceAdapter): """Return NATS subject for quake event.""" return f"central.{event.category}" - @classmethod - def settings_schema(cls) -> dict[str, Any]: - """Return JSON Schema for USGS quake adapter settings.""" - return { - "type": "object", - "properties": { - "feed": { - "type": "string", - "enum": ["all_hour", "all_day", "all_week", "all_month"], - "default": "all_hour", - "description": "USGS feed type", - }, - "region": { - "type": "object", - "properties": { - "north": {"type": "number"}, - "south": {"type": "number"}, - "east": {"type": "number"}, - "west": {"type": "number"}, - }, - "required": ["north", "south", "east", "west"], - "description": "Bounding box for earthquake monitoring", - }, - }, - "required": ["region"], - } + diff --git a/src/central/supervisor.py b/src/central/supervisor.py index f78ff41..da00f0e 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -25,11 +25,18 @@ from central.bootstrap_config import get_settings from central.stream_manager import StreamManager import central.adapters -def _discover_adapters() -> dict[str, type[SourceAdapter]]: +def discover_adapters() -> dict[str, type[SourceAdapter]]: """Auto-discover adapter classes from central.adapters package.""" registry: dict[str, type[SourceAdapter]] = {} for module_info in pkgutil.iter_modules(central.adapters.__path__): - module = importlib.import_module(f"central.adapters.{module_info.name}") + try: + module = importlib.import_module(f"central.adapters.{module_info.name}") + except Exception as e: + logger.error( + "Failed to import adapter module", + extra={"module": module_info.name, "error": str(e)}, + ) + continue for attr_name in dir(module): attr = getattr(module, attr_name) if ( @@ -41,9 +48,6 @@ def _discover_adapters() -> dict[str, type[SourceAdapter]]: registry[attr.name] = attr return registry - -_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters() - CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") # Stream subject mappings @@ -126,6 +130,7 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config + self._adapters = discover_adapters() self._nc: nats.NATS | None = None self._js: JetStreamContext | None = None self._stream_manager: StreamManager | None = None @@ -173,7 +178,7 @@ class Supervisor: def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: """Create an adapter instance based on config name.""" - cls = _ADAPTER_REGISTRY.get(config.name) + cls = self._adapters.get(config.name) if cls is None: raise ValueError(f"Unknown adapter type: {config.name}") return cls( diff --git a/tests/test_supervisor_integration.py b/tests/test_supervisor_integration.py index cf1c65b..e318752 100644 --- a/tests/test_supervisor_integration.py +++ b/tests/test_supervisor_integration.py @@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration: supervisor._js = mock_nats.jetstream() # Patch NWSAdapter to use our mock - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start supervisor (starts adapter) - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start supervisor (starts adapter) + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) - # Simulate completed poll 5 minutes ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 5 minutes ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is in the past - # and poll immediately. Let's verify by checking the wait time logic. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is in the past + # and poll immediately. Let's verify by checking the wait time logic. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was 5 minutes ago, cadence is 60s - # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago - # wait_time should be 0 (poll immediately) - assert wait_time == 0, ( - f"Expected immediate poll (wait=0), got wait={wait_time}s. " - f"last_poll was {saved_last_poll}, now is {now}" - ) + # last_poll was 5 minutes ago, cadence is 60s + # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago + # wait_time should be 0 (poll immediately) + assert wait_time == 0, ( + f"Expected immediate poll (wait=0), got wait={wait_time}s. " + f"last_poll was {saved_last_poll}, now is {now}" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_enable_gap_shorter_than_cadence( @@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter (simulate 20 seconds later, but we're just - # checking the rate limit logic) - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter (simulate 20 seconds later, but we're just + # checking the rate limit logic) + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is still in the future - # and wait until then. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is still in the future + # and wait until then. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was ~10 seconds ago, cadence is 60s - # wait_time should be ~50s (60 - 10 = 50) - assert 45 < wait_time < 55, ( - f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " - f"Rate limit violated: poll would happen before last_poll + cadence" - ) + # last_poll was ~10 seconds ago, cadence is 60s + # wait_time should be ~50s (60 - 10 = 50) + assert 45 < wait_time < 55, ( + f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " + f"Rate limit violated: poll would happen before last_poll + cadence" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_delete_readd_fresh_state( @@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # DELETE adapter from DB (remove from config source) - config_source.set_adapter(None, name="nws") - await supervisor._on_config_change("adapters", "nws") + # DELETE adapter from DB (remove from config source) + config_source.set_adapter(None, name="nws") + await supervisor._on_config_change("adapters", "nws") - # Verify adapter fully removed - assert "nws" not in supervisor._adapter_states + # Verify adapter fully removed + assert "nws" not in supervisor._adapter_states - # Re-add adapter with same name - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - await supervisor._on_config_change("adapters", "nws") + # Re-add adapter with same name + new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + await supervisor._on_config_change("adapters", "nws") - # Verify new adapter started fresh - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - # last_completed_poll should be None (fresh adapter) - assert state.last_completed_poll is None, ( - f"Expected None (fresh adapter), got {state.last_completed_poll}. " - f"Preserved state not cleared on delete." - ) + # Verify new adapter started fresh + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + # last_completed_poll should be None (fresh adapter) + assert state.last_completed_poll is None, ( + f"Expected None (fresh adapter), got {state.last_completed_poll}. " + f"Preserved state not cleared on delete." + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_stop_preserves_state_start_reuses_it( @@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) - saved_poll = state.last_completed_poll + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) + saved_poll = state.last_completed_poll - # Stop adapter - await supervisor._stop_adapter("nws") + # Stop adapter + await supervisor._stop_adapter("nws") - # State should still exist - assert "nws" in supervisor._adapter_states - state = supervisor._adapter_states["nws"] - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # State should still exist + assert "nws" in supervisor._adapter_states + state = supervisor._adapter_states["nws"] + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Restart adapter - await supervisor._start_adapter(config) + # Restart adapter + await supervisor._start_adapter(config) - # Should reuse existing state - state = supervisor._adapter_states.get("nws") - assert adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # Should reuse existing state + state = supervisor._adapter_states.get("nws") + assert adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_remove_adapter_clears_state( @@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - # Remove adapter - await cleanup_adapter(supervisor, "nws") + # Remove adapter + await cleanup_adapter(supervisor, "nws") - # State should be gone - assert "nws" not in supervisor._adapter_states + # State should be gone + assert "nws" not in supervisor._adapter_states diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py index 24c6f73..690f7da 100644 --- a/tests/test_usgs_quake.py +++ b/tests/test_usgs_quake.py @@ -480,3 +480,122 @@ class TestApplyConfig: assert adapter._feed == "all_day" await adapter.shutdown() + + + +class TestSubjectFor: + """Test subject_for method for all magnitude tiers.""" + + @pytest.mark.asyncio + async def test_subject_minor(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-minor", + adapter="usgs_quake", + category="quake.event.minor", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.minor" + + @pytest.mark.asyncio + async def test_subject_light(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-light", + adapter="usgs_quake", + category="quake.event.light", + time=datetime.now(timezone.utc), + severity=1, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.light" + + @pytest.mark.asyncio + async def test_subject_moderate(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-moderate", + adapter="usgs_quake", + category="quake.event.moderate", + time=datetime.now(timezone.utc), + severity=2, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.moderate" + + @pytest.mark.asyncio + async def test_subject_strong(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-strong", + adapter="usgs_quake", + category="quake.event.strong", + time=datetime.now(timezone.utc), + severity=3, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.strong" + + @pytest.mark.asyncio + async def test_subject_major(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-major", + adapter="usgs_quake", + category="quake.event.major", + time=datetime.now(timezone.utc), + severity=4, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.major" + + @pytest.mark.asyncio + async def test_subject_great(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-great", + adapter="usgs_quake", + category="quake.event.great", + time=datetime.now(timezone.utc), + severity=5, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.great"