mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Compare commits
4 commits
c4a65a2ad7
...
6deccf1cf8
| Author | SHA1 | Date | |
|---|---|---|---|
|
6deccf1cf8 |
|||
|
|
d210c980fd | ||
|
|
4215744a30 | ||
|
|
35de09ea93 |
4 changed files with 880 additions and 6 deletions
211
docs/BUG-CADENCE-DECREASE.md
Normal file
211
docs/BUG-CADENCE-DECREASE.md
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
# Bug Investigation: Cadence Decrease Hot-Reload
|
||||
|
||||
**Date:** 2026-05-16
|
||||
**Component:** central-supervisor
|
||||
**File:** `supervisor.py`
|
||||
|
||||
---
|
||||
|
||||
## 1. Reproduction
|
||||
|
||||
### Test Case: Decrease 60s → 30s
|
||||
```
|
||||
Tlast (poll completed): 04:18:24Z
|
||||
Config change applied: 04:18:30Z (approx)
|
||||
Expected next poll: 04:18:54Z (Tlast + 30s)
|
||||
Actual next poll: 04:19:24Z (Tlast + 60s - OLD cadence)
|
||||
Subsequent polls: Also at 60s intervals
|
||||
```
|
||||
|
||||
### Log Evidence
|
||||
```json
|
||||
{"ts": "...", "msg": "Rescheduled adapter", "adapter": "nws", "old_cadence_s": 60, "new_cadence_s": 30, "next_poll": "2026-05-16T04:18:54+00:00"}
|
||||
```
|
||||
- "Rescheduled adapter" log fires with **correct** calculated next_poll
|
||||
- Actual poll occurs at OLD cadence time
|
||||
- Subsequent polls continue at OLD cadence
|
||||
|
||||
### Contrast: Increase 60s → 90s (WORKS)
|
||||
```
|
||||
Tlast: 03:16:34Z
|
||||
Config change: 03:16:36Z
|
||||
Expected next poll: 03:18:04Z (Tlast + 90s)
|
||||
Actual next poll: 03:18:04Z ✅
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. Root Cause
|
||||
|
||||
### Location
|
||||
`supervisor.py` lines 395-450 (`_reschedule_adapter`) and lines 144-181 (`_run_adapter_loop`)
|
||||
|
||||
### The Bug
|
||||
The `cancel_event.set()` call in `_reschedule_adapter` does not reliably wake the `asyncio.wait_for()` in the adapter loop when the cadence is **decreased**.
|
||||
|
||||
### Why It Happens
|
||||
|
||||
1. **Event handler holds lock during signal:**
|
||||
```python
|
||||
# _on_config_change (line 466)
|
||||
async with self._lock:
|
||||
new_config = await self._config_source.get_adapter(adapter_name)
|
||||
# ...
|
||||
await self._reschedule_adapter(adapter_name, new_config) # sets cancel_event here
|
||||
```
|
||||
|
||||
2. **Reschedule updates config then signals:**
|
||||
```python
|
||||
# _reschedule_adapter
|
||||
state.config = new_config # Line 420
|
||||
state.adapter.cadence_s = new_cadence # Line 423
|
||||
# ... logging ...
|
||||
state.cancel_event.set() # Line 450 - inside lock context
|
||||
```
|
||||
|
||||
3. **Asyncio event delivery delay:**
|
||||
The `asyncio.Event.set()` queues a wakeup for waiting tasks, but the signal delivery is subject to asyncio's task scheduler. When called from within an `async with` block, the event may not be processed until the current task yields or the lock context exits.
|
||||
|
||||
4. **Timing difference between increase and decrease:**
|
||||
- **Increase (60→90):** Loop has ~30-50s remaining sleep. Event signal arrives well before timeout.
|
||||
- **Decrease (90→60):** Loop may be ~10s from timeout. By the time event signal is processed, timeout has already fired.
|
||||
|
||||
5. **Why subsequent polls use old cadence:**
|
||||
When the loop times out naturally (rather than being woken by event), it proceeds to poll. After poll completes, `state.last_completed_poll` is updated. The loop then reads `state.config.cadence_s` for the NEXT iteration — but if `state.config` was somehow not durably updated (or there's a stale reference), it uses the old value.
|
||||
|
||||
**Alternative theory:** The `state.config = new_config` assignment creates a new config object, but the loop may be reading from a captured reference to the old object if there's any closure behavior we're not seeing.
|
||||
|
||||
---
|
||||
|
||||
## 3. Proposed Fix
|
||||
|
||||
### Option A: Force immediate reschedule (Recommended)
|
||||
|
||||
Move the cancel logic OUTSIDE the lock, and use a more aggressive wake pattern:
|
||||
|
||||
```python
|
||||
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
|
||||
state = self._adapter_states.get(name)
|
||||
if state is None or not state.is_running:
|
||||
await self._start_adapter(new_config)
|
||||
return
|
||||
|
||||
old_cadence = state.config.cadence_s
|
||||
new_cadence = new_config.cadence_s
|
||||
|
||||
# Update config atomically
|
||||
state.config = new_config
|
||||
state.adapter.cadence_s = new_cadence
|
||||
|
||||
# ... (NWS-specific updates, logging) ...
|
||||
|
||||
# Cancel and wait for acknowledgment
|
||||
state.cancel_event.set()
|
||||
await asyncio.sleep(0) # Force task switch to process event
|
||||
```
|
||||
|
||||
### Option B: Stop and restart the loop task
|
||||
|
||||
For cadence changes, stop the current loop task and create a new one:
|
||||
|
||||
```python
|
||||
async def _reschedule_adapter(self, name: str, new_config: AdapterConfig) -> None:
|
||||
state = self._adapter_states.get(name)
|
||||
if state is None:
|
||||
await self._start_adapter(new_config)
|
||||
return
|
||||
|
||||
# Preserve last_completed_poll
|
||||
preserved_poll = state.last_completed_poll
|
||||
|
||||
# Stop current loop
|
||||
await self._stop_adapter(name)
|
||||
|
||||
# Update config
|
||||
state.config = new_config
|
||||
state.last_completed_poll = preserved_poll
|
||||
|
||||
# Restart loop
|
||||
await self._start_adapter(new_config)
|
||||
```
|
||||
|
||||
### Option C: Double-signal pattern
|
||||
|
||||
Set the event, yield, then set again to ensure delivery:
|
||||
|
||||
```python
|
||||
state.cancel_event.set()
|
||||
await asyncio.sleep(0)
|
||||
state.cancel_event.set() # Redundant but ensures visibility
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. Test Gap
|
||||
|
||||
### Missing Tests
|
||||
|
||||
The test file `test_config_source_new.py` only tests ConfigSource behavior (list, get, protocol compliance). There are **no tests** for:
|
||||
|
||||
1. `_reschedule_adapter` interrupting a sleeping loop
|
||||
2. Cadence decrease being applied mid-sleep
|
||||
3. Cadence increase being applied mid-sleep
|
||||
4. Rate-limit guarantee after reschedule
|
||||
5. `cancel_event` mechanism in isolation
|
||||
|
||||
### Recommended Tests
|
||||
|
||||
```python
|
||||
@pytest.mark.asyncio
|
||||
async def test_cadence_decrease_applies_immediately():
|
||||
"""Cadence decrease should wake sleeping loop and reschedule."""
|
||||
# Setup: Adapter polling at 60s cadence
|
||||
# Action: Change cadence to 30s while sleeping
|
||||
# Assert: Next poll at last_poll + 30s, not last_poll + 60s
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cadence_increase_applies_on_next_cycle():
|
||||
"""Cadence increase should wake sleeping loop and extend wait."""
|
||||
# Setup: Adapter polling at 60s cadence
|
||||
# Action: Change cadence to 90s while sleeping
|
||||
# Assert: Next poll at last_poll + 90s
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_event_wakes_sleeping_loop():
|
||||
"""cancel_event.set() should interrupt asyncio.wait_for()."""
|
||||
# Unit test for the event mechanism in isolation
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. State at End
|
||||
|
||||
### LXC State (Reverted)
|
||||
- **Cadence in DB:** 60s ✅
|
||||
- **Actual poll interval:** 60s ✅
|
||||
- **Supervisor restarted:** 2026-05-16T04:43:40Z
|
||||
- **Verified polls:**
|
||||
```
|
||||
04:43:40.964 - First poll after restart
|
||||
04:44:41.171 - Second poll (61s later) ✅
|
||||
```
|
||||
|
||||
### Mitigation Until Fix
|
||||
After any cadence change (especially decrease), verify actual poll intervals. If incorrect, restart supervisor:
|
||||
```bash
|
||||
systemctl restart central-supervisor
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| Item | Details |
|
||||
|------|---------|
|
||||
| **Bug** | Cadence decrease hot-reload doesn't apply without restart |
|
||||
| **Root cause** | `cancel_event.set()` inside lock context has delayed delivery |
|
||||
| **Affects** | Cadence decreases only; increases work correctly |
|
||||
| **Workaround** | Restart supervisor after cadence decrease |
|
||||
| **Fix effort** | Low - add `await asyncio.sleep(0)` after event.set() |
|
||||
| **Test coverage** | None for hot-reload mechanism |
|
||||
|
||||
96
docs/environment.md
Normal file
96
docs/environment.md
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
# Central Data Hub - Environment Reference
|
||||
|
||||
## Development Locations
|
||||
|
||||
### Active Development: CT104 (Central LXC)
|
||||
|
||||
All development work happens on the Central LXC container:
|
||||
|
||||
| Property | Value |
|
||||
|----------|-------|
|
||||
| **Hostname** | `central` |
|
||||
| **Tailscale IP** | `100.64.0.12` |
|
||||
| **LAN IP** | `192.168.1.104` |
|
||||
| **SSH access** | `zvx@central` or `zvx@100.64.0.12` |
|
||||
| **Repository path** | `/opt/central` |
|
||||
| **Python venv** | `/opt/central/.venv` |
|
||||
| **Services** | `central-supervisor`, `central-archive` |
|
||||
|
||||
### Parked Clone: Cortex
|
||||
|
||||
The cortex VM at `/home/zvx/projects/central` contains a clone that is
|
||||
**not actively used for development**. It may be retired in the future.
|
||||
Do not make changes there.
|
||||
|
||||
### Local Workstation: matt-desktop
|
||||
|
||||
The Windows workstation (matt-desktop) has no Central repository clones.
|
||||
The directory `C:\Users\mtthw\central_work\` is scratch space only and
|
||||
should not be used for commits.
|
||||
|
||||
## Repository
|
||||
|
||||
| Property | Value |
|
||||
|----------|-------|
|
||||
| **Origin** | `git@github.com:zvx-echo6/central.git` |
|
||||
| **Main branch** | `main` |
|
||||
| **Default user** | `central` (on CT104) |
|
||||
|
||||
## Services
|
||||
|
||||
### central-supervisor
|
||||
|
||||
The main adapter scheduler and event publisher. Polls upstream APIs,
|
||||
normalizes events, and publishes to NATS JetStream.
|
||||
|
||||
```bash
|
||||
# Status
|
||||
systemctl status central-supervisor
|
||||
|
||||
# Logs
|
||||
journalctl -u central-supervisor -f
|
||||
|
||||
# Restart (requires sudo)
|
||||
sudo systemctl restart central-supervisor
|
||||
```
|
||||
|
||||
### central-archive
|
||||
|
||||
Consumes events from NATS JetStream and archives to PostgreSQL/TimescaleDB.
|
||||
|
||||
```bash
|
||||
# Status
|
||||
systemctl status central-archive
|
||||
|
||||
# Logs
|
||||
journalctl -u central-archive -f
|
||||
```
|
||||
|
||||
## Database
|
||||
|
||||
PostgreSQL 16 with TimescaleDB runs on CT104:
|
||||
|
||||
```bash
|
||||
# Connect as central user
|
||||
psql -h localhost -U central -d central
|
||||
|
||||
# Check adapter config
|
||||
SELECT name, cadence_s, enabled FROM config.adapters;
|
||||
|
||||
# Check recent events
|
||||
SELECT id, time, category FROM events ORDER BY time DESC LIMIT 10;
|
||||
```
|
||||
|
||||
## SSH Access from Windows
|
||||
|
||||
From matt-desktop, connect via Tailscale:
|
||||
|
||||
```bash
|
||||
# Direct connection
|
||||
ssh zvx@100.64.0.12
|
||||
|
||||
# Using hostname (if Tailscale DNS configured)
|
||||
ssh zvx@central
|
||||
```
|
||||
|
||||
Note: The `zvx` user requires password for sudo operations.
|
||||
|
|
@ -396,22 +396,26 @@ class Supervisor:
|
|||
self,
|
||||
name: str,
|
||||
new_config: AdapterConfig,
|
||||
) -> None:
|
||||
) -> AdapterState | None:
|
||||
"""Reschedule an adapter with new configuration.
|
||||
|
||||
Maintains rate-limit guarantee: next poll at
|
||||
(last_completed_poll + new_cadence_s), not now + new_cadence_s.
|
||||
|
||||
Returns the AdapterState to signal, or None if no signal needed.
|
||||
The caller must signal cancel_event AFTER releasing any locks to
|
||||
ensure immediate event delivery to the sleeping loop.
|
||||
"""
|
||||
state = self._adapter_states.get(name)
|
||||
if state is None:
|
||||
# Adapter not running - just start it
|
||||
await self._start_adapter(new_config)
|
||||
return
|
||||
return None
|
||||
|
||||
if not state.is_running:
|
||||
# Adapter stopped - restart it
|
||||
await self._start_adapter(new_config)
|
||||
return
|
||||
return None
|
||||
|
||||
old_cadence = state.config.cadence_s
|
||||
new_cadence = new_config.cadence_s
|
||||
|
|
@ -446,8 +450,9 @@ class Supervisor:
|
|||
},
|
||||
)
|
||||
|
||||
# Signal the loop to re-evaluate its schedule
|
||||
state.cancel_event.set()
|
||||
# Return state so caller can signal OUTSIDE any locks.
|
||||
# This ensures immediate event delivery to the sleeping loop.
|
||||
return state
|
||||
|
||||
async def _on_config_change(self, table: str, key: str) -> None:
|
||||
"""Handle a configuration change notification.
|
||||
|
|
@ -463,6 +468,9 @@ class Supervisor:
|
|||
extra={"table": table, "key": key},
|
||||
)
|
||||
|
||||
# Track state that needs signaling after lock release
|
||||
state_to_signal: AdapterState | None = None
|
||||
|
||||
async with self._lock:
|
||||
# Fetch the current config for this adapter
|
||||
new_config = await self._config_source.get_adapter(adapter_name)
|
||||
|
|
@ -501,7 +509,13 @@ class Supervisor:
|
|||
)
|
||||
else:
|
||||
# Adapter config changed (cadence, settings)
|
||||
await self._reschedule_adapter(adapter_name, new_config)
|
||||
state_to_signal = await self._reschedule_adapter(adapter_name, new_config)
|
||||
|
||||
# Signal OUTSIDE the lock to ensure immediate event delivery.
|
||||
# This fixes cadence-decrease hot-reload where the signal was
|
||||
# delayed by asyncio task scheduling while holding the lock.
|
||||
if state_to_signal is not None:
|
||||
state_to_signal.cancel_event.set()
|
||||
|
||||
async def _heartbeat_loop(self) -> None:
|
||||
"""Publish periodic heartbeats."""
|
||||
|
|
|
|||
553
tests/test_cadence_hotreload_loop.py
Normal file
553
tests/test_cadence_hotreload_loop.py
Normal file
|
|
@ -0,0 +1,553 @@
|
|||
"""Integration tests for cadence hot-reload exercising the ACTUAL running loop.
|
||||
|
||||
These tests run _run_adapter_loop and verify that cancel_event.set() properly
|
||||
interrupts the sleeping loop. They are designed to:
|
||||
|
||||
- FAIL on unfixed code (cancel_event.set() inside lock delays signal delivery)
|
||||
- PASS on fixed code (signal delivered after lock release)
|
||||
|
||||
Key difference from existing tests: these tests actually run the loop and
|
||||
observe real poll timing, rather than testing AdapterState math in isolation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import AsyncIterator
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from central.config_models import AdapterConfig
|
||||
from central.crypto import KEY_SIZE, clear_key_cache
|
||||
|
||||
|
||||
# Test database DSN
|
||||
TEST_DB_DSN = os.environ.get(
|
||||
"CENTRAL_TEST_DB_DSN",
|
||||
"postgresql://central_test:testpass@localhost/central_test",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def master_key_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
|
||||
"""Create a master key file for the test session."""
|
||||
key = os.urandom(KEY_SIZE)
|
||||
key_path = tmp_path_factory.mktemp("keys") / "master.key"
|
||||
key_path.write_text(base64.b64encode(key).decode())
|
||||
return key_path
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Configure master key path for all tests."""
|
||||
clear_key_cache()
|
||||
monkeypatch.setenv("CENTRAL_DB_DSN", TEST_DB_DSN)
|
||||
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
|
||||
|
||||
|
||||
class MockConfigSource:
|
||||
"""Mock ConfigSource for testing Supervisor without DB."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._adapters: dict[str, AdapterConfig] = {}
|
||||
|
||||
def set_adapter(self, config: AdapterConfig | None, name: str | None = None) -> None:
|
||||
if config is None:
|
||||
if name:
|
||||
self._adapters.pop(name, None)
|
||||
else:
|
||||
self._adapters[config.name] = config
|
||||
|
||||
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
||||
return [a for a in self._adapters.values() if a.enabled and not a.is_paused]
|
||||
|
||||
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
||||
return self._adapters.get(name)
|
||||
|
||||
async def watch_for_changes(self, callback) -> None:
|
||||
return
|
||||
|
||||
async def close(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class FastMockNWSAdapter:
|
||||
"""Mock NWSAdapter that completes polls instantly and tracks timing."""
|
||||
|
||||
def __init__(self, *, config, cursor_db_path) -> None:
|
||||
self.config = config
|
||||
self.cadence_s = config.cadence_s
|
||||
self.states = set(s.upper() for s in config.states)
|
||||
self.poll_times: list[datetime] = []
|
||||
self._published_ids: set[str] = set()
|
||||
|
||||
async def startup(self) -> None:
|
||||
pass
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
pass
|
||||
|
||||
async def poll(self) -> AsyncIterator:
|
||||
"""Record poll time and yield nothing (no events)."""
|
||||
self.poll_times.append(datetime.now(timezone.utc))
|
||||
return
|
||||
yield # Make this an async generator
|
||||
|
||||
def is_published(self, event_id: str) -> bool:
|
||||
return event_id in self._published_ids
|
||||
|
||||
def mark_published(self, event_id: str) -> None:
|
||||
self._published_ids.add(event_id)
|
||||
|
||||
def bump_last_seen(self, event_id: str) -> None:
|
||||
pass
|
||||
|
||||
def sweep_old_ids(self) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_nats():
|
||||
"""Mock NATS connection."""
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
nc.drain = AsyncMock()
|
||||
nc.close = AsyncMock()
|
||||
js = MagicMock()
|
||||
js.publish = AsyncMock()
|
||||
nc.jetstream = MagicMock(return_value=js)
|
||||
return nc
|
||||
|
||||
|
||||
class TestCadenceHotReloadLoop:
|
||||
"""Tests that exercise the ACTUAL running loop with cancel_event signaling."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cadence_decrease_wakes_loop_immediately(
|
||||
self, mock_nats, tmp_path: Path
|
||||
) -> None:
|
||||
"""Test 1: Cadence decrease (60->30) - THE BUG WE ARE FIXING.
|
||||
|
||||
This test MUST FAIL on unfixed code where cancel_event.set() is
|
||||
called inside the lock, causing delayed signal delivery.
|
||||
|
||||
- Start adapter with 60s cadence
|
||||
- Let first poll complete
|
||||
- Change cadence to 30s via _on_config_change
|
||||
- Assert next poll fires at ~last_poll+30s, NOT last_poll+60s
|
||||
|
||||
On unfixed code: loop sleeps full 60s, poll at T+60
|
||||
On fixed code: loop wakes immediately, recalculates, polls at T+30
|
||||
"""
|
||||
from central.supervisor import Supervisor
|
||||
|
||||
config_source = MockConfigSource()
|
||||
initial_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(initial_config)
|
||||
|
||||
supervisor = Supervisor(
|
||||
config_source=config_source,
|
||||
nats_url="nats://localhost:4222",
|
||||
cloudevents_config=None,
|
||||
)
|
||||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
# Track the mock adapter instance
|
||||
adapter_instance = None
|
||||
|
||||
def capture_adapter(*, config, cursor_db_path):
|
||||
nonlocal adapter_instance
|
||||
adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
|
||||
return adapter_instance
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", capture_adapter):
|
||||
# Start adapter - this creates the loop task
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert state.task is not None
|
||||
|
||||
# Wait for first poll to complete
|
||||
await asyncio.sleep(0.1)
|
||||
assert len(adapter_instance.poll_times) >= 1, "First poll should complete"
|
||||
first_poll_time = adapter_instance.poll_times[-1]
|
||||
|
||||
# Now the loop is sleeping for 60 seconds. Change cadence to 30s.
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=30, # Decreased from 60
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
|
||||
# This should wake the loop via cancel_event
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Wait for second poll - should happen at first_poll + 30s
|
||||
# Since we just changed cadence, if the fix works, the loop should
|
||||
# wake up, recalculate, and either poll immediately (if 30s passed)
|
||||
# or wait the remaining time.
|
||||
#
|
||||
# For this test, we wait up to 35s and verify the poll happens
|
||||
# around the 30s mark, not the 60s mark.
|
||||
start_wait = datetime.now(timezone.utc)
|
||||
timeout = 35 # Should complete well before 60s
|
||||
|
||||
while len(adapter_instance.poll_times) < 2:
|
||||
await asyncio.sleep(0.5)
|
||||
elapsed = (datetime.now(timezone.utc) - start_wait).total_seconds()
|
||||
if elapsed > timeout:
|
||||
break
|
||||
|
||||
# Verify second poll happened
|
||||
assert len(adapter_instance.poll_times) >= 2, (
|
||||
f"Second poll did not happen within {timeout}s. "
|
||||
f"Bug: cancel_event.set() did not wake the sleeping loop. "
|
||||
f"Poll times: {adapter_instance.poll_times}"
|
||||
)
|
||||
|
||||
second_poll_time = adapter_instance.poll_times[1]
|
||||
interval = (second_poll_time - first_poll_time).total_seconds()
|
||||
|
||||
# The interval should be ~30s (new cadence), not 60s (old cadence)
|
||||
# Allow some tolerance for test execution overhead
|
||||
assert interval < 40, (
|
||||
f"Poll interval was {interval:.1f}s, expected ~30s. "
|
||||
f"Bug: loop used old cadence instead of new cadence after reschedule."
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
state.cancel_event.set()
|
||||
if state.task:
|
||||
state.task.cancel()
|
||||
try:
|
||||
await state.task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cadence_increase_extends_wait(
|
||||
self, mock_nats, tmp_path: Path
|
||||
) -> None:
|
||||
"""Test 2: Cadence increase (10->20) extends wait correctly.
|
||||
|
||||
- Start adapter with 10s cadence
|
||||
- Let first poll complete
|
||||
- Immediately change cadence to 20s
|
||||
- Assert next poll fires at ~last_poll+20s, not last_poll+10s
|
||||
"""
|
||||
from central.supervisor import Supervisor
|
||||
|
||||
config_source = MockConfigSource()
|
||||
initial_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=10, # Short for faster test
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(initial_config)
|
||||
|
||||
supervisor = Supervisor(
|
||||
config_source=config_source,
|
||||
nats_url="nats://localhost:4222",
|
||||
cloudevents_config=None,
|
||||
)
|
||||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
adapter_instance = None
|
||||
|
||||
def capture_adapter(*, config, cursor_db_path):
|
||||
nonlocal adapter_instance
|
||||
adapter_instance = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
|
||||
return adapter_instance
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", capture_adapter):
|
||||
await supervisor._start_adapter(initial_config)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
|
||||
# Wait for first poll
|
||||
await asyncio.sleep(0.1)
|
||||
assert len(adapter_instance.poll_times) >= 1
|
||||
first_poll_time = adapter_instance.poll_times[-1]
|
||||
|
||||
# Change cadence to 20s (increase)
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=20,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Wait 12 seconds - should NOT poll yet (20s cadence)
|
||||
await asyncio.sleep(12)
|
||||
|
||||
# Should still be at 1 poll
|
||||
assert len(adapter_instance.poll_times) == 1, (
|
||||
f"Poll happened too early! Expected 1 poll, got {len(adapter_instance.poll_times)}. "
|
||||
f"Cadence increase should extend wait time."
|
||||
)
|
||||
|
||||
# Wait remaining time plus buffer
|
||||
await asyncio.sleep(10)
|
||||
|
||||
# Now should have second poll
|
||||
assert len(adapter_instance.poll_times) >= 2, (
|
||||
f"Second poll did not happen at 20s mark. "
|
||||
f"Poll times: {adapter_instance.poll_times}"
|
||||
)
|
||||
|
||||
second_poll_time = adapter_instance.poll_times[1]
|
||||
interval = (second_poll_time - first_poll_time).total_seconds()
|
||||
|
||||
# Should be ~20s
|
||||
assert 18 < interval < 25, (
|
||||
f"Poll interval was {interval:.1f}s, expected ~20s."
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
state.cancel_event.set()
|
||||
if state.task:
|
||||
state.task.cancel()
|
||||
try:
|
||||
await state.task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_enable_gap_exceeds_cadence(
|
||||
self, mock_nats, tmp_path: Path
|
||||
) -> None:
|
||||
"""Test 3: Enable->disable->enable with gap > cadence polls immediately.
|
||||
|
||||
- Start adapter, complete one poll at T1
|
||||
- Disable adapter
|
||||
- Wait > cadence_s
|
||||
- Re-enable
|
||||
- Assert poll fires immediately (gap exceeded cadence)
|
||||
"""
|
||||
from central.supervisor import Supervisor
|
||||
|
||||
config_source = MockConfigSource()
|
||||
config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=2, # Short cadence for faster test
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(config)
|
||||
|
||||
supervisor = Supervisor(
|
||||
config_source=config_source,
|
||||
nats_url="nats://localhost:4222",
|
||||
cloudevents_config=None,
|
||||
)
|
||||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
adapter_instances = []
|
||||
|
||||
def capture_adapter(*, config, cursor_db_path):
|
||||
inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
|
||||
adapter_instances.append(inst)
|
||||
return inst
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", capture_adapter):
|
||||
await supervisor._start_adapter(config)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
|
||||
# Wait for first poll
|
||||
await asyncio.sleep(0.1)
|
||||
first_adapter = adapter_instances[0]
|
||||
assert len(first_adapter.poll_times) >= 1
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=2,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Wait longer than cadence
|
||||
await asyncio.sleep(3)
|
||||
|
||||
# Re-enable
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=2,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
|
||||
reenable_time = datetime.now(timezone.utc)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Wait a bit for immediate poll
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
new_state = supervisor._adapter_states.get("nws")
|
||||
assert new_state is not None
|
||||
|
||||
# Check that a poll happened quickly after re-enable
|
||||
# The new adapter instance should have polled
|
||||
if len(adapter_instances) > 1:
|
||||
new_adapter = adapter_instances[-1]
|
||||
assert len(new_adapter.poll_times) >= 1, (
|
||||
"Poll should happen immediately when gap > cadence"
|
||||
)
|
||||
poll_delay = (new_adapter.poll_times[0] - reenable_time).total_seconds()
|
||||
assert poll_delay < 1, (
|
||||
f"Poll took {poll_delay:.1f}s after re-enable, expected immediate"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
if new_state.task:
|
||||
new_state.cancel_event.set()
|
||||
new_state.task.cancel()
|
||||
try:
|
||||
await new_state.task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_enable_gap_within_cadence(
|
||||
self, mock_nats, tmp_path: Path
|
||||
) -> None:
|
||||
"""Test 4: Enable->disable->enable with gap < cadence waits.
|
||||
|
||||
- Start adapter with 10s cadence, complete poll at T1
|
||||
- Disable adapter
|
||||
- Re-enable quickly (within cadence window)
|
||||
- Assert next poll fires at T1 + cadence_s, NOT immediately
|
||||
"""
|
||||
from central.supervisor import Supervisor
|
||||
|
||||
config_source = MockConfigSource()
|
||||
config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=10, # 10 second cadence
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(config)
|
||||
|
||||
supervisor = Supervisor(
|
||||
config_source=config_source,
|
||||
nats_url="nats://localhost:4222",
|
||||
cloudevents_config=None,
|
||||
)
|
||||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
adapter_instances = []
|
||||
|
||||
def capture_adapter(*, config, cursor_db_path):
|
||||
inst = FastMockNWSAdapter(config=config, cursor_db_path=cursor_db_path)
|
||||
adapter_instances.append(inst)
|
||||
return inst
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", capture_adapter):
|
||||
await supervisor._start_adapter(config)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
|
||||
# Wait for first poll
|
||||
await asyncio.sleep(0.1)
|
||||
first_adapter = adapter_instances[0]
|
||||
assert len(first_adapter.poll_times) >= 1
|
||||
first_poll_time = first_adapter.poll_times[-1]
|
||||
|
||||
# Disable adapter quickly
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=10,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Re-enable immediately (within cadence window)
|
||||
await asyncio.sleep(0.5)
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=10,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
new_state = supervisor._adapter_states.get("nws")
|
||||
assert new_state is not None
|
||||
|
||||
# Wait 3 seconds - should NOT poll yet (still within 10s cadence)
|
||||
await asyncio.sleep(3)
|
||||
|
||||
# The new adapter instance should not have polled yet
|
||||
if len(adapter_instances) > 1:
|
||||
new_adapter = adapter_instances[-1]
|
||||
assert len(new_adapter.poll_times) == 0, (
|
||||
f"Poll happened too early! Gap < cadence should wait. "
|
||||
f"Polls: {new_adapter.poll_times}"
|
||||
)
|
||||
|
||||
# Wait for remaining time (about 7 more seconds)
|
||||
await asyncio.sleep(8)
|
||||
|
||||
# Now should have polled
|
||||
if len(adapter_instances) > 1:
|
||||
new_adapter = adapter_instances[-1]
|
||||
assert len(new_adapter.poll_times) >= 1, (
|
||||
"Poll should have happened by now (10s cadence elapsed)"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
if new_state.task:
|
||||
new_state.cancel_event.set()
|
||||
new_state.task.cancel()
|
||||
try:
|
||||
await new_state.task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue