mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-22 02:24:38 +02:00
Merge pull request #4 from zvx-echo6/feature/1a-4-cadence-decrease-fix
fix: cadence-decrease hot-reload + integration tests + env docs
commit
6deccf1cf8
4 changed files with 880 additions and 6 deletions
211
docs/BUG-CADENCE-DECREASE.md
Normal file
@@ -0,0 +1,211 @@
# Bug Investigation: Cadence Decrease Hot-Reload

**Date:** 2026-05-16
**Component:** central-supervisor
**File:** `supervisor.py`

---

## 1. Reproduction

### Test Case: Decrease 60s → 30s
```
Tlast (poll completed): 04:18:24Z
Config change applied: 04:18:30Z (approx)
Expected next poll: 04:18:54Z (Tlast + 30s)
Actual next poll: 04:19:24Z (Tlast + 60s - OLD cadence)
Subsequent polls: Also at 60s intervals
```

### Log Evidence
```json
{"ts": "...", "msg": "Rescheduled adapter", "adapter": "nws", "old_cadence_s": 60, "new_cadence_s": 30, "next_poll": "2026-05-16T04:18:54+00:00"}
```
- "Rescheduled adapter" log fires with **correct** calculated next_poll
- Actual poll occurs at OLD cadence time
- Subsequent polls continue at OLD cadence

### Contrast: Increase 60s → 90s (WORKS)
```
Tlast: 03:16:34Z
Config change: 03:16:36Z
Expected next poll: 03:18:04Z (Tlast + 90s)
Actual next poll: 03:18:04Z ✅
```
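
The expected times in both cases follow from the rate-limit rule stated in the `_reschedule_adapter` docstring (next poll at `last_completed_poll + new_cadence_s`, not `now + new_cadence_s`). A minimal sketch of that arithmetic is shown here; `expected_next_poll` and `utc` are hypothetical helpers, not functions in `supervisor.py`, and the "overdue polls run right away" clamp is an assumption.

```python
from datetime import datetime, timedelta, timezone


def expected_next_poll(last_completed: datetime, cadence_s: int, now: datetime) -> datetime:
    """Hypothetical helper: due time under the rate-limit rule."""
    due = last_completed + timedelta(seconds=cadence_s)
    # Assumption: a poll that is already overdue runs right away.
    return max(due, now)


def utc(hour: int, minute: int, second: int) -> datetime:
    """Build a UTC timestamp on the day of the investigation."""
    return datetime(2026, 5, 16, hour, minute, second, tzinfo=timezone.utc)


# Decrease 60s -> 30s: Tlast 04:18:24Z, change applied around 04:18:30Z
decrease = expected_next_poll(utc(4, 18, 24), 30, now=utc(4, 18, 30))
# Increase 60s -> 90s: Tlast 03:16:34Z, change applied at 03:16:36Z
increase = expected_next_poll(utc(3, 16, 34), 90, now=utc(3, 16, 36))

print(decrease.isoformat())  # 2026-05-16T04:18:54+00:00
print(increase.isoformat())  # 2026-05-16T03:18:04+00:00
```

The decrease result matches the `next_poll` value in the "Rescheduled adapter" log line, which suggests the calculation itself was correct and only the wake-up was missed.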

---

## 2. Root Cause

### Location
`supervisor.py` lines 395-450 (`_reschedule_adapter`) and lines 144-181 (`_run_adapter_loop`)

### The Bug
The `cancel_event.set()` call in `_reschedule_adapter` does not reliably wake the `asyncio.wait_for()` in the adapter loop when the cadence is **decreased**.

### Why It Happens

1. **Event handler holds lock during signal:**
   ```python
   # _on_config_change (line 466)
   async with self._lock:
       new_config = await self._config_source.get_adapter(adapter_name)
       # ...
       await self._reschedule_adapter(adapter_name, new_config)  # sets cancel_event here
   ```

2. **Reschedule updates config then signals:**
   ```python
   # _reschedule_adapter
   state.config = new_config  # Line 420
   state.adapter.cadence_s = new_cadence  # Line 423
   # ... logging ...
   state.cancel_event.set()  # Line 450 - inside lock context
   ```

3. **Asyncio event delivery delay:**
   `asyncio.Event.set()` marks waiting tasks as ready, but they do not run until the event loop regains control. When `set()` is called from within an `async with` block, the waiter may therefore not be processed until the current task yields or the lock context exits.

4. **Timing difference between increase and decrease:**
   - **Increase (60→90):** Loop has ~30-50s remaining sleep. Event signal arrives well before timeout.
   - **Decrease (90→60):** Loop may be ~10s from timeout. By the time event signal is processed, timeout has already fired.

5. **Why subsequent polls use old cadence:**
   When the loop times out naturally (rather than being woken by the event), it proceeds to poll. After the poll completes, `state.last_completed_poll` is updated. The loop then reads `state.config.cadence_s` for the NEXT iteration, but if `state.config` was somehow not durably updated (or there is a stale reference), it uses the old value.

**Alternative theory:** The `state.config = new_config` assignment creates a new config object, but the loop may be reading from a captured reference to the old object if there's any closure behavior we're not seeing.
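
To make the wake-up mechanism under discussion concrete, the sketch below isolates `asyncio.Event.set()` and `asyncio.wait_for()` from the supervisor. All names (`sleeper`, `demo`, `log`) are hypothetical, and the snippet does not reproduce the bug; it only illustrates that a waiter resumes once the task that called `set()` yields back to the event loop.

```python
import asyncio


async def sleeper(cancel_event: asyncio.Event, timeout_s: float, log: list[str]) -> None:
    """Hypothetical stand-in for the adapter loop's interruptible sleep."""
    try:
        await asyncio.wait_for(cancel_event.wait(), timeout=timeout_s)
        log.append("woken")
    except asyncio.TimeoutError:
        log.append("timeout")


async def demo() -> list[str]:
    log: list[str] = []
    cancel_event = asyncio.Event()
    lock = asyncio.Lock()

    task = asyncio.create_task(sleeper(cancel_event, 5.0, log))
    await asyncio.sleep(0)  # let the sleeper start waiting

    async with lock:
        cancel_event.set()
        # No await has happened since set(), so the sleeper has not run yet.
        log.append("set-called")

    await task  # yielding here is what lets the sleeper observe the event
    return log


result = asyncio.run(demo())
print(result)  # ['set-called', 'woken']
```

In this isolated form the waiter is woken long before its 5s timeout, so if the theory above holds, something specific to the supervisor's lock usage or loop structure would have to account for the missed wake-up.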

---

## 3. Proposed Fix

### Option A: Force immediate reschedule (Recommended)

Move the cancel logic OUTSIDE the lock, and use a more aggressive wake pattern:

```python
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
    state = self._adapter_states.get(name)
    if state is None or not state.is_running:
        await self._start_adapter(new_config)
        return

    old_cadence = state.config.cadence_s
    new_cadence = new_config.cadence_s

    # Update config atomically
    state.config = new_config
    state.adapter.cadence_s = new_cadence

    # ... (NWS-specific updates, logging) ...

    # Signal the loop, then yield once so the woken task can run
    state.cancel_event.set()
    await asyncio.sleep(0)  # Force task switch to process event
```

### Option B: Stop and restart the loop task

For cadence changes, stop the current loop task and create a new one:

```python
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
    state = self._adapter_states.get(name)
    if state is None:
        await self._start_adapter(new_config)
        return

    # Preserve last_completed_poll
    preserved_poll = state.last_completed_poll

    # Stop current loop
    await self._stop_adapter(name)

    # Update config
    state.config = new_config
    state.last_completed_poll = preserved_poll

    # Restart loop
    await self._start_adapter(new_config)
```

### Option C: Double-signal pattern

Set the event, yield, then set again to ensure delivery:

```python
state.cancel_event.set()
await asyncio.sleep(0)
state.cancel_event.set()  # Redundant but ensures visibility
```

---

## 4. Test Gap

### Missing Tests

The test file `test_config_source_new.py` only tests ConfigSource behavior (list, get, protocol compliance). There are **no tests** for:

1. `_reschedule_adapter` interrupting a sleeping loop
2. Cadence decrease being applied mid-sleep
3. Cadence increase being applied mid-sleep
4. Rate-limit guarantee after reschedule
5. `cancel_event` mechanism in isolation

### Recommended Tests

```python
@pytest.mark.asyncio
async def test_cadence_decrease_applies_immediately():
    """Cadence decrease should wake sleeping loop and reschedule."""
    # Setup: Adapter polling at 60s cadence
    # Action: Change cadence to 30s while sleeping
    # Assert: Next poll at last_poll + 30s, not last_poll + 60s

@pytest.mark.asyncio
async def test_cadence_increase_applies_on_next_cycle():
    """Cadence increase should wake sleeping loop and extend wait."""
    # Setup: Adapter polling at 60s cadence
    # Action: Change cadence to 90s while sleeping
    # Assert: Next poll at last_poll + 90s

@pytest.mark.asyncio
async def test_cancel_event_wakes_sleeping_loop():
    """cancel_event.set() should interrupt asyncio.wait_for()."""
    # Unit test for the event mechanism in isolation
```
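
As one possible shape for the cadence-decrease stub, the sketch below drives a miniature loop with sub-second cadences so the behavior can be checked without a 30s wait. `LoopState` and `mini_loop` are hypothetical and do not exist in the codebase; the sketch assumes the real loop recomputes its deadline from `last_completed_poll` every time `cancel_event` wakes it.

```python
import asyncio
import time


class LoopState:
    """Hypothetical stand-in for AdapterState."""

    def __init__(self, cadence_s: float) -> None:
        self.cadence_s = cadence_s
        self.cancel_event = asyncio.Event()
        self.last_completed_poll: float | None = None
        self.poll_times: list[float] = []


async def mini_loop(state: LoopState, stop: asyncio.Event) -> None:
    """Poll, then sleep until last_completed_poll + cadence_s or a wake-up."""
    while not stop.is_set():
        if state.last_completed_poll is not None:
            wait_s = state.last_completed_poll + state.cadence_s - time.monotonic()
            if wait_s > 0:
                state.cancel_event.clear()
                try:
                    await asyncio.wait_for(state.cancel_event.wait(), timeout=wait_s)
                    continue  # woken early: recompute with the current cadence
                except asyncio.TimeoutError:
                    pass  # deadline reached: fall through to poll
        state.last_completed_poll = time.monotonic()
        state.poll_times.append(state.last_completed_poll)


async def demo() -> float:
    state = LoopState(cadence_s=0.6)
    stop = asyncio.Event()
    task = asyncio.create_task(mini_loop(state, stop))

    await asyncio.sleep(0.05)      # first poll done, loop is sleeping
    state.cadence_s = 0.2          # decrease the cadence mid-sleep
    state.cancel_event.set()       # wake the loop so it recomputes

    for _ in range(200):           # wait up to ~2s for the second poll
        if len(state.poll_times) >= 2:
            break
        await asyncio.sleep(0.01)

    stop.set()
    state.cancel_event.set()       # release the loop so it can exit
    await task
    return state.poll_times[1] - state.poll_times[0]


interval = asyncio.run(demo())
print(f"{interval:.2f}s between polls")
```

A test built this way would assert that the interval is close to the new cadence (0.2s) rather than the old one (0.6s); the integration tests in this commit make the same kind of assertion against the real `_run_adapter_loop`.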

---

## 5. State at End

### LXC State (Reverted)
- **Cadence in DB:** 60s ✅
- **Actual poll interval:** 60s ✅
- **Supervisor restarted:** 2026-05-16T04:43:40Z
- **Verified polls:**
  ```
  04:43:40.964 - First poll after restart
  04:44:41.171 - Second poll (60.2s later) ✅
  ```

### Mitigation Until Fix
After any cadence change (especially a decrease), verify the actual poll intervals. If they are incorrect, restart the supervisor:
```bash
systemctl restart central-supervisor
```

---

## Summary

| Item | Details |
|------|---------|
| **Bug** | Cadence decrease hot-reload doesn't apply without restart |
| **Root cause** | `cancel_event.set()` inside lock context has delayed delivery |
| **Affects** | Cadence decreases only; increases work correctly |
| **Workaround** | Restart supervisor after cadence decrease |
| **Fix effort** | Low - add `await asyncio.sleep(0)` after event.set() |
| **Test coverage** | None for hot-reload mechanism |
96
docs/environment.md
Normal file
@@ -0,0 +1,96 @@
# Central Data Hub - Environment Reference

## Development Locations

### Active Development: CT104 (Central LXC)

All development work happens on the Central LXC container:

| Property | Value |
|----------|-------|
| **Hostname** | `central` |
| **Tailscale IP** | `100.64.0.12` |
| **LAN IP** | `192.168.1.104` |
| **SSH access** | `zvx@central` or `zvx@100.64.0.12` |
| **Repository path** | `/opt/central` |
| **Python venv** | `/opt/central/.venv` |
| **Services** | `central-supervisor`, `central-archive` |

### Parked Clone: Cortex

The cortex VM at `/home/zvx/projects/central` contains a clone that is
**not actively used for development**. It may be retired in the future.
Do not make changes there.

### Local Workstation: matt-desktop

The Windows workstation (matt-desktop) has no Central repository clones.
The directory `C:\Users\mtthw\central_work\` is scratch space only and
should not be used for commits.

## Repository

| Property | Value |
|----------|-------|
| **Origin** | `git@github.com:zvx-echo6/central.git` |
| **Main branch** | `main` |
| **Default user** | `central` (on CT104) |

## Services

### central-supervisor

The main adapter scheduler and event publisher. Polls upstream APIs,
normalizes events, and publishes to NATS JetStream.

```bash
# Status
systemctl status central-supervisor

# Logs
journalctl -u central-supervisor -f

# Restart (requires sudo)
sudo systemctl restart central-supervisor
```

### central-archive

Consumes events from NATS JetStream and archives to PostgreSQL/TimescaleDB.

```bash
# Status
systemctl status central-archive

# Logs
journalctl -u central-archive -f
```

## Database

PostgreSQL 16 with TimescaleDB runs on CT104:

```bash
# Connect as central user
psql -h localhost -U central -d central

# Check adapter config (SQL; run inside the psql session)
SELECT name, cadence_s, enabled FROM config.adapters;

# Check recent events (SQL; run inside the psql session)
SELECT id, time, category FROM events ORDER BY time DESC LIMIT 10;
```

## SSH Access from Windows

From matt-desktop, connect via Tailscale:

```bash
# Direct connection
ssh zvx@100.64.0.12

# Using hostname (if Tailscale DNS configured)
ssh zvx@central
```

Note: The `zvx` user requires a password for sudo operations.
@@ -396,22 +396,26 @@ class Supervisor:
         self,
         name: str,
         new_config: AdapterConfig,
-    ) -> None:
+    ) -> AdapterState | None:
         """Reschedule an adapter with new configuration.
 
         Maintains rate-limit guarantee: next poll at
         (last_completed_poll + new_cadence_s), not now + new_cadence_s.
+
+        Returns the AdapterState to signal, or None if no signal needed.
+        The caller must signal cancel_event AFTER releasing any locks to
+        ensure immediate event delivery to the sleeping loop.
         """
         state = self._adapter_states.get(name)
         if state is None:
             # Adapter not running - just start it
             await self._start_adapter(new_config)
-            return
+            return None
 
         if not state.is_running:
             # Adapter stopped - restart it
             await self._start_adapter(new_config)
-            return
+            return None
 
         old_cadence = state.config.cadence_s
         new_cadence = new_config.cadence_s
@@ -446,8 +450,9 @@ class Supervisor:
             },
         )
 
-        # Signal the loop to re-evaluate its schedule
-        state.cancel_event.set()
+        # Return state so caller can signal OUTSIDE any locks.
+        # This ensures immediate event delivery to the sleeping loop.
+        return state
 
     async def _on_config_change(self, table: str, key: str) -> None:
         """Handle a configuration change notification.
@@ -463,6 +468,9 @@ class Supervisor:
             extra={"table": table, "key": key},
         )
 
+        # Track state that needs signaling after lock release
+        state_to_signal: AdapterState | None = None
+
         async with self._lock:
             # Fetch the current config for this adapter
             new_config = await self._config_source.get_adapter(adapter_name)
@@ -501,7 +509,13 @@ class Supervisor:
                 )
             else:
                 # Adapter config changed (cadence, settings)
-                await self._reschedule_adapter(adapter_name, new_config)
+                state_to_signal = await self._reschedule_adapter(adapter_name, new_config)
 
+        # Signal OUTSIDE the lock to ensure immediate event delivery.
+        # This fixes cadence-decrease hot-reload where the signal was
+        # delayed by asyncio task scheduling while holding the lock.
+        if state_to_signal is not None:
+            state_to_signal.cancel_event.set()
+
     async def _heartbeat_loop(self) -> None:
         """Publish periodic heartbeats."""
553
tests/test_cadence_hotreload_loop.py
Normal file
@@ -0,0 +1,553 @@
"""Integration tests for cadence hot-reload exercising the ACTUAL running loop.

These tests run _run_adapter_loop and verify that cancel_event.set() properly
interrupts the sleeping loop. They are designed to:

- FAIL on unfixed code (cancel_event.set() inside lock delays signal delivery)
- PASS on fixed code (signal delivered after lock release)

Key difference from existing tests: these tests actually run the loop and
observe real poll timing, rather than testing AdapterState math in isolation.
"""

import asyncio
import base64
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import AsyncIterator
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio

from central.config_models import AdapterConfig
from central.crypto import KEY_SIZE, clear_key_cache


# Test database DSN
TEST_DB_DSN = os.environ.get(
    "CENTRAL_TEST_DB_DSN",
    "postgresql://central_test:testpass@localhost/central_test",
)


@pytest.fixture(scope="session")
def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Create a master key file for the test session."""
    key = os.urandom(KEY_SIZE)
    key_path = tmp_path_factory.mktemp("keys") / "master.key"
    key_path.write_text(base64.b64encode(key).decode())
    return key_path


@pytest.fixture(autouse=True)
def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Configure master key path for all tests."""
    clear_key_cache()
    monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN)
    monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))


class MockConfigSource:
    """Mock ConfigSource for testing Supervisor without DB."""

    def __init__(self) -> None:
        self._adapters: dict[str, AdapterConfig] = {}

    def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None:
        if config is None:
            if name:
                self._adapters.pop(name, None)
        else:
            self._adapters[config.name] = config

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        return [a for a in self._adapters.values() if a.enabled and not a.is_paused]

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        return self._adapters.get(name)

    async def watch_for_changes(self, callback) -> None:
        return

    async def close(self) -> None:
        pass


class FastMockNWSAdapter:
    """Mock NWSAdapter that completes polls instantly and tracks timing."""

    def __init__(self, *, config, cursor_db_path) -> None:
        self.config = config
        self.cadence_s = config.cadence_s
        self.states = set(s.upper() for s in config.states)
        self.poll_times: list[datetime] = []
        self._published_ids: set[str] = set()

    async def startup(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def poll(self) -> AsyncIterator:
        """Record poll time and yield nothing (no events)."""
        self.poll_times.append(datetime.now(timezone.utc))
        return
        yield  # Make this an async generator

    def is_published(self, event_id: str) -> bool:
        return event_id in self._published_ids

    def mark_published(self, event_id: str) -> None:
        self._published_ids.add(event_id)

    def bump_last_seen(self, event_id: str) -> None:
        pass

    def sweep_old_ids(self) -> int:
        return 0


@pytest.fixture
def mock_nats():
    """Mock NATS connection."""
    nc = MagicMock()
    nc.publish = AsyncMock()
    nc.drain = AsyncMock()
    nc.close = AsyncMock()
    js = MagicMock()
    js.publish = AsyncMock()
    nc.jetstream = MagicMock(return_value=js)
    return nc


class TestCadenceHotReloadLoop:
    """Tests that exercise the ACTUAL running loop with cancel_event signaling."""

    @pytest.mark.asyncio
    async def test_cadence_decrease_wakes_loop_immediately(
        self, mock_nats, tmp_path: Path
    ) -> None:
        """Test 1: Cadence decrease (60->30) - THE BUG WE ARE FIXING.

        This test MUST FAIL on unfixed code where cancel_event.set() is
        called inside the lock, causing delayed signal delivery.

        - Start adapter with 60s cadence
        - Let first poll complete
        - Change cadence to 30s via _on_config_change
        - Assert next poll fires at ~last_poll+30s, NOT last_poll+60s

        On unfixed code: loop sleeps full 60s, poll at T+60
        On fixed code: loop wakes immediately, recalculates, polls at T+30
        """
        from central.supervisor import Supervisor

        config_source = MockConfigSource()
        initial_config = AdapterConfig(
            name="nws",
            enabled=True,
            cadence_s=60,
            settings={"states": ["ID"], "contact_email": "test@test.com"},
            paused_at=None,
            updated_at=datetime.now(timezone.utc),
        )
        config_source.set_adapter(initial_config)

        supervisor = Supervisor(
            config_source=config_source,
            nats_url="nats://localhost:4222",
            cloudevents_config=None,
        )
        supervisor._nc = mock_nats
        supervisor._js = mock_nats.jetstream()

        # Track the mock adapter instance
        adapter_instance = None

        def capture_adapter(*, config, cursor_db_path):
            nonlocal adapter_instance
            adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
            return adapter_instance

        with patch("central.supervisor.NWSAdapter", capture_adapter):
            # Start adapter - this creates the loop task
            await supervisor._start_adapter(initial_config)

            state = supervisor._adapter_states.get("nws")
            assert state is not None
            assert state.task is not None

            # Wait for first poll to complete
            await asyncio.sleep(0.1)
            assert len(adapter_instance.poll_times) >= 1, "First poll should complete"
            first_poll_time = adapter_instance.poll_times[-1]

            # Now the loop is sleeping for 60 seconds. Change cadence to 30s.
            new_config = AdapterConfig(
                name="nws",
                enabled=True,
                cadence_s=30,  # Decreased from 60
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(new_config)

            # This should wake the loop via cancel_event
            await supervisor._on_config_change("adapters", "nws")

            # Wait for second poll - should happen at first_poll + 30s
            # Since we just changed cadence, if the fix works, the loop should
            # wake up, recalculate, and either poll immediately (if 30s passed)
            # or wait the remaining time.
            #
            # For this test, we wait up to 35s and verify the poll happens
            # around the 30s mark, not the 60s mark.
            start_wait = datetime.now(timezone.utc)
            timeout = 35  # Should complete well before 60s

            while len(adapter_instance.poll_times) < 2:
                await asyncio.sleep(0.5)
                elapsed = (datetime.now(timezone.utc) - start_wait).total_seconds()
                if elapsed > timeout:
                    break

            # Verify second poll happened
            assert len(adapter_instance.poll_times) >= 2, (
                f"Second poll did not happen within {timeout}s. "
                f"Bug: cancel_event.set() did not wake the sleeping loop. "
                f"Poll times: {adapter_instance.poll_times}"
            )

            second_poll_time = adapter_instance.poll_times[1]
            interval = (second_poll_time - first_poll_time).total_seconds()

            # The interval should be ~30s (new cadence), not 60s (old cadence)
            # Allow some tolerance for test execution overhead
            assert interval < 40, (
                f"Poll interval was {interval:.1f}s, expected ~30s. "
                f"Bug: loop used old cadence instead of new cadence after reschedule."
            )

            # Cleanup
            supervisor._shutdown_event.set()
            state.cancel_event.set()
            if state.task:
                state.task.cancel()
                try:
                    await state.task
                except asyncio.CancelledError:
                    pass

    @pytest.mark.asyncio
    async def test_cadence_increase_extends_wait(
        self, mock_nats, tmp_path: Path
    ) -> None:
        """Test 2: Cadence increase (10->20) extends wait correctly.

        - Start adapter with 10s cadence
        - Let first poll complete
        - Immediately change cadence to 20s
        - Assert next poll fires at ~last_poll+20s, not last_poll+10s
        """
        from central.supervisor import Supervisor

        config_source = MockConfigSource()
        initial_config = AdapterConfig(
            name="nws",
            enabled=True,
            cadence_s=10,  # Short for faster test
            settings={"states": ["ID"], "contact_email": "test@test.com"},
            paused_at=None,
            updated_at=datetime.now(timezone.utc),
        )
        config_source.set_adapter(initial_config)

        supervisor = Supervisor(
            config_source=config_source,
            nats_url="nats://localhost:4222",
            cloudevents_config=None,
        )
        supervisor._nc = mock_nats
        supervisor._js = mock_nats.jetstream()

        adapter_instance = None

        def capture_adapter(*, config, cursor_db_path):
            nonlocal adapter_instance
            adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
            return adapter_instance

        with patch("central.supervisor.NWSAdapter", capture_adapter):
            await supervisor._start_adapter(initial_config)
            state = supervisor._adapter_states.get("nws")

            # Wait for first poll
            await asyncio.sleep(0.1)
            assert len(adapter_instance.poll_times) >= 1
            first_poll_time = adapter_instance.poll_times[-1]

            # Change cadence to 20s (increase)
            new_config = AdapterConfig(
                name="nws",
                enabled=True,
                cadence_s=20,
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(new_config)
            await supervisor._on_config_change("adapters", "nws")

            # Wait 12 seconds - should NOT poll yet (20s cadence)
            await asyncio.sleep(12)

            # Should still be at 1 poll
            assert len(adapter_instance.poll_times) == 1, (
                f"Poll happened too early! Expected 1 poll, got {len(adapter_instance.poll_times)}. "
                f"Cadence increase should extend wait time."
            )

            # Wait remaining time plus buffer
            await asyncio.sleep(10)

            # Now should have second poll
            assert len(adapter_instance.poll_times) >= 2, (
                f"Second poll did not happen at 20s mark. "
                f"Poll times: {adapter_instance.poll_times}"
            )

            second_poll_time = adapter_instance.poll_times[1]
            interval = (second_poll_time - first_poll_time).total_seconds()

            # Should be ~20s
            assert 18 < interval < 25, (
                f"Poll interval was {interval:.1f}s, expected ~20s."
            )

            # Cleanup
            supervisor._shutdown_event.set()
            state.cancel_event.set()
            if state.task:
                state.task.cancel()
                try:
                    await state.task
                except asyncio.CancelledError:
                    pass

    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_exceeds_cadence(
        self, mock_nats, tmp_path: Path
    ) -> None:
        """Test 3: Enable->disable->enable with gap > cadence polls immediately.

        - Start adapter, complete one poll at T1
        - Disable adapter
        - Wait > cadence_s
        - Re-enable
        - Assert poll fires immediately (gap exceeded cadence)
        """
        from central.supervisor import Supervisor

        config_source = MockConfigSource()
        config = AdapterConfig(
            name="nws",
            enabled=True,
            cadence_s=2,  # Short cadence for faster test
            settings={"states": ["ID"], "contact_email": "test@test.com"},
            paused_at=None,
            updated_at=datetime.now(timezone.utc),
        )
        config_source.set_adapter(config)

        supervisor = Supervisor(
            config_source=config_source,
            nats_url="nats://localhost:4222",
            cloudevents_config=None,
        )
        supervisor._nc = mock_nats
        supervisor._js = mock_nats.jetstream()

        adapter_instances = []

        def capture_adapter(*, config, cursor_db_path):
            inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
            adapter_instances.append(inst)
            return inst

        with patch("central.supervisor.NWSAdapter", capture_adapter):
            await supervisor._start_adapter(config)
            state = supervisor._adapter_states.get("nws")

            # Wait for first poll
            await asyncio.sleep(0.1)
            first_adapter = adapter_instances[0]
            assert len(first_adapter.poll_times) >= 1

            # Disable adapter
            disabled_config = AdapterConfig(
                name="nws",
                enabled=False,
                cadence_s=2,
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(disabled_config)
            await supervisor._on_config_change("adapters", "nws")

            # Wait longer than cadence
            await asyncio.sleep(3)

            # Re-enable
            reenabled_config = AdapterConfig(
                name="nws",
                enabled=True,
                cadence_s=2,
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(reenabled_config)

            reenable_time = datetime.now(timezone.utc)
            await supervisor._on_config_change("adapters", "nws")

            # Wait a bit for immediate poll
            await asyncio.sleep(0.5)

            new_state = supervisor._adapter_states.get("nws")
            assert new_state is not None

            # Check that a poll happened quickly after re-enable
            # The new adapter instance should have polled
            if len(adapter_instances) > 1:
                new_adapter = adapter_instances[-1]
                assert len(new_adapter.poll_times) >= 1, (
                    "Poll should happen immediately when gap > cadence"
                )
                poll_delay = (new_adapter.poll_times[0] - reenable_time).total_seconds()
                assert poll_delay < 1, (
                    f"Poll took {poll_delay:.1f}s after re-enable, expected immediate"
                )

            # Cleanup
            supervisor._shutdown_event.set()
            if new_state.task:
                new_state.cancel_event.set()
                new_state.task.cancel()
                try:
                    await new_state.task
                except asyncio.CancelledError:
                    pass
|
||||||
|
|
||||||
|
    @pytest.mark.asyncio
    async def test_enable_disable_enable_gap_within_cadence(
        self, mock_nats, tmp_path: Path
    ) -> None:
        """Test 4: Enable->disable->enable with gap < cadence waits.

        - Start adapter with 10s cadence, complete poll at T1
        - Disable adapter
        - Re-enable quickly (within cadence window)
        - Assert next poll fires at T1 + cadence_s, NOT immediately
        """
        from central.supervisor import Supervisor

        config_source = MockConfigSource()
        config = AdapterConfig(
            name="nws",
            enabled=True,
            cadence_s=10,  # 10 second cadence
            settings={"states": ["ID"], "contact_email": "test@test.com"},
            paused_at=None,
            updated_at=datetime.now(timezone.utc),
        )
        config_source.set_adapter(config)

        supervisor = Supervisor(
            config_source=config_source,
            nats_url="nats://localhost:4222",
            cloudevents_config=None,
        )
        supervisor._nc = mock_nats
        supervisor._js = mock_nats.jetstream()

        adapter_instances = []

        def capture_adapter(*, config, cursor_db_path):
            inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
            adapter_instances.append(inst)
            return inst

        with patch("central.supervisor.NWSAdapter", capture_adapter):
            await supervisor._start_adapter(config)
            state = supervisor._adapter_states.get("nws")

            # Wait for first poll
            await asyncio.sleep(0.1)
            first_adapter = adapter_instances[0]
            assert len(first_adapter.poll_times) >= 1
            first_poll_time = first_adapter.poll_times[-1]

            # Disable adapter quickly
            disabled_config = AdapterConfig(
                name="nws",
                enabled=False,
                cadence_s=10,
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(disabled_config)
            await supervisor._on_config_change("adapters", "nws")

            # Re-enable immediately (within cadence window)
            await asyncio.sleep(0.5)
            reenabled_config = AdapterConfig(
                name="nws",
                enabled=True,
                cadence_s=10,
                settings={"states": ["ID"], "contact_email": "test@test.com"},
                paused_at=None,
                updated_at=datetime.now(timezone.utc),
            )
            config_source.set_adapter(reenabled_config)
            await supervisor._on_config_change("adapters", "nws")

            new_state = supervisor._adapter_states.get("nws")
            assert new_state is not None

            # Wait 3 seconds - should NOT poll yet (still within 10s cadence)
            await asyncio.sleep(3)

            # The new adapter instance should not have polled yet
            if len(adapter_instances) > 1:
                new_adapter = adapter_instances[-1]
                assert len(new_adapter.poll_times) == 0, (
                    "Poll happened too early! Gap < cadence should wait. "
                    f"Polls: {new_adapter.poll_times}"
                )

            # Wait for remaining time (about 7 more seconds)
            await asyncio.sleep(8)

            # Now should have polled
            if len(adapter_instances) > 1:
                new_adapter = adapter_instances[-1]
                assert len(new_adapter.poll_times) >= 1, (
                    "Poll should have happened by now (10s cadence elapsed)"
                )

            # Cleanup
            supervisor._shutdown_event.set()
            if new_state.task:
                new_state.cancel_event.set()
                new_state.task.cancel()
                try:
                    await new_state.task
                except asyncio.CancelledError:
                    pass
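
The two enable/disable/enable tests above pin down one timing rule: after a restart, the next poll is due at the last completed poll plus `cadence_s`, and it fires immediately if that moment has already passed. The rule can be sketched as a pure function. This is only an illustration under assumptions: `seconds_until_next_poll` is a hypothetical helper name, not a function taken from `supervisor.py`, and the supervisor may compute the delay differently.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def seconds_until_next_poll(
    last_poll: Optional[datetime], now: datetime, cadence_s: float
) -> float:
    """Return how long to wait before the next poll (hypothetical helper).

    - No previous poll: poll immediately.
    - Gap since last poll >= cadence: poll immediately.
    - Otherwise: wait out the remainder of the cadence window.
    """
    if last_poll is None:
        return 0.0
    elapsed = (now - last_poll).total_seconds()
    return max(0.0, cadence_s - elapsed)


# Illustrative timestamps only.
t1 = datetime(2026, 5, 16, 4, 18, 24, tzinfo=timezone.utc)

# Gap (3s) exceeds a 2s cadence: the poll is due right away.
immediate = seconds_until_next_poll(t1, t1 + timedelta(seconds=3), 2)

# Gap (0.5s) is inside a 10s cadence: wait for the remainder.
remaining = seconds_until_next_poll(t1, t1 + timedelta(seconds=0.5), 10)
```

Expressing the rule as a function of `last_poll`, `now`, and `cadence_s` would let it be unit-tested without real sleeps, which the integration tests above rely on.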