diff --git a/src/central/adapter.py b/src/central/adapter.py index 2037eef..f7659fe 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -16,9 +16,14 @@ class SourceAdapter(ABC): Adapters yield Events. The supervisor handles scheduling, CloudEvents wrapping, publish, and metadata heartbeats. + + Class attributes that subclasses must define: + name: Short identifier, e.g. "nws" + stream_name: Target JetStream stream, e.g. "CENTRAL_WX" """ name: str # short identifier, e.g. "nws" + stream_name: str # target JetStream stream, e.g. "CENTRAL_WX" @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -40,6 +45,34 @@ class SourceAdapter(ABC): """ ... + @abstractmethod + def subject_for(self, event: Event) -> str: + """ + Compute the NATS subject for an event. + + Each adapter knows its own subject hierarchy. The supervisor + calls this to determine where to publish each event. + """ + ... + + @classmethod + @abstractmethod + def settings_schema(cls) -> dict[str, Any]: + """ + Return the JSON-serializable schema for this adapter's settings. + + Used by the GUI to render adapter configuration forms. + Returns a dict with keys like: + { + "contact_email": {"type": "str", "default": "", "description": "..."}, + "region": {"type": "RegionConfig", "default": None, "description": "..."}, + } + + Note: If a second nested type beyond RegionConfig appears, + refactor this to use generic recursion for nested schemas. + """ + ... + async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 3f746fa..11ba054 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -53,6 +53,7 @@ class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" + stream_name = "CENTRAL_FIRE" def __init__( self, @@ -116,6 +117,35 @@ class FIRMSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a fire hotspot event. + + Subject format: central.fire.hotspot.. + The category already contains this structure. + """ + return f"central.{event.category}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return schema for FIRMS adapter settings.""" + return { + "api_key_alias": { + "type": "str", + "default": "firms", + "description": "Alias for the FIRMS API key in config.api_keys", + }, + "satellites": { + "type": "list[str]", + "default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"], + "description": "List of satellite feeds to poll", + }, + "region": { + "type": "RegionConfig", + "default": None, + "description": "Geographic bounding box to filter hotspots", + }, + } + async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" # Fetch API key @@ -417,14 +447,3 @@ class FIRMSAdapter(SourceAdapter): }, ) - -def subject_for_fire_hotspot(ev: Event) -> str: - """Compute the NATS subject for a fire hotspot event. - - Subject format: central.fire.hotspot.. - - The category already contains the satellite and confidence info, - so we just prefix with 'central.'. - """ - # category is "fire.hotspot.." - return f"central.{ev.category}" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 129584f..ce2bf73 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -193,6 +193,7 @@ class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" + stream_name = "CENTRAL_WX" def __init__( self, @@ -234,6 +235,50 @@ class NWSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a weather alert. + + Subject format: central.wx.alert.us... + where type is 'county' or 'zone' based on primary_region format. + """ + prefix = "central.wx" + + if event.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = event.geo.primary_region + + # Parse US-- format + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return schema for NWS adapter settings.""" + return { + "contact_email": { + "type": "str", + "default": "", + "description": "Contact email for NWS API User-Agent header", + }, + "region": { + "type": "RegionConfig", + "default": None, + "description": "Geographic bounding box to filter alerts", + }, + } + def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 601c52b..2f7e4c0 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -64,6 +64,7 @@ class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" + stream_name = "CENTRAL_QUAKE" def __init__( self, @@ -398,3 +399,34 @@ class USGSQuakeAdapter(SourceAdapter): new_count += 1 logger.info("USGS quake yielded events", extra={"count": new_count}) + + def subject_for(self, event: Event) -> str: + """Return NATS subject for quake event.""" + return f"central.{event.category}" + + @classmethod + def settings_schema(cls) -> dict[str, Any]: + """Return JSON Schema for USGS quake adapter settings.""" + return { + "type": "object", + "properties": { + "feed": { + "type": "string", + "enum": ["all_hour", "all_day", "all_week", "all_month"], + "default": "all_hour", + "description": "USGS feed type", + }, + "region": { + "type": "object", + "properties": { + "north": {"type": "number"}, + "south": {"type": "number"}, + "east": {"type": "number"}, + "west": {"type": "number"}, + }, + "required": ["north", "south", "east", "west"], + "description": "Bounding box for earthquake monitoring", + }, + }, + "required": ["region"], + } diff --git a/src/central/models.py b/src/central/models.py index 17145ad..c7a2159 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -32,48 +32,3 @@ class Event(BaseModel): data: dict[str, Any] # adapter-specific payload -def subject_for_event(ev: Event) -> str: - """ - Compute the NATS subject for an event based on its category. - - Dispatch by category prefix: - - fire.*: returns central. directly - - wx.*: uses weather alert subject logic - - Weather alert subjects: - central.wx.alert.us..county. - or - central.wx.alert.us..zone. - based on whether the primary_region encodes a county or a zone. - - Fire hotspot subjects: - central.fire.hotspot.. - """ - # Fire events: subject is just central. - if ev.category.startswith("fire."): - return f"central.{ev.category}" - - # Weather events: use geo-based subject logic - prefix = "central.wx" - - if ev.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = ev.geo.primary_region - - # Parse US-- format - # County codes are like "Ada", "Canyon" (names) - # Zone codes start with "Z" like "Z033" - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 652ce44..f78ff41 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -13,24 +13,36 @@ from typing import Any import nats from nats.js import JetStreamContext +import importlib +import pkgutil + from central.adapter import SourceAdapter -from central.adapters.nws import NWSAdapter -from central.adapters.firms import FIRMSAdapter -from central.adapters.usgs_quake import USGSQuakeAdapter from central.cloudevents_wire import wrap_event from central.config_models import AdapterConfig from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings -from central.models import subject_for_event from central.stream_manager import StreamManager +import central.adapters -# Adapter registry - add new adapters here -_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { - "nws": NWSAdapter, - "firms": FIRMSAdapter, - "usgs_quake": USGSQuakeAdapter, -} +def _discover_adapters() -> dict[str, type[SourceAdapter]]: + """Auto-discover adapter classes from central.adapters package.""" + registry: dict[str, type[SourceAdapter]] = {} + for module_info in pkgutil.iter_modules(central.adapters.__path__): + module = importlib.import_module(f"central.adapters.{module_info.name}") + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, SourceAdapter) + and attr is not SourceAdapter + and hasattr(attr, "name") + ): + registry[attr.name] = attr + return registry + + +_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters() CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -232,7 +244,7 @@ class Supervisor: # Build CloudEvent (uses defaults if no config provided) envelope, msg_id = wrap_event(event, self._cloudevents_config) - subject = subject_for_event(event) + subject = state.adapter.subject_for(event) # Publish await self._publish_event(subject, envelope, msg_id) diff --git a/tests/test_firms.py b/tests/test_firms.py index 9e51331..bfe629a 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -10,7 +10,6 @@ from central.adapters.firms import ( FIRMSAdapter, CONFIDENCE_MAP, SATELLITE_SHORT, - subject_for_fire_hotspot, ) from central.config_models import AdapterConfig, RegionConfig from central.models import Event, Geo @@ -285,7 +284,14 @@ class TestDeduplication: class TestSubjectGeneration: """Test subject generation for fire hotspots.""" - def test_subject_format(self): + @pytest.mark.asyncio + async def test_subject_format(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -296,10 +302,17 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_snpp.high" - def test_subject_nominal_confidence(self): + @pytest.mark.asyncio + async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -310,7 +323,7 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_noaa20.nominal" diff --git a/tests/test_models.py b/tests/test_models.py index c010f39..7ed5c54 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import pytest -from central.models import Event, Geo, subject_for_event +from central.models import Event, Geo from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config from central.cloudevents_wire import wrap_event @@ -57,47 +57,6 @@ def sample_config() -> Config: ) -class TestSubjectForEvent: - """Tests for subject_for_event helper.""" - - def test_county_subject(self, sample_event: Event) -> None: - """County codes produce county subject.""" - subject = subject_for_event(sample_event) - assert subject == "central.wx.alert.us.id.county.ada" - - def test_zone_subject(self, sample_geo: Geo) -> None: - """Zone codes produce zone subject.""" - geo = Geo( - centroid=sample_geo.centroid, - bbox=sample_geo.bbox, - regions=["US-ID-Z033"], - primary_region="US-ID-Z033", - ) - event = Event( - id="test-zone", - adapter="nws", - category="wx.alert.winter_storm_warning", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.id.zone.z033" - - def test_unknown_subject(self, sample_event: Event) -> None: - """Missing primary_region produces unknown subject.""" - geo = Geo(regions=[], primary_region=None) - event = Event( - id="test-unknown", - adapter="nws", - category="wx.alert.test", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.unknown" - class TestCloudEventsWire: """Tests for CloudEvents wire format.""" diff --git a/tests/test_nws_normalization.py b/tests/test_nws_normalization.py index 53a7e49..706ce1d 100644 --- a/tests/test_nws_normalization.py +++ b/tests/test_nws_normalization.py @@ -17,7 +17,6 @@ from central.adapters.nws import ( SEVERITY_MAP, ) from central.config_models import AdapterConfig -from central.models import subject_for_event # Sample NWS GeoJSON features for testing @@ -272,7 +271,7 @@ class TestSubjectDerivation: def test_county_subject(self, adapter: NWSAdapter) -> None: event = adapter._normalize_feature(SAMPLE_FEATURE_ID) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) # Primary region should be alphabetically first # Could be county or zone depending on sort order assert subject.startswith("central.wx.alert.us.id.") @@ -294,7 +293,7 @@ class TestSubjectDerivation: } event = adapter._normalize_feature(feature) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) assert "zone" in subject