refactor(adapters): self-describing adapter pattern with auto-discovery

- Add stream_name, subject_for(), and settings_schema() to SourceAdapter ABC
- Implement all three methods in NWSAdapter, FIRMSAdapter, USGSQuakeAdapter
- Replace manual _ADAPTER_REGISTRY with pkgutil.iter_modules auto-discovery
- Remove subject_for_event from models.py (each adapter owns its subject logic)
- Update supervisor to use adapter.subject_for(event) instead of helper
- Fix quake events going to wrong stream (was publishing to CENTRAL_WX)
- Update test files to use adapter methods

This fixes the quake stream bug where events were published to
central.wx.alert.us.unknown instead of central.quake.event.<tier>.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-18 22:14:12 +00:00
commit 4573bf6ee2
9 changed files with 185 additions and 118 deletions

View file

@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from central.config_models import AdapterConfig
@ -16,9 +16,14 @@ class SourceAdapter(ABC):
Adapters yield Events. The supervisor handles scheduling,
CloudEvents wrapping, publish, and metadata heartbeats.
Class attributes that subclasses must define:
name: Short identifier, e.g. "nws"
stream_name: Target JetStream stream, e.g. "CENTRAL_WX"
"""
name: str # short identifier, e.g. "nws"
stream_name: str # target JetStream stream, e.g. "CENTRAL_WX"
@abstractmethod
async def poll(self) -> AsyncIterator[Event]:
@ -40,6 +45,34 @@ class SourceAdapter(ABC):
"""
...
@abstractmethod
def subject_for(self, event: Event) -> str:
"""
Compute the NATS subject for an event.
Each adapter knows its own subject hierarchy. The supervisor
calls this to determine where to publish each event.
"""
...
@classmethod
@abstractmethod
def settings_schema(cls) -> dict[str, Any]:
"""
Return the JSON-serializable schema for this adapter's settings.
Used by the GUI to render adapter configuration forms.
Returns a dict with keys like:
{
"contact_email": {"type": "str", "default": "", "description": "..."},
"region": {"type": "RegionConfig", "default": None, "description": "..."},
}
Note: If a second nested type beyond RegionConfig appears,
refactor this to use generic recursion for nested schemas.
"""
...
async def startup(self) -> None:
"""Optional lifecycle hook called before first poll."""
pass

View file

