mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
Merge pull request #64 from zvx-echo6/feat/tomtom-flow-passthrough
feat(tomtom_flow): Navi passthrough endpoint /api/traffic/flow (v0.9.4)
This commit is contained in:
commit
a70b53572e
3 changed files with 195 additions and 2 deletions
|
|
@ -12,11 +12,11 @@ from central.gui.db import get_pool
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Paths that don't require setup to be complete
|
# Paths that don't require setup to be complete
|
||||||
SETUP_EXEMPT_PREFIXES = ("/static/", "/setup")
|
SETUP_EXEMPT_PREFIXES = ("/static/", "/setup", "/api/traffic/flow/")
|
||||||
|
|
||||||
# Paths that don't require authentication
|
# Paths that don't require authentication
|
||||||
AUTH_EXEMPT_PATHS = {"/setup/operator", "/login", "/health"}
|
AUTH_EXEMPT_PATHS = {"/setup/operator", "/login", "/health"}
|
||||||
AUTH_EXEMPT_PREFIXES = ("/static/", "/setup/")
|
AUTH_EXEMPT_PREFIXES = ("/static/", "/setup/", "/api/traffic/flow/")
|
||||||
|
|
||||||
# Browser-noise paths that trigger CSRF race conditions
|
# Browser-noise paths that trigger CSRF race conditions
|
||||||
BROWSER_NOISE_PATHS = {
|
BROWSER_NOISE_PATHS = {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,13 @@ from datetime import datetime, timedelta, timezone
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
|
from central.cloudevents_wire import wrap_event
|
||||||
|
from central.config_store import ConfigStore
|
||||||
|
from central.tomtom_flow_parse import decode_flow_tile
|
||||||
|
from central.gui.nats import get_js
|
||||||
|
|
||||||
logger = logging.getLogger("central.gui.routes")
|
logger = logging.getLogger("central.gui.routes")
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -3434,3 +3441,76 @@ async def telemetry_list(request: Request) -> HTMLResponse:
|
||||||
@router.get("/telemetry/rows", response_class=HTMLResponse)
|
@router.get("/telemetry/rows", response_class=HTMLResponse)
|
||||||
async def telemetry_rows(request: Request) -> HTMLResponse:
|
async def telemetry_rows(request: Request) -> HTMLResponse:
|
||||||
return await _events_rows_fragment(request, "telemetry", "/telemetry")
|
return await _events_rows_fragment(request, "telemetry", "/telemetry")
|
||||||
|
|
||||||
|
|
||||||
|
# --- Traffic flow passthrough (Navi, v0.9.4) ---------------------------------
|
||||||
|
# Drop-in for navi-traffic's /api/traffic/flow/<z>/<x>/<y>.{png,pbf} (no auth --
|
||||||
|
# tailnet-trusted, exempted in middleware). PNG = passthrough + 60s cache; PBF =
|
||||||
|
# passthrough + decode-and-persist to CENTRAL_TRAFFIC_FLOW (best-effort; the
|
||||||
|
# minute-bucketed ids collide with the tomtom_flow poller so the archive's
|
||||||
|
# (id, time) upsert collapses duplicates). Single-flight is a v0.9.5 candidate.
|
||||||
|
_TOMTOM_FLOW_URL = "https://api.tomtom.com/maps/orbis/traffic/tile/flow/{z}/{x}/{y}.{fmt}?key={key}&apiVersion=1"
|
||||||
|
_FLOW_MEDIA = {"png": "image/png", "pbf": "application/x-protobuf"}
|
||||||
|
_FLOW_CACHE = "public, max-age=60" # matches TomTom's advertised 60s tile TTL
|
||||||
|
|
||||||
|
|
||||||
|
async def _fetch_tomtom_tile(z: int, x: int, y: int, fmt: str, key: str) -> bytes:
|
||||||
|
"""Fetch one Orbis flow tile from TomTom. Raises on HTTP/transport error."""
|
||||||
|
url = _TOMTOM_FLOW_URL.format(z=z, x=x, y=y, fmt=fmt, key=key)
|
||||||
|
timeout = aiohttp.ClientTimeout(total=30)
|
||||||
|
async with aiohttp.ClientSession(
|
||||||
|
timeout=timeout, headers={"User-Agent": "Central/0.9 (+tomtom_flow_passthrough)"}
|
||||||
|
) as session:
|
||||||
|
async with session.get(url) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
return await resp.read()
|
||||||
|
|
||||||
|
|
||||||
|
async def _persist_flow_tile(pbf: bytes, z: int, x: int, y: int) -> None:
|
||||||
|
"""Decode a vector tile and publish its segments to CENTRAL_TRAFFIC_FLOW.
|
||||||
|
|
||||||
|
Reuses the polling adapter's decode + minute-bucketed ids so the archive
|
||||||
|
dedups poller/passthrough overlap. Caller wraps this best-effort.
|
||||||
|
"""
|
||||||
|
js = get_js()
|
||||||
|
if js is None:
|
||||||
|
return
|
||||||
|
subject = f"central.traffic_flow.{z}.{x}.{y}"
|
||||||
|
for ev in decode_flow_tile(pbf, z, x, y, datetime.now(timezone.utc)):
|
||||||
|
envelope, msg_id = wrap_event(ev)
|
||||||
|
await js.publish(subject, json.dumps(envelope).encode(), headers={"Nats-Msg-Id": msg_id})
|
||||||
|
|
||||||
|
|
||||||
|
async def _flow_passthrough(z: int, x: int, y: int, fmt: str) -> Response:
|
||||||
|
"""Proxy a TomTom Orbis flow tile; persist segments on .pbf (best-effort)."""
|
||||||
|
key = await ConfigStore(get_pool()).get_api_key("tomtom")
|
||||||
|
if not key:
|
||||||
|
return Response(
|
||||||
|
content=json.dumps({"error": "tomtom api key not configured"}),
|
||||||
|
status_code=503, media_type="application/json",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
tile = await _fetch_tomtom_tile(z, x, y, fmt, key)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("flow passthrough upstream failed",
|
||||||
|
extra={"tile": [z, x, y], "fmt": fmt, "error": str(exc).replace(key, "<KEY>")})
|
||||||
|
return Response(
|
||||||
|
content=json.dumps({"error": "upstream tile fetch failed"}),
|
||||||
|
status_code=502, media_type="application/json",
|
||||||
|
)
|
||||||
|
if fmt == "pbf":
|
||||||
|
try:
|
||||||
|
await _persist_flow_tile(tile, z, x, y)
|
||||||
|
except Exception: # storage is secondary to serving the tile
|
||||||
|
logger.exception("flow passthrough persist failed", extra={"tile": [z, x, y]})
|
||||||
|
return Response(content=tile, media_type=_FLOW_MEDIA[fmt], headers={"Cache-Control": _FLOW_CACHE})
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/api/traffic/flow/{z}/{x}/{y}.png")
|
||||||
|
async def traffic_flow_png(z: int, x: int, y: int) -> Response:
|
||||||
|
return await _flow_passthrough(z, x, y, "png")
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/api/traffic/flow/{z}/{x}/{y}.pbf")
|
||||||
|
async def traffic_flow_pbf(z: int, x: int, y: int) -> Response:
|
||||||
|
return await _flow_passthrough(z, x, y, "pbf")
|
||||||
|
|
|
||||||
113
tests/test_tomtom_flow_passthrough.py
Normal file
113
tests/test_tomtom_flow_passthrough.py
Normal file
|
|
@ -0,0 +1,113 @@
|
||||||
|
"""Tests for the v0.9.4 tomtom_flow Navi passthrough endpoint.
|
||||||
|
|
||||||
|
Exercises _flow_passthrough directly with mocked upstream + NATS (mirrors how
|
||||||
|
test_events_feed_frontend calls route handlers). Uses the real Orbis fixture
|
||||||
|
from v0.9.3 for the decode-and-publish path.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from central.gui import routes
|
||||||
|
|
||||||
|
PBF = (Path(__file__).parent / "fixtures" / "tomtom_flow_orbis.pbf").read_bytes()
|
||||||
|
PNG = b"\x89PNG\r\n\x1a\n" + b"fake-png-bytes"
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def patches(key="testkey", tile=PNG, js=None, fetch_exc=None):
|
||||||
|
cs = MagicMock()
|
||||||
|
cs.get_api_key = AsyncMock(return_value=key)
|
||||||
|
fetch = AsyncMock(side_effect=fetch_exc) if fetch_exc else AsyncMock(return_value=tile)
|
||||||
|
with patch("central.gui.routes.get_pool", return_value=MagicMock()), \
|
||||||
|
patch("central.gui.routes.ConfigStore", return_value=cs), \
|
||||||
|
patch("central.gui.routes._fetch_tomtom_tile", fetch), \
|
||||||
|
patch("central.gui.routes.get_js", return_value=js):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_png_passthrough_no_publish():
|
||||||
|
js = MagicMock()
|
||||||
|
js.publish = AsyncMock()
|
||||||
|
with patches(tile=PNG, js=js):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "png")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.body == PNG
|
||||||
|
assert resp.media_type == "image/png"
|
||||||
|
assert resp.headers["Cache-Control"] == "public, max-age=60"
|
||||||
|
js.publish.assert_not_called() # raster is unparseable -> no storage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pbf_passthrough_publishes_segments():
|
||||||
|
js = MagicMock()
|
||||||
|
js.publish = AsyncMock()
|
||||||
|
with patches(tile=PBF, js=js):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "pbf")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.body == PBF
|
||||||
|
assert resp.media_type == "application/x-protobuf"
|
||||||
|
assert js.publish.await_count > 50
|
||||||
|
subject = js.publish.await_args_list[0].args[0]
|
||||||
|
payload = json.loads(js.publish.await_args_list[0].args[1])
|
||||||
|
assert subject == "central.traffic_flow.10.181.374"
|
||||||
|
# dedup id collides with the polling adapter's minute-bucketed pattern
|
||||||
|
assert re.match(r"^10/181/374:\d+:\d{4}-\d\d-\d\dT\d\d:\d\d$", payload["data"]["id"])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_missing_key_503():
|
||||||
|
with patches(key=None):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "pbf")
|
||||||
|
assert resp.status_code == 503
|
||||||
|
assert b"not configured" in resp.body
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_upstream_failure_502_no_publish():
|
||||||
|
js = MagicMock()
|
||||||
|
js.publish = AsyncMock()
|
||||||
|
with patches(fetch_exc=RuntimeError("boom"), js=js):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "pbf")
|
||||||
|
assert resp.status_code == 502
|
||||||
|
assert b"upstream tile fetch failed" in resp.body
|
||||||
|
js.publish.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_key_never_leaks_on_error(caplog):
|
||||||
|
secret = "SECRETKEY1234567890ABCDEF12345678"
|
||||||
|
with caplog.at_level("WARNING"):
|
||||||
|
with patches(key=secret, fetch_exc=RuntimeError(f"401 url=...key={secret}")):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "png")
|
||||||
|
assert resp.status_code == 502
|
||||||
|
assert secret not in resp.body.decode()
|
||||||
|
assert secret not in caplog.text
|
||||||
|
# the redacted error string rode in the log record's `error` extra field
|
||||||
|
errs = " ".join(str(getattr(r, "error", "")) for r in caplog.records)
|
||||||
|
assert secret not in errs
|
||||||
|
assert "<KEY>" in errs
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_failure_still_returns_tile():
|
||||||
|
js = MagicMock()
|
||||||
|
js.publish = AsyncMock(side_effect=RuntimeError("nats down"))
|
||||||
|
with patches(tile=PBF, js=js):
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "pbf")
|
||||||
|
assert resp.status_code == 200 # storage is best-effort; tile still served
|
||||||
|
assert resp.body == PBF
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_js_still_returns_tile():
|
||||||
|
with patches(tile=PBF, js=None): # GUI NATS not connected
|
||||||
|
resp = await routes._flow_passthrough(10, 181, 374, "pbf")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.body == PBF
|
||||||
Loading…
Add table
Add a link
Reference in a new issue