From f9426caa27a035f2069cae4a77a0f1a0b5aa345f Mon Sep 17 00:00:00 2001
From: Matt Johnson
Date: Sat, 16 May 2026 18:50:12 +0000
Subject: [PATCH] feat: add stream management infrastructure

- config_store: add stream CRUD methods
- stream_manager: ensure_stream, apply_retention, recompute_max_bytes
- Auto-clamp max_bytes to [1GB floor, 30% ceiling]
- Parse server max_file_store from nats-server.conf

Co-Authored-By: Claude Opus 4.5
---
 src/central/config_store.py   |  65 ++++++++-
 src/central/stream_manager.py | 270 ++++++++++++++++++++++++++++++++++
 2 files changed, 334 insertions(+), 1 deletion(-)
 create mode 100644 src/central/stream_manager.py

diff --git a/src/central/config_store.py b/src/central/config_store.py
index 0f03826..20415b9 100644
--- a/src/central/config_store.py
+++ b/src/central/config_store.py
@@ -12,7 +12,7 @@ from typing import Any
 
 import asyncpg
 
-from central.config_models import AdapterConfig
+from central.config_models import AdapterConfig, StreamConfig
 from central.crypto import decrypt, encrypt
 
 logger = logging.getLogger(__name__)
@@ -129,6 +129,69 @@ class ConfigStore:
                 name,
             )
 
+    # -------------------------------------------------------------------------
+    # Stream configuration
+    # -------------------------------------------------------------------------
+
+    async def get_stream(self, name: str) -> StreamConfig | None:
+        """Get configuration for a specific stream."""
+        async with self._pool.acquire() as conn:
+            row = await conn.fetchrow(
+                """
+                SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
+                FROM config.streams
+                WHERE name = $1
+                """,
+                name,
+            )
+            if row is None:
+                return None
+            return StreamConfig(**dict(row))
+
+    async def list_streams(self) -> list[StreamConfig]:
+        """List all configured streams."""
+        async with self._pool.acquire() as conn:
+            rows = await conn.fetch(
+                """
+                SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
+                FROM config.streams
+                ORDER BY name
+                """
+            )
+            return [StreamConfig(**dict(row)) for row in rows]
+
+    async def upsert_stream(self, name: str, max_age_s: int) -> None:
+        """Insert or update a stream's max_age_s (operator-facing)."""
+        async with self._pool.acquire() as conn:
+            await conn.execute(
+                """
+                INSERT INTO config.streams (name, max_age_s, updated_at)
+                VALUES ($1, $2, now())
+                ON CONFLICT (name) DO UPDATE SET
+                    max_age_s = EXCLUDED.max_age_s,
+                    updated_at = now()
+                """,
+                name,
+                max_age_s,
+            )
+
+    async def update_stream_max_bytes(self, name: str, max_bytes: int) -> None:
+        """Update a stream's max_bytes (supervisor-internal).
+
+        This update only touches max_bytes, which does NOT trigger
+        the column-filtered NOTIFY (only max_age_s changes fire NOTIFY).
+        """
+        async with self._pool.acquire() as conn:
+            await conn.execute(
+                """
+                UPDATE config.streams
+                SET max_bytes = $2, updated_at = now()
+                WHERE name = $1
+                """,
+                name,
+                max_bytes,
+            )
+
     # -------------------------------------------------------------------------
     # API key management
     # -------------------------------------------------------------------------
