mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
feat(2-E.5): single-source-of-truth stream registry
Eliminates the duplication that has been hand-bumped through PRs B, C, D, E.
Adding a stream is now one StreamEntry in src/central/streams.py + one
migration row in config.streams. supervisor STREAM_SUBJECTS / archive
STREAMS / gui DASHBOARD_STREAMS all derive at import time. No drift
possible because there is one source.
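Pure refactor; no behavior change. Runtime verified: derived structures
compare equal to the previous literal definitions (only the key order of
supervisor STREAM_SUBJECTS differs: CENTRAL_META now comes last, following
registry order, instead of second).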
src/central/streams.py (new):
@dataclass(frozen=True)
class StreamEntry:
name: str
subject_filter: str
event_bearing: bool = True # archive consumes from this stream
dashboard: bool = True # GUI dashboard surfaces this stream
STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_WX", "central.wx.>"),
StreamEntry("CENTRAL_FIRE", "central.fire.>"),
StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
Consumers derive:
supervisor.STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}
(includes META: supervisor must create every stream in JetStream)
archive.STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
(excludes META: status messages, not events)
gui.DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]
To resolve the name collision between the registry STREAMS and the
existing archive.STREAMS public symbol, archive.py imports the registry
under an alias: from central.streams import STREAMS as STREAM_REGISTRY.
The archive's STREAMS surface (the tuple-list) is unchanged for callers.
Same alias used in supervisor.py and gui/routes.py for symmetry.
Migration files unchanged. config.streams still holds the seeded retention/max_bytes --
operator-tunable ops state, a separate source of truth from the structural mapping.
Tests:
Dropped from test_archive_multi_stream.py (7, all tautological vs. registry):
test_streams_list_has_five_entries (magic-number count)
test_streams_contains_central_wx / fire / quake / space / disaster
test_streams_excludes_central_meta
Dropped from test_dashboard.py:
`assert len(streams) == 6` line inside test_single_stream_failure_doesnt_crash_card
(the test itself stays; only the magic-number assertion is removed)
Added in test_stream_registry.py (8 invariant tests):
test_stream_names_unique
test_subject_filters_unique
test_subject_filter_central_prefix_wildcard
test_meta_is_only_non_event_bearing
test_supervisor_stream_subjects_includes_meta
test_supervisor_stream_subjects_includes_all
test_archive_streams_excludes_non_event_bearing
test_dashboard_streams_matches_dashboard_flag
The new tests assert properties (uniqueness, format, derivation correctness),
not literals. Future stream additions need zero new test code -- every
invariant automatically covers them.
Note: test file named tests/test_stream_registry.py (not test_streams.py)
to avoid colliding with the pre-existing tests/test_streams.py, which
covers the GUI streams-management page.
Full suite: 427 passed (was 426 on main: -7 dropped + 8 added).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f1779e3233
commit
456a744bb4
7 changed files with 123 additions and 53 deletions
|
|
@ -19,15 +19,11 @@ from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
|
||||||
from nats.js.errors import NotFoundError
|
from nats.js.errors import NotFoundError
|
||||||
|
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
|
from central.streams import STREAMS as STREAM_REGISTRY
|
||||||
|
|
||||||
# Event-bearing streams to consume (skip CENTRAL_META - status messages only)
|
# Event-bearing streams to consume -- derived from the registry's event_bearing flag.
|
||||||
STREAMS = [
|
# CENTRAL_META is excluded because it carries status messages, not events.
|
||||||
("CENTRAL_WX", "central.wx.>"),
|
STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearing]
|
||||||
("CENTRAL_FIRE", "central.fire.>"),
|
|
||||||
("CENTRAL_QUAKE", "central.quake.>"),
|
|
||||||
("CENTRAL_SPACE", "central.space.>"),
|
|
||||||
("CENTRAL_DISASTER", "central.disaster.>"),
|
|
||||||
]
|
|
||||||
|
|
||||||
BATCH_SIZE = 100
|
BATCH_SIZE = 100
|
||||||
FETCH_TIMEOUT = 5.0
|
FETCH_TIMEOUT = 5.0
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ from functools import cache
|
||||||
from central.gui.db import get_pool
|
from central.gui.db import get_pool
|
||||||
from central.gui.form_descriptors import describe_fields, FieldDescriptor
|
from central.gui.form_descriptors import describe_fields, FieldDescriptor
|
||||||
from central.adapter_discovery import discover_adapters
|
from central.adapter_discovery import discover_adapters
|
||||||
|
from central.streams import STREAMS as STREAM_REGISTRY
|
||||||
from pydantic import ValidationError
|
from pydantic import ValidationError
|
||||||
|
|
||||||
@cache
|
@cache
|
||||||
|
|
@ -63,8 +64,8 @@ def _adapter_classes() -> dict:
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Streams to display on dashboard
|
# Streams to display on dashboard -- derived from the registry's dashboard flag.
|
||||||
DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_DISASTER", "CENTRAL_META"]
|
DASHBOARD_STREAMS = [s.name for s in STREAM_REGISTRY if s.dashboard]
|
||||||
|
|
||||||
# Email validation regex (simple but effective)
|
# Email validation regex (simple but effective)
|
||||||
ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$")
|
ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$")
|
||||||
|
|
|
||||||
32
src/central/streams.py
Normal file
32
src/central/streams.py
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
"""Stream registry — single source of truth for NATS JetStream stream definitions.
|
||||||
|
|
||||||
|
Subject-filter mappings live in code (structural; change only when code changes).
|
||||||
|
Retention / max_bytes live in config.streams (operator-tunable; ops state).
|
||||||
|
|
||||||
|
Adding a stream: one StreamEntry below + one migration row that seeds
|
||||||
|
config.streams. supervisor STREAM_SUBJECTS / archive STREAMS / gui DASHBOARD_STREAMS
|
||||||
|
all derive automatically.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class StreamEntry:
|
||||||
|
name: str
|
||||||
|
subject_filter: str
|
||||||
|
event_bearing: bool = True
|
||||||
|
"""Whether central-archive consumes events from this stream into the events table.
|
||||||
|
False for status-only streams (CENTRAL_META) that the archive intentionally skips."""
|
||||||
|
dashboard: bool = True
|
||||||
|
"""Whether the GUI dashboard surfaces this stream's stats card."""
|
||||||
|
|
||||||
|
|
||||||
|
STREAMS: list[StreamEntry] = [
|
||||||
|
StreamEntry("CENTRAL_WX", "central.wx.>"),
|
||||||
|
StreamEntry("CENTRAL_FIRE", "central.fire.>"),
|
||||||
|
StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
|
||||||
|
StreamEntry("CENTRAL_SPACE", "central.space.>"),
|
||||||
|
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
|
||||||
|
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
|
||||||
|
]
|
||||||
|
|
@ -21,17 +21,12 @@ from central.config_source import ConfigSource, DbConfigSource
|
||||||
from central.config_store import ConfigStore
|
from central.config_store import ConfigStore
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
from central.stream_manager import StreamManager
|
from central.stream_manager import StreamManager
|
||||||
|
from central.streams import STREAMS as STREAM_REGISTRY
|
||||||
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
|
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
|
||||||
|
|
||||||
# Stream subject mappings
|
# Stream subject mappings -- derived from the registry; every stream is included
|
||||||
STREAM_SUBJECTS = {
|
# (META too: supervisor must create it in JetStream even though archive skips it).
|
||||||
"CENTRAL_WX": ["central.wx.>"],
|
STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY}
|
||||||
"CENTRAL_META": ["central.meta.>"],
|
|
||||||
"CENTRAL_FIRE": ["central.fire.>"],
|
|
||||||
"CENTRAL_QUAKE": ["central.quake.>"],
|
|
||||||
"CENTRAL_SPACE": ["central.space.>"],
|
|
||||||
"CENTRAL_DISASTER": ["central.disaster.>"],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Recompute interval for stream max_bytes (1 hour)
|
# Recompute interval for stream max_bytes (1 hour)
|
||||||
STREAM_RECOMPUTE_INTERVAL_S = 3600
|
STREAM_RECOMPUTE_INTERVAL_S = 3600
|
||||||
|
|
|
||||||
|
|
@ -26,39 +26,6 @@ class TestConsumerNaming:
|
||||||
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
|
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
|
||||||
|
|
||||||
|
|
||||||
class TestStreamsConfiguration:
|
|
||||||
"""Test streams configuration."""
|
|
||||||
|
|
||||||
def test_streams_list_has_five_entries(self):
|
|
||||||
"""STREAMS list has five event-bearing streams."""
|
|
||||||
assert len(STREAMS) == 5
|
|
||||||
|
|
||||||
def test_streams_contains_central_wx(self):
|
|
||||||
"""STREAMS contains CENTRAL_WX with correct filter."""
|
|
||||||
assert ("CENTRAL_WX", "central.wx.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_fire(self):
|
|
||||||
"""STREAMS contains CENTRAL_FIRE with correct filter."""
|
|
||||||
assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_quake(self):
|
|
||||||
"""STREAMS contains CENTRAL_QUAKE with correct filter."""
|
|
||||||
assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_space(self):
|
|
||||||
"""STREAMS contains CENTRAL_SPACE with correct filter."""
|
|
||||||
assert ("CENTRAL_SPACE", "central.space.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_disaster(self):
|
|
||||||
"""STREAMS contains CENTRAL_DISASTER with correct filter."""
|
|
||||||
assert ("CENTRAL_DISASTER", "central.disaster.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_excludes_central_meta(self):
|
|
||||||
"""STREAMS does not contain CENTRAL_META (status messages only)."""
|
|
||||||
stream_names = [s[0] for s in STREAMS]
|
|
||||||
assert "CENTRAL_META" not in stream_names
|
|
||||||
|
|
||||||
|
|
||||||
class TestOrphanedConsumerCleanup:
|
class TestOrphanedConsumerCleanup:
|
||||||
"""Test cleanup of orphaned 'archive' consumer."""
|
"""Test cleanup of orphaned 'archive' consumer."""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -205,7 +205,6 @@ class TestDashboardStreamsIsolation:
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
call_args = mock_templates.TemplateResponse.call_args
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||||
streams = context["streams"]
|
streams = context["streams"]
|
||||||
assert len(streams) == 6
|
|
||||||
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
|
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
|
||||||
assert fire_stream.get("error") == "unavailable"
|
assert fire_stream.get("error") == "unavailable"
|
||||||
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
|
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
|
||||||
|
|
|
||||||
80
tests/test_stream_registry.py
Normal file
80
tests/test_stream_registry.py
Normal file
|
|
@ -0,0 +1,80 @@
|
||||||
|
"""Registry-consistency tests for src/central/streams.py.
|
||||||
|
|
||||||
|
These are property tests, not literal restatements. Adding a new stream to the
|
||||||
|
registry requires no new test code -- every invariant here automatically
|
||||||
|
covers it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
from central.streams import STREAMS
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_names_unique():
|
||||||
|
names = [s.name for s in STREAMS]
|
||||||
|
assert len(names) == len(set(names)), "duplicate stream names in registry"
|
||||||
|
|
||||||
|
|
||||||
|
def test_subject_filters_unique():
|
||||||
|
filters = [s.subject_filter for s in STREAMS]
|
||||||
|
assert len(filters) == len(set(filters)), "duplicate subject filters in registry"
|
||||||
|
|
||||||
|
|
||||||
|
def test_subject_filter_central_prefix_wildcard():
|
||||||
|
pattern = re.compile(r"^central\.[a-z][a-z_]*\.>$")
|
||||||
|
for s in STREAMS:
|
||||||
|
assert pattern.match(s.subject_filter), (
|
||||||
|
f"{s.name}: subject_filter {s.subject_filter!r} does not match /^central\\.[a-z][a-z_]*\\.>$/"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_meta_is_only_non_event_bearing():
|
||||||
|
"""CENTRAL_META is the only non-event-bearing stream today.
|
||||||
|
|
||||||
|
If you're adding a second one, update this test deliberately -- the
|
||||||
|
archive will silently skip the new stream, which is rarely what you want.
|
||||||
|
"""
|
||||||
|
non_event = [s for s in STREAMS if not s.event_bearing]
|
||||||
|
assert len(non_event) == 1, (
|
||||||
|
f"expected exactly one non-event-bearing stream, got {[s.name for s in non_event]}"
|
||||||
|
)
|
||||||
|
assert non_event[0].name == "CENTRAL_META"
|
||||||
|
|
||||||
|
|
||||||
|
def test_supervisor_stream_subjects_includes_meta():
|
||||||
|
"""Supervisor creates every stream in JetStream, including META."""
|
||||||
|
from central.supervisor import STREAM_SUBJECTS
|
||||||
|
|
||||||
|
assert "CENTRAL_META" in STREAM_SUBJECTS
|
||||||
|
assert STREAM_SUBJECTS["CENTRAL_META"] == ["central.meta.>"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_supervisor_stream_subjects_includes_all():
|
||||||
|
"""Every registry stream appears in supervisor's derived dict with the right filter."""
|
||||||
|
from central.supervisor import STREAM_SUBJECTS
|
||||||
|
|
||||||
|
assert set(STREAM_SUBJECTS.keys()) == {s.name for s in STREAMS}
|
||||||
|
for s in STREAMS:
|
||||||
|
assert STREAM_SUBJECTS[s.name] == [s.subject_filter]
|
||||||
|
|
||||||
|
|
||||||
|
def test_archive_streams_excludes_non_event_bearing():
|
||||||
|
"""Archive's STREAMS list contains exactly the event_bearing=True entries."""
|
||||||
|
from central.archive import STREAMS as ARCHIVE_STREAMS
|
||||||
|
|
||||||
|
expected = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
|
||||||
|
assert ARCHIVE_STREAMS == expected
|
||||||
|
archive_names = {name for name, _ in ARCHIVE_STREAMS}
|
||||||
|
for s in STREAMS:
|
||||||
|
if s.event_bearing:
|
||||||
|
assert s.name in archive_names
|
||||||
|
else:
|
||||||
|
assert s.name not in archive_names
|
||||||
|
|
||||||
|
|
||||||
|
def test_dashboard_streams_matches_dashboard_flag():
|
||||||
|
"""GUI's DASHBOARD_STREAMS matches [s.name for s in STREAMS if s.dashboard]."""
|
||||||
|
from central.gui.routes import DASHBOARD_STREAMS
|
||||||
|
|
||||||
|
expected = [s.name for s in STREAMS if s.dashboard]
|
||||||
|
assert DASHBOARD_STREAMS == expected
|
||||||
Loading…
Add table
Add a link
Reference in a new issue