"""Tests for database-backed configuration store. These tests require a real Postgres database. Set CENTRAL_TEST_DB_DSN environment variable to override the default test database connection. """ import asyncio import base64 import os from pathlib import Path import asyncpg import pytest import pytest_asyncio from central.bootstrap_config import get_settings from central.config_store import ConfigStore from central.crypto import KEY_SIZE, clear_key_cache # Test database DSN - uses central_test database with well-known test password. # Override via CENTRAL_TEST_DB_DSN env var if your test DB differs. TEST_DB_DSN = os.environ.get( "CENTRAL_TEST_DB_DSN", "postgresql://central_test:testpass@localhost/central_test", ) @pytest.fixture(scope="session") def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path: """Create a master key file for the test session.""" key = os.urandom(KEY_SIZE) key_path = tmp_path_factory.mktemp("keys") / "master.key" key_path.write_text(base64.b64encode(key).decode()) return key_path @pytest.fixture(autouse=True) def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch): """Configure master key path for all tests. CENTRAL_MASTER_KEY_PATH feeds Settings, which get_settings() lru-caches. An earlier test can warm that cache with the default /etc/central/master.key before this fixture runs, so the env change alone is not enough — clear get_settings (and the crypto key cache) AFTER setting the env so crypto rebuilds from the test key regardless of suite order, and again on teardown so the test key never leaks into a later test. """ monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN) monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path)) monkeypatch.setenv("CENTRAL_CSRF_SECRET", "test-csrf-secret-for-testing-only-32chars") clear_key_cache() get_settings.cache_clear() yield clear_key_cache() get_settings.cache_clear() @pytest_asyncio.fixture async def db_conn() -> asyncpg.Connection: """Get a direct database connection for setup/teardown.""" conn = await asyncpg.connect(TEST_DB_DSN) yield conn await conn.close() @pytest_asyncio.fixture async def clean_config_schema(db_conn: asyncpg.Connection) -> None: """Ensure config schema exists and is clean before each test.""" # Create schema if not exists await db_conn.execute("CREATE SCHEMA IF NOT EXISTS config") # Create tables if not exist await db_conn.execute(""" CREATE TABLE IF NOT EXISTS config.adapters ( name TEXT PRIMARY KEY, enabled BOOLEAN NOT NULL DEFAULT true, cadence_s INTEGER NOT NULL, settings JSONB NOT NULL DEFAULT '{}'::jsonb, paused_at TIMESTAMPTZ, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) await db_conn.execute(""" CREATE TABLE IF NOT EXISTS config.api_keys ( alias TEXT PRIMARY KEY, encrypted_value BYTEA NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), rotated_at TIMESTAMPTZ, last_used_at TIMESTAMPTZ ) """) # Create notify function with proper key detection await db_conn.execute(""" CREATE OR REPLACE FUNCTION config.notify_config_change() RETURNS trigger AS $$ DECLARE key_value TEXT; BEGIN IF TG_TABLE_NAME = 'adapters' THEN key_value := COALESCE(NEW.name, OLD.name, ''); ELSIF TG_TABLE_NAME = 'api_keys' THEN key_value := COALESCE(NEW.alias, OLD.alias, ''); ELSE key_value := ''; END IF; PERFORM pg_notify('config_changed', TG_TABLE_NAME || ':' || key_value); RETURN COALESCE(NEW, OLD); END; $$ LANGUAGE plpgsql """) # Create triggers if not exist await db_conn.execute(""" DROP TRIGGER IF EXISTS adapters_notify ON config.adapters; CREATE TRIGGER adapters_notify AFTER INSERT OR UPDATE OR DELETE ON config.adapters FOR EACH ROW EXECUTE FUNCTION config.notify_config_change() """) await db_conn.execute(""" DROP TRIGGER IF EXISTS api_keys_notify ON config.api_keys; CREATE TRIGGER api_keys_notify AFTER INSERT OR UPDATE OR DELETE ON config.api_keys FOR EACH ROW EXECUTE FUNCTION config.notify_config_change() """) # Clean tables await db_conn.execute("DELETE FROM config.adapters") await db_conn.execute("DELETE FROM config.api_keys") @pytest_asyncio.fixture async def config_store(clean_config_schema: None) -> ConfigStore: """Create a ConfigStore connected to the test database.""" store = await ConfigStore.create(TEST_DB_DSN) yield store await store.close() class TestAdapterConfig: """Tests for adapter configuration operations.""" @pytest.mark.asyncio async def test_upsert_and_get(self, config_store: ConfigStore) -> None: """Can insert and retrieve adapter config.""" await config_store.upsert_adapter( name="test_adapter", enabled=True, cadence_s=120, settings={"key": "value"}, ) adapter = await config_store.get_adapter("test_adapter") assert adapter is not None assert adapter.name == "test_adapter" assert adapter.enabled is True assert adapter.cadence_s == 120 assert adapter.settings == {"key": "value"} @pytest.mark.asyncio async def test_get_nonexistent(self, config_store: ConfigStore) -> None: """Getting nonexistent adapter returns None.""" adapter = await config_store.get_adapter("does_not_exist") assert adapter is None @pytest.mark.asyncio async def test_list_adapters(self, config_store: ConfigStore) -> None: """Can list all adapters.""" await config_store.upsert_adapter("adapter_a", True, 60, {}) await config_store.upsert_adapter("adapter_b", False, 300, {"x": 1}) adapters = await config_store.list_adapters() assert len(adapters) == 2 names = [a.name for a in adapters] assert "adapter_a" in names assert "adapter_b" in names @pytest.mark.asyncio async def test_upsert_updates_existing(self, config_store: ConfigStore) -> None: """Upsert updates existing adapter.""" await config_store.upsert_adapter("updater", True, 60, {"v": 1}) await config_store.upsert_adapter("updater", False, 120, {"v": 2}) adapter = await config_store.get_adapter("updater") assert adapter is not None assert adapter.enabled is False assert adapter.cadence_s == 120 assert adapter.settings == {"v": 2} @pytest.mark.asyncio async def test_pause_unpause(self, config_store: ConfigStore) -> None: """Can pause and unpause adapter.""" await config_store.upsert_adapter("pausable", True, 60, {}) await config_store.pause_adapter("pausable") adapter = await config_store.get_adapter("pausable") assert adapter is not None assert adapter.is_paused is True await config_store.unpause_adapter("pausable") adapter = await config_store.get_adapter("pausable") assert adapter is not None assert adapter.is_paused is False class TestApiKeys: """Tests for API key operations.""" @pytest.mark.asyncio async def test_set_and_get_key(self, config_store: ConfigStore) -> None: """Can store and retrieve encrypted API key.""" await config_store.set_api_key("test_key", "super_secret_value") value = await config_store.get_api_key("test_key") assert value == "super_secret_value" @pytest.mark.asyncio async def test_get_nonexistent_key(self, config_store: ConfigStore) -> None: """Getting nonexistent key returns None.""" value = await config_store.get_api_key("does_not_exist") assert value is None @pytest.mark.asyncio async def test_key_rotation(self, config_store: ConfigStore) -> None: """Updating key sets rotated_at.""" await config_store.set_api_key("rotate_me", "value1") await config_store.set_api_key("rotate_me", "value2") value = await config_store.get_api_key("rotate_me") assert value == "value2" @pytest.mark.asyncio async def test_delete_key(self, config_store: ConfigStore) -> None: """Can delete API key.""" await config_store.set_api_key("delete_me", "value") deleted = await config_store.delete_api_key("delete_me") assert deleted is True value = await config_store.get_api_key("delete_me") assert value is None @pytest.mark.asyncio async def test_delete_nonexistent(self, config_store: ConfigStore) -> None: """Deleting nonexistent key returns False.""" deleted = await config_store.delete_api_key("never_existed") assert deleted is False class TestNotifications: """Tests for LISTEN/NOTIFY functionality.""" @pytest.mark.asyncio async def test_notify_on_adapter_change(self, config_store: ConfigStore) -> None: """NOTIFY fires when adapter is changed.""" notifications: list[tuple[str, str]] = [] notification_received = asyncio.Event() async def callback(table: str, key: str) -> None: notifications.append((table, key)) notification_received.set() # Start listener in background listen_task = asyncio.create_task(config_store.listen_for_changes(callback)) try: # Give listener time to subscribe await asyncio.sleep(0.1) # Trigger a change await config_store.upsert_adapter("notify_test", True, 60, {}) # Wait for notification (with timeout) try: await asyncio.wait_for(notification_received.wait(), timeout=5.0) except asyncio.TimeoutError: pytest.fail("Notification not received within timeout") assert len(notifications) >= 1 assert notifications[0][0] == "adapters" assert notifications[0][1] == "notify_test" finally: listen_task.cancel() try: await listen_task except asyncio.CancelledError: pass @pytest.mark.asyncio async def test_notify_on_api_key_change(self, config_store: ConfigStore) -> None: """NOTIFY fires when API key is changed.""" notifications: list[tuple[str, str]] = [] notification_received = asyncio.Event() async def callback(table: str, key: str) -> None: notifications.append((table, key)) notification_received.set() listen_task = asyncio.create_task(config_store.listen_for_changes(callback)) try: await asyncio.sleep(0.1) await config_store.set_api_key("notify_key", "secret") try: await asyncio.wait_for(notification_received.wait(), timeout=5.0) except asyncio.TimeoutError: pytest.fail("Notification not received within timeout") assert len(notifications) >= 1 assert notifications[0][0] == "api_keys" assert notifications[0][1] == "notify_key" finally: listen_task.cancel() try: await listen_task except asyncio.CancelledError: pass class TestListenerReconnect: """Tests for listener reconnection on connection loss.""" @pytest.mark.asyncio async def test_listener_cancellation_propagates( self, config_store: ConfigStore ) -> None: """Cancellation cleanly stops the listener without reconnect loop.""" async def callback(table: str, key: str) -> None: pass listen_task = asyncio.create_task(config_store.listen_for_changes(callback)) # Give listener time to start await asyncio.sleep(0.1) # Cancel and verify it stops listen_task.cancel() try: await asyncio.wait_for(listen_task, timeout=2.0) except asyncio.CancelledError: pass # Expected except asyncio.TimeoutError: pytest.fail("Listener did not stop after cancellation") assert listen_task.cancelled() or listen_task.done() def test_master_key_path_is_isolated(master_key_path: Path) -> None: """Contract: after setup_master_key runs, get_settings() resolves the master key to the per-session test key — never the production /etc/central path — regardless of suite order. Fails on the pre-fix code in a full-suite run where get_settings was warmed with the default path by an earlier test. """ assert get_settings().master_key_path == master_key_path assert get_settings().master_key_path != Path("/etc/central/master.key")