diff --git a/src/central/config_store.py b/src/central/config_store.py index 2fda7bc..0f03826 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -6,6 +6,7 @@ Postgres LISTEN/NOTIFY for real-time config change notifications. import asyncio import json +import logging from collections.abc import Awaitable, Callable from typing import Any @@ -14,6 +15,8 @@ import asyncpg from central.config_models import AdapterConfig from central.crypto import decrypt, encrypt +logger = logging.getLogger(__name__) + async def _setup_json_codec(conn: asyncpg.Connection) -> None: """Set up JSON codec for asyncpg connection.""" @@ -188,36 +191,79 @@ class ConfigStore: Runs forever, calling callback(table, key) each time a change is detected. The callback can be sync or async. + On connection loss, automatically reconnects with exponential backoff. + Cancellation (via task.cancel()) propagates cleanly. + Args: callback: Function called with (table_name, row_key) on each change. """ - conn = await self._pool.acquire() - try: + backoff = 1.0 + max_backoff = 30.0 - def notification_handler( - conn: asyncpg.Connection, - pid: int, - channel: str, - payload: str, - ) -> None: - # payload format: "table_name:key" - if ":" in payload: - table, key = payload.split(":", 1) - else: - table, key = payload, "" + while True: + conn = None + try: + conn = await self._pool.acquire() + logger.info("Config listener connected to database") + backoff = 1.0 # Reset backoff on successful connect - result = callback(table, key) - if asyncio.iscoroutine(result): - asyncio.create_task(result) + def notification_handler( + conn: asyncpg.Connection, + pid: int, + channel: str, + payload: str, + ) -> None: + # payload format: "table_name:key" + if ":" in payload: + table, key = payload.split(":", 1) + else: + table, key = payload, "" - await conn.add_listener("config_changed", notification_handler) + result = callback(table, key) + if asyncio.iscoroutine(result): + asyncio.create_task(result) - # Keep connection alive - while True: - await asyncio.sleep(60) - # Periodic keepalive query - await conn.execute("SELECT 1") + await conn.add_listener("config_changed", notification_handler) - finally: - await conn.remove_listener("config_changed", notification_handler) - await self._pool.release(conn) + try: + # Keep connection alive with periodic keepalive + while True: + await asyncio.sleep(60) + await conn.execute("SELECT 1") + finally: + await conn.remove_listener("config_changed", notification_handler) + + except asyncio.CancelledError: + # Cancellation must propagate cleanly + logger.info("Config listener cancelled") + raise + + except ( + asyncpg.PostgresConnectionError, + asyncpg.InterfaceError, + ConnectionResetError, + OSError, + ) as e: + logger.warning( + "Config listener connection lost, reconnecting in %.1fs: %s", + backoff, + e, + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, max_backoff) + + except Exception as e: + # Unexpected error - log and retry with backoff + logger.exception( + "Config listener unexpected error, reconnecting in %.1fs", + backoff, + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, max_backoff) + + finally: + if conn is not None: + try: + await self._pool.release(conn) + except Exception: + pass # Connection may already be invalid diff --git a/tests/test_config_store.py b/tests/test_config_store.py index 30ac9a8..7a627e8 100644 --- a/tests/test_config_store.py +++ b/tests/test_config_store.py @@ -1,7 +1,7 @@ """Tests for database-backed configuration store. These tests require a real Postgres database. Set CENTRAL_TEST_DB_DSN -environment variable or the tests will use the default test database. +environment variable to override the default test database connection. """ import asyncio @@ -16,10 +16,11 @@ import pytest_asyncio from central.config_store import ConfigStore from central.crypto import KEY_SIZE, clear_key_cache -# Test database DSN - uses central_test database +# Test database DSN - uses central_test database with well-known test password. +# Override via CENTRAL_TEST_DB_DSN env var if your test DB differs. TEST_DB_DSN = os.environ.get( "CENTRAL_TEST_DB_DSN", - "postgresql://central:3LNVFQJHsK3e7dOcAdvK3oS6d70f@localhost/central_test", + "postgresql://central_test:testpass@localhost/central_test", ) @@ -308,3 +309,31 @@ class TestNotifications: await listen_task except asyncio.CancelledError: pass + + +class TestListenerReconnect: + """Tests for listener reconnection on connection loss.""" + + @pytest.mark.asyncio + async def test_listener_cancellation_propagates( + self, config_store: ConfigStore + ) -> None: + """Cancellation cleanly stops the listener without reconnect loop.""" + async def callback(table: str, key: str) -> None: + pass + + listen_task = asyncio.create_task(config_store.listen_for_changes(callback)) + + # Give listener time to start + await asyncio.sleep(0.1) + + # Cancel and verify it stops + listen_task.cancel() + try: + await asyncio.wait_for(listen_task, timeout=2.0) + except asyncio.CancelledError: + pass # Expected + except asyncio.TimeoutError: + pytest.fail("Listener did not stop after cancellation") + + assert listen_task.cancelled() or listen_task.done()