From 6b5f6709e45506423c460517a87bcc0cc07c9694 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sun, 17 May 2026 19:29:38 +0000 Subject: [PATCH] fix(archive): subscribe to all event streams - One durable consumer per event-bearing stream (CENTRAL_WX, CENTRAL_FIRE, CENTRAL_QUAKE) for independent ack tracking - max_deliver=5 prevents poison-message infinite loops - Orphaned 'archive' consumer on CENTRAL_WX cleaned up on startup - Consumer naming: archive-{stream_name_lower} Co-Authored-By: Claude Opus 4.5 --- src/central/archive.py | 133 ++++++++++++++++++++----- tests/test_archive_multi_stream.py | 150 +++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+), 25 deletions(-) create mode 100644 tests/test_archive_multi_stream.py diff --git a/src/central/archive.py b/src/central/archive.py index 203d1c7..b64a187 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -1,4 +1,8 @@ -"""Central archive consumer - JetStream to TimescaleDB.""" +"""Central archive consumer - JetStream to TimescaleDB. + +Consumes events from multiple NATS JetStream streams and archives them +to TimescaleDB. One durable consumer per stream for independent ack tracking. +""" import asyncio import json @@ -12,17 +16,27 @@ import asyncpg import nats from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy +from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings -CONSUMER_NAME = "archive" -STREAM_NAME = "CENTRAL_WX" -SUBJECT_FILTER = "central.wx.>" +# Event-bearing streams to consume (skip CENTRAL_META - status messages only) +STREAMS = [ + ("CENTRAL_WX", "central.wx.>"), + ("CENTRAL_FIRE", "central.fire.>"), + ("CENTRAL_QUAKE", "central.quake.>"), +] + BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 ACK_WAIT = 30 +def consumer_name_for(stream: str) -> str: + """Generate consumer name for a stream.""" + return f"archive-{stream.lower()}" + + class JsonFormatter(logging.Formatter): """JSON log formatter for structured logging.""" @@ -125,24 +139,49 @@ class ArchiveConsumer: self._js = None logger.info("Disconnected") - async def _ensure_consumer(self) -> None: - """Ensure the durable consumer exists.""" + async def _cleanup_orphaned_consumer(self) -> None: + """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists. + + The old single-stream code used a consumer named 'archive' on CENTRAL_WX. + Now we use 'archive-central_wx' instead. Clean up the old one. + """ if not self._js: return try: - await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME) - logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME}) - except nats.js.errors.NotFoundError: + await self._js.consumer_info("CENTRAL_WX", "archive") + await self._js.delete_consumer("CENTRAL_WX", "archive") + logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX") + except NotFoundError: + pass # Already gone or never existed + + async def _ensure_consumer( + self, stream_name: str, subject_filter: str, consumer_name: str + ) -> None: + """Ensure the durable consumer exists for a stream.""" + if not self._js: + return + + try: + await self._js.consumer_info(stream_name, consumer_name) + logger.info( + "Consumer exists", + extra={"stream": stream_name, "consumer": consumer_name} + ) + except NotFoundError: consumer_config = ConsumerConfig( - durable_name=CONSUMER_NAME, + durable_name=consumer_name, deliver_policy=DeliverPolicy.ALL, ack_policy=AckPolicy.EXPLICIT, ack_wait=ACK_WAIT, - filter_subject=SUBJECT_FILTER, + max_deliver=5, + filter_subject=subject_filter, + ) + await self._js.add_consumer(stream_name, consumer_config) + logger.info( + "Consumer created", + extra={"stream": stream_name, "consumer": consumer_name} ) - await self._js.add_consumer(STREAM_NAME, consumer_config) - logger.info("Consumer created", extra={"consumer": CONSUMER_NAME}) async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None: """Process a single message and insert into database.""" @@ -241,22 +280,24 @@ class ArchiveConsumer: ) # Don't ack - let it be redelivered - async def _consume_loop(self) -> None: - """Main consume loop.""" + async def _consume_stream( + self, stream_name: str, subject_filter: str, consumer_name: str + ) -> None: + """Consume loop for a single stream.""" if not self._js or not self._pool: return - await self._ensure_consumer() + await self._ensure_consumer(stream_name, subject_filter, consumer_name) sub = await self._js.pull_subscribe( - SUBJECT_FILTER, - durable=CONSUMER_NAME, - stream=STREAM_NAME, + subject_filter, + durable=consumer_name, + stream=stream_name, ) logger.info( "Subscribed to stream", - extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER} + extra={"stream": stream_name, "filter": subject_filter} ) while not self._shutdown_event.is_set(): @@ -277,19 +318,62 @@ class ArchiveConsumer: except asyncio.CancelledError: break except Exception as e: - logger.exception("Error in consume loop", extra={"error": str(e)}) + logger.exception( + "Error in consume loop", + extra={"stream": stream_name, "error": str(e)} + ) await asyncio.sleep(1) - logger.info("Consume loop stopped") + logger.info("Consume loop stopped", extra={"stream": stream_name}) async def start(self) -> None: """Start the consumer.""" await self.connect() + await self._cleanup_orphaned_consumer() logger.info("Archive consumer ready") async def run(self) -> None: - """Run the consume loop until shutdown.""" - await self._consume_loop() + """Run consume loops for all streams until shutdown.""" + tasks = [] + for stream_name, subject_filter in STREAMS: + consumer_name = consumer_name_for(stream_name) + task = asyncio.create_task( + self._consume_stream(stream_name, subject_filter, consumer_name), + name=f"consume-{stream_name}", + ) + tasks.append(task) + + try: + # Wait for all tasks; if one fails, cancel the others + done, pending = await asyncio.wait( + tasks, + return_when=asyncio.FIRST_EXCEPTION, + ) + + # Check for exceptions in completed tasks + for task in done: + if task.exception(): + logger.error( + "Stream consumer failed", + extra={"task": task.get_name(), "error": str(task.exception())} + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + except asyncio.CancelledError: + # Shutdown requested, cancel all tasks + for task in tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass async def stop(self) -> None: """Stop the consumer gracefully.""" @@ -308,7 +392,6 @@ async def async_main() -> None: "Archive starting", extra={ "nats_url": settings.nats_url, - }, ) diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py new file mode 100644 index 0000000..e3d09a3 --- /dev/null +++ b/tests/test_archive_multi_stream.py @@ -0,0 +1,150 @@ +"""Tests for multi-stream archive consumer.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from central.archive import ( + STREAMS, + consumer_name_for, + ArchiveConsumer, +) + + +class TestConsumerNaming: + """Test consumer naming convention.""" + + def test_consumer_name_for_central_wx(self): + """Consumer name for CENTRAL_WX is archive-central_wx.""" + assert consumer_name_for("CENTRAL_WX") == "archive-central_wx" + + def test_consumer_name_for_central_fire(self): + """Consumer name for CENTRAL_FIRE is archive-central_fire.""" + assert consumer_name_for("CENTRAL_FIRE") == "archive-central_fire" + + def test_consumer_name_for_central_quake(self): + """Consumer name for CENTRAL_QUAKE is archive-central_quake.""" + assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake" + + +class TestStreamsConfiguration: + """Test streams configuration.""" + + def test_streams_list_has_three_entries(self): + """STREAMS list has three event-bearing streams.""" + assert len(STREAMS) == 3 + + def test_streams_contains_central_wx(self): + """STREAMS contains CENTRAL_WX with correct filter.""" + assert ("CENTRAL_WX", "central.wx.>") in STREAMS + + def test_streams_contains_central_fire(self): + """STREAMS contains CENTRAL_FIRE with correct filter.""" + assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS + + def test_streams_contains_central_quake(self): + """STREAMS contains CENTRAL_QUAKE with correct filter.""" + assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS + + def test_streams_excludes_central_meta(self): + """STREAMS does not contain CENTRAL_META (status messages only).""" + stream_names = [s[0] for s in STREAMS] + assert "CENTRAL_META" not in stream_names + + +class TestOrphanedConsumerCleanup: + """Test cleanup of orphaned 'archive' consumer.""" + + @pytest.mark.asyncio + async def test_cleanup_removes_orphaned_consumer_when_exists(self): + """Cleanup removes 'archive' consumer from CENTRAL_WX when it exists.""" + consumer = ArchiveConsumer( + nats_url="nats://localhost:4222", + postgres_dsn="postgresql://test:test@localhost/test", + ) + + mock_js = AsyncMock() + mock_js.consumer_info = AsyncMock(return_value=MagicMock()) + mock_js.delete_consumer = AsyncMock() + consumer._js = mock_js + + await consumer._cleanup_orphaned_consumer() + + mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive") + mock_js.delete_consumer.assert_called_once_with("CENTRAL_WX", "archive") + + @pytest.mark.asyncio + async def test_cleanup_handles_not_found_gracefully(self): + """Cleanup handles NotFoundError when 'archive' consumer doesn't exist.""" + from nats.js.errors import NotFoundError + + consumer = ArchiveConsumer( + nats_url="nats://localhost:4222", + postgres_dsn="postgresql://test:test@localhost/test", + ) + + mock_js = AsyncMock() + mock_js.consumer_info = AsyncMock(side_effect=NotFoundError()) + mock_js.delete_consumer = AsyncMock() + consumer._js = mock_js + + # Should not raise + await consumer._cleanup_orphaned_consumer() + + mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive") + mock_js.delete_consumer.assert_not_called() + + +class TestEnsureConsumer: + """Test consumer creation for each stream.""" + + @pytest.mark.asyncio + async def test_ensure_consumer_creates_when_not_exists(self): + """_ensure_consumer creates consumer when it doesn't exist.""" + from nats.js.errors import NotFoundError + + consumer = ArchiveConsumer( + nats_url="nats://localhost:4222", + postgres_dsn="postgresql://test:test@localhost/test", + ) + + mock_js = AsyncMock() + mock_js.consumer_info = AsyncMock(side_effect=NotFoundError()) + mock_js.add_consumer = AsyncMock() + consumer._js = mock_js + + await consumer._ensure_consumer( + "CENTRAL_FIRE", "central.fire.>", "archive-central_fire" + ) + + mock_js.consumer_info.assert_called_once_with( + "CENTRAL_FIRE", "archive-central_fire" + ) + mock_js.add_consumer.assert_called_once() + # Verify the consumer config + call_args = mock_js.add_consumer.call_args + assert call_args[0][0] == "CENTRAL_FIRE" + config = call_args[0][1] + assert config.durable_name == "archive-central_fire" + assert config.filter_subject == "central.fire.>" + + @pytest.mark.asyncio + async def test_ensure_consumer_skips_when_exists(self): + """_ensure_consumer does nothing when consumer already exists.""" + consumer = ArchiveConsumer( + nats_url="nats://localhost:4222", + postgres_dsn="postgresql://test:test@localhost/test", + ) + + mock_js = AsyncMock() + mock_js.consumer_info = AsyncMock(return_value=MagicMock()) + mock_js.add_consumer = AsyncMock() + consumer._js = mock_js + + await consumer._ensure_consumer( + "CENTRAL_QUAKE", "central.quake.>", "archive-central_quake" + ) + + mock_js.consumer_info.assert_called_once_with( + "CENTRAL_QUAKE", "archive-central_quake" + ) + mock_js.add_consumer.assert_not_called()