diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index b941104..87d88be 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -132,6 +132,7 @@ Central's archive. | `CENTRAL_DISASTER` | `central.disaster.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_HYDRO` | `central.hydro.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_TRAFFIC` | `central.traffic.>` | 7 | 1 GiB | ✓ | ✓ | +| `CENTRAL_TRAFFIC_FLOW` | `central.traffic_flow.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_META` | `central.meta.>` | 1 | 1 GiB | — | ✓ | Retention and storage caps are migration-seeded defaults visible in `config.streams`; @@ -1517,6 +1518,40 @@ road name, description, county, severity). Verified for Idaho only. - **Removal semantics:** none in v1. Events age out of the upstream feed; the 14-day dedup sweep expires stale ids. +### tomtom_flow — TomTom Orbis vector flow tiles (per-segment speed, telemetry) + +Per-road-segment traffic speed from TomTom Orbis **vector** flow tiles, polled for +a configured tile coverage set (Idaho metros at z=10). Each segment is one +telemetry Event carrying a LineString/MultiLineString geometry (drawn as a +colored polyline on the `/telemetry` map) — green free-flowing, red jammed. + +- **Stream:** `CENTRAL_TRAFFIC_FLOW` (telemetry; `/telemetry` tab). +- **Subject pattern:** `central.traffic_flow.{z}.{x}.{y}` — tile-routable (segments + carry no state). Distinct token from `central.traffic.>` (no overlap). +- **GUI event_type:** `flow` (from `category = "flow.tomtom_flow"`). +- **Cadence default:** 300s (5 min). **Retention:** 7 days (high-volume telemetry). +- **Dedup key shape:** `{z}/{x}/{y}:{segment_index}:{minute}` — minute-bucketed so + an adapter poll and an on-demand passthrough fetch of the same tile in the same + minute don't double-store (TomTom advertises a 60s tile TTL). +- **Event.data fields:** + + | key | type | nullable | description | + |---|---|---|---| + | `relative_speed` | float | yes | 0-1 ratio of current to free-flow speed (drives severity + color) | + | `road_category` | str | yes | `motorway` / `trunk` / `primary` / `secondary` | + | `tile_z` / `tile_x` / `tile_y` | int | no | Source slippy tile | + | `segment_index` | int | no | Index within the tile's "Traffic flow" layer | + | `tier` | str | no | `orbis` | + | `fetched_at` | str (ISO 8601 UTC) | no | Poll timestamp | + + Geometry (the road polyline) is on `geo.geometry`, persisted to the PostGIS + `geom` column and returned by the map as GeoJSON. +- **Severity:** from `relative_speed` — `>=0.75`=1 (free), `0.5-0.75`=2, `0.25-0.5`=3, + `<0.25`=4 (jam). +- **Decipherable as-is:** yes — speed ratio + road class + geometry are self-contained. +- **Removal semantics:** none; time-series telemetry, one snapshot per poll, swept + by the 7-day retention. + ### wzdx — FHWA Work Zone Data Exchange (state-DOT work zones) Active road work zones discovered from the federal WZDx Feed Registry and each diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 026bc82..6276df4 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -362,7 +362,7 @@ central..[....] ``` - `` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`, - `traffic`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding + `traffic`, `traffic_flow`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding one). Operators MUST be able to subscribe to all of one domain with `central..>`. - `` is adapter-driven and identifies the event category within the @@ -551,6 +551,7 @@ STREAMS: list[StreamEntry] = [ StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"), + StreamEntry("CENTRAL_TRAFFIC_FLOW", "central.traffic_flow.>"), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), ] ``` diff --git a/pyproject.toml b/pyproject.toml index 2922e9f..9dd4745 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ license = {text = "MIT"} authors = [{name = "Matt Johnson"}] dependencies = [ "aiohttp>=3.13.5", + "mapbox-vector-tile>=2.0", + "itsdangerous>=2.2", # used by gui/csrf.py + gui/wizard.py (was an undeclared transitive) "argon2-cffi>=25.1.0", "asyncpg>=0.31.0", "cloudevents>=2.0.0", diff --git a/sql/migrations/027_add_tomtom_flow_adapter_and_flow_stream.sql b/sql/migrations/027_add_tomtom_flow_adapter_and_flow_stream.sql new file mode 100644 index 0000000..a256c2b --- /dev/null +++ b/sql/migrations/027_add_tomtom_flow_adapter_and_flow_stream.sql @@ -0,0 +1,20 @@ +-- Migration: 027_add_tomtom_flow_adapter_and_flow_stream +-- Adds the CENTRAL_TRAFFIC_FLOW JetStream stream (telemetry; central.traffic_flow.>, +-- non-overlapping with CENTRAL_TRAFFIC's central.traffic.>) AND the tomtom_flow +-- adapter row. NEW event-bearing stream -> central-archive restart required at deploy +-- (feedback_new_stream_needs_archive_restart). 7-day retention (high-volume telemetry). +-- Ships disabled; operator adds a "tomtom" api_key + enables. Idaho metros at z=10. +-- Additive-only: idempotent via ON CONFLICT DO NOTHING. + +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_TRAFFIC_FLOW', 604800, 1073741824) +ON CONFLICT (name) DO NOTHING; + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'tomtom_flow', + false, + 300, + '{"api_key_alias": "tomtom", "tiles": [{"z":10,"x":181,"y":373},{"z":10,"x":180,"y":374},{"z":10,"x":179,"y":357},{"z":10,"x":193,"y":374},{"z":10,"x":192,"y":376},{"z":10,"x":186,"y":377},{"z":10,"x":179,"y":362},{"z":10,"x":181,"y":374},{"z":10,"x":182,"y":373},{"z":10,"x":182,"y":374}]}'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/tomtom_flow.py b/src/central/adapters/tomtom_flow.py new file mode 100644 index 0000000..81ab5c1 --- /dev/null +++ b/src/central/adapters/tomtom_flow.py @@ -0,0 +1,179 @@ +"""TomTom Orbis vector flow-tile polling adapter (telemetry). + +Polls a configured set of slippy tiles (Idaho metros at z=10), fetches each +Orbis vector flow tile, decodes per-segment relative_speed + geometry via +``tomtom_flow_parse``, and emits one telemetry Event per road segment per poll +to CENTRAL_TRAFFIC_FLOW (subject ``central.traffic_flow.{z}.{x}.{y}``). The +v0.9.4 on-demand passthrough route reuses the same decode helper. + +Dedup is inherited from SourceAdapter; ids are minute-bucketed so an adapter +poll and a passthrough fetch of the same tile in the same minute don't double +store (TomTom advertises a 60s tile TTL). +""" + +import asyncio +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event +from central.tomtom_flow_parse import decode_flow_tile + +logger = logging.getLogger(__name__) + +_ORBIS_FLOW_URL = "https://api.tomtom.com/maps/orbis/traffic/tile/flow/{z}/{x}/{y}.pbf?key={key}&apiVersion=1" +_FETCH_CONCURRENCY = 4 +_FETCH_TIMEOUT_S = 30 + +_DEDUP_DDL = ( + "CREATE TABLE IF NOT EXISTS published_ids (" + "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " + "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" +) + + +class TileCoord(BaseModel): + z: int + x: int + y: int + + +class TomTomFlowSettings(BaseModel): + """tiles: slippy coordinates to poll. api_key_alias: config.api_keys entry.""" + + tiles: list[TileCoord] = [] + api_key_alias: str = "tomtom" + + +class TomTomFlowAdapter(SourceAdapter): + """TomTom Orbis vector flow-tile telemetry adapter.""" + + name = "tomtom_flow" + display_name = "TomTom Traffic Flow" + description = ( + "Per-road-segment traffic speed (relative_speed) from TomTom Orbis vector " + "flow tiles, for a configured tile coverage set. Telemetry; renders as " + "colored polylines on the map." + ) + settings_schema = TomTomFlowSettings + requires_api_key = "tomtom" + api_key_field = "api_key_alias" + wizard_order = None # Ships disabled + default_cadence_s = 300 + data_class = "telemetry" + enrichment_locations = [] # segments carry their own geometry; no point geocode + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._tiles: list[TileCoord] = self._read_tiles(config) + self._api_key_alias: str = config.settings.get("api_key_alias", "tomtom") + self._api_key: str | None = None + + @staticmethod + def _read_tiles(config: AdapterConfig) -> list[TileCoord]: + return [TileCoord(**t) for t in (config.settings.get("tiles") or [])] + + def _redact(self, text: str) -> str: + return text.replace(self._api_key, "") if self._api_key else text + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S), + headers={"User-Agent": "Central/0.9 (+tomtom_flow)"}, + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(_DEDUP_DDL) + self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") + self._db.commit() + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("tomtom_flow adapter started", + extra={"tiles": len(self._tiles), "api_key_present": bool(self._api_key)}) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + self._tiles = self._read_tiles(new_config) + self._api_key_alias = new_config.settings.get("api_key_alias", "tomtom") + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("tomtom_flow config updated", + extra={"tiles": len(self._tiles), "api_key_present": bool(self._api_key)}) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_tile(self, z: int, x: int, y: int) -> bytes: + assert self._session is not None + url = _ORBIS_FLOW_URL.format(z=z, x=x, y=y, key=self._api_key) + async with self._session.get(url) as resp: + resp.raise_for_status() + return await resp.read() + + async def poll(self) -> AsyncIterator[Event]: + if not self._session: + raise RuntimeError("Session not initialized") + if not self._api_key: + logger.warning("tomtom_flow: no API key for alias; skipping poll", + extra={"alias": self._api_key_alias}) + return + now = datetime.now(timezone.utc) + sem = asyncio.Semaphore(_FETCH_CONCURRENCY) + + async def _one(tile: TileCoord) -> list[Event]: + async with sem: + try: + pbf = await self._fetch_tile(tile.z, tile.x, tile.y) + except (aiohttp.ClientError, TimeoutError) as exc: + logger.warning("tomtom_flow tile fetch failed", + extra={"tile": [tile.z, tile.x, tile.y], "error": self._redact(str(exc))}) + return [] + try: + return decode_flow_tile(pbf, tile.z, tile.x, tile.y, now) + except Exception: + logger.exception("tomtom_flow decode failed", extra={"tile": [tile.z, tile.x, tile.y]}) + return [] + + results = await asyncio.gather(*[_one(t) for t in self._tiles]) + yielded = 0 + for evs in results: + for ev in evs: + yield ev + yielded += 1 + + self.sweep_old_ids() + logger.info("tomtom_flow poll completed", extra={"events_yielded": yielded, "tiles": len(self._tiles)}) + + def subject_for(self, event: Event) -> str: + d = event.data + return f"central.traffic_flow.{d.get('tile_z')}.{d.get('tile_x')}.{d.get('tile_y')}" diff --git a/src/central/archive.py b/src/central/archive.py index dfa5475..f26a104 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -75,6 +75,13 @@ def _build_geom_sql(geo_data: dict[str, Any] | None) -> str | None: if not geo_data: return None + # A full GeoJSON geometry (e.g. flow-segment LineString) wins over the + # bbox/centroid fallbacks so the map renders the real shape. Inert for + # adapters that don't set geo.geometry. + geometry = geo_data.get("geometry") + if geometry: + return json.dumps(geometry) + bbox = geo_data.get("bbox") centroid = geo_data.get("centroid") diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index e87e25a..9466c0d 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2651,7 +2651,7 @@ ADAPTER_GROUPS = { "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"], "Geophysical": ["usgs_quake", "nwis"], "Earth Observation": ["eonet"], - "Transportation": ["wzdx", "state_511_atis"], + "Transportation": ["wzdx", "state_511_atis", "tomtom_flow"], } # Same palette the map legend uses, indexed by sorted-adapter position. EVENTS_PALETTE = [ diff --git a/src/central/gui/templates/_event_rows/tomtom_flow.html b/src/central/gui/templates/_event_rows/tomtom_flow.html new file mode 100644 index 0000000..07672e1 --- /dev/null +++ b/src/central/gui/templates/_event_rows/tomtom_flow.html @@ -0,0 +1,6 @@ +{# TomTom flow segment detail rows. Fields from payload->data->data. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{% if d.get('road_category') %}
Road class
{{ d.road_category }}
{% endif %} +{% if d.get('relative_speed') is not none %}
Relative speed
{{ (d.relative_speed * 100) | round | int }}% of free-flow
{% endif %} +{% if d.get('tile_z') is not none %}
Tile
{{ d.tile_z }}/{{ d.tile_x }}/{{ d.tile_y }} (segment {{ d.segment_index }})
{% endif %} +{% if d.get('fetched_at') %}
Fetched
{{ d.fetched_at }}
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/tomtom_flow.html b/src/central/gui/templates/_event_summaries/tomtom_flow.html new file mode 100644 index 0000000..7146512 --- /dev/null +++ b/src/central/gui/templates/_event_summaries/tomtom_flow.html @@ -0,0 +1,5 @@ +{# TomTom flow segment one-line subject. relative_speed -> % of free-flow. + Fields from payload->data->data. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{%- set rs = d.get('relative_speed') -%} +Traffic flow{% if d.get('road_category') %} ({{ d.road_category }}){% endif %}{% if rs is not none %} — {{ (rs * 100) | round | int }}% of free-flow{% endif %} diff --git a/src/central/gui/templates/events_list.html b/src/central/gui/templates/events_list.html index df9d54a..73bcf1e 100644 --- a/src/central/gui/templates/events_list.html +++ b/src/central/gui/templates/events_list.html @@ -136,6 +136,11 @@ return ADAPTER_COLORS[adapter] || "#3388ff"; } + // tomtom_flow segments color by severity (relative_speed band): green=free + // flowing, red=jammed. Mirrors the green/yellow/red flow overlay. + var FLOW_COLOR = { "1": "#2ecc71", "2": "#f1c40f", "3": "#e67e22", "4": "#e74c3c" }; + function flowColor(sev) { return FLOW_COLOR[sev] || "#7f8c8d"; } + // Flatten arbitrarily-nested GeoJSON coordinates into a flat [lng, lat] list. function flattenCoords(coords, out) { if (coords.length && typeof coords[0] === "number") { @@ -301,7 +306,7 @@ if (!geom) return; var adapter = row.dataset.adapter || ""; - var color = getAdapterColor(adapter); + var color = adapter === "tomtom_flow" ? flowColor(row.dataset.severity) : getAdapterColor(adapter); var op = severityOpacity(row); // Point-like geometries (Points + zero-extent polygons from diff --git a/src/central/models.py b/src/central/models.py index c7a2159..4407405 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -15,6 +15,7 @@ class Geo(BaseModel): bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] primary_region: str | None = None # alphabetically first region, used for subject + geometry: dict[str, Any] | None = None # full GeoJSON geometry; preferred by the archive over bbox/centroid for the map geom column class Event(BaseModel): diff --git a/src/central/streams.py b/src/central/streams.py index cc940d8..98af7d4 100644 --- a/src/central/streams.py +++ b/src/central/streams.py @@ -30,5 +30,6 @@ STREAMS: list[StreamEntry] = [ StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"), + StreamEntry("CENTRAL_TRAFFIC_FLOW", "central.traffic_flow.>"), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), ] diff --git a/src/central/tomtom_flow_parse.py b/src/central/tomtom_flow_parse.py new file mode 100644 index 0000000..09c5c01 --- /dev/null +++ b/src/central/tomtom_flow_parse.py @@ -0,0 +1,113 @@ +"""Decode TomTom Orbis vector flow tiles into per-segment telemetry Events. + +Shared by the ``tomtom_flow`` polling adapter and (v0.9.4) the on-demand +passthrough route. A vector flow tile's ``"Traffic flow"`` layer carries one +feature per road segment with ``relative_speed`` (0-1 current/free-flow ratio) +and ``road_category``, in tile-local MVT coordinates (extent 4096, y-up). We +georeference each vertex to lon/lat via the slippy-tile + Web-Mercator inverse +and emit one telemetry Event per segment, carrying the LineString geometry (so +the map draws a colored polyline) and a severity derived from relative_speed. +""" + +import math +from datetime import datetime +from typing import Any + +import mapbox_vector_tile + +from central.models import Event, Geo + +FLOW_LAYER = "Traffic flow" +ADAPTER_NAME = "tomtom_flow" + + +def severity_from_relative_speed(rs: float | None) -> int: + """relative_speed (0-1, current/free-flow) -> severity; lower speed = worse.""" + if rs is None: + return 1 + if rs >= 0.75: + return 1 + if rs >= 0.5: + return 2 + if rs >= 0.25: + return 3 + return 4 + + +def _merc_y_to_lat(t: float) -> float: + """Normalized web-mercator row (0=north world edge .. 1=south) -> latitude deg.""" + return math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * t)))) + + +def _local_to_lonlat(lx: float, ly: float, z: int, x: int, y: int, extent: int) -> list[float]: + """MVT tile-local (lx, ly) [y-up, 0..extent] -> [lon, lat] degrees.""" + n = 2 ** z + lon_left = x / n * 360.0 - 180.0 + lon_right = (x + 1) / n * 360.0 - 180.0 + fx = lx / extent + fy = ly / extent # mapbox_vector_tile default orientation is y-up (0 = tile bottom) + lon = lon_left + fx * (lon_right - lon_left) + lat = _merc_y_to_lat((y + (1 - fy)) / n) # fy=1 (top)->y/n ; fy=0 (bottom)->(y+1)/n + return [round(lon, 6), round(lat, 6)] + + +def _transform_coords(coords: Any, z: int, x: int, y: int, extent: int) -> Any: + """Recursively georeference nested MVT coordinate arrays to lon/lat.""" + if coords and isinstance(coords[0], (int, float)): + return _local_to_lonlat(coords[0], coords[1], z, x, y, extent) + return [_transform_coords(c, z, x, y, extent) for c in coords] + + +def _midpoint(coordinates: Any) -> tuple[float, float] | None: + """Mean (lon, lat) over all vertices — the clustering centroid.""" + pts: list[list[float]] = [] + + def walk(c: Any) -> None: + if c and isinstance(c[0], (int, float)): + pts.append(c) + else: + for sub in c: + walk(sub) + + walk(coordinates or []) + if not pts: + return None + return (sum(p[0] for p in pts) / len(pts), sum(p[1] for p in pts) / len(pts)) + + +def decode_flow_tile(pbf: bytes, z: int, x: int, y: int, fetched_at: datetime) -> list[Event]: + """Decode one Orbis vector flow tile into per-segment telemetry Events.""" + decoded = mapbox_vector_tile.decode(pbf) + layer = decoded.get(FLOW_LAYER) + if not layer: + return [] + extent = layer.get("extent", 4096) + minute = fetched_at.strftime("%Y-%m-%dT%H:%M") + events: list[Event] = [] + for idx, feat in enumerate(layer.get("features", [])): + props = feat.get("properties") or {} + geom = feat.get("geometry") or {} + coords = _transform_coords(geom.get("coordinates") or [], z, x, y, extent) + gj = {"type": geom.get("type"), "coordinates": coords} + rs = props.get("relative_speed") + events.append( + Event( + id=f"{z}/{x}/{y}:{idx}:{minute}", + adapter=ADAPTER_NAME, + category="flow.tomtom_flow", + time=fetched_at, + severity=severity_from_relative_speed(rs), + geo=Geo(centroid=_midpoint(coords), geometry=gj, regions=[], primary_region=None), + data={ + "relative_speed": rs, + "road_category": props.get("road_category"), + "tile_z": z, + "tile_x": x, + "tile_y": y, + "segment_index": idx, + "tier": "orbis", + "fetched_at": fetched_at.isoformat(), + }, + ) + ) + return events diff --git a/tests/fixtures/tomtom_flow_orbis.pbf b/tests/fixtures/tomtom_flow_orbis.pbf new file mode 100644 index 0000000..4b37ebc Binary files /dev/null and b/tests/fixtures/tomtom_flow_orbis.pbf differ diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py index 142f0c7..cf41ddb 100644 --- a/tests/test_events_feed_frontend.py +++ b/tests/test_events_feed_frontend.py @@ -1144,6 +1144,7 @@ _SAMPLE_INNER = { "wfigs_perimeters": {"county": "Carbon", "state": "MT"}, "wzdx": {"road_names": ["I-80"], "direction": "eastbound"}, "state_511_atis": {"layer": "Incidents", "roadway_name": "US-95", "location_description": "Ponderosa Mobile Home Park"}, + "tomtom_flow": {"road_category": "primary", "relative_speed": 0.11}, } # Exact expected subjects for the deterministic adapters. swpc_alerts is omitted @@ -1163,6 +1164,7 @@ _EXPECTED_SUBJECT = { "wfigs_incidents": "Wildfire incident — Montezuma, CO", "wfigs_perimeters": "Wildfire perimeter — Carbon, MT", "wzdx": "Work zone on I-80 eastbound", + "tomtom_flow": "Traffic flow (primary) — 11% of free-flow", } diff --git a/tests/test_telemetry_separation.py b/tests/test_telemetry_separation.py index 6d0d97a..137d71a 100644 --- a/tests/test_telemetry_separation.py +++ b/tests/test_telemetry_separation.py @@ -10,6 +10,9 @@ from central.adapter import SourceAdapter from central.adapter_discovery import discover_adapters from central.gui import routes +# Adapters with data_class="telemetry" (the pinned split; grow as telemetry adapters land). +_TELEMETRY = ["nwis", "tomtom_flow"] + # --- data_class defaults / registry split ----------------------------------- @@ -22,15 +25,15 @@ def test_registry_split_11_event_1_telemetry(): by_class = {} for name, cls in reg.items(): by_class.setdefault(getattr(cls, "data_class", "event"), []).append(name) - assert by_class.get("telemetry") == ["nwis"] + assert sorted(by_class.get("telemetry", [])) == _TELEMETRY # Everything else is event-class; the split must cover the whole registry. - assert sorted(by_class.get("event", [])) == sorted(n for n in reg if n != "nwis") - assert len(by_class.get("event", [])) == len(reg) - 1 + assert sorted(by_class.get("event", [])) == sorted(n for n in reg if n not in _TELEMETRY) + assert len(by_class.get("event", [])) == len(reg) - len(_TELEMETRY) def test_class_adapter_names(): assert "nwis" not in routes._class_adapter_names("event") - assert routes._class_adapter_names("telemetry") == ["nwis"] + assert sorted(routes._class_adapter_names("telemetry")) == _TELEMETRY assert "usgs_quake" in routes._class_adapter_names("event") @@ -40,16 +43,16 @@ def test_event_options_exclude_nwis(): flat, grouped = routes._adapter_filter_options("event") names = {a["name"] for a in flat} assert "nwis" not in names - assert len(flat) == len(discover_adapters()) - 1 + assert len(flat) == len(discover_adapters()) - len(_TELEMETRY) grouped_values = {opt["value"] for _, items in grouped for opt in items} assert "nwis" not in grouped_values def test_telemetry_options_only_nwis(): flat, grouped = routes._adapter_filter_options("telemetry") - assert [a["name"] for a in flat] == ["nwis"] + assert sorted(a["name"] for a in flat) == _TELEMETRY grouped_values = [opt["value"] for _, items in grouped for opt in items] - assert grouped_values == ["nwis"] + assert sorted(grouped_values) == _TELEMETRY def test_colors_stable_across_classes(): diff --git a/tests/test_tomtom_flow.py b/tests/test_tomtom_flow.py new file mode 100644 index 0000000..957fe07 --- /dev/null +++ b/tests/test_tomtom_flow.py @@ -0,0 +1,128 @@ +"""Tests for the tomtom_flow adapter + shared decode module (v0.9.3). + +Fixture is a real Orbis vector flow tile (Boise z10/181/374): + tests/fixtures/tomtom_flow_orbis.pbf + -- curl 'https://api.tomtom.com/maps/orbis/traffic/tile/flow/10/181/374.pbf?key=…&apiVersion=1' + +No tests/conftest isolation entry: dedup uses the supervisor-injected cursors.db +(inherited mixin); polling is stateless. +""" + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.adapter import SourceAdapter +from central.adapters.tomtom_flow import TomTomFlowAdapter +from central.archive import _build_geom_sql +from central.config_models import AdapterConfig +from central.tomtom_flow_parse import ( + _local_to_lonlat, + decode_flow_tile, + severity_from_relative_speed, +) + +FIX = Path(__file__).parent / "fixtures" / "tomtom_flow_orbis.pbf" +PBF = FIX.read_bytes() +AT = datetime(2026, 5, 25, 22, 3, tzinfo=timezone.utc) + + +def _cfg(tiles=None): + return AdapterConfig( + name="tomtom_flow", enabled=True, cadence_s=300, + settings={"api_key_alias": "tomtom", "tiles": tiles or [{"z": 10, "x": 181, "y": 374}]}, + updated_at=datetime.now(timezone.utc), + ) + + +@pytest.mark.parametrize("rs,sev", [ + (1.0, 1), (0.75, 1), (0.74, 2), (0.5, 2), (0.49, 3), (0.25, 3), (0.24, 4), (0.0, 4), (None, 1), +]) +def test_severity_from_relative_speed(rs, sev): + assert severity_from_relative_speed(rs) == sev + + +def test_local_to_lonlat_boise_tile(): + # tile z10/181/374; local (0,0)=bottom-left -> SW corner ~ Boise area + lon, lat = _local_to_lonlat(2048, 2048, 10, 181, 374, 4096) # tile center + assert -116.4 < lon < -116.0 and 43.4 < lat < 43.8 + + +def test_decode_flow_tile_real_fixture(): + evs = decode_flow_tile(PBF, 10, 181, 374, AT) + assert len(evs) > 50 + e = evs[0] + assert e.category == "flow.tomtom_flow" + assert e.adapter == "tomtom_flow" + assert e.geo.geometry["type"] in ("LineString", "MultiLineString") + assert e.data["road_category"] in ("motorway", "trunk", "primary", "secondary") + assert e.data["tile_z"] == 10 and e.data["tile_x"] == 181 and e.data["tile_y"] == 374 + # vertices georeferenced near Boise + v = e.geo.geometry["coordinates"] + while isinstance(v[0], list): + v = v[0] + assert -117 < v[0] < -116 and 43 < v[1] < 44 + + +def test_dedup_key_minute_bucketed(): + evs = decode_flow_tile(PBF, 10, 181, 374, AT) + assert evs[0].id == "10/181/374:0:2026-05-25T22:03" + later = decode_flow_tile(PBF, 10, 181, 374, datetime(2026, 5, 25, 22, 4, tzinfo=timezone.utc)) + assert later[0].id != evs[0].id # different minute -> different id + + +def test_subject_for(): + adapter = TomTomFlowAdapter(_cfg(), MagicMock(), Path("/tmp/unused.db")) + e = decode_flow_tile(PBF, 10, 181, 374, AT)[0] + assert adapter.subject_for(e) == "central.traffic_flow.10.181.374" + + +def test_archive_prefers_geo_geometry(): + line = {"type": "LineString", "coordinates": [[-116.2, 43.6], [-116.1, 43.7]]} + # geometry present -> returned verbatim (not bbox/centroid) + out = _build_geom_sql({"geometry": line, "centroid": [-116.2, 43.6], "bbox": [-116.3, 43.5, -116.0, 43.8]}) + assert json.loads(out) == line + # no geometry -> falls back to centroid Point (regression guard) + out2 = _build_geom_sql({"centroid": [-116.2, 43.6]}) + assert json.loads(out2)["type"] == "Point" + + +@pytest.mark.asyncio +async def test_poll_yields_segments(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value="testkey") + adapter = TomTomFlowAdapter(_cfg(), cs, tmp_path / "cursors.db") + await adapter.startup() + adapter._fetch_tile = AsyncMock(return_value=PBF) # bypass retry + network + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert len(events) > 50 + assert all(e.adapter == "tomtom_flow" for e in events) + assert all(e.category == "flow.tomtom_flow" for e in events) + + +@pytest.mark.asyncio +async def test_poll_skips_without_key(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value=None) + adapter = TomTomFlowAdapter(_cfg(), cs, tmp_path / "cursors.db") + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert events == [] # no key -> no fetch, clean skip + + +def test_summary_partial_renders(): + from central.gui.routes import _derive_subject + inner = {"road_category": "primary", "relative_speed": 0.11} + row = {"adapter": "tomtom_flow", "data": {"data": {"data": inner}}} + assert _derive_subject(row) == "Traffic flow (primary) — 11% of free-flow" + + +def test_inherits_dedup_mixin(): + for m in ("is_published", "mark_published", "sweep_old_ids"): + assert m not in TomTomFlowAdapter.__dict__, f"redefines {m}" + assert getattr(TomTomFlowAdapter, m) is getattr(SourceAdapter, m) diff --git a/uv.lock b/uv.lock index ee8cabc..15c7f7f 100644 --- a/uv.lock +++ b/uv.lock @@ -172,7 +172,7 @@ wheels = [ [[package]] name = "central" -version = "0.1.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "aiohttp" }, @@ -181,8 +181,9 @@ dependencies = [ { name = "cloudevents" }, { name = "cryptography" }, { name = "fastapi" }, - { name = "fastapi-csrf-protect" }, + { name = "itsdangerous" }, { name = "jinja2" }, + { name = "mapbox-vector-tile" }, { name = "nats-py" }, { name = "pydantic" }, { name = "pydantic-settings" }, @@ -209,8 +210,9 @@ requires-dist = [ { name = "cloudevents", specifier = ">=2.0.0" }, { name = "cryptography", specifier = ">=44.0.0" }, { name = "fastapi", specifier = ">=0.115.0" }, - { name = "fastapi-csrf-protect", specifier = ">=0.4.0" }, + { name = "itsdangerous", specifier = ">=2.2" }, { name = "jinja2", specifier = ">=3.1.6" }, + { name = "mapbox-vector-tile", specifier = ">=2.0" }, { name = "nats-py", specifier = ">=2.14.0" }, { name = "pydantic", specifier = ">=2,<3" }, { name = "pydantic-settings", specifier = ">=2.7.0" }, @@ -362,21 +364,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5a/ff/2e4eca3ade2c22fe1dea7043b8ee9dabe47753349eb1b56a202de8af6349/fastapi-0.136.1-py3-none-any.whl", hash = "sha256:a6e9d7eeada96c93a4d69cb03836b44fa34e2854accb7244a1ece36cd4781c3f", size = 117683, upload-time = "2026-04-23T16:49:42.437Z" }, ] -[[package]] -name = "fastapi-csrf-protect" -version = "1.0.7" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "itsdangerous" }, - { name = "pydantic" }, - { name = "pydantic-settings" }, - { name = "starlette" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/f6/1a/fedbcb4aba24ccc8abfb5d30e08112073c6a9f20b8d88adbdd3051ceedac/fastapi_csrf_protect-1.0.7.tar.gz", hash = "sha256:888b15b232625aae5b997fbcf81ef45633a7694f0312a054f1eec6d132b295fb", size = 207326, upload-time = "2025-09-16T07:06:08.586Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/bf/10/f248aab919678444723d557da918088e5c737b44e03e3aa4a0ad7afc7dae/fastapi_csrf_protect-1.0.7-py3-none-any.whl", hash = "sha256:ca3c5b50564af932ac4ed3d06caeed61bf16eed13a31cfe2bdfc3f7c1e8612a3", size = 18412, upload-time = "2025-09-16T07:06:05.926Z" }, -] - [[package]] name = "frozenlist" version = "1.8.0" @@ -514,6 +501,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/59/67/a6739ac96e28b7855808bdb0370e250606104a859750d209e5a0716fe7ab/librt-0.11.0-cp312-cp312-win_arm64.whl", hash = "sha256:2f10cf143e4a9bb0f4f5af568a00df94a2d69ef41c2579584454bb0fe5cc642c", size = 103470, upload-time = "2026-05-10T18:16:10.369Z" }, ] +[[package]] +name = "mapbox-vector-tile" +version = "2.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, + { name = "pyclipper" }, + { name = "shapely" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e9/e0/b511bd7433105d363f37bb83f00a6e15502b04ebcec68c25e3da630d2b53/mapbox_vector_tile-2.2.0.tar.gz", hash = "sha256:9fbf2e94890429ccdaf8e047019dccadd9deb03f5b2ae9b5c5561d27a20a0eb3", size = 26038, upload-time = "2025-07-08T02:20:09.532Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/50/79/cb2a50533c9c3b545eace2deffba0d002b56713c68b26b6ac1e53a4c1d18/mapbox_vector_tile-2.2.0-py3-none-any.whl", hash = "sha256:d26ad320ade60cc6c0b66edc6ee4b6f53663aedf0b444b115c6ba68e9ba1e6d1", size = 23986, upload-time = "2025-07-08T02:20:08.415Z" }, +] + [[package]] name = "markupsafe" version = "3.0.3" @@ -673,6 +674,35 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3a/ed/1cdcab6ba3d6ab7feca11fc14f0eeea80755bb53ef4e892079f31b10a25f/propcache-0.5.2-py3-none-any.whl", hash = "sha256:be1ddfcbb376e3de5d2e2db1d58d6d67463e6b4f9f040c000de8e300295465fe", size = 14036, upload-time = "2026-05-08T21:02:10.673Z" }, ] +[[package]] +name = "protobuf" +version = "6.33.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/66/70/e908e9c5e52ef7c3a6c7902c9dfbb34c7e29c25d2f81ade3856445fd5c94/protobuf-6.33.6.tar.gz", hash = "sha256:a6768d25248312c297558af96a9f9c929e8c4cee0659cb07e780731095f38135", size = 444531, upload-time = "2026-03-18T19:05:00.988Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fc/9f/2f509339e89cfa6f6a4c4ff50438db9ca488dec341f7e454adad60150b00/protobuf-6.33.6-cp310-abi3-win32.whl", hash = "sha256:7d29d9b65f8afef196f8334e80d6bc1d5d4adedb449971fefd3723824e6e77d3", size = 425739, upload-time = "2026-03-18T19:04:48.373Z" }, + { url = "https://files.pythonhosted.org/packages/76/5d/683efcd4798e0030c1bab27374fd13a89f7c2515fb1f3123efdfaa5eab57/protobuf-6.33.6-cp310-abi3-win_amd64.whl", hash = "sha256:0cd27b587afca21b7cfa59a74dcbd48a50f0a6400cfb59391340ad729d91d326", size = 437089, upload-time = "2026-03-18T19:04:50.381Z" }, + { url = "https://files.pythonhosted.org/packages/5c/01/a3c3ed5cd186f39e7880f8303cc51385a198a81469d53d0fdecf1f64d929/protobuf-6.33.6-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:9720e6961b251bde64edfdab7d500725a2af5280f3f4c87e57c0208376aa8c3a", size = 427737, upload-time = "2026-03-18T19:04:51.866Z" }, + { url = "https://files.pythonhosted.org/packages/ee/90/b3c01fdec7d2f627b3a6884243ba328c1217ed2d978def5c12dc50d328a3/protobuf-6.33.6-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:e2afbae9b8e1825e3529f88d514754e094278bb95eadc0e199751cdd9a2e82a2", size = 324610, upload-time = "2026-03-18T19:04:53.096Z" }, + { url = "https://files.pythonhosted.org/packages/9b/ca/25afc144934014700c52e05103c2421997482d561f3101ff352e1292fb81/protobuf-6.33.6-cp39-abi3-manylinux2014_s390x.whl", hash = "sha256:c96c37eec15086b79762ed265d59ab204dabc53056e3443e702d2681f4b39ce3", size = 339381, upload-time = "2026-03-18T19:04:54.616Z" }, + { url = "https://files.pythonhosted.org/packages/16/92/d1e32e3e0d894fe00b15ce28ad4944ab692713f2e7f0a99787405e43533a/protobuf-6.33.6-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:e9db7e292e0ab79dd108d7f1a94fe31601ce1ee3f7b79e0692043423020b0593", size = 323436, upload-time = "2026-03-18T19:04:55.768Z" }, + { url = "https://files.pythonhosted.org/packages/c4/72/02445137af02769918a93807b2b7890047c32bfb9f90371cbc12688819eb/protobuf-6.33.6-py3-none-any.whl", hash = "sha256:77179e006c476e69bf8e8ce866640091ec42e1beb80b213c3900006ecfba6901", size = 170656, upload-time = "2026-03-18T19:04:59.826Z" }, +] + +[[package]] +name = "pyclipper" +version = "1.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f6/21/3c06205bb407e1f79b73b7b4dfb3950bd9537c4f625a68ab5cc41177f5bc/pyclipper-1.4.0.tar.gz", hash = "sha256:9882bd889f27da78add4dd6f881d25697efc740bf840274e749988d25496c8e1", size = 54489, upload-time = "2025-12-01T13:15:35.015Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/90/1b/7a07b68e0842324d46c03e512d8eefa9cb92ba2a792b3b4ebf939dafcac3/pyclipper-1.4.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:222ac96c8b8281b53d695b9c4fedc674f56d6d4320ad23f1bdbd168f4e316140", size = 265676, upload-time = "2025-12-01T13:15:04.15Z" }, + { url = "https://files.pythonhosted.org/packages/6b/dd/8bd622521c05d04963420ae6664093f154343ed044c53ea260a310c8bb4d/pyclipper-1.4.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:f3672dbafbb458f1b96e1ee3e610d174acb5ace5bd2ed5d1252603bb797f2fc6", size = 140458, upload-time = "2025-12-01T13:15:05.76Z" }, + { url = "https://files.pythonhosted.org/packages/7a/06/6e3e241882bf7d6ab23d9c69ba4e85f1ec47397cbbeee948a16cf75e21ed/pyclipper-1.4.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d1f807e2b4760a8e5c6d6b4e8c1d71ef52b7fe1946ff088f4fa41e16a881a5ca", size = 978235, upload-time = "2025-12-01T13:15:06.993Z" }, + { url = "https://files.pythonhosted.org/packages/cf/f4/3418c1cd5eea640a9fa2501d4bc0b3655fa8d40145d1a4f484b987990a75/pyclipper-1.4.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce1f83c9a4e10ea3de1959f0ae79e9a5bd41346dff648fee6228ba9eaf8b3872", size = 961388, upload-time = "2025-12-01T13:15:08.467Z" }, + { url = "https://files.pythonhosted.org/packages/ac/94/c85401d24be634af529c962dd5d781f3cb62a67cd769534df2cb3feee97a/pyclipper-1.4.0-cp312-cp312-win32.whl", hash = "sha256:3ef44b64666ebf1cb521a08a60c3e639d21b8c50bfbe846ba7c52a0415e936f4", size = 95169, upload-time = "2025-12-01T13:15:10.098Z" }, + { url = "https://files.pythonhosted.org/packages/97/77/dfea08e3b230b82ee22543c30c35d33d42f846a77f96caf7c504dd54fab1/pyclipper-1.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:d1e5498d883b706a4ce636247f0d830c6eb34a25b843a1b78e2c969754ca9037", size = 104619, upload-time = "2025-12-01T13:15:11.592Z" }, +] + [[package]] name = "pycparser" version = "3.0"