diff --git a/docs/PHASE-1B-NOTES.md b/docs/PHASE-1B-NOTES.md index b7cecc8..878c99f 100644 --- a/docs/PHASE-1B-NOTES.md +++ b/docs/PHASE-1B-NOTES.md @@ -53,3 +53,41 @@ Per stream, display: - Supervisor receives NOTIFY and hot-reloads - No service restarts required for config changes - Stream retention changes apply within 5 seconds + +## FIRMS Adapter Configuration + +### MAP_KEY Management +- Display key alias () and timestamp +- Allow operator to rotate key value (re-encrypt new key) +- Show warning if key not present (polling disabled) +- No key value display (security) + +### Satellite Selection +- Toggle individual satellites: VIIRS_SNPP, VIIRS_NOAA20, VIIRS_NOAA21 +- Stored in array +- Changes hot-reload to adapter without restart + +### SNPP End-of-Life Notice +- NASA timeline: SNPP mission ends ~October 2026 +- GUI should display warning banner when SNPP is enabled and date approaches +- Recommend adding NOAA-21 to satellites list before SNPP EOL +- After EOL, adapter will fail to fetch SNPP data (404); GUI should surface this + +## FIRMS Adapter Configuration + +### MAP_KEY Management +- Display key alias (firms) and last_used_at timestamp +- Allow operator to rotate key value (re-encrypt new key) +- Show warning if key not present (polling disabled) +- No key value display (security) + +### Satellite Selection +- Toggle individual satellites: VIIRS_SNPP, VIIRS_NOAA20, VIIRS_NOAA21 +- Stored in config.adapters.settings.satellites array +- Changes hot-reload to adapter without restart + +### SNPP End-of-Life Notice +- NASA timeline: SNPP mission ends ~October 2026 +- GUI should display warning banner when SNPP is enabled and date approaches +- Recommend adding NOAA-21 to satellites list before SNPP EOL +- After EOL, adapter will fail to fetch SNPP data (404); GUI should surface this diff --git a/sql/migrations/005_add_firms_adapter.sql b/sql/migrations/005_add_firms_adapter.sql new file mode 100644 index 0000000..65b8791 --- /dev/null +++ b/sql/migrations/005_add_firms_adapter.sql @@ -0,0 +1,27 @@ +-- Migration: 005_add_firms_adapter +-- Seeds FIRMS adapter configuration and CENTRAL_FIRE stream. + +-- Seed FIRMS adapter row +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'firms', + true, + 300, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.5, + 'south', 31.0, + 'east', -102.0, + 'west', -124.5 + ), + 'api_key_alias', 'firms', + 'satellites', jsonb_build_array( + 'VIIRS_SNPP_NRT', + 'VIIRS_NOAA20_NRT' + ) + ) +); + +-- Seed CENTRAL_FIRE stream row +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_FIRE', 604800, 1073741824); diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py new file mode 100644 index 0000000..c882a64 --- /dev/null +++ b/src/central/adapters/firms.py @@ -0,0 +1,430 @@ +"""FIRMS (Fire Information for Resource Management System) adapter.""" + +import csv +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from io import StringIO +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential_jitter, + retry_if_exception_type, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +# FIRMS API base URL +FIRMS_API_BASE = "https://firms.modaps.eosdis.nasa.gov/api/area/csv" + +# Satellite name mapping +SATELLITE_SHORT = { + "VIIRS_SNPP_NRT": "viirs_snpp", + "VIIRS_NOAA20_NRT": "viirs_noaa20", + "VIIRS_NOAA21_NRT": "viirs_noaa21", +} + +# Confidence mapping +CONFIDENCE_MAP = { + "l": "low", + "n": "nominal", + "h": "high", +} + +# Severity mapping (confidence -> severity level) +SEVERITY_MAP = { + "high": 3, + "nominal": 2, + "low": 1, +} + + +class FIRMSAdapter(SourceAdapter): + """NASA FIRMS fire hotspot adapter.""" + + name = "firms" + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._api_key: str | None = None + + # Extract settings from config + self._api_key_alias: str = config.settings.get("api_key_alias", "firms") + self._satellites: list[str] = config.settings.get( + "satellites", ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + ) + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + old_alias = self._api_key_alias + + # Update settings + self._api_key_alias = new_config.settings.get("api_key_alias", "firms") + self._satellites = new_config.settings.get( + "satellites", ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + ) + + # Update region + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + + # If API key alias changed, re-fetch the key + if self._api_key_alias != old_alias: + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + if self._api_key: + logger.info("FIRMS API key reloaded", extra={"alias": self._api_key_alias}) + else: + logger.warning( + "FIRMS API key not found after alias change", + extra={"alias": self._api_key_alias}, + ) + + logger.info( + "FIRMS config applied", + extra={ + "region": region_dict, + "satellites": self._satellites, + "api_key_alias": self._api_key_alias, + }, + ) + + async def startup(self) -> None: + """Initialize HTTP session, dedup tracker, and fetch API key.""" + # Fetch API key + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + if not self._api_key: + logger.error( + "FIRMS API key not found - polling will be skipped until key is set", + extra={"alias": self._api_key_alias}, + ) + + # Initialize HTTP session + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + + # Initialize dedup tracker (shared sqlite DB with NWS) + self._db = sqlite3.connect(str(self._cursor_db_path)) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + + # Sweep old entries on startup (48h for FIRMS) + self.sweep_old_ids() + + logger.info( + "FIRMS adapter started", + extra={ + "region": { + "north": self.region.north, + "south": self.region.south, + "east": self.region.east, + "west": self.region.west, + } if self.region else None, + "satellites": self._satellites, + "api_key_present": self._api_key is not None, + }, + ) + + async def shutdown(self) -> None: + """Close HTTP session and database.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("FIRMS adapter shut down") + + def is_published(self, stable_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, stable_id), + ) + return cur.fetchone() is not None + + def mark_published(self, stable_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, stable_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 48 hours. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-48 hours')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("FIRMS swept old dedup entries", extra={"count": count}) + return count + + def _build_stable_id( + self, satellite: str, acq_date: str, acq_time: str, lat: float, lon: float + ) -> str: + """Build stable ID for deduplication.""" + # Round lat/lon to 0.001 degrees to handle floating-point comparison + lat_rounded = round(lat, 3) + lon_rounded = round(lon, 3) + return f"{satellite}:{acq_date}:{acq_time}:{lat_rounded}:{lon_rounded}" + + def _build_url(self, satellite: str) -> str | None: + """Build FIRMS API URL for a satellite.""" + if not self._api_key or not self.region: + return None + + # Area format: west,south,east,north + area = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" + return f"{FIRMS_API_BASE}/{self._api_key}/{satellite}/{area}/1" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=2, max=30), + retry=retry_if_exception_type((aiohttp.ClientError,)), + reraise=True, + ) + async def _fetch_csv(self, url: str) -> str: + """Fetch CSV data from FIRMS API.""" + if not self._session: + raise RuntimeError("Session not initialized") + + async with self._session.get(url) as resp: + # Check for error responses + content_type = resp.headers.get("Content-Type", "") + if "text/html" in content_type: + text = await resp.text() + logger.error( + "FIRMS returned HTML (likely auth error)", + extra={"status": resp.status, "preview": text[:200]}, + ) + raise ValueError("FIRMS returned HTML instead of CSV") + + resp.raise_for_status() + return await resp.text() + + def _parse_csv(self, csv_text: str, satellite: str) -> list[dict[str, Any]]: + """Parse FIRMS CSV response into list of dicts.""" + rows = [] + reader = csv.DictReader(StringIO(csv_text)) + + for row in reader: + try: + # Parse required fields + lat = float(row["latitude"]) + lon = float(row["longitude"]) + acq_date = row["acq_date"] + acq_time = row["acq_time"] + confidence_raw = row.get("confidence", "n").lower() + confidence = CONFIDENCE_MAP.get(confidence_raw, "nominal") + + rows.append({ + "latitude": lat, + "longitude": lon, + "bright_ti4": float(row.get("bright_ti4", 0)) if row.get("bright_ti4") else None, + "bright_ti5": float(row.get("bright_ti5", 0)) if row.get("bright_ti5") else None, + "scan": float(row.get("scan", 0)) if row.get("scan") else None, + "track": float(row.get("track", 0)) if row.get("track") else None, + "acq_date": acq_date, + "acq_time": acq_time, + "satellite": row.get("satellite", satellite), + "instrument": row.get("instrument", "VIIRS"), + "confidence": confidence, + "confidence_raw": confidence_raw, + "version": row.get("version", ""), + "frp": float(row.get("frp", 0)) if row.get("frp") else None, + "daynight": row.get("daynight", ""), + }) + except (KeyError, ValueError) as e: + logger.warning( + "Failed to parse FIRMS row", + extra={"error": str(e), "row": dict(row)}, + ) + continue + + return rows + + def _row_to_event(self, row: dict[str, Any], satellite: str) -> Event: + """Convert a parsed CSV row to an Event.""" + satellite_short = SATELLITE_SHORT.get(satellite, satellite.lower().replace("_nrt", "")) + confidence = row["confidence"] + severity = SEVERITY_MAP.get(confidence, 1) + + # Parse acquisition time + acq_date = row["acq_date"] + acq_time = row["acq_time"] + # acq_time is HHMM format + try: + time = datetime.strptime( + f"{acq_date} {acq_time}", "%Y-%m-%d %H%M" + ).replace(tzinfo=timezone.utc) + except ValueError: + time = datetime.now(timezone.utc) + + lat = row["latitude"] + lon = row["longitude"] + + # Build stable ID + stable_id = self._build_stable_id(satellite, acq_date, acq_time, lat, lon) + + geo = Geo( + centroid=(lon, lat), # GeoJSON order: lon, lat + bbox=(lon, lat, lon, lat), # Point bbox + regions=[], + primary_region=None, + ) + + return Event( + id=stable_id, + source="central/adapters/firms", + category=f"fire.hotspot.{satellite_short}.{confidence}", + time=time, + expires=None, + severity=severity, + geo=geo, + data=row, + ) + + async def poll(self) -> AsyncIterator[Event]: + """Poll FIRMS API for fire hotspots.""" + # Check API key + if not self._api_key: + # Try to fetch again in case it was added + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + if not self._api_key: + logger.warning( + "FIRMS API key still not available, skipping poll", + extra={"alias": self._api_key_alias}, + ) + return + + if not self.region: + logger.warning("FIRMS region not configured, skipping poll") + return + + # Sweep old dedup entries periodically + self.sweep_old_ids() + + total_features = 0 + total_new = 0 + + for satellite in self._satellites: + url = self._build_url(satellite) + if not url: + continue + + try: + csv_text = await self._fetch_csv(url) + rows = self._parse_csv(csv_text, satellite) + feature_count = len(rows) + total_features += feature_count + + new_count = 0 + for row in rows: + stable_id = self._build_stable_id( + satellite, + row["acq_date"], + row["acq_time"], + row["latitude"], + row["longitude"], + ) + + if self.is_published(stable_id): + continue + + event = self._row_to_event(row, satellite) + yield event + self.mark_published(stable_id) + new_count += 1 + + total_new += new_count + logger.info( + "FIRMS satellite poll completed", + extra={ + "satellite": satellite, + "feature_count": feature_count, + "new_count": new_count, + }, + ) + + except Exception as e: + logger.error( + "FIRMS poll failed for satellite", + extra={"satellite": satellite, "error": str(e)}, + ) + continue + + logger.info( + "FIRMS poll completed", + extra={ + "total_features": total_features, + "total_new": total_new, + "satellites": self._satellites, + }, + ) + + +def subject_for_fire_hotspot(ev: Event) -> str: + """Compute the NATS subject for a fire hotspot event. + + Subject format: central.fire.hotspot.. + + The category already contains the satellite and confidence info, + so we just prefix with 'central.'. + """ + # category is "fire.hotspot.." + return f"central.{ev.category}" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index d2ad155..fa12f98 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -20,6 +20,7 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore from central.models import Event, Geo from shapely.geometry import box as shapely_box, shape as shapely_shape @@ -196,6 +197,7 @@ class NWSAdapter(SourceAdapter): def __init__( self, config: AdapterConfig, + config_store: ConfigStore, cursor_db_path: Path, ) -> None: self.cursor_db_path = cursor_db_path diff --git a/src/central/models.py b/src/central/models.py index 588591c..9671202 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -1,68 +1,79 @@ -"""Data models for Central event processing.""" - -from datetime import datetime -from typing import Any - -from pydantic import BaseModel, ConfigDict - - -class Geo(BaseModel): - """Geographic context for an event.""" - - model_config = ConfigDict(extra="forbid", frozen=True) - - centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order - bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) - regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] - primary_region: str | None = None # alphabetically first region, used for subject - - -class Event(BaseModel): - """Canonical event representation for all adapters.""" - - model_config = ConfigDict(extra="forbid", frozen=True) - - id: str # unique, stable across republish - source: str # adapter identity, e.g. "central/adapters/nws" - category: str # e.g. "wx.alert.severe_thunderstorm_warning" - time: datetime # event-time UTC, not processing-time - expires: datetime | None = None - severity: int | None = None # 0..4 or None for "Unknown" - geo: Geo - data: dict[str, Any] # adapter-specific payload - - -def subject_for_event(ev: Event, prefix: str = "central.wx") -> str: - """ - Compute the NATS subject for an alert-style event. - - For weather alerts the subject is: - central.wx.alert.us..county. - or - central.wx.alert.us..zone. - based on whether the primary_region encodes a county or a zone. - - If primary_region is None or unparseable, returns: - central.wx.alert.us.unknown - """ - if ev.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = ev.geo.primary_region - - # Parse US-- format - # County codes are like "Ada", "Canyon" (names) - # Zone codes start with "Z" like "Z033" - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" +"""Data models for Central event processing.""" + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict + + +class Geo(BaseModel): + """Geographic context for an event.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order + bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) + regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] + primary_region: str | None = None # alphabetically first region, used for subject + + +class Event(BaseModel): + """Canonical event representation for all adapters.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + id: str # unique, stable across republish + source: str # adapter identity, e.g. "central/adapters/nws" + category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high" + time: datetime # event-time UTC, not processing-time + expires: datetime | None = None + severity: int | None = None # 0..4 or None for "Unknown" + geo: Geo + data: dict[str, Any] # adapter-specific payload + + +def subject_for_event(ev: Event) -> str: + """ + Compute the NATS subject for an event based on its category. + + Dispatch by category prefix: + - fire.*: returns central. directly + - wx.*: uses weather alert subject logic + + Weather alert subjects: + central.wx.alert.us..county. + or + central.wx.alert.us..zone. + based on whether the primary_region encodes a county or a zone. + + Fire hotspot subjects: + central.fire.hotspot.. + """ + # Fire events: subject is just central. + if ev.category.startswith("fire."): + return f"central.{ev.category}" + + # Weather events: use geo-based subject logic + prefix = "central.wx" + + if ev.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = ev.geo.primary_region + + # Parse US-- format + # County codes are like "Ada", "Canyon" (names) + # Zone codes start with "Z" like "Z033" + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 2543291..7ce0daf 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -1,773 +1,785 @@ -"""Central supervisor - adapter scheduler and event publisher.""" - -import asyncio -import json -import logging -import signal -import sys -from dataclasses import dataclass, field -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import nats -from nats.js import JetStreamContext - -from central.adapter import SourceAdapter -from central.adapters.nws import NWSAdapter -from central.cloudevents_wire import wrap_event -from central.config_models import AdapterConfig -from central.config_source import ConfigSource, DbConfigSource -from central.config_store import ConfigStore -from central.bootstrap_config import get_settings -from central.models import subject_for_event -from central.stream_manager import StreamManager - -CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") - -# Stream subject mappings -STREAM_SUBJECTS = { - "CENTRAL_WX": ["central.wx.>"], - "CENTRAL_META": ["central.meta.>"], -} - -# Recompute interval for stream max_bytes (1 hour) -STREAM_RECOMPUTE_INTERVAL_S = 3600 - - -class JsonFormatter(logging.Formatter): - """JSON log formatter for structured logging.""" - - def format(self, record: logging.LogRecord) -> str: - log_obj: dict[str, Any] = { - "ts": datetime.now(timezone.utc).isoformat(), - "level": record.levelname, - "logger": record.name, - "msg": record.getMessage(), - } - if record.exc_info: - log_obj["exc"] = self.formatException(record.exc_info) - if hasattr(record, "extra"): - log_obj.update(record.extra) - for key in record.__dict__: - if key not in ( - "name", "msg", "args", "created", "filename", "funcName", - "levelname", "levelno", "lineno", "module", "msecs", - "pathname", "process", "processName", "relativeCreated", - "stack_info", "exc_info", "exc_text", "thread", "threadName", - "taskName", "message", - ): - log_obj[key] = record.__dict__[key] - return json.dumps(log_obj) - - -def setup_logging() -> None: - """Configure JSON logging to stdout.""" - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(JsonFormatter()) - logging.root.handlers = [handler] - logging.root.setLevel(logging.INFO) - - -logger = logging.getLogger("central.supervisor") - - -@dataclass -class AdapterState: - """Runtime state for a scheduled adapter.""" - - name: str - adapter: SourceAdapter - config: AdapterConfig - task: asyncio.Task[None] | None = None - last_completed_poll: datetime | None = None - cancel_event: asyncio.Event = field(default_factory=asyncio.Event) - - @property - def is_running(self) -> bool: - """Check if adapter loop is currently running.""" - return self.task is not None and not self.task.done() - - -class Supervisor: - """Main supervisor process.""" - - def __init__( - self, - config_source: ConfigSource, - config_store: ConfigStore, - nats_url: str, - cloudevents_config: Any = None, - ) -> None: - self._config_source = config_source - self._config_store = config_store - self._nats_url = nats_url - self._cloudevents_config = cloudevents_config - self._nc: nats.NATS | None = None - self._js: JetStreamContext | None = None - self._stream_manager: StreamManager | None = None - self._adapter_states: dict[str, AdapterState] = {} - self._tasks: list[asyncio.Task[None]] = [] - self._shutdown_event = asyncio.Event() - self._start_time = datetime.now(timezone.utc) - self._config_watch_task: asyncio.Task[None] | None = None - self._lock = asyncio.Lock() - - async def connect(self) -> None: - """Connect to NATS.""" - self._nc = await nats.connect(self._nats_url) - self._js = self._nc.jetstream() - self._stream_manager = StreamManager(self._js) - logger.info("Connected to NATS", extra={"url": self._nats_url}) - - async def disconnect(self) -> None: - """Disconnect from NATS.""" - if self._nc: - await self._nc.drain() - await self._nc.close() - self._nc = None - self._js = None - self._stream_manager = None - logger.info("Disconnected from NATS") - - async def _publish_meta(self, subject: str, data: dict[str, Any]) -> None: - """Publish a meta event (no Nats-Msg-Id).""" - if not self._nc: - return - payload = json.dumps(data).encode() - await self._nc.publish(subject, payload) - - async def _publish_event(self, subject: str, envelope: dict[str, Any], msg_id: str) -> None: - """Publish an event with dedup header.""" - if not self._js: - return - payload = json.dumps(envelope).encode() - await self._js.publish( - subject, - payload, - headers={"Nats-Msg-Id": msg_id}, - ) - - def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: - """Create an adapter instance based on config name.""" - if config.name == "nws": - return NWSAdapter(config=config, cursor_db_path=CURSOR_DB_PATH) - else: - raise ValueError(f"Unknown adapter type: {config.name}") - - async def _run_adapter_loop(self, state: AdapterState) -> None: - """Run an adapter poll loop with rate-limit aware scheduling.""" - while not self._shutdown_event.is_set(): - # Calculate next poll time based on rate-limit guarantee - now = datetime.now(timezone.utc) - - if state.last_completed_poll is not None: - next_poll_at = state.last_completed_poll.timestamp() + state.config.cadence_s - wait_time = max(0, next_poll_at - now.timestamp()) - else: - # First poll - run immediately - wait_time = 0 - - if wait_time > 0: - logger.debug( - "Waiting for next poll", - extra={ - "adapter": state.name, - "wait_s": wait_time, - "next_poll": datetime.fromtimestamp( - now.timestamp() + wait_time, tz=timezone.utc - ).isoformat(), - }, - ) - # Wait for either timeout or cancel signal - try: - await asyncio.wait_for( - state.cancel_event.wait(), - timeout=wait_time, - ) - # Cancel event was set - check if we should exit or reschedule - if self._shutdown_event.is_set(): - break - # Clear the cancel event and re-evaluate schedule - state.cancel_event.clear() - continue - except asyncio.TimeoutError: - pass - - # Check shutdown before polling - if self._shutdown_event.is_set(): - break - - # Check if adapter is still enabled - if not state.config.enabled or state.config.is_paused: - logger.info( - "Adapter disabled/paused, stopping loop", - extra={"adapter": state.name}, - ) - break - - poll_start = datetime.now(timezone.utc) - try: - async for event in state.adapter.poll(): - # Dedup check - if state.adapter.is_published(event.id): - state.adapter.bump_last_seen(event.id) - continue - - # Build CloudEvent (uses defaults if no config provided) - envelope, msg_id = wrap_event(event, self._cloudevents_config) - - subject = subject_for_event(event) - - # Publish - await self._publish_event(subject, envelope, msg_id) - state.adapter.mark_published(event.id) - - logger.info( - "Published event", - extra={"id": event.id, "subject": subject, "category": event.category} - ) - - # Publish success status - await self._publish_meta( - f"central.meta.adapter.{state.name}.status", - {"ok": True, "ts": datetime.now(timezone.utc).isoformat()} - ) - - # Mark poll completion time for rate limiting - state.last_completed_poll = datetime.now(timezone.utc) - - except Exception as e: - logger.exception("Adapter poll failed", extra={"adapter": state.name}) - await self._publish_meta( - f"central.meta.adapter.{state.name}.status", - { - "ok": False, - "error": str(e), - "ts": datetime.now(timezone.utc).isoformat() - } - ) - # Still mark completion time to avoid tight retry loops - state.last_completed_poll = datetime.now(timezone.utc) - - # Sweep old IDs - swept = state.adapter.sweep_old_ids() - if swept > 0: - logger.info("Swept old published IDs", extra={"count": swept}) - - async def _start_adapter(self, config: AdapterConfig) -> None: - """Start an adapter based on its configuration. - - If the adapter was previously stopped (state exists but task is not running), - reuses the existing state to preserve last_completed_poll for rate limiting. - """ - existing_state = self._adapter_states.get(config.name) - - if existing_state is not None: - if existing_state.is_running: - logger.warning( - "Adapter already running", - extra={"adapter": config.name}, - ) - return - - # Adapter was stopped - restart with preserved state - # Update config and restart the adapter - existing_state.config = config - existing_state.cancel_event.clear() - - # Reinitialize the adapter with new config - existing_state.adapter = self._create_adapter(config) - await existing_state.adapter.startup() - - # Start the loop task - existing_state.task = asyncio.create_task( - self._run_adapter_loop(existing_state) - ) - - # Calculate next poll time for logging - if existing_state.last_completed_poll: - next_poll_at = datetime.fromtimestamp( - existing_state.last_completed_poll.timestamp() + config.cadence_s, - tz=timezone.utc, - ) - if next_poll_at <= datetime.now(timezone.utc): - next_poll_at = datetime.now(timezone.utc) - else: - next_poll_at = datetime.now(timezone.utc) - - logger.info( - "Adapter restarted", - extra={ - "adapter": config.name, - "cadence_s": config.cadence_s, - "preserved_last_poll": existing_state.last_completed_poll.isoformat() - if existing_state.last_completed_poll - else None, - "next_poll": next_poll_at.isoformat(), - }, - ) - return - - # New adapter - create fresh state - try: - adapter = self._create_adapter(config) - except ValueError as e: - logger.warning(str(e), extra={"adapter": config.name}) - return - - await adapter.startup() - - state = AdapterState( - name=config.name, - adapter=adapter, - config=config, - ) - state.task = asyncio.create_task(self._run_adapter_loop(state)) - self._adapter_states[config.name] = state - - logger.info( - "Adapter started", - extra={ - "adapter": config.name, - "cadence_s": config.cadence_s, - }, - ) - - async def _stop_adapter(self, name: str) -> None: - """Stop a running adapter but preserve state for potential restart. - - The adapter state (including last_completed_poll) is preserved so that - if the adapter is re-enabled, the rate-limit guarantee is maintained. - Use _remove_adapter() to fully remove an adapter from tracking. - """ - state = self._adapter_states.get(name) - if state is None: - return - - if not state.is_running: - # Already stopped - return - - # Signal the loop to stop - state.cancel_event.set() - - if state.task: - state.task.cancel() - try: - await state.task - except asyncio.CancelledError: - pass - state.task = None - - await state.adapter.shutdown() - logger.info( - "Adapter stopped", - extra={ - "adapter": name, - "preserved_last_poll": state.last_completed_poll.isoformat() - if state.last_completed_poll - else None, - }, - ) - - async def _remove_adapter(self, name: str) -> None: - """Fully remove an adapter, dropping all preserved state. - - Called when an adapter is deleted from the database (not just disabled). - """ - state = self._adapter_states.pop(name, None) - if state is None: - return - - # Stop if running - if state.is_running: - state.cancel_event.set() - if state.task: - state.task.cancel() - try: - await state.task - except asyncio.CancelledError: - pass - - await state.adapter.shutdown() - - logger.info( - "Adapter removed", - extra={"adapter": name}, - ) - - async def _reschedule_adapter( - self, - name: str, - new_config: AdapterConfig, - ) -> AdapterState | None: - """Reschedule an adapter with new configuration. - - Maintains rate-limit guarantee: next poll at - (last_completed_poll + new_cadence_s), not now + new_cadence_s. - - Returns the AdapterState to signal, or None if no signal needed. - The caller must signal cancel_event AFTER releasing any locks to - ensure immediate event delivery to the sleeping loop. - """ - state = self._adapter_states.get(name) - if state is None: - # Adapter not running - just start it - await self._start_adapter(new_config) - return None - - if not state.is_running: - # Adapter stopped - restart it - await self._start_adapter(new_config) - return None - - old_cadence = state.config.cadence_s - new_cadence = new_config.cadence_s - - # Update config - state.config = new_config - - # Apply config to adapter (generic - each adapter handles its own settings) - await state.adapter.apply_config(new_config) - - # Calculate next poll time for logging - if state.last_completed_poll: - next_poll_at = datetime.fromtimestamp( - state.last_completed_poll.timestamp() + new_cadence, - tz=timezone.utc, - ) - else: - next_poll_at = datetime.now(timezone.utc) - - logger.info( - "Rescheduled adapter", - extra={ - "adapter": name, - "old_cadence_s": old_cadence, - "new_cadence_s": new_cadence, - "next_poll": next_poll_at.isoformat(), - }, - ) - - # Return state so caller can signal OUTSIDE any locks. - # This ensures immediate event delivery to the sleeping loop. - return state - - async def _ensure_streams(self) -> None: - """Ensure all configured streams exist with correct settings.""" - if not self._stream_manager: - return - - streams = await self._config_store.list_streams() - for stream_config in streams: - subjects = STREAM_SUBJECTS.get(stream_config.name, []) - if not subjects: - logger.warning( - "No subjects configured for stream", - extra={"stream": stream_config.name}, - ) - continue - - try: - await self._stream_manager.ensure_stream( - stream_config.name, - subjects, - stream_config, - ) - except Exception as e: - logger.error( - "Failed to ensure stream", - extra={"stream": stream_config.name, "error": str(e)}, - ) - - async def _handle_stream_change(self, stream_name: str) -> None: - """Handle a stream configuration change.""" - if not self._stream_manager: - return - - stream_config = await self._config_store.get_stream(stream_name) - if stream_config is None: - logger.warning( - "Stream config not found", - extra={"stream": stream_name}, - ) - return - - try: - # Apply retention settings - await self._stream_manager.apply_retention(stream_name, stream_config) - - # Immediate recompute of max_bytes - new_max_bytes = await self._stream_manager.recompute_max_bytes( - stream_name, stream_config.max_age_s - ) - - # Update database (won't trigger NOTIFY due to column filter) - await self._config_store.update_stream_max_bytes(stream_name, new_max_bytes) - - logger.info( - "Stream retention updated", - extra={ - "stream": stream_name, - "max_age_s": stream_config.max_age_s, - "new_max_bytes": new_max_bytes, - }, - ) - except Exception as e: - logger.error( - "Failed to handle stream change", - extra={"stream": stream_name, "error": str(e)}, - ) - - async def _stream_retention_recompute_loop(self) -> None: - """Periodically recompute max_bytes for all streams.""" - while not self._shutdown_event.is_set(): - try: - await asyncio.wait_for( - self._shutdown_event.wait(), - timeout=STREAM_RECOMPUTE_INTERVAL_S, - ) - # Shutdown requested - break - except asyncio.TimeoutError: - pass - - # Recompute for all streams - if not self._stream_manager: - continue - - streams = await self._config_store.list_streams() - for stream_config in streams: - if not stream_config.managed_max_bytes: - continue - - try: - new_max_bytes = await self._stream_manager.recompute_max_bytes( - stream_config.name, stream_config.max_age_s - ) - - # Only update if change > 10% - change_ratio = abs(new_max_bytes - stream_config.max_bytes) / max(stream_config.max_bytes, 1) - if change_ratio > 0.10: - await self._config_store.update_stream_max_bytes( - stream_config.name, new_max_bytes - ) - await self._stream_manager.apply_retention( - stream_config.name, - await self._config_store.get_stream(stream_config.name), - ) - logger.info( - "Recomputed stream max_bytes", - extra={ - "stream": stream_config.name, - "old_max_bytes": stream_config.max_bytes, - "new_max_bytes": new_max_bytes, - "change_ratio": change_ratio, - }, - ) - except Exception as e: - logger.error( - "Failed to recompute stream max_bytes", - extra={"stream": stream_config.name, "error": str(e)}, - ) - - async def _on_config_change(self, table: str, key: str) -> None: - """Handle a configuration change notification. - - Called when NOTIFY fires for config changes. - """ - # Handle stream changes - if table == "streams": - stream_name = key - logger.info( - "Stream config change received", - extra={"stream": stream_name}, - ) - await self._handle_stream_change(stream_name) - return - - if table != "adapters": - return - - adapter_name = key - logger.info( - "Config change received", - extra={"table": table, "key": key}, - ) - - # Track state that needs signaling after lock release - state_to_signal: AdapterState | None = None - - async with self._lock: - # Fetch the current config for this adapter - new_config = await self._config_source.get_adapter(adapter_name) - current_state = self._adapter_states.get(adapter_name) - - if new_config is None: - # Adapter was deleted - fully remove, don't just stop - if current_state: - await self._remove_adapter(adapter_name) - logger.info( - "Adapter deleted, removed", - extra={"adapter": adapter_name}, - ) - return - - if not new_config.enabled or new_config.is_paused: - # Adapter disabled or paused - stop but preserve state - if current_state and current_state.is_running: - await self._stop_adapter(adapter_name) - logger.info( - "Adapter disabled/paused, stopped", - extra={ - "adapter": adapter_name, - "enabled": new_config.enabled, - "paused": new_config.is_paused, - }, - ) - return - - if current_state is None or not current_state.is_running: - # Adapter was enabled or created - start (will reuse state if exists) - await self._start_adapter(new_config) - logger.info( - "Adapter enabled, started", - extra={"adapter": adapter_name}, - ) - else: - # Adapter config changed (cadence, settings) - state_to_signal = await self._reschedule_adapter(adapter_name, new_config) - - # Signal OUTSIDE the lock to ensure immediate event delivery. - # This fixes cadence-decrease hot-reload where the signal was - # delayed by asyncio task scheduling while holding the lock. - if state_to_signal is not None: - state_to_signal.cancel_event.set() - - async def _heartbeat_loop(self) -> None: - """Publish periodic heartbeats.""" - while not self._shutdown_event.is_set(): - uptime = (datetime.now(timezone.utc) - self._start_time).total_seconds() - await self._publish_meta( - "central.meta.heartbeat", - {"ts": datetime.now(timezone.utc).isoformat(), "uptime_s": uptime} - ) - try: - await asyncio.wait_for( - self._shutdown_event.wait(), - timeout=30 - ) - except asyncio.TimeoutError: - pass - - async def start(self) -> None: - """Start the supervisor.""" - await self.connect() - - # Ensure streams exist with correct configuration - await self._ensure_streams() - - # Load and start enabled adapters - enabled_adapters = await self._config_source.list_enabled_adapters() - for config in enabled_adapters: - await self._start_adapter(config) - - # Start config watcher (runs forever, calling callback on changes) - self._config_watch_task = asyncio.create_task( - self._config_source.watch_for_changes(self._on_config_change) - ) - - # Start heartbeat - self._tasks.append(asyncio.create_task(self._heartbeat_loop())) - - # Start stream retention recompute loop - self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop())) - - logger.info( - "Supervisor started", - extra={"adapters": list(self._adapter_states.keys())}, - ) - - async def stop(self) -> None: - """Stop the supervisor gracefully.""" - logger.info("Supervisor shutting down") - self._shutdown_event.set() - - # Cancel config watcher - if self._config_watch_task: - self._config_watch_task.cancel() - try: - await self._config_watch_task - except asyncio.CancelledError: - pass - - # Cancel heartbeat and other tasks - for task in self._tasks: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - # Remove all adapters (full cleanup) - for name in list(self._adapter_states.keys()): - await self._remove_adapter(name) - - # Close config source - await self._config_source.close() - - await self.disconnect() - logger.info("Supervisor stopped") - - -async def async_main() -> None: - """Async entry point.""" - setup_logging() - - settings = get_settings() - logger.info( - "Config source: db", - extra={"config_source": "db"}, - ) - - # Create database config source and store - config_source = await DbConfigSource.create(settings.db_dsn) - config_store = await ConfigStore.create(settings.db_dsn) - - supervisor = Supervisor( - config_source=config_source, - config_store=config_store, - nats_url=settings.nats_url, - # CloudEvents uses protocol-level defaults from cloudevents_constants - cloudevents_config=None, - ) - logger.info( - "CloudEvents config: defaults", - extra={"cloudevents_source": "defaults"}, - ) - - loop = asyncio.get_running_loop() - shutdown_event = asyncio.Event() - - def handle_signal() -> None: - shutdown_event.set() - - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, handle_signal) - - await supervisor.start() - - # Wait for shutdown signal - await shutdown_event.wait() - - await supervisor.stop() - - -def main() -> None: - """Entry point.""" - asyncio.run(async_main()) - - -if __name__ == "__main__": - main() +"""Central supervisor - adapter scheduler and event publisher.""" + +import asyncio +import json +import logging +import signal +import sys +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import nats +from nats.js import JetStreamContext + +from central.adapter import SourceAdapter +from central.adapters.nws import NWSAdapter +from central.adapters.firms import FIRMSAdapter +from central.cloudevents_wire import wrap_event +from central.config_models import AdapterConfig +from central.config_source import ConfigSource, DbConfigSource +from central.config_store import ConfigStore +from central.bootstrap_config import get_settings +from central.models import subject_for_event +from central.stream_manager import StreamManager + +# Adapter registry - add new adapters here +_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { + "nws": NWSAdapter, + "firms": FIRMSAdapter, +} + +CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") + +# Stream subject mappings +STREAM_SUBJECTS = { + "CENTRAL_WX": ["central.wx.>"], + "CENTRAL_META": ["central.meta.>"], + "CENTRAL_FIRE": ["central.fire.>"], +} + +# Recompute interval for stream max_bytes (1 hour) +STREAM_RECOMPUTE_INTERVAL_S = 3600 + + +class JsonFormatter(logging.Formatter): + """JSON log formatter for structured logging.""" + + def format(self, record: logging.LogRecord) -> str: + log_obj: dict[str, Any] = { + "ts": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info: + log_obj["exc"] = self.formatException(record.exc_info) + if hasattr(record, "extra"): + log_obj.update(record.extra) + for key in record.__dict__: + if key not in ( + "name", "msg", "args", "created", "filename", "funcName", + "levelname", "levelno", "lineno", "module", "msecs", + "pathname", "process", "processName", "relativeCreated", + "stack_info", "exc_info", "exc_text", "thread", "threadName", + "taskName", "message", + ): + log_obj[key] = record.__dict__[key] + return json.dumps(log_obj) + + +def setup_logging() -> None: + """Configure JSON logging to stdout.""" + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(JsonFormatter()) + logging.root.handlers = [handler] + logging.root.setLevel(logging.INFO) + + +logger = logging.getLogger("central.supervisor") + + +@dataclass +class AdapterState: + """Runtime state for a scheduled adapter.""" + + name: str + adapter: SourceAdapter + config: AdapterConfig + task: asyncio.Task[None] | None = None + last_completed_poll: datetime | None = None + cancel_event: asyncio.Event = field(default_factory=asyncio.Event) + + @property + def is_running(self) -> bool: + """Check if adapter loop is currently running.""" + return self.task is not None and not self.task.done() + + +class Supervisor: + """Main supervisor process.""" + + def __init__( + self, + config_source: ConfigSource, + config_store: ConfigStore, + nats_url: str, + cloudevents_config: Any = None, + ) -> None: + self._config_source = config_source + self._config_store = config_store + self._nats_url = nats_url + self._cloudevents_config = cloudevents_config + self._nc: nats.NATS | None = None + self._js: JetStreamContext | None = None + self._stream_manager: StreamManager | None = None + self._adapter_states: dict[str, AdapterState] = {} + self._tasks: list[asyncio.Task[None]] = [] + self._shutdown_event = asyncio.Event() + self._start_time = datetime.now(timezone.utc) + self._config_watch_task: asyncio.Task[None] | None = None + self._lock = asyncio.Lock() + + async def connect(self) -> None: + """Connect to NATS.""" + self._nc = await nats.connect(self._nats_url) + self._js = self._nc.jetstream() + self._stream_manager = StreamManager(self._js) + logger.info("Connected to NATS", extra={"url": self._nats_url}) + + async def disconnect(self) -> None: + """Disconnect from NATS.""" + if self._nc: + await self._nc.drain() + await self._nc.close() + self._nc = None + self._js = None + self._stream_manager = None + logger.info("Disconnected from NATS") + + async def _publish_meta(self, subject: str, data: dict[str, Any]) -> None: + """Publish a meta event (no Nats-Msg-Id).""" + if not self._nc: + return + payload = json.dumps(data).encode() + await self._nc.publish(subject, payload) + + async def _publish_event(self, subject: str, envelope: dict[str, Any], msg_id: str) -> None: + """Publish an event with dedup header.""" + if not self._js: + return + payload = json.dumps(envelope).encode() + await self._js.publish( + subject, + payload, + headers={"Nats-Msg-Id": msg_id}, + ) + + def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: + """Create an adapter instance based on config name.""" + cls = _ADAPTER_REGISTRY.get(config.name) + if cls is None: + raise ValueError(f"Unknown adapter type: {config.name}") + return cls( + config=config, + config_store=self._config_store, + cursor_db_path=CURSOR_DB_PATH, + ) + + async def _run_adapter_loop(self, state: AdapterState) -> None: + """Run an adapter poll loop with rate-limit aware scheduling.""" + while not self._shutdown_event.is_set(): + # Calculate next poll time based on rate-limit guarantee + now = datetime.now(timezone.utc) + + if state.last_completed_poll is not None: + next_poll_at = state.last_completed_poll.timestamp() + state.config.cadence_s + wait_time = max(0, next_poll_at - now.timestamp()) + else: + # First poll - run immediately + wait_time = 0 + + if wait_time > 0: + logger.debug( + "Waiting for next poll", + extra={ + "adapter": state.name, + "wait_s": wait_time, + "next_poll": datetime.fromtimestamp( + now.timestamp() + wait_time, tz=timezone.utc + ).isoformat(), + }, + ) + # Wait for either timeout or cancel signal + try: + await asyncio.wait_for( + state.cancel_event.wait(), + timeout=wait_time, + ) + # Cancel event was set - check if we should exit or reschedule + if self._shutdown_event.is_set(): + break + # Clear the cancel event and re-evaluate schedule + state.cancel_event.clear() + continue + except asyncio.TimeoutError: + pass + + # Check shutdown before polling + if self._shutdown_event.is_set(): + break + + # Check if adapter is still enabled + if not state.config.enabled or state.config.is_paused: + logger.info( + "Adapter disabled/paused, stopping loop", + extra={"adapter": state.name}, + ) + break + + poll_start = datetime.now(timezone.utc) + try: + async for event in state.adapter.poll(): + # Dedup check + if state.adapter.is_published(event.id): + state.adapter.bump_last_seen(event.id) + continue + + # Build CloudEvent (uses defaults if no config provided) + envelope, msg_id = wrap_event(event, self._cloudevents_config) + + subject = subject_for_event(event) + + # Publish + await self._publish_event(subject, envelope, msg_id) + state.adapter.mark_published(event.id) + + logger.info( + "Published event", + extra={"id": event.id, "subject": subject, "category": event.category} + ) + + # Publish success status + await self._publish_meta( + f"central.meta.adapter.{state.name}.status", + {"ok": True, "ts": datetime.now(timezone.utc).isoformat()} + ) + + # Mark poll completion time for rate limiting + state.last_completed_poll = datetime.now(timezone.utc) + + except Exception as e: + logger.exception("Adapter poll failed", extra={"adapter": state.name}) + await self._publish_meta( + f"central.meta.adapter.{state.name}.status", + { + "ok": False, + "error": str(e), + "ts": datetime.now(timezone.utc).isoformat() + } + ) + # Still mark completion time to avoid tight retry loops + state.last_completed_poll = datetime.now(timezone.utc) + + # Sweep old IDs + swept = state.adapter.sweep_old_ids() + if swept > 0: + logger.info("Swept old published IDs", extra={"count": swept}) + + async def _start_adapter(self, config: AdapterConfig) -> None: + """Start an adapter based on its configuration. + + If the adapter was previously stopped (state exists but task is not running), + reuses the existing state to preserve last_completed_poll for rate limiting. + """ + existing_state = self._adapter_states.get(config.name) + + if existing_state is not None: + if existing_state.is_running: + logger.warning( + "Adapter already running", + extra={"adapter": config.name}, + ) + return + + # Adapter was stopped - restart with preserved state + # Update config and restart the adapter + existing_state.config = config + existing_state.cancel_event.clear() + + # Reinitialize the adapter with new config + existing_state.adapter = self._create_adapter(config) + await existing_state.adapter.startup() + + # Start the loop task + existing_state.task = asyncio.create_task( + self._run_adapter_loop(existing_state) + ) + + # Calculate next poll time for logging + if existing_state.last_completed_poll: + next_poll_at = datetime.fromtimestamp( + existing_state.last_completed_poll.timestamp() + config.cadence_s, + tz=timezone.utc, + ) + if next_poll_at <= datetime.now(timezone.utc): + next_poll_at = datetime.now(timezone.utc) + else: + next_poll_at = datetime.now(timezone.utc) + + logger.info( + "Adapter restarted", + extra={ + "adapter": config.name, + "cadence_s": config.cadence_s, + "preserved_last_poll": existing_state.last_completed_poll.isoformat() + if existing_state.last_completed_poll + else None, + "next_poll": next_poll_at.isoformat(), + }, + ) + return + + # New adapter - create fresh state + try: + adapter = self._create_adapter(config) + except ValueError as e: + logger.warning(str(e), extra={"adapter": config.name}) + return + + await adapter.startup() + + state = AdapterState( + name=config.name, + adapter=adapter, + config=config, + ) + state.task = asyncio.create_task(self._run_adapter_loop(state)) + self._adapter_states[config.name] = state + + logger.info( + "Adapter started", + extra={ + "adapter": config.name, + "cadence_s": config.cadence_s, + }, + ) + + async def _stop_adapter(self, name: str) -> None: + """Stop a running adapter but preserve state for potential restart. + + The adapter state (including last_completed_poll) is preserved so that + if the adapter is re-enabled, the rate-limit guarantee is maintained. + Use _remove_adapter() to fully remove an adapter from tracking. + """ + state = self._adapter_states.get(name) + if state is None: + return + + if not state.is_running: + # Already stopped + return + + # Signal the loop to stop + state.cancel_event.set() + + if state.task: + state.task.cancel() + try: + await state.task + except asyncio.CancelledError: + pass + state.task = None + + await state.adapter.shutdown() + logger.info( + "Adapter stopped", + extra={ + "adapter": name, + "preserved_last_poll": state.last_completed_poll.isoformat() + if state.last_completed_poll + else None, + }, + ) + + async def _remove_adapter(self, name: str) -> None: + """Fully remove an adapter, dropping all preserved state. + + Called when an adapter is deleted from the database (not just disabled). + """ + state = self._adapter_states.pop(name, None) + if state is None: + return + + # Stop if running + if state.is_running: + state.cancel_event.set() + if state.task: + state.task.cancel() + try: + await state.task + except asyncio.CancelledError: + pass + + await state.adapter.shutdown() + + logger.info( + "Adapter removed", + extra={"adapter": name}, + ) + + async def _reschedule_adapter( + self, + name: str, + new_config: AdapterConfig, + ) -> AdapterState | None: + """Reschedule an adapter with new configuration. + + Maintains rate-limit guarantee: next poll at + (last_completed_poll + new_cadence_s), not now + new_cadence_s. + + Returns the AdapterState to signal, or None if no signal needed. + The caller must signal cancel_event AFTER releasing any locks to + ensure immediate event delivery to the sleeping loop. + """ + state = self._adapter_states.get(name) + if state is None: + # Adapter not running - just start it + await self._start_adapter(new_config) + return None + + if not state.is_running: + # Adapter stopped - restart it + await self._start_adapter(new_config) + return None + + old_cadence = state.config.cadence_s + new_cadence = new_config.cadence_s + + # Update config + state.config = new_config + + # Apply config to adapter (generic - each adapter handles its own settings) + await state.adapter.apply_config(new_config) + + # Calculate next poll time for logging + if state.last_completed_poll: + next_poll_at = datetime.fromtimestamp( + state.last_completed_poll.timestamp() + new_cadence, + tz=timezone.utc, + ) + else: + next_poll_at = datetime.now(timezone.utc) + + logger.info( + "Rescheduled adapter", + extra={ + "adapter": name, + "old_cadence_s": old_cadence, + "new_cadence_s": new_cadence, + "next_poll": next_poll_at.isoformat(), + }, + ) + + # Return state so caller can signal OUTSIDE any locks. + # This ensures immediate event delivery to the sleeping loop. + return state + + async def _ensure_streams(self) -> None: + """Ensure all configured streams exist with correct settings.""" + if not self._stream_manager: + return + + streams = await self._config_store.list_streams() + for stream_config in streams: + subjects = STREAM_SUBJECTS.get(stream_config.name, []) + if not subjects: + logger.warning( + "No subjects configured for stream", + extra={"stream": stream_config.name}, + ) + continue + + try: + await self._stream_manager.ensure_stream( + stream_config.name, + subjects, + stream_config, + ) + except Exception as e: + logger.error( + "Failed to ensure stream", + extra={"stream": stream_config.name, "error": str(e)}, + ) + + async def _handle_stream_change(self, stream_name: str) -> None: + """Handle a stream configuration change.""" + if not self._stream_manager: + return + + stream_config = await self._config_store.get_stream(stream_name) + if stream_config is None: + logger.warning( + "Stream config not found", + extra={"stream": stream_name}, + ) + return + + try: + # Apply retention settings + await self._stream_manager.apply_retention(stream_name, stream_config) + + # Immediate recompute of max_bytes + new_max_bytes = await self._stream_manager.recompute_max_bytes( + stream_name, stream_config.max_age_s + ) + + # Update database (won't trigger NOTIFY due to column filter) + await self._config_store.update_stream_max_bytes(stream_name, new_max_bytes) + + logger.info( + "Stream retention updated", + extra={ + "stream": stream_name, + "max_age_s": stream_config.max_age_s, + "new_max_bytes": new_max_bytes, + }, + ) + except Exception as e: + logger.error( + "Failed to handle stream change", + extra={"stream": stream_name, "error": str(e)}, + ) + + async def _stream_retention_recompute_loop(self) -> None: + """Periodically recompute max_bytes for all streams.""" + while not self._shutdown_event.is_set(): + try: + await asyncio.wait_for( + self._shutdown_event.wait(), + timeout=STREAM_RECOMPUTE_INTERVAL_S, + ) + # Shutdown requested + break + except asyncio.TimeoutError: + pass + + # Recompute for all streams + if not self._stream_manager: + continue + + streams = await self._config_store.list_streams() + for stream_config in streams: + if not stream_config.managed_max_bytes: + continue + + try: + new_max_bytes = await self._stream_manager.recompute_max_bytes( + stream_config.name, stream_config.max_age_s + ) + + # Only update if change > 10% + change_ratio = abs(new_max_bytes - stream_config.max_bytes) / max(stream_config.max_bytes, 1) + if change_ratio > 0.10: + await self._config_store.update_stream_max_bytes( + stream_config.name, new_max_bytes + ) + await self._stream_manager.apply_retention( + stream_config.name, + await self._config_store.get_stream(stream_config.name), + ) + logger.info( + "Recomputed stream max_bytes", + extra={ + "stream": stream_config.name, + "old_max_bytes": stream_config.max_bytes, + "new_max_bytes": new_max_bytes, + "change_ratio": change_ratio, + }, + ) + except Exception as e: + logger.error( + "Failed to recompute stream max_bytes", + extra={"stream": stream_config.name, "error": str(e)}, + ) + + async def _on_config_change(self, table: str, key: str) -> None: + """Handle a configuration change notification. + + Called when NOTIFY fires for config changes. + """ + # Handle stream changes + if table == "streams": + stream_name = key + logger.info( + "Stream config change received", + extra={"stream": stream_name}, + ) + await self._handle_stream_change(stream_name) + return + + if table != "adapters": + return + + adapter_name = key + logger.info( + "Config change received", + extra={"table": table, "key": key}, + ) + + # Track state that needs signaling after lock release + state_to_signal: AdapterState | None = None + + async with self._lock: + # Fetch the current config for this adapter + new_config = await self._config_source.get_adapter(adapter_name) + current_state = self._adapter_states.get(adapter_name) + + if new_config is None: + # Adapter was deleted - fully remove, don't just stop + if current_state: + await self._remove_adapter(adapter_name) + logger.info( + "Adapter deleted, removed", + extra={"adapter": adapter_name}, + ) + return + + if not new_config.enabled or new_config.is_paused: + # Adapter disabled or paused - stop but preserve state + if current_state and current_state.is_running: + await self._stop_adapter(adapter_name) + logger.info( + "Adapter disabled/paused, stopped", + extra={ + "adapter": adapter_name, + "enabled": new_config.enabled, + "paused": new_config.is_paused, + }, + ) + return + + if current_state is None or not current_state.is_running: + # Adapter was enabled or created - start (will reuse state if exists) + await self._start_adapter(new_config) + logger.info( + "Adapter enabled, started", + extra={"adapter": adapter_name}, + ) + else: + # Adapter config changed (cadence, settings) + state_to_signal = await self._reschedule_adapter(adapter_name, new_config) + + # Signal OUTSIDE the lock to ensure immediate event delivery. + # This fixes cadence-decrease hot-reload where the signal was + # delayed by asyncio task scheduling while holding the lock. + if state_to_signal is not None: + state_to_signal.cancel_event.set() + + async def _heartbeat_loop(self) -> None: + """Publish periodic heartbeats.""" + while not self._shutdown_event.is_set(): + uptime = (datetime.now(timezone.utc) - self._start_time).total_seconds() + await self._publish_meta( + "central.meta.heartbeat", + {"ts": datetime.now(timezone.utc).isoformat(), "uptime_s": uptime} + ) + try: + await asyncio.wait_for( + self._shutdown_event.wait(), + timeout=30 + ) + except asyncio.TimeoutError: + pass + + async def start(self) -> None: + """Start the supervisor.""" + await self.connect() + + # Ensure streams exist with correct configuration + await self._ensure_streams() + + # Load and start enabled adapters + enabled_adapters = await self._config_source.list_enabled_adapters() + for config in enabled_adapters: + await self._start_adapter(config) + + # Start config watcher (runs forever, calling callback on changes) + self._config_watch_task = asyncio.create_task( + self._config_source.watch_for_changes(self._on_config_change) + ) + + # Start heartbeat + self._tasks.append(asyncio.create_task(self._heartbeat_loop())) + + # Start stream retention recompute loop + self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop())) + + logger.info( + "Supervisor started", + extra={"adapters": list(self._adapter_states.keys())}, + ) + + async def stop(self) -> None: + """Stop the supervisor gracefully.""" + logger.info("Supervisor shutting down") + self._shutdown_event.set() + + # Cancel config watcher + if self._config_watch_task: + self._config_watch_task.cancel() + try: + await self._config_watch_task + except asyncio.CancelledError: + pass + + # Cancel heartbeat and other tasks + for task in self._tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Remove all adapters (full cleanup) + for name in list(self._adapter_states.keys()): + await self._remove_adapter(name) + + # Close config source + await self._config_source.close() + + await self.disconnect() + logger.info("Supervisor stopped") + + +async def async_main() -> None: + """Async entry point.""" + setup_logging() + + settings = get_settings() + logger.info( + "Config source: db", + extra={"config_source": "db"}, + ) + + # Create database config source and store + config_source = await DbConfigSource.create(settings.db_dsn) + config_store = await ConfigStore.create(settings.db_dsn) + + supervisor = Supervisor( + config_source=config_source, + config_store=config_store, + nats_url=settings.nats_url, + # CloudEvents uses protocol-level defaults from cloudevents_constants + cloudevents_config=None, + ) + logger.info( + "CloudEvents config: defaults", + extra={"cloudevents_source": "defaults"}, + ) + + loop = asyncio.get_running_loop() + shutdown_event = asyncio.Event() + + def handle_signal() -> None: + shutdown_event.set() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, handle_signal) + + await supervisor.start() + + # Wait for shutdown signal + await shutdown_event.wait() + + await supervisor.stop() + + +def main() -> None: + """Entry point.""" + asyncio.run(async_main()) + + +if __name__ == "__main__": + main() diff --git a/tests/test_firms.py b/tests/test_firms.py new file mode 100644 index 0000000..8e569ad --- /dev/null +++ b/tests/test_firms.py @@ -0,0 +1,410 @@ +"""Tests for FIRMS adapter.""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch +from pathlib import Path +import tempfile + +from central.adapters.firms import ( + FIRMSAdapter, + CONFIDENCE_MAP, + SATELLITE_SHORT, + subject_for_fire_hotspot, +) +from central.config_models import AdapterConfig, RegionConfig +from central.models import Event, Geo + + +# Sample FIRMS CSV response +SAMPLE_CSV = """latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight +45.123,-116.456,320.5,0.39,0.36,2026-05-16,1430,N,VIIRS,h,2.0NRT,290.2,15.3,D +46.789,-117.012,305.2,0.41,0.38,2026-05-16,1430,N,VIIRS,n,2.0NRT,285.1,8.7,D +45.123,-116.456,318.9,0.40,0.37,2026-05-16,1430,N,VIIRS,l,2.0NRT,288.5,12.1,D +""" + +# Sample CSV with duplicate (same location, date, time) +SAMPLE_CSV_WITH_DUPE = """latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight +45.123,-116.456,320.5,0.39,0.36,2026-05-16,1430,N,VIIRS,h,2.0NRT,290.2,15.3,D +45.123,-116.456,320.5,0.39,0.36,2026-05-16,1430,N,VIIRS,h,2.0NRT,290.2,15.3,D +""" + + +def make_adapter_config( + region: dict | None = None, + satellites: list[str] | None = None, +) -> AdapterConfig: + """Create an AdapterConfig for testing.""" + settings = { + "api_key_alias": "firms", + "satellites": satellites or ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"], + } + if region: + settings["region"] = region + else: + settings["region"] = { + "north": 49.5, + "south": 31.0, + "east": -102.0, + "west": -124.5, + } + + return AdapterConfig( + name="firms", + enabled=True, + cadence_s=300, + settings=settings, + updated_at=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def temp_db_path(): + """Create a temporary database path for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + yield Path(f.name) + + +@pytest.fixture +def mock_config_store(): + """Create a mock ConfigStore.""" + store = MagicMock() + store.get_api_key = AsyncMock(return_value="test_api_key") + return store + + +class TestConfidenceMapping: + """Test confidence value mapping.""" + + def test_low_confidence(self): + assert CONFIDENCE_MAP["l"] == "low" + + def test_nominal_confidence(self): + assert CONFIDENCE_MAP["n"] == "nominal" + + def test_high_confidence(self): + assert CONFIDENCE_MAP["h"] == "high" + + +class TestSatelliteShortNames: + """Test satellite short name mapping.""" + + def test_snpp_short_name(self): + assert SATELLITE_SHORT["VIIRS_SNPP_NRT"] == "viirs_snpp" + + def test_noaa20_short_name(self): + assert SATELLITE_SHORT["VIIRS_NOAA20_NRT"] == "viirs_noaa20" + + def test_noaa21_short_name(self): + assert SATELLITE_SHORT["VIIRS_NOAA21_NRT"] == "viirs_noaa21" + + +class TestStableIdGeneration: + """Test stable ID generation for deduplication.""" + + @pytest.mark.asyncio + async def test_stable_id_format(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + stable_id = adapter._build_stable_id( + satellite="VIIRS_SNPP_NRT", + acq_date="2026-05-16", + acq_time="1430", + lat=45.1234567, + lon=-116.4567890, + ) + + # Should be rounded to 3 decimal places + assert stable_id == "VIIRS_SNPP_NRT:2026-05-16:1430:45.123:-116.457" + + @pytest.mark.asyncio + async def test_stable_id_rounding(self, temp_db_path, mock_config_store): + """Test that small lat/lon differences within 0.001 round to same ID.""" + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + # Values that differ by less than 0.0005 should round to same value + id1 = adapter._build_stable_id("SAT", "2026-05-16", "1430", 45.1234, -116.4564) + id2 = adapter._build_stable_id("SAT", "2026-05-16", "1430", 45.1232, -116.4562) + + # Both should round to 45.124, -116.457 + assert id1 == id2 + + +class TestCsvParsing: + """Test CSV parsing.""" + + @pytest.mark.asyncio + async def test_parse_csv_rows(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + + assert len(rows) == 3 + assert rows[0]["latitude"] == 45.123 + assert rows[0]["longitude"] == -116.456 + assert rows[0]["confidence"] == "high" + assert rows[1]["confidence"] == "nominal" + assert rows[2]["confidence"] == "low" + + @pytest.mark.asyncio + async def test_parse_csv_brightness(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + + assert rows[0]["bright_ti4"] == 320.5 + assert rows[0]["bright_ti5"] == 290.2 + assert rows[0]["frp"] == 15.3 + + +class TestEventGeneration: + """Test Event generation from CSV rows.""" + + @pytest.mark.asyncio + async def test_event_category(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") + + assert event.category == "fire.hotspot.viirs_snpp.high" + + @pytest.mark.asyncio + async def test_event_severity(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + + high_event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") + nominal_event = adapter._row_to_event(rows[1], "VIIRS_SNPP_NRT") + low_event = adapter._row_to_event(rows[2], "VIIRS_SNPP_NRT") + + assert high_event.severity == 3 + assert nominal_event.severity == 2 + assert low_event.severity == 1 + + @pytest.mark.asyncio + async def test_event_geo(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") + + # GeoJSON order: lon, lat + assert event.geo.centroid == (-116.456, 45.123) + assert event.geo.bbox == (-116.456, 45.123, -116.456, 45.123) + + +class TestDeduplication: + """Test deduplication logic.""" + + @pytest.mark.asyncio + async def test_dedup_marks_published(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + stable_id = "VIIRS_SNPP_NRT:2026-05-16:1430:45.123:-116.456" + + # Not published initially + assert not adapter.is_published(stable_id) + + # Mark as published + adapter.mark_published(stable_id) + + # Now should be published + assert adapter.is_published(stable_id) + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_dedup_prevents_duplicates(self, temp_db_path, mock_config_store): + """Test that duplicate rows don't produce duplicate events.""" + # Use only one satellite to simplify the test + config = make_adapter_config(satellites=["VIIRS_SNPP_NRT"]) + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + # Mock the fetch to return CSV with duplicates + with patch.object(adapter, "_fetch_csv", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = SAMPLE_CSV_WITH_DUPE + + events = [] + async for event in adapter.poll(): + events.append(event) + + # Should only get one event despite two identical rows + assert len(events) == 1 + + await adapter.shutdown() + + +class TestSubjectGeneration: + """Test subject generation for fire hotspots.""" + + def test_subject_format(self): + event = Event( + id="test", + source="central/adapters/firms", + category="fire.hotspot.viirs_snpp.high", + time=datetime.now(timezone.utc), + severity=3, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + + subject = subject_for_fire_hotspot(event) + assert subject == "central.fire.hotspot.viirs_snpp.high" + + def test_subject_nominal_confidence(self): + event = Event( + id="test", + source="central/adapters/firms", + category="fire.hotspot.viirs_noaa20.nominal", + time=datetime.now(timezone.utc), + severity=2, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + + subject = subject_for_fire_hotspot(event) + assert subject == "central.fire.hotspot.viirs_noaa20.nominal" + + +class TestUrlBuilding: + """Test FIRMS API URL building.""" + + @pytest.mark.asyncio + async def test_url_format(self, temp_db_path, mock_config_store): + config = make_adapter_config( + region={"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5} + ) + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + url = adapter._build_url("VIIRS_SNPP_NRT") + + assert url is not None + assert "test_api_key" in url + assert "VIIRS_SNPP_NRT" in url + assert "-124.5,31.0,-102.0,49.5" in url # west,south,east,north + assert "/1" in url # dayRange + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_url_none_without_key(self, temp_db_path): + mock_store = MagicMock() + mock_store.get_api_key = AsyncMock(return_value=None) + + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + url = adapter._build_url("VIIRS_SNPP_NRT") + + assert url is None + + await adapter.shutdown() + + +class TestApplyConfig: + """Test hot-reload configuration application.""" + + @pytest.mark.asyncio + async def test_apply_config_updates_region(self, temp_db_path, mock_config_store): + config = make_adapter_config( + region={"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5} + ) + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + # Original region + assert adapter.region.north == 49.5 + + # Apply new config with different region + new_config = make_adapter_config( + region={"north": 48.0, "south": 45.0, "east": -115.0, "west": -125.0} + ) + await adapter.apply_config(new_config) + + assert adapter.region.north == 48.0 + assert adapter.region.south == 45.0 + + await adapter.shutdown() + + @pytest.mark.asyncio + async def test_apply_config_updates_satellites(self, temp_db_path, mock_config_store): + config = make_adapter_config(satellites=["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"]) + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + await adapter.startup() + + # Original satellites + assert len(adapter._satellites) == 2 + + # Apply config with single satellite + new_config = make_adapter_config(satellites=["VIIRS_NOAA20_NRT"]) + await adapter.apply_config(new_config) + + assert adapter._satellites == ["VIIRS_NOAA20_NRT"] + + await adapter.shutdown()