refactor(nws): remove internal AsyncLimiter rate limiting

The NWSAdapter had an internal AsyncLimiter that duplicated the
supervisor's rate-limit guarantee. When cadence changed, only
state.adapter.cadence_s was updated, not the internal limiter,
causing the cadence-decrease bug.

Since the supervisor already enforces rate limiting via
last_completed_poll + cadence_s scheduling, the adapter-level
limiter was redundant and caused the 30-second blocking observed
in diagnostic logs.

Removes:
- aiolimiter import
- self.cadence_s attribute (unused elsewhere)
- self._limiter creation
- async with self._limiter context in _fetch_alerts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-16 17:17:11 +00:00
commit 9d4ba97537

View file

@ -10,7 +10,6 @@ from pathlib import Path
from typing import Any from typing import Any
import aiohttp import aiohttp
from aiolimiter import AsyncLimiter
from tenacity import ( from tenacity import (
retry, retry,
stop_after_attempt, stop_after_attempt,
@ -199,11 +198,9 @@ class NWSAdapter(SourceAdapter):
cursor_db_path: Path, cursor_db_path: Path,
) -> None: ) -> None:
self.config = config self.config = config
self.cadence_s = config.cadence_s
self.states = set(s.upper() for s in config.states) self.states = set(s.upper() for s in config.states)
self.cursor_db_path = cursor_db_path self.cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None self._session: aiohttp.ClientSession | None = None
self._limiter = AsyncLimiter(1, config.cadence_s)
self._db: sqlite3.Connection | None = None self._db: sqlite3.Connection | None = None
async def startup(self) -> None: async def startup(self) -> None:
@ -329,38 +326,37 @@ class NWSAdapter(SourceAdapter):
) )
async def _fetch_alerts(self) -> tuple[int, dict[str, Any] | None, str | None]: async def _fetch_alerts(self) -> tuple[int, dict[str, Any] | None, str | None]:
"""Fetch alerts from NWS API with conditional request.""" """Fetch alerts from NWS API with conditional request."""
async with self._limiter: if not self._session:
if not self._session: raise RuntimeError("Session not initialized")
raise RuntimeError("Session not initialized")
headers: dict[str, str] = {} headers: dict[str, str] = {}
cursor = self._get_cursor() cursor = self._get_cursor()
if cursor: if cursor:
headers["If-Modified-Since"] = cursor headers["If-Modified-Since"] = cursor
async with self._session.get(NWS_API_URL, headers=headers) as resp: async with self._session.get(NWS_API_URL, headers=headers) as resp:
if resp.status in (429, 403): if resp.status in (429, 403):
retry_after = resp.headers.get("Retry-After", "60") retry_after = resp.headers.get("Retry-After", "60")
try: try:
wait_time = int(retry_after) wait_time = int(retry_after)
except ValueError: except ValueError:
wait_time = 60 wait_time = 60
logger.warning( logger.warning(
"Rate limited by NWS", "Rate limited by NWS",
extra={"status": resp.status, "retry_after": wait_time} extra={"status": resp.status, "retry_after": wait_time}
) )
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
raise aiohttp.ClientError(f"Rate limited: {resp.status}") raise aiohttp.ClientError(f"Rate limited: {resp.status}")
if resp.status == 304: if resp.status == 304:
return (304, None, None) return (304, None, None)
resp.raise_for_status() resp.raise_for_status()
data = await resp.json() data = await resp.json()
last_modified = resp.headers.get("Last-Modified") last_modified = resp.headers.get("Last-Modified")
return (resp.status, data, last_modified) return (resp.status, data, last_modified)
def _normalize_feature(self, feature: dict[str, Any]) -> Event | None: def _normalize_feature(self, feature: dict[str, Any]) -> Event | None:
"""Normalize a GeoJSON feature to an Event.""" """Normalize a GeoJSON feature to an Event."""