@ -53,6 +53,7 @@ class FIRMSAdapter(SourceAdapter):
"""NASA FIRMS fire hotspot adapter."""
name = "firms"
stream_name = "CENTRAL_FIRE"
def __init__(
self,
@ -116,6 +117,35 @@ class FIRMSAdapter(SourceAdapter):
},
)
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains this structure.
"""
return f"central.{event.category}"
@classmethod
def settings_schema(cls) -> dict[str, Any]:
"""Return schema for FIRMS adapter settings."""
return {
"api_key_alias": {
"type": "str",
"default": "firms",
"description": "Alias for the FIRMS API key in config.api_keys",
},
"satellites": {
"type": "list[str]",
"default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"],
"description": "List of satellite feeds to poll",
},
"region": {
"type": "RegionConfig",
"default": None,
"description": "Geographic bounding box to filter hotspots",
},
}
async def startup(self) -> None:
"""Initialize HTTP session, dedup tracker, and fetch API key."""
# Fetch API key
@ -417,14 +447,3 @@ class FIRMSAdapter(SourceAdapter):
},
)
def subject_for_fire_hotspot(ev: Event) -> str:
"""Compute the NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains the satellite and confidence info,
so we just prefix with 'central.'.
"""
# category is "fire.hotspot.<satellite>.<confidence>"
return f"central.{ev.category}"

View file

@ -193,6 +193,7 @@ class NWSAdapter(SourceAdapter):
"""National Weather Service alerts adapter."""
name = "nws"
stream_name = "CENTRAL_WX"
def __init__(
self,
@ -234,6 +235,50 @@ class NWSAdapter(SourceAdapter):
},
)
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a weather alert.
Subject format: central.wx.alert.us.<state>.<type>.<code>
where type is 'county' or 'zone' based on primary_region format.
"""
prefix = "central.wx"
if event.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = event.geo.primary_region
# Parse US-<STATE>-<CODE> format
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
@classmethod
def settings_schema(cls) -> dict[str, Any]:
"""Return schema for NWS adapter settings."""
return {
"contact_email": {
"type": "str",
"default": "",
"description": "Contact email for NWS API User-Agent header",
},
"region": {
"type": "RegionConfig",
"default": None,
"description": "Geographic bounding box to filter alerts",
},
}
def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool:
"""Check if feature geometry intersects configured region bbox.

View file

@ -64,6 +64,7 @@ class USGSQuakeAdapter(SourceAdapter):
"""USGS Earthquake Hazards Program adapter."""
name = "usgs_quake"
stream_name = "CENTRAL_QUAKE"
def __init__(
self,
@ -398,3 +399,34 @@ class USGSQuakeAdapter(SourceAdapter):
new_count += 1
logger.info("USGS quake yielded events", extra={"count": new_count})
def subject_for(self, event: Event) -> str:
"""Return NATS subject for quake event."""
return f"central.{event.category}"
@classmethod
def settings_schema(cls) -> dict[str, Any]:
"""Return JSON Schema for USGS quake adapter settings."""
return {
"type": "object",
"properties": {
"feed": {
"type": "string",
"enum": ["all_hour", "all_day", "all_week", "all_month"],
"default": "all_hour",
"description": "USGS feed type",
},
"region": {
"type": "object",
"properties": {
"north": {"type": "number"},
"south": {"type": "number"},
"east": {"type": "number"},
"west": {"type": "number"},
},
"required": ["north", "south", "east", "west"],
"description": "Bounding box for earthquake monitoring",
},
},
"required": ["region"],
}

View file

@ -32,48 +32,3 @@ class Event(BaseModel):
data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event) -> str:
"""
Compute the NATS subject for an event based on its category.
Dispatch by category prefix:
- fire.*: returns central.<category> directly
- wx.*: uses weather alert subject logic
Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower>
or
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence>
"""
# Fire events: subject is just central.<category>
if ev.category.startswith("fire."):
return f"central.{ev.category}"
# Weather events: use geo-based subject logic
prefix = "central.wx"
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
# Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
# Zone codes start with "Z" like "Z033"
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"

View file

@ -13,24 +13,36 @@ from typing import Any
import nats
from nats.js import JetStreamContext
import importlib
import pkgutil
from central.adapter import SourceAdapter
from central.adapters.nws import NWSAdapter
from central.adapters.firms import FIRMSAdapter
from central.adapters.usgs_quake import USGSQuakeAdapter
from central.cloudevents_wire import wrap_event
from central.config_models import AdapterConfig
from central.config_source import ConfigSource, DbConfigSource
from central.config_store import ConfigStore
from central.bootstrap_config import get_settings
from central.models import subject_for_event
from central.stream_manager import StreamManager
import central.adapters
# Adapter registry - add new adapters here
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = {
"nws": NWSAdapter,
"firms": FIRMSAdapter,
"usgs_quake": USGSQuakeAdapter,
}
def _discover_adapters() -> dict[str, type[SourceAdapter]]:
"""Auto-discover adapter classes from central.adapters package."""
registry: dict[str, type[SourceAdapter]] = {}
for module_info in pkgutil.iter_modules(central.adapters.__path__):
module = importlib.import_module(f"central.adapters.{module_info.name}")
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, SourceAdapter)
and attr is not SourceAdapter
and hasattr(attr, "name")
):
registry[attr.name] = attr
return registry
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters()
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
@ -232,7 +244,7 @@ class Supervisor:
# Build CloudEvent (uses defaults if no config provided)
envelope, msg_id = wrap_event(event, self._cloudevents_config)
subject = subject_for_event(event)
subject = state.adapter.subject_for(event)
# Publish
await self._publish_event(subject, envelope, msg_id)

View file

@ -10,7 +10,6 @@ from central.adapters.firms import (
FIRMSAdapter,
CONFIDENCE_MAP,
SATELLITE_SHORT,
subject_for_fire_hotspot,
)
from central.config_models import AdapterConfig, RegionConfig
from central.models import Event, Geo
@ -285,7 +284,14 @@ class TestDeduplication:
class TestSubjectGeneration:
"""Test subject generation for fire hotspots."""
def test_subject_format(self):
@pytest.mark.asyncio
async def test_subject_format(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test",
adapter="firms",
@ -296,10 +302,17 @@ class TestSubjectGeneration:
data={},
)
subject = subject_for_fire_hotspot(event)
subject = adapter.subject_for(event)
assert subject == "central.fire.hotspot.viirs_snpp.high"
def test_subject_nominal_confidence(self):
@pytest.mark.asyncio
async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test",
adapter="firms",
@ -310,7 +323,7 @@ class TestSubjectGeneration:
data={},
)
subject = subject_for_fire_hotspot(event)
subject = adapter.subject_for(event)
assert subject == "central.fire.hotspot.viirs_noaa20.nominal"

View file

@ -4,7 +4,7 @@ from datetime import datetime, timezone
import pytest
from central.models import Event, Geo, subject_for_event
from central.models import Event, Geo
from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config
from central.cloudevents_wire import wrap_event
@ -57,47 +57,6 @@ def sample_config() -> Config:
)
class TestSubjectForEvent:
"""Tests for subject_for_event helper."""
def test_county_subject(self, sample_event: Event) -> None:
"""County codes produce county subject."""
subject = subject_for_event(sample_event)
assert subject == "central.wx.alert.us.id.county.ada"
def test_zone_subject(self, sample_geo: Geo) -> None:
"""Zone codes produce zone subject."""
geo = Geo(
centroid=sample_geo.centroid,
bbox=sample_geo.bbox,
regions=["US-ID-Z033"],
primary_region="US-ID-Z033",
)
event = Event(
id="test-zone",
adapter="nws",
category="wx.alert.winter_storm_warning",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.id.zone.z033"
def test_unknown_subject(self, sample_event: Event) -> None:
"""Missing primary_region produces unknown subject."""
geo = Geo(regions=[], primary_region=None)
event = Event(
id="test-unknown",
adapter="nws",
category="wx.alert.test",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.unknown"
class TestCloudEventsWire:
"""Tests for CloudEvents wire format."""

View file

@ -17,7 +17,6 @@ from central.adapters.nws import (
SEVERITY_MAP,
)
from central.config_models import AdapterConfig
from central.models import subject_for_event
# Sample NWS GeoJSON features for testing
@ -272,7 +271,7 @@ class TestSubjectDerivation:
def test_county_subject(self, adapter: NWSAdapter) -> None:
event = adapter._normalize_feature(SAMPLE_FEATURE_ID)
assert event is not None
subject = subject_for_event(event)
subject = adapter.subject_for(event)
# Primary region should be alphabetically first
# Could be county or zone depending on sort order
assert subject.startswith("central.wx.alert.us.id.")
@ -294,7 +293,7 @@ class TestSubjectDerivation:
}
event = adapter._normalize_feature(feature)
assert event is not None
subject = subject_for_event(event)
subject = adapter.subject_for(event)
assert "zone" in subject