"""Integration tests for Supervisor hot-reload with enable/disable/enable flow.

These tests exercise the actual Supervisor._on_config_change code path, not
just AdapterState math in isolation. They verify the rate-limit guarantee is
maintained across adapter stop/start cycles.

IMPORTANT: These tests are designed to:
- FAIL on unfixed code (the adapter's state -- and with it
  last_completed_poll -- is dropped when the adapter is stopped; Test B is the
  case where that loss turns into a rate-limit violation)
- PASS on fixed code (last_completed_poll is preserved across disable/enable)
"""

import asyncio
import base64
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio

from central.config_models import AdapterConfig
from central.bootstrap_config import get_settings
from central.crypto import KEY_SIZE, clear_key_cache


def adapter_is_running(state) -> bool:
    """Check if adapter is running (compatible with both fixed and unfixed code)."""
    # Fixed code has is_running property; unfixed checks task directly
    if hasattr(state, 'is_running'):
        return state.is_running
    return state.task is not None and not state.task.done()


async def cleanup_adapter(supervisor, name: str) -> None:
    """Clean up adapter (compatible with both fixed and unfixed code)."""
    # Fixed code has _remove_adapter; unfixed uses _stop_adapter which pops
    if hasattr(supervisor, '_remove_adapter'):
        await supervisor._remove_adapter(name)
    else:
        await supervisor._stop_adapter(name)


async def _teardown(supervisor, name: str = "nws") -> None:
    """Signal shutdown and remove *name* if the supervisor still tracks it.

    Intended to run from a ``finally`` clause so that a failed assertion does
    not leave an adapter task running into later tests. The membership check
    skips removal when the test already removed the adapter (or unfixed code
    dropped its state). A second removal might raise, depending on the
    supervisor implementation, and mask the original failure.
    """
    supervisor._shutdown_event.set()
    if name in supervisor._adapter_states:
        await cleanup_adapter(supervisor, name)


# Test database DSN
TEST_DB_DSN = os.environ.get(
    "CENTRAL_TEST_DB_DSN",
    "postgresql://central_test:testpass@localhost/central_test",
)

# Poll cadence (seconds) used by every adapter config in this module.
CADENCE_S = 60


def _nws_config(*, enabled: bool = True) -> AdapterConfig:
    """Build the "nws" AdapterConfig used by every test, stamped with now()."""
    return AdapterConfig(
        name="nws",
        enabled=enabled,
        cadence_s=CADENCE_S,
        settings={"states": ["ID"], "contact_email": "test@test.com"},
        paused_at=None,
        updated_at=datetime.now(timezone.utc),
    )


def _seconds_until_next_poll(
    last_poll: datetime, cadence_s: int, now: datetime
) -> float:
    """Return the wait until ``last_poll + cadence_s``, clamped at 0.

    This re-derives the scheduling rule the tests expect the supervisor loop
    to follow. It does not call into the supervisor itself.
    """
    return max(0, last_poll.timestamp() + cadence_s - now.timestamp())


@pytest.fixture(scope="session")
def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Create a master key file for the test session."""
    key = os.urandom(KEY_SIZE)
    key_path = tmp_path_factory.mktemp("keys") / "master.key"
    key_path.write_text(base64.b64encode(key).decode())
    return key_path


@pytest.fixture(autouse=True)
def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch):
    """Configure master key path for all tests.

    Clear get_settings (and the crypto key cache) AFTER setting the env so
    crypto rebuilds from the test key regardless of suite order, and again on
    teardown so the test key never leaks into a later test. See PR M-b.
    """
    monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN)
    monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
    clear_key_cache()
    get_settings.cache_clear()
    yield
    clear_key_cache()
    get_settings.cache_clear()


class MockConfigSource:
    """Mock ConfigSource for testing Supervisor without DB."""

    def __init__(self) -> None:
        self._adapters: dict[str, AdapterConfig] = {}

    def set_adapter(
        self, config: AdapterConfig | None, name: str | None = None
    ) -> None:
        """Set or remove an adapter config.

        Passing ``config=None`` removes the adapter called *name*. If *name*
        is also None, the call does nothing.
        """
        if config is None:
            if name:
                self._adapters.pop(name, None)
        else:
            self._adapters[config.name] = config

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        return [a for a in self._adapters.values() if a.enabled and not a.is_paused]

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        return self._adapters.get(name)

    async def watch_for_changes(self, callback) -> None:
        # No-op for testing
        return

    async def close(self) -> None:
        pass


class MockNWSAdapter:
    """Mock NWSAdapter that tracks poll calls and allows control."""

    requires_api_key = None  # Mock adapters don't require API keys

    def __init__(self, config, config_store, cursor_db_path) -> None:
        self.config = config
        self._config_store = config_store
        self.cadence_s = config.cadence_s
        self.states = {s.upper() for s in config.settings.get("states", [])}
        self.poll_count = 0
        self.poll_times: list[datetime] = []
        self._shutdown = False

    async def startup(self) -> None:
        pass

    async def shutdown(self) -> None:
        self._shutdown = True

    async def apply_config(self, config: AdapterConfig) -> None:
        """Apply new configuration."""
        self.config = config
        self.cadence_s = config.cadence_s
        self.states = {s.upper() for s in config.settings.get("states", [])}

    async def poll(self):
        """Yield nothing - we just track that poll was called."""
        self.poll_count += 1
        self.poll_times.append(datetime.now(timezone.utc))
        return
        yield  # Make this an async generator

    def is_published(self, event_id: str) -> bool:
        return False

    def mark_published(self, event_id: str) -> None:
        pass

    def bump_last_seen(self, event_id: str) -> None:
        pass

    def sweep_old_ids(self) -> int:
        return 0


@pytest.fixture
def mock_nats():
    """Mock NATS connection.

    nats-py's `nc.jetstream()` is synchronous, so model it with a sync
    MagicMock. (As an AsyncMock attribute, `supervisor._js = nc.jetstream()`
    would assign an unawaited coroutine -- the "coroutine ... was never
    awaited" warning -- rather than the JetStream mock.)
    """
    mock_nc = AsyncMock()
    mock_nc.publish = AsyncMock()
    mock_js = AsyncMock()
    mock_js.publish = AsyncMock()
    mock_nc.jetstream = MagicMock(return_value=mock_js)
    return mock_nc


@pytest.fixture
def mock_config_store():
    """Mock ConfigStore for testing Supervisor."""
    store = MagicMock()
    store.list_streams = AsyncMock(return_value=[])
    store.get_stream = AsyncMock(return_value=None)
    store.set_adapter_last_error = AsyncMock()
    store.get_api_key = AsyncMock(return_value=None)
    return store


def _make_supervisor(config_source, config_store, nats):
    """Build a Supervisor wired to mock NATS and the MockNWSAdapter.

    Supervisor is imported lazily, as the original tests did, so the import
    happens inside the test after the autouse env fixture has run.
    """
    from central.supervisor import Supervisor

    supervisor = Supervisor(
        config_source=config_source,
        config_store=config_store,
        nats_url="nats://localhost:4222",
        cloudevents_config=None,
    )
    # Mock NATS connection
    supervisor._nc = nats
    supervisor._js = nats.jetstream()
    # Inject mock adapter into supervisor's registry
    supervisor._adapters["nws"] = MockNWSAdapter
    return supervisor


class TestEnableDisableEnableIntegration:
    """Integration tests for enable→disable→enable flow through Supervisor.

    These tests verify that _on_config_change → _stop_adapter → _start_adapter
    preserves last_completed_poll correctly.
    """

    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_longer_than_cadence(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test A: Re-enable after gap longer than cadence polls immediately.

        - Start adapter (cadence 60s)
        - Simulate completed poll 5 minutes ago
        - Disable adapter; assert its state and last_completed_poll survive
        - Re-enable adapter; assert the same last_completed_poll is reused
        - Assert the implied wait until the next poll is 0
          (last_poll + cadence is already in the past)

        NOTE: this checks the preserved timestamp and the wait it implies. It
        does not observe the supervisor loop firing or count catch-up polls.
        """
        config_source = MockConfigSource()
        initial_config = _nws_config()
        config_source.set_adapter(initial_config)
        supervisor = _make_supervisor(config_source, mock_config_store, mock_nats)

        try:
            # Start supervisor (starts adapter)
            await supervisor._start_adapter(initial_config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert adapter_is_running(state)

            # Simulate completed poll 5 minutes ago
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
            saved_last_poll = state.last_completed_poll

            # Disable adapter
            config_source.set_adapter(_nws_config(enabled=False))
            await supervisor._on_config_change("adapters", "nws")

            # Verify stopped but state preserved (THIS IS THE KEY CHECK).
            # Unfixed code pop()s the state on stop; fixed code keeps it with
            # is_running=False.
            state = supervisor._adapter_states.get("nws")
            assert state is not None, (
                "State was removed on stop! This violates the rate-limit guarantee. "
                "State should be preserved to maintain last_completed_poll."
            )
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # Re-enable adapter
            config_source.set_adapter(_nws_config())
            await supervisor._on_config_change("adapters", "nws")

            # Verify restarted with preserved last_completed_poll
            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # last_poll was 5 minutes ago and the cadence is 60s, so
            # last_poll + cadence was 4 minutes ago: the wait should be 0.
            now = datetime.now(timezone.utc)
            wait_time = _seconds_until_next_poll(saved_last_poll, CADENCE_S, now)
            assert wait_time == 0, (
                f"Expected immediate poll (wait=0), got wait={wait_time}s. "
                f"last_poll was {saved_last_poll}, now is {now}"
            )
        finally:
            await _teardown(supervisor)

    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_shorter_than_cadence(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test B: Re-enable after gap shorter than cadence respects rate limit.

        THIS IS THE KEY TEST that failed before the fix.

        - Start adapter (cadence 60s)
        - Simulate completed poll 10 seconds ago
        - Disable adapter, then re-enable it right away (well inside the
          cadence window; no wall-clock delay is simulated)
        - Assert the next poll is due at last_poll + 60s (~50s away), NOT now
        """
        config_source = MockConfigSource()
        initial_config = _nws_config()
        config_source.set_adapter(initial_config)
        supervisor = _make_supervisor(config_source, mock_config_store, mock_nats)

        try:
            # Start adapter
            await supervisor._start_adapter(initial_config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None

            # Simulate completed poll 10 seconds ago
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
            saved_last_poll = state.last_completed_poll

            # Disable adapter
            config_source.set_adapter(_nws_config(enabled=False))
            await supervisor._on_config_change("adapters", "nws")

            # Verify stopped but state preserved (THIS IS THE KEY CHECK).
            # Unfixed code pop()s the state on stop; fixed code keeps it with
            # is_running=False.
            state = supervisor._adapter_states.get("nws")
            assert state is not None, (
                "State was removed on stop! This violates the rate-limit guarantee. "
                "State should be preserved to maintain last_completed_poll."
            )
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # Re-enable adapter
            config_source.set_adapter(_nws_config())
            await supervisor._on_config_change("adapters", "nws")

            # Verify restarted with preserved last_completed_poll
            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # last_poll was ~10 seconds ago and the cadence is 60s, so the
            # next poll is still ~50s (60 - 10) in the future.
            now = datetime.now(timezone.utc)
            wait_time = _seconds_until_next_poll(saved_last_poll, CADENCE_S, now)
            assert 45 < wait_time < 55, (
                f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
                f"Rate limit violated: poll would happen before last_poll + cadence"
            )
        finally:
            await _teardown(supervisor)

    @pytest.mark.asyncio
    async def test_enable_disable_delete_readd_fresh_state(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test C: Delete then re-add clears preserved state.

        - Start adapter
        - Simulate completed poll
        - Disable adapter
        - DELETE adapter from DB (not just disable)
        - Re-add adapter with same name
        - Assert preserved timestamp is dropped (fresh adapter, immediate poll)
        """
        config_source = MockConfigSource()
        initial_config = _nws_config()
        config_source.set_adapter(initial_config)
        supervisor = _make_supervisor(config_source, mock_config_store, mock_nats)

        try:
            # Start adapter
            await supervisor._start_adapter(initial_config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None

            # Simulate completed poll 10 seconds ago
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)

            # Disable adapter
            config_source.set_adapter(_nws_config(enabled=False))
            await supervisor._on_config_change("adapters", "nws")

            # DELETE adapter from DB (remove from config source)
            config_source.set_adapter(None, name="nws")
            await supervisor._on_config_change("adapters", "nws")

            # Verify adapter fully removed
            assert "nws" not in supervisor._adapter_states

            # Re-add adapter with same name
            config_source.set_adapter(_nws_config())
            await supervisor._on_config_change("adapters", "nws")

            # Verify new adapter started fresh
            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert adapter_is_running(state)

            # last_completed_poll should be None (fresh adapter)
            assert state.last_completed_poll is None, (
                f"Expected None (fresh adapter), got {state.last_completed_poll}. "
                f"Preserved state not cleared on delete."
            )
        finally:
            await _teardown(supervisor)

    @pytest.mark.asyncio
    async def test_stop_preserves_state_start_reuses_it(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Verify _stop_adapter preserves state and _start_adapter reuses it."""
        config_source = MockConfigSource()
        config = _nws_config()
        config_source.set_adapter(config)
        supervisor = _make_supervisor(config_source, mock_config_store, mock_nats)

        try:
            # Start adapter
            await supervisor._start_adapter(config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
            saved_poll = state.last_completed_poll

            # Stop adapter
            await supervisor._stop_adapter("nws")

            # State should still exist
            assert "nws" in supervisor._adapter_states
            state = supervisor._adapter_states["nws"]
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_poll

            # Restart adapter
            await supervisor._start_adapter(config)

            # Should reuse existing state
            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_poll
        finally:
            await _teardown(supervisor)

    @pytest.mark.asyncio
    async def test_remove_adapter_clears_state(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Verify _remove_adapter fully clears state."""
        config_source = MockConfigSource()
        config = _nws_config()
        config_source.set_adapter(config)
        supervisor = _make_supervisor(config_source, mock_config_store, mock_nats)

        try:
            await supervisor._start_adapter(config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None
            state.last_completed_poll = datetime.now(timezone.utc)

            # Remove adapter
            await cleanup_adapter(supervisor, "nws")

            # State should be gone
            assert "nws" not in supervisor._adapter_states
        finally:
            # On success the adapter is already gone, so this only signals
            # shutdown. On failure it also removes the leftover adapter.
            await _teardown(supervisor)


def test_enrichment_cache_path_is_hermetic(mock_config_store, tmp_path: Path) -> None:
    """No test may touch the production enrichment cache.

    The autouse `isolate_enrichment_cache` fixture (conftest) must redirect
    ENRICHMENT_CACHE_DB_PATH off /var/lib/central onto a per-test temp dir,
    and constructing a Supervisor must open the cache there -- not in
    production.
    """
    import central.supervisor as supervisor_mod

    patched = supervisor_mod.ENRICHMENT_CACHE_DB_PATH
    assert tmp_path in patched.parents
    assert "/var/lib/central" not in str(patched)

    supervisor = supervisor_mod.Supervisor(
        config_source=MockConfigSource(),
        config_store=mock_config_store,
        nats_url="nats://localhost:4222",
        cloudevents_config=None,
    )
    # __init__ opened the cache at the temp path, leaving the db file behind.
    assert patched.exists()
    assert supervisor._enrichment_cache is not None