diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index f68fe7c..eaea094 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -1533,6 +1533,12 @@ coverage. One event per incident. larger). Ships with Treasure Valley (Boise). **Cadence 1800s (30 min)** -> 1 bbox = 1,440 calls/mo, 58% of the 2,500/mo free-tier cap. Adding bboxes must respect `N * (43200/cadence_min) <= 2500`. +- **Per-bbox cadence:** each bbox may set an optional `cadence_s` (else falls back + to the adapter's `default_cadence_s`). The supervisor wakes the adapter at the + adapter-level cadence; `poll()` fetches only bboxes whose per-bbox interval has + elapsed (in-memory `_last_polled`, per process; first poll after restart fetches + all). Set the adapter cadence to the GCD of the per-bbox cadences for exact + intervals -- extra wakeups cost zero API calls. - **Dedup key shape:** `:tomtom:` (e.g. `ID:tomtom:TTI-5df75143-...`); the upstream id is stable across polls. - **Severity:** from `magnitudeOfDelay` (0->1, 1->1, 2->2, 3->3, 4->4; 4 == diff --git a/src/central/adapters/tomtom_incidents.py b/src/central/adapters/tomtom_incidents.py index 809599e..3fab465 100644 --- a/src/central/adapters/tomtom_incidents.py +++ b/src/central/adapters/tomtom_incidents.py @@ -83,6 +83,7 @@ class BBox(BaseModel): max_lon: float max_lat: float state_code: str + cadence_s: int | None = None # per-bbox poll interval; None -> adapter default_cadence_s class TomTomIncidentsSettings(BaseModel): @@ -122,6 +123,7 @@ class TomTomIncidentsAdapter(SourceAdapter): self._bboxes: list[BBox] = self._read_bboxes(config) self._api_key_alias: str = config.settings.get("api_key_alias", "tomtom") self._api_key: str | None = None + self._last_polled: dict[str, datetime] = {} # bbox name -> last successful fetch (in-memory) @staticmethod def _read_bboxes(config: AdapterConfig) -> list[BBox]: @@ -130,6 +132,14 @@ class TomTomIncidentsAdapter(SourceAdapter): def _redact(self, text: str) -> str: return text.replace(self._api_key, "") if self._api_key else text + def _bbox_due(self, bbox: "BBox", now: datetime) -> bool: + """True if this bbox is due to poll (never polled this process, or its + per-bbox cadence_s -- falling back to default_cadence_s -- has elapsed).""" + last = self._last_polled.get(bbox.name) + if last is None: + return True + return (now - last).total_seconds() >= (bbox.cadence_s or self.default_cadence_s) + async def startup(self) -> None: self._session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S), @@ -226,6 +236,8 @@ class TomTomIncidentsAdapter(SourceAdapter): extra={"alias": self._api_key_alias}) return sem = asyncio.Semaphore(_FETCH_CONCURRENCY) + now = datetime.now(timezone.utc) + due = [b for b in self._bboxes if self._bbox_due(b, now)] async def _one(bbox: BBox) -> list[Event]: async with sem: @@ -235,6 +247,7 @@ class TomTomIncidentsAdapter(SourceAdapter): logger.warning("tomtom_incidents bbox fetch failed", extra={"bbox": bbox.name, "error": self._redact(str(exc))}) return [] + self._last_polled[bbox.name] = now # only after a successful fetch out: list[Event] = [] for inc in incidents: try: @@ -246,7 +259,7 @@ class TomTomIncidentsAdapter(SourceAdapter): out.append(ev) return out - results = await asyncio.gather(*[_one(b) for b in self._bboxes]) + results = await asyncio.gather(*[_one(b) for b in due]) yielded = 0 for evs in results: for ev in evs: diff --git a/tests/test_tomtom_incidents.py b/tests/test_tomtom_incidents.py index cdbbc59..51dca03 100644 --- a/tests/test_tomtom_incidents.py +++ b/tests/test_tomtom_incidents.py @@ -9,7 +9,7 @@ mixin); polling is stateless. """ import json -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from unittest.mock import AsyncMock, MagicMock @@ -138,3 +138,88 @@ def test_inherits_dedup_mixin(): for m in ("is_published", "mark_published", "sweep_old_ids"): assert m not in TomTomIncidentsAdapter.__dict__, f"redefines {m}" assert getattr(TomTomIncidentsAdapter, m) is getattr(SourceAdapter, m) + + +# --- v0.9.5.1 per-bbox cadence ----------------------------------------------- + +def _b(name, cadence_s=None): + return BBox(name=name, min_lon=0, min_lat=0, max_lon=1, max_lat=1, + state_code="ID", cadence_s=cadence_s) + + +def _cadence_adapter(tmp_path, bboxes): + cfg = AdapterConfig( + name="tomtom_incidents", enabled=True, cadence_s=1800, + settings={"api_key_alias": "tomtom", "bboxes": [b.model_dump() for b in bboxes]}, + updated_at=datetime.now(timezone.utc), + ) + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value="testkey") + return TomTomIncidentsAdapter(cfg, cs, tmp_path / "cursors.db") + + +def test_bbox_cadence_field_defaults_none(): + assert _b("x").cadence_s is None + assert _b("y", 3600).cadence_s == 3600 + + +def test_bbox_due_first_poll(adapter): + assert adapter._bbox_due(_b("never", 3600), datetime.now(timezone.utc)) is True + + +def test_bbox_due_respects_per_bbox_cadence(adapter): + now = datetime.now(timezone.utc) + b60 = _b("b60", 3600) + adapter._last_polled["b60"] = now - timedelta(minutes=31) + assert adapter._bbox_due(b60, now) is False # 31 < 60 + adapter._last_polled["b60"] = now - timedelta(minutes=60) + assert adapter._bbox_due(b60, now) is True # 60 >= 60 boundary + + +def test_bbox_due_fallback_to_default(adapter): + now = datetime.now(timezone.utc) + bd = _b("bd") # cadence_s None -> default_cadence_s == 1800 (30 min) + adapter._last_polled["bd"] = now - timedelta(minutes=29) + assert adapter._bbox_due(bd, now) is False + adapter._last_polled["bd"] = now - timedelta(minutes=31) + assert adapter._bbox_due(bd, now) is True + + +@pytest.mark.asyncio +async def test_poll_only_fetches_due_bboxes(tmp_path): + # Matt's 60/90/60: seed all polled 70 min ago -> 60-min due, 90-min not. + bboxes = [_b("treasure_valley_ext", 3600), _b("mountain_home_corridor", 5400), + _b("magic_valley_burley", 3600)] + a = _cadence_adapter(tmp_path, bboxes) + await a.startup() + seed = datetime.now(timezone.utc) - timedelta(minutes=70) + for b in bboxes: + a._last_polled[b.name] = seed + fetched = [] + + async def fake_fetch(bbox): + fetched.append(bbox.name) + return [] + + a._fetch_bbox = fake_fetch + [e async for e in a.poll()] + await a.shutdown() + assert set(fetched) == {"treasure_valley_ext", "magic_valley_burley"} # 60-min due + assert "mountain_home_corridor" not in fetched # 90-min not due + assert a._last_polled["treasure_valley_ext"] > seed # advanced + assert a._last_polled["mountain_home_corridor"] == seed # untouched + + +@pytest.mark.asyncio +async def test_failed_fetch_does_not_update_last_polled(tmp_path): + import aiohttp + a = _cadence_adapter(tmp_path, [_b("bf", 3600)]) + await a.startup() + + async def boom(bbox): + raise aiohttp.ClientError("upstream down") + + a._fetch_bbox = boom + [e async for e in a.poll()] + await a.shutdown() + assert "bf" not in a._last_polled # failed fetch -> not recorded -> still due next cycle