Merge PR #70: generic model_list adapter editor + TomTom bbox validation & quota (v0.9.9)

fix(gui): generic model_list editor — un-breaks 4 list-of-model adapter Edit pages + TomTom bbox validation & quota (v0.9.9)
This commit is contained in:
malice 2026-05-26 00:14:29 -06:00 committed by GitHub
commit 9ff8a33415
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 483 additions and 6 deletions

View file

@ -185,3 +185,17 @@ class SourceAdapter(ABC):
matched zero rows the framework renders that distinctly from None. matched zero rows the framework renders that distinctly from None.
""" """
return None return None
@classmethod
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
"""Optional. Override to surface a read-only API-quota panel on the edit
page and to hard-block saves that would exceed a provider's free tier.
Pure function of (settings, cadence_s) no I/O, no instance state so
the framework calls it on the class in both the GET (render) and POST
(pre-persist gate) paths. Return None to skip (default).
Return dict with: calls_per_month:int, cap:int, percent:float,
warn:bool (>=80%), blocked:bool (>=100% -> POST refused 422), detail:str.
"""
return None

View file

@ -12,6 +12,7 @@ Dedup is inherited from SourceAdapter; ids use the upstream-stable TomTom id.
import asyncio import asyncio
import logging import logging
import math
import sqlite3 import sqlite3
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from datetime import datetime, timezone from datetime import datetime, timezone
@ -19,7 +20,7 @@ from pathlib import Path
from typing import Any from typing import Any
import aiohttp import aiohttp
from pydantic import BaseModel from pydantic import BaseModel, field_validator, model_validator
from tenacity import ( from tenacity import (
retry, retry,
retry_if_exception_type, retry_if_exception_type,
@ -52,6 +53,21 @@ _DEDUP_DDL = (
"PRIMARY KEY (adapter, event_id))" "PRIMARY KEY (adapter, event_id))"
) )
_MAX_BBOX_AREA_KM2 = 10_000.0 # TomTom incidentDetails hard cap per bbox
_MIN_BBOX_CADENCE_S = 60 # per-bbox poll-interval floor
_EARTH_RADIUS_KM = 6371.0
_SECONDS_PER_MONTH = 30 * 24 * 3600 # 2_592_000, for quota estimation
# TomTom Orbis free tier: 2,500 incidentDetails calls/month. A paid-tier
# operator can raise this ceiling (v0.9.9 follow-up hook).
TOMTOM_FREE_TIER_CALLS_PER_MONTH = 2500
def _bbox_area_km2(min_lon: float, min_lat: float, max_lon: float, max_lat: float) -> float:
"""Spherical area of a lon/lat bbox in km^2."""
lat1, lat2 = math.radians(min_lat), math.radians(max_lat)
dlon = math.radians(max_lon - min_lon)
return _EARTH_RADIUS_KM ** 2 * abs(math.sin(lat2) - math.sin(lat1)) * abs(dlon)
def _parse_iso(value: str | None) -> datetime | None: def _parse_iso(value: str | None) -> datetime | None:
if not value: if not value:
@ -85,6 +101,26 @@ class BBox(BaseModel):
state_code: str state_code: str
cadence_s: int | None = None # per-bbox poll interval; None -> adapter default_cadence_s cadence_s: int | None = None # per-bbox poll interval; None -> adapter default_cadence_s
@field_validator("cadence_s")
@classmethod
def _cadence_floor(cls, v: int | None) -> int | None:
if v is not None and v < _MIN_BBOX_CADENCE_S:
raise ValueError(f"cadence_s must be >= {_MIN_BBOX_CADENCE_S} seconds")
return v
@model_validator(mode="after")
def _validate_box(self) -> "BBox":
if not (-180.0 <= self.min_lon < self.max_lon <= 180.0):
raise ValueError("require -180 <= min_lon < max_lon <= 180")
if not (-90.0 <= self.min_lat < self.max_lat <= 90.0):
raise ValueError("require -90 <= min_lat < max_lat <= 90")
area = _bbox_area_km2(self.min_lon, self.min_lat, self.max_lon, self.max_lat)
if area > _MAX_BBOX_AREA_KM2:
raise ValueError(
f"bbox area {area:.0f} km^2 exceeds TomTom limit of {_MAX_BBOX_AREA_KM2:.0f} km^2"
)
return self
class TomTomIncidentsSettings(BaseModel): class TomTomIncidentsSettings(BaseModel):
"""bboxes: metro boxes to poll (each <= 10,000 km^2). api_key_alias: config key.""" """bboxes: metro boxes to poll (each <= 10,000 km^2). api_key_alias: config key."""
@ -92,6 +128,14 @@ class TomTomIncidentsSettings(BaseModel):
bboxes: list[BBox] = [] bboxes: list[BBox] = []
api_key_alias: str = "tomtom" api_key_alias: str = "tomtom"
@model_validator(mode="after")
def _unique_names(self) -> "TomTomIncidentsSettings":
names = [b.name for b in self.bboxes]
dupes = sorted({n for n in names if names.count(n) > 1})
if dupes:
raise ValueError(f"duplicate bbox names: {', '.join(dupes)}")
return self
class TomTomIncidentsAdapter(SourceAdapter): class TomTomIncidentsAdapter(SourceAdapter):
"""TomTom Orbis incidentDetails adapter (per-bbox real-time incidents).""" """TomTom Orbis incidentDetails adapter (per-bbox real-time incidents)."""
@ -273,3 +317,27 @@ class TomTomIncidentsAdapter(SourceAdapter):
def subject_for(self, event: Event) -> str: def subject_for(self, event: Event) -> str:
code = (event.data.get("state_code") or "").lower() or "unknown" code = (event.data.get("state_code") or "").lower() or "unknown"
return f"central.traffic.incident.{code}" return f"central.traffic.incident.{code}"
@classmethod
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
bboxes = getattr(settings, "bboxes", None) or []
if not bboxes:
return None
calls = sum(
_SECONDS_PER_MONTH / max(cadence_s, b.cadence_s or cls.default_cadence_s)
for b in bboxes
)
calls_per_month = round(calls)
cap = TOMTOM_FREE_TIER_CALLS_PER_MONTH
percent = (calls_per_month / cap * 100) if cap else 0.0
return {
"calls_per_month": calls_per_month,
"cap": cap,
"percent": percent,
"warn": percent >= 80.0,
"blocked": percent >= 100.0,
"detail": (
f"{calls_per_month:,} est. calls/month across {len(bboxes)} "
f"bbox(es) vs {cap:,}/month free tier ({percent:.0f}%)"
),
}

View file

@ -25,6 +25,7 @@ class FieldDescriptor:
description: str description: str
required: bool required: bool
options: list[str] | None = None # For select/checkboxes widgets options: list[str] | None = None # For select/checkboxes widgets
sub_fields: list["FieldDescriptor"] | None = None # For model_list: per-column descriptors
def _is_literal(tp: type) -> bool: def _is_literal(tp: type) -> bool:
@ -86,6 +87,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
if inner_type is str: if inner_type is str:
return "csv", None return "csv", None
# list[<BaseModel>] -> repeatable per-row editor (sub-columns resolved
# by describe_fields, which recurses into the row model).
if isinstance(inner_type, type) and issubclass(inner_type, BaseModel):
return "model_list", None
raise NotImplementedError( raise NotImplementedError(
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
) )
@ -118,6 +124,21 @@ def _is_undefined(value: Any) -> bool:
return value is PydanticUndefined return value is PydanticUndefined
def _list_row_model(field_type: type) -> type[BaseModel] | None:
"""Row model M for a ``list[M]`` (or ``Optional[list[M]]``) field where M is
a BaseModel subclass; else None."""
origin = get_origin(field_type)
args = get_args(field_type)
if origin is Union or (origin is not None and type(None) in args):
non_none = [a for a in args if a is not type(None)]
return _list_row_model(non_none[0]) if non_none else None
if origin is list:
inner = args[0] if args else None
if isinstance(inner, type) and issubclass(inner, BaseModel):
return inner
return None
def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]: def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]:
"""Generate field descriptors for a Pydantic model. """Generate field descriptors for a Pydantic model.
@ -137,6 +158,13 @@ def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDesc
# Determine widget and options # Determine widget and options
widget, options = _type_to_widget_and_options(field_name, field_type) widget, options = _type_to_widget_and_options(field_name, field_type)
# For a list-of-model field, recurse once to get column descriptors.
sub_fields = None
if widget == "model_list":
row_model = _list_row_model(field_type)
if row_model is not None:
sub_fields = describe_fields(row_model, {})
# Get current value, falling back to default # Get current value, falling back to default
if field_name in current: if field_name in current:
current_value = current[field_name] current_value = current[field_name]
@ -165,6 +193,7 @@ def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDesc
description=description, description=description,
required=required, required=required,
options=options, options=options,
sub_fields=sub_fields,
)) ))
return descriptors return descriptors

View file

@ -1392,6 +1392,39 @@ async def adapters_list(
return response return response
def _parse_model_list(form, field) -> list[dict]:
"""Reconstruct a list[dict] from indexed form keys ``<field>-<i>-<sub>``.
Coerces numeric sub-fields; omits blank optional numbers (model default
applies); drops fully-empty rows (e.g. a blank cloned template row)."""
sub_widgets = {sf.name: sf.widget for sf in (field.sub_fields or [])}
rows: dict[int, dict] = {}
prefix = f"{field.name}-"
for key, value in form.multi_items():
if not key.startswith(prefix):
continue
idx_str, _, sub = key[len(prefix):].partition("-")
if not sub or not idx_str.isdigit():
continue
rows.setdefault(int(idx_str), {})[sub] = value
out: list[dict] = []
for idx in sorted(rows):
row: dict = {}
for sub, raw in rows[idx].items():
val = (raw or "").strip()
if sub_widgets.get(sub) == "number":
if val == "":
continue # -> use model default (e.g. cadence_s=None)
try:
row[sub] = float(val) if "." in val or "e" in val.lower() else int(val)
except ValueError:
row[sub] = val # let Pydantic raise a typed error
else:
row[sub] = val
if any(v != "" for v in row.values()):
out.append(row)
return out
@router.get("/adapters/{name}", response_class=HTMLResponse) @router.get("/adapters/{name}", response_class=HTMLResponse)
async def adapters_edit_form( async def adapters_edit_form(
request: Request, request: Request,
@ -1484,6 +1517,16 @@ async def adapters_edit_form(
except Exception as exc: except Exception as exc:
preview_error = f"Preview unavailable: {exc}" preview_error = f"Preview unavailable: {exc}"
# Read-only API-quota panel (adapters opt in via quota_estimate; duck-typed).
quota: dict | None = None
if adapter_cls is not None and hasattr(adapter_cls, "settings_schema"):
try:
quota = adapter_cls.quota_estimate(
adapter_cls.settings_schema(**settings), row["cadence_s"]
)
except Exception:
quota = None
csrf_token = request.state.csrf_token csrf_token = request.state.csrf_token
response = templates.TemplateResponse( response = templates.TemplateResponse(
request=request, request=request,
@ -1502,6 +1545,7 @@ async def adapters_edit_form(
"requires_api_key_alias": requires_api_key_alias, "requires_api_key_alias": requires_api_key_alias,
"preview_rows": preview_rows, "preview_rows": preview_rows,
"preview_error": preview_error, "preview_error": preview_error,
"quota": quota,
}, },
) )
return response return response
@ -1613,6 +1657,10 @@ async def adapters_edit_submit(
# API key select - validate against existing keys # API key select - validate against existing keys
value = raw.strip() if raw else None value = raw.strip() if raw else None
parsed_values[field.name] = value parsed_values[field.name] = value
elif field.widget == "model_list":
rows = _parse_model_list(form, field)
form_data[field.name] = rows
parsed_values[field.name] = rows
elif field.widget == "region": elif field.widget == "region":
# Region handled separately below # Region handled separately below
pass pass
@ -1661,10 +1709,24 @@ async def adapters_edit_submit(
validated_data = {k: v for k, v in parsed_values.items() if v is not None} validated_data = {k: v for k, v in parsed_values.items() if v is not None}
validated = schema(**validated_data) validated = schema(**validated_data)
new_settings = validated.model_dump(mode="json") new_settings = validated.model_dump(mode="json")
# Hard-block a save that would blow the provider free tier.
q = adapter_cls.quota_estimate(validated, cadence_s)
if q and q.get("blocked"):
ml = next((f.name for f in fields if f.widget == "model_list"), "quota")
errors[ml] = (
f"Estimated {q['calls_per_month']:,} calls/month exceeds the "
f"{q['cap']:,}/month free-tier cap — raise cadence or remove rows."
)
except ValidationError as e: except ValidationError as e:
ml_name = next((f.name for f in fields if f.widget == "model_list"), None)
for err in e.errors(): for err in e.errors():
field_name = err["loc"][0] if err["loc"] else "unknown" loc = err["loc"]
errors[str(field_name)] = err["msg"] key = str(loc[0]) if loc else (ml_name or "unknown")
if len(loc) >= 2 and isinstance(loc[1], int):
errors[key] = f"Row {loc[1] + 1}: {err['msg']}"
else:
errors[key] = err["msg"]
else: else:
# No schema - just preserve existing settings # No schema - just preserve existing settings
new_settings = dict(current_settings) new_settings = dict(current_settings)
@ -1709,6 +1771,18 @@ async def adapters_edit_submit(
) )
api_key_missing = not has_key api_key_missing = not has_key
quota = None
if adapter_cls and hasattr(adapter_cls, "settings_schema"):
try:
quota = adapter_cls.quota_estimate(
adapter_cls.settings_schema(**(new_settings or current_settings)),
cadence_s,
)
except Exception:
quota = None
# list-of-model validation failures are a 422; single-region stays 200.
status = 422 if any(f.widget == "model_list" for f in fields) else 200
csrf_token = request.state.csrf_token csrf_token = request.state.csrf_token
response = templates.TemplateResponse( response = templates.TemplateResponse(
request=request, request=request,
@ -1725,8 +1799,9 @@ async def adapters_edit_submit(
"tile_attribution": tile_attribution, "tile_attribution": tile_attribution,
"api_key_missing": api_key_missing, "api_key_missing": api_key_missing,
"requires_api_key_alias": requires_api_key_alias, "requires_api_key_alias": requires_api_key_alias,
"quota": quota,
}, },
status_code=200, status_code=status,
) )
return response return response

View file

@ -0,0 +1,97 @@
{# Generic repeatable-row editor for a list[<BaseModel>] settings field.
Driven entirely by field.sub_fields — no adapter-name branching.
Vanilla IIFE JS, matching _region_picker.html. #}
{% set rows = (form_data[field.name] if form_data and field.name in form_data else field.current_value) or [] %}
<div class="model-list" data-field="{{ field.name }}">
<label>{{ field.label }}</label>
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
{% if errors and errors[field.name] %}
<small class="field-error">{{ errors[field.name] }}</small>
{% endif %}
<div class="table-wrap"><table class="model-list-table">
<thead><tr>
{% for sub in field.sub_fields %}<th>{{ sub.label }}</th>{% endfor %}<th></th>
</tr></thead>
<tbody class="model-list-body">
{% for row in rows %}{% set ridx = loop.index0 %}
<tr class="model-list-row">
{% for sub in field.sub_fields %}
<td>
{% if sub.widget == "select" %}
<select data-sub="{{ sub.name }}" name="{{ field.name }}-{{ ridx }}-{{ sub.name }}">
{% for opt in sub.options %}<option value="{{ opt }}" {% if row.get(sub.name)|string == opt %}selected{% endif %}>{{ opt }}</option>{% endfor %}
</select>
{% elif sub.widget == "number" %}
<input type="number" step="any" data-sub="{{ sub.name }}"
name="{{ field.name }}-{{ ridx }}-{{ sub.name }}"
value="{{ row.get(sub.name) if row.get(sub.name) is not none else '' }}">
{% else %}
<input type="text" data-sub="{{ sub.name }}"
name="{{ field.name }}-{{ ridx }}-{{ sub.name }}"
value="{{ row.get(sub.name) if row.get(sub.name) is not none else '' }}">
{% endif %}
</td>
{% endfor %}
<td><button type="button" class="btn-outline model-list-del">Remove</button></td>
</tr>
{% endfor %}
</tbody>
</table></div>
<button type="button" class="btn-secondary model-list-add">+ Add row</button>
<template class="model-list-template"><tr class="model-list-row">
{% for sub in field.sub_fields %}
<td>
{% if sub.widget == "select" %}
<select data-sub="{{ sub.name }}">{% for opt in sub.options %}<option value="{{ opt }}">{{ opt }}</option>{% endfor %}</select>
{% elif sub.widget == "number" %}
<input type="number" step="any" data-sub="{{ sub.name }}">
{% else %}
<input type="text" data-sub="{{ sub.name }}">
{% endif %}
</td>
{% endfor %}
<td><button type="button" class="btn-outline model-list-del">Remove</button></td>
</tr></template>
</div>
{% if quota %}
<div class="quota-panel flash {% if quota.blocked %}flash-error{% elif quota.warn %}flash-warn{% endif %}">
<strong>API quota:</strong> {{ quota.detail }}
{% if quota.blocked %}<br>⛔ Over free-tier cap — reduce calls before saving.
{% elif quota.warn %}<br>⚠️ Approaching free-tier cap.{% endif %}
</div>
{% endif %}
<script>
(function() {
document.querySelectorAll('.model-list').forEach(function(container) {
var field = container.dataset.field;
var body = container.querySelector('.model-list-body');
var tmpl = container.querySelector('.model-list-template');
var addBtn = container.querySelector('.model-list-add');
function renumber() {
body.querySelectorAll('.model-list-row').forEach(function(row, i) {
row.querySelectorAll('[data-sub]').forEach(function(el) {
el.name = field + '-' + i + '-' + el.dataset.sub;
});
});
}
function wireDelete(row) {
var del = row.querySelector('.model-list-del');
if (del) del.addEventListener('click', function() { row.remove(); renumber(); });
}
body.querySelectorAll('.model-list-row').forEach(wireDelete);
addBtn.addEventListener('click', function() {
var frag = tmpl.content.cloneNode(true);
var row = frag.querySelector('.model-list-row');
body.appendChild(frag);
wireDelete(row);
renumber();
});
renumber();
});
})();
</script>

View file

@ -155,6 +155,8 @@
{% if errors and errors[field.name] %} {% if errors and errors[field.name] %}
<small class="field-error">{{ errors[field.name] }}</small> <small class="field-error">{{ errors[field.name] }}</small>
{% endif %} {% endif %}
{% elif field.widget == "model_list" %}
{% include "_partials/model_list.html" %}
{% endif %} {% endif %}
{% endfor %} {% endfor %}
</fieldset> </fieldset>

View file

@ -0,0 +1,192 @@
"""v0.9.9 — adapter edit form: generic model_list editor (fixes the
describe_fields list[BaseModel] 500 across 4 adapters), TomTom bbox validation,
and quota gating."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from starlette.datastructures import FormData
from starlette.requests import Request
from central.gui import templates as gui_templates
from central.gui.form_descriptors import describe_fields
from central.gui.routes import adapters_edit_form, adapters_edit_submit
from central.adapters.tomtom_incidents import (
TomTomIncidentsAdapter, TomTomIncidentsSettings, TOMTOM_FREE_TIER_CALLS_PER_MONTH,
)
_BBOXES = [
{"name": "treasure_valley_ext", "min_lon": -117.05, "min_lat": 43.30,
"max_lon": -116.00, "max_lat": 44.30, "state_code": "ID", "cadence_s": 3600},
{"name": "mountain_home_corridor", "min_lon": -116.00, "min_lat": 42.65,
"max_lon": -114.85, "max_lat": 43.40, "state_code": "ID", "cadence_s": 5400},
{"name": "magic_valley_burley", "min_lon": -114.85, "min_lat": 42.30,
"max_lon": -113.40, "max_lat": 42.95, "state_code": "ID", "cadence_s": 3600},
]
_SETTINGS = {"bboxes": _BBOXES, "api_key_alias": "tomtom"}
# A small (~2.3k km^2) valid box used as the base for POST tests.
_SMALL = {"min_lon": "-117.0", "min_lat": "43.0", "max_lon": "-116.5", "max_lat": "43.5"}
def _render(name, ctx):
req = Request({"type": "http", "method": "GET", "path": "/", "headers": [], "query_string": b""})
return gui_templates.TemplateResponse(request=req, name=name, context=ctx).body.decode()
def _ctx(settings, fields, quota, name="tomtom_incidents", display="TomTom"):
return {
"operator": SimpleNamespace(username="admin"), "csrf_token": "x",
"adapter": {"name": name, "display_name": display, "description": "", "enabled": True,
"cadence_s": 1800, "settings": settings, "paused_at": None,
"updated_at": None, "last_error": None},
"fields": fields, "api_keys": [{"alias": "tomtom"}], "errors": None, "form_data": None,
"tile_url": "https://t/{z}/{x}/{y}.png", "tile_attribution": "OSM",
"api_key_missing": False, "requires_api_key_alias": "tomtom",
"preview_rows": None, "preview_error": None, "quota": quota,
}
class TestModelListRender:
def test_tomtom_renders_rows_and_quota(self):
fields = describe_fields(TomTomIncidentsAdapter.settings_schema, _SETTINGS)
quota = TomTomIncidentsAdapter.quota_estimate(TomTomIncidentsSettings(**_SETTINGS), 1800)
out = _render("adapters_edit.html", _ctx(_SETTINGS, fields, quota))
assert "treasure_valley_ext" in out and "magic_valley_burley" in out
assert "model-list" in out and 'name="bboxes-0-name"' in out
assert "free tier" in out # quota panel rendered
def test_nws_region_intact_no_model_list(self):
from central.adapters.nws import NWSSettings
s = {"contact_email": "a@b.com",
"region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}
out = _render("adapters_edit.html",
_ctx(s, describe_fields(NWSSettings, s), None, name="nws", display="NWS"))
assert "region-map" in out # region picker intact
assert "model-list" not in out and "free tier" not in out
def test_tomtom_flow_renders_tiles_editor(self):
from central.adapters.tomtom_flow import TomTomFlowAdapter
s = {"tiles": [{"z": 10, "x": 181, "y": 373}], "api_key_alias": "tomtom"}
out = _render("adapters_edit.html",
_ctx(s, describe_fields(TomTomFlowAdapter.settings_schema, s), None,
name="tomtom_flow", display="Flow"))
assert "model-list" in out and 'name="tiles-0-z"' in out
class TestDescribeFieldsModelList:
def test_bbox_is_model_list_with_subfields(self):
bf = next(f for f in describe_fields(TomTomIncidentsSettings, _SETTINGS) if f.name == "bboxes")
assert bf.widget == "model_list"
assert [s.name for s in bf.sub_fields] == \
["name", "min_lon", "min_lat", "max_lon", "max_lat", "state_code", "cadence_s"]
assert bf.current_value[0]["name"] == "treasure_valley_ext"
def _post_req(pairs, cadence_s="1800"):
req = MagicMock()
req.state.operator = SimpleNamespace(id=1, username="admin")
req.state.csrf_token = "x"
req.form = AsyncMock(return_value=FormData(
[("csrf_token", "x"), ("enabled", "on"), ("cadence_s", cadence_s)] + pairs))
return req
def _pool(settings=_SETTINGS):
conn = AsyncMock()
conn.fetchrow.side_effect = [
{"name": "tomtom_incidents", "enabled": True, "cadence_s": 1800, "settings": settings,
"paused_at": None, "updated_at": None, "last_error": None},
{"map_tile_url": None, "map_attribution": None},
]
conn.fetch.return_value = [{"alias": "tomtom"}]
pool = MagicMock()
pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
return pool, conn
def _bbox_pairs(i, name, state="ID", cadence=None, **coords):
c = {**_SMALL, **coords}
p = [(f"bboxes-{i}-name", name), (f"bboxes-{i}-state_code", state),
(f"bboxes-{i}-min_lon", c["min_lon"]), (f"bboxes-{i}-min_lat", c["min_lat"]),
(f"bboxes-{i}-max_lon", c["max_lon"]), (f"bboxes-{i}-max_lat", c["max_lat"])]
if cadence is not None:
p.append((f"bboxes-{i}-cadence_s", cadence))
return p
async def _submit_expect_422(pairs, cadence_s="1800"):
pool, _ = _pool()
tmpl = MagicMock(); tmpl.TemplateResponse.return_value = MagicMock()
with patch("central.gui.routes._get_templates", return_value=tmpl), \
patch("central.gui.routes.get_pool", return_value=pool), \
patch("central.gui.routes.adapter_has_resolved_api_key",
new=AsyncMock(return_value=(True, "tomtom"))):
await adapters_edit_submit(_post_req(pairs + [("api_key_alias", "tomtom")], cadence_s),
"tomtom_incidents")
ca = tmpl.TemplateResponse.call_args
assert ca.kwargs["status_code"] == 422
return ca.kwargs["context"]["errors"]
class TestBboxValidation:
@pytest.mark.asyncio
async def test_valid_persists(self):
pool, conn = _pool()
captured = {}
async def cap(q, *a):
if "UPDATE config.adapters" in q:
captured["s"] = a[2]
conn.execute.side_effect = cap
pairs = _bbox_pairs(0, "metro", cadence="3600") + [("api_key_alias", "tomtom")]
with patch("central.gui.routes.get_pool", return_value=pool), \
patch("central.gui.routes.write_audit", new=AsyncMock()):
res = await adapters_edit_submit(_post_req(pairs), "tomtom_incidents")
assert res.status_code == 302
assert captured["s"]["bboxes"][0]["name"] == "metro"
@pytest.mark.asyncio
async def test_oversized_422(self):
errs = await _submit_expect_422(
_bbox_pairs(0, "big", min_lon="-120", min_lat="30", max_lon="-100", max_lat="45"))
assert "bboxes" in errs
@pytest.mark.asyncio
async def test_degenerate_422(self):
errs = await _submit_expect_422(_bbox_pairs(0, "bad", min_lon="-100", max_lon="-110"))
assert "bboxes" in errs
@pytest.mark.asyncio
async def test_duplicate_names_422(self):
errs = await _submit_expect_422(_bbox_pairs(0, "dup") + _bbox_pairs(1, "dup"))
assert "bboxes" in errs
@pytest.mark.asyncio
async def test_cadence_floor_422(self):
errs = await _submit_expect_422(_bbox_pairs(0, "metro", cadence="30"))
assert "bboxes" in errs
@pytest.mark.asyncio
async def test_quota_block_422(self):
# top cadence 300, one bbox cadence 60 -> 8,640/mo > 2,500 cap
errs = await _submit_expect_422(_bbox_pairs(0, "metro", cadence="60"), cadence_s="300")
assert "bboxes" in errs and "free-tier" in errs["bboxes"]
class TestGetRoute:
@pytest.mark.asyncio
async def test_get_200_with_quota_in_context(self):
pool, _ = _pool()
tmpl = MagicMock(); tmpl.TemplateResponse.return_value = MagicMock(status_code=200)
req = MagicMock()
req.state.operator = SimpleNamespace(id=1, username="admin")
req.state.csrf_token = "x"
with patch("central.gui.routes._get_templates", return_value=tmpl), \
patch("central.gui.routes.get_pool", return_value=pool), \
patch("central.gui.routes.adapter_has_resolved_api_key",
new=AsyncMock(return_value=(True, "tomtom"))):
await adapters_edit_form(req, "tomtom_incidents")
ctx = tmpl.TemplateResponse.call_args.kwargs["context"]
assert ctx["quota"]["calls_per_month"] == 1920 # 720+480+720
assert ctx["quota"]["blocked"] is False

View file

@ -98,7 +98,7 @@ def test_subject_for_idaho(adapter):
def test_subject_unknown(adapter): def test_subject_unknown(adapter):
e = adapter._build_event(INC[0], BBox(name="x", min_lon=0, min_lat=0, max_lon=1, max_lat=1, state_code="")) e = adapter._build_event(INC[0], BBox(name="x", min_lon=0, min_lat=0, max_lon=0.5, max_lat=0.5, state_code=""))
assert adapter.subject_for(e) == "central.traffic.incident.unknown" assert adapter.subject_for(e) == "central.traffic.incident.unknown"
@ -143,7 +143,7 @@ def test_inherits_dedup_mixin():
# --- v0.9.5.1 per-bbox cadence ----------------------------------------------- # --- v0.9.5.1 per-bbox cadence -----------------------------------------------
def _b(name, cadence_s=None): def _b(name, cadence_s=None):
return BBox(name=name, min_lon=0, min_lat=0, max_lon=1, max_lat=1, return BBox(name=name, min_lon=0, min_lat=0, max_lon=0.5, max_lat=0.5,
state_code="ID", cadence_s=cadence_s) state_code="ID", cadence_s=cadence_s)