diff --git a/sql/migrations/018_add_swpc_adapters.sql b/sql/migrations/018_add_swpc_adapters.sql new file mode 100644 index 0000000..29a28d8 --- /dev/null +++ b/sql/migrations/018_add_swpc_adapters.sql @@ -0,0 +1,11 @@ +-- Migration: 018_add_swpc_adapters +-- Add NOAA SWPC space weather adapters to config.adapters. +-- All three ship disabled; operator enables individually via GUI. +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES + ('swpc_alerts', false, 300, '{}'::jsonb), + ('swpc_kindex', false, 600, '{}'::jsonb), + ('swpc_protons', false, 600, '{}'::jsonb) +ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/019_add_central_space_stream.sql b/sql/migrations/019_add_central_space_stream.sql new file mode 100644 index 0000000..f249227 --- /dev/null +++ b/sql/migrations/019_add_central_space_stream.sql @@ -0,0 +1,8 @@ +-- Migration: 019_add_central_space_stream +-- Seeds the CENTRAL_SPACE JetStream stream row for central.space.> subjects. +-- 7-day retention, 1 GiB max_bytes (clamped by supervisor recompute) -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE. +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_SPACE', 604800, 1073741824) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/swpc_alerts.py b/src/central/adapters/swpc_alerts.py new file mode 100644 index 0000000..3368824 --- /dev/null +++ b/src/central/adapters/swpc_alerts.py @@ -0,0 +1,186 @@ +"""NOAA SWPC space weather alerts adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_ALERTS_URL, + SWPCSettings, + parse_swpc_timestamp, + severity_from_alert_product_id, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCAlertsAdapter(SourceAdapter): + """NOAA SWPC space weather alerts adapter.""" + + name = "swpc_alerts" + display_name = "NOAA SWPC — Space Weather Alerts" + description = "Active NOAA SWPC space weather alerts, watches, warnings, and summaries." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC alerts adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC alerts adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC alerts config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC alerts swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + product_id = event.data.get("product_id") or "unknown" + return f"central.space.alert.{product_id.lower()}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_ALERTS_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC alerts fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC alerts fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + product_id = item.get("product_id") + issue_dt_raw = item.get("issue_datetime") + if not product_id or not issue_dt_raw: + continue + + event_id = f"{product_id}|{issue_dt_raw}" + if self.is_published(event_id): + continue + + issue_dt = parse_swpc_timestamp(issue_dt_raw, "alerts") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.alert", + time=issue_dt, + severity=severity_from_alert_product_id(product_id), + geo=Geo(), + data={ + "product_id": product_id, + "issue_datetime": issue_dt_raw, + "message": item.get("message", ""), + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC alerts poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_common.py b/src/central/adapters/swpc_common.py new file mode 100644 index 0000000..004f965 --- /dev/null +++ b/src/central/adapters/swpc_common.py @@ -0,0 +1,81 @@ +"""Shared utilities for NOAA SWPC space weather adapters.""" + +import re +from datetime import datetime, timezone + +from pydantic import BaseModel + +SWPC_ALERTS_URL = "https://services.swpc.noaa.gov/products/alerts.json" +SWPC_KINDEX_URL = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json" +SWPC_PROTONS_URL = "https://services.swpc.noaa.gov/json/goes/primary/integral-protons-1-day.json" + + +class SWPCSettings(BaseModel): + """Settings schema for SWPC adapters. No operator-tunable knobs today.""" + + +def parse_swpc_timestamp(raw: str | None, endpoint_kind: str) -> datetime | None: + """Normalize SWPC timestamp strings to UTC datetime. + + endpoint_kind shapes: + alerts -> "2026-05-19 05:14:59.780" (space-separated, no TZ; UTC per message body) + kindex -> "2026-05-12T00:00:00" (ISO without TZ; UTC by convention) + protons -> "2026-05-18T05:35:00Z" (ISO with Z) + """ + if not raw: + return None + if endpoint_kind == "alerts": + try: + dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f") + except ValueError: + dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S") + return dt.replace(tzinfo=timezone.utc) + if endpoint_kind == "kindex": + dt = datetime.fromisoformat(raw) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + if endpoint_kind == "protons": + raw_norm = raw[:-1] + "+00:00" if raw.endswith("Z") else raw + dt = datetime.fromisoformat(raw_norm) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + raise ValueError(f"unknown endpoint_kind: {endpoint_kind!r}") + + +def severity_from_kp(kp: float | int | None) -> int: + """Map planetary K-index value (0-9) to severity 0-4 via the G-scale. + + Kp 5 = G1 = severity 1, Kp 6 = G2 = severity 2, Kp 7 = G3 = severity 3, + Kp 8 = G4 = severity 4, Kp 9 = G5 = severity 4 (capped). + """ + if kp is None: + return 0 + if kp < 5: + return 0 + if kp < 6: + return 1 + if kp < 7: + return 2 + if kp < 8: + return 3 + return 4 + + +_ALERT_KP_PATTERN = re.compile(r"^K0([5-9])[AW]$") + + +def severity_from_alert_product_id(product_id: str | None) -> int: + """Best-effort severity for an alert from its product_id G-scale. + + Product IDs of form K0[5-9][AW] identify Kp-based geomagnetic storm + alerts and warnings (K05A=G1, K06A=G2, K07A=G3, K08A=G4, K09A=G5). + All other product IDs return 0. + """ + if not product_id: + return 0 + m = _ALERT_KP_PATTERN.match(product_id.upper()) + if not m: + return 0 + return severity_from_kp(int(m.group(1))) diff --git a/src/central/adapters/swpc_kindex.py b/src/central/adapters/swpc_kindex.py new file mode 100644 index 0000000..f05bcbb --- /dev/null +++ b/src/central/adapters/swpc_kindex.py @@ -0,0 +1,186 @@ +"""NOAA SWPC Planetary K-Index adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_KINDEX_URL, + SWPCSettings, + parse_swpc_timestamp, + severity_from_kp, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCKindexAdapter(SourceAdapter): + """NOAA SWPC planetary K-index adapter.""" + + name = "swpc_kindex" + display_name = "NOAA SWPC — Planetary K-Index" + description = "Planetary K-index measurements at 3-hour cadence from NOAA SWPC." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC kindex adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC kindex adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC kindex config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC kindex swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + return "central.space.kindex" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_KINDEX_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC kindex fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC kindex fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + time_tag = item.get("time_tag") + kp = item.get("Kp") + if not time_tag or kp is None: + continue + + event_id = time_tag + if self.is_published(event_id): + continue + + event_time = parse_swpc_timestamp(time_tag, "kindex") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.kindex", + time=event_time, + severity=severity_from_kp(kp), + geo=Geo(), + data={ + "time_tag": time_tag, + "Kp": kp, + "a_running": item.get("a_running"), + "station_count": item.get("station_count"), + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC kindex poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_protons.py b/src/central/adapters/swpc_protons.py new file mode 100644 index 0000000..1a3876e --- /dev/null +++ b/src/central/adapters/swpc_protons.py @@ -0,0 +1,185 @@ +"""NOAA SWPC GOES integral proton flux adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_PROTONS_URL, + SWPCSettings, + parse_swpc_timestamp, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCProtonsAdapter(SourceAdapter): + """NOAA SWPC GOES integral proton flux adapter.""" + + name = "swpc_protons" + display_name = "NOAA SWPC — GOES Proton Flux" + description = "GOES primary satellite integral proton flux measurements (1-day window) from NOAA SWPC." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC protons adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC protons adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC protons config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC protons swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + return "central.space.proton_flux" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_PROTONS_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC protons fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC protons fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + time_tag = item.get("time_tag") + energy = item.get("energy") + if not time_tag or not energy: + continue + + event_id = f"{time_tag}|{energy}" + if self.is_published(event_id): + continue + + event_time = parse_swpc_timestamp(time_tag, "protons") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.proton_flux", + time=event_time, + severity=0, + geo=Geo(), + data={ + "time_tag": time_tag, + "satellite": item.get("satellite"), + "flux": item.get("flux"), + "energy": energy, + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC protons poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/archive.py b/src/central/archive.py index b64a187..c173805 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -25,6 +25,7 @@ STREAMS = [ ("CENTRAL_WX", "central.wx.>"), ("CENTRAL_FIRE", "central.fire.>"), ("CENTRAL_QUAKE", "central.quake.>"), + ("CENTRAL_SPACE", "central.space.>"), ] BATCH_SIZE = 100 diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index e231df8..b5c66f7 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -64,7 +64,7 @@ def _adapter_classes() -> dict: router = APIRouter() # Streams to display on dashboard -DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"] # Email validation regex (simple but effective) ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") diff --git a/src/central/supervisor.py b/src/central/supervisor.py index bebed6c..81fa1bf 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -29,6 +29,7 @@ STREAM_SUBJECTS = { "CENTRAL_META": ["central.meta.>"], "CENTRAL_FIRE": ["central.fire.>"], "CENTRAL_QUAKE": ["central.quake.>"], + "CENTRAL_SPACE": ["central.space.>"], } # Recompute interval for stream max_bytes (1 hour) diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py index e3d09a3..bba544d 100644 --- a/tests/test_archive_multi_stream.py +++ b/tests/test_archive_multi_stream.py @@ -29,9 +29,9 @@ class TestConsumerNaming: class TestStreamsConfiguration: """Test streams configuration.""" - def test_streams_list_has_three_entries(self): - """STREAMS list has three event-bearing streams.""" - assert len(STREAMS) == 3 + def test_streams_list_has_four_entries(self): + """STREAMS list has four event-bearing streams.""" + assert len(STREAMS) == 4 def test_streams_contains_central_wx(self): """STREAMS contains CENTRAL_WX with correct filter.""" @@ -45,6 +45,10 @@ class TestStreamsConfiguration: """STREAMS contains CENTRAL_QUAKE with correct filter.""" assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS + def test_streams_contains_central_space(self): + """STREAMS contains CENTRAL_SPACE with correct filter.""" + assert ("CENTRAL_SPACE", "central.space.>") in STREAMS + def test_streams_excludes_central_meta(self): """STREAMS does not contain CENTRAL_META (status messages only).""" stream_names = [s[0] for s in STREAMS] diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py index f03f9ac..73df132 100644 --- a/tests/test_dashboard.py +++ b/tests/test_dashboard.py @@ -205,7 +205,7 @@ class TestDashboardStreamsIsolation: call_args = mock_templates.TemplateResponse.call_args context = call_args.kwargs.get("context", call_args[1].get("context")) streams = context["streams"] - assert len(streams) == 4 + assert len(streams) == 5 fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE") assert fire_stream.get("error") == "unavailable" wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX") diff --git a/tests/test_swpc.py b/tests/test_swpc.py new file mode 100644 index 0000000..2fce539 --- /dev/null +++ b/tests/test_swpc.py @@ -0,0 +1,339 @@ +"""Tests for NOAA SWPC space weather adapters.""" + +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.config_models import AdapterConfig +from central.models import Event + + +# Frozen fixtures captured from upstream feeds; real shapes. +SAMPLE_ALERTS = [ + { + "product_id": "EF3A", + "issue_datetime": "2026-05-19 05:14:59.780", + "message": ( + "Space Weather Message Code: ALTEF3\r\nSerial Number: 3689\r\n" + "Issue Time: 2026 May 19 0514 UTC\r\n\r\n" + "ALERT: Electron 2MeV Integral Flux exceeded 1000pfu \n" + "Threshold Reached: 2026 May 16 1740 UTC\n" + "Station: GOES-19\n" + ), + }, + { + "product_id": "K05A", + "issue_datetime": "2026-05-15 14:30:00.000", + "message": ( + "Space Weather Message Code: ALTK05\r\nSerial Number: 100\r\n" + "Issue Time: 2026 May 15 1430 UTC\r\n\r\n" + "ALERT: Geomagnetic K-index of 5\n" + ), + }, + { + "product_id": "K07A", + "issue_datetime": "2026-05-15 18:00:00.000", + "message": "Space Weather Message Code: ALTK07\r\nSerial Number: 101\r\n", + }, +] + +SAMPLE_KINDEX = [ + {"time_tag": "2026-05-12T00:00:00", "Kp": 0.67, "a_running": 3, "station_count": 8}, + {"time_tag": "2026-05-12T03:00:00", "Kp": 5.33, "a_running": 30, "station_count": 8}, + {"time_tag": "2026-05-12T06:00:00", "Kp": 8.0, "a_running": 100, "station_count": 8}, +] + +SAMPLE_PROTONS = [ + {"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 7.09, "energy": ">=1 MeV"}, + {"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 0.21, "energy": ">=10 MeV"}, + {"time_tag": "2026-05-18T05:40:00Z", "satellite": 19, "flux": 7.10, "energy": ">=1 MeV"}, +] + + +def _config(name: str, cadence: int) -> AdapterConfig: + return AdapterConfig( + name=name, + enabled=True, + cadence_s=cadence, + settings={}, + updated_at=datetime.now(timezone.utc), + ) + + +class TestSWPCCommon: + """Tests for swpc_common helpers.""" + + def test_parse_swpc_timestamp_alerts(self): + from central.adapters.swpc_common import parse_swpc_timestamp + + dt = parse_swpc_timestamp("2026-05-19 05:14:59.780", "alerts") + assert dt == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc) + + def test_parse_swpc_timestamp_alerts_no_fraction(self): + from central.adapters.swpc_common import parse_swpc_timestamp + + dt = parse_swpc_timestamp("2026-05-19 05:14:59", "alerts") + assert dt == datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc) + + def test_parse_swpc_timestamp_kindex(self): + from central.adapters.swpc_common import parse_swpc_timestamp + + dt = parse_swpc_timestamp("2026-05-12T03:00:00", "kindex") + assert dt == datetime(2026, 5, 12, 3, 0, 0, tzinfo=timezone.utc) + + def test_parse_swpc_timestamp_protons(self): + from central.adapters.swpc_common import parse_swpc_timestamp + + dt = parse_swpc_timestamp("2026-05-18T05:35:00Z", "protons") + assert dt == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc) + + def test_parse_swpc_timestamp_empty(self): + from central.adapters.swpc_common import parse_swpc_timestamp + + assert parse_swpc_timestamp("", "alerts") is None + assert parse_swpc_timestamp(None, "alerts") is None + + def test_severity_from_kp_boundaries(self): + from central.adapters.swpc_common import severity_from_kp + + assert severity_from_kp(None) == 0 + assert severity_from_kp(0) == 0 + assert severity_from_kp(4.5) == 0 + assert severity_from_kp(4.9) == 0 + assert severity_from_kp(5.0) == 1 + assert severity_from_kp(5.99) == 1 + assert severity_from_kp(6.0) == 2 + assert severity_from_kp(6.99) == 2 + assert severity_from_kp(7.0) == 3 + assert severity_from_kp(7.99) == 3 + assert severity_from_kp(8.0) == 4 + assert severity_from_kp(9.0) == 4 + + def test_severity_from_alert_product_id(self): + from central.adapters.swpc_common import severity_from_alert_product_id + + assert severity_from_alert_product_id(None) == 0 + assert severity_from_alert_product_id("") == 0 + assert severity_from_alert_product_id("EF3A") == 0 + assert severity_from_alert_product_id("BHIS") == 0 + assert severity_from_alert_product_id("K04A") == 0 + assert severity_from_alert_product_id("K05A") == 1 + assert severity_from_alert_product_id("K05W") == 1 + assert severity_from_alert_product_id("K06A") == 2 + assert severity_from_alert_product_id("K07A") == 3 + assert severity_from_alert_product_id("K08A") == 4 + assert severity_from_alert_product_id("K09A") == 4 + + +class TestSWPCAlertsAdapter: + """Tests for SWPCAlertsAdapter.""" + + @pytest.mark.asyncio + async def test_alerts_normalization(self, tmp_path: Path): + from central.adapters.swpc_alerts import SWPCAlertsAdapter + + adapter = SWPCAlertsAdapter( + _config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS) + + await adapter.startup() + events: list[Event] = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(events) == 3 + + ef3a = events[0] + assert ef3a.adapter == "swpc_alerts" + assert ef3a.category == "space.alert" + assert ef3a.id == "EF3A|2026-05-19 05:14:59.780" + assert ef3a.time == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc) + assert ef3a.severity == 0 + assert ef3a.data["product_id"] == "EF3A" + assert ef3a.geo.centroid is None + assert ef3a.geo.regions == [] + assert ef3a.geo.primary_region is None + + k05a = events[1] + assert k05a.severity == 1 + k07a = events[2] + assert k07a.severity == 3 + + @pytest.mark.asyncio + async def test_alerts_dedup(self, tmp_path: Path): + from central.adapters.swpc_alerts import SWPCAlertsAdapter + + adapter = SWPCAlertsAdapter( + _config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS) + + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(first_pass) == 3 + assert len(second_pass) == 0 + + @pytest.mark.asyncio + async def test_alerts_subject_for(self, tmp_path: Path): + from central.adapters.swpc_alerts import SWPCAlertsAdapter + from central.models import Geo + + adapter = SWPCAlertsAdapter( + _config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db" + ) + event = Event( + id="EF3A|2026-05-19 05:14:59.780", + adapter="swpc_alerts", + category="space.alert", + time=datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc), + severity=0, + geo=Geo(), + data={"product_id": "EF3A"}, + ) + assert adapter.subject_for(event) == "central.space.alert.ef3a" + + event_k = Event( + id="K05A|...", + adapter="swpc_alerts", + category="space.alert", + time=datetime(2026, 5, 15, tzinfo=timezone.utc), + severity=1, + geo=Geo(), + data={"product_id": "K05A"}, + ) + assert adapter.subject_for(event_k) == "central.space.alert.k05a" + + +class TestSWPCKindexAdapter: + """Tests for SWPCKindexAdapter.""" + + @pytest.mark.asyncio + async def test_kindex_normalization(self, tmp_path: Path): + from central.adapters.swpc_kindex import SWPCKindexAdapter + + adapter = SWPCKindexAdapter( + _config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX) + + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(events) == 3 + quiet, g1, g4 = events + assert quiet.category == "space.kindex" + assert quiet.id == "2026-05-12T00:00:00" + assert quiet.severity == 0 + assert quiet.data["Kp"] == 0.67 + assert g1.severity == 1 + assert g4.severity == 4 + assert g4.time == datetime(2026, 5, 12, 6, 0, 0, tzinfo=timezone.utc) + + @pytest.mark.asyncio + async def test_kindex_dedup(self, tmp_path: Path): + from central.adapters.swpc_kindex import SWPCKindexAdapter + + adapter = SWPCKindexAdapter( + _config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX) + + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(first_pass) == 3 + assert len(second_pass) == 0 + + @pytest.mark.asyncio + async def test_kindex_subject_for(self, tmp_path: Path): + from central.adapters.swpc_kindex import SWPCKindexAdapter + from central.models import Geo + + adapter = SWPCKindexAdapter( + _config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db" + ) + event = Event( + id="2026-05-12T03:00:00", + adapter="swpc_kindex", + category="space.kindex", + time=datetime(2026, 5, 12, 3, tzinfo=timezone.utc), + severity=1, + geo=Geo(), + data={"Kp": 5.33}, + ) + assert adapter.subject_for(event) == "central.space.kindex" + + +class TestSWPCProtonsAdapter: + """Tests for SWPCProtonsAdapter.""" + + @pytest.mark.asyncio + async def test_protons_normalization(self, tmp_path: Path): + from central.adapters.swpc_protons import SWPCProtonsAdapter + + adapter = SWPCProtonsAdapter( + _config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS) + + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(events) == 3 + first = events[0] + assert first.category == "space.proton_flux" + assert first.id == "2026-05-18T05:35:00Z|>=1 MeV" + assert first.severity == 0 + assert first.data["energy"] == ">=1 MeV" + assert first.data["flux"] == 7.09 + assert first.time == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc) + assert first.geo.centroid is None + assert first.geo.regions == [] + + # Same time_tag, different energy -> distinct event_id + assert events[1].id == "2026-05-18T05:35:00Z|>=10 MeV" + + @pytest.mark.asyncio + async def test_protons_dedup(self, tmp_path: Path): + from central.adapters.swpc_protons import SWPCProtonsAdapter + + adapter = SWPCProtonsAdapter( + _config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db" + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS) + + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(first_pass) == 3 + assert len(second_pass) == 0 + + @pytest.mark.asyncio + async def test_protons_subject_for(self, tmp_path: Path): + from central.adapters.swpc_protons import SWPCProtonsAdapter + from central.models import Geo + + adapter = SWPCProtonsAdapter( + _config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db" + ) + event = Event( + id="2026-05-18T05:35:00Z|>=10 MeV", + adapter="swpc_protons", + category="space.proton_flux", + time=datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc), + severity=0, + geo=Geo(), + data={"energy": ">=10 MeV", "flux": 0.21}, + ) + assert adapter.subject_for(event) == "central.space.proton_flux"