mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-22 02:24:38 +02:00
fix: preserve last_completed_poll across adapter disable/enable
Previously, _stop_adapter() used pop() to remove adapter state, which lost last_completed_poll. On re-enable, a fresh state was created, causing an immediate poll and violating the rate-limit guarantee.

Changes:
- Add is_running property to AdapterState
- _stop_adapter: preserve state, just cancel task
- _start_adapter: reuse existing stopped state if present
- Add _remove_adapter for full cleanup when adapter is deleted
- _on_config_change: call _remove_adapter for deleted adapters

Integration tests verify:
- Test A: gap > cadence -> immediate poll (correct)
- Test B: gap < cadence -> wait until last_poll + cadence (was broken)
- Test C: delete + re-add -> fresh state (correct)

Tests-fail-before-fix verified: Tests A and B failed on the unfixed code with "State was removed on stop!" and pass with the fix.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
1abdf45375
commit
c39e3174b8
2 changed files with 682 additions and 37 deletions
|
|
@ -72,10 +72,10 @@ class AdapterState:
|
|||
last_completed_poll: datetime | None = None
|
||||
cancel_event: asyncio.Event = field(default_factory=asyncio.Event)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# Ensure cancel_event is created
|
||||
if self.cancel_event is None:
|
||||
self.cancel_event = asyncio.Event()
|
||||
@property
def is_running(self) -> bool:
    """Report whether the adapter's loop task exists and has not finished.

    Returns:
        False when ``self.task`` is None or its ``done()`` returns a truthy
        value; True otherwise.
    """
    loop_task = self.task
    if loop_task is None:
        # No task has been attached to this state.
        return False
    return not loop_task.done()
