fix: move cancel_event signal outside lock for immediate delivery

The cancel_event.set() call was inside the async lock context in
_on_config_change, causing delayed signal delivery to the sleeping
loop. This manifested as cadence decreases not applying without a
restart - the loop would sleep its full original timeout before
seeing the new cadence.

Fix: _reschedule_adapter now returns the AdapterState to signal,
and _on_config_change signals AFTER releasing the lock. This ensures
immediate event delivery per asyncio semantics.

The lock protects state consistency during config fetches and updates.
The cancel_event is a one-way notification that does not need lock
protection - it simply wakes the sleeping coroutine.

See docs/BUG-CADENCE-DECREASE.md for full investigation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-16 05:59:45 +00:00
commit 4215744a30

View file

@ -396,22 +396,26 @@ class Supervisor:
self, self,
name: str, name: str,
new_config: AdapterConfig, new_config: AdapterConfig,
) -> None: ) -> AdapterState | None:
"""Reschedule an adapter with new configuration. """Reschedule an adapter with new configuration.
Maintains rate-limit guarantee: next poll at Maintains rate-limit guarantee: next poll at
(last_completed_poll + new_cadence_s), not now + new_cadence_s. (last_completed_poll + new_cadence_s), not now + new_cadence_s.
Returns the AdapterState to signal, or None if no signal needed.
The caller must signal cancel_event AFTER releasing any locks to
ensure immediate event delivery to the sleeping loop.
""" """
state = self._adapter_states.get(name) state = self._adapter_states.get(name)
if state is None: if state is None:
# Adapter not running - just start it # Adapter not running - just start it
await self._start_adapter(new_config) await self._start_adapter(new_config)
return return None
if not state.is_running: if not state.is_running:
# Adapter stopped - restart it # Adapter stopped - restart it
await self._start_adapter(new_config) await self._start_adapter(new_config)
return return None
old_cadence = state.config.cadence_s old_cadence = state.config.cadence_s
new_cadence = new_config.cadence_s new_cadence = new_config.cadence_s
@ -446,8 +450,9 @@ class Supervisor:
}, },
) )
# Signal the loop to re-evaluate its schedule # Return state so caller can signal OUTSIDE any locks.
state.cancel_event.set() # This ensures immediate event delivery to the sleeping loop.
return state
async def _on_config_change(self, table: str, key: str) -> None: async def _on_config_change(self, table: str, key: str) -> None:
"""Handle a configuration change notification. """Handle a configuration change notification.
@ -463,6 +468,9 @@ class Supervisor:
extra={"table": table, "key": key}, extra={"table": table, "key": key},
) )
# Track state that needs signaling after lock release
state_to_signal: AdapterState | None = None
async with self._lock: async with self._lock:
# Fetch the current config for this adapter # Fetch the current config for this adapter
new_config = await self._config_source.get_adapter(adapter_name) new_config = await self._config_source.get_adapter(adapter_name)
@ -501,7 +509,13 @@ class Supervisor:
) )
else: else:
# Adapter config changed (cadence, settings) # Adapter config changed (cadence, settings)
await self._reschedule_adapter(adapter_name, new_config) state_to_signal = await self._reschedule_adapter(adapter_name, new_config)
# Signal OUTSIDE the lock to ensure immediate event delivery.
# This fixes cadence-decrease hot-reload where the signal was
# delayed by asyncio task scheduling while holding the lock.
if state_to_signal is not None:
state_to_signal.cancel_event.set()
async def _heartbeat_loop(self) -> None: async def _heartbeat_loop(self) -> None:
"""Publish periodic heartbeats.""" """Publish periodic heartbeats."""