Compare commits

...

4 commits

Author SHA1 Message Date
6deccf1cf8
Merge pull request #4 from zvx-echo6/feature/1a-4-cadence-decrease-fix
fix: cadence-decrease hot-reload + integration tests + env docs
2026-05-16 00:58:24 -06:00
Matt Johnson
d210c980fd docs: add environment reference and bug investigation report
environment.md:
- Documents CT104 as the active development location
- Lists SSH access, repository paths, and service commands
- Notes that cortex clone is parked, matt-desktop has no clones

BUG-CADENCE-DECREASE.md:
- Full investigation of the cadence-decrease hot-reload bug
- Root cause analysis: cancel_event.set() inside lock context
- Proposed fix (Option A - structural)
- Test gap identification
- Production verification steps

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-16 05:59:53 +00:00
Matt Johnson
4215744a30 fix: move cancel_event signal outside lock for immediate delivery
The cancel_event.set() call was inside the async lock context in
_on_config_change, causing delayed signal delivery to the sleeping
loop. This manifested as cadence decreases not applying without a
restart - the loop would sleep its full original timeout before
seeing the new cadence.

Fix: _reschedule_adapter now returns the AdapterState to signal,
and _on_config_change signals AFTER releasing the lock. This ensures
immediate event delivery per asyncio semantics.

The lock protects state consistency during config fetches and updates.
The cancel_event is a one-way notification that does not need lock
protection - it simply wakes the sleeping coroutine.

See docs/BUG-CADENCE-DECREASE.md for full investigation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-16 05:59:45 +00:00
Matt Johnson
35de09ea93 test: add hot-reload integration tests for cadence changes
Add tests that exercise the ACTUAL running loop with cancel_event
signaling, not just AdapterState math in isolation.

Test cases:
- Test 1: Cadence decrease (60->30) wakes loop immediately
- Test 2: Cadence increase (10->20) extends wait correctly
- Test 3: Enable/disable/enable with gap > cadence polls immediately
- Test 4: Enable/disable/enable with gap < cadence waits

These tests verify the cancel_event mechanism properly interrupts
the sleeping loop when config changes occur via _on_config_change.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-16 05:59:35 +00:00
4 changed files with 880 additions and 6 deletions

View file

