mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
Merge pull request #65 from zvx-echo6/feat/tomtom-incidents
feat(tomtom_incidents): TomTom real-time traffic incidents adapter (v0.9.5)
This commit is contained in:
commit
0cb8f5a96b
9 changed files with 479 additions and 1 deletions
|
|
@ -1518,6 +1518,47 @@ road name, description, county, severity). Verified for Idaho only.
|
|||
- **Removal semantics:** none in v1. Events age out of the upstream feed; the
|
||||
14-day dedup sweep expires stale ids.
|
||||
|
||||
### tomtom_incidents — TomTom real-time traffic incidents (commercial coverage)
|
||||
|
||||
Real-time incidents (closures, jams, hazards, road work, accidents) from TomTom
|
||||
Orbis `incidentDetails`, polled per metro bbox. Complements wzdx (federal work
|
||||
zones) and state_511_atis (state-DOT reports) with commercial vehicle-telematics
|
||||
coverage. One event per incident.
|
||||
|
||||
- **Stream:** `CENTRAL_TRAFFIC` (event class). **event_type:** `incident` (from
|
||||
`category = "incident.tomtom_incidents"`); shares the type with state_511_atis.
|
||||
- **Subject pattern:** `central.traffic.incident.<state>` (e.g.
|
||||
`central.traffic.incident.id`); `<state>` is the per-bbox `state_code`.
|
||||
- **Coverage:** configured metro bboxes, **each <= 10,000 km^2** (TomTom rejects
|
||||
larger). Ships with Treasure Valley (Boise). **Cadence 1800s (30 min)** ->
|
||||
1 bbox = 1,440 calls/mo, 58% of the 2,500/mo free-tier cap. Adding bboxes must
|
||||
respect `N * (43200/cadence_min) <= 2500`.
|
||||
- **Dedup key shape:** `<state_code>:tomtom:<tomtom_id>` (e.g.
|
||||
`ID:tomtom:TTI-5df75143-...`); the upstream id is stable across polls.
|
||||
- **Severity:** from `magnitudeOfDelay` (0->1, 1->1, 2->2, 3->3, 4->4; 4 ==
|
||||
closure/blocking). Never None.
|
||||
- **Event.data fields:**
|
||||
|
||||
| key | type | nullable | description |
|
||||
|---|---|---|---|
|
||||
| `description` | str | yes | Event text, e.g. `Roadworks`, `Closed` |
|
||||
| `from` / `to` | str | yes | Affected segment endpoints |
|
||||
| `magnitude_of_delay` | int | yes | 0-4 (drives severity) |
|
||||
| `icon_category` | int | yes | TomTom icon enum (8=closed, 9=roadworks, 6=jam, ...) |
|
||||
| `length` / `delay` | float | yes | Affected length (m) / delay (s) |
|
||||
| `road_numbers` | list[str] | yes | Route numbers if known |
|
||||
| `start_time` / `end_time` | str (ISO 8601) | yes | Incident window; `end_time` also sets `Event.expires` |
|
||||
| `time_validity` | str | yes | e.g. `present` |
|
||||
| `state_code` / `bbox_name` | str | no | Routing + source bbox |
|
||||
| `latitude` / `longitude` | float | yes | First geometry vertex (enrichment input) |
|
||||
|
||||
The affected-road geometry (Point or LineString) rides on `geo.geometry` and
|
||||
renders as a polyline on the map (v0.9.3 framework).
|
||||
- **Decipherable as-is:** yes -- description + from/to + magnitude are user-ready;
|
||||
geocoder fills city/county.
|
||||
- **Removal semantics:** none in v1; incidents drop out of the feed when cleared,
|
||||
swept by the 14-day dedup window.
|
||||
|
||||
### tomtom_flow — TomTom Orbis vector flow tiles (per-segment speed, telemetry)
|
||||
|
||||
Per-road-segment traffic speed from TomTom Orbis **vector** flow tiles, polled for
|
||||
|
|
|
|||
17
sql/migrations/028_add_tomtom_incidents_adapter.sql
Normal file
17
sql/migrations/028_add_tomtom_incidents_adapter.sql
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
-- Migration: 028_add_tomtom_incidents_adapter
|
||||
-- Adds the tomtom_incidents adapter onto the EXISTING CENTRAL_TRAFFIC stream
|
||||
-- (central.traffic.incident.<state>). No new stream -> no central-archive restart.
|
||||
-- Reuses the existing "tomtom" api key. Ships disabled; operator enables via GUI.
|
||||
-- NOTE: TomTom incidentDetails rejects any bbox > 10,000 km^2, so coverage is
|
||||
-- per-metro bboxes (Treasure Valley here), NOT statewide. Expansion = more bbox
|
||||
-- rows, but mind the 2,500/mo free-tier cap: N_bboxes * (43200/cadence_min) <= 2500.
|
||||
-- Additive-only: idempotent via ON CONFLICT DO NOTHING.
|
||||
|
||||
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
|
||||
VALUES (
|
||||
'tomtom_incidents',
|
||||
false,
|
||||
1800,
|
||||
'{"api_key_alias": "tomtom", "bboxes": [{"name": "treasure_valley", "min_lon": -116.85, "min_lat": 43.30, "max_lon": -115.65, "max_lat": 44.10, "state_code": "ID"}]}'::jsonb
|
||||
)
|
||||
ON CONFLICT (name) DO NOTHING;
|
||||
262
src/central/adapters/tomtom_incidents.py
Normal file
262
src/central/adapters/tomtom_incidents.py
Normal file
|
|
@ -0,0 +1,262 @@
|
|||
"""TomTom Traffic Incidents adapter — commercial real-time incidents (event).
|
||||
|
||||
Polls the TomTom Orbis incidentDetails endpoint for configured bounding boxes
|
||||
(each must be <= 10,000 km^2 per the API limit), emitting one event per incident
|
||||
to CENTRAL_TRAFFIC (subject central.traffic.incident.{state}). Discrete events
|
||||
with start/end times -> data_class="event". The incident geometry (Point or
|
||||
LineString) is already GeoJSON lon/lat, shipped via geo.geometry (the v0.9.3
|
||||
framework) so the affected road renders as a polyline on the map.
|
||||
|
||||
Dedup is inherited from SourceAdapter; ids use the upstream-stable TomTom id.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sqlite3
|
||||
from collections.abc import AsyncIterator
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential_jitter,
|
||||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.config_models import AdapterConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_INCIDENTS_URL = "https://api.tomtom.com/maps/orbis/traffic/incidentDetails"
|
||||
_FIELDS = ("{incidents{type,geometry{type,coordinates},properties{id,iconCategory,"
|
||||
"magnitudeOfDelay,events{description,code},startTime,endTime,from,to,"
|
||||
"length,delay,roadNumbers,timeValidity}}}")
|
||||
# TomTom magnitudeOfDelay (0 unknown, 1 minor, 2 moderate, 3 major, 4 undefined/
|
||||
# closure) -> severity; never None (v0.8.0 "real signal or 1" rule).
|
||||
_MAGNITUDE_SEVERITY = {0: 1, 1: 1, 2: 2, 3: 3, 4: 4}
|
||||
_FETCH_CONCURRENCY = 4
|
||||
_FETCH_TIMEOUT_S = 30
|
||||
|
||||
_DEDUP_DDL = (
|
||||
"CREATE TABLE IF NOT EXISTS published_ids ("
|
||||
"adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
|
||||
"first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
|
||||
"last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
|
||||
"PRIMARY KEY (adapter, event_id))"
|
||||
)
|
||||
|
||||
|
||||
def _parse_iso(value: str | None) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(value.replace("Z", "+00:00"))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def _first_vertex(geom: dict[str, Any] | None) -> tuple[float | None, float | None]:
|
||||
"""First (lat, lon) from a Point or LineString (coords are [lon, lat])."""
|
||||
coords = (geom or {}).get("coordinates")
|
||||
gtype = (geom or {}).get("type")
|
||||
try:
|
||||
if gtype == "Point":
|
||||
return (float(coords[1]), float(coords[0]))
|
||||
if gtype == "LineString":
|
||||
return (float(coords[0][1]), float(coords[0][0]))
|
||||
except (TypeError, IndexError, ValueError):
|
||||
pass
|
||||
return (None, None)
|
||||
|
||||
|
||||
class BBox(BaseModel):
|
||||
name: str
|
||||
min_lon: float
|
||||
min_lat: float
|
||||
max_lon: float
|
||||
max_lat: float
|
||||
state_code: str
|
||||
|
||||
|
||||
class TomTomIncidentsSettings(BaseModel):
|
||||
"""bboxes: metro boxes to poll (each <= 10,000 km^2). api_key_alias: config key."""
|
||||
|
||||
bboxes: list[BBox] = []
|
||||
api_key_alias: str = "tomtom"
|
||||
|
||||
|
||||
class TomTomIncidentsAdapter(SourceAdapter):
|
||||
"""TomTom Orbis incidentDetails adapter (per-bbox real-time incidents)."""
|
||||
|
||||
name = "tomtom_incidents"
|
||||
display_name = "TomTom Traffic Incidents"
|
||||
description = (
|
||||
"Real-time traffic incidents (closures, jams, hazards, road work) from "
|
||||
"TomTom Orbis incidentDetails for configured metro bboxes (each <= 10,000 km^2)."
|
||||
)
|
||||
settings_schema = TomTomIncidentsSettings
|
||||
requires_api_key = "tomtom"
|
||||
api_key_field = "api_key_alias"
|
||||
wizard_order = None # Ships disabled
|
||||
default_cadence_s = 1800
|
||||
data_class = "event"
|
||||
enrichment_locations = [("latitude", "longitude")]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: AdapterConfig,
|
||||
config_store: ConfigStore,
|
||||
cursor_db_path: Path,
|
||||
) -> None:
|
||||
self._config_store = config_store
|
||||
self._cursor_db_path = cursor_db_path
|
||||
self._session: aiohttp.ClientSession | None = None
|
||||
self._db: sqlite3.Connection | None = None
|
||||
self._bboxes: list[BBox] = self._read_bboxes(config)
|
||||
self._api_key_alias: str = config.settings.get("api_key_alias", "tomtom")
|
||||
self._api_key: str | None = None
|
||||
|
||||
@staticmethod
|
||||
def _read_bboxes(config: AdapterConfig) -> list[BBox]:
|
||||
return [BBox(**b) for b in (config.settings.get("bboxes") or [])]
|
||||
|
||||
def _redact(self, text: str) -> str:
|
||||
return text.replace(self._api_key, "<KEY>") if self._api_key else text
|
||||
|
||||
async def startup(self) -> None:
|
||||
self._session = aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S),
|
||||
headers={"User-Agent": "Central/0.9 (+tomtom_incidents)"},
|
||||
)
|
||||
self._db = sqlite3.connect(self._cursor_db_path)
|
||||
self._db.execute(_DEDUP_DDL)
|
||||
self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)")
|
||||
self._db.commit()
|
||||
self._api_key = await self._config_store.get_api_key(self._api_key_alias)
|
||||
logger.info("tomtom_incidents adapter started",
|
||||
extra={"bboxes": len(self._bboxes), "api_key_present": bool(self._api_key)})
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
if self._session:
|
||||
await self._session.close()
|
||||
self._session = None
|
||||
if self._db:
|
||||
self._db.close()
|
||||
self._db = None
|
||||
|
||||
async def apply_config(self, new_config: AdapterConfig) -> None:
|
||||
self._bboxes = self._read_bboxes(new_config)
|
||||
self._api_key_alias = new_config.settings.get("api_key_alias", "tomtom")
|
||||
self._api_key = await self._config_store.get_api_key(self._api_key_alias)
|
||||
logger.info("tomtom_incidents config updated",
|
||||
extra={"bboxes": len(self._bboxes), "api_key_present": bool(self._api_key)})
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential_jitter(initial=1, max=30),
|
||||
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
|
||||
)
|
||||
async def _fetch_bbox(self, bbox: BBox) -> list[dict[str, Any]]:
|
||||
assert self._session is not None
|
||||
params = {
|
||||
"bbox": f"{bbox.min_lon},{bbox.min_lat},{bbox.max_lon},{bbox.max_lat}",
|
||||
"fields": _FIELDS,
|
||||
"key": self._api_key,
|
||||
"apiVersion": "1",
|
||||
}
|
||||
async with self._session.get(_INCIDENTS_URL, params=params) as resp:
|
||||
resp.raise_for_status()
|
||||
doc = await resp.json(content_type=None)
|
||||
return doc.get("incidents") or []
|
||||
|
||||
def _build_event(self, inc: dict[str, Any], bbox: BBox) -> Event | None:
|
||||
props = inc.get("properties") or {}
|
||||
tid = props.get("id")
|
||||
if not tid:
|
||||
return None
|
||||
geom = inc.get("geometry") or {}
|
||||
lat, lon = _first_vertex(geom)
|
||||
events = props.get("events") or []
|
||||
first = events[0] if events else {}
|
||||
return Event(
|
||||
id=f"{bbox.state_code}:tomtom:{tid}",
|
||||
adapter=self.name,
|
||||
category="incident.tomtom_incidents",
|
||||
time=(_parse_iso(props.get("startTime")) or datetime.now(timezone.utc)),
|
||||
expires=_parse_iso(props.get("endTime")),
|
||||
severity=_MAGNITUDE_SEVERITY.get(props.get("magnitudeOfDelay"), 1),
|
||||
geo=Geo(
|
||||
centroid=(lon, lat) if lat is not None and lon is not None else None,
|
||||
geometry=geom if geom.get("coordinates") else None,
|
||||
regions=[f"US-{bbox.state_code}"],
|
||||
primary_region=f"US-{bbox.state_code}",
|
||||
),
|
||||
data={
|
||||
"description": first.get("description"),
|
||||
"event_code": first.get("code"),
|
||||
"from": props.get("from"),
|
||||
"to": props.get("to"),
|
||||
"magnitude_of_delay": props.get("magnitudeOfDelay"),
|
||||
"icon_category": props.get("iconCategory"),
|
||||
"length": props.get("length"),
|
||||
"delay": props.get("delay"),
|
||||
"road_numbers": props.get("roadNumbers") or [],
|
||||
"start_time": props.get("startTime"),
|
||||
"end_time": props.get("endTime"),
|
||||
"time_validity": props.get("timeValidity"),
|
||||
"state_code": bbox.state_code,
|
||||
"bbox_name": bbox.name,
|
||||
"latitude": lat,
|
||||
"longitude": lon,
|
||||
},
|
||||
)
|
||||
|
||||
async def poll(self) -> AsyncIterator[Event]:
|
||||
if not self._session:
|
||||
raise RuntimeError("Session not initialized")
|
||||
if not self._api_key:
|
||||
logger.warning("tomtom_incidents: no API key for alias; skipping poll",
|
||||
extra={"alias": self._api_key_alias})
|
||||
return
|
||||
sem = asyncio.Semaphore(_FETCH_CONCURRENCY)
|
||||
|
||||
async def _one(bbox: BBox) -> list[Event]:
|
||||
async with sem:
|
||||
try:
|
||||
incidents = await self._fetch_bbox(bbox)
|
||||
except (aiohttp.ClientError, TimeoutError) as exc:
|
||||
logger.warning("tomtom_incidents bbox fetch failed",
|
||||
extra={"bbox": bbox.name, "error": self._redact(str(exc))})
|
||||
return []
|
||||
out: list[Event] = []
|
||||
for inc in incidents:
|
||||
try:
|
||||
ev = self._build_event(inc, bbox)
|
||||
except Exception:
|
||||
logger.exception("tomtom_incidents parse failed", extra={"bbox": bbox.name})
|
||||
continue
|
||||
if ev is not None:
|
||||
out.append(ev)
|
||||
return out
|
||||
|
||||
results = await asyncio.gather(*[_one(b) for b in self._bboxes])
|
||||
yielded = 0
|
||||
for evs in results:
|
||||
for ev in evs:
|
||||
yield ev
|
||||
yielded += 1
|
||||
|
||||
self.sweep_old_ids()
|
||||
logger.info("tomtom_incidents poll completed",
|
||||
extra={"events_yielded": yielded, "bboxes": len(self._bboxes)})
|
||||
|
||||
def subject_for(self, event: Event) -> str:
|
||||
code = (event.data.get("state_code") or "").lower() or "unknown"
|
||||
return f"central.traffic.incident.{code}"
|
||||
|
|
@ -2658,7 +2658,7 @@ ADAPTER_GROUPS = {
|
|||
"Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"],
|
||||
"Geophysical": ["usgs_quake", "nwis"],
|
||||
"Earth Observation": ["eonet"],
|
||||
"Transportation": ["wzdx", "state_511_atis", "tomtom_flow"],
|
||||
"Transportation": ["wzdx", "state_511_atis", "tomtom_flow", "tomtom_incidents"],
|
||||
}
|
||||
# Same palette the map legend uses, indexed by sorted-adapter position.
|
||||
EVENTS_PALETTE = [
|
||||
|
|
|
|||
10
src/central/gui/templates/_event_rows/tomtom_incidents.html
Normal file
10
src/central/gui/templates/_event_rows/tomtom_incidents.html
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
{# TomTom incident detail rows. Fields from payload->data->data. #}
|
||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||
{% if d.get('description') %}<dt>Incident</dt><dd>{{ d.description }}</dd>{% endif %}
|
||||
{% if d.get('from') %}<dt>From</dt><dd>{{ d.get('from') }}{% if d.get('to') %} → {{ d.get('to') }}{% endif %}</dd>{% endif %}
|
||||
{% if d.get('magnitude_of_delay') is not none %}<dt>Delay magnitude</dt><dd>{{ d.magnitude_of_delay }}/4</dd>{% endif %}
|
||||
{% if d.get('delay') is not none %}<dt>Delay</dt><dd>{{ d.delay }} s</dd>{% endif %}
|
||||
{% if d.get('length') is not none %}<dt>Length</dt><dd>{{ d.length | round | int }} m</dd>{% endif %}
|
||||
{% if d.get('road_numbers') %}<dt>Roads</dt><dd>{{ d.road_numbers | join(', ') }}</dd>{% endif %}
|
||||
{% if d.get('start_time') %}<dt>Started</dt><dd>{{ d.start_time }}</dd>{% endif %}
|
||||
{% if d.get('end_time') is not none %}<dt>Ends</dt><dd>{{ d.end_time }}</dd>{% endif %}
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
{# TomTom incident one-line subject. "<description> on <from> -> <to>". Location
|
||||
column shows geocoded city/county. Fields from payload->data->data. #}
|
||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||
{%- set desc = d.get('description') or 'Traffic incident' -%}
|
||||
{{ desc }}{% if d.get('from') %} on {{ d.get('from') }}{% if d.get('to') %} → {{ d.get('to') }}{% endif %}{% endif %}
|
||||
1
tests/fixtures/tomtom_incidents_sample.json
vendored
Normal file
1
tests/fixtures/tomtom_incidents_sample.json
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
{"incidents":[{"type":"Feature","properties":{"id":"TTI-5df75143-312c-45ab-86c9-2b8e4e49d562-TTR14671884444019001","iconCategory":9,"magnitudeOfDelay":0,"startTime":"2026-04-03T03:27:30Z","endTime":null,"from":"Early Road","to":"Slade Road","length":234.054,"delay":null,"roadNumbers":[],"timeValidity":"present","events":[{"code":701,"description":"Roadworks","iconCategory":9}]},"geometry":{"type":"LineString","coordinates":[[-116.7368214523,43.794556286],[-116.7363762056,43.7939957253],[-116.7360181307,43.7935411016],[-116.7357297933,43.793107893],[-116.7354816889,43.7926894443]]}},{"type":"Feature","properties":{"id":"TTI-5df75143-312c-45ab-86c9-2b8e4e49d562-TTR14712278436123000","iconCategory":8,"magnitudeOfDelay":4,"startTime":"2026-05-16T01:22:00Z","endTime":null,"from":"Wagner Road","to":"Farmway Road / West Ustick Road","length":1483.2713788089,"delay":null,"roadNumbers":[],"timeValidity":"present","events":[{"code":401,"description":"Closed","iconCategory":8}]},"geometry":{"type":"LineString","coordinates":[[-116.7331709659,43.6332509025],[-116.7326506173,43.6332536325],[-116.73007972,43.6332683743],[-116.7281579172,43.6332831161],[-116.7265311574,43.6332965232],[-116.726281712,43.6332991925],[-116.724549005,43.633311265],[-116.7240957117,43.633311265],[-116.7231542563,43.6333139343],[-116.7217729186,43.633331406],[-116.7217608487,43.633331406],[-116.7206691896,43.6333434785],[-116.719480971,43.6333448132],[-116.7194689011,43.6333448132],[-116.7188157832,43.6333461478],[-116.7181224322,43.6333514864],[-116.7181103622,43.6333514864],[-116.7168953216,43.6333595549],[-116.7153798735,43.6333689581],[-116.7147415077,43.6333729621]]}}]}
|
||||
|
|
@ -1145,6 +1145,7 @@ _SAMPLE_INNER = {
|
|||
"wzdx": {"road_names": ["I-80"], "direction": "eastbound"},
|
||||
"state_511_atis": {"layer": "Incidents", "roadway_name": "US-95", "location_description": "Ponderosa Mobile Home Park"},
|
||||
"tomtom_flow": {"road_category": "primary", "relative_speed": 0.11},
|
||||
"tomtom_incidents": {"description": "Roadworks", "from": "Early Road", "to": "Slade Road"},
|
||||
}
|
||||
|
||||
# Exact expected subjects for the deterministic adapters. swpc_alerts is omitted
|
||||
|
|
@ -1165,6 +1166,7 @@ _EXPECTED_SUBJECT = {
|
|||
"wfigs_perimeters": "Wildfire perimeter — Carbon, MT",
|
||||
"wzdx": "Work zone on I-80 eastbound",
|
||||
"tomtom_flow": "Traffic flow (primary) — 11% of free-flow",
|
||||
"tomtom_incidents": "Roadworks on Early Road → Slade Road",
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
140
tests/test_tomtom_incidents.py
Normal file
140
tests/test_tomtom_incidents.py
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
"""Tests for the tomtom_incidents adapter (v0.9.5).
|
||||
|
||||
Fixture is a real Orbis incidentDetails capture (2 incidents, varied
|
||||
magnitudeOfDelay) from the Treasure Valley bbox:
|
||||
tests/fixtures/tomtom_incidents_sample.json
|
||||
|
||||
No conftest entry: dedup uses the supervisor-injected cursors.db (inherited
|
||||
mixin); polling is stateless.
|
||||
"""
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapters.tomtom_incidents import (
|
||||
BBox,
|
||||
TomTomIncidentsAdapter,
|
||||
_first_vertex,
|
||||
_MAGNITUDE_SEVERITY,
|
||||
)
|
||||
from central.config_models import AdapterConfig
|
||||
|
||||
INC = json.loads((Path(__file__).parent / "fixtures" / "tomtom_incidents_sample.json").read_text())["incidents"]
|
||||
BB = BBox(name="treasure_valley", min_lon=-116.85, min_lat=43.30, max_lon=-115.65, max_lat=44.10, state_code="ID")
|
||||
|
||||
|
||||
def _cfg():
|
||||
return AdapterConfig(
|
||||
name="tomtom_incidents", enabled=True, cadence_s=1800,
|
||||
settings={"api_key_alias": "tomtom", "bboxes": [BB.model_dump()]},
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(tmp_path):
|
||||
return TomTomIncidentsAdapter(_cfg(), MagicMock(), tmp_path / "cursors.db")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mag,sev", [(0, 1), (1, 1), (2, 2), (3, 3), (4, 4), (None, 1), (99, 1)])
|
||||
def test_severity_mapping(mag, sev):
|
||||
assert _MAGNITUDE_SEVERITY.get(mag, 1) == sev
|
||||
|
||||
|
||||
def test_dedup_key(adapter):
|
||||
e = adapter._build_event(INC[0], BB)
|
||||
assert e.id == f"ID:tomtom:{INC[0]['properties']['id']}"
|
||||
|
||||
|
||||
def test_build_event_linestring(adapter):
|
||||
e = adapter._build_event(INC[0], BB) # mag-0 Roadworks LineString
|
||||
assert e.category == "incident.tomtom_incidents"
|
||||
assert e.severity == 1
|
||||
assert e.data["description"] == "Roadworks"
|
||||
assert e.data["from"] == "Early Road" and e.data["to"] == "Slade Road"
|
||||
assert e.data["state_code"] == "ID"
|
||||
assert e.data["latitude"] is not None and e.data["longitude"] is not None
|
||||
|
||||
|
||||
def test_build_event_closure_severity(adapter):
|
||||
e = adapter._build_event(INC[1], BB) # mag-4 Closed
|
||||
assert e.data["magnitude_of_delay"] == 4
|
||||
assert e.severity == 4
|
||||
|
||||
|
||||
def test_geo_geometry_for_linestring(adapter):
|
||||
# v0.9.3 framework: the affected-road LineString rides on geo.geometry.
|
||||
e = adapter._build_event(INC[0], BB)
|
||||
assert e.geo.geometry["type"] == "LineString"
|
||||
assert e.geo.geometry["coordinates"] == INC[0]["geometry"]["coordinates"]
|
||||
|
||||
|
||||
def test_build_event_point():
|
||||
a = TomTomIncidentsAdapter(_cfg(), MagicMock(), Path("/tmp/unused.db"))
|
||||
inc = {"geometry": {"type": "Point", "coordinates": [-116.2, 43.6]},
|
||||
"properties": {"id": "TTI-x", "magnitudeOfDelay": 2,
|
||||
"events": [{"description": "Accident", "code": 1}]}}
|
||||
e = a._build_event(inc, BB)
|
||||
assert e.geo.geometry["type"] == "Point"
|
||||
assert e.severity == 2
|
||||
assert e.data["latitude"] == 43.6 and e.data["longitude"] == -116.2
|
||||
|
||||
|
||||
def test_first_vertex():
|
||||
assert _first_vertex({"type": "Point", "coordinates": [-116.2, 43.6]}) == (43.6, -116.2)
|
||||
assert _first_vertex({"type": "LineString", "coordinates": [[-116.2, 43.6], [-116.1, 43.7]]}) == (43.6, -116.2)
|
||||
assert _first_vertex(None) == (None, None)
|
||||
assert _first_vertex({"type": "Polygon", "coordinates": []}) == (None, None)
|
||||
|
||||
|
||||
def test_subject_for_idaho(adapter):
|
||||
e = adapter._build_event(INC[0], BB)
|
||||
assert adapter.subject_for(e) == "central.traffic.incident.id"
|
||||
|
||||
|
||||
def test_subject_unknown(adapter):
|
||||
e = adapter._build_event(INC[0], BBox(name="x", min_lon=0, min_lat=0, max_lon=1, max_lat=1, state_code=""))
|
||||
assert adapter.subject_for(e) == "central.traffic.incident.unknown"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_yields_events(tmp_path):
|
||||
cs = MagicMock()
|
||||
cs.get_api_key = AsyncMock(return_value="testkey")
|
||||
adapter = TomTomIncidentsAdapter(_cfg(), cs, tmp_path / "cursors.db")
|
||||
await adapter.startup()
|
||||
adapter._fetch_bbox = AsyncMock(return_value=INC) # bypass retry + network
|
||||
events = [e async for e in adapter.poll()]
|
||||
await adapter.shutdown()
|
||||
assert len(events) == 2
|
||||
assert all(e.adapter == "tomtom_incidents" for e in events)
|
||||
assert all(e.category == "incident.tomtom_incidents" for e in events)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_skips_without_key(tmp_path):
|
||||
cs = MagicMock()
|
||||
cs.get_api_key = AsyncMock(return_value=None)
|
||||
adapter = TomTomIncidentsAdapter(_cfg(), cs, tmp_path / "cursors.db")
|
||||
await adapter.startup()
|
||||
events = [e async for e in adapter.poll()]
|
||||
await adapter.shutdown()
|
||||
assert events == []
|
||||
|
||||
|
||||
def test_summary_partial_renders():
|
||||
from central.gui.routes import _derive_subject
|
||||
inner = {"description": "Roadworks", "from": "Early Road", "to": "Slade Road"}
|
||||
row = {"adapter": "tomtom_incidents", "data": {"data": {"data": inner}}}
|
||||
assert _derive_subject(row) == "Roadworks on Early Road → Slade Road"
|
||||
|
||||
|
||||
def test_inherits_dedup_mixin():
|
||||
for m in ("is_published", "mark_published", "sweep_old_ids"):
|
||||
assert m not in TomTomIncidentsAdapter.__dict__, f"redefines {m}"
|
||||
assert getattr(TomTomIncidentsAdapter, m) is getattr(SourceAdapter, m)
|
||||
Loading…
Add table
Add a link
Reference in a new issue