From 9d4ba975378d80236ad43e0ce85e8acab93f2bc5 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sat, 16 May 2026 17:17:11 +0000 Subject: [PATCH] refactor(nws): remove internal AsyncLimiter rate limiting The NWSAdapter had an internal AsyncLimiter that duplicated the supervisor's rate-limit guarantee. When cadence changed, only state.adapter.cadence_s was updated, not the internal limiter, causing the cadence-decrease bug. Since the supervisor already enforces rate limiting via last_completed_poll + cadence_s scheduling, the adapter-level limiter was redundant and caused the 30-second blocking observed in diagnostic logs. Removes: - aiolimiter import - self.cadence_s attribute (unused elsewhere) - self._limiter creation - async with self._limiter context in _fetch_alerts Co-Authored-By: Claude Opus 4.5 --- src/central/adapters/nws.py | 54 +++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index bb77c4f..c8d3c6a 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -10,7 +10,6 @@ from pathlib import Path from typing import Any import aiohttp -from aiolimiter import AsyncLimiter from tenacity import ( retry, stop_after_attempt, @@ -199,11 +198,9 @@ class NWSAdapter(SourceAdapter): cursor_db_path: Path, ) -> None: self.config = config - self.cadence_s = config.cadence_s self.states = set(s.upper() for s in config.states) self.cursor_db_path = cursor_db_path self._session: aiohttp.ClientSession | None = None - self._limiter = AsyncLimiter(1, config.cadence_s) self._db: sqlite3.Connection | None = None async def startup(self) -> None: @@ -329,38 +326,37 @@ class NWSAdapter(SourceAdapter): ) async def _fetch_alerts(self) -> tuple[int, dict[str, Any] | None, str | None]: """Fetch alerts from NWS API with conditional request.""" - async with self._limiter: - if not self._session: - raise RuntimeError("Session not initialized") + if not self._session: + raise RuntimeError("Session not initialized") - headers: dict[str, str] = {} - cursor = self._get_cursor() - if cursor: - headers["If-Modified-Since"] = cursor + headers: dict[str, str] = {} + cursor = self._get_cursor() + if cursor: + headers["If-Modified-Since"] = cursor - async with self._session.get(NWS_API_URL, headers=headers) as resp: - if resp.status in (429, 403): - retry_after = resp.headers.get("Retry-After", "60") - try: - wait_time = int(retry_after) - except ValueError: - wait_time = 60 - logger.warning( - "Rate limited by NWS", - extra={"status": resp.status, "retry_after": wait_time} - ) - await asyncio.sleep(wait_time) - raise aiohttp.ClientError(f"Rate limited: {resp.status}") + async with self._session.get(NWS_API_URL, headers=headers) as resp: + if resp.status in (429, 403): + retry_after = resp.headers.get("Retry-After", "60") + try: + wait_time = int(retry_after) + except ValueError: + wait_time = 60 + logger.warning( + "Rate limited by NWS", + extra={"status": resp.status, "retry_after": wait_time} + ) + await asyncio.sleep(wait_time) + raise aiohttp.ClientError(f"Rate limited: {resp.status}") - if resp.status == 304: - return (304, None, None) + if resp.status == 304: + return (304, None, None) - resp.raise_for_status() + resp.raise_for_status() - data = await resp.json() - last_modified = resp.headers.get("Last-Modified") + data = await resp.json() + last_modified = resp.headers.get("Last-Modified") - return (resp.status, data, last_modified) + return (resp.status, data, last_modified) def _normalize_feature(self, feature: dict[str, Any]) -> Event | None: """Normalize a GeoJSON feature to an Event."""