mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Merge pull request #15 from zvx-echo6/feature/1b-3b-archive-multi-stream
fix(archive): subscribe to all event streams
This commit is contained in:
commit
c31de2499d
2 changed files with 258 additions and 25 deletions
|
|
@ -1,4 +1,8 @@
|
||||||
"""Central archive consumer - JetStream to TimescaleDB."""
|
"""Central archive consumer - JetStream to TimescaleDB.
|
||||||
|
|
||||||
|
Consumes events from multiple NATS JetStream streams and archives them
|
||||||
|
to TimescaleDB. One durable consumer per stream for independent ack tracking.
|
||||||
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
|
@ -12,17 +16,27 @@ import asyncpg
|
||||||
import nats
|
import nats
|
||||||
from nats.js import JetStreamContext
|
from nats.js import JetStreamContext
|
||||||
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
|
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
|
||||||
|
from nats.js.errors import NotFoundError
|
||||||
|
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
|
|
||||||
CONSUMER_NAME = "archive"
|
# Event-bearing streams to consume (skip CENTRAL_META - status messages only)
|
||||||
STREAM_NAME = "CENTRAL_WX"
|
STREAMS = [
|
||||||
SUBJECT_FILTER = "central.wx.>"
|
("CENTRAL_WX", "central.wx.>"),
|
||||||
|
("CENTRAL_FIRE", "central.fire.>"),
|
||||||
|
("CENTRAL_QUAKE", "central.quake.>"),
|
||||||
|
]
|
||||||
|
|
||||||
BATCH_SIZE = 100
|
BATCH_SIZE = 100
|
||||||
FETCH_TIMEOUT = 5.0
|
FETCH_TIMEOUT = 5.0
|
||||||
ACK_WAIT = 30
|
ACK_WAIT = 30
|
||||||
|
|
||||||
|
|
||||||
|
def consumer_name_for(stream: str) -> str:
|
||||||
|
"""Generate consumer name for a stream."""
|
||||||
|
return f"archive-{stream.lower()}"
|
||||||
|
|
||||||
|
|
||||||
class JsonFormatter(logging.Formatter):
|
class JsonFormatter(logging.Formatter):
|
||||||
"""JSON log formatter for structured logging."""
|
"""JSON log formatter for structured logging."""
|
||||||
|
|
||||||
|
|
@ -125,24 +139,49 @@ class ArchiveConsumer:
|
||||||
self._js = None
|
self._js = None
|
||||||
logger.info("Disconnected")
|
logger.info("Disconnected")
|
||||||
|
|
||||||
async def _ensure_consumer(self) -> None:
|
async def _cleanup_orphaned_consumer(self) -> None:
|
||||||
"""Ensure the durable consumer exists."""
|
"""Remove orphaned 'archive' consumer from CENTRAL_WX if it exists.
|
||||||
|
|
||||||
|
The old single-stream code used a consumer named 'archive' on CENTRAL_WX.
|
||||||
|
Now we use 'archive-central_wx' instead. Clean up the old one.
|
||||||
|
"""
|
||||||
if not self._js:
|
if not self._js:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME)
|
await self._js.consumer_info("CENTRAL_WX", "archive")
|
||||||
logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME})
|
await self._js.delete_consumer("CENTRAL_WX", "archive")
|
||||||
except nats.js.errors.NotFoundError:
|
logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX")
|
||||||
|
except NotFoundError:
|
||||||
|
pass # Already gone or never existed
|
||||||
|
|
||||||
|
async def _ensure_consumer(
|
||||||
|
self, stream_name: str, subject_filter: str, consumer_name: str
|
||||||
|
) -> None:
|
||||||
|
"""Ensure the durable consumer exists for a stream."""
|
||||||
|
if not self._js:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._js.consumer_info(stream_name, consumer_name)
|
||||||
|
logger.info(
|
||||||
|
"Consumer exists",
|
||||||
|
extra={"stream": stream_name, "consumer": consumer_name}
|
||||||
|
)
|
||||||
|
except NotFoundError:
|
||||||
consumer_config = ConsumerConfig(
|
consumer_config = ConsumerConfig(
|
||||||
durable_name=CONSUMER_NAME,
|
durable_name=consumer_name,
|
||||||
deliver_policy=DeliverPolicy.ALL,
|
deliver_policy=DeliverPolicy.ALL,
|
||||||
ack_policy=AckPolicy.EXPLICIT,
|
ack_policy=AckPolicy.EXPLICIT,
|
||||||
ack_wait=ACK_WAIT,
|
ack_wait=ACK_WAIT,
|
||||||
filter_subject=SUBJECT_FILTER,
|
max_deliver=5,
|
||||||
|
filter_subject=subject_filter,
|
||||||
|
)
|
||||||
|
await self._js.add_consumer(stream_name, consumer_config)
|
||||||
|
logger.info(
|
||||||
|
"Consumer created",
|
||||||
|
extra={"stream": stream_name, "consumer": consumer_name}
|
||||||
)
|
)
|
||||||
await self._js.add_consumer(STREAM_NAME, consumer_config)
|
|
||||||
logger.info("Consumer created", extra={"consumer": CONSUMER_NAME})
|
|
||||||
|
|
||||||
async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None:
|
async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None:
|
||||||
"""Process a single message and insert into database."""
|
"""Process a single message and insert into database."""
|
||||||
|
|
@ -241,22 +280,24 @@ class ArchiveConsumer:
|
||||||
)
|
)
|
||||||
# Don't ack - let it be redelivered
|
# Don't ack - let it be redelivered
|
||||||
|
|
||||||
async def _consume_loop(self) -> None:
|
async def _consume_stream(
|
||||||
"""Main consume loop."""
|
self, stream_name: str, subject_filter: str, consumer_name: str
|
||||||
|
) -> None:
|
||||||
|
"""Consume loop for a single stream."""
|
||||||
if not self._js or not self._pool:
|
if not self._js or not self._pool:
|
||||||
return
|
return
|
||||||
|
|
||||||
await self._ensure_consumer()
|
await self._ensure_consumer(stream_name, subject_filter, consumer_name)
|
||||||
|
|
||||||
sub = await self._js.pull_subscribe(
|
sub = await self._js.pull_subscribe(
|
||||||
SUBJECT_FILTER,
|
subject_filter,
|
||||||
durable=CONSUMER_NAME,
|
durable=consumer_name,
|
||||||
stream=STREAM_NAME,
|
stream=stream_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Subscribed to stream",
|
"Subscribed to stream",
|
||||||
extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER}
|
extra={"stream": stream_name, "filter": subject_filter}
|
||||||
)
|
)
|
||||||
|
|
||||||
while not self._shutdown_event.is_set():
|
while not self._shutdown_event.is_set():
|
||||||
|
|
@ -277,19 +318,62 @@ class ArchiveConsumer:
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception("Error in consume loop", extra={"error": str(e)})
|
logger.exception(
|
||||||
|
"Error in consume loop",
|
||||||
|
extra={"stream": stream_name, "error": str(e)}
|
||||||
|
)
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
logger.info("Consume loop stopped")
|
logger.info("Consume loop stopped", extra={"stream": stream_name})
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start the consumer."""
|
"""Start the consumer."""
|
||||||
await self.connect()
|
await self.connect()
|
||||||
|
await self._cleanup_orphaned_consumer()
|
||||||
logger.info("Archive consumer ready")
|
logger.info("Archive consumer ready")
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
"""Run the consume loop until shutdown."""
|
"""Run consume loops for all streams until shutdown."""
|
||||||
await self._consume_loop()
|
tasks = []
|
||||||
|
for stream_name, subject_filter in STREAMS:
|
||||||
|
consumer_name = consumer_name_for(stream_name)
|
||||||
|
task = asyncio.create_task(
|
||||||
|
self._consume_stream(stream_name, subject_filter, consumer_name),
|
||||||
|
name=f"consume-{stream_name}",
|
||||||
|
)
|
||||||
|
tasks.append(task)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Wait for all tasks; if one fails, cancel the others
|
||||||
|
done, pending = await asyncio.wait(
|
||||||
|
tasks,
|
||||||
|
return_when=asyncio.FIRST_EXCEPTION,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check for exceptions in completed tasks
|
||||||
|
for task in done:
|
||||||
|
if task.exception():
|
||||||
|
logger.error(
|
||||||
|
"Stream consumer failed",
|
||||||
|
extra={"task": task.get_name(), "error": str(task.exception())}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Cancel any remaining tasks
|
||||||
|
for task in pending:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# Shutdown requested, cancel all tasks
|
||||||
|
for task in tasks:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop the consumer gracefully."""
|
"""Stop the consumer gracefully."""
|
||||||
|
|
@ -308,7 +392,6 @@ async def async_main() -> None:
|
||||||
"Archive starting",
|
"Archive starting",
|
||||||
extra={
|
extra={
|
||||||
"nats_url": settings.nats_url,
|
"nats_url": settings.nats_url,
|
||||||
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
150
tests/test_archive_multi_stream.py
Normal file
150
tests/test_archive_multi_stream.py
Normal file
|
|
@ -0,0 +1,150 @@
|
||||||
|
"""Tests for multi-stream archive consumer."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
from central.archive import (
|
||||||
|
STREAMS,
|
||||||
|
consumer_name_for,
|
||||||
|
ArchiveConsumer,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConsumerNaming:
|
||||||
|
"""Test consumer naming convention."""
|
||||||
|
|
||||||
|
def test_consumer_name_for_central_wx(self):
|
||||||
|
"""Consumer name for CENTRAL_WX is archive-central_wx."""
|
||||||
|
assert consumer_name_for("CENTRAL_WX") == "archive-central_wx"
|
||||||
|
|
||||||
|
def test_consumer_name_for_central_fire(self):
|
||||||
|
"""Consumer name for CENTRAL_FIRE is archive-central_fire."""
|
||||||
|
assert consumer_name_for("CENTRAL_FIRE") == "archive-central_fire"
|
||||||
|
|
||||||
|
def test_consumer_name_for_central_quake(self):
|
||||||
|
"""Consumer name for CENTRAL_QUAKE is archive-central_quake."""
|
||||||
|
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamsConfiguration:
|
||||||
|
"""Test streams configuration."""
|
||||||
|
|
||||||
|
def test_streams_list_has_three_entries(self):
|
||||||
|
"""STREAMS list has three event-bearing streams."""
|
||||||
|
assert len(STREAMS) == 3
|
||||||
|
|
||||||
|
def test_streams_contains_central_wx(self):
|
||||||
|
"""STREAMS contains CENTRAL_WX with correct filter."""
|
||||||
|
assert ("CENTRAL_WX", "central.wx.>") in STREAMS
|
||||||
|
|
||||||
|
def test_streams_contains_central_fire(self):
|
||||||
|
"""STREAMS contains CENTRAL_FIRE with correct filter."""
|
||||||
|
assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
|
||||||
|
|
||||||
|
def test_streams_contains_central_quake(self):
|
||||||
|
"""STREAMS contains CENTRAL_QUAKE with correct filter."""
|
||||||
|
assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
|
||||||
|
|
||||||
|
def test_streams_excludes_central_meta(self):
|
||||||
|
"""STREAMS does not contain CENTRAL_META (status messages only)."""
|
||||||
|
stream_names = [s[0] for s in STREAMS]
|
||||||
|
assert "CENTRAL_META" not in stream_names
|
||||||
|
|
||||||
|
|
||||||
|
class TestOrphanedConsumerCleanup:
|
||||||
|
"""Test cleanup of orphaned 'archive' consumer."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cleanup_removes_orphaned_consumer_when_exists(self):
|
||||||
|
"""Cleanup removes 'archive' consumer from CENTRAL_WX when it exists."""
|
||||||
|
consumer = ArchiveConsumer(
|
||||||
|
nats_url="nats://localhost:4222",
|
||||||
|
postgres_dsn="postgresql://test:test@localhost/test",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js = AsyncMock()
|
||||||
|
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
|
||||||
|
mock_js.delete_consumer = AsyncMock()
|
||||||
|
consumer._js = mock_js
|
||||||
|
|
||||||
|
await consumer._cleanup_orphaned_consumer()
|
||||||
|
|
||||||
|
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
|
||||||
|
mock_js.delete_consumer.assert_called_once_with("CENTRAL_WX", "archive")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cleanup_handles_not_found_gracefully(self):
|
||||||
|
"""Cleanup handles NotFoundError when 'archive' consumer doesn't exist."""
|
||||||
|
from nats.js.errors import NotFoundError
|
||||||
|
|
||||||
|
consumer = ArchiveConsumer(
|
||||||
|
nats_url="nats://localhost:4222",
|
||||||
|
postgres_dsn="postgresql://test:test@localhost/test",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js = AsyncMock()
|
||||||
|
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
|
||||||
|
mock_js.delete_consumer = AsyncMock()
|
||||||
|
consumer._js = mock_js
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
await consumer._cleanup_orphaned_consumer()
|
||||||
|
|
||||||
|
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
|
||||||
|
mock_js.delete_consumer.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnsureConsumer:
|
||||||
|
"""Test consumer creation for each stream."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ensure_consumer_creates_when_not_exists(self):
|
||||||
|
"""_ensure_consumer creates consumer when it doesn't exist."""
|
||||||
|
from nats.js.errors import NotFoundError
|
||||||
|
|
||||||
|
consumer = ArchiveConsumer(
|
||||||
|
nats_url="nats://localhost:4222",
|
||||||
|
postgres_dsn="postgresql://test:test@localhost/test",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js = AsyncMock()
|
||||||
|
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
|
||||||
|
mock_js.add_consumer = AsyncMock()
|
||||||
|
consumer._js = mock_js
|
||||||
|
|
||||||
|
await consumer._ensure_consumer(
|
||||||
|
"CENTRAL_FIRE", "central.fire.>", "archive-central_fire"
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js.consumer_info.assert_called_once_with(
|
||||||
|
"CENTRAL_FIRE", "archive-central_fire"
|
||||||
|
)
|
||||||
|
mock_js.add_consumer.assert_called_once()
|
||||||
|
# Verify the consumer config
|
||||||
|
call_args = mock_js.add_consumer.call_args
|
||||||
|
assert call_args[0][0] == "CENTRAL_FIRE"
|
||||||
|
config = call_args[0][1]
|
||||||
|
assert config.durable_name == "archive-central_fire"
|
||||||
|
assert config.filter_subject == "central.fire.>"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ensure_consumer_skips_when_exists(self):
|
||||||
|
"""_ensure_consumer does nothing when consumer already exists."""
|
||||||
|
consumer = ArchiveConsumer(
|
||||||
|
nats_url="nats://localhost:4222",
|
||||||
|
postgres_dsn="postgresql://test:test@localhost/test",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js = AsyncMock()
|
||||||
|
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
|
||||||
|
mock_js.add_consumer = AsyncMock()
|
||||||
|
consumer._js = mock_js
|
||||||
|
|
||||||
|
await consumer._ensure_consumer(
|
||||||
|
"CENTRAL_QUAKE", "central.quake.>", "archive-central_quake"
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_js.consumer_info.assert_called_once_with(
|
||||||
|
"CENTRAL_QUAKE", "archive-central_quake"
|
||||||
|
)
|
||||||
|
mock_js.add_consumer.assert_not_called()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue