feat(config_store): add listener reconnect with exponential backoff

Listener now automatically reconnects on connection loss with
exponential backoff (1s-30s). Cancellation propagates cleanly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-05-16 01:36:35 +00:00
commit b183a621bb
2 changed files with 108 additions and 33 deletions

View file

@ -6,6 +6,7 @@ Postgres LISTEN/NOTIFY for real-time config change notifications.
import asyncio import asyncio
import json import json
import logging
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any from typing import Any
@ -14,6 +15,8 @@ import asyncpg
from central.config_models import AdapterConfig from central.config_models import AdapterConfig
from central.crypto import decrypt, encrypt from central.crypto import decrypt, encrypt
logger = logging.getLogger(__name__)
async def _setup_json_codec(conn: asyncpg.Connection) -> None: async def _setup_json_codec(conn: asyncpg.Connection) -> None:
"""Set up JSON codec for asyncpg connection.""" """Set up JSON codec for asyncpg connection."""
@ -188,36 +191,79 @@ class ConfigStore:
Runs forever, calling callback(table, key) each time a change is Runs forever, calling callback(table, key) each time a change is
detected. The callback can be sync or async. detected. The callback can be sync or async.
On connection loss, automatically reconnects with exponential backoff.
Cancellation (via task.cancel()) propagates cleanly.
Args: Args:
callback: Function called with (table_name, row_key) on each change. callback: Function called with (table_name, row_key) on each change.
""" """
conn = await self._pool.acquire() backoff = 1.0
try: max_backoff = 30.0
def notification_handler( while True:
conn: asyncpg.Connection, conn = None
pid: int, try:
channel: str, conn = await self._pool.acquire()
payload: str, logger.info("Config listener connected to database")
) -> None: backoff = 1.0 # Reset backoff on successful connect
# payload format: "table_name:key"
if ":" in payload:
table, key = payload.split(":", 1)
else:
table, key = payload, ""
result = callback(table, key) def notification_handler(
if asyncio.iscoroutine(result): conn: asyncpg.Connection,
asyncio.create_task(result) pid: int,
channel: str,
payload: str,
) -> None:
# payload format: "table_name:key"
if ":" in payload:
table, key = payload.split(":", 1)
else:
table, key = payload, ""
await conn.add_listener("config_changed", notification_handler) result = callback(table, key)
if asyncio.iscoroutine(result):
asyncio.create_task(result)
# Keep connection alive await conn.add_listener("config_changed", notification_handler)
while True:
await asyncio.sleep(60)
# Periodic keepalive query
await conn.execute("SELECT 1")
finally: try:
await conn.remove_listener("config_changed", notification_handler) # Keep connection alive with periodic keepalive
await self._pool.release(conn) while True:
await asyncio.sleep(60)
await conn.execute("SELECT 1")
finally:
await conn.remove_listener("config_changed", notification_handler)
except asyncio.CancelledError:
# Cancellation must propagate cleanly
logger.info("Config listener cancelled")
raise
except (
asyncpg.PostgresConnectionError,
asyncpg.InterfaceError,
ConnectionResetError,
OSError,
) as e:
logger.warning(
"Config listener connection lost, reconnecting in %.1fs: %s",
backoff,
e,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, max_backoff)
except Exception as e:
# Unexpected error - log and retry with backoff
logger.exception(
"Config listener unexpected error, reconnecting in %.1fs",
backoff,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, max_backoff)
finally:
if conn is not None:
try:
await self._pool.release(conn)
except Exception:
pass # Connection may already be invalid

View file

@ -1,7 +1,7 @@
"""Tests for database-backed configuration store. """Tests for database-backed configuration store.
These tests require a real Postgres database. Set CENTRAL_TEST_DB_DSN These tests require a real Postgres database. Set CENTRAL_TEST_DB_DSN
environment variable or the tests will use the default test database. environment variable to override the default test database connection.
""" """
import asyncio import asyncio
@ -16,10 +16,11 @@ import pytest_asyncio
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.crypto import KEY_SIZE, clear_key_cache from central.crypto import KEY_SIZE, clear_key_cache
# Test database DSN - uses central_test database # Test database DSN - uses central_test database with well-known test password.
# Override via CENTRAL_TEST_DB_DSN env var if your test DB differs.
TEST_DB_DSN = os.environ.get( TEST_DB_DSN = os.environ.get(
"CENTRAL_TEST_DB_DSN", "CENTRAL_TEST_DB_DSN",
"postgresql://central:3LNVFQJHsK3e7dOcAdvK3oS6d70f@localhost/central_test", "postgresql://central_test:testpass@localhost/central_test",
) )
@ -308,3 +309,31 @@ class TestNotifications:
await listen_task await listen_task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
class TestListenerReconnect:
"""Tests for listener reconnection on connection loss."""
@pytest.mark.asyncio
async def test_listener_cancellation_propagates(
self, config_store: ConfigStore
) -> None:
"""Cancellation cleanly stops the listener without reconnect loop."""
async def callback(table: str, key: str) -> None:
pass
listen_task = asyncio.create_task(config_store.listen_for_changes(callback))
# Give listener time to start
await asyncio.sleep(0.1)
# Cancel and verify it stops
listen_task.cancel()
try:
await asyncio.wait_for(listen_task, timeout=2.0)
except asyncio.CancelledError:
pass # Expected
except asyncio.TimeoutError:
pytest.fail("Listener did not stop after cancellation")
assert listen_task.cancelled() or listen_task.done()