diff --git a/docs/BUG-CADENCE-DECREASE.md b/docs/BUG-CADENCE-DECREASE.md new file mode 100644 index 0000000..4d45e5f --- /dev/null +++ b/docs/BUG-CADENCE-DECREASE.md @@ -0,0 +1,211 @@ +# Bug Investigation: Cadence Decrease Hot-Reload + +**Date:** 2026-05-16 +**Component:** central-supervisor +**File:** `supervisor.py` + +--- + +## 1. Reproduction + +### Test Case: Decrease 60s → 30s +``` +Tlast (poll completed): 04:18:24Z +Config change applied: 04:18:30Z (approx) +Expected next poll: 04:18:54Z (Tlast + 30s) +Actual next poll: 04:19:24Z (Tlast + 60s - OLD cadence) +Subsequent polls: Also at 60s intervals +``` + +### Log Evidence +```json +{"ts": "...", "msg": "Rescheduled adapter", "adapter": "nws", "old_cadence_s": 60, "new_cadence_s": 30, "next_poll": "2026-05-16T04:18:54+00:00"} +``` +- "Rescheduled adapter" log fires with **correct** calculated next_poll +- Actual poll occurs at OLD cadence time +- Subsequent polls continue at OLD cadence + +### Contrast: Increase 60s → 90s (WORKS) +``` +Tlast: 03:16:34Z +Config change: 03:16:36Z +Expected next poll: 03:18:04Z (Tlast + 90s) +Actual next poll: 03:18:04Z ✅ +``` + +--- + +## 2. Root Cause + +### Location +`supervisor.py` lines 395-450 (`_reschedule_adapter`) and lines 144-181 (`_run_adapter_loop`) + +### The Bug +The `cancel_event.set()` call in `_reschedule_adapter` does not reliably wake the `asyncio.wait_for()` in the adapter loop when the cadence is **decreased**. + +### Why It Happens + +1. **Event handler holds lock during signal:** + ```python + # _on_config_change (line 466) + async with self._lock: + new_config = await self._config_source.get_adapter(adapter_name) + # ... + await self._reschedule_adapter(adapter_name, new_config) # sets cancel_event here + ``` + +2. **Reschedule updates config then signals:** + ```python + # _reschedule_adapter + state.config = new_config # Line 420 + state.adapter.cadence_s = new_cadence # Line 423 + # ... logging ... 
+ state.cancel_event.set() # Line 450 - inside lock context + ``` + +3. **Asyncio event delivery delay:** + The `asyncio.Event.set()` queues a wakeup for waiting tasks, but the signal delivery is subject to asyncio's task scheduler. When called from within an `async with` block, the event may not be processed until the current task yields or the lock context exits. + +4. **Timing difference between increase and decrease:** + - **Increase (60→90):** Loop has ~30-50s remaining sleep. Event signal arrives well before timeout. + - **Decrease (60→30):** Loop may be ~10s from timeout. By the time event signal is processed, timeout has already fired. + +5. **Why subsequent polls use old cadence:** + When the loop times out naturally (rather than being woken by event), it proceeds to poll. After poll completes, `state.last_completed_poll` is updated. The loop then reads `state.config.cadence_s` for the NEXT iteration, but if `state.config` was somehow not durably updated (or there's a stale reference), it uses the old value. + + **Alternative theory:** The `state.config = new_config` assignment creates a new config object, but the loop may be reading from a captured reference to the old object if there's any closure behavior we're not seeing. + +--- + +## 3. Proposed Fix + +### Option A: Force immediate reschedule (Recommended) + +Move the cancel logic OUTSIDE the lock, and use a more aggressive wake pattern: + +```python +async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None: + state = self._adapter_states.get(name) + if state is None or not state.is_running: + await self._start_adapter(new_config) + return + + old_cadence = state.config.cadence_s + new_cadence = new_config.cadence_s + + # Update config atomically + state.config = new_config + state.adapter.cadence_s = new_cadence + + # ... (NWS-specific updates, logging) ... 
+ + # Cancel and wait for acknowledgment + state.cancel_event.set() + await asyncio.sleep(0) # Force task switch to process event +``` + +### Option B: Stop and restart the loop task + +For cadence changes, stop the current loop task and create a new one: + +```python +async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None: + state = self._adapter_states.get(name) + if state is None: + await self._start_adapter(new_config) + return + + # Preserve last_completed_poll + preserved_poll = state.last_completed_poll + + # Stop current loop + await self._stop_adapter(name) + + # Update config + state.config = new_config + state.last_completed_poll = preserved_poll + + # Restart loop + await self._start_adapter(new_config) +``` + +### Option C: Double-signal pattern + +Set the event, yield, then set again to ensure delivery: + +```python +state.cancel_event.set() +await asyncio.sleep(0) +state.cancel_event.set() # Redundant but ensures visibility +``` + +--- + +## 4. Test Gap + +### Missing Tests + +The test file `test_config_source_new.py` only tests ConfigSource behavior (list, get, protocol compliance). There are **no tests** for: + +1. `_reschedule_adapter` interrupting a sleeping loop +2. Cadence decrease being applied mid-sleep +3. Cadence increase being applied mid-sleep +4. Rate-limit guarantee after reschedule +5. 
`cancel_event` mechanism in isolation + +### Recommended Tests + +```python +@pytest.mark.asyncio +async def test_cadence_decrease_applies_immediately(): + """Cadence decrease should wake sleeping loop and reschedule.""" + # Setup: Adapter polling at 60s cadence + # Action: Change cadence to 30s while sleeping + # Assert: Next poll at last_poll + 30s, not last_poll + 60s + +@pytest.mark.asyncio +async def test_cadence_increase_applies_on_next_cycle(): + """Cadence increase should wake sleeping loop and extend wait.""" + # Setup: Adapter polling at 60s cadence + # Action: Change cadence to 90s while sleeping + # Assert: Next poll at last_poll + 90s + +@pytest.mark.asyncio +async def test_cancel_event_wakes_sleeping_loop(): + """cancel_event.set() should interrupt asyncio.wait_for().""" + # Unit test for the event mechanism in isolation +``` + +--- + +## 5. State at End + +### LXC State (Reverted) +- **Cadence in DB:** 60s ✅ +- **Actual poll interval:** 60s ✅ +- **Supervisor restarted:** 2026-05-16T04:43:40Z +- **Verified polls:** + ``` + 04:43:40.964 - First poll after restart + 04:44:41.171 - Second poll (61s later) ✅ + ``` + +### Mitigation Until Fix +After any cadence change (especially decrease), verify actual poll intervals. 
If incorrect, restart supervisor: +```bash +systemctl restart central-supervisor +``` + +--- + +## Summary + +| Item | Details | +|------|---------| +| **Bug** | Cadence decrease hot-reload doesn't apply without restart | +| **Root cause** | `cancel_event.set()` inside lock context has delayed delivery | +| **Affects** | Cadence decreases only; increases work correctly | +| **Workaround** | Restart supervisor after cadence decrease | +| **Fix effort** | Low - add `await asyncio.sleep(0)` after event.set() | +| **Test coverage** | None for hot-reload mechanism | + diff --git a/docs/environment.md b/docs/environment.md new file mode 100644 index 0000000..b2f04f0 --- /dev/null +++ b/docs/environment.md @@ -0,0 +1,96 @@ +# Central Data Hub - Environment Reference + +## Development Locations + +### Active Development: CT104 (Central LXC) + +All development work happens on the Central LXC container: + +| Property | Value | +|----------|-------| +| **Hostname** | `central` | +| **Tailscale IP** | `100.64.0.12` | +| **LAN IP** | `192.168.1.104` | +| **SSH access** | `zvx@central` or `zvx@100.64.0.12` | +| **Repository path** | `/opt/central` | +| **Python venv** | `/opt/central/.venv` | +| **Services** | `central-supervisor`, `central-archive` | + +### Parked Clone: Cortex + +The cortex VM at `/home/zvx/projects/central` contains a clone that is +**not actively used for development**. It may be retired in the future. +Do not make changes there. + +### Local Workstation: matt-desktop + +The Windows workstation (matt-desktop) has no Central repository clones. +The directory `C:\Users\mtthw\central_work\` is scratch space only and +should not be used for commits. + +## Repository + +| Property | Value | +|----------|-------| +| **Origin** | `git@github.com:zvx-echo6/central.git` | +| **Main branch** | `main` | +| **Default user** | `central` (on CT104) | + +## Services + +### central-supervisor + +The main adapter scheduler and event publisher. 
Polls upstream APIs, +normalizes events, and publishes to NATS JetStream. + +```bash +# Status +systemctl status central-supervisor + +# Logs +journalctl -u central-supervisor -f + +# Restart (requires sudo) +sudo systemctl restart central-supervisor +``` + +### central-archive + +Consumes events from NATS JetStream and archives to PostgreSQL/TimescaleDB. + +```bash +# Status +systemctl status central-archive + +# Logs +journalctl -u central-archive -f +``` + +## Database + +PostgreSQL 16 with TimescaleDB runs on CT104: + +```bash +# Connect as central user +psql -h localhost -U central -d central + +# Check adapter config +SELECT name, cadence_s, enabled FROM config.adapters; + +# Check recent events +SELECT id, time, category FROM events ORDER BY time DESC LIMIT 10; +``` + +## SSH Access from Windows + +From matt-desktop, connect via Tailscale: + +```bash +# Direct connection +ssh zvx@100.64.0.12 + +# Using hostname (if Tailscale DNS configured) +ssh zvx@central +``` + +Note: The `zvx` user requires password for sudo operations. diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 06d7c72..bedd11f 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -396,22 +396,26 @@ class Supervisor: self, name: str, new_config: AdapterConfig, - ) -> None: + ) -> AdapterState | None: """Reschedule an adapter with new configuration. Maintains rate-limit guarantee: next poll at (last_completed_poll + new_cadence_s), not now + new_cadence_s. + + Returns the AdapterState to signal, or None if no signal needed. + The caller must signal cancel_event AFTER releasing any locks to + ensure immediate event delivery to the sleeping loop. 
""" state = self._adapter_states.get(name) if state is None: # Adapter not running - just start it await self._start_adapter(new_config) - return + return None if not state.is_running: # Adapter stopped - restart it await self._start_adapter(new_config) - return + return None old_cadence = state.config.cadence_s new_cadence = new_config.cadence_s @@ -446,8 +450,9 @@ class Supervisor: }, ) - # Signal the loop to re-evaluate its schedule - state.cancel_event.set() + # Return state so caller can signal OUTSIDE any locks. + # This ensures immediate event delivery to the sleeping loop. + return state async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. @@ -463,6 +468,9 @@ class Supervisor: extra={"table": table, "key": key}, ) + # Track state that needs signaling after lock release + state_to_signal: AdapterState | None = None + async with self._lock: # Fetch the current config for this adapter new_config = await self._config_source.get_adapter(adapter_name) @@ -501,7 +509,13 @@ class Supervisor: ) else: # Adapter config changed (cadence, settings) - await self._reschedule_adapter(adapter_name, new_config) + state_to_signal = await self._reschedule_adapter(adapter_name, new_config) + + # Signal OUTSIDE the lock to ensure immediate event delivery. + # This fixes cadence-decrease hot-reload where the signal was + # delayed by asyncio task scheduling while holding the lock. + if state_to_signal is not None: + state_to_signal.cancel_event.set() async def _heartbeat_loop(self) -> None: """Publish periodic heartbeats.""" diff --git a/tests/test_cadence_hotreload_loop.py b/tests/test_cadence_hotreload_loop.py new file mode 100644 index 0000000..90d5e86 --- /dev/null +++ b/tests/test_cadence_hotreload_loop.py @@ -0,0 +1,553 @@ +"""Integration tests for cadence hot-reload exercising the ACTUAL running loop. + +These tests run _run_adapter_loop and verify that cancel_event.set() properly +interrupts the sleeping loop. 
They are designed to: + +- FAIL on unfixed code (cancel_event.set() inside lock delays signal delivery) +- PASS on fixed code (signal delivered after lock release) + +Key difference from existing tests: these tests actually run the loop and +observe real poll timing, rather than testing AdapterState math in isolation. +""" + +import asyncio +import base64 +import os +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import AsyncIterator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio + +from central.config_models import AdapterConfig +from central.crypto import KEY_SIZE, clear_key_cache + + +# Test database DSN +TEST_DB_DSN = os.environ.get( + "CENTRAL_TEST_DB_DSN", + "postgresql://central_test:testpass@localhost/central_test", +) + + +@pytest.fixture(scope="session") +def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Create a master key file for the test session.""" + key = os.urandom(KEY_SIZE) + key_path = tmp_path_factory.mktemp("keys") / "master.key" + key_path.write_text(base64.b64encode(key).decode()) + return key_path + + +@pytest.fixture(autouse=True) +def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Configure master key path for all tests.""" + clear_key_cache() + monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN) + monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path)) + + +class MockConfigSource: + """Mock ConfigSource for testing Supervisor without DB.""" + + def __init__(self) -> None: + self._adapters: dict[str, AdapterConfig] = {} + + def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None: + if config is None: + if name: + self._adapters.pop(name, None) + else: + self._adapters[config.name] = config + + async def list_enabled_adapters(self) -> list[AdapterConfig]: + return [a for a in self._adapters.values() if a.enabled and not a.is_paused] + + async def 
get_adapter(self, name: str) -> AdapterConfig | None: + return self._adapters.get(name) + + async def watch_for_changes(self, callback) -> None: + return + + async def close(self) -> None: + pass + + +class FastMockNWSAdapter: + """Mock NWSAdapter that completes polls instantly and tracks timing.""" + + def __init__(self, *, config, cursor_db_path) -> None: + self.config = config + self.cadence_s = config.cadence_s + self.states = set(s.upper() for s in config.states) + self.poll_times: list[datetime] = [] + self._published_ids: set[str] = set() + + async def startup(self) -> None: + pass + + async def shutdown(self) -> None: + pass + + async def poll(self) -> AsyncIterator: + """Record poll time and yield nothing (no events).""" + self.poll_times.append(datetime.now(timezone.utc)) + return + yield # Make this an async generator + + def is_published(self, event_id: str) -> bool: + return event_id in self._published_ids + + def mark_published(self, event_id: str) -> None: + self._published_ids.add(event_id) + + def bump_last_seen(self, event_id: str) -> None: + pass + + def sweep_old_ids(self) -> int: + return 0 + + +@pytest.fixture +def mock_nats(): + """Mock NATS connection.""" + nc = MagicMock() + nc.publish = AsyncMock() + nc.drain = AsyncMock() + nc.close = AsyncMock() + js = MagicMock() + js.publish = AsyncMock() + nc.jetstream = MagicMock(return_value=js) + return nc + + +class TestCadenceHotReloadLoop: + """Tests that exercise the ACTUAL running loop with cancel_event signaling.""" + + @pytest.mark.asyncio + async def test_cadence_decrease_wakes_loop_immediately( + self, mock_nats, tmp_path: Path + ) -> None: + """Test 1: Cadence decrease (60->30) - THE BUG WE ARE FIXING. + + This test MUST FAIL on unfixed code where cancel_event.set() is + called inside the lock, causing delayed signal delivery. 
+ + - Start adapter with 60s cadence + - Let first poll complete + - Change cadence to 30s via _on_config_change + - Assert next poll fires at ~last_poll+30s, NOT last_poll+60s + + On unfixed code: loop sleeps full 60s, poll at T+60 + On fixed code: loop wakes immediately, recalculates, polls at T+30 + """ + from central.supervisor import Supervisor + + config_source = MockConfigSource() + initial_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(initial_config) + + supervisor = Supervisor( + config_source=config_source, + nats_url="nats://localhost:4222", + cloudevents_config=None, + ) + supervisor._nc = mock_nats + supervisor._js = mock_nats.jetstream() + + # Track the mock adapter instance + adapter_instance = None + + def capture_adapter(*, config, cursor_db_path): + nonlocal adapter_instance + adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) + return adapter_instance + + with patch("central.supervisor.NWSAdapter", capture_adapter): + # Start adapter - this creates the loop task + await supervisor._start_adapter(initial_config) + + state = supervisor._adapter_states.get("nws") + assert state is not None + assert state.task is not None + + # Wait for first poll to complete + await asyncio.sleep(0.1) + assert len(adapter_instance.poll_times) >= 1, "First poll should complete" + first_poll_time = adapter_instance.poll_times[-1] + + # Now the loop is sleeping for 60 seconds. Change cadence to 30s. 
+ new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=30, # Decreased from 60 + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + + # This should wake the loop via cancel_event + await supervisor._on_config_change("adapters", "nws") + + # Wait for second poll - should happen at first_poll + 30s + # Since we just changed cadence, if the fix works, the loop should + # wake up, recalculate, and either poll immediately (if 30s passed) + # or wait the remaining time. + # + # For this test, we wait up to 35s and verify the poll happens + # around the 30s mark, not the 60s mark. + start_wait = datetime.now(timezone.utc) + timeout = 35 # Should complete well before 60s + + while len(adapter_instance.poll_times) < 2: + await asyncio.sleep(0.5) + elapsed = (datetime.now(timezone.utc) - start_wait).total_seconds() + if elapsed > timeout: + break + + # Verify second poll happened + assert len(adapter_instance.poll_times) >= 2, ( + f"Second poll did not happen within {timeout}s. " + f"Bug: cancel_event.set() did not wake the sleeping loop. " + f"Poll times: {adapter_instance.poll_times}" + ) + + second_poll_time = adapter_instance.poll_times[1] + interval = (second_poll_time - first_poll_time).total_seconds() + + # The interval should be ~30s (new cadence), not 60s (old cadence) + # Allow some tolerance for test execution overhead + assert interval < 40, ( + f"Poll interval was {interval:.1f}s, expected ~30s. " + f"Bug: loop used old cadence instead of new cadence after reschedule." + ) + + # Cleanup + supervisor._shutdown_event.set() + state.cancel_event.set() + if state.task: + state.task.cancel() + try: + await state.task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_cadence_increase_extends_wait( + self, mock_nats, tmp_path: Path + ) -> None: + """Test 2: Cadence increase (10->20) extends wait correctly. 
+ + - Start adapter with 10s cadence + - Let first poll complete + - Immediately change cadence to 20s + - Assert next poll fires at ~last_poll+20s, not last_poll+10s + """ + from central.supervisor import Supervisor + + config_source = MockConfigSource() + initial_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=10, # Short for faster test + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(initial_config) + + supervisor = Supervisor( + config_source=config_source, + nats_url="nats://localhost:4222", + cloudevents_config=None, + ) + supervisor._nc = mock_nats + supervisor._js = mock_nats.jetstream() + + adapter_instance = None + + def capture_adapter(*, config, cursor_db_path): + nonlocal adapter_instance + adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) + return adapter_instance + + with patch("central.supervisor.NWSAdapter", capture_adapter): + await supervisor._start_adapter(initial_config) + state = supervisor._adapter_states.get("nws") + + # Wait for first poll + await asyncio.sleep(0.1) + assert len(adapter_instance.poll_times) >= 1 + first_poll_time = adapter_instance.poll_times[-1] + + # Change cadence to 20s (increase) + new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=20, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + await supervisor._on_config_change("adapters", "nws") + + # Wait 12 seconds - should NOT poll yet (20s cadence) + await asyncio.sleep(12) + + # Should still be at 1 poll + assert len(adapter_instance.poll_times) == 1, ( + f"Poll happened too early! Expected 1 poll, got {len(adapter_instance.poll_times)}. " + f"Cadence increase should extend wait time." 
+ ) + + # Wait remaining time plus buffer + await asyncio.sleep(10) + + # Now should have second poll + assert len(adapter_instance.poll_times) >= 2, ( + f"Second poll did not happen at 20s mark. " + f"Poll times: {adapter_instance.poll_times}" + ) + + second_poll_time = adapter_instance.poll_times[1] + interval = (second_poll_time - first_poll_time).total_seconds() + + # Should be ~20s + assert 18 < interval < 25, ( + f"Poll interval was {interval:.1f}s, expected ~20s." + ) + + # Cleanup + supervisor._shutdown_event.set() + state.cancel_event.set() + if state.task: + state.task.cancel() + try: + await state.task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_enable_disable_enable_gap_exceeds_cadence( + self, mock_nats, tmp_path: Path + ) -> None: + """Test 3: Enable->disable->enable with gap > cadence polls immediately. + + - Start adapter, complete one poll at T1 + - Disable adapter + - Wait > cadence_s + - Re-enable + - Assert poll fires immediately (gap exceeded cadence) + """ + from central.supervisor import Supervisor + + config_source = MockConfigSource() + config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=2, # Short cadence for faster test + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(config) + + supervisor = Supervisor( + config_source=config_source, + nats_url="nats://localhost:4222", + cloudevents_config=None, + ) + supervisor._nc = mock_nats + supervisor._js = mock_nats.jetstream() + + adapter_instances = [] + + def capture_adapter(*, config, cursor_db_path): + inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) + adapter_instances.append(inst) + return inst + + with patch("central.supervisor.NWSAdapter", capture_adapter): + await supervisor._start_adapter(config) + state = supervisor._adapter_states.get("nws") + + # Wait for first poll + await asyncio.sleep(0.1) + 
first_adapter = adapter_instances[0] + assert len(first_adapter.poll_times) >= 1 + + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=2, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") + + # Wait longer than cadence + await asyncio.sleep(3) + + # Re-enable + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=2, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + + reenable_time = datetime.now(timezone.utc) + await supervisor._on_config_change("adapters", "nws") + + # Wait a bit for immediate poll + await asyncio.sleep(0.5) + + new_state = supervisor._adapter_states.get("nws") + assert new_state is not None + + # Check that a poll happened quickly after re-enable + # The new adapter instance should have polled + if len(adapter_instances) > 1: + new_adapter = adapter_instances[-1] + assert len(new_adapter.poll_times) >= 1, ( + "Poll should happen immediately when gap > cadence" + ) + poll_delay = (new_adapter.poll_times[0] - reenable_time).total_seconds() + assert poll_delay < 1, ( + f"Poll took {poll_delay:.1f}s after re-enable, expected immediate" + ) + + # Cleanup + supervisor._shutdown_event.set() + if new_state.task: + new_state.cancel_event.set() + new_state.task.cancel() + try: + await new_state.task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_enable_disable_enable_gap_within_cadence( + self, mock_nats, tmp_path: Path + ) -> None: + """Test 4: Enable->disable->enable with gap < cadence waits. 
+ + - Start adapter with 10s cadence, complete poll at T1 + - Disable adapter + - Re-enable quickly (within cadence window) + - Assert next poll fires at T1 + cadence_s, NOT immediately + """ + from central.supervisor import Supervisor + + config_source = MockConfigSource() + config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=10, # 10 second cadence + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(config) + + supervisor = Supervisor( + config_source=config_source, + nats_url="nats://localhost:4222", + cloudevents_config=None, + ) + supervisor._nc = mock_nats + supervisor._js = mock_nats.jetstream() + + adapter_instances = [] + + def capture_adapter(*, config, cursor_db_path): + inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path) + adapter_instances.append(inst) + return inst + + with patch("central.supervisor.NWSAdapter", capture_adapter): + await supervisor._start_adapter(config) + state = supervisor._adapter_states.get("nws") + + # Wait for first poll + await asyncio.sleep(0.1) + first_adapter = adapter_instances[0] + assert len(first_adapter.poll_times) >= 1 + first_poll_time = first_adapter.poll_times[-1] + + # Disable adapter quickly + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=10, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") + + # Re-enable immediately (within cadence window) + await asyncio.sleep(0.5) + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=10, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", 
"nws") + + new_state = supervisor._adapter_states.get("nws") + assert new_state is not None + + # Wait 3 seconds - should NOT poll yet (still within 10s cadence) + await asyncio.sleep(3) + + # The new adapter instance should not have polled yet + if len(adapter_instances) > 1: + new_adapter = adapter_instances[-1] + assert len(new_adapter.poll_times) == 0, ( + f"Poll happened too early! Gap < cadence should wait. " + f"Polls: {new_adapter.poll_times}" + ) + + # Wait for remaining time (about 7 more seconds) + await asyncio.sleep(8) + + # Now should have polled + if len(adapter_instances) > 1: + new_adapter = adapter_instances[-1] + assert len(new_adapter.poll_times) >= 1, ( + "Poll should have happened by now (10s cadence elapsed)" + ) + + # Cleanup + supervisor._shutdown_event.set() + if new_state.task: + new_state.cancel_event.set() + new_state.task.cancel() + try: + await new_state.task + except asyncio.CancelledError: + pass