diff --git a/src/central/stream_manager.py b/src/central/stream_manager.py
new file mode 100644
index 0000000..4b9b5ba
--- /dev/null
+++ b/src/central/stream_manager.py
@@ -0,0 +1,270 @@
+"""JetStream stream manager for retention configuration."""
+
+import logging
+import re
+from dataclasses import replace
+from pathlib import Path
+from typing import Any
+
+from nats.js import JetStreamContext
+from nats.js.api import StreamConfig, DiscardPolicy, RetentionPolicy
+from nats.js.errors import NotFoundError
+
+from central.config_models import StreamConfig as StreamConfigModel
+
+logger = logging.getLogger(__name__)
+
+# Constants
+ONE_GB = 1024 * 1024 * 1024  # 1 GiB in bytes
+NATS_CONFIG_PATH = Path("/etc/nats/nats-server.conf")
+DEFAULT_MAX_FILE_STORE = 20 * ONE_GB  # fallback when the config is unusable
+FALLBACK_WINDOW_S = 86400  # one day, used when a stream has no max_age
+
+# NATS config accepts "key: value", "key = value" and "key value".
+_MAX_FILE_STORE_RE = re.compile(r"\bmax_file_store\b\s*[:=]?\s*(\d+)([A-Za-z]*)")
+
+# NATS size suffixes, lower-cased: a bare letter is decimal (1K = 1000),
+# while the B / I / IB forms are binary (1KB = 1KI = 1KIB = 1024).
+_SIZE_UNITS = "kmgtpe"
+_SIZE_MULTIPLIERS: dict[str, int] = {
+    "": 1,
+    **{unit: 1000**power for power, unit in enumerate(_SIZE_UNITS, start=1)},
+    **{
+        f"{unit}{tail}": 1024**power
+        for power, unit in enumerate(_SIZE_UNITS, start=1)
+        for tail in ("b", "i", "ib")
+    },
+}
+
+
+def _parse_max_file_store(config_text: str) -> int | None:
+    """Extract max_file_store in bytes from NATS server config text.
+
+    Returns None when the setting is absent, appears only inside comments,
+    or carries a size suffix that is not recognized.
+    """
+    for line in config_text.splitlines():
+        code = line.split("#", 1)[0]  # drop trailing "#" comments
+        if code.lstrip().startswith("//"):
+            continue  # whole-line "//" comment
+        match = _MAX_FILE_STORE_RE.search(code)
+        if match is None:
+            continue
+        multiplier = _SIZE_MULTIPLIERS.get(match.group(2).lower())
+        if multiplier is None:
+            logger.warning(
+                "Unrecognized max_file_store size suffix",
+                extra={"suffix": match.group(2)},
+            )
+            return None
+        return int(match.group(1)) * multiplier
+    return None
+
+
+class StreamManager:
+    """Manages JetStream stream configuration and retention."""
+
+    def __init__(self, js: JetStreamContext) -> None:
+        self._js = js
+        self._server_max_file_store: int | None = None
+
+    async def server_max_file_store_bytes(self) -> int:
+        """Get the server's max_file_store setting in bytes.
+
+        Parses the NATS server config file and caches the result, so the
+        file is read at most once per manager. Returns a default of 20GB if
+        the file cannot be read or holds no usable max_file_store value.
+        """
+        if self._server_max_file_store is not None:
+            return self._server_max_file_store
+
+        value = DEFAULT_MAX_FILE_STORE
+        try:
+            config_text = NATS_CONFIG_PATH.read_text(encoding="utf-8")
+        except (OSError, ValueError) as e:
+            # ValueError covers UnicodeDecodeError; the lookup is best-effort.
+            logger.warning(
+                "Failed to read NATS config, using default",
+                extra={"error": str(e), "default": value},
+            )
+        else:
+            parsed = _parse_max_file_store(config_text)
+            if parsed is None:
+                logger.warning(
+                    "max_file_store not found in config, using default",
+                    extra={"default": value},
+                )
+            else:
+                value = parsed
+                logger.info(
+                    "Parsed server max_file_store",
+                    extra={"max_file_store_bytes": value},
+                )
+
+        self._server_max_file_store = value
+        return value
+
+    def _compute_ceiling(self, server_max: int) -> int:
+        """Compute per-stream ceiling as 30% of server max_file_store."""
+        # Integer arithmetic: 0.30 has no exact float representation.
+        return server_max * 3 // 10
+
+    async def _clamp_max_bytes(self, requested: int) -> int:
+        """Clamp a requested max_bytes to [1GB, ceiling].
+
+        The 1GB floor wins when the ceiling is below it (very small servers).
+        """
+        ceiling = self._compute_ceiling(await self.server_max_file_store_bytes())
+        return max(ONE_GB, min(requested, ceiling))
+
+    async def _retention_fields(self, config: StreamConfigModel) -> dict[str, Any]:
+        """Build the stream settings this manager always controls."""
+        return {
+            "retention": RetentionPolicy.LIMITS,
+            "discard": DiscardPolicy.OLD,
+            "max_age": config.max_age_s,
+            "max_bytes": await self._clamp_max_bytes(config.max_bytes),
+            "max_msgs": -1,  # Unlimited messages
+        }
+
+    async def ensure_stream(
+        self,
+        name: str,
+        subjects: list[str],
+        config: StreamConfigModel,
+    ) -> None:
+        """Ensure a stream exists with the given configuration.
+
+        Creates the stream if it doesn't exist, or updates it if it does.
+        Always enforces: discard=old, max_msgs=-1 (unlimited).
+
+        On update, settings this manager does not control (storage type,
+        replica count, ...) are carried over from the existing stream rather
+        than reset to defaults. Errors other than a missing stream propagate.
+        """
+        fields = await self._retention_fields(config)
+        log_extra = {
+            "stream": name,
+            "max_age_s": config.max_age_s,
+            "max_bytes": fields["max_bytes"],
+        }
+
+        try:
+            existing = await self._js.stream_info(name)
+        except NotFoundError:
+            # Only a missing stream triggers creation.
+            await self._js.add_stream(
+                config=StreamConfig(name=name, subjects=subjects, **fields)
+            )
+            logger.info("Created stream", extra={**log_extra, "subjects": subjects})
+            return
+
+        await self._js.update_stream(
+            config=replace(existing.config, subjects=subjects, **fields)
+        )
+        logger.info("Updated stream", extra=log_extra)
+
+    async def apply_retention(self, name: str, config: StreamConfigModel) -> None:
+        """Apply retention settings to an existing stream.
+
+        Updates max_age and max_bytes. Always enforces discard=old, max_msgs=-1.
+        Every other setting is carried over from the stream's current config.
+        Failures are logged and re-raised.
+        """
+        fields = await self._retention_fields(config)
+
+        try:
+            info = await self._js.stream_info(name)
+            await self._js.update_stream(config=replace(info.config, **fields))
+        except Exception as e:
+            logger.error(
+                "Failed to apply retention",
+                extra={"stream": name, "error": str(e)},
+            )
+            raise
+
+        logger.info(
+            "Applied retention",
+            extra={
+                "stream": name,
+                "max_age_s": config.max_age_s,
+                "max_bytes": fields["max_bytes"],
+            },
+        )
+
+    async def recompute_max_bytes(self, name: str, max_age_s: int) -> int:
+        """Recompute max_bytes based on observed throughput.
+
+        Formula: rate × max_age × 1.5 safety margin, clamped to [1GB, ceiling].
+        The rate is approximated as the stream's current bytes divided by its
+        configured max_age (one day if max_age is unset or unlimited), so it
+        underestimates while the stream holds less than max_age of data.
+
+        Returns the computed max_bytes value, or the 1GB floor if the stream
+        is empty or its info cannot be fetched.
+        """
+        ceiling = self._compute_ceiling(await self.server_max_file_store_bytes())
+
+        try:
+            info = await self._js.stream_info(name)
+            current_bytes = info.state.bytes
+
+            if info.state.messages == 0 or info.state.last_seq == 0:
+                # No messages yet, use floor
+                return ONE_GB
+
+            # Observation window: the configured max_age, or one day when
+            # max_age is None/0 (unlimited) and cannot serve as a divisor.
+            configured_max_age = info.config.max_age
+            if configured_max_age and configured_max_age > 0:
+                window_s = configured_max_age
+            else:
+                window_s = FALLBACK_WINDOW_S
+            rate_per_second = current_bytes / window_s
+
+            # Project bytes needed for new max_age with 1.5x safety margin
+            projected = int(rate_per_second * max_age_s * 1.5)
+
+            # Clamp to [1GB, ceiling]
+            result = max(ONE_GB, min(projected, ceiling))
+
+            logger.info(
+                "Recomputed max_bytes",
+                extra={
+                    "stream": name,
+                    "current_bytes": current_bytes,
+                    "rate_per_second": rate_per_second,
+                    "max_age_s": max_age_s,
+                    "projected": projected,
+                    "result": result,
+                    "ceiling": ceiling,
+                },
+            )
+
+            return result
+
+        except Exception as e:
+            logger.error(
+                "Failed to recompute max_bytes, using floor",
+                extra={"stream": name, "error": str(e)},
+            )
+            return ONE_GB
+
+    async def get_stream_stats(self, name: str) -> dict[str, Any]:
+        """Get current stream statistics for monitoring."""
+        try:
+            info = await self._js.stream_info(name)
+            return {
+                "stream": name,
+                "bytes": info.state.bytes,
+                "messages": info.state.messages,
+                "max_bytes": info.config.max_bytes,
+                "max_age_s": info.config.max_age,
+                "consumers": info.state.consumer_count,
+            }
+        except Exception as e:
+            logger.error(
+                "Failed to get stream stats",
+                extra={"stream": name, "error": str(e)},
+            )
+            return {"stream": name, "error": str(e)}