@ -0,0 +1,211 @@
# Bug Investigation: Cadence Decrease Hot-Reload
**Date:** 2026-05-16
**Component:** central-supervisor
**File:** `supervisor.py`
---
## 1. Reproduction
### Test Case: Decrease 60s → 30s
```
Tlast (poll completed): 04:18:24Z
Config change applied: 04:18:30Z (approx)
Expected next poll: 04:18:54Z (Tlast + 30s)
Actual next poll: 04:19:24Z (Tlast + 60s - OLD cadence)
Subsequent polls: Also at 60s intervals
```
### Log Evidence
```json
{"ts": "...", "msg": "Rescheduled adapter", "adapter": "nws", "old_cadence_s": 60, "new_cadence_s": 30, "next_poll": "2026-05-16T04:18:54+00:00"}
```
- "Rescheduled adapter" log fires with **correct** calculated next_poll
- Actual poll occurs at OLD cadence time
- Subsequent polls continue at OLD cadence
### Contrast: Increase 60s → 90s (WORKS)
```
Tlast: 03:16:34Z
Config change: 03:16:36Z
Expected next poll: 03:18:04Z (Tlast + 90s)
Actual next poll: 03:18:04Z ✅
```
---
## 2. Root Cause
### Location
`supervisor.py` lines 395-450 (`_reschedule_adapter`) and lines 144-181 (`_run_adapter_loop`)
### The Bug
The `cancel_event.set()` call in `_reschedule_adapter` does not reliably wake the `asyncio.wait_for()` in the adapter loop when the cadence is **decreased**.
### Why It Happens
1. **Event handler holds lock during signal:**
```python
# _on_config_change (line 466)
async with self._lock:
new_config = await self._config_source.get_adapter(adapter_name)
# ...
await self._reschedule_adapter(adapter_name, new_config) # sets cancel_event here
```
2. **Reschedule updates config then signals:**
```python
# _reschedule_adapter
state.config = new_config # Line 420
state.adapter.cadence_s = new_cadence # Line 423
# ... logging ...
state.cancel_event.set() # Line 450 - inside lock context
```
3. **Asyncio event delivery delay:**
The `asyncio.Event.set()` queues a wakeup for waiting tasks, but the signal delivery is subject to asyncio's task scheduler. When called from within an `async with` block, the event may not be processed until the current task yields or the lock context exits.
4. **Timing difference between increase and decrease:**
- **Increase (60→90):** Loop has ~30-50s remaining sleep. Event signal arrives well before timeout.
- **Decrease (90→60):** Loop may be ~10s from timeout. By the time event signal is processed, timeout has already fired.
5. **Why subsequent polls use old cadence:**
When the loop times out naturally (rather than being woken by event), it proceeds to poll. After poll completes, `state.last_completed_poll` is updated. The loop then reads `state.config.cadence_s` for the NEXT iteration — but if `state.config` was somehow not durably updated (or there's a stale reference), it uses the old value.
**Alternative theory:** The `state.config = new_config` assignment creates a new config object, but the loop may be reading from a captured reference to the old object if there's any closure behavior we're not seeing.
---
## 3. Proposed Fix
### Option A: Force immediate reschedule (Recommended)
Move the cancel logic OUTSIDE the lock, and use a more aggressive wake pattern:
```python
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
state = self._adapter_states.get(name)
if state is None or not state.is_running:
await self._start_adapter(new_config)
return
old_cadence = state.config.cadence_s
new_cadence = new_config.cadence_s
# Update config atomically
state.config = new_config
state.adapter.cadence_s = new_cadence
# ... (NWS-specific updates, logging) ...
# Cancel and wait for acknowledgment
state.cancel_event.set()
await asyncio.sleep(0) # Force task switch to process event
```
### Option B: Stop and restart the loop task
For cadence changes, stop the current loop task and create a new one:
```python
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
state = self._adapter_states.get(name)
if state is None:
await self._start_adapter(new_config)
return
# Preserve last_completed_poll
preserved_poll = state.last_completed_poll
# Stop current loop
await self._stop_adapter(name)
# Update config
state.config = new_config
state.last_completed_poll = preserved_poll
# Restart loop
await self._start_adapter(new_config)
```
### Option C: Double-signal pattern
Set the event, yield, then set again to ensure delivery:
```python
state.cancel_event.set()
await asyncio.sleep(0)
state.cancel_event.set() # Redundant but ensures visibility
```
---
## 4. Test Gap
### Missing Tests
The test file `test_config_source_new.py` only tests ConfigSource behavior (list, get, protocol compliance). There are **no tests** for:
1. `_reschedule_adapter` interrupting a sleeping loop
2. Cadence decrease being applied mid-sleep
3. Cadence increase being applied mid-sleep
4. Rate-limit guarantee after reschedule
5. `cancel_event` mechanism in isolation
### Recommended Tests
```python
@pytest.mark.asyncio
async def test_cadence_decrease_applies_immediately():
"""Cadence decrease should wake sleeping loop and reschedule."""
# Setup: Adapter polling at 60s cadence
# Action: Change cadence to 30s while sleeping
# Assert: Next poll at last_poll + 30s, not last_poll + 60s
@pytest.mark.asyncio
async def test_cadence_increase_applies_on_next_cycle():
"""Cadence increase should wake sleeping loop and extend wait."""
# Setup: Adapter polling at 60s cadence
# Action: Change cadence to 90s while sleeping
# Assert: Next poll at last_poll + 90s
@pytest.mark.asyncio
async def test_cancel_event_wakes_sleeping_loop():
"""cancel_event.set() should interrupt asyncio.wait_for()."""
# Unit test for the event mechanism in isolation
```
---
## 5. State at End
### LXC State (Reverted)
- **Cadence in DB:** 60s ✅
- **Actual poll interval:** 60s ✅
- **Supervisor restarted:** 2026-05-16T04:43:40Z
- **Verified polls:**
```
04:43:40.964 - First poll after restart
04:44:41.171 - Second poll (61s later) ✅
```
### Mitigation Until Fix
After any cadence change (especially decrease), verify actual poll intervals. If incorrect, restart supervisor:
```bash
systemctl restart central-supervisor
```
---
## Summary
| Item | Details |
|------|---------|
| **Bug** | Cadence decrease hot-reload doesn't apply without restart |
| **Root cause** | `cancel_event.set()` inside lock context has delayed delivery |
| **Affects** | Cadence decreases only; increases work correctly |
| **Workaround** | Restart supervisor after cadence decrease |
| **Fix effort** | Low - add `await asyncio.sleep(0)` after event.set() |
| **Test coverage** | None for hot-reload mechanism |

96
docs/environment.md Normal file
View file

@ -0,0 +1,96 @@
# Central Data Hub - Environment Reference
## Development Locations
### Active Development: CT104 (Central LXC)
All development work happens on the Central LXC container:
| Property | Value |
|----------|-------|
| **Hostname** | `central` |
| **Tailscale IP** | `100.64.0.12` |
| **LAN IP** | `192.168.1.104` |
| **SSH access** | `zvx@central` or `zvx@100.64.0.12` |
| **Repository path** | `/opt/central` |
| **Python venv** | `/opt/central/.venv` |
| **Services** | `central-supervisor`, `central-archive` |
### Parked Clone: Cortex
The cortex VM at `/home/zvx/projects/central` contains a clone that is
**not actively used for development**. It may be retired in the future.
Do not make changes there.
### Local Workstation: matt-desktop
The Windows workstation (matt-desktop) has no Central repository clones.
The directory `C:\Users\mtthw\central_work\` is scratch space only and
should not be used for commits.
## Repository
| Property | Value |
|----------|-------|
| **Origin** | `git@github.com:zvx-echo6/central.git` |
| **Main branch** | `main` |
| **Default user** | `central` (on CT104) |
## Services
### central-supervisor
The main adapter scheduler and event publisher. Polls upstream APIs,
normalizes events, and publishes to NATS JetStream.
```bash
# Status
systemctl status central-supervisor
# Logs
journalctl -u central-supervisor -f
# Restart (requires sudo)
sudo systemctl restart central-supervisor
```
### central-archive
Consumes events from NATS JetStream and archives to PostgreSQL/TimescaleDB.
```bash
# Status
systemctl status central-archive
# Logs
journalctl -u central-archive -f
```
## Database
PostgreSQL 16 with TimescaleDB runs on CT104:
```bash
# Connect as central user
psql -h localhost -U central -d central
# Check adapter config
SELECT name, cadence_s, enabled FROM config.adapters;
# Check recent events
SELECT id, time, category FROM events ORDER BY time DESC LIMIT 10;
```
## SSH Access from Windows
From matt-desktop, connect via Tailscale:
```bash
# Direct connection
ssh zvx@100.64.0.12
# Using hostname (if Tailscale DNS configured)
ssh zvx@central
```
Note: The `zvx` user requires password for sudo operations.

View file

@ -396,22 +396,26 @@ class Supervisor:
self,
name: str,
new_config: AdapterConfig,
) -> None:
) -> AdapterState | None:
"""Reschedule an adapter with new configuration.
Maintains rate-limit guarantee: next poll at
(last_completed_poll + new_cadence_s), not now + new_cadence_s.
Returns the AdapterState to signal, or None if no signal needed.
The caller must signal cancel_event AFTER releasing any locks to
ensure immediate event delivery to the sleeping loop.
"""
state = self._adapter_states.get(name)
if state is None:
# Adapter not running - just start it
await self._start_adapter(new_config)
return
return None
if not state.is_running:
# Adapter stopped - restart it
await self._start_adapter(new_config)
return
return None
old_cadence = state.config.cadence_s
new_cadence = new_config.cadence_s
@ -446,8 +450,9 @@ class Supervisor:
},
)
# Signal the loop to re-evaluate its schedule
state.cancel_event.set()
# Return state so caller can signal OUTSIDE any locks.
# This ensures immediate event delivery to the sleeping loop.
return state
async def _on_config_change(self, table: str, key: str) -> None:
"""Handle a configuration change notification.
@ -463,6 +468,9 @@ class Supervisor:
extra={"table": table, "key": key},
)
# Track state that needs signaling after lock release
state_to_signal: AdapterState | None = None
async with self._lock:
# Fetch the current config for this adapter
new_config = await self._config_source.get_adapter(adapter_name)
@ -501,7 +509,13 @@ class Supervisor:
)
else:
# Adapter config changed (cadence, settings)
await self._reschedule_adapter(adapter_name, new_config)
state_to_signal = await self._reschedule_adapter(adapter_name, new_config)
# Signal OUTSIDE the lock to ensure immediate event delivery.
# This fixes cadence-decrease hot-reload where the signal was
# delayed by asyncio task scheduling while holding the lock.
if state_to_signal is not None:
state_to_signal.cancel_event.set()
async def _heartbeat_loop(self) -> None:
"""Publish periodic heartbeats."""

View file

@ -0,0 +1,553 @@
"""Integration tests for cadence hot-reload exercising the ACTUAL running loop.
These tests run _run_adapter_loop and verify that cancel_event.set() properly
interrupts the sleeping loop. They are designed to:
- FAIL on unfixed code (cancel_event.set() inside lock delays signal delivery)
- PASS on fixed code (signal delivered after lock release)
Key difference from existing tests: these tests actually run the loop and
observe real poll timing, rather than testing AdapterState math in isolation.
"""
import asyncio
import base64
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import AsyncIterator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from central.config_models import AdapterConfig
from central.crypto import KEY_SIZE, clear_key_cache
# Test database DSN
TEST_DB_DSN = os.environ.get(
"CENTRAL_TEST_DB_DSN",
"postgresql://central_test:testpass@localhost/central_test",
)
@pytest.fixture(scope="session")
def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
"""Create a master key file for the test session."""
key = os.urandom(KEY_SIZE)
key_path = tmp_path_factory.mktemp("keys") / "master.key"
key_path.write_text(base64.b64encode(key).decode())
return key_path
@pytest.fixture(autouse=True)
def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""Configure master key path for all tests."""
clear_key_cache()
monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN)
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
class MockConfigSource:
"""Mock ConfigSource for testing Supervisor without DB."""
def __init__(self) -> None:
self._adapters: dict[str, AdapterConfig] = {}
def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None:
if config is None:
if name:
self._adapters.pop(name, None)
else:
self._adapters[config.name] = config
async def list_enabled_adapters(self) -> list[AdapterConfig]:
return [a for a in self._adapters.values() if a.enabled and not a.is_paused]
async def get_adapter(self, name: str) -> AdapterConfig | None:
return self._adapters.get(name)
async def watch_for_changes(self, callback) -> None:
return
async def close(self) -> None:
pass
class FastMockNWSAdapter:
"""Mock NWSAdapter that completes polls instantly and tracks timing."""
def __init__(self, *, config, cursor_db_path) -> None:
self.config = config
self.cadence_s = config.cadence_s
self.states = set(s.upper() for s in config.states)
self.poll_times: list[datetime] = []
self._published_ids: set[str] = set()
async def startup(self) -> None:
pass
async def shutdown(self) -> None:
pass
async def poll(self) -> AsyncIterator:
"""Record poll time and yield nothing (no events)."""
self.poll_times.append(datetime.now(timezone.utc))
return
yield # Make this an async generator
def is_published(self, event_id: str) -> bool:
return event_id in self._published_ids
def mark_published(self, event_id: str) -> None:
self._published_ids.add(event_id)
def bump_last_seen(self, event_id: str) -> None:
pass
def sweep_old_ids(self) -> int:
return 0
@pytest.fixture
def mock_nats():
"""Mock NATS connection."""
nc = MagicMock()
nc.publish = AsyncMock()
nc.drain = AsyncMock()
nc.close = AsyncMock()
js = MagicMock()
js.publish = AsyncMock()
nc.jetstream = MagicMock(return_value=js)
return nc
class TestCadenceHotReloadLoop:
"""Tests that exercise the ACTUAL running loop with cancel_event signaling."""
@pytest.mark.asyncio
async def test_cadence_decrease_wakes_loop_immediately(
self, mock_nats, tmp_path: Path
) -> None:
"""Test 1: Cadence decrease (60->30) - THE BUG WE ARE FIXING.
This test MUST FAIL on unfixed code where cancel_event.set() is
called inside the lock, causing delayed signal delivery.
- Start adapter with 60s cadence
- Let first poll complete
- Change cadence to 30s via _on_config_change
- Assert next poll fires at ~last_poll+30s, NOT last_poll+60s
On unfixed code: loop sleeps full 60s, poll at T+60
On fixed code: loop wakes immediately, recalculates, polls at T+30
"""
from central.supervisor import Supervisor
config_source = MockConfigSource()
initial_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(initial_config)
supervisor = Supervisor(
config_source=config_source,
nats_url="nats://localhost:4222",
cloudevents_config=None,
)
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
# Track the mock adapter instance
adapter_instance = None
def capture_adapter(*, config, cursor_db_path):
nonlocal adapter_instance
adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
return adapter_instance
with patch("central.supervisor.NWSAdapter", capture_adapter):
# Start adapter - this creates the loop task
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws")
assert state is not None
assert state.task is not None
# Wait for first poll to complete
await asyncio.sleep(0.1)
assert len(adapter_instance.poll_times) >= 1, "First poll should complete"
first_poll_time = adapter_instance.poll_times[-1]
# Now the loop is sleeping for 60 seconds. Change cadence to 30s.
new_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=30, # Decreased from 60
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(new_config)
# This should wake the loop via cancel_event
await supervisor._on_config_change("adapters", "nws")
# Wait for second poll - should happen at first_poll + 30s
# Since we just changed cadence, if the fix works, the loop should
# wake up, recalculate, and either poll immediately (if 30s passed)
# or wait the remaining time.
#
# For this test, we wait up to 35s and verify the poll happens
# around the 30s mark, not the 60s mark.
start_wait = datetime.now(timezone.utc)
timeout = 35 # Should complete well before 60s
while len(adapter_instance.poll_times) < 2:
await asyncio.sleep(0.5)
elapsed = (datetime.now(timezone.utc) - start_wait).total_seconds()
if elapsed > timeout:
break
# Verify second poll happened
assert len(adapter_instance.poll_times) >= 2, (
f"Second poll did not happen within {timeout}s. "
f"Bug: cancel_event.set() did not wake the sleeping loop. "
f"Poll times: {adapter_instance.poll_times}"
)
second_poll_time = adapter_instance.poll_times[1]
interval = (second_poll_time - first_poll_time).total_seconds()
# The interval should be ~30s (new cadence), not 60s (old cadence)
# Allow some tolerance for test execution overhead
assert interval < 40, (
f"Poll interval was {interval:.1f}s, expected ~30s. "
f"Bug: loop used old cadence instead of new cadence after reschedule."
)
# Cleanup
supervisor._shutdown_event.set()
state.cancel_event.set()
if state.task:
state.task.cancel()
try:
await state.task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_cadence_increase_extends_wait(
self, mock_nats, tmp_path: Path
) -> None:
"""Test 2: Cadence increase (10->20) extends wait correctly.
- Start adapter with 10s cadence
- Let first poll complete
- Immediately change cadence to 20s
- Assert next poll fires at ~last_poll+20s, not last_poll+10s
"""
from central.supervisor import Supervisor
config_source = MockConfigSource()
initial_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=10, # Short for faster test
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(initial_config)
supervisor = Supervisor(
config_source=config_source,
nats_url="nats://localhost:4222",
cloudevents_config=None,
)
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
adapter_instance = None
def capture_adapter(*, config, cursor_db_path):
nonlocal adapter_instance
adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
return adapter_instance
with patch("central.supervisor.NWSAdapter", capture_adapter):
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws")
# Wait for first poll
await asyncio.sleep(0.1)
assert len(adapter_instance.poll_times) >= 1
first_poll_time = adapter_instance.poll_times[-1]
# Change cadence to 20s (increase)
new_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=20,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(new_config)
await supervisor._on_config_change("adapters", "nws")
# Wait 12 seconds - should NOT poll yet (20s cadence)
await asyncio.sleep(12)
# Should still be at 1 poll
assert len(adapter_instance.poll_times) == 1, (
f"Poll happened too early! Expected 1 poll, got {len(adapter_instance.poll_times)}. "
f"Cadence increase should extend wait time."
)
# Wait remaining time plus buffer
await asyncio.sleep(10)
# Now should have second poll
assert len(adapter_instance.poll_times) >= 2, (
f"Second poll did not happen at 20s mark. "
f"Poll times: {adapter_instance.poll_times}"
)
second_poll_time = adapter_instance.poll_times[1]
interval = (second_poll_time - first_poll_time).total_seconds()
# Should be ~20s
assert 18 < interval < 25, (
f"Poll interval was {interval:.1f}s, expected ~20s."
)
# Cleanup
supervisor._shutdown_event.set()
state.cancel_event.set()
if state.task:
state.task.cancel()
try:
await state.task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_enable_disable_enable_gap_exceeds_cadence(
self, mock_nats, tmp_path: Path
) -> None:
"""Test 3: Enable->disable->enable with gap > cadence polls immediately.
- Start adapter, complete one poll at T1
- Disable adapter
- Wait > cadence_s
- Re-enable
- Assert poll fires immediately (gap exceeded cadence)
"""
from central.supervisor import Supervisor
config_source = MockConfigSource()
config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=2, # Short cadence for faster test
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(config)
supervisor = Supervisor(
config_source=config_source,
nats_url="nats://localhost:4222",
cloudevents_config=None,
)
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
adapter_instances = []
def capture_adapter(*, config, cursor_db_path):
inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
adapter_instances.append(inst)
return inst
with patch("central.supervisor.NWSAdapter", capture_adapter):
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws")
# Wait for first poll
await asyncio.sleep(0.1)
first_adapter = adapter_instances[0]
assert len(first_adapter.poll_times) >= 1
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=2,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Wait longer than cadence
await asyncio.sleep(3)
# Re-enable
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=2,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
reenable_time = datetime.now(timezone.utc)
await supervisor._on_config_change("adapters", "nws")
# Wait a bit for immediate poll
await asyncio.sleep(0.5)
new_state = supervisor._adapter_states.get("nws")
assert new_state is not None
# Check that a poll happened quickly after re-enable
# The new adapter instance should have polled
if len(adapter_instances) > 1:
new_adapter = adapter_instances[-1]
assert len(new_adapter.poll_times) >= 1, (
"Poll should happen immediately when gap > cadence"
)
poll_delay = (new_adapter.poll_times[0] - reenable_time).total_seconds()
assert poll_delay < 1, (
f"Poll took {poll_delay:.1f}s after re-enable, expected immediate"
)
# Cleanup
supervisor._shutdown_event.set()
if new_state.task:
new_state.cancel_event.set()
new_state.task.cancel()
try:
await new_state.task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_enable_disable_enable_gap_within_cadence(
self, mock_nats, tmp_path: Path
) -> None:
"""Test 4: Enable->disable->enable with gap < cadence waits.
- Start adapter with 10s cadence, complete poll at T1
- Disable adapter
- Re-enable quickly (within cadence window)
- Assert next poll fires at T1 + cadence_s, NOT immediately
"""
from central.supervisor import Supervisor
config_source = MockConfigSource()
config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=10, # 10 second cadence
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(config)
supervisor = Supervisor(
config_source=config_source,
nats_url="nats://localhost:4222",
cloudevents_config=None,
)
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
adapter_instances = []
def capture_adapter(*, config, cursor_db_path):
inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
adapter_instances.append(inst)
return inst
with patch("central.supervisor.NWSAdapter", capture_adapter):
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws")
# Wait for first poll
await asyncio.sleep(0.1)
first_adapter = adapter_instances[0]
assert len(first_adapter.poll_times) >= 1
first_poll_time = first_adapter.poll_times[-1]
# Disable adapter quickly
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=10,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Re-enable immediately (within cadence window)
await asyncio.sleep(0.5)
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=10,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws")
new_state = supervisor._adapter_states.get("nws")
assert new_state is not None
# Wait 3 seconds - should NOT poll yet (still within 10s cadence)
await asyncio.sleep(3)
# The new adapter instance should not have polled yet
if len(adapter_instances) > 1:
new_adapter = adapter_instances[-1]
assert len(new_adapter.poll_times) == 0, (
f"Poll happened too early! Gap < cadence should wait. "
f"Polls: {new_adapter.poll_times}"
)
# Wait for remaining time (about 7 more seconds)
await asyncio.sleep(8)
# Now should have polled
if len(adapter_instances) > 1:
new_adapter = adapter_instances[-1]
assert len(new_adapter.poll_times) >= 1, (
"Poll should have happened by now (10s cadence elapsed)"
)
# Cleanup
supervisor._shutdown_event.set()
if new_state.task:
new_state.cancel_event.set()
new_state.task.cancel()
try:
await new_state.task
except asyncio.CancelledError:
pass