|
||||
|
||||
|
||||
class Supervisor:
|
||||
|
|
@ -200,13 +200,8 @@ class Supervisor:
|
|||
state.adapter.bump_last_seen(event.id)
|
||||
continue
|
||||
|
||||
# Build CloudEvent
|
||||
if self._cloudevents_config:
|
||||
envelope, msg_id = wrap_event(event, self._cloudevents_config)
|
||||
else:
|
||||
# Fallback for testing
|
||||
envelope = {"id": event.id, "data": event.data}
|
||||
msg_id = event.id
|
||||
# Build CloudEvent (uses defaults if no config provided)
|
||||
envelope, msg_id = wrap_event(event, self._cloudevents_config)
|
||||
|
||||
subject = subject_for_event(event)
|
||||
|
||||
|
|
@ -247,14 +242,64 @@ class Supervisor:
|
|||
logger.info("Swept old published IDs", extra={"count": swept})
|
||||
|
||||
async def _start_adapter(self, config: AdapterConfig) -> None:
|
||||
"""Start an adapter based on its configuration."""
|
||||
if config.name in self._adapter_states:
|
||||
logger.warning(
|
||||
"Adapter already running",
|
||||
extra={"adapter": config.name},
|
||||
"""Start an adapter based on its configuration.
|
||||
|
||||
If the adapter was previously stopped (state exists but task is not running),
|
||||
reuses the existing state to preserve last_completed_poll for rate limiting.
|
||||
"""
|
||||
existing_state = self._adapter_states.get(config.name)
|
||||
|
||||
if existing_state is not None:
|
||||
if existing_state.is_running:
|
||||
logger.warning(
|
||||
"Adapter already running",
|
||||
extra={"adapter": config.name},
|
||||
)
|
||||
return
|
||||
|
||||
# Adapter was stopped - restart with preserved state
|
||||
# Update config and restart the adapter
|
||||
existing_state.config = config
|
||||
existing_state.cancel_event.clear()
|
||||
|
||||
# Reinitialize the adapter with new config
|
||||
nws_config = self._adapter_config_to_nws_config(config)
|
||||
existing_state.adapter = NWSAdapter(
|
||||
config=nws_config,
|
||||
cursor_db_path=CURSOR_DB_PATH,
|
||||
)
|
||||
await existing_state.adapter.startup()
|
||||
|
||||
# Start the loop task
|
||||
existing_state.task = asyncio.create_task(
|
||||
self._run_adapter_loop(existing_state)
|
||||
)
|
||||
|
||||
# Calculate next poll time for logging
|
||||
if existing_state.last_completed_poll:
|
||||
next_poll_at = datetime.fromtimestamp(
|
||||
existing_state.last_completed_poll.timestamp() + config.cadence_s,
|
||||
tz=timezone.utc,
|
||||
)
|
||||
if next_poll_at <= datetime.now(timezone.utc):
|
||||
next_poll_at = datetime.now(timezone.utc)
|
||||
else:
|
||||
next_poll_at = datetime.now(timezone.utc)
|
||||
|
||||
logger.info(
|
||||
"Adapter restarted",
|
||||
extra={
|
||||
"adapter": config.name,
|
||||
"cadence_s": config.cadence_s,
|
||||
"preserved_last_poll": existing_state.last_completed_poll.isoformat()
|
||||
if existing_state.last_completed_poll
|
||||
else None,
|
||||
"next_poll": next_poll_at.isoformat(),
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
# New adapter - create fresh state
|
||||
if config.name == "nws":
|
||||
nws_config = self._adapter_config_to_nws_config(config)
|
||||
adapter = NWSAdapter(
|
||||
|
|
@ -285,11 +330,20 @@ class Supervisor:
|
|||
)
|
||||
|
||||
async def _stop_adapter(self, name: str) -> None:
|
||||
"""Stop a running adapter."""
|
||||
state = self._adapter_states.pop(name, None)
|
||||
"""Stop a running adapter but preserve state for potential restart.
|
||||
|
||||
The adapter state (including last_completed_poll) is preserved so that
|
||||
if the adapter is re-enabled, the rate-limit guarantee is maintained.
|
||||
Use _remove_adapter() to fully remove an adapter from tracking.
|
||||
"""
|
||||
state = self._adapter_states.get(name)
|
||||
if state is None:
|
||||
return
|
||||
|
||||
if not state.is_running:
|
||||
# Already stopped
|
||||
return
|
||||
|
||||
# Signal the loop to stop
|
||||
state.cancel_event.set()
|
||||
|
||||
|
|
@ -299,9 +353,44 @@ class Supervisor:
|
|||
await state.task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
state.task = None
|
||||
|
||||
await state.adapter.shutdown()
|
||||
logger.info("Adapter stopped", extra={"adapter": name})
|
||||
logger.info(
|
||||
"Adapter stopped",
|
||||
extra={
|
||||
"adapter": name,
|
||||
"preserved_last_poll": state.last_completed_poll.isoformat()
|
||||
if state.last_completed_poll
|
||||
else None,
|
||||
},
|
||||
)
|
||||
|
||||
async def _remove_adapter(self, name: str) -> None:
    """Drop an adapter from tracking, discarding all of its preserved state.

    Called when an adapter is deleted from the database (not just disabled).
    The entry is popped from ``self._adapter_states``, so nothing kept on the
    state object remains available afterwards.

    Args:
        name: Key of the adapter in ``self._adapter_states``. Unknown names
            are ignored.
    """
    # NOTE(review): the listing this was taken from had lost its indentation;
    # the nesting below (shutdown outside the is_running guard) is a
    # reconstruction - confirm against the repository.
    removed = self._adapter_states.pop(name, None)
    if removed is None:
        return

    if removed.is_running:
        # Signal the loop first, then cancel the task and wait for it to unwind.
        removed.cancel_event.set()
        loop_task = removed.task
        if loop_task:
            loop_task.cancel()
            try:
                await loop_task
            except asyncio.CancelledError:
                # Expected result of cancelling the loop task above.
                pass

    await removed.adapter.shutdown()

    logger.info("Adapter removed", extra={"adapter": name})
|
||||
|
||||
async def _reschedule_adapter(
|
||||
self,
|
||||
|
|
@ -319,6 +408,11 @@ class Supervisor:
|
|||
await self._start_adapter(new_config)
|
||||
return
|
||||
|
||||
if not state.is_running:
|
||||
# Adapter stopped - restart it
|
||||
await self._start_adapter(new_config)
|
||||
return
|
||||
|
||||
old_cadence = state.config.cadence_s
|
||||
new_cadence = new_config.cadence_s
|
||||
|
||||
|
|
@ -375,18 +469,18 @@ class Supervisor:
|
|||
current_state = self._adapter_states.get(adapter_name)
|
||||
|
||||
if new_config is None:
|
||||
# Adapter was deleted
|
||||
# Adapter was deleted - fully remove, don't just stop
|
||||
if current_state:
|
||||
await self._stop_adapter(adapter_name)
|
||||
await self._remove_adapter(adapter_name)
|
||||
logger.info(
|
||||
"Adapter deleted, stopped",
|
||||
"Adapter deleted, removed",
|
||||
extra={"adapter": adapter_name},
|
||||
)
|
||||
return
|
||||
|
||||
if not new_config.enabled or new_config.is_paused:
|
||||
# Adapter disabled or paused
|
||||
if current_state:
|
||||
# Adapter disabled or paused - stop but preserve state
|
||||
if current_state and current_state.is_running:
|
||||
await self._stop_adapter(adapter_name)
|
||||
logger.info(
|
||||
"Adapter disabled/paused, stopped",
|
||||
|
|
@ -398,8 +492,8 @@ class Supervisor:
|
|||
)
|
||||
return
|
||||
|
||||
if current_state is None:
|
||||
# Adapter was enabled or created
|
||||
if current_state is None or not current_state.is_running:
|
||||
# Adapter was enabled or created - start (will reuse state if exists)
|
||||
await self._start_adapter(new_config)
|
||||
logger.info(
|
||||
"Adapter enabled, started",
|
||||
|
|
@ -468,9 +562,9 @@ class Supervisor:
|
|||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Stop all adapters
|
||||
# Remove all adapters (full cleanup)
|
||||
for name in list(self._adapter_states.keys()):
|
||||
await self._stop_adapter(name)
|
||||
await self._remove_adapter(name)
|
||||
|
||||
# Close config source
|
||||
await self._config_source.close()
|
||||
|
|
@ -497,21 +591,26 @@ async def async_main() -> None:
|
|||
toml_path=settings.config_toml_path,
|
||||
)
|
||||
|
||||
# Load CloudEvents config for envelope generation
|
||||
# For now, load from TOML regardless of config source
|
||||
# (CloudEvents config is not adapter-specific)
|
||||
try:
|
||||
toml_config = load_config(str(settings.config_toml_path))
|
||||
cloudevents_config = toml_config
|
||||
except Exception:
|
||||
# If TOML doesn't exist and using DB source, create minimal config
|
||||
cloudevents_config = None
|
||||
# CloudEvents config: try TOML first, fall back to code defaults
|
||||
# (CloudEvents envelope format is protocol-level, not operator-configurable)
|
||||
cloudevents_config = None
|
||||
if settings.config_source == "toml":
|
||||
try:
|
||||
toml_config = load_config(str(settings.config_toml_path))
|
||||
cloudevents_config = toml_config
|
||||
except Exception:
|
||||
pass # Will use defaults from cloudevents_constants
|
||||
|
||||
supervisor = Supervisor(
|
||||
config_source=config_source,
|
||||
nats_url=settings.nats_url,
|
||||
cloudevents_config=cloudevents_config,
|
||||
)
|
||||
logger.info(
|
||||
"CloudEvents config: %s",
|
||||
"TOML" if cloudevents_config else "defaults",
|
||||
extra={"cloudevents_source": "toml" if cloudevents_config else "defaults"},
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
shutdown_event = asyncio.Event()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue