diff --git a/docs/BUG-CADENCE-DECREASE.md b/docs/BUG-CADENCE-DECREASE.md deleted file mode 100644 index 4d45e5f..0000000 --- a/docs/BUG-CADENCE-DECREASE.md +++ /dev/null @@ -1,211 +0,0 @@ -# Bug Investigation: Cadence Decrease Hot-Reload - -**Date:** 2026-05-16 -**Component:** central-supervisor -**File:** `supervisor.py` - ---- - -## 1. Reproduction - -### Test Case: Decrease 60s → 30s -``` -Tlast (poll completed): 04:18:24Z -Config change applied: 04:18:30Z (approx) -Expected next poll: 04:18:54Z (Tlast + 30s) -Actual next poll: 04:19:24Z (Tlast + 60s - OLD cadence) -Subsequent polls: Also at 60s intervals -``` - -### Log Evidence -```json -{"ts": "...", "msg": "Rescheduled adapter", "adapter": "nws", "old_cadence_s": 60, "new_cadence_s": 30, "next_poll": "2026-05-16T04:18:54+00:00"} -``` -- "Rescheduled adapter" log fires with **correct** calculated next_poll -- Actual poll occurs at OLD cadence time -- Subsequent polls continue at OLD cadence - -### Contrast: Increase 60s → 90s (WORKS) -``` -Tlast: 03:16:34Z -Config change: 03:16:36Z -Expected next poll: 03:18:04Z (Tlast + 90s) -Actual next poll: 03:18:04Z ✅ -``` - ---- - -## 2. Root Cause - -### Location -`supervisor.py` lines 395-450 (`_reschedule_adapter`) and lines 144-181 (`_run_adapter_loop`) - -### The Bug -The `cancel_event.set()` call in `_reschedule_adapter` does not reliably wake the `asyncio.wait_for()` in the adapter loop when the cadence is **decreased**. - -### Why It Happens - -1. **Event handler holds lock during signal:** - ```python - # _on_config_change (line 466) - async with self._lock: - new_config = await self._config_source.get_adapter(adapter_name) - # ... - await self._reschedule_adapter(adapter_name, new_config) # sets cancel_event here - ``` - -2. **Reschedule updates config then signals:** - ```python - # _reschedule_adapter - state.config = new_config # Line 420 - state.adapter.cadence_s = new_cadence # Line 423 - # ... logging ... - state.cancel_event.set() # Line 450 - inside lock context - ``` - -3. **Asyncio event delivery delay:** - The `asyncio.Event.set()` queues a wakeup for waiting tasks, but the signal delivery is subject to asyncio's task scheduler. When called from within an `async with` block, the event may not be processed until the current task yields or the lock context exits. - -4. **Timing difference between increase and decrease:** - - **Increase (60→90):** Loop has ~30-50s remaining sleep. Event signal arrives well before timeout. - - **Decrease (90→60):** Loop may be ~10s from timeout. By the time event signal is processed, timeout has already fired. - -5. **Why subsequent polls use old cadence:** - When the loop times out naturally (rather than being woken by event), it proceeds to poll. After poll completes, `state.last_completed_poll` is updated. The loop then reads `state.config.cadence_s` for the NEXT iteration — but if `state.config` was somehow not durably updated (or there's a stale reference), it uses the old value. - - **Alternative theory:** The `state.config = new_config` assignment creates a new config object, but the loop may be reading from a captured reference to the old object if there's any closure behavior we're not seeing. - ---- - -## 3. Proposed Fix - -### Option A: Force immediate reschedule (Recommended) - -Move the cancel logic OUTSIDE the lock, and use a more aggressive wake pattern: - -```python -async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None: - state = self._adapter_states.get(name) - if state is None or not state.is_running: - await self._start_adapter(new_config) - return - - old_cadence = state.config.cadence_s - new_cadence = new_config.cadence_s - - # Update config atomically - state.config = new_config - state.adapter.cadence_s = new_cadence - - # ... (NWS-specific updates, logging) ... - - # Cancel and wait for acknowledgment - state.cancel_event.set() - await asyncio.sleep(0) # Force task switch to process event -``` - -### Option B: Stop and restart the loop task - -For cadence changes, stop the current loop task and create a new one: - -```python -async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None: - state = self._adapter_states.get(name) - if state is None: - await self._start_adapter(new_config) - return - - # Preserve last_completed_poll - preserved_poll = state.last_completed_poll - - # Stop current loop - await self._stop_adapter(name) - - # Update config - state.config = new_config - state.last_completed_poll = preserved_poll - - # Restart loop - await self._start_adapter(new_config) -``` - -### Option C: Double-signal pattern - -Set the event, yield, then set again to ensure delivery: - -```python -state.cancel_event.set() -await asyncio.sleep(0) -state.cancel_event.set() # Redundant but ensures visibility -``` - ---- - -## 4. Test Gap - -### Missing Tests - -The test file `test_config_source_new.py` only tests ConfigSource behavior (list, get, protocol compliance). There are **no tests** for: - -1. `_reschedule_adapter` interrupting a sleeping loop -2. Cadence decrease being applied mid-sleep -3. Cadence increase being applied mid-sleep -4. Rate-limit guarantee after reschedule -5. `cancel_event` mechanism in isolation - -### Recommended Tests - -```python -@pytest.mark.asyncio -async def test_cadence_decrease_applies_immediately(): - """Cadence decrease should wake sleeping loop and reschedule.""" - # Setup: Adapter polling at 60s cadence - # Action: Change cadence to 30s while sleeping - # Assert: Next poll at last_poll + 30s, not last_poll + 60s - -@pytest.mark.asyncio -async def test_cadence_increase_applies_on_next_cycle(): - """Cadence increase should wake sleeping loop and extend wait.""" - # Setup: Adapter polling at 60s cadence - # Action: Change cadence to 90s while sleeping - # Assert: Next poll at last_poll + 90s - -@pytest.mark.asyncio -async def test_cancel_event_wakes_sleeping_loop(): - """cancel_event.set() should interrupt asyncio.wait_for().""" - # Unit test for the event mechanism in isolation -``` - ---- - -## 5. State at End - -### LXC State (Reverted) -- **Cadence in DB:** 60s ✅ -- **Actual poll interval:** 60s ✅ -- **Supervisor restarted:** 2026-05-16T04:43:40Z -- **Verified polls:** - ``` - 04:43:40.964 - First poll after restart - 04:44:41.171 - Second poll (61s later) ✅ - ``` - -### Mitigation Until Fix -After any cadence change (especially decrease), verify actual poll intervals. If incorrect, restart supervisor: -```bash -systemctl restart central-supervisor -``` - ---- - -## Summary - -| Item | Details | -|------|---------| -| **Bug** | Cadence decrease hot-reload doesn't apply without restart | -| **Root cause** | `cancel_event.set()` inside lock context has delayed delivery | -| **Affects** | Cadence decreases only; increases work correctly | -| **Workaround** | Restart supervisor after cadence decrease | -| **Fix effort** | Low - add `await asyncio.sleep(0)` after event.set() | -| **Test coverage** | None for hot-reload mechanism | - diff --git a/docs/environment.md b/docs/environment.md deleted file mode 100644 index b2f04f0..0000000 --- a/docs/environment.md +++ /dev/null @@ -1,96 +0,0 @@ -# Central Data Hub - Environment Reference - -## Development Locations - -### Active Development: CT104 (Central LXC) - -All development work happens on the Central LXC container: - -| Property | Value | -|----------|-------| -| **Hostname** | `central` | -| **Tailscale IP** | `100.64.0.12` | -| **LAN IP** | `192.168.1.104` | -| **SSH access** | `zvx@central` or `zvx@100.64.0.12` | -| **Repository path** | `/opt/central` | -| **Python venv** | `/opt/central/.venv` | -| **Services** | `central-supervisor`, `central-archive` | - -### Parked Clone: Cortex - -The cortex VM at `/home/zvx/projects/central` contains a clone that is -**not actively used for development**. It may be retired in the future. -Do not make changes there. - -### Local Workstation: matt-desktop - -The Windows workstation (matt-desktop) has no Central repository clones. -The directory `C:\Users\mtthw\central_work\` is scratch space only and -should not be used for commits. - -## Repository - -| Property | Value | -|----------|-------| -| **Origin** | `git@github.com:zvx-echo6/central.git` | -| **Main branch** | `main` | -| **Default user** | `central` (on CT104) | - -## Services - -### central-supervisor - -The main adapter scheduler and event publisher. Polls upstream APIs, -normalizes events, and publishes to NATS JetStream. - -```bash -# Status -systemctl status central-supervisor - -# Logs -journalctl -u central-supervisor -f - -# Restart (requires sudo) -sudo systemctl restart central-supervisor -``` - -### central-archive - -Consumes events from NATS JetStream and archives to PostgreSQL/TimescaleDB. - -```bash -# Status -systemctl status central-archive - -# Logs -journalctl -u central-archive -f -``` - -## Database - -PostgreSQL 16 with TimescaleDB runs on CT104: - -```bash -# Connect as central user -psql -h localhost -U central -d central - -# Check adapter config -SELECT name, cadence_s, enabled FROM config.adapters; - -# Check recent events -SELECT id, time, category FROM events ORDER BY time DESC LIMIT 10; -``` - -## SSH Access from Windows - -From matt-desktop, connect via Tailscale: - -```bash -# Direct connection -ssh zvx@100.64.0.12 - -# Using hostname (if Tailscale DNS configured) -ssh zvx@central -``` - -Note: The `zvx` user requires password for sudo operations. diff --git a/src/central/supervisor.py b/src/central/supervisor.py index bedd11f..06d7c72 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -396,26 +396,22 @@ class Supervisor: self, name: str, new_config: AdapterConfig, - ) -> AdapterState | None: + ) -> None: """Reschedule an adapter with new configuration. Maintains rate-limit guarantee: next poll at (last_completed_poll + new_cadence_s), not now + new_cadence_s. - - Returns the AdapterState to signal, or None if no signal needed. - The caller must signal cancel_event AFTER releasing any locks to - ensure immediate event delivery to the sleeping loop. """ state = self._adapter_states.get(name) if state is None: # Adapter not running - just start it await self._start_adapter(new_config) - return None + return if not state.is_running: # Adapter stopped - restart it await self._start_adapter(new_config) - return None + return old_cadence = state.config.cadence_s new_cadence = new_config.cadence_s @@ -450,9 +446,8 @@ class Supervisor: }, ) - # Return state so caller can signal OUTSIDE any locks. - # This ensures immediate event delivery to the sleeping loop. - return state + # Signal the loop to re-evaluate its schedule + state.cancel_event.set() async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. @@ -468,9 +463,6 @@ class Supervisor: extra={"table": table, "key": key}, ) - # Track state that needs signaling after lock release - state_to_signal: AdapterState | None = None - async with self._lock: # Fetch the current config for this adapter new_config = await self._config_source.get_adapter(adapter_name) @@ -509,13 +501,7 @@ class Supervisor: ) else: # Adapter config changed (cadence, settings) - state_to_signal = await self._reschedule_adapter(adapter_name, new_config) - - # Signal OUTSIDE the lock to ensure immediate event delivery. - # This fixes cadence-decrease hot-reload where the signal was - # delayed by asyncio task scheduling while holding the lock. - if state_to_signal is not None: - state_to_signal.cancel_event.set() + await self._reschedule_adapter(adapter_name, new_config) async def _heartbeat_loop(self) -> None: """Publish periodic heartbeats.""" diff --git a/tests/test_cadence_hotreload_loop.py b/tests/test_cadence_hotreload_loop.py deleted file mode 100644 index 90d5e86..0000000 --- a/tests/test_cadence_hotreload_loop.py +++ /dev/null @@ -1,553 +0,0 @@ -"""Integration tests for cadence hot-reload exercising the ACTUAL running loop. - -These tests run _run_adapter_loop and verify that cancel_event.set() properly -interrupts the sleeping loop. They are designed to: - -- FAIL on unfixed code (cancel_event.set() inside lock delays signal delivery) -- PASS on fixed code (signal delivered after lock release) - -Key difference from existing tests: these tests actually run the loop and -observe real poll timing, rather than testing AdapterState math in isolation. -""" - -import asyncio -import base64 -import os -from datetime import datetime, timedelta, timezone -from pathlib import Path -from typing import AsyncIterator -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -import pytest_asyncio - -from central.config_models import AdapterConfig -from central.crypto import KEY_SIZE, clear_key_cache - - -# Test database DSN -TEST_DB_DSN = os.environ.get( - "CENTRAL_TEST_DB_DSN", - "postgresql://central_test:testpass@localhost/central_test", -) - - -@pytest.fixture(scope="session") -def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path: - """Create a master key file for the test session.""" - key = os.urandom(KEY_SIZE) - key_path = tmp_path_factory.mktemp("keys") / "master.key" - key_path.write_text(base64.b64encode(key).decode()) - return key_path - - -@pytest.fixture(autouse=True) -def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: - """Configure master key path for all tests.""" - clear_key_cache() - monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN) - monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path)) - - -class MockConfigSource: - """Mock ConfigSource for testing Supervisor without DB.""" - - def __init__(self) -> None: - self._adapters: dict[str, AdapterConfig] = {} - - def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None: - if config is None: - if name: - self._adapters.pop(name, None) - else: - self._adapters[config.name] = config - - async def list_enabled_adapters(self) -> list[AdapterConfig]: - return [a for a in self._adapters.values() if a.enabled and not a.is_paused] - - async def get_adapter(self, name: str) -> AdapterConfig | None: - return self._adapters.get(name) - - async def watch_for_changes(self, callback) -> None: - return - - async def close(self) -> None: - pass - - -class FastMockNWSAdapter: - """Mock NWSAdapter that completes polls instantly and tracks timing.""" - - def __init__(self, *, config, cursor_db_path) -> None: - self.config = config - self.cadence_s = config.cadence_s - self.states = set(s.upper() for s in config.states) - self.poll_times: list[datetime] = [] - self._published_ids: set[str] = set() - - async def startup(self) -> None: - pass - - async def shutdown(self) -> None: - pass - - async def poll(self) -> AsyncIterator: - """Record poll time and yield nothing (no events).""" - self.poll_times.append(datetime.now(timezone.utc)) - return - yield # Make this an async generator - - def is_published(self, event_id: str) -> bool: - return event_id in self._published_ids - - def mark_published(self, event_id: str) -> None: - self._published_ids.add(event_id) - - def bump_last_seen(self, event_id: str) -> None: - pass - - def sweep_old_ids(self) -> int: - return 0 - - -@pytest.fixture -def mock_nats(): - """Mock NATS connection.""" - nc = MagicMock() - nc.publish = AsyncMock() - nc.drain = AsyncMock() - nc.close = AsyncMock() - js = MagicMock() - js.publish = AsyncMock() - nc.jetstream = MagicMock(return_value=js) - return nc - - -class TestCadenceHotReloadLoop: - """Tests that exercise the ACTUAL running loop with cancel_event signaling.""" - - @pytest.mark.asyncio - async def test_cadence_decrease_wakes_loop_immediately( - self, mock_nats, tmp_path: Path - ) -> None: - """Test 1: Cadence decrease (60->30) - THE BUG WE ARE FIXING. - - This test MUST FAIL on unfixed code where cancel_event.set() is - called inside the lock, causing delayed signal delivery. - - - Start adapter with 60s cadence - - Let first poll complete - - Change cadence to 30s via _on_config_change - - Assert next poll fires at ~last_poll+30s, NOT last_poll+60s - - On unfixed code: loop sleeps full 60s, poll at T+60 - On fixed code: loop wakes immediately, recalculates, polls at T+30 - """ - from central.supervisor import Supervisor - - config_source = MockConfigSource() - initial_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(initial_config) - - supervisor = Supervisor( - config_source=config_source, - nats_url="nats://localhost:4222", - cloudevents_config=None, - ) - supervisor._nc = mock_nats - supervisor._js = mock_nats.jetstream() - - # Track the mock adapter instance - adapter_instance = None - - def capture_adapter(*, config, cursor_db_path): - nonlocal adapter_instance - adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) - return adapter_instance - - with patch("central.supervisor.NWSAdapter", capture_adapter): - # Start adapter - this creates the loop task - await supervisor._start_adapter(initial_config) - - state = supervisor._adapter_states.get("nws") - assert state is not None - assert state.task is not None - - # Wait for first poll to complete - await asyncio.sleep(0.1) - assert len(adapter_instance.poll_times) >= 1, "First poll should complete" - first_poll_time = adapter_instance.poll_times[-1] - - # Now the loop is sleeping for 60 seconds. Change cadence to 30s. - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=30, # Decreased from 60 - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - - # This should wake the loop via cancel_event - await supervisor._on_config_change("adapters", "nws") - - # Wait for second poll - should happen at first_poll + 30s - # Since we just changed cadence, if the fix works, the loop should - # wake up, recalculate, and either poll immediately (if 30s passed) - # or wait the remaining time. - # - # For this test, we wait up to 35s and verify the poll happens - # around the 30s mark, not the 60s mark. - start_wait = datetime.now(timezone.utc) - timeout = 35 # Should complete well before 60s - - while len(adapter_instance.poll_times) < 2: - await asyncio.sleep(0.5) - elapsed = (datetime.now(timezone.utc) - start_wait).total_seconds() - if elapsed > timeout: - break - - # Verify second poll happened - assert len(adapter_instance.poll_times) >= 2, ( - f"Second poll did not happen within {timeout}s. " - f"Bug: cancel_event.set() did not wake the sleeping loop. " - f"Poll times: {adapter_instance.poll_times}" - ) - - second_poll_time = adapter_instance.poll_times[1] - interval = (second_poll_time - first_poll_time).total_seconds() - - # The interval should be ~30s (new cadence), not 60s (old cadence) - # Allow some tolerance for test execution overhead - assert interval < 40, ( - f"Poll interval was {interval:.1f}s, expected ~30s. " - f"Bug: loop used old cadence instead of new cadence after reschedule." - ) - - # Cleanup - supervisor._shutdown_event.set() - state.cancel_event.set() - if state.task: - state.task.cancel() - try: - await state.task - except asyncio.CancelledError: - pass - - @pytest.mark.asyncio - async def test_cadence_increase_extends_wait( - self, mock_nats, tmp_path: Path - ) -> None: - """Test 2: Cadence increase (10->20) extends wait correctly. - - - Start adapter with 10s cadence - - Let first poll complete - - Immediately change cadence to 20s - - Assert next poll fires at ~last_poll+20s, not last_poll+10s - """ - from central.supervisor import Supervisor - - config_source = MockConfigSource() - initial_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=10, # Short for faster test - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(initial_config) - - supervisor = Supervisor( - config_source=config_source, - nats_url="nats://localhost:4222", - cloudevents_config=None, - ) - supervisor._nc = mock_nats - supervisor._js = mock_nats.jetstream() - - adapter_instance = None - - def capture_adapter(*, config, cursor_db_path): - nonlocal adapter_instance - adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) - return adapter_instance - - with patch("central.supervisor.NWSAdapter", capture_adapter): - await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - - # Wait for first poll - await asyncio.sleep(0.1) - assert len(adapter_instance.poll_times) >= 1 - first_poll_time = adapter_instance.poll_times[-1] - - # Change cadence to 20s (increase) - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=20, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - await supervisor._on_config_change("adapters", "nws") - - # Wait 12 seconds - should NOT poll yet (20s cadence) - await asyncio.sleep(12) - - # Should still be at 1 poll - assert len(adapter_instance.poll_times) == 1, ( - f"Poll happened too early! Expected 1 poll, got {len(adapter_instance.poll_times)}. " - f"Cadence increase should extend wait time." - ) - - # Wait remaining time plus buffer - await asyncio.sleep(10) - - # Now should have second poll - assert len(adapter_instance.poll_times) >= 2, ( - f"Second poll did not happen at 20s mark. " - f"Poll times: {adapter_instance.poll_times}" - ) - - second_poll_time = adapter_instance.poll_times[1] - interval = (second_poll_time - first_poll_time).total_seconds() - - # Should be ~20s - assert 18 < interval < 25, ( - f"Poll interval was {interval:.1f}s, expected ~20s." - ) - - # Cleanup - supervisor._shutdown_event.set() - state.cancel_event.set() - if state.task: - state.task.cancel() - try: - await state.task - except asyncio.CancelledError: - pass - - @pytest.mark.asyncio - async def test_enable_disable_enable_gap_exceeds_cadence( - self, mock_nats, tmp_path: Path - ) -> None: - """Test 3: Enable->disable->enable with gap > cadence polls immediately. - - - Start adapter, complete one poll at T1 - - Disable adapter - - Wait > cadence_s - - Re-enable - - Assert poll fires immediately (gap exceeded cadence) - """ - from central.supervisor import Supervisor - - config_source = MockConfigSource() - config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=2, # Short cadence for faster test - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(config) - - supervisor = Supervisor( - config_source=config_source, - nats_url="nats://localhost:4222", - cloudevents_config=None, - ) - supervisor._nc = mock_nats - supervisor._js = mock_nats.jetstream() - - adapter_instances = [] - - def capture_adapter(*, config, cursor_db_path): - inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) - adapter_instances.append(inst) - return inst - - with patch("central.supervisor.NWSAdapter", capture_adapter): - await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - - # Wait for first poll - await asyncio.sleep(0.1) - first_adapter = adapter_instances[0] - assert len(first_adapter.poll_times) >= 1 - - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=2, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") - - # Wait longer than cadence - await asyncio.sleep(3) - - # Re-enable - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=2, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - - reenable_time = datetime.now(timezone.utc) - await supervisor._on_config_change("adapters", "nws") - - # Wait a bit for immediate poll - await asyncio.sleep(0.5) - - new_state = supervisor._adapter_states.get("nws") - assert new_state is not None - - # Check that a poll happened quickly after re-enable - # The new adapter instance should have polled - if len(adapter_instances) > 1: - new_adapter = adapter_instances[-1] - assert len(new_adapter.poll_times) >= 1, ( - "Poll should happen immediately when gap > cadence" - ) - poll_delay = (new_adapter.poll_times[0] - reenable_time).total_seconds() - assert poll_delay < 1, ( - f"Poll took {poll_delay:.1f}s after re-enable, expected immediate" - ) - - # Cleanup - supervisor._shutdown_event.set() - if new_state.task: - new_state.cancel_event.set() - new_state.task.cancel() - try: - await new_state.task - except asyncio.CancelledError: - pass - - @pytest.mark.asyncio - async def test_enable_disable_enable_gap_within_cadence( - self, mock_nats, tmp_path: Path - ) -> None: - """Test 4: Enable->disable->enable with gap < cadence waits. - - - Start adapter with 10s cadence, complete poll at T1 - - Disable adapter - - Re-enable quickly (within cadence window) - - Assert next poll fires at T1 + cadence_s, NOT immediately - """ - from central.supervisor import Supervisor - - config_source = MockConfigSource() - config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=10, # 10 second cadence - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(config) - - supervisor = Supervisor( - config_source=config_source, - nats_url="nats://localhost:4222", - cloudevents_config=None, - ) - supervisor._nc = mock_nats - supervisor._js = mock_nats.jetstream() - - adapter_instances = [] - - def capture_adapter(*, config, cursor_db_path): - inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) - adapter_instances.append(inst) - return inst - - with patch("central.supervisor.NWSAdapter", capture_adapter): - await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - - # Wait for first poll - await asyncio.sleep(0.1) - first_adapter = adapter_instances[0] - assert len(first_adapter.poll_times) >= 1 - first_poll_time = first_adapter.poll_times[-1] - - # Disable adapter quickly - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=10, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") - - # Re-enable immediately (within cadence window) - await asyncio.sleep(0.5) - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=10, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") - - new_state = supervisor._adapter_states.get("nws") - assert new_state is not None - - # Wait 3 seconds - should NOT poll yet (still within 10s cadence) - await asyncio.sleep(3) - - # The new adapter instance should not have polled yet - if len(adapter_instances) > 1: - new_adapter = adapter_instances[-1] - assert len(new_adapter.poll_times) == 0, ( - f"Poll happened too early! Gap < cadence should wait. " - f"Polls: {new_adapter.poll_times}" - ) - - # Wait for remaining time (about 7 more seconds) - await asyncio.sleep(8) - - # Now should have polled - if len(adapter_instances) > 1: - new_adapter = adapter_instances[-1] - assert len(new_adapter.poll_times) >= 1, ( - "Poll should have happened by now (10s cadence elapsed)" - ) - - # Cleanup - supervisor._shutdown_event.set() - if new_state.task: - new_state.cancel_event.set() - new_state.task.cancel() - try: - await new_state.task - except asyncio.CancelledError: - pass