diff --git a/src/central/archive.py b/src/central/archive.py index 18d7c12..dfa5475 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -19,15 +19,11 @@ from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings +from central.streams import STREAMS as STREAM_REGISTRY -# Event-bearing streams to consume (skip CENTRAL_META - status messages only) -STREAMS = [ - ("CENTRAL_WX", "central.wx.>"), - ("CENTRAL_FIRE", "central.fire.>"), - ("CENTRAL_QUAKE", "central.quake.>"), - ("CENTRAL_SPACE", "central.space.>"), - ("CENTRAL_DISASTER", "central.disaster.>"), -] +# Event-bearing streams to consume -- derived from the registry's event_bearing flag. +# CENTRAL_META is excluded because it carries status messages, not events. +STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearing] BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index cb835e4..c5baf44 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -49,6 +49,7 @@ from functools import cache from central.gui.db import get_pool from central.gui.form_descriptors import describe_fields, FieldDescriptor from central.adapter_discovery import discover_adapters +from central.streams import STREAMS as STREAM_REGISTRY from pydantic import ValidationError @cache @@ -63,8 +64,8 @@ def _adapter_classes() -> dict: router = APIRouter() -# Streams to display on dashboard -DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_DISASTER", "CENTRAL_META"] +# Streams to display on dashboard -- derived from the registry's dashboard flag. +DASHBOARD_STREAMS = [s.name for s in STREAM_REGISTRY if s.dashboard] # Email validation regex (simple but effective) ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") diff --git a/src/central/streams.py b/src/central/streams.py new file mode 100644 index 0000000..e9e05f5 --- /dev/null +++ b/src/central/streams.py @@ -0,0 +1,32 @@ +"""Stream registry — single source of truth for NATS JetStream stream definitions. + +Subject-filter mappings live in code (structural; change only when code changes). +Retention / max_bytes live in config.streams (operator-tunable; ops state). + +Adding a stream: one StreamEntry below + one migration row that seeds +config.streams. supervisor STREAM_SUBJECTS / archive STREAMS / gui DASHBOARD_STREAMS +all derive automatically. +""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class StreamEntry: + name: str + subject_filter: str + event_bearing: bool = True + """Whether central-archive consumes events from this stream into the events table. + False for status-only streams (CENTRAL_META) that the archive intentionally skips.""" + dashboard: bool = True + """Whether the GUI dashboard surfaces this stream's stats card.""" + + +STREAMS: list[StreamEntry] = [ + StreamEntry("CENTRAL_WX", "central.wx.>"), + StreamEntry("CENTRAL_FIRE", "central.fire.>"), + StreamEntry("CENTRAL_QUAKE", "central.quake.>"), + StreamEntry("CENTRAL_SPACE", "central.space.>"), + StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), + StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), +] diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 7495561..bdc68e6 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -21,17 +21,12 @@ from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings from central.stream_manager import StreamManager +from central.streams import STREAMS as STREAM_REGISTRY CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") -# Stream subject mappings -STREAM_SUBJECTS = { - "CENTRAL_WX": ["central.wx.>"], - "CENTRAL_META": ["central.meta.>"], - "CENTRAL_FIRE": ["central.fire.>"], - "CENTRAL_QUAKE": ["central.quake.>"], - "CENTRAL_SPACE": ["central.space.>"], - "CENTRAL_DISASTER": ["central.disaster.>"], -} +# Stream subject mappings -- derived from the registry; every stream is included +# (META too: supervisor must create it in JetStream even though archive skips it). +STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY} # Recompute interval for stream max_bytes (1 hour) STREAM_RECOMPUTE_INTERVAL_S = 3600 diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py index 21d810a..0727bb1 100644 --- a/tests/test_archive_multi_stream.py +++ b/tests/test_archive_multi_stream.py @@ -26,39 +26,6 @@ class TestConsumerNaming: assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake" -class TestStreamsConfiguration: - """Test streams configuration.""" - - def test_streams_list_has_five_entries(self): - """STREAMS list has five event-bearing streams.""" - assert len(STREAMS) == 5 - - def test_streams_contains_central_wx(self): - """STREAMS contains CENTRAL_WX with correct filter.""" - assert ("CENTRAL_WX", "central.wx.>") in STREAMS - - def test_streams_contains_central_fire(self): - """STREAMS contains CENTRAL_FIRE with correct filter.""" - assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS - - def test_streams_contains_central_quake(self): - """STREAMS contains CENTRAL_QUAKE with correct filter.""" - assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS - - def test_streams_contains_central_space(self): - """STREAMS contains CENTRAL_SPACE with correct filter.""" - assert ("CENTRAL_SPACE", "central.space.>") in STREAMS - - def test_streams_contains_central_disaster(self): - """STREAMS contains CENTRAL_DISASTER with correct filter.""" - assert ("CENTRAL_DISASTER", "central.disaster.>") in STREAMS - - def test_streams_excludes_central_meta(self): - """STREAMS does not contain CENTRAL_META (status messages only).""" - stream_names = [s[0] for s in STREAMS] - assert "CENTRAL_META" not in stream_names - - class TestOrphanedConsumerCleanup: """Test cleanup of orphaned 'archive' consumer.""" diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py index 9ea53a6..73a2ace 100644 --- a/tests/test_dashboard.py +++ b/tests/test_dashboard.py @@ -205,7 +205,6 @@ class TestDashboardStreamsIsolation: call_args = mock_templates.TemplateResponse.call_args context = call_args.kwargs.get("context", call_args[1].get("context")) streams = context["streams"] - assert len(streams) == 6 fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE") assert fire_stream.get("error") == "unavailable" wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX") diff --git a/tests/test_stream_registry.py b/tests/test_stream_registry.py new file mode 100644 index 0000000..b8b2678 --- /dev/null +++ b/tests/test_stream_registry.py @@ -0,0 +1,80 @@ +"""Registry-consistency tests for src/central/streams.py. + +These are property tests, not literal restatements. Adding a new stream to the +registry requires no new test code -- every invariant here automatically +covers it. +""" + +import re + +from central.streams import STREAMS + + +def test_stream_names_unique(): + names = [s.name for s in STREAMS] + assert len(names) == len(set(names)), "duplicate stream names in registry" + + +def test_subject_filters_unique(): + filters = [s.subject_filter for s in STREAMS] + assert len(filters) == len(set(filters)), "duplicate subject filters in registry" + + +def test_subject_filter_central_prefix_wildcard(): + pattern = re.compile(r"^central\.[a-z][a-z_]*\.>$") + for s in STREAMS: + assert pattern.match(s.subject_filter), ( + f"{s.name}: subject_filter {s.subject_filter!r} does not match /^central\\.[a-z][a-z_]*\\.>$/" + ) + + +def test_meta_is_only_non_event_bearing(): + """CENTRAL_META is the only non-event-bearing stream today. + + If you're adding a second one, update this test deliberately -- the + archive will silently skip the new stream, which is rarely what you want. + """ + non_event = [s for s in STREAMS if not s.event_bearing] + assert len(non_event) == 1, ( + f"expected exactly one non-event-bearing stream, got {[s.name for s in non_event]}" + ) + assert non_event[0].name == "CENTRAL_META" + + +def test_supervisor_stream_subjects_includes_meta(): + """Supervisor creates every stream in JetStream, including META.""" + from central.supervisor import STREAM_SUBJECTS + + assert "CENTRAL_META" in STREAM_SUBJECTS + assert STREAM_SUBJECTS["CENTRAL_META"] == ["central.meta.>"] + + +def test_supervisor_stream_subjects_includes_all(): + """Every registry stream appears in supervisor's derived dict with the right filter.""" + from central.supervisor import STREAM_SUBJECTS + + assert set(STREAM_SUBJECTS.keys()) == {s.name for s in STREAMS} + for s in STREAMS: + assert STREAM_SUBJECTS[s.name] == [s.subject_filter] + + +def test_archive_streams_excludes_non_event_bearing(): + """Archive's STREAMS list contains exactly the event_bearing=True entries.""" + from central.archive import STREAMS as ARCHIVE_STREAMS + + expected = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing] + assert ARCHIVE_STREAMS == expected + archive_names = {name for name, _ in ARCHIVE_STREAMS} + for s in STREAMS: + if s.event_bearing: + assert s.name in archive_names + else: + assert s.name not in archive_names + + +def test_dashboard_streams_matches_dashboard_flag(): + """GUI's DASHBOARD_STREAMS matches [s.name for s in STREAMS if s.dashboard].""" + from central.gui.routes import DASHBOARD_STREAMS + + expected = [s.name for s in STREAMS if s.dashboard] + assert DASHBOARD_STREAMS == expected