central/tests/test_swpc.py
zvx-echo6 72ec498365 feat(2-D): add NOAA SWPC space weather adapters (alerts, kindex, protons)
Three independent adapters sharing src/central/adapters/swpc_common.py,
mirroring the WFIGS two-adapter pattern. Each adapter has its own row in
config.adapters (ships disabled), its own cadence, and its own dedup
state, so operators can independently enable/disable and so a broken
upstream endpoint does not silently mask a healthy one.

Subjects:
  swpc_alerts   -> central.space.alert.<product_id_lower>
  swpc_kindex   -> central.space.kindex
  swpc_protons  -> central.space.proton_flux

Dedup keys:
  alerts:   product_id + issue_datetime
  kindex:   time_tag
  protons:  time_tag + energy

Severity: G-scale on product_id for K0[5-9][AW] alerts (G1-G5 -> 1-4),
G-scale on Kp for kindex, 0 for protons (raw flux carried in event.data).

No geo on any SWPC events (centroid=None, regions=[], primary_region=None).
No fall-off detection for alerts -- a single 115-row sample cannot confirm
whether alerts disappear from the upstream JSON when expired; deferred to
a later pass after 24h of observation.

CENTRAL_SPACE stream seeded with 7-day retention / 1 GiB max_bytes, mirroring
CENTRAL_FIRE / CENTRAL_QUAKE. STREAM_SUBJECTS, archive STREAMS, and
DASHBOARD_STREAMS each pick up the new stream.

Tests: 16 new cases in tests/test_swpc.py using real-shape frozen JSON
fixtures (alerts product_ids EF3A/K05A/K07A; kindex Kp boundaries; protons
composite dedup). Two existing tests updated for the new stream count
(test_archive_multi_stream.test_streams_list_has_three_entries renamed to
_has_four_entries; test_dashboard expects 5 streams not 4); added a
test_streams_contains_central_space companion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 05:55:29 +00:00

339 lines
12 KiB
Python

"""Tests for NOAA SWPC space weather adapters."""
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event
# Frozen fixtures captured from upstream feeds; real shapes.
SAMPLE_ALERTS = [
{
"product_id": "EF3A",
"issue_datetime": "2026-05-19 05:14:59.780",
"message": (
"Space Weather Message Code: ALTEF3\r\nSerial Number: 3689\r\n"
"Issue Time: 2026 May 19 0514 UTC\r\n\r\n"
"ALERT: Electron 2MeV Integral Flux exceeded 1000pfu \n"
"Threshold Reached: 2026 May 16 1740 UTC\n"
"Station: GOES-19\n"
),
},
{
"product_id": "K05A",
"issue_datetime": "2026-05-15 14:30:00.000",
"message": (
"Space Weather Message Code: ALTK05\r\nSerial Number: 100\r\n"
"Issue Time: 2026 May 15 1430 UTC\r\n\r\n"
"ALERT: Geomagnetic K-index of 5\n"
),
},
{
"product_id": "K07A",
"issue_datetime": "2026-05-15 18:00:00.000",
"message": "Space Weather Message Code: ALTK07\r\nSerial Number: 101\r\n",
},
]
SAMPLE_KINDEX = [
{"time_tag": "2026-05-12T00:00:00", "Kp": 0.67, "a_running": 3, "station_count": 8},
{"time_tag": "2026-05-12T03:00:00", "Kp": 5.33, "a_running": 30, "station_count": 8},
{"time_tag": "2026-05-12T06:00:00", "Kp": 8.0, "a_running": 100, "station_count": 8},
]
SAMPLE_PROTONS = [
{"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 7.09, "energy": ">=1 MeV"},
{"time_tag": "2026-05-18T05:35:00Z", "satellite": 19, "flux": 0.21, "energy": ">=10 MeV"},
{"time_tag": "2026-05-18T05:40:00Z", "satellite": 19, "flux": 7.10, "energy": ">=1 MeV"},
]
def _config(name: str, cadence: int) -> AdapterConfig:
return AdapterConfig(
name=name,
enabled=True,
cadence_s=cadence,
settings={},
updated_at=datetime.now(timezone.utc),
)
class TestSWPCCommon:
"""Tests for swpc_common helpers."""
def test_parse_swpc_timestamp_alerts(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-19 05:14:59.780", "alerts")
assert dt == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_alerts_no_fraction(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-19 05:14:59", "alerts")
assert dt == datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_kindex(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-12T03:00:00", "kindex")
assert dt == datetime(2026, 5, 12, 3, 0, 0, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_protons(self):
from central.adapters.swpc_common import parse_swpc_timestamp
dt = parse_swpc_timestamp("2026-05-18T05:35:00Z", "protons")
assert dt == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc)
def test_parse_swpc_timestamp_empty(self):
from central.adapters.swpc_common import parse_swpc_timestamp
assert parse_swpc_timestamp("", "alerts") is None
assert parse_swpc_timestamp(None, "alerts") is None
def test_severity_from_kp_boundaries(self):
from central.adapters.swpc_common import severity_from_kp
assert severity_from_kp(None) == 0
assert severity_from_kp(0) == 0
assert severity_from_kp(4.5) == 0
assert severity_from_kp(4.9) == 0
assert severity_from_kp(5.0) == 1
assert severity_from_kp(5.99) == 1
assert severity_from_kp(6.0) == 2
assert severity_from_kp(6.99) == 2
assert severity_from_kp(7.0) == 3
assert severity_from_kp(7.99) == 3
assert severity_from_kp(8.0) == 4
assert severity_from_kp(9.0) == 4
def test_severity_from_alert_product_id(self):
from central.adapters.swpc_common import severity_from_alert_product_id
assert severity_from_alert_product_id(None) == 0
assert severity_from_alert_product_id("") == 0
assert severity_from_alert_product_id("EF3A") == 0
assert severity_from_alert_product_id("BHIS") == 0
assert severity_from_alert_product_id("K04A") == 0
assert severity_from_alert_product_id("K05A") == 1
assert severity_from_alert_product_id("K05W") == 1
assert severity_from_alert_product_id("K06A") == 2
assert severity_from_alert_product_id("K07A") == 3
assert severity_from_alert_product_id("K08A") == 4
assert severity_from_alert_product_id("K09A") == 4
class TestSWPCAlertsAdapter:
"""Tests for SWPCAlertsAdapter."""
@pytest.mark.asyncio
async def test_alerts_normalization(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS)
await adapter.startup()
events: list[Event] = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
ef3a = events[0]
assert ef3a.adapter == "swpc_alerts"
assert ef3a.category == "space.alert"
assert ef3a.id == "EF3A|2026-05-19 05:14:59.780"
assert ef3a.time == datetime(2026, 5, 19, 5, 14, 59, 780000, tzinfo=timezone.utc)
assert ef3a.severity == 0
assert ef3a.data["product_id"] == "EF3A"
assert ef3a.geo.centroid is None
assert ef3a.geo.regions == []
assert ef3a.geo.primary_region is None
k05a = events[1]
assert k05a.severity == 1
k07a = events[2]
assert k07a.severity == 3
@pytest.mark.asyncio
async def test_alerts_dedup(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_ALERTS)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_alerts_subject_for(self, tmp_path: Path):
from central.adapters.swpc_alerts import SWPCAlertsAdapter
from central.models import Geo
adapter = SWPCAlertsAdapter(
_config("swpc_alerts", 300), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="EF3A|2026-05-19 05:14:59.780",
adapter="swpc_alerts",
category="space.alert",
time=datetime(2026, 5, 19, 5, 14, 59, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={"product_id": "EF3A"},
)
assert adapter.subject_for(event) == "central.space.alert.ef3a"
event_k = Event(
id="K05A|...",
adapter="swpc_alerts",
category="space.alert",
time=datetime(2026, 5, 15, tzinfo=timezone.utc),
severity=1,
geo=Geo(),
data={"product_id": "K05A"},
)
assert adapter.subject_for(event_k) == "central.space.alert.k05a"
class TestSWPCKindexAdapter:
"""Tests for SWPCKindexAdapter."""
@pytest.mark.asyncio
async def test_kindex_normalization(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
quiet, g1, g4 = events
assert quiet.category == "space.kindex"
assert quiet.id == "2026-05-12T00:00:00"
assert quiet.severity == 0
assert quiet.data["Kp"] == 0.67
assert g1.severity == 1
assert g4.severity == 4
assert g4.time == datetime(2026, 5, 12, 6, 0, 0, tzinfo=timezone.utc)
@pytest.mark.asyncio
async def test_kindex_dedup(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_KINDEX)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_kindex_subject_for(self, tmp_path: Path):
from central.adapters.swpc_kindex import SWPCKindexAdapter
from central.models import Geo
adapter = SWPCKindexAdapter(
_config("swpc_kindex", 600), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="2026-05-12T03:00:00",
adapter="swpc_kindex",
category="space.kindex",
time=datetime(2026, 5, 12, 3, tzinfo=timezone.utc),
severity=1,
geo=Geo(),
data={"Kp": 5.33},
)
assert adapter.subject_for(event) == "central.space.kindex"
class TestSWPCProtonsAdapter:
"""Tests for SWPCProtonsAdapter."""
@pytest.mark.asyncio
async def test_protons_normalization(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS)
await adapter.startup()
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 3
first = events[0]
assert first.category == "space.proton_flux"
assert first.id == "2026-05-18T05:35:00Z|>=1 MeV"
assert first.severity == 0
assert first.data["energy"] == ">=1 MeV"
assert first.data["flux"] == 7.09
assert first.time == datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc)
assert first.geo.centroid is None
assert first.geo.regions == []
# Same time_tag, different energy -> distinct event_id
assert events[1].id == "2026-05-18T05:35:00Z|>=10 MeV"
@pytest.mark.asyncio
async def test_protons_dedup(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
adapter._fetch = AsyncMock(return_value=SAMPLE_PROTONS)
await adapter.startup()
first_pass = [e async for e in adapter.poll()]
second_pass = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(first_pass) == 3
assert len(second_pass) == 0
@pytest.mark.asyncio
async def test_protons_subject_for(self, tmp_path: Path):
from central.adapters.swpc_protons import SWPCProtonsAdapter
from central.models import Geo
adapter = SWPCProtonsAdapter(
_config("swpc_protons", 600), MagicMock(), tmp_path / "cursors.db"
)
event = Event(
id="2026-05-18T05:35:00Z|>=10 MeV",
adapter="swpc_protons",
category="space.proton_flux",
time=datetime(2026, 5, 18, 5, 35, 0, tzinfo=timezone.utc),
severity=0,
geo=Geo(),
data={"energy": ">=10 MeV", "flux": 0.21},
)
assert adapter.subject_for(event) == "central.space.proton_flux"