2026-05-17 19:29:38 +00:00
|
|
|
"""Tests for multi-stream archive consumer."""
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
|
|
|
|
|
|
from central.archive import (
|
|
|
|
|
STREAMS,
|
|
|
|
|
consumer_name_for,
|
|
|
|
|
ArchiveConsumer,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestConsumerNaming:
    """Verify the consumer name produced for each stream name."""

    def test_consumer_name_for_central_wx(self):
        """CENTRAL_WX yields the consumer name archive-central_wx."""
        expected = "archive-central_wx"
        assert consumer_name_for("CENTRAL_WX") == expected

    def test_consumer_name_for_central_fire(self):
        """CENTRAL_FIRE yields the consumer name archive-central_fire."""
        expected = "archive-central_fire"
        assert consumer_name_for("CENTRAL_FIRE") == expected

    def test_consumer_name_for_central_quake(self):
        """CENTRAL_QUAKE yields the consumer name archive-central_quake."""
        expected = "archive-central_quake"
        assert consumer_name_for("CENTRAL_QUAKE") == expected
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestStreamsConfiguration:
|
|
|
|
|
"""Test streams configuration."""
|
|
|
|
|
|
feat(2-E): GDACS disaster adapter
Adds the GDACS (Global Disaster Alert and Coordination System) adapter
against the self-describing framework. Polls https://www.gdacs.org/xml/rss.xml
every 600s, parses the RSS items, and publishes to a new CENTRAL_DISASTER
JetStream stream on central.disaster.<eventtype_lower>.<country_lower>.
Locked decisions:
- Keep: WF, DR, FL, VO, TC. Drop: EQ (USGS canonical on central.quake.>),
plus any future-unknown eventtype.
- Filter via settings_schema event_types: list[str] so operators can
re-allow without a code change.
- Dedup by RSS guid (format <eventtype><eventid>, stable across reissue).
- Severity from gdacs:alertlevel (Green=1, Orange=2, Red=3, default 0).
- Fall-off uses GDACS gdacs:iscurrent=false as explicit tombstone signal,
with a fallback for items that vanish entirely from the feed. Tombstones
publish on disaster.removed.<eventtype>.<country>.
- Geo: centroid from geo:Point, bbox from gdacs:bbox (reordered to Geo
(minLon, minLat, maxLon, maxLat)), primary_region from gdacs:iso3.
CENTRAL_DISASTER stream: 7d retention, 1 GiB max_bytes, mirroring
CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE. Migrations 020 (adapter row,
enabled=false, default event_types in settings) and 021 (stream seed).
STREAM_SUBJECTS, archive STREAMS, GUI DASHBOARD_STREAMS each pick up
the new stream.
Tests: 14 new in tests/test_gdacs.py using frozen RSS fixtures with WF/DR/EQ/XX
items (covering normalization, EQ drop, unknown drop, settings override,
guid dedup, iscurrent=false tombstone, missing-from-feed tombstone,
helper boundaries). Stream-count assertions bumped 4->5 and 5->6 for
the new stream (anti-pattern noted; queued as a follow-up PR E.5).
+1 membership test test_streams_contains_central_disaster.
Full suite: 426 passed.
End-to-end on CT104: 48 events published on first poll (44 disaster.wf +
4 disaster.fl), zero EQ events, all subjects under central.disaster.>
with lowercase-hyphenated country suffixes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 06:58:52 +00:00
|
|
|
def test_streams_list_has_five_entries(self):
    """STREAMS holds exactly five event-bearing streams."""
    expected_count = 5
    assert len(STREAMS) == expected_count
|
2026-05-17 19:29:38 +00:00
|
|
|
|
|
|
|
|
def test_streams_contains_central_wx(self):
    """The CENTRAL_WX stream is listed in STREAMS with its subject filter."""
    expected_entry = ("CENTRAL_WX", "central.wx.>")
    assert expected_entry in STREAMS
|
|
|
|
|
|
|
|
|
|
def test_streams_contains_central_fire(self):
    """The CENTRAL_FIRE stream is listed in STREAMS with its subject filter."""
    expected_entry = ("CENTRAL_FIRE", "central.fire.>")
    assert expected_entry in STREAMS
|
|
|
|
|
|
|
|
|
|
def test_streams_contains_central_quake(self):
    """The CENTRAL_QUAKE stream is listed in STREAMS with its subject filter."""
    expected_entry = ("CENTRAL_QUAKE", "central.quake.>")
    assert expected_entry in STREAMS
|
|
|
|
|
|
feat(2-D): add NOAA SWPC space weather adapters (alerts, kindex, protons)
Three independent adapters sharing src/central/adapters/swpc_common.py,
mirroring the WFIGS two-adapter pattern. Each adapter has its own row in
config.adapters (ships disabled), its own cadence, and its own dedup
state, so operators can independently enable/disable and so a broken
upstream endpoint does not silently mask a healthy one.
Subjects:
swpc_alerts -> central.space.alert.<product_id_lower>
swpc_kindex -> central.space.kindex
swpc_protons -> central.space.proton_flux
Dedup keys:
alerts: product_id + issue_datetime
kindex: time_tag
protons: time_tag + energy
Severity: G-scale on product_id for K0[5-9][AW] alerts (G1-G5 -> 1-4),
G-scale on Kp for kindex, 0 for protons (raw flux carried in event.data).
No geo on any SWPC events (centroid=None, regions=[], primary_region=None).
No fall-off detection for alerts -- a single 115-row sample cannot confirm
whether alerts disappear from the upstream JSON when expired; deferred to
a later pass after 24h of observation.
CENTRAL_SPACE stream seeded with 7-day retention / 1 GiB max_bytes, mirroring
CENTRAL_FIRE / CENTRAL_QUAKE. STREAM_SUBJECTS, archive STREAMS, and
DASHBOARD_STREAMS each pick up the new stream.
Tests: 16 new cases in tests/test_swpc.py using real-shape frozen JSON
fixtures (alerts product_ids EF3A/K05A/K07A; kindex Kp boundaries; protons
composite dedup). Two existing tests updated for the new stream count
(test_archive_multi_stream.test_streams_list_has_three_entries renamed to
_has_four_entries; test_dashboard expects 5 streams not 4); added a
test_streams_contains_central_space companion.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 05:55:29 +00:00
|
|
|
def test_streams_contains_central_space(self):
    """The CENTRAL_SPACE stream is listed in STREAMS with its subject filter."""
    expected_entry = ("CENTRAL_SPACE", "central.space.>")
    assert expected_entry in STREAMS
|
|
|
|
|
|
feat(2-E): GDACS disaster adapter
Adds the GDACS (Global Disaster Alert and Coordination System) adapter
against the self-describing framework. Polls https://www.gdacs.org/xml/rss.xml
every 600s, parses the RSS items, and publishes to a new CENTRAL_DISASTER
JetStream stream on central.disaster.<eventtype_lower>.<country_lower>.
Locked decisions:
- Keep: WF, DR, FL, VO, TC. Drop: EQ (USGS canonical on central.quake.>),
plus any future-unknown eventtype.
- Filter via settings_schema event_types: list[str] so operators can
re-allow without a code change.
- Dedup by RSS guid (format <eventtype><eventid>, stable across reissue).
- Severity from gdacs:alertlevel (Green=1, Orange=2, Red=3, default 0).
- Fall-off uses GDACS gdacs:iscurrent=false as explicit tombstone signal,
with a fallback for items that vanish entirely from the feed. Tombstones
publish on disaster.removed.<eventtype>.<country>.
- Geo: centroid from geo:Point, bbox from gdacs:bbox (reordered to Geo
(minLon, minLat, maxLon, maxLat)), primary_region from gdacs:iso3.
CENTRAL_DISASTER stream: 7d retention, 1 GiB max_bytes, mirroring
CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE. Migrations 020 (adapter row,
enabled=false, default event_types in settings) and 021 (stream seed).
STREAM_SUBJECTS, archive STREAMS, GUI DASHBOARD_STREAMS each pick up
the new stream.
Tests: 14 new in tests/test_gdacs.py using frozen RSS fixtures with WF/DR/EQ/XX
items (covering normalization, EQ drop, unknown drop, settings override,
guid dedup, iscurrent=false tombstone, missing-from-feed tombstone,
helper boundaries). Stream-count assertions bumped 4->5 and 5->6 for
the new stream (anti-pattern noted; queued as a follow-up PR E.5).
+1 membership test test_streams_contains_central_disaster.
Full suite: 426 passed.
End-to-end on CT104: 48 events published on first poll (44 disaster.wf +
4 disaster.fl), zero EQ events, all subjects under central.disaster.>
with lowercase-hyphenated country suffixes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 06:58:52 +00:00
|
|
|
def test_streams_contains_central_disaster(self):
    """The CENTRAL_DISASTER stream is listed in STREAMS with its subject filter."""
    expected_entry = ("CENTRAL_DISASTER", "central.disaster.>")
    assert expected_entry in STREAMS
|
|
|
|
|
|
2026-05-17 19:29:38 +00:00
|
|
|
def test_streams_excludes_central_meta(self):
    """CENTRAL_META (status messages only) must be absent from STREAMS."""
    # Only the first element of each entry (the stream name) is inspected.
    listed_names = tuple(entry[0] for entry in STREAMS)
    assert "CENTRAL_META" not in listed_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestOrphanedConsumerCleanup:
    """Exercise removal of the orphaned 'archive' consumer on CENTRAL_WX."""

    @staticmethod
    def _consumer_with(js):
        """Return an ArchiveConsumer whose ``_js`` handle is the given mock."""
        archiver = ArchiveConsumer(
            nats_url="nats://localhost:4222",
            postgres_dsn="postgresql://test:test@localhost/test",
        )
        archiver._js = js
        return archiver

    @pytest.mark.asyncio
    async def test_cleanup_removes_orphaned_consumer_when_exists(self):
        """An 'archive' consumer found on CENTRAL_WX is looked up and deleted."""
        js = AsyncMock()
        # consumer_info resolving normally stands for "the consumer exists".
        js.consumer_info = AsyncMock(return_value=MagicMock())
        js.delete_consumer = AsyncMock()
        archiver = self._consumer_with(js)

        await archiver._cleanup_orphaned_consumer()

        orphan = ("CENTRAL_WX", "archive")
        js.consumer_info.assert_called_once_with(*orphan)
        js.delete_consumer.assert_called_once_with(*orphan)

    @pytest.mark.asyncio
    async def test_cleanup_handles_not_found_gracefully(self):
        """A NotFoundError from the lookup is absorbed and nothing is deleted."""
        from nats.js.errors import NotFoundError

        js = AsyncMock()
        # consumer_info raising NotFoundError stands for "no such consumer".
        js.consumer_info = AsyncMock(side_effect=NotFoundError())
        js.delete_consumer = AsyncMock()
        archiver = self._consumer_with(js)

        # The call itself must complete without propagating the error.
        await archiver._cleanup_orphaned_consumer()

        js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
        js.delete_consumer.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestEnsureConsumer:
|
|
|
|
|
"""Test consumer creation for each stream."""
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ensure_consumer_creates_when_not_exists(self):
    """A consumer that cannot be found is created with the given name and filter."""
    from nats.js.errors import NotFoundError

    stream = "CENTRAL_FIRE"
    subject = "central.fire.>"
    durable = "archive-central_fire"

    js = AsyncMock()
    # consumer_info raising NotFoundError stands for "no such consumer".
    js.consumer_info = AsyncMock(side_effect=NotFoundError())
    js.add_consumer = AsyncMock()

    archiver = ArchiveConsumer(
        nats_url="nats://localhost:4222",
        postgres_dsn="postgresql://test:test@localhost/test",
    )
    archiver._js = js

    await archiver._ensure_consumer(stream, subject, durable)

    js.consumer_info.assert_called_once_with(stream, durable)
    js.add_consumer.assert_called_once()

    # Inspect the positional arguments handed to add_consumer:
    # first the stream name, then the consumer configuration object.
    positional = js.add_consumer.call_args[0]
    assert positional[0] == stream
    created = positional[1]
    assert created.durable_name == durable
    assert created.filter_subject == subject
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ensure_consumer_skips_when_exists(self):
    """No consumer is added when the lookup reports one already present."""
    stream = "CENTRAL_QUAKE"
    subject = "central.quake.>"
    durable = "archive-central_quake"

    js = AsyncMock()
    # consumer_info resolving normally stands for "the consumer exists".
    js.consumer_info = AsyncMock(return_value=MagicMock())
    js.add_consumer = AsyncMock()

    archiver = ArchiveConsumer(
        nats_url="nats://localhost:4222",
        postgres_dsn="postgresql://test:test@localhost/test",
    )
    archiver._js = js

    await archiver._ensure_consumer(stream, subject, durable)

    js.consumer_info.assert_called_once_with(stream, durable)
    js.add_consumer.assert_not_called()
|