fix(archive): subscribe to all event streams

- One durable consumer per event-bearing stream (CENTRAL_WX,
  CENTRAL_FIRE, CENTRAL_QUAKE) for independent ack tracking
- max_deliver=5 prevents poison-message infinite loops
- Orphaned 'archive' consumer on CENTRAL_WX cleaned up on startup
- Consumer naming: archive-{stream_name_lower}

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-17 19:29:38 +00:00
commit 6b5f6709e4
2 changed files with 258 additions and 25 deletions

View file

@ -1,4 +1,8 @@
"""Central archive consumer - JetStream to TimescaleDB.""" """Central archive consumer - JetStream to TimescaleDB.
Consumes events from multiple NATS JetStream streams and archives them
to TimescaleDB. One durable consumer per stream for independent ack tracking.
"""
import asyncio import asyncio
import json import json
@ -12,17 +16,27 @@ import asyncpg
import nats import nats
from nats.js import JetStreamContext from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
from nats.js.errors import NotFoundError
from central.bootstrap_config import get_settings from central.bootstrap_config import get_settings
CONSUMER_NAME = "archive" # Event-bearing streams to consume (skip CENTRAL_META - status messages only)
STREAM_NAME = "CENTRAL_WX" STREAMS = [
SUBJECT_FILTER = "central.wx.>" ("CENTRAL_WX", "central.wx.>"),
("CENTRAL_FIRE", "central.fire.>"),
("CENTRAL_QUAKE", "central.quake.>"),
]
BATCH_SIZE = 100 BATCH_SIZE = 100
FETCH_TIMEOUT = 5.0 FETCH_TIMEOUT = 5.0
ACK_WAIT = 30 ACK_WAIT = 30
def consumer_name_for(stream: str) -> str:
"""Generate consumer name for a stream."""
return f"archive-{stream.lower()}"
class JsonFormatter(logging.Formatter): class JsonFormatter(logging.Formatter):
"""JSON log formatter for structured logging.""" """JSON log formatter for structured logging."""
@ -125,24 +139,49 @@ class ArchiveConsumer:
self._js = None self._js = None
logger.info("Disconnected") logger.info("Disconnected")
async def _ensure_consumer(self) -> None: async def _cleanup_orphaned_consumer(self) -> None:
"""Ensure the durable consumer exists.""" """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists.
The old single-stream code used a consumer named 'archive' on CENTRAL_WX.
Now we use 'archive-central_wx' instead. Clean up the old one.
"""
if not self._js: if not self._js:
return return
try: try:
await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME) await self._js.consumer_info("CENTRAL_WX", "archive")
logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME}) await self._js.delete_consumer("CENTRAL_WX", "archive")
except nats.js.errors.NotFoundError: logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX")
except NotFoundError:
pass # Already gone or never existed
async def _ensure_consumer(
self, stream_name: str, subject_filter: str, consumer_name: str
) -> None:
"""Ensure the durable consumer exists for a stream."""
if not self._js:
return
try:
await self._js.consumer_info(stream_name, consumer_name)
logger.info(
"Consumer exists",
extra={"stream": stream_name, "consumer": consumer_name}
)
except NotFoundError:
consumer_config = ConsumerConfig( consumer_config = ConsumerConfig(
durable_name=CONSUMER_NAME, durable_name=consumer_name,
deliver_policy=DeliverPolicy.ALL, deliver_policy=DeliverPolicy.ALL,
ack_policy=AckPolicy.EXPLICIT, ack_policy=AckPolicy.EXPLICIT,
ack_wait=ACK_WAIT, ack_wait=ACK_WAIT,
filter_subject=SUBJECT_FILTER, max_deliver=5,
filter_subject=subject_filter,
)
await self._js.add_consumer(stream_name, consumer_config)
logger.info(
"Consumer created",
extra={"stream": stream_name, "consumer": consumer_name}
) )
await self._js.add_consumer(STREAM_NAME, consumer_config)
logger.info("Consumer created", extra={"consumer": CONSUMER_NAME})
async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None: async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None:
"""Process a single message and insert into database.""" """Process a single message and insert into database."""
@ -241,22 +280,24 @@ class ArchiveConsumer:
) )
# Don't ack - let it be redelivered # Don't ack - let it be redelivered
async def _consume_loop(self) -> None: async def _consume_stream(
"""Main consume loop.""" self, stream_name: str, subject_filter: str, consumer_name: str
) -> None:
"""Consume loop for a single stream."""
if not self._js or not self._pool: if not self._js or not self._pool:
return return
await self._ensure_consumer() await self._ensure_consumer(stream_name, subject_filter, consumer_name)
sub = await self._js.pull_subscribe( sub = await self._js.pull_subscribe(
SUBJECT_FILTER, subject_filter,
durable=CONSUMER_NAME, durable=consumer_name,
stream=STREAM_NAME, stream=stream_name,
) )
logger.info( logger.info(
"Subscribed to stream", "Subscribed to stream",
extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER} extra={"stream": stream_name, "filter": subject_filter}
) )
while not self._shutdown_event.is_set(): while not self._shutdown_event.is_set():
@ -277,19 +318,62 @@ class ArchiveConsumer:
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
logger.exception("Error in consume loop", extra={"error": str(e)}) logger.exception(
"Error in consume loop",
extra={"stream": stream_name, "error": str(e)}
)
await asyncio.sleep(1) await asyncio.sleep(1)
logger.info("Consume loop stopped") logger.info("Consume loop stopped", extra={"stream": stream_name})
async def start(self) -> None: async def start(self) -> None:
"""Start the consumer.""" """Start the consumer."""
await self.connect() await self.connect()
await self._cleanup_orphaned_consumer()
logger.info("Archive consumer ready") logger.info("Archive consumer ready")
async def run(self) -> None: async def run(self) -> None:
"""Run the consume loop until shutdown.""" """Run consume loops for all streams until shutdown."""
await self._consume_loop() tasks = []
for stream_name, subject_filter in STREAMS:
consumer_name = consumer_name_for(stream_name)
task = asyncio.create_task(
self._consume_stream(stream_name, subject_filter, consumer_name),
name=f"consume-{stream_name}",
)
tasks.append(task)
try:
# Wait for all tasks; if one fails, cancel the others
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION,
)
# Check for exceptions in completed tasks
for task in done:
if task.exception():
logger.error(
"Stream consumer failed",
extra={"task": task.get_name(), "error": str(task.exception())}
)
# Cancel any remaining tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
# Shutdown requested, cancel all tasks
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the consumer gracefully.""" """Stop the consumer gracefully."""
@ -308,7 +392,6 @@ async def async_main() -> None:
"Archive starting", "Archive starting",
extra={ extra={
"nats_url": settings.nats_url, "nats_url": settings.nats_url,
}, },
) )

View file

@ -0,0 +1,150 @@
"""Tests for multi-stream archive consumer."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from central.archive import (
STREAMS,
consumer_name_for,
ArchiveConsumer,
)
class TestConsumerNaming:
"""Test consumer naming convention."""
def test_consumer_name_for_central_wx(self):
"""Consumer name for CENTRAL_WX is archive-central_wx."""
assert consumer_name_for("CENTRAL_WX") == "archive-central_wx"
def test_consumer_name_for_central_fire(self):
"""Consumer name for CENTRAL_FIRE is archive-central_fire."""
assert consumer_name_for("CENTRAL_FIRE") == "archive-central_fire"
def test_consumer_name_for_central_quake(self):
"""Consumer name for CENTRAL_QUAKE is archive-central_quake."""
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
class TestStreamsConfiguration:
"""Test streams configuration."""
def test_streams_list_has_three_entries(self):
"""STREAMS list has three event-bearing streams."""
assert len(STREAMS) == 3
def test_streams_contains_central_wx(self):
"""STREAMS contains CENTRAL_WX with correct filter."""
assert ("CENTRAL_WX", "central.wx.>") in STREAMS
def test_streams_contains_central_fire(self):
"""STREAMS contains CENTRAL_FIRE with correct filter."""
assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
def test_streams_contains_central_quake(self):
"""STREAMS contains CENTRAL_QUAKE with correct filter."""
assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
def test_streams_excludes_central_meta(self):
"""STREAMS does not contain CENTRAL_META (status messages only)."""
stream_names = [s[0] for s in STREAMS]
assert "CENTRAL_META" not in stream_names
class TestOrphanedConsumerCleanup:
"""Test cleanup of orphaned 'archive' consumer."""
@pytest.mark.asyncio
async def test_cleanup_removes_orphaned_consumer_when_exists(self):
"""Cleanup removes 'archive' consumer from CENTRAL_WX when it exists."""
consumer = ArchiveConsumer(
nats_url="nats://localhost:4222",
postgres_dsn="postgresql://test:test@localhost/test",
)
mock_js = AsyncMock()
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
mock_js.delete_consumer = AsyncMock()
consumer._js = mock_js
await consumer._cleanup_orphaned_consumer()
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
mock_js.delete_consumer.assert_called_once_with("CENTRAL_WX", "archive")
@pytest.mark.asyncio
async def test_cleanup_handles_not_found_gracefully(self):
"""Cleanup handles NotFoundError when 'archive' consumer doesn't exist."""
from nats.js.errors import NotFoundError
consumer = ArchiveConsumer(
nats_url="nats://localhost:4222",
postgres_dsn="postgresql://test:test@localhost/test",
)
mock_js = AsyncMock()
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
mock_js.delete_consumer = AsyncMock()
consumer._js = mock_js
# Should not raise
await consumer._cleanup_orphaned_consumer()
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
mock_js.delete_consumer.assert_not_called()
class TestEnsureConsumer:
"""Test consumer creation for each stream."""
@pytest.mark.asyncio
async def test_ensure_consumer_creates_when_not_exists(self):
"""_ensure_consumer creates consumer when it doesn't exist."""
from nats.js.errors import NotFoundError
consumer = ArchiveConsumer(
nats_url="nats://localhost:4222",
postgres_dsn="postgresql://test:test@localhost/test",
)
mock_js = AsyncMock()
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
mock_js.add_consumer = AsyncMock()
consumer._js = mock_js
await consumer._ensure_consumer(
"CENTRAL_FIRE", "central.fire.>", "archive-central_fire"
)
mock_js.consumer_info.assert_called_once_with(
"CENTRAL_FIRE", "archive-central_fire"
)
mock_js.add_consumer.assert_called_once()
# Verify the consumer config
call_args = mock_js.add_consumer.call_args
assert call_args[0][0] == "CENTRAL_FIRE"
config = call_args[0][1]
assert config.durable_name == "archive-central_fire"
assert config.filter_subject == "central.fire.>"
@pytest.mark.asyncio
async def test_ensure_consumer_skips_when_exists(self):
"""_ensure_consumer does nothing when consumer already exists."""
consumer = ArchiveConsumer(
nats_url="nats://localhost:4222",
postgres_dsn="postgresql://test:test@localhost/test",
)
mock_js = AsyncMock()
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
mock_js.add_consumer = AsyncMock()
consumer._js = mock_js
await consumer._ensure_consumer(
"CENTRAL_QUAKE", "central.quake.>", "archive-central_quake"
)
mock_js.consumer_info.assert_called_once_with(
"CENTRAL_QUAKE", "archive-central_quake"
)
mock_js.add_consumer.assert_not_called()