From 4215744a301192fd91f16fec56eb6bd7b8968a6e Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sat, 16 May 2026 05:59:45 +0000 Subject: [PATCH] fix: move cancel_event signal outside lock for immediate delivery The cancel_event.set() call was inside the async lock context in _on_config_change, causing delayed signal delivery to the sleeping loop. This manifested as cadence decreases not applying without a restart - the loop would sleep its full original timeout before seeing the new cadence. Fix: _reschedule_adapter now returns the AdapterState to signal, and _on_config_change signals AFTER releasing the lock. This ensures immediate event delivery per asyncio semantics. The lock protects state consistency during config fetches and updates. The cancel_event is a one-way notification that does not need lock protection - it simply wakes the sleeping coroutine. See docs/BUG-CADENCE-DECREASE.md for full investigation. Co-Authored-By: Claude Opus 4.5 --- src/central/supervisor.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 06d7c72..bedd11f 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -396,22 +396,26 @@ class Supervisor: self, name: str, new_config: AdapterConfig, - ) -> None: + ) -> AdapterState | None: """Reschedule an adapter with new configuration. Maintains rate-limit guarantee: next poll at (last_completed_poll + new_cadence_s), not now + new_cadence_s. + + Returns the AdapterState to signal, or None if no signal needed. + The caller must signal cancel_event AFTER releasing any locks to + ensure immediate event delivery to the sleeping loop. """ state = self._adapter_states.get(name) if state is None: # Adapter not running - just start it await self._start_adapter(new_config) - return + return None if not state.is_running: # Adapter stopped - restart it await self._start_adapter(new_config) - return + return None old_cadence = state.config.cadence_s new_cadence = new_config.cadence_s @@ -446,8 +450,9 @@ class Supervisor: }, ) - # Signal the loop to re-evaluate its schedule - state.cancel_event.set() + # Return state so caller can signal OUTSIDE any locks. + # This ensures immediate event delivery to the sleeping loop. + return state async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. @@ -463,6 +468,9 @@ class Supervisor: extra={"table": table, "key": key}, ) + # Track state that needs signaling after lock release + state_to_signal: AdapterState | None = None + async with self._lock: # Fetch the current config for this adapter new_config = await self._config_source.get_adapter(adapter_name) @@ -501,7 +509,13 @@ class Supervisor: ) else: # Adapter config changed (cadence, settings) - await self._reschedule_adapter(adapter_name, new_config) + state_to_signal = await self._reschedule_adapter(adapter_name, new_config) + + # Signal OUTSIDE the lock to ensure immediate event delivery. + # This fixes cadence-decrease hot-reload where the signal was + # delayed by asyncio task scheduling while holding the lock. + if state_to_signal is not None: + state_to_signal.cancel_event.set() async def _heartbeat_loop(self) -> None: """Publish periodic heartbeats."""