central/tests/test_supervisor_integration.py

616 lines
22 KiB
Python
Raw Permalink Normal View History

2026-05-16 21:27:30 +00:00
"""Integration tests for Supervisor hot-reload with enable/disable/enable flow.
These tests exercise the actual Supervisor._on_config_change code path,
not just AdapterState math in isolation. They verify the rate-limit
guarantee is maintained across adapter stop/start cycles.
IMPORTANT: These tests are designed to:
- FAIL on unfixed code (Test B fails because last_completed_poll is lost)
- PASS on fixed code (last_completed_poll is preserved across disable/enable)
"""
import asyncio
import base64
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from central.config_models import AdapterConfig
from central.bootstrap_config import get_settings
2026-05-16 21:27:30 +00:00
from central.crypto import KEY_SIZE, clear_key_cache
def adapter_is_running(state) -> bool:
    """Report whether an adapter state's poll task is currently live.

    Works against both Supervisor variants: the fixed AdapterState exposes an
    ``is_running`` property, while the unfixed one only has ``task``, whose
    liveness is derived from ``task.done()``.
    """
    if not hasattr(state, "is_running"):
        # Unfixed code: no property, so inspect the asyncio task directly.
        task = state.task
        return task is not None and not task.done()
    return state.is_running
async def cleanup_adapter(supervisor, name: str) -> None:
    """Tear down adapter ``name`` with whichever API the Supervisor provides.

    The fixed Supervisor has ``_remove_adapter``; the unfixed one only has
    ``_stop_adapter``, which pops the adapter's state itself.
    """
    teardown = (
        supervisor._remove_adapter
        if hasattr(supervisor, "_remove_adapter")
        else supervisor._stop_adapter
    )
    await teardown(name)
# DSN of the test database; the CENTRAL_TEST_DB_DSN env var overrides the default.
_DEFAULT_TEST_DB_DSN = "postgresql://central_test:testpass@localhost/central_test"
TEST_DB_DSN = os.environ.get("CENTRAL_TEST_DB_DSN", _DEFAULT_TEST_DB_DSN)
@pytest.fixture(scope="session")
def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Write one random master key for the whole test session; return its path.

    The key is KEY_SIZE random bytes, stored as base64 text in
    ``<session tmp>/keys*/master.key``.
    """
    encoded_key = base64.b64encode(os.urandom(KEY_SIZE)).decode()
    key_file = tmp_path_factory.mktemp("keys").joinpath("master.key")
    key_file.write_text(encoded_key)
    return key_file
@pytest.fixture(autouse=True)
def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch):
    """Point every test at the test DB and the session master key.

    get_settings and the crypto key cache are cleared only AFTER the env vars
    are set, so crypto rebuilds from the test key regardless of suite order,
    and they are cleared again on teardown so the test key never leaks into a
    later test. See PR M-b.
    """

    def reset_cached_config() -> None:
        clear_key_cache()
        get_settings.cache_clear()

    test_env = {
        "CENTRAL_DB_DSN": TEST_DB_DSN,
        "CENTRAL_MASTER_KEY_PATH": str(master_key_path),
    }
    for var_name, value in test_env.items():
        monkeypatch.setenv(var_name, value)

    reset_cached_config()
    yield
    reset_cached_config()
class MockConfigSource:
    """In-memory stand-in for ConfigSource so Supervisor can run without a DB.

    Adapter configs live in a dict keyed by adapter name. Tests mutate it with
    set_adapter() and then invoke Supervisor._on_config_change() themselves,
    since watch_for_changes() never fires.
    """

    def __init__(self) -> None:
        self._adapters: dict[str, AdapterConfig] = {}

    def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None:
        """Insert/replace ``config`` (keyed by ``config.name``) or remove an adapter.

        Passing ``config=None`` deletes the adapter called ``name`` (simulating
        a row deleted from the DB); deleting an unknown name is a no-op.

        Raises:
            ValueError: if ``config`` is None and no ``name`` is given. The
                original silently did nothing here, hiding test mistakes.
        """
        if config is not None:
            self._adapters[config.name] = config
            return
        if not name:
            raise ValueError("set_adapter(None) requires the name of the adapter to remove")
        self._adapters.pop(name, None)

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the configs that are enabled and not paused."""
        return [a for a in self._adapters.values() if a.enabled and not a.is_paused]

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return the config for ``name``, or None if it is not present."""
        return self._adapters.get(name)

    async def watch_for_changes(self, callback) -> None:
        """No-op: tests call Supervisor._on_config_change directly instead."""
        return

    async def close(self) -> None:
        """Nothing to release for the in-memory source."""
        pass
class MockNWSAdapter:
    """Stand-in for NWSAdapter that records poll() calls and emits no events.

    Each poll() bumps ``poll_count`` and appends the call time (UTC) to
    ``poll_times``. The publish/dedup hooks are inert.
    """

    requires_api_key = None  # the mock never needs an API key

    def __init__(self, config, config_store, cursor_db_path) -> None:
        # cursor_db_path is accepted but unused by the mock.
        self._config_store = config_store
        self._absorb_config(config)
        self.poll_count = 0
        self.poll_times: list[datetime] = []
        self._shutdown = False

    def _absorb_config(self, config) -> None:
        """Store ``config`` and derive cadence and upper-cased state codes from it."""
        self.config = config
        self.cadence_s = config.cadence_s
        self.states = {code.upper() for code in config.settings.get("states", [])}

    async def startup(self) -> None:
        pass

    async def shutdown(self) -> None:
        self._shutdown = True

    async def apply_config(self, config: AdapterConfig) -> None:
        """Hot-apply a new configuration."""
        self._absorb_config(config)

    async def poll(self):
        """Record the call; yields nothing."""
        self.poll_count += 1
        self.poll_times.append(datetime.now(timezone.utc))
        return
        yield  # unreachable, but its presence makes poll() an async generator

    def is_published(self, event_id: str) -> bool:
        return False  # nothing is ever considered already published

    def mark_published(self, event_id: str) -> None:
        pass

    def bump_last_seen(self, event_id: str) -> None:
        pass

    def sweep_old_ids(self) -> int:
        return 0  # nothing to sweep
@pytest.fixture
def mock_nats():
    """Mock NATS connection.

    nats-py's ``nc.jetstream()`` is a synchronous call, so it is modeled with a
    plain MagicMock that returns the JetStream mock. Left as an auto-created
    AsyncMock attribute, ``supervisor._js = nc.jetstream()`` would store an
    unawaited coroutine (triggering the "coroutine ... was never awaited"
    warning) instead of the JetStream mock.
    """
    jetstream_ctx = AsyncMock()
    jetstream_ctx.publish = AsyncMock()

    connection = AsyncMock()
    connection.publish = AsyncMock()
    connection.jetstream = MagicMock(return_value=jetstream_ctx)
    return connection
@pytest.fixture
def mock_config_store():
    """ConfigStore mock whose async lookups resolve to empty/None results."""
    store = MagicMock()
    canned_results = (
        ("list_streams", []),
        ("get_stream", None),
        ("get_api_key", None),
    )
    for method_name, result in canned_results:
        setattr(store, method_name, AsyncMock(return_value=result))
    # Left without an explicit return_value, exactly as before.
    store.set_adapter_last_error = AsyncMock()
    return store
class TestEnableDisableEnableIntegration:
    """Integration tests for the enable→disable→enable flow through Supervisor.

    These tests drive _on_config_change → _stop_adapter → _start_adapter and
    verify that last_completed_poll is preserved across a stop/start cycle
    and dropped only when the adapter is deleted.
    """

    # Adapter under test and its polling cadence in seconds, shared by all tests.
    ADAPTER = "nws"
    CADENCE_S = 60

    @classmethod
    def _config(cls, *, enabled: bool = True):
        """Return a fresh AdapterConfig for the NWS adapter under test."""
        return AdapterConfig(
            name=cls.ADAPTER,
            enabled=enabled,
            cadence_s=cls.CADENCE_S,
            settings={"states": ["ID"], "contact_email": "test@test.com"},
            paused_at=None,
            updated_at=datetime.now(timezone.utc),
        )

    @classmethod
    def _make_supervisor(cls, config_source, mock_nats, mock_config_store):
        """Build a Supervisor wired to mocked NATS with MockNWSAdapter registered."""
        from central.supervisor import Supervisor

        supervisor = Supervisor(
            config_source=config_source,
            config_store=mock_config_store,
            nats_url="nats://localhost:4222",
            cloudevents_config=None,
        )
        # Inject the mocked connection instead of connecting to NATS.
        supervisor._nc = mock_nats
        supervisor._js = mock_nats.jetstream()
        # Replace the real adapter class in the supervisor's registry.
        supervisor._adapters[cls.ADAPTER] = MockNWSAdapter
        return supervisor

    @classmethod
    async def _teardown(cls, supervisor) -> None:
        """Signal shutdown and remove the adapter, even when the test failed.

        Removal is skipped when the supervisor no longer tracks the adapter
        (e.g. unfixed code already popped its state on stop). Tearing down an
        untracked adapter may itself raise and obscure the real failure.
        """
        supervisor._shutdown_event.set()
        if cls.ADAPTER in supervisor._adapter_states:
            await cleanup_adapter(supervisor, cls.ADAPTER)

    @staticmethod
    def _seconds_until_next_poll(last_poll: datetime, cadence_s: int, now: datetime) -> float:
        """Seconds from ``now`` until ``last_poll + cadence_s``, clamped at zero.

        This recomputes the wait the poll loop is expected to honour from the
        preserved timestamp; it does not run the loop itself.
        """
        return max(0, last_poll.timestamp() + cadence_s - now.timestamp())

    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_longer_than_cadence(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test A: re-enable after a gap longer than the cadence polls immediately.

        - Start adapter (cadence 60s)
        - Simulate a completed poll 5 minutes ago
        - Disable, then re-enable the adapter
        - Assert last_completed_poll survived both transitions, so the next
          poll is due immediately (last_poll + cadence is in the past)

        NOTE(review): the number of polls fired after re-enable (no catch-up
        burst) is not asserted here; only the wait computation is checked.
        """
        config_source = MockConfigSource()
        initial_config = self._config()
        config_source.set_adapter(initial_config)
        supervisor = self._make_supervisor(config_source, mock_nats, mock_config_store)

        await supervisor._start_adapter(initial_config)
        try:
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            assert adapter_is_running(state)

            # Simulate a completed poll 5 minutes ago.
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
            saved_last_poll = state.last_completed_poll

            # Disable: the adapter must stop but keep its state (THE KEY CHECK).
            # Unfixed code pops the state, so .get() returns None there.
            config_source.set_adapter(self._config(enabled=False))
            await supervisor._on_config_change("adapters", self.ADAPTER)
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None, (
                "State was removed on stop! This violates the rate-limit guarantee. "
                "State should be preserved to maintain last_completed_poll."
            )
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # Re-enable: the adapter restarts on top of the preserved state.
            config_source.set_adapter(self._config())
            await supervisor._on_config_change("adapters", self.ADAPTER)
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # 5 minutes ago + 60s cadence = 4 minutes ago, so no wait is due.
            now = datetime.now(timezone.utc)
            wait_time = self._seconds_until_next_poll(saved_last_poll, self.CADENCE_S, now)
            assert wait_time == 0, (
                f"Expected immediate poll (wait=0), got wait={wait_time}s. "
                f"last_poll was {saved_last_poll}, now is {now}"
            )
        finally:
            await self._teardown(supervisor)

    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_shorter_than_cadence(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test B: re-enable within the cadence window still respects the rate limit.

        This is the key regression test; it failed before the fix.

        - Start adapter (cadence 60s)
        - Simulate a completed poll 10 seconds ago
        - Disable, then re-enable the adapter right away (standing in for a
          re-enable a few seconds later, still inside the cadence window)
        - Assert the next poll is due at last_poll + 60s, NOT immediately
        """
        config_source = MockConfigSource()
        initial_config = self._config()
        config_source.set_adapter(initial_config)
        supervisor = self._make_supervisor(config_source, mock_nats, mock_config_store)

        await supervisor._start_adapter(initial_config)
        try:
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None

            # Simulate a completed poll 10 seconds ago.
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
            saved_last_poll = state.last_completed_poll

            # Disable: the adapter must stop but keep its state (THE KEY CHECK).
            # Unfixed code pops the state, so .get() returns None there.
            config_source.set_adapter(self._config(enabled=False))
            await supervisor._on_config_change("adapters", self.ADAPTER)
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None, (
                "State was removed on stop! This violates the rate-limit guarantee. "
                "State should be preserved to maintain last_completed_poll."
            )
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # Re-enable: the adapter restarts on top of the preserved state.
            config_source.set_adapter(self._config())
            await supervisor._on_config_change("adapters", self.ADAPTER)
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_last_poll

            # ~10s ago + 60s cadence is still ~50s in the future.
            now = datetime.now(timezone.utc)
            wait_time = self._seconds_until_next_poll(saved_last_poll, self.CADENCE_S, now)
            assert 45 < wait_time < 55, (
                f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
                f"Rate limit violated: poll would happen before last_poll + cadence"
            )
        finally:
            await self._teardown(supervisor)

    @pytest.mark.asyncio
    async def test_enable_disable_delete_readd_fresh_state(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Test C: deleting then re-adding an adapter clears its preserved state.

        - Start adapter and simulate a completed poll
        - Disable, then DELETE the adapter from the config source (not just disable)
        - Re-add an adapter with the same name
        - Assert it starts fresh: last_completed_poll is None (immediate poll)
        """
        config_source = MockConfigSource()
        initial_config = self._config()
        config_source.set_adapter(initial_config)
        supervisor = self._make_supervisor(config_source, mock_nats, mock_config_store)

        await supervisor._start_adapter(initial_config)
        try:
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None

            # Simulate a completed poll 10 seconds ago.
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)

            # Disable the adapter.
            config_source.set_adapter(self._config(enabled=False))
            await supervisor._on_config_change("adapters", self.ADAPTER)

            # DELETE it from the source, simulating removal from the DB.
            config_source.set_adapter(None, name=self.ADAPTER)
            await supervisor._on_config_change("adapters", self.ADAPTER)
            assert self.ADAPTER not in supervisor._adapter_states

            # Re-add an adapter with the same name.
            config_source.set_adapter(self._config())
            await supervisor._on_config_change("adapters", self.ADAPTER)

            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll is None, (
                f"Expected None (fresh adapter), got {state.last_completed_poll}. "
                f"Preserved state not cleared on delete."
            )
        finally:
            await self._teardown(supervisor)

    @pytest.mark.asyncio
    async def test_stop_preserves_state_start_reuses_it(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Verify _stop_adapter preserves state and _start_adapter reuses it."""
        config_source = MockConfigSource()
        config = self._config()
        config_source.set_adapter(config)
        supervisor = self._make_supervisor(config_source, mock_nats, mock_config_store)

        await supervisor._start_adapter(config)
        try:
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
            saved_poll = state.last_completed_poll

            await supervisor._stop_adapter(self.ADAPTER)

            # The state must survive the stop...
            assert self.ADAPTER in supervisor._adapter_states
            state = supervisor._adapter_states[self.ADAPTER]
            assert not adapter_is_running(state)
            assert state.last_completed_poll == saved_poll

            # ...and be reused by the next start.
            await supervisor._start_adapter(config)
            state = supervisor._adapter_states.get(self.ADAPTER)
            assert state is not None
            assert adapter_is_running(state)
            assert state.last_completed_poll == saved_poll
        finally:
            await self._teardown(supervisor)

    @pytest.mark.asyncio
    async def test_remove_adapter_clears_state(
        self, mock_nats, mock_config_store, tmp_path: Path
    ) -> None:
        """Verify removing an adapter (_remove_adapter on fixed code) fully clears its state."""
        config_source = MockConfigSource()
        config = self._config()
        config_source.set_adapter(config)
        supervisor = self._make_supervisor(config_source, mock_nats, mock_config_store)

        await supervisor._start_adapter(config)

        state = supervisor._adapter_states.get(self.ADAPTER)
        assert state is not None
        state.last_completed_poll = datetime.now(timezone.utc)

        # Unlike a plain stop, removal must discard the preserved state.
        await cleanup_adapter(supervisor, self.ADAPTER)
        assert self.ADAPTER not in supervisor._adapter_states
def test_enrichment_cache_path_is_hermetic(mock_config_store, tmp_path: Path) -> None:
    """No test may touch the production enrichment cache.

    The autouse ``isolate_enrichment_cache`` fixture (conftest) must redirect
    ENRICHMENT_CACHE_DB_PATH away from /var/lib/central to a per-test temp
    dir. Constructing a Supervisor must then open the cache there, not in
    production.
    """
    import central.supervisor as supervisor_mod

    cache_path = supervisor_mod.ENRICHMENT_CACHE_DB_PATH
    assert tmp_path in cache_path.parents
    assert "/var/lib/central" not in str(cache_path)

    sup = supervisor_mod.Supervisor(
        config_source=MockConfigSource(),
        config_store=mock_config_store,
        nats_url="nats://localhost:4222",
        cloudevents_config=None,
    )

    # __init__ opened the cache at the temp path, leaving its db file behind.
    # NOTE(review): the Supervisor (and its open cache) is never closed here;
    # consider closing it if Supervisor exposes a close/shutdown API.
    assert cache_path.exists()
    assert sup._enrichment_cache is not None