feat(tomtom_flow): Navi passthrough endpoint /api/traffic/flow (v0.9.4)

Phased PR2 of v0.9.3. Adds the on-demand tile passthrough so Navi can call
Central instead of navi-traffic, and so tiles outside the polled coverage set
still get persisted. central-gui restart only (new route); no supervisor, no
archive, no migration, no new stream/dep.

GET /api/traffic/flow/<z>/<x>/<y>.{png,pbf} -- exactly matches navi-traffic's
route for a drop-in frontend flip (s/navi-traffic/central:8000/). No auth
(tailnet-trusted, matches navi-traffic's existing posture; exempted in
middleware). Cache-Control max-age=60 (TomTom's advertised tile TTL).

- .png -> passthrough + cache only (raster unparseable for storage).
- .pbf -> passthrough + decode via the shared tomtom_flow_parse.decode_flow_tile
  and publish segments to CENTRAL_TRAFFIC_FLOW via the GUI's NATS connection
  (wrap_event, supervisor's exact envelope shape). Same minute-bucketed dedup id
  as the polling adapter, so adapter+passthrough fetches of one tile in the same
  minute collapse at the archive's (id, time) upsert -- intentional collision.
- Publish is best-effort: wrapped in try/except + guards get_js() None, so NATS
  state never blocks serving the tile (HTTP > storage).
- API key via ConfigStore(get_pool()).get_api_key("tomtom"); 503 if unset;
  redacted from any logged upstream error; 502 on upstream failure (no publish).

Single-flight (shared in-flight upstream request per tile) queued for v0.9.5.
navi-frontend flip + navi-traffic deprecation live in the navi repo (Matt's call).

Full suite: 787 passed, 1 skipped (central and unprivileged zvx, 3x each).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-26 00:04:02 +00:00
commit cf1d96e159
3 changed files with 195 additions and 2 deletions

View file

@ -9,6 +9,13 @@ from datetime import datetime, timedelta, timezone
from typing import Any
from urllib.parse import urlencode
import aiohttp
from central.cloudevents_wire import wrap_event
from central.config_store import ConfigStore
from central.tomtom_flow_parse import decode_flow_tile
from central.gui.nats import get_js
logger = logging.getLogger("central.gui.routes")
@ -3434,3 +3441,76 @@ async def telemetry_list(request: Request) -> HTMLResponse:
@router.get("/telemetry/rows", response_class=HTMLResponse)
async def telemetry_rows(request: Request) -> HTMLResponse:
return await _events_rows_fragment(request, "telemetry", "/telemetry")
# --- Traffic flow passthrough (Navi, v0.9.4) ---------------------------------
# Drop-in for navi-traffic's /api/traffic/flow/<z>/<x>/<y>.{png,pbf} (no auth --
# tailnet-trusted, exempted in middleware). PNG = passthrough + 60s cache; PBF =
# passthrough + decode-and-persist to CENTRAL_TRAFFIC_FLOW (best-effort; the
# minute-bucketed ids collide with the tomtom_flow poller so the archive's
# (id, time) upsert collapses duplicates). Single-flight is a v0.9.5 candidate.
_TOMTOM_FLOW_URL = "https://api.tomtom.com/maps/orbis/traffic/tile/flow/{z}/{x}/{y}.{fmt}?key={key}&apiVersion=1"
_FLOW_MEDIA = {"png": "image/png", "pbf": "application/x-protobuf"}
_FLOW_CACHE = "public, max-age=60" # matches TomTom's advertised 60s tile TTL
async def _fetch_tomtom_tile(z: int, x: int, y: int, fmt: str, key: str) -> bytes:
"""Fetch one Orbis flow tile from TomTom. Raises on HTTP/transport error."""
url = _TOMTOM_FLOW_URL.format(z=z, x=x, y=y, fmt=fmt, key=key)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(
timeout=timeout, headers={"User-Agent": "Central/0.9 (+tomtom_flow_passthrough)"}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.read()
async def _persist_flow_tile(pbf: bytes, z: int, x: int, y: int) -> None:
"""Decode a vector tile and publish its segments to CENTRAL_TRAFFIC_FLOW.
Reuses the polling adapter's decode + minute-bucketed ids so the archive
dedups poller/passthrough overlap. Caller wraps this best-effort.
"""
js = get_js()
if js is None:
return
subject = f"central.traffic_flow.{z}.{x}.{y}"
for ev in decode_flow_tile(pbf, z, x, y, datetime.now(timezone.utc)):
envelope, msg_id = wrap_event(ev)
await js.publish(subject, json.dumps(envelope).encode(), headers={"Nats-Msg-Id": msg_id})
async def _flow_passthrough(z: int, x: int, y: int, fmt: str) -> Response:
"""Proxy a TomTom Orbis flow tile; persist segments on .pbf (best-effort)."""
key = await ConfigStore(get_pool()).get_api_key("tomtom")
if not key:
return Response(
content=json.dumps({"error": "tomtom api key not configured"}),
status_code=503, media_type="application/json",
)
try:
tile = await _fetch_tomtom_tile(z, x, y, fmt, key)
except Exception as exc:
logger.warning("flow passthrough upstream failed",
extra={"tile": [z, x, y], "fmt": fmt, "error": str(exc).replace(key, "<KEY>")})
return Response(
content=json.dumps({"error": "upstream tile fetch failed"}),
status_code=502, media_type="application/json",
)
if fmt == "pbf":
try:
await _persist_flow_tile(tile, z, x, y)
except Exception: # storage is secondary to serving the tile
logger.exception("flow passthrough persist failed", extra={"tile": [z, x, y]})
return Response(content=tile, media_type=_FLOW_MEDIA[fmt], headers={"Cache-Control": _FLOW_CACHE})
@router.get("/api/traffic/flow/{z}/{x}/{y}.png")
async def traffic_flow_png(z: int, x: int, y: int) -> Response:
return await _flow_passthrough(z, x, y, "png")
@router.get("/api/traffic/flow/{z}/{x}/{y}.pbf")
async def traffic_flow_pbf(z: int, x: int, y: int) -> Response:
return await _flow_passthrough(z, x, y, "pbf")