mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Eliminates the duplication that has been hand-bumped through PRs B, C, D, E.
Adding a stream is now one StreamEntry in src/central/streams.py + one
migration row in config.streams. supervisor STREAM_SUBJECTS / archive
STREAMS / gui DASHBOARD_STREAMS all derive at import time. No drift
possible because there is one source.
Pure refactor; no behavior change. Runtime verified: derived structures
are byte-equivalent to the previous literal definitions.
src/central/streams.py (new):
@dataclass(frozen=True)
class StreamEntry:
name: str
subject_filter: str
event_bearing: bool = True # archive consumes from this stream
dashboard: bool = True # GUI dashboard surfaces this stream
STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_WX", "central.wx.>"),
StreamEntry("CENTRAL_FIRE", "central.fire.>"),
StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
Consumers derive:
supervisor.STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}
(includes META: supervisor must create every stream in JetStream)
archive.STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
(excludes META: status messages, not events)
gui.DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]
To resolve the name collision between the registry STREAMS and the
existing archive.STREAMS public symbol, archive.py imports the registry
under an alias: from central.streams import STREAMS as STREAM_REGISTRY.
The archive's STREAMS surface (the tuple-list) is unchanged for callers.
Same alias used in supervisor.py and gui/routes.py for symmetry.
Migration files unchanged. config.streams keeps seeding retention/bytes --
operator-tunable ops state, a separate source of truth (SoT) from the structural mapping.
Tests:
Dropped from test_archive_multi_stream.py (7, all tautological vs. registry):
test_streams_list_has_five_entries (magic-number count)
test_streams_contains_central_wx / fire / quake / space / disaster
test_streams_excludes_central_meta
Dropped from test_dashboard.py:
`assert len(streams) == 6` line inside test_single_stream_failure_doesnt_crash_card
(the test itself stays; only the magic-number assertion is removed)
Added in test_stream_registry.py (8 invariant tests):
test_stream_names_unique
test_subject_filters_unique
test_subject_filter_central_prefix_wildcard
test_meta_is_only_non_event_bearing
test_supervisor_stream_subjects_includes_meta
test_supervisor_stream_subjects_includes_all
test_archive_streams_excludes_non_event_bearing
test_dashboard_streams_matches_dashboard_flag
The new tests assert properties (uniqueness, format, derivation correctness),
not literals. Future stream additions need zero new test code -- every
invariant automatically covers them.
Note: test file named tests/test_stream_registry.py (not test_streams.py)
to avoid colliding with the pre-existing tests/test_streams.py, which
covers the GUI streams-management page.
Full suite: 427 passed (was 426 on main: -7 dropped + 8 added).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
125 lines
4.3 KiB
Python
125 lines
4.3 KiB
Python
"""Tests for multi-stream archive consumer."""
|
|
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from central.archive import (
|
|
STREAMS,
|
|
consumer_name_for,
|
|
ArchiveConsumer,
|
|
)
|
|
|
|
|
|
class TestConsumerNaming:
    """Verify the names consumer_name_for produces for individual streams."""

    def test_consumer_name_for_central_wx(self):
        """CENTRAL_WX maps to the consumer name archive-central_wx."""
        derived = consumer_name_for("CENTRAL_WX")
        assert derived == "archive-central_wx"

    def test_consumer_name_for_central_fire(self):
        """CENTRAL_FIRE maps to the consumer name archive-central_fire."""
        derived = consumer_name_for("CENTRAL_FIRE")
        assert derived == "archive-central_fire"

    def test_consumer_name_for_central_quake(self):
        """CENTRAL_QUAKE maps to the consumer name archive-central_quake."""
        derived = consumer_name_for("CENTRAL_QUAKE")
        assert derived == "archive-central_quake"
|
|
|
|
|
|
class TestOrphanedConsumerCleanup:
    """Exercise cleanup of the orphaned 'archive' consumer on CENTRAL_WX."""

    @staticmethod
    def _consumer_with(js):
        """Build an ArchiveConsumer whose JetStream handle is the given mock."""
        archive = ArchiveConsumer(
            nats_url="nats://localhost:4222",
            postgres_dsn="postgresql://test:test@localhost/test",
        )
        archive._js = js
        return archive

    @pytest.mark.asyncio
    async def test_cleanup_removes_orphaned_consumer_when_exists(self):
        """An existing 'archive' consumer on CENTRAL_WX is looked up and deleted."""
        # consumer_info returning normally stands in for "the consumer exists".
        js = AsyncMock(
            consumer_info=AsyncMock(return_value=MagicMock()),
            delete_consumer=AsyncMock(),
        )
        archive = self._consumer_with(js)

        await archive._cleanup_orphaned_consumer()

        js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
        js.delete_consumer.assert_called_once_with("CENTRAL_WX", "archive")

    @pytest.mark.asyncio
    async def test_cleanup_handles_not_found_gracefully(self):
        """A NotFoundError from the lookup is tolerated and nothing is deleted."""
        from nats.js.errors import NotFoundError

        # consumer_info raising NotFoundError stands in for "no such consumer".
        js = AsyncMock(
            consumer_info=AsyncMock(side_effect=NotFoundError()),
            delete_consumer=AsyncMock(),
        )
        archive = self._consumer_with(js)

        # The cleanup call itself must complete without raising.
        await archive._cleanup_orphaned_consumer()

        js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
        js.delete_consumer.assert_not_called()
|
|
|
|
|
|
class TestEnsureConsumer:
    """Cover _ensure_consumer for the missing-consumer and existing-consumer cases."""

    @staticmethod
    def _consumer_with(js):
        """Build an ArchiveConsumer whose JetStream handle is the given mock."""
        archive = ArchiveConsumer(
            nats_url="nats://localhost:4222",
            postgres_dsn="postgresql://test:test@localhost/test",
        )
        archive._js = js
        return archive

    @pytest.mark.asyncio
    async def test_ensure_consumer_creates_when_not_exists(self):
        """A consumer that cannot be found is created via add_consumer."""
        from nats.js.errors import NotFoundError

        # consumer_info raising NotFoundError stands in for "no such consumer".
        js = AsyncMock(
            consumer_info=AsyncMock(side_effect=NotFoundError()),
            add_consumer=AsyncMock(),
        )
        archive = self._consumer_with(js)

        await archive._ensure_consumer(
            "CENTRAL_FIRE", "central.fire.>", "archive-central_fire"
        )

        js.consumer_info.assert_called_once_with(
            "CENTRAL_FIRE", "archive-central_fire"
        )
        js.add_consumer.assert_called_once()

        # Inspect the positional arguments handed to add_consumer:
        # the stream name first, then the consumer config object.
        positional = js.add_consumer.call_args.args
        assert positional[0] == "CENTRAL_FIRE"
        created = positional[1]
        assert created.durable_name == "archive-central_fire"
        assert created.filter_subject == "central.fire.>"

    @pytest.mark.asyncio
    async def test_ensure_consumer_skips_when_exists(self):
        """A consumer that already exists is looked up but never re-created."""
        # consumer_info returning normally stands in for "the consumer exists".
        js = AsyncMock(
            consumer_info=AsyncMock(return_value=MagicMock()),
            add_consumer=AsyncMock(),
        )
        archive = self._consumer_with(js)

        await archive._ensure_consumer(
            "CENTRAL_QUAKE", "central.quake.>", "archive-central_quake"
        )

        js.consumer_info.assert_called_once_with(
            "CENTRAL_QUAKE", "archive-central_quake"
        )
        js.add_consumer.assert_not_called()
|