Merge feature/2-d-swpc: NOAA SWPC space weather adapters

feat(2-D): NOAA SWPC space weather adapters
This commit is contained in:
malice 2026-05-19 00:28:20 -06:00 committed by GitHub
commit 37a778468d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1007 additions and 5 deletions

View file

@ -0,0 +1,11 @@
-- Migration: 018_add_swpc_adapters
-- Add NOAA SWPC space weather adapters to config.adapters.
-- All three ship disabled; operator enables individually via GUI.
-- Idempotent: uses ON CONFLICT DO NOTHING.
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES
('swpc_alerts', false, 300, '{}'::jsonb),
('swpc_kindex', false, 600, '{}'::jsonb),
('swpc_protons', false, 600, '{}'::jsonb)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,8 @@
-- Migration: 019_add_central_space_stream
-- Seeds the CENTRAL_SPACE JetStream stream row for central.space.> subjects.
-- 7-day retention, 1 GiB max_bytes (clamped by supervisor recompute) -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE.
-- Idempotent: uses ON CONFLICT DO NOTHING.
INSERT INTO config.streams (name, max_age_s, max_bytes)
VALUES ('CENTRAL_SPACE', 604800, 1073741824)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,186 @@
"""NOAA SWPC space weather alerts adapter."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.swpc_common import (
SWPC_ALERTS_URL,
SWPCSettings,
parse_swpc_timestamp,
severity_from_alert_product_id,
)
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
class SWPCAlertsAdapter(SourceAdapter):
"""NOAA SWPC space weather alerts adapter."""
name = "swpc_alerts"
display_name = "NOAA SWPC — Space Weather Alerts"
description = "Active NOAA SWPC space weather alerts, watches, warnings, and summaries."
settings_schema = SWPCSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 300
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
logger.info("SWPC alerts adapter started")
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("SWPC alerts adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC alerts config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC alerts swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
product_id = event.data.get("product_id") or "unknown"
return f"central.space.alert.{product_id.lower()}"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch(self) -> list[dict[str, Any]]:
if not self._session:
raise RuntimeError("Session not initialized")
async with self._session.get(
SWPC_ALERTS_URL, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
data = await resp.json()
logger.info("SWPC alerts fetch completed", extra={"item_count": len(data)})
return data
async def poll(self) -> AsyncIterator[Event]:
if not self._db:
raise RuntimeError("Database not initialized")
try:
items = await self._fetch()
except Exception as e:
logger.error("SWPC alerts fetch failed", extra={"error": str(e)})
raise
events_yielded = 0
for item in items:
product_id = item.get("product_id")
issue_dt_raw = item.get("issue_datetime")
if not product_id or not issue_dt_raw:
continue
event_id = f"{product_id}|{issue_dt_raw}"
if self.is_published(event_id):
continue
issue_dt = parse_swpc_timestamp(issue_dt_raw, "alerts") or datetime.now(timezone.utc)
event = Event(
id=event_id,
adapter=self.name,
category="space.alert",
time=issue_dt,
severity=severity_from_alert_product_id(product_id),
geo=Geo(),
data={
"product_id": product_id,
"issue_datetime": issue_dt_raw,
"message": item.get("message", ""),
},
)
yield event
self.mark_published(event_id)
events_yielded += 1
self.sweep_old_ids()
logger.info("SWPC alerts poll completed", extra={"events_yielded": events_yielded})

View file

@ -0,0 +1,81 @@
"""Shared utilities for NOAA SWPC space weather adapters."""
import re
from datetime import datetime, timezone
from pydantic import BaseModel
SWPC_ALERTS_URL = "https://services.swpc.noaa.gov/products/alerts.json"
SWPC_KINDEX_URL = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json"
SWPC_PROTONS_URL = "https://services.swpc.noaa.gov/json/goes/primary/integral-protons-1-day.json"
class SWPCSettings(BaseModel):
"""Settings schema for SWPC adapters. No operator-tunable knobs today."""
def parse_swpc_timestamp(raw: str | None, endpoint_kind: str) -> datetime | None:
"""Normalize SWPC timestamp strings to UTC datetime.
endpoint_kind shapes:
alerts -> "2026-05-19 05:14:59.780" (space-separated, no TZ; UTC per message body)
kindex -> "2026-05-12T00:00:00" (ISO without TZ; UTC by convention)
protons -> "2026-05-18T05:35:00Z" (ISO with Z)
"""
if not raw:
return None
if endpoint_kind == "alerts":
try:
dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
return dt.replace(tzinfo=timezone.utc)
if endpoint_kind == "kindex":
dt = datetime.fromisoformat(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
if endpoint_kind == "protons":
raw_norm = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
dt = datetime.fromisoformat(raw_norm)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
raise ValueError(f"unknown endpoint_kind: {endpoint_kind!r}")
def severity_from_kp(kp: float | int | None) -> int:
"""Map planetary K-index value (0-9) to severity 0-4 via the G-scale.
Kp 5 = G1 = severity 1, Kp 6 = G2 = severity 2, Kp 7 = G3 = severity 3,
Kp 8 = G4 = severity 4, Kp 9 = G5 = severity 4 (capped).
"""
if kp is None:
return 0
if kp < 5:
return 0
if kp < 6:
return 1
if kp < 7:
return 2
if kp < 8:
return 3
return 4
_ALERT_KP_PATTERN = re.compile(r"^K0([5-9])[AW]$")
def severity_from_alert_product_id(product_id: str | None) -> int:
"""Best-effort severity for an alert from its product_id G-scale.
Product IDs of form K0[5-9][AW] identify Kp-based geomagnetic storm
alerts and warnings (K05A=G1, K06A=G2, K07A=G3, K08A=G4, K09A=G5).
All other product IDs return 0.
"""
if not product_id:
return 0
m = _ALERT_KP_PATTERN.match(product_id.upper())
if not m:
return 0
return severity_from_kp(int(m.group(1)))

View file

@ -0,0 +1,186 @@
"""NOAA SWPC Planetary K-Index adapter."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.swpc_common import (
SWPC_KINDEX_URL,
SWPCSettings,
parse_swpc_timestamp,
severity_from_kp,
)
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
class SWPCKindexAdapter(SourceAdapter):
"""NOAA SWPC planetary K-index adapter."""
name = "swpc_kindex"
display_name = "NOAA SWPC — Planetary K-Index"
description = "Planetary K-index measurements at 3-hour cadence from NOAA SWPC."
settings_schema = SWPCSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 600
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
logger.info("SWPC kindex adapter started")
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("SWPC kindex adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC kindex config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC kindex swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
return "central.space.kindex"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch(self) -> list[dict[str, Any]]:
if not self._session:
raise RuntimeError("Session not initialized")
async with self._session.get(
SWPC_KINDEX_URL, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
data = await resp.json()
logger.info("SWPC kindex fetch completed", extra={"item_count": len(data)})
return data
async def poll(self) -> AsyncIterator[Event]:
if not self._db:
raise RuntimeError("Database not initialized")
try:
items = await self._fetch()
except Exception as e:
logger.error("SWPC kindex fetch failed", extra={"error": str(e)})
raise
events_yielded = 0
for item in items:
time_tag = item.get("time_tag")
kp = item.get("Kp")
if not time_tag or kp is None:
continue
event_id = time_tag
if self.is_published(event_id):
continue
event_time = parse_swpc_timestamp(time_tag, "kindex") or datetime.now(timezone.utc)
event = Event(
id=event_id,
adapter=self.name,
category="space.kindex",
time=event_time,
severity=severity_from_kp(kp),
geo=Geo(),
data={
"time_tag": time_tag,
"Kp": kp,
"a_running": item.get("a_running"),
"station_count": item.get("station_count"),
},
)
yield event
self.mark_published(event_id)
events_yielded += 1
self.sweep_old_ids()
logger.info("SWPC kindex poll completed", extra={"events_yielded": events_yielded})

View file

@ -0,0 +1,185 @@
"""NOAA SWPC GOES integral proton flux adapter."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.swpc_common import (
SWPC_PROTONS_URL,
SWPCSettings,
parse_swpc_timestamp,
)
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
class SWPCProtonsAdapter(SourceAdapter):
"""NOAA SWPC GOES integral proton flux adapter."""
name = "swpc_protons"
display_name = "NOAA SWPC — GOES Proton Flux"
description = "GOES primary satellite integral proton flux measurements (1-day window) from NOAA SWPC."
settings_schema = SWPCSettings
requires_api_key = None
api_key_field = None
wizard_order = None
default_cadence_s = 600
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
self._db.commit()
logger.info("SWPC protons adapter started")
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("SWPC protons adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC protons config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC protons swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
return "central.space.proton_flux"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch(self) -> list[dict[str, Any]]:
if not self._session:
raise RuntimeError("Session not initialized")
async with self._session.get(
SWPC_PROTONS_URL, headers={"User-Agent": "Central/0.4"}
) as resp:
resp.raise_for_status()
data = await resp.json()
logger.info("SWPC protons fetch completed", extra={"item_count": len(data)})
return data
async def poll(self) -> AsyncIterator[Event]:
if not self._db:
raise RuntimeError("Database not initialized")
try:
items = await self._fetch()
except Exception as e:
logger.error("SWPC protons fetch failed", extra={"error": str(e)})
raise
events_yielded = 0
for item in items:
time_tag = item.get("time_tag")
energy = item.get("energy")
if not time_tag or not energy:
continue
event_id = f"{time_tag}|{energy}"
if self.is_published(event_id):
continue
event_time = parse_swpc_timestamp(time_tag, "protons") or datetime.now(timezone.utc)
event = Event(
id=event_id,
adapter=self.name,
category="space.proton_flux",
time=event_time,
severity=0,
geo=Geo(),
data={
"time_tag": time_tag,
"satellite": item.get("satellite"),
"flux": item.get("flux"),
"energy": energy,
},
)
yield event
self.mark_published(event_id)
events_yielded += 1
self.sweep_old_ids()
logger.info("SWPC protons poll completed", extra={"events_yielded": events_yielded})

View file

@ -25,6 +25,7 @@ STREAMS = [
("CENTRAL_WX", "central.wx.>"),
("CENTRAL_FIRE", "central.fire.>"),
("CENTRAL_QUAKE", "central.quake.>"),
("CENTRAL_SPACE", "central.space.>"),
]
BATCH_SIZE = 100

View file

@ -64,7 +64,7 @@ def _adapter_classes() -> dict:
router = APIRouter()
# Streams to display on dashboard
DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"]
DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"]
# Email validation regex (simple but effective)
ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$")

View file

@ -29,6 +29,7 @@ STREAM_SUBJECTS = {
"CENTRAL_META": ["central.meta.>"],
"CENTRAL_FIRE": ["central.fire.>"],
"CENTRAL_QUAKE": ["central.quake.>"],
"CENTRAL_SPACE": ["central.space.>"],
}
# Recompute interval for stream max_bytes (1 hour)

View file

@ -29,9 +29,9 @@ class TestConsumerNaming:
class TestStreamsConfiguration:
"""Test streams configuration."""
def test_streams_list_has_three_entries(self):
"""STREAMS list has three event-bearing streams."""
assert len(STREAMS) == 3
def test_streams_list_has_four_entries(self):
"""STREAMS list has four event-bearing streams."""
assert len(STREAMS) == 4
def test_streams_contains_central_wx(self):
"""STREAMS contains CENTRAL_WX with correct filter."""
@ -45,6 +45,10 @@ class TestStreamsConfiguration:
"""STREAMS contains CENTRAL_QUAKE with correct filter."""
assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
def test_streams_contains_central_space(self):
"""STREAMS contains CENTRAL_SPACE with correct filter."""
assert ("CENTRAL_SPACE", "central.space.>") in STREAMS
def test_streams_excludes_central_meta(self):
"""STREAMS does not contain CENTRAL_META (status messages only)."""
stream_names = [s[0] for s in STREAMS]

View file

@ -205,7 +205,7 @@ class TestDashboardStreamsIsolation:
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
streams = context["streams"]
assert len(streams) == 4
assert len(streams) == 5
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
assert fire_stream.get("error") == "unavailable"
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")

339
tests/test_swpc.py Normal file
View file

@ -0,0 +1,339 @@
"""Tests for NOAA SWPC space weather adapters."""
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event
# Frozen fixtures captured from upstream feeds; real shapes.
SAMPLE_ALERTS = [
{
"product_id": "EF3A",
"issue_datetime": "2026-05-19 05:14:59.780",
"message": (
"Space Weather Message Code: ALTEF3\r\nSerial Number: 3689\r\n"
"Issue Time: 2026 May 19 0514 UTC\r\n\r\n"
"ALERT: Electron 2MeV Integral Flux exceeded 1000pfu \n"
"Threshold Reached: 2026 May 16 1740 UTC\n"
"Station: GOES-19\n"
),
},
{
"product_id": "K05A",
"issue_datetime": "2026-05-15 14:30:00.000",
"message": (
"Space Weather Message Code: ALTK05\r\nSerial Number: 100\r\n"
"Issue Time: 2026 May 15 1430 UTC\r\n\r\n"
"ALERT: Geomagnetic K-index of 5\n"
),
},
{
"product_id": "K07A",
"issue_datetime": "2026-05-15 18:00:00.000",
"message": "Space Weather Message Code: ALTK07\r\nSerial Number: 101\r\n",
},
]
SAMPLE_KINDEX = [
{"time_tag": "2026-05-12T00:00:00", "Kp": 0.67, "a_running": 3, "station_count": 8},
{"time_tag": "2026-05-12T03:00:00", "Kp": 5.33, "a_running": 30, "station_count": 8},
{"time_tag": "2026-05-12T06:00:00", "Kp": 8.0, "a_running": 100, "station_count": 8},
]
SAMPLE_PROTONS = [
{"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 7.09, "energy": ">=1 MeV"},
{"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 0.21, "energy": ">=10 MeV"},
{"time_tag": "2026-05-18T05:40:00Z", "satellite": 19, "flux": 7.10, "energy": ">=1 MeV"},
]
def _config(name: str, cadence: int) -> AdapterConfig:
return AdapterConfig(
name=name,
enabled=True,
cadence_s=cadence,
settings={},
updated_at=datetime.now(timezone.utc),
)
class TestSWPCCommon:
"""Tests for swpc_common helpers."""
def test_parse_swpc_timestamp_alerts(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-19 05:14:59.780", "alerts")
assert dt == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_alerts_no_fraction(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-19 05:14:59", "alerts")
assert dt == datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_kindex(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-12T03:00:00", "kindex")
assert dt == datetime(2026, 5, 12, 3, 0, 0, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_protons(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-18T05:35:00Z", "protons")
assert dt == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_empty(self):
from central.adapters.swpc_common import parse_swpc_timestamp
assert parse_swpc_timestamp("", "alerts") is None
assert parse_swpc_timestamp(None, "alerts") is None
def test_severity_from_kp_boundaries(self):
from central.adapters.swpc_common import severity_from_kp
assert severity_from_kp(None) == 0
assert severity_from_kp(0) == 0
assert severity_from_kp(4.5) == 0
assert severity_from_kp(4.9) == 0
assert severity_from_kp(5.0) == 1
assert severity_from_kp(5.99) == 1
assert severity_from_kp(6.0) == 2
assert severity_from_kp(6.99) == 2
assert severity_from_kp(7.0) == 3
assert severity_from_kp(7.99) == 3
assert severity_from_kp(8.0) == 4
assert severity_from_kp(9.0) == 4
def test_severity_from_alert_product_id(self):
from central.adapters.swpc_common import severity_from_alert_product_id
assert severity_from_alert_product_id(None) == 0
assert severity_from_alert_product_id("") == 0
assert severity_from_alert_product_id("EF3A") == 0
assert severity_from_alert_product_id("BHIS") == 0
assert severity_from_alert_product_id("K04A") == 0
assert severity_from_alert_product_id("K05A") == 1
assert severity_from_alert_product_id("K05W") == 1
assert severity_from_alert_product_id("K06A") == 2
assert severity_from_alert_product_id("K07A") == 3
assert severity_from_alert_product_id("K08A") == 4
assert severity_from_alert_product_id("K09A") == 4
class TestSWPCAlertsAdapter:
"""Tests for SWPCAlertsAdapter."""
@pytest.mark.asyncio
async def test_alerts_normalization(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS)
await adapter.startup()
events: list[Event] = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
ef3a = events[0]
assert ef3a.adapter == "swpc_alerts"
assert ef3a.category == "space.alert"
assert ef3a.id == "EF3A|2026-05-19 05:14:59.780"
assert ef3a.time == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc)
assert ef3a.severity == 0
assert ef3a.data["product_id"] == "EF3A"
assert ef3a.geo.centroid is None
assert ef3a.geo.regions == []
assert ef3a.geo.primary_region is None
k05a = events[1]
assert k05a.severity == 1
k07a = events[2]
assert k07a.severity == 3
@pytest.mark.asyncio
async def test_alerts_dedup(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_alerts_subject_for(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
from central.models import Geo
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="EF3A|2026-05-19 05:14:59.780",
adapter="swpc_alerts",
category="space.alert",
time=datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={"product_id": "EF3A"},
)
assert adapter.subject_for(event) == "central.space.alert.ef3a"
event_k = Event(
id="K05A|...",
adapter="swpc_alerts",
category="space.alert",
time=datetime(2026, 5, 15, tzinfo=timezone.utc),
severity=1,
geo=Geo(),
data={"product_id": "K05A"},
)
assert adapter.subject_for(event_k) == "central.space.alert.k05a"
class TestSWPCKindexAdapter:
"""Tests for SWPCKindexAdapter."""
@pytest.mark.asyncio
async def test_kindex_normalization(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
quiet, g1, g4 = events
assert quiet.category == "space.kindex"
assert quiet.id == "2026-05-12T00:00:00"
assert quiet.severity == 0
assert quiet.data["Kp"] == 0.67
assert g1.severity == 1
assert g4.severity == 4
assert g4.time == datetime(2026, 5, 12, 6, 0, 0, tzinfo=timezone.utc)
@pytest.mark.asyncio
async def test_kindex_dedup(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_kindex_subject_for(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
from central.models import Geo
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="2026-05-12T03:00:00",
adapter="swpc_kindex",
category="space.kindex",
time=datetime(2026, 5, 12, 3, tzinfo=timezone.utc),
severity=1,
geo=Geo(),
data={"Kp": 5.33},
)
assert adapter.subject_for(event) == "central.space.kindex"
class TestSWPCProtonsAdapter:
"""Tests for SWPCProtonsAdapter."""
@pytest.mark.asyncio
async def test_protons_normalization(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
first = events[0]
assert first.category == "space.proton_flux"
assert first.id == "2026-05-18T05:35:00Z|>=1 MeV"
assert first.severity == 0
assert first.data["energy"] == ">=1 MeV"
assert first.data["flux"] == 7.09
assert first.time == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc)
assert first.geo.centroid is None
assert first.geo.regions == []
# Same time_tag, different energy -> distinct event_id
assert events[1].id == "2026-05-18T05:35:00Z|>=10 MeV"
@pytest.mark.asyncio
async def test_protons_dedup(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_protons_subject_for(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
from central.models import Geo
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="2026-05-18T05:35:00Z|>=10 MeV",
adapter="swpc_protons",
category="space.proton_flux",
time=datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={"energy": ">=10 MeV", "flux": 0.21},
)
assert adapter.subject_for(event) == "central.space.proton_flux"