mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
Merge pull request #66 from zvx-echo6/feat/tomtom-incidents-per-bbox-cadence
feat(tomtom_incidents): per-bbox cadence (v0.9.5.1)
This commit is contained in:
commit
d241bfea26
3 changed files with 106 additions and 2 deletions
|
|
@ -1533,6 +1533,12 @@ coverage. One event per incident.
|
||||||
larger). Ships with Treasure Valley (Boise). **Cadence 1800s (30 min)** ->
|
larger). Ships with Treasure Valley (Boise). **Cadence 1800s (30 min)** ->
|
||||||
1 bbox = 1,440 calls/mo, 58% of the 2,500/mo free-tier cap. Adding bboxes must
|
1 bbox = 1,440 calls/mo, 58% of the 2,500/mo free-tier cap. Adding bboxes must
|
||||||
respect `N * (43200/cadence_min) <= 2500`.
|
respect `N * (43200/cadence_min) <= 2500`.
|
||||||
|
- **Per-bbox cadence:** each bbox may set an optional `cadence_s` (else falls back
|
||||||
|
to the adapter's `default_cadence_s`). The supervisor wakes the adapter at the
|
||||||
|
adapter-level cadence; `poll()` fetches only bboxes whose per-bbox interval has
|
||||||
|
elapsed (in-memory `_last_polled`, per process; first poll after restart fetches
|
||||||
|
all). Set the adapter cadence to the GCD of the per-bbox cadences for exact
|
||||||
|
intervals -- extra wakeups cost zero API calls.
|
||||||
- **Dedup key shape:** `<state_code>:tomtom:<tomtom_id>` (e.g.
|
- **Dedup key shape:** `<state_code>:tomtom:<tomtom_id>` (e.g.
|
||||||
`ID:tomtom:TTI-5df75143-...`); the upstream id is stable across polls.
|
`ID:tomtom:TTI-5df75143-...`); the upstream id is stable across polls.
|
||||||
- **Severity:** from `magnitudeOfDelay` (0->1, 1->1, 2->2, 3->3, 4->4; 4 ==
|
- **Severity:** from `magnitudeOfDelay` (0->1, 1->1, 2->2, 3->3, 4->4; 4 ==
|
||||||
|
|
|
||||||
|
|
@ -83,6 +83,7 @@ class BBox(BaseModel):
|
||||||
max_lon: float
|
max_lon: float
|
||||||
max_lat: float
|
max_lat: float
|
||||||
state_code: str
|
state_code: str
|
||||||
|
cadence_s: int | None = None # per-bbox poll interval; None -> adapter default_cadence_s
|
||||||
|
|
||||||
|
|
||||||
class TomTomIncidentsSettings(BaseModel):
|
class TomTomIncidentsSettings(BaseModel):
|
||||||
|
|
@ -122,6 +123,7 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
||||||
self._bboxes: list[BBox] = self._read_bboxes(config)
|
self._bboxes: list[BBox] = self._read_bboxes(config)
|
||||||
self._api_key_alias: str = config.settings.get("api_key_alias", "tomtom")
|
self._api_key_alias: str = config.settings.get("api_key_alias", "tomtom")
|
||||||
self._api_key: str | None = None
|
self._api_key: str | None = None
|
||||||
|
self._last_polled: dict[str, datetime] = {} # bbox name -> last successful fetch (in-memory)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _read_bboxes(config: AdapterConfig) -> list[BBox]:
|
def _read_bboxes(config: AdapterConfig) -> list[BBox]:
|
||||||
|
|
@ -130,6 +132,14 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
||||||
def _redact(self, text: str) -> str:
|
def _redact(self, text: str) -> str:
|
||||||
return text.replace(self._api_key, "<KEY>") if self._api_key else text
|
return text.replace(self._api_key, "<KEY>") if self._api_key else text
|
||||||
|
|
||||||
|
def _bbox_due(self, bbox: "BBox", now: datetime) -> bool:
|
||||||
|
"""True if this bbox is due to poll (never polled this process, or its
|
||||||
|
per-bbox cadence_s -- falling back to default_cadence_s -- has elapsed)."""
|
||||||
|
last = self._last_polled.get(bbox.name)
|
||||||
|
if last is None:
|
||||||
|
return True
|
||||||
|
return (now - last).total_seconds() >= (bbox.cadence_s or self.default_cadence_s)
|
||||||
|
|
||||||
async def startup(self) -> None:
|
async def startup(self) -> None:
|
||||||
self._session = aiohttp.ClientSession(
|
self._session = aiohttp.ClientSession(
|
||||||
timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S),
|
timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S),
|
||||||
|
|
@ -226,6 +236,8 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
||||||
extra={"alias": self._api_key_alias})
|
extra={"alias": self._api_key_alias})
|
||||||
return
|
return
|
||||||
sem = asyncio.Semaphore(_FETCH_CONCURRENCY)
|
sem = asyncio.Semaphore(_FETCH_CONCURRENCY)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
due = [b for b in self._bboxes if self._bbox_due(b, now)]
|
||||||
|
|
||||||
async def _one(bbox: BBox) -> list[Event]:
|
async def _one(bbox: BBox) -> list[Event]:
|
||||||
async with sem:
|
async with sem:
|
||||||
|
|
@ -235,6 +247,7 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
||||||
logger.warning("tomtom_incidents bbox fetch failed",
|
logger.warning("tomtom_incidents bbox fetch failed",
|
||||||
extra={"bbox": bbox.name, "error": self._redact(str(exc))})
|
extra={"bbox": bbox.name, "error": self._redact(str(exc))})
|
||||||
return []
|
return []
|
||||||
|
self._last_polled[bbox.name] = now # only after a successful fetch
|
||||||
out: list[Event] = []
|
out: list[Event] = []
|
||||||
for inc in incidents:
|
for inc in incidents:
|
||||||
try:
|
try:
|
||||||
|
|
@ -246,7 +259,7 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
||||||
out.append(ev)
|
out.append(ev)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
results = await asyncio.gather(*[_one(b) for b in self._bboxes])
|
results = await asyncio.gather(*[_one(b) for b in due])
|
||||||
yielded = 0
|
yielded = 0
|
||||||
for evs in results:
|
for evs in results:
|
||||||
for ev in evs:
|
for ev in evs:
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ mixin); polling is stateless.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import AsyncMock, MagicMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
|
@ -138,3 +138,88 @@ def test_inherits_dedup_mixin():
|
||||||
for m in ("is_published", "mark_published", "sweep_old_ids"):
|
for m in ("is_published", "mark_published", "sweep_old_ids"):
|
||||||
assert m not in TomTomIncidentsAdapter.__dict__, f"redefines {m}"
|
assert m not in TomTomIncidentsAdapter.__dict__, f"redefines {m}"
|
||||||
assert getattr(TomTomIncidentsAdapter, m) is getattr(SourceAdapter, m)
|
assert getattr(TomTomIncidentsAdapter, m) is getattr(SourceAdapter, m)
|
||||||
|
|
||||||
|
|
||||||
|
# --- v0.9.5.1 per-bbox cadence -----------------------------------------------
|
||||||
|
|
||||||
|
def _b(name, cadence_s=None):
|
||||||
|
return BBox(name=name, min_lon=0, min_lat=0, max_lon=1, max_lat=1,
|
||||||
|
state_code="ID", cadence_s=cadence_s)
|
||||||
|
|
||||||
|
|
||||||
|
def _cadence_adapter(tmp_path, bboxes):
|
||||||
|
cfg = AdapterConfig(
|
||||||
|
name="tomtom_incidents", enabled=True, cadence_s=1800,
|
||||||
|
settings={"api_key_alias": "tomtom", "bboxes": [b.model_dump() for b in bboxes]},
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
cs = MagicMock()
|
||||||
|
cs.get_api_key = AsyncMock(return_value="testkey")
|
||||||
|
return TomTomIncidentsAdapter(cfg, cs, tmp_path / "cursors.db")
|
||||||
|
|
||||||
|
|
||||||
|
def test_bbox_cadence_field_defaults_none():
|
||||||
|
assert _b("x").cadence_s is None
|
||||||
|
assert _b("y", 3600).cadence_s == 3600
|
||||||
|
|
||||||
|
|
||||||
|
def test_bbox_due_first_poll(adapter):
|
||||||
|
assert adapter._bbox_due(_b("never", 3600), datetime.now(timezone.utc)) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_bbox_due_respects_per_bbox_cadence(adapter):
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
b60 = _b("b60", 3600)
|
||||||
|
adapter._last_polled["b60"] = now - timedelta(minutes=31)
|
||||||
|
assert adapter._bbox_due(b60, now) is False # 31 < 60
|
||||||
|
adapter._last_polled["b60"] = now - timedelta(minutes=60)
|
||||||
|
assert adapter._bbox_due(b60, now) is True # 60 >= 60 boundary
|
||||||
|
|
||||||
|
|
||||||
|
def test_bbox_due_fallback_to_default(adapter):
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
bd = _b("bd") # cadence_s None -> default_cadence_s == 1800 (30 min)
|
||||||
|
adapter._last_polled["bd"] = now - timedelta(minutes=29)
|
||||||
|
assert adapter._bbox_due(bd, now) is False
|
||||||
|
adapter._last_polled["bd"] = now - timedelta(minutes=31)
|
||||||
|
assert adapter._bbox_due(bd, now) is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_poll_only_fetches_due_bboxes(tmp_path):
|
||||||
|
# Matt's 60/90/60: seed all polled 70 min ago -> 60-min due, 90-min not.
|
||||||
|
bboxes = [_b("treasure_valley_ext", 3600), _b("mountain_home_corridor", 5400),
|
||||||
|
_b("magic_valley_burley", 3600)]
|
||||||
|
a = _cadence_adapter(tmp_path, bboxes)
|
||||||
|
await a.startup()
|
||||||
|
seed = datetime.now(timezone.utc) - timedelta(minutes=70)
|
||||||
|
for b in bboxes:
|
||||||
|
a._last_polled[b.name] = seed
|
||||||
|
fetched = []
|
||||||
|
|
||||||
|
async def fake_fetch(bbox):
|
||||||
|
fetched.append(bbox.name)
|
||||||
|
return []
|
||||||
|
|
||||||
|
a._fetch_bbox = fake_fetch
|
||||||
|
[e async for e in a.poll()]
|
||||||
|
await a.shutdown()
|
||||||
|
assert set(fetched) == {"treasure_valley_ext", "magic_valley_burley"} # 60-min due
|
||||||
|
assert "mountain_home_corridor" not in fetched # 90-min not due
|
||||||
|
assert a._last_polled["treasure_valley_ext"] > seed # advanced
|
||||||
|
assert a._last_polled["mountain_home_corridor"] == seed # untouched
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_fetch_does_not_update_last_polled(tmp_path):
|
||||||
|
import aiohttp
|
||||||
|
a = _cadence_adapter(tmp_path, [_b("bf", 3600)])
|
||||||
|
await a.startup()
|
||||||
|
|
||||||
|
async def boom(bbox):
|
||||||
|
raise aiohttp.ClientError("upstream down")
|
||||||
|
|
||||||
|
a._fetch_bbox = boom
|
||||||
|
[e async for e in a.poll()]
|
||||||
|
await a.shutdown()
|
||||||
|
assert "bf" not in a._last_polled # failed fetch -> not recorded -> still due next cycle
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue