mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
fix(adapters): complete self-describing adapter attributes
- Replace settings_schema classmethod with Pydantic model class attribute - Add display_name, description, requires_api_key, wizard_order, default_cadence_s - Remove stream_name from adapters (JetStream routes by subject filter) - Define NWSSettings, FIRMSSettings, USGSQuakeSettings Pydantic models - Make discover_adapters() public with error handling - Move adapter registry to Supervisor instance (self._adapters) - Add subject_for tests for all 6 quake magnitude tiers - Fix test_supervisor_integration to use injected mock adapters Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
4573bf6ee2
commit
4ee3d8bd14
7 changed files with 418 additions and 315 deletions
|
|
@ -2,7 +2,9 @@
|
|||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from central.config_models import AdapterConfig
|
||||
|
|
@ -19,11 +21,21 @@ class SourceAdapter(ABC):
|
|||
|
||||
Class attributes that subclasses must define:
|
||||
name: Short identifier, e.g. "nws"
|
||||
stream_name: Target JetStream stream, e.g. "CENTRAL_WX"
|
||||
display_name: Human-readable name for GUI
|
||||
description: Short description of the adapter
|
||||
settings_schema: Pydantic model class for adapter settings
|
||||
requires_api_key: Key alias if API key required, else None
|
||||
wizard_order: Order in setup wizard (None = not in wizard)
|
||||
default_cadence_s: Default polling interval in seconds
|
||||
"""
|
||||
|
||||
name: str # short identifier, e.g. "nws"
|
||||
stream_name: str # target JetStream stream, e.g. "CENTRAL_WX"
|
||||
name: str
|
||||
display_name: str
|
||||
description: str
|
||||
settings_schema: type[BaseModel]
|
||||
requires_api_key: str | None = None
|
||||
wizard_order: int | None = None
|
||||
default_cadence_s: int
|
||||
|
||||
@abstractmethod
|
||||
async def poll(self) -> AsyncIterator[Event]:
|
||||
|
|
@ -55,24 +67,6 @@ class SourceAdapter(ABC):
|
|||
"""
|
||||
...
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def settings_schema(cls) -> dict[str, Any]:
|
||||
"""
|
||||
Return the JSON-serializable schema for this adapter's settings.
|
||||
|
||||
Used by the GUI to render adapter configuration forms.
|
||||
Returns a dict with keys like:
|
||||
{
|
||||
"contact_email": {"type": "str", "default": "", "description": "..."},
|
||||
"region": {"type": "RegionConfig", "default": None, "description": "..."},
|
||||
}
|
||||
|
||||
Note: If a second nested type beyond RegionConfig appears,
|
||||
refactor this to use generic recursion for nested schemas.
|
||||
"""
|
||||
...
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Optional lifecycle hook called before first poll."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ from tenacity import (
|
|||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -49,11 +51,23 @@ SEVERITY_MAP = {
|
|||
}
|
||||
|
||||
|
||||
class FIRMSSettings(BaseModel):
|
||||
"""Settings schema for FIRMS adapter."""
|
||||
api_key_alias: str = "firms"
|
||||
satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"]
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class FIRMSAdapter(SourceAdapter):
|
||||
"""NASA FIRMS fire hotspot adapter."""
|
||||
|
||||
name = "firms"
|
||||
stream_name = "CENTRAL_FIRE"
|
||||
display_name = "NASA FIRMS Fire Hotspots"
|
||||
description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS."
|
||||
settings_schema = FIRMSSettings
|
||||
requires_api_key = "firms"
|
||||
wizard_order = 2
|
||||
default_cadence_s = 300
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -125,26 +139,6 @@ class FIRMSAdapter(SourceAdapter):
|
|||
"""
|
||||
return f"central.{event.category}"
|
||||
|
||||
@classmethod
|
||||
def settings_schema(cls) -> dict[str, Any]:
|
||||
"""Return schema for FIRMS adapter settings."""
|
||||
return {
|
||||
"api_key_alias": {
|
||||
"type": "str",
|
||||
"default": "firms",
|
||||
"description": "Alias for the FIRMS API key in config.api_keys",
|
||||
},
|
||||
"satellites": {
|
||||
"type": "list[str]",
|
||||
"default": ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"],
|
||||
"description": "List of satellite feeds to poll",
|
||||
},
|
||||
"region": {
|
||||
"type": "RegionConfig",
|
||||
"default": None,
|
||||
"description": "Geographic bounding box to filter hotspots",
|
||||
},
|
||||
}
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Initialize HTTP session, dedup tracker, and fetch API key."""
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ from tenacity import (
|
|||
|
||||
from central import __version__
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -189,11 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]:
|
|||
return sorted(regions)
|
||||
|
||||
|
||||
class NWSSettings(BaseModel):
|
||||
"""Settings schema for NWS adapter."""
|
||||
contact_email: str = ""
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class NWSAdapter(SourceAdapter):
|
||||
"""National Weather Service alerts adapter."""
|
||||
|
||||
name = "nws"
|
||||
stream_name = "CENTRAL_WX"
|
||||
display_name = "NWS Weather Alerts"
|
||||
description = "National Weather Service active alerts via api.weather.gov."
|
||||
settings_schema = NWSSettings
|
||||
requires_api_key = None
|
||||
wizard_order = 1
|
||||
default_cadence_s = 60
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -263,21 +276,6 @@ class NWSAdapter(SourceAdapter):
|
|||
# County name
|
||||
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
|
||||
|
||||
@classmethod
|
||||
def settings_schema(cls) -> dict[str, Any]:
|
||||
"""Return schema for NWS adapter settings."""
|
||||
return {
|
||||
"contact_email": {
|
||||
"type": "str",
|
||||
"default": "",
|
||||
"description": "Contact email for NWS API User-Agent header",
|
||||
},
|
||||
"region": {
|
||||
"type": "RegionConfig",
|
||||
"default": None,
|
||||
"description": "Geographic bounding box to filter alerts",
|
||||
},
|
||||
}
|
||||
|
||||
def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool:
|
||||
"""Check if feature geometry intersects configured region bbox.
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ from tenacity import (
|
|||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -60,11 +62,22 @@ def magnitude_to_severity(mag: float) -> int:
|
|||
return 5
|
||||
|
||||
|
||||
class USGSQuakeSettings(BaseModel):
|
||||
"""Settings schema for USGS quake adapter."""
|
||||
feed: str = "all_hour"
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class USGSQuakeAdapter(SourceAdapter):
|
||||
"""USGS Earthquake Hazards Program adapter."""
|
||||
|
||||
name = "usgs_quake"
|
||||
stream_name = "CENTRAL_QUAKE"
|
||||
display_name = "USGS Earthquakes"
|
||||
description = "USGS earthquake feed (configurable window)."
|
||||
settings_schema = USGSQuakeSettings
|
||||
requires_api_key = None
|
||||
wizard_order = 3
|
||||
default_cadence_s = 60
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -404,29 +417,4 @@ class USGSQuakeAdapter(SourceAdapter):
|
|||
"""Return NATS subject for quake event."""
|
||||
return f"central.{event.category}"
|
||||
|
||||
@classmethod
|
||||
def settings_schema(cls) -> dict[str, Any]:
|
||||
"""Return JSON Schema for USGS quake adapter settings."""
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"feed": {
|
||||
"type": "string",
|
||||
"enum": ["all_hour", "all_day", "all_week", "all_month"],
|
||||
"default": "all_hour",
|
||||
"description": "USGS feed type",
|
||||
},
|
||||
"region": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"north": {"type": "number"},
|
||||
"south": {"type": "number"},
|
||||
"east": {"type": "number"},
|
||||
"west": {"type": "number"},
|
||||
},
|
||||
"required": ["north", "south", "east", "west"],
|
||||
"description": "Bounding box for earthquake monitoring",
|
||||
},
|
||||
},
|
||||
"required": ["region"],
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,11 +25,18 @@ from central.bootstrap_config import get_settings
|
|||
from central.stream_manager import StreamManager
|
||||
import central.adapters
|
||||
|
||||
def _discover_adapters() -> dict[str, type[SourceAdapter]]:
|
||||
def discover_adapters() -> dict[str, type[SourceAdapter]]:
|
||||
"""Auto-discover adapter classes from central.adapters package."""
|
||||
registry: dict[str, type[SourceAdapter]] = {}
|
||||
for module_info in pkgutil.iter_modules(central.adapters.__path__):
|
||||
module = importlib.import_module(f"central.adapters.{module_info.name}")
|
||||
try:
|
||||
module = importlib.import_module(f"central.adapters.{module_info.name}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to import adapter module",
|
||||
extra={"module": module_info.name, "error": str(e)},
|
||||
)
|
||||
continue
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
if (
|
||||
|
|
@ -41,9 +48,6 @@ def _discover_adapters() -> dict[str, type[SourceAdapter]]:
|
|||
registry[attr.name] = attr
|
||||
return registry
|
||||
|
||||
|
||||
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = _discover_adapters()
|
||||
|
||||
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
|
||||
|
||||
# Stream subject mappings
|
||||
|
|
@ -126,6 +130,7 @@ class Supervisor:
|
|||
self._config_store = config_store
|
||||
self._nats_url = nats_url
|
||||
self._cloudevents_config = cloudevents_config
|
||||
self._adapters = discover_adapters()
|
||||
self._nc: nats.NATS | None = None
|
||||
self._js: JetStreamContext | None = None
|
||||
self._stream_manager: StreamManager | None = None
|
||||
|
|
@ -173,7 +178,7 @@ class Supervisor:
|
|||
|
||||
def _create_adapter(self, config: AdapterConfig) -> SourceAdapter:
|
||||
"""Create an adapter instance based on config name."""
|
||||
cls = _ADAPTER_REGISTRY.get(config.name)
|
||||
cls = self._adapters.get(config.name)
|
||||
if cls is None:
|
||||
raise ValueError(f"Unknown adapter type: {config.name}")
|
||||
return cls(
|
||||
|
|
|
|||
|
|
@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
# Patch NWSAdapter to use our mock
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start supervisor (starts adapter)
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start supervisor (starts adapter)
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
|
||||
# Simulate completed poll 5 minutes ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
# Simulate completed poll 5 minutes ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# Re-enable adapter
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-enable adapter
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# The loop should detect that last_poll + cadence is in the past
|
||||
# and poll immediately. Let's verify by checking the wait time logic.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
# The loop should detect that last_poll + cadence is in the past
|
||||
# and poll immediately. Let's verify by checking the wait time logic.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
|
||||
# last_poll was 5 minutes ago, cadence is 60s
|
||||
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
|
||||
# wait_time should be 0 (poll immediately)
|
||||
assert wait_time == 0, (
|
||||
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
|
||||
f"last_poll was {saved_last_poll}, now is {now}"
|
||||
)
|
||||
# last_poll was 5 minutes ago, cadence is 60s
|
||||
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
|
||||
# wait_time should be 0 (poll immediately)
|
||||
assert wait_time == 0, (
|
||||
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
|
||||
f"last_poll was {saved_last_poll}, now is {now}"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_enable_gap_shorter_than_cadence(
|
||||
|
|
@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# Re-enable adapter (simulate 20 seconds later, but we're just
|
||||
# checking the rate limit logic)
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-enable adapter (simulate 20 seconds later, but we're just
|
||||
# checking the rate limit logic)
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# The loop should detect that last_poll + cadence is still in the future
|
||||
# and wait until then.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
# The loop should detect that last_poll + cadence is still in the future
|
||||
# and wait until then.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
|
||||
# last_poll was ~10 seconds ago, cadence is 60s
|
||||
# wait_time should be ~50s (60 - 10 = 50)
|
||||
assert 45 < wait_time < 55, (
|
||||
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
|
||||
f"Rate limit violated: poll would happen before last_poll + cadence"
|
||||
)
|
||||
# last_poll was ~10 seconds ago, cadence is 60s
|
||||
# wait_time should be ~50s (60 - 10 = 50)
|
||||
assert 45 < wait_time < 55, (
|
||||
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
|
||||
f"Rate limit violated: poll would happen before last_poll + cadence"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_delete_readd_fresh_state(
|
||||
|
|
@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# DELETE adapter from DB (remove from config source)
|
||||
config_source.set_adapter(None, name="nws")
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# DELETE adapter from DB (remove from config source)
|
||||
config_source.set_adapter(None, name="nws")
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify adapter fully removed
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
# Verify adapter fully removed
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
|
||||
# Re-add adapter with same name
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-add adapter with same name
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify new adapter started fresh
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
# last_completed_poll should be None (fresh adapter)
|
||||
assert state.last_completed_poll is None, (
|
||||
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
|
||||
f"Preserved state not cleared on delete."
|
||||
)
|
||||
# Verify new adapter started fresh
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
# last_completed_poll should be None (fresh adapter)
|
||||
assert state.last_completed_poll is None, (
|
||||
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
|
||||
f"Preserved state not cleared on delete."
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_preserves_state_start_reuses_it(
|
||||
|
|
@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
|
||||
saved_poll = state.last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
|
||||
saved_poll = state.last_completed_poll
|
||||
|
||||
# Stop adapter
|
||||
await supervisor._stop_adapter("nws")
|
||||
# Stop adapter
|
||||
await supervisor._stop_adapter("nws")
|
||||
|
||||
# State should still exist
|
||||
assert "nws" in supervisor._adapter_states
|
||||
state = supervisor._adapter_states["nws"]
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
# State should still exist
|
||||
assert "nws" in supervisor._adapter_states
|
||||
state = supervisor._adapter_states["nws"]
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
|
||||
# Restart adapter
|
||||
await supervisor._start_adapter(config)
|
||||
# Restart adapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
# Should reuse existing state
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
# Should reuse existing state
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_remove_adapter_clears_state(
|
||||
|
|
@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
await supervisor._start_adapter(config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc)
|
||||
|
||||
# Remove adapter
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Remove adapter
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
# State should be gone
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
# State should be gone
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
|
|
|
|||
|
|
@ -480,3 +480,122 @@ class TestApplyConfig:
|
|||
assert adapter._feed == "all_day"
|
||||
|
||||
await adapter.shutdown()
|
||||
|
||||
|
||||
|
||||
class TestSubjectFor:
|
||||
"""Test subject_for method for all magnitude tiers."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_minor(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-minor",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.minor",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=0,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.minor"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_light(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-light",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.light",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=1,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.light"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_moderate(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-moderate",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.moderate",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=2,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.moderate"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_strong(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-strong",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.strong",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=3,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.strong"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_major(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-major",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.major",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=4,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.major"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_great(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-great",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.great",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=5,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.great"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue