From cf1d96e15952adb383989643adb861e702dd79be Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Tue, 26 May 2026 00:04:02 +0000 Subject: [PATCH] feat(tomtom_flow): Navi passthrough endpoint /api/traffic/flow (v0.9.4) Phased PR2 of v0.9.3. Adds the on-demand tile passthrough so Navi can call Central instead of navi-traffic, and so tiles outside the polled coverage set still get persisted. central-gui restart only (new route); no supervisor, no archive, no migration, no new stream/dep. GET /api/traffic/flow///.{png,pbf} -- exactly matches navi-traffic's route for a drop-in frontend flip (s/navi-traffic/central:8000/). No auth (tailnet-trusted, matches navi-traffic's existing posture; exempted in middleware). Cache-Control max-age=60 (TomTom's advertised tile TTL). - .png -> passthrough + cache only (raster unparseable for storage). - .pbf -> passthrough + decode via the shared tomtom_flow_parse.decode_flow_tile and publish segments to CENTRAL_TRAFFIC_FLOW via the GUI's NATS connection (wrap_event, supervisor's exact envelope shape). Same minute-bucketed dedup id as the polling adapter, so adapter+passthrough fetches of one tile in the same minute collapse at the archive's (id, time) upsert -- intentional collision. - Publish is best-effort: wrapped in try/except + guards get_js() None, so NATS state never blocks serving the tile (HTTP > storage). - API key via ConfigStore(get_pool()).get_api_key("tomtom"); 503 if unset; redacted from any logged upstream error; 502 on upstream failure (no publish). Single-flight (shared in-flight upstream request per tile) queued for v0.9.5. navi-frontend flip + navi-traffic deprecation live in the navi repo (Matt's call). Full suite: 787 passed, 1 skipped (central and unprivileged zvx, 3x each). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/gui/middleware.py | 4 +- src/central/gui/routes.py | 80 ++++++++++++++++++ tests/test_tomtom_flow_passthrough.py | 113 ++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 tests/test_tomtom_flow_passthrough.py diff --git a/src/central/gui/middleware.py b/src/central/gui/middleware.py index 2af6230..962cbc0 100644 --- a/src/central/gui/middleware.py +++ b/src/central/gui/middleware.py @@ -12,11 +12,11 @@ from central.gui.db import get_pool logger = logging.getLogger(__name__) # Paths that don't require setup to be complete -SETUP_EXEMPT_PREFIXES = ("/static/", "/setup") +SETUP_EXEMPT_PREFIXES = ("/static/", "/setup", "/api/traffic/flow/") # Paths that don't require authentication AUTH_EXEMPT_PATHS = {"/setup/operator", "/login", "/health"} -AUTH_EXEMPT_PREFIXES = ("/static/", "/setup/") +AUTH_EXEMPT_PREFIXES = ("/static/", "/setup/", "/api/traffic/flow/") # Browser-noise paths that trigger CSRF race conditions BROWSER_NOISE_PATHS = { diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 9466c0d..8a80647 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -9,6 +9,13 @@ from datetime import datetime, timedelta, timezone from typing import Any from urllib.parse import urlencode +import aiohttp + +from central.cloudevents_wire import wrap_event +from central.config_store import ConfigStore +from central.tomtom_flow_parse import decode_flow_tile +from central.gui.nats import get_js + logger = logging.getLogger("central.gui.routes") @@ -3434,3 +3441,76 @@ async def telemetry_list(request: Request) -> HTMLResponse: @router.get("/telemetry/rows", response_class=HTMLResponse) async def telemetry_rows(request: Request) -> HTMLResponse: return await _events_rows_fragment(request, "telemetry", "/telemetry") + + +# --- Traffic flow passthrough (Navi, v0.9.4) --------------------------------- +# Drop-in for navi-traffic's /api/traffic/flow///.{png,pbf} (no auth -- +# tailnet-trusted, exempted in middleware). PNG = passthrough + 60s cache; PBF = +# passthrough + decode-and-persist to CENTRAL_TRAFFIC_FLOW (best-effort; the +# minute-bucketed ids collide with the tomtom_flow poller so the archive's +# (id, time) upsert collapses duplicates). Single-flight is a v0.9.5 candidate. +_TOMTOM_FLOW_URL = "https://api.tomtom.com/maps/orbis/traffic/tile/flow/{z}/{x}/{y}.{fmt}?key={key}&apiVersion=1" +_FLOW_MEDIA = {"png": "image/png", "pbf": "application/x-protobuf"} +_FLOW_CACHE = "public, max-age=60" # matches TomTom's advertised 60s tile TTL + + +async def _fetch_tomtom_tile(z: int, x: int, y: int, fmt: str, key: str) -> bytes: + """Fetch one Orbis flow tile from TomTom. Raises on HTTP/transport error.""" + url = _TOMTOM_FLOW_URL.format(z=z, x=x, y=y, fmt=fmt, key=key) + timeout = aiohttp.ClientTimeout(total=30) + async with aiohttp.ClientSession( + timeout=timeout, headers={"User-Agent": "Central/0.9 (+tomtom_flow_passthrough)"} + ) as session: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.read() + + +async def _persist_flow_tile(pbf: bytes, z: int, x: int, y: int) -> None: + """Decode a vector tile and publish its segments to CENTRAL_TRAFFIC_FLOW. + + Reuses the polling adapter's decode + minute-bucketed ids so the archive + dedups poller/passthrough overlap. Caller wraps this best-effort. + """ + js = get_js() + if js is None: + return + subject = f"central.traffic_flow.{z}.{x}.{y}" + for ev in decode_flow_tile(pbf, z, x, y, datetime.now(timezone.utc)): + envelope, msg_id = wrap_event(ev) + await js.publish(subject, json.dumps(envelope).encode(), headers={"Nats-Msg-Id": msg_id}) + + +async def _flow_passthrough(z: int, x: int, y: int, fmt: str) -> Response: + """Proxy a TomTom Orbis flow tile; persist segments on .pbf (best-effort).""" + key = await ConfigStore(get_pool()).get_api_key("tomtom") + if not key: + return Response( + content=json.dumps({"error": "tomtom api key not configured"}), + status_code=503, media_type="application/json", + ) + try: + tile = await _fetch_tomtom_tile(z, x, y, fmt, key) + except Exception as exc: + logger.warning("flow passthrough upstream failed", + extra={"tile": [z, x, y], "fmt": fmt, "error": str(exc).replace(key, "")}) + return Response( + content=json.dumps({"error": "upstream tile fetch failed"}), + status_code=502, media_type="application/json", + ) + if fmt == "pbf": + try: + await _persist_flow_tile(tile, z, x, y) + except Exception: # storage is secondary to serving the tile + logger.exception("flow passthrough persist failed", extra={"tile": [z, x, y]}) + return Response(content=tile, media_type=_FLOW_MEDIA[fmt], headers={"Cache-Control": _FLOW_CACHE}) + + +@router.get("/api/traffic/flow/{z}/{x}/{y}.png") +async def traffic_flow_png(z: int, x: int, y: int) -> Response: + return await _flow_passthrough(z, x, y, "png") + + +@router.get("/api/traffic/flow/{z}/{x}/{y}.pbf") +async def traffic_flow_pbf(z: int, x: int, y: int) -> Response: + return await _flow_passthrough(z, x, y, "pbf") diff --git a/tests/test_tomtom_flow_passthrough.py b/tests/test_tomtom_flow_passthrough.py new file mode 100644 index 0000000..f658fad --- /dev/null +++ b/tests/test_tomtom_flow_passthrough.py @@ -0,0 +1,113 @@ +"""Tests for the v0.9.4 tomtom_flow Navi passthrough endpoint. + +Exercises _flow_passthrough directly with mocked upstream + NATS (mirrors how +test_events_feed_frontend calls route handlers). Uses the real Orbis fixture +from v0.9.3 for the decode-and-publish path. +""" + +import json +import re +from contextlib import contextmanager +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.gui import routes + +PBF = (Path(__file__).parent / "fixtures" / "tomtom_flow_orbis.pbf").read_bytes() +PNG = b"\x89PNG\r\n\x1a\n" + b"fake-png-bytes" + + +@contextmanager +def patches(key="testkey", tile=PNG, js=None, fetch_exc=None): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value=key) + fetch = AsyncMock(side_effect=fetch_exc) if fetch_exc else AsyncMock(return_value=tile) + with patch("central.gui.routes.get_pool", return_value=MagicMock()), \ + patch("central.gui.routes.ConfigStore", return_value=cs), \ + patch("central.gui.routes._fetch_tomtom_tile", fetch), \ + patch("central.gui.routes.get_js", return_value=js): + yield + + +@pytest.mark.asyncio +async def test_png_passthrough_no_publish(): + js = MagicMock() + js.publish = AsyncMock() + with patches(tile=PNG, js=js): + resp = await routes._flow_passthrough(10, 181, 374, "png") + assert resp.status_code == 200 + assert resp.body == PNG + assert resp.media_type == "image/png" + assert resp.headers["Cache-Control"] == "public, max-age=60" + js.publish.assert_not_called() # raster is unparseable -> no storage + + +@pytest.mark.asyncio +async def test_pbf_passthrough_publishes_segments(): + js = MagicMock() + js.publish = AsyncMock() + with patches(tile=PBF, js=js): + resp = await routes._flow_passthrough(10, 181, 374, "pbf") + assert resp.status_code == 200 + assert resp.body == PBF + assert resp.media_type == "application/x-protobuf" + assert js.publish.await_count > 50 + subject = js.publish.await_args_list[0].args[0] + payload = json.loads(js.publish.await_args_list[0].args[1]) + assert subject == "central.traffic_flow.10.181.374" + # dedup id collides with the polling adapter's minute-bucketed pattern + assert re.match(r"^10/181/374:\d+:\d{4}-\d\d-\d\dT\d\d:\d\d$", payload["data"]["id"]) + + +@pytest.mark.asyncio +async def test_missing_key_503(): + with patches(key=None): + resp = await routes._flow_passthrough(10, 181, 374, "pbf") + assert resp.status_code == 503 + assert b"not configured" in resp.body + + +@pytest.mark.asyncio +async def test_upstream_failure_502_no_publish(): + js = MagicMock() + js.publish = AsyncMock() + with patches(fetch_exc=RuntimeError("boom"), js=js): + resp = await routes._flow_passthrough(10, 181, 374, "pbf") + assert resp.status_code == 502 + assert b"upstream tile fetch failed" in resp.body + js.publish.assert_not_called() + + +@pytest.mark.asyncio +async def test_key_never_leaks_on_error(caplog): + secret = "SECRETKEY1234567890ABCDEF12345678" + with caplog.at_level("WARNING"): + with patches(key=secret, fetch_exc=RuntimeError(f"401 url=...key={secret}")): + resp = await routes._flow_passthrough(10, 181, 374, "png") + assert resp.status_code == 502 + assert secret not in resp.body.decode() + assert secret not in caplog.text + # the redacted error string rode in the log record's `error` extra field + errs = " ".join(str(getattr(r, "error", "")) for r in caplog.records) + assert secret not in errs + assert "" in errs + + +@pytest.mark.asyncio +async def test_publish_failure_still_returns_tile(): + js = MagicMock() + js.publish = AsyncMock(side_effect=RuntimeError("nats down")) + with patches(tile=PBF, js=js): + resp = await routes._flow_passthrough(10, 181, 374, "pbf") + assert resp.status_code == 200 # storage is best-effort; tile still served + assert resp.body == PBF + + +@pytest.mark.asyncio +async def test_no_js_still_returns_tile(): + with patches(tile=PBF, js=None): # GUI NATS not connected + resp = await routes._flow_passthrough(10, 181, 374, "pbf") + assert resp.status_code == 200 + assert resp.body == PBF