mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
fix(gui): generic model_list editor for list-of-model adapters + TomTom bbox validation & quota (v0.9.9)
Fixes a shared form_descriptors 500 (NotImplementedError: unsupported list type) that broke the Edit page for ALL FOUR adapters whose settings carry a list[<BaseModel>] field: tomtom_incidents, tomtom_flow, state_511_atis, state_511_atis_cameras. - form_descriptors: list[BaseModel] -> generic "model_list" widget with recursive per-column sub_field descriptors. - New _partials/model_list.html: vanilla-JS repeatable-row editor (add/remove/renumber), driven entirely by sub_fields (no adapter-name branching). Single-region edit pages render byte-identically. - TomTom: BBox/Settings Pydantic validators (10,000 km^2 cap, coord ranges, min<max, cadence_s>=60, unique names) as the single source of truth (enforced at supervisor load AND GUI POST). Duck-typed quota_estimate hook + read-only quota panel; POST hard-blocks estimates over the 2,500/mo free tier (422). TOMTOM_FREE_TIER_CALLS_PER_MONTH is a tunable for paid tiers. - routes: model_list form parse, row-aware ValidationError messages, 422 for model_list failures (single-region region errors still re-render at 200). - tests: 11 new (real-Jinja render across 3 adapters + byte-identical nws no-regression guard, POST persist + oversized/degenerate/duplicate/cadence/ quota 422 matrix, quota estimate). Full suite 848 passed, 1 skipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
00a450b22f
commit
7a5092c77f
8 changed files with 483 additions and 6 deletions
|
|
@ -185,3 +185,17 @@ class SourceAdapter(ABC):
|
|||
matched zero rows — the framework renders that distinctly from None.
|
||||
"""
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
|
||||
"""Optional. Override to surface a read-only API-quota panel on the edit
|
||||
page and to hard-block saves that would exceed a provider's free tier.
|
||||
|
||||
Pure function of (settings, cadence_s) — no I/O, no instance state — so
|
||||
the framework calls it on the class in both the GET (render) and POST
|
||||
(pre-persist gate) paths. Return None to skip (default).
|
||||
|
||||
Return dict with: calls_per_month:int, cap:int, percent:float,
|
||||
warn:bool (>=80%), blocked:bool (>=100% -> POST refused 422), detail:str.
|
||||
"""
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ Dedup is inherited from SourceAdapter; ids use the upstream-stable TomTom id.
|
|||
|
||||
import asyncio
|
||||
import logging
|
||||
import math
|
||||
import sqlite3
|
||||
from collections.abc import AsyncIterator
|
||||
from datetime import datetime, timezone
|
||||
|
|
@ -19,7 +20,7 @@ from pathlib import Path
|
|||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, field_validator, model_validator
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
|
|
@ -52,6 +53,21 @@ _DEDUP_DDL = (
|
|||
"PRIMARY KEY (adapter, event_id))"
|
||||
)
|
||||
|
||||
_MAX_BBOX_AREA_KM2 = 10_000.0 # TomTom incidentDetails hard cap per bbox
|
||||
_MIN_BBOX_CADENCE_S = 60 # per-bbox poll-interval floor
|
||||
_EARTH_RADIUS_KM = 6371.0
|
||||
_SECONDS_PER_MONTH = 30 * 24 * 3600 # 2_592_000, for quota estimation
|
||||
# TomTom Orbis free tier: 2,500 incidentDetails calls/month. A paid-tier
|
||||
# operator can raise this ceiling (v0.9.9 follow-up hook).
|
||||
TOMTOM_FREE_TIER_CALLS_PER_MONTH = 2500
|
||||
|
||||
|
||||
def _bbox_area_km2(min_lon: float, min_lat: float, max_lon: float, max_lat: float) -> float:
|
||||
"""Spherical area of a lon/lat bbox in km^2."""
|
||||
lat1, lat2 = math.radians(min_lat), math.radians(max_lat)
|
||||
dlon = math.radians(max_lon - min_lon)
|
||||
return _EARTH_RADIUS_KM ** 2 * abs(math.sin(lat2) - math.sin(lat1)) * abs(dlon)
|
||||
|
||||
|
||||
def _parse_iso(value: str | None) -> datetime | None:
|
||||
if not value:
|
||||
|
|
@ -85,6 +101,26 @@ class BBox(BaseModel):
|
|||
state_code: str
|
||||
cadence_s: int | None = None # per-bbox poll interval; None -> adapter default_cadence_s
|
||||
|
||||
@field_validator("cadence_s")
|
||||
@classmethod
|
||||
def _cadence_floor(cls, v: int | None) -> int | None:
|
||||
if v is not None and v < _MIN_BBOX_CADENCE_S:
|
||||
raise ValueError(f"cadence_s must be >= {_MIN_BBOX_CADENCE_S} seconds")
|
||||
return v
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_box(self) -> "BBox":
|
||||
if not (-180.0 <= self.min_lon < self.max_lon <= 180.0):
|
||||
raise ValueError("require -180 <= min_lon < max_lon <= 180")
|
||||
if not (-90.0 <= self.min_lat < self.max_lat <= 90.0):
|
||||
raise ValueError("require -90 <= min_lat < max_lat <= 90")
|
||||
area = _bbox_area_km2(self.min_lon, self.min_lat, self.max_lon, self.max_lat)
|
||||
if area > _MAX_BBOX_AREA_KM2:
|
||||
raise ValueError(
|
||||
f"bbox area {area:.0f} km^2 exceeds TomTom limit of {_MAX_BBOX_AREA_KM2:.0f} km^2"
|
||||
)
|
||||
return self
|
||||
|
||||
|
||||
class TomTomIncidentsSettings(BaseModel):
|
||||
"""bboxes: metro boxes to poll (each <= 10,000 km^2). api_key_alias: config key."""
|
||||
|
|
@ -92,6 +128,14 @@ class TomTomIncidentsSettings(BaseModel):
|
|||
bboxes: list[BBox] = []
|
||||
api_key_alias: str = "tomtom"
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _unique_names(self) -> "TomTomIncidentsSettings":
|
||||
names = [b.name for b in self.bboxes]
|
||||
dupes = sorted({n for n in names if names.count(n) > 1})
|
||||
if dupes:
|
||||
raise ValueError(f"duplicate bbox names: {', '.join(dupes)}")
|
||||
return self
|
||||
|
||||
|
||||
class TomTomIncidentsAdapter(SourceAdapter):
|
||||
"""TomTom Orbis incidentDetails adapter (per-bbox real-time incidents)."""
|
||||
|
|
@ -273,3 +317,27 @@ class TomTomIncidentsAdapter(SourceAdapter):
|
|||
def subject_for(self, event: Event) -> str:
|
||||
code = (event.data.get("state_code") or "").lower() or "unknown"
|
||||
return f"central.traffic.incident.{code}"
|
||||
|
||||
@classmethod
|
||||
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
|
||||
bboxes = getattr(settings, "bboxes", None) or []
|
||||
if not bboxes:
|
||||
return None
|
||||
calls = sum(
|
||||
_SECONDS_PER_MONTH / max(cadence_s, b.cadence_s or cls.default_cadence_s)
|
||||
for b in bboxes
|
||||
)
|
||||
calls_per_month = round(calls)
|
||||
cap = TOMTOM_FREE_TIER_CALLS_PER_MONTH
|
||||
percent = (calls_per_month / cap * 100) if cap else 0.0
|
||||
return {
|
||||
"calls_per_month": calls_per_month,
|
||||
"cap": cap,
|
||||
"percent": percent,
|
||||
"warn": percent >= 80.0,
|
||||
"blocked": percent >= 100.0,
|
||||
"detail": (
|
||||
f"{calls_per_month:,} est. calls/month across {len(bboxes)} "
|
||||
f"bbox(es) vs {cap:,}/month free tier ({percent:.0f}%)"
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ class FieldDescriptor:
|
|||
description: str
|
||||
required: bool
|
||||
options: list[str] | None = None # For select/checkboxes widgets
|
||||
sub_fields: list["FieldDescriptor"] | None = None # For model_list: per-column descriptors
|
||||
|
||||
|
||||
def _is_literal(tp: type) -> bool:
|
||||
|
|
@ -86,6 +87,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
|||
if inner_type is str:
|
||||
return "csv", None
|
||||
|
||||
# list[<BaseModel>] -> repeatable per-row editor (sub-columns resolved
|
||||
# by describe_fields, which recurses into the row model).
|
||||
if isinstance(inner_type, type) and issubclass(inner_type, BaseModel):
|
||||
return "model_list", None
|
||||
|
||||
raise NotImplementedError(
|
||||
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
|
||||
)
|
||||
|
|
@ -118,6 +124,21 @@ def _is_undefined(value: Any) -> bool:
|
|||
return value is PydanticUndefined
|
||||
|
||||
|
||||
def _list_row_model(field_type: type) -> type[BaseModel] | None:
|
||||
"""Row model M for a ``list[M]`` (or ``Optional[list[M]]``) field where M is
|
||||
a BaseModel subclass; else None."""
|
||||
origin = get_origin(field_type)
|
||||
args = get_args(field_type)
|
||||
if origin is Union or (origin is not None and type(None) in args):
|
||||
non_none = [a for a in args if a is not type(None)]
|
||||
return _list_row_model(non_none[0]) if non_none else None
|
||||
if origin is list:
|
||||
inner = args[0] if args else None
|
||||
if isinstance(inner, type) and issubclass(inner, BaseModel):
|
||||
return inner
|
||||
return None
|
||||
|
||||
|
||||
def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]:
|
||||
"""Generate field descriptors for a Pydantic model.
|
||||
|
||||
|
|
@ -137,6 +158,13 @@ def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDesc
|
|||
# Determine widget and options
|
||||
widget, options = _type_to_widget_and_options(field_name, field_type)
|
||||
|
||||
# For a list-of-model field, recurse once to get column descriptors.
|
||||
sub_fields = None
|
||||
if widget == "model_list":
|
||||
row_model = _list_row_model(field_type)
|
||||
if row_model is not None:
|
||||
sub_fields = describe_fields(row_model, {})
|
||||
|
||||
# Get current value, falling back to default
|
||||
if field_name in current:
|
||||
current_value = current[field_name]
|
||||
|
|
@ -165,6 +193,7 @@ def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDesc
|
|||
description=description,
|
||||
required=required,
|
||||
options=options,
|
||||
sub_fields=sub_fields,
|
||||
))
|
||||
|
||||
return descriptors
|
||||
|
|
|
|||
|
|
@ -1392,6 +1392,39 @@ async def adapters_list(
|
|||
return response
|
||||
|
||||
|
||||
def _parse_model_list(form, field) -> list[dict]:
|
||||
"""Reconstruct a list[dict] from indexed form keys ``<field>-<i>-<sub>``.
|
||||
Coerces numeric sub-fields; omits blank optional numbers (model default
|
||||
applies); drops fully-empty rows (e.g. a blank cloned template row)."""
|
||||
sub_widgets = {sf.name: sf.widget for sf in (field.sub_fields or [])}
|
||||
rows: dict[int, dict] = {}
|
||||
prefix = f"{field.name}-"
|
||||
for key, value in form.multi_items():
|
||||
if not key.startswith(prefix):
|
||||
continue
|
||||
idx_str, _, sub = key[len(prefix):].partition("-")
|
||||
if not sub or not idx_str.isdigit():
|
||||
continue
|
||||
rows.setdefault(int(idx_str), {})[sub] = value
|
||||
out: list[dict] = []
|
||||
for idx in sorted(rows):
|
||||
row: dict = {}
|
||||
for sub, raw in rows[idx].items():
|
||||
val = (raw or "").strip()
|
||||
if sub_widgets.get(sub) == "number":
|
||||
if val == "":
|
||||
continue # -> use model default (e.g. cadence_s=None)
|
||||
try:
|
||||
row[sub] = float(val) if "." in val or "e" in val.lower() else int(val)
|
||||
except ValueError:
|
||||
row[sub] = val # let Pydantic raise a typed error
|
||||
else:
|
||||
row[sub] = val
|
||||
if any(v != "" for v in row.values()):
|
||||
out.append(row)
|
||||
return out
|
||||
|
||||
|
||||
@router.get("/adapters/{name}", response_class=HTMLResponse)
|
||||
async def adapters_edit_form(
|
||||
request: Request,
|
||||
|
|
@ -1484,6 +1517,16 @@ async def adapters_edit_form(
|
|||
except Exception as exc:
|
||||
preview_error = f"Preview unavailable: {exc}"
|
||||
|
||||
# Read-only API-quota panel (adapters opt in via quota_estimate; duck-typed).
|
||||
quota: dict | None = None
|
||||
if adapter_cls is not None and hasattr(adapter_cls, "settings_schema"):
|
||||
try:
|
||||
quota = adapter_cls.quota_estimate(
|
||||
adapter_cls.settings_schema(**settings), row["cadence_s"]
|
||||
)
|
||||
except Exception:
|
||||
quota = None
|
||||
|
||||
csrf_token = request.state.csrf_token
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
|
|
@ -1502,6 +1545,7 @@ async def adapters_edit_form(
|
|||
"requires_api_key_alias": requires_api_key_alias,
|
||||
"preview_rows": preview_rows,
|
||||
"preview_error": preview_error,
|
||||
"quota": quota,
|
||||
},
|
||||
)
|
||||
return response
|
||||
|
|
@ -1613,6 +1657,10 @@ async def adapters_edit_submit(
|
|||
# API key select - validate against existing keys
|
||||
value = raw.strip() if raw else None
|
||||
parsed_values[field.name] = value
|
||||
elif field.widget == "model_list":
|
||||
rows = _parse_model_list(form, field)
|
||||
form_data[field.name] = rows
|
||||
parsed_values[field.name] = rows
|
||||
elif field.widget == "region":
|
||||
# Region handled separately below
|
||||
pass
|
||||
|
|
@ -1661,10 +1709,24 @@ async def adapters_edit_submit(
|
|||
validated_data = {k: v for k, v in parsed_values.items() if v is not None}
|
||||
validated = schema(**validated_data)
|
||||
new_settings = validated.model_dump(mode="json")
|
||||
|
||||
# Hard-block a save that would blow the provider free tier.
|
||||
q = adapter_cls.quota_estimate(validated, cadence_s)
|
||||
if q and q.get("blocked"):
|
||||
ml = next((f.name for f in fields if f.widget == "model_list"), "quota")
|
||||
errors[ml] = (
|
||||
f"Estimated {q['calls_per_month']:,} calls/month exceeds the "
|
||||
f"{q['cap']:,}/month free-tier cap — raise cadence or remove rows."
|
||||
)
|
||||
except ValidationError as e:
|
||||
ml_name = next((f.name for f in fields if f.widget == "model_list"), None)
|
||||
for err in e.errors():
|
||||
field_name = err["loc"][0] if err["loc"] else "unknown"
|
||||
errors[str(field_name)] = err["msg"]
|
||||
loc = err["loc"]
|
||||
key = str(loc[0]) if loc else (ml_name or "unknown")
|
||||
if len(loc) >= 2 and isinstance(loc[1], int):
|
||||
errors[key] = f"Row {loc[1] + 1}: {err['msg']}"
|
||||
else:
|
||||
errors[key] = err["msg"]
|
||||
else:
|
||||
# No schema - just preserve existing settings
|
||||
new_settings = dict(current_settings)
|
||||
|
|
@ -1709,6 +1771,18 @@ async def adapters_edit_submit(
|
|||
)
|
||||
api_key_missing = not has_key
|
||||
|
||||
quota = None
|
||||
if adapter_cls and hasattr(adapter_cls, "settings_schema"):
|
||||
try:
|
||||
quota = adapter_cls.quota_estimate(
|
||||
adapter_cls.settings_schema(**(new_settings or current_settings)),
|
||||
cadence_s,
|
||||
)
|
||||
except Exception:
|
||||
quota = None
|
||||
# list-of-model validation failures are a 422; single-region stays 200.
|
||||
status = 422 if any(f.widget == "model_list" for f in fields) else 200
|
||||
|
||||
csrf_token = request.state.csrf_token
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
|
|
@ -1725,8 +1799,9 @@ async def adapters_edit_submit(
|
|||
"tile_attribution": tile_attribution,
|
||||
"api_key_missing": api_key_missing,
|
||||
"requires_api_key_alias": requires_api_key_alias,
|
||||
"quota": quota,
|
||||
},
|
||||
status_code=200,
|
||||
status_code=status,
|
||||
)
|
||||
return response
|
||||
|
||||
|
|
|
|||
97
src/central/gui/templates/_partials/model_list.html
Normal file
97
src/central/gui/templates/_partials/model_list.html
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
{# Generic repeatable-row editor for a list[<BaseModel>] settings field.
|
||||
Driven entirely by field.sub_fields — no adapter-name branching.
|
||||
Vanilla IIFE JS, matching _region_picker.html. #}
|
||||
{% set rows = (form_data[field.name] if form_data and field.name in form_data else field.current_value) or [] %}
|
||||
<div class="model-list" data-field="{{ field.name }}">
|
||||
<label>{{ field.label }}</label>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small class="field-error">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
|
||||
<div class="table-wrap"><table class="model-list-table">
|
||||
<thead><tr>
|
||||
{% for sub in field.sub_fields %}<th>{{ sub.label }}</th>{% endfor %}<th></th>
|
||||
</tr></thead>
|
||||
<tbody class="model-list-body">
|
||||
{% for row in rows %}{% set ridx = loop.index0 %}
|
||||
<tr class="model-list-row">
|
||||
{% for sub in field.sub_fields %}
|
||||
<td>
|
||||
{% if sub.widget == "select" %}
|
||||
<select data-sub="{{ sub.name }}" name="{{ field.name }}-{{ ridx }}-{{ sub.name }}">
|
||||
{% for opt in sub.options %}<option value="{{ opt }}" {% if row.get(sub.name)|string == opt %}selected{% endif %}>{{ opt }}</option>{% endfor %}
|
||||
</select>
|
||||
{% elif sub.widget == "number" %}
|
||||
<input type="number" step="any" data-sub="{{ sub.name }}"
|
||||
name="{{ field.name }}-{{ ridx }}-{{ sub.name }}"
|
||||
value="{{ row.get(sub.name) if row.get(sub.name) is not none else '' }}">
|
||||
{% else %}
|
||||
<input type="text" data-sub="{{ sub.name }}"
|
||||
name="{{ field.name }}-{{ ridx }}-{{ sub.name }}"
|
||||
value="{{ row.get(sub.name) if row.get(sub.name) is not none else '' }}">
|
||||
{% endif %}
|
||||
</td>
|
||||
{% endfor %}
|
||||
<td><button type="button" class="btn-outline model-list-del">Remove</button></td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table></div>
|
||||
|
||||
<button type="button" class="btn-secondary model-list-add">+ Add row</button>
|
||||
|
||||
<template class="model-list-template"><tr class="model-list-row">
|
||||
{% for sub in field.sub_fields %}
|
||||
<td>
|
||||
{% if sub.widget == "select" %}
|
||||
<select data-sub="{{ sub.name }}">{% for opt in sub.options %}<option value="{{ opt }}">{{ opt }}</option>{% endfor %}</select>
|
||||
{% elif sub.widget == "number" %}
|
||||
<input type="number" step="any" data-sub="{{ sub.name }}">
|
||||
{% else %}
|
||||
<input type="text" data-sub="{{ sub.name }}">
|
||||
{% endif %}
|
||||
</td>
|
||||
{% endfor %}
|
||||
<td><button type="button" class="btn-outline model-list-del">Remove</button></td>
|
||||
</tr></template>
|
||||
</div>
|
||||
|
||||
{% if quota %}
|
||||
<div class="quota-panel flash {% if quota.blocked %}flash-error{% elif quota.warn %}flash-warn{% endif %}">
|
||||
<strong>API quota:</strong> {{ quota.detail }}
|
||||
{% if quota.blocked %}<br>⛔ Over free-tier cap — reduce calls before saving.
|
||||
{% elif quota.warn %}<br>⚠️ Approaching free-tier cap.{% endif %}
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<script>
|
||||
(function() {
|
||||
document.querySelectorAll('.model-list').forEach(function(container) {
|
||||
var field = container.dataset.field;
|
||||
var body = container.querySelector('.model-list-body');
|
||||
var tmpl = container.querySelector('.model-list-template');
|
||||
var addBtn = container.querySelector('.model-list-add');
|
||||
function renumber() {
|
||||
body.querySelectorAll('.model-list-row').forEach(function(row, i) {
|
||||
row.querySelectorAll('[data-sub]').forEach(function(el) {
|
||||
el.name = field + '-' + i + '-' + el.dataset.sub;
|
||||
});
|
||||
});
|
||||
}
|
||||
function wireDelete(row) {
|
||||
var del = row.querySelector('.model-list-del');
|
||||
if (del) del.addEventListener('click', function() { row.remove(); renumber(); });
|
||||
}
|
||||
body.querySelectorAll('.model-list-row').forEach(wireDelete);
|
||||
addBtn.addEventListener('click', function() {
|
||||
var frag = tmpl.content.cloneNode(true);
|
||||
var row = frag.querySelector('.model-list-row');
|
||||
body.appendChild(frag);
|
||||
wireDelete(row);
|
||||
renumber();
|
||||
});
|
||||
renumber();
|
||||
});
|
||||
})();
|
||||
</script>
|
||||
|
|
@ -155,6 +155,8 @@
|
|||
{% if errors and errors[field.name] %}
|
||||
<small class="field-error">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "model_list" %}
|
||||
{% include "_partials/model_list.html" %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</fieldset>
|
||||
|
|
|
|||
192
tests/test_gui_adapter_edit.py
Normal file
192
tests/test_gui_adapter_edit.py
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
"""v0.9.9 — adapter edit form: generic model_list editor (fixes the
|
||||
describe_fields list[BaseModel] 500 across 4 adapters), TomTom bbox validation,
|
||||
and quota gating."""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from starlette.datastructures import FormData
|
||||
from starlette.requests import Request
|
||||
|
||||
from central.gui import templates as gui_templates
|
||||
from central.gui.form_descriptors import describe_fields
|
||||
from central.gui.routes import adapters_edit_form, adapters_edit_submit
|
||||
from central.adapters.tomtom_incidents import (
|
||||
TomTomIncidentsAdapter, TomTomIncidentsSettings, TOMTOM_FREE_TIER_CALLS_PER_MONTH,
|
||||
)
|
||||
|
||||
_BBOXES = [
|
||||
{"name": "treasure_valley_ext", "min_lon": -117.05, "min_lat": 43.30,
|
||||
"max_lon": -116.00, "max_lat": 44.30, "state_code": "ID", "cadence_s": 3600},
|
||||
{"name": "mountain_home_corridor", "min_lon": -116.00, "min_lat": 42.65,
|
||||
"max_lon": -114.85, "max_lat": 43.40, "state_code": "ID", "cadence_s": 5400},
|
||||
{"name": "magic_valley_burley", "min_lon": -114.85, "min_lat": 42.30,
|
||||
"max_lon": -113.40, "max_lat": 42.95, "state_code": "ID", "cadence_s": 3600},
|
||||
]
|
||||
_SETTINGS = {"bboxes": _BBOXES, "api_key_alias": "tomtom"}
|
||||
# A small (~2.3k km^2) valid box used as the base for POST tests.
|
||||
_SMALL = {"min_lon": "-117.0", "min_lat": "43.0", "max_lon": "-116.5", "max_lat": "43.5"}
|
||||
|
||||
|
||||
def _render(name, ctx):
|
||||
req = Request({"type": "http", "method": "GET", "path": "/", "headers": [], "query_string": b""})
|
||||
return gui_templates.TemplateResponse(request=req, name=name, context=ctx).body.decode()
|
||||
|
||||
|
||||
def _ctx(settings, fields, quota, name="tomtom_incidents", display="TomTom"):
|
||||
return {
|
||||
"operator": SimpleNamespace(username="admin"), "csrf_token": "x",
|
||||
"adapter": {"name": name, "display_name": display, "description": "", "enabled": True,
|
||||
"cadence_s": 1800, "settings": settings, "paused_at": None,
|
||||
"updated_at": None, "last_error": None},
|
||||
"fields": fields, "api_keys": [{"alias": "tomtom"}], "errors": None, "form_data": None,
|
||||
"tile_url": "https://t/{z}/{x}/{y}.png", "tile_attribution": "OSM",
|
||||
"api_key_missing": False, "requires_api_key_alias": "tomtom",
|
||||
"preview_rows": None, "preview_error": None, "quota": quota,
|
||||
}
|
||||
|
||||
|
||||
class TestModelListRender:
|
||||
def test_tomtom_renders_rows_and_quota(self):
|
||||
fields = describe_fields(TomTomIncidentsAdapter.settings_schema, _SETTINGS)
|
||||
quota = TomTomIncidentsAdapter.quota_estimate(TomTomIncidentsSettings(**_SETTINGS), 1800)
|
||||
out = _render("adapters_edit.html", _ctx(_SETTINGS, fields, quota))
|
||||
assert "treasure_valley_ext" in out and "magic_valley_burley" in out
|
||||
assert "model-list" in out and 'name="bboxes-0-name"' in out
|
||||
assert "free tier" in out # quota panel rendered
|
||||
|
||||
def test_nws_region_intact_no_model_list(self):
|
||||
from central.adapters.nws import NWSSettings
|
||||
s = {"contact_email": "a@b.com",
|
||||
"region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}
|
||||
out = _render("adapters_edit.html",
|
||||
_ctx(s, describe_fields(NWSSettings, s), None, name="nws", display="NWS"))
|
||||
assert "region-map" in out # region picker intact
|
||||
assert "model-list" not in out and "free tier" not in out
|
||||
|
||||
def test_tomtom_flow_renders_tiles_editor(self):
|
||||
from central.adapters.tomtom_flow import TomTomFlowAdapter
|
||||
s = {"tiles": [{"z": 10, "x": 181, "y": 373}], "api_key_alias": "tomtom"}
|
||||
out = _render("adapters_edit.html",
|
||||
_ctx(s, describe_fields(TomTomFlowAdapter.settings_schema, s), None,
|
||||
name="tomtom_flow", display="Flow"))
|
||||
assert "model-list" in out and 'name="tiles-0-z"' in out
|
||||
|
||||
|
||||
class TestDescribeFieldsModelList:
|
||||
def test_bbox_is_model_list_with_subfields(self):
|
||||
bf = next(f for f in describe_fields(TomTomIncidentsSettings, _SETTINGS) if f.name == "bboxes")
|
||||
assert bf.widget == "model_list"
|
||||
assert [s.name for s in bf.sub_fields] == \
|
||||
["name", "min_lon", "min_lat", "max_lon", "max_lat", "state_code", "cadence_s"]
|
||||
assert bf.current_value[0]["name"] == "treasure_valley_ext"
|
||||
|
||||
|
||||
def _post_req(pairs, cadence_s="1800"):
|
||||
req = MagicMock()
|
||||
req.state.operator = SimpleNamespace(id=1, username="admin")
|
||||
req.state.csrf_token = "x"
|
||||
req.form = AsyncMock(return_value=FormData(
|
||||
[("csrf_token", "x"), ("enabled", "on"), ("cadence_s", cadence_s)] + pairs))
|
||||
return req
|
||||
|
||||
|
||||
def _pool(settings=_SETTINGS):
|
||||
conn = AsyncMock()
|
||||
conn.fetchrow.side_effect = [
|
||||
{"name": "tomtom_incidents", "enabled": True, "cadence_s": 1800, "settings": settings,
|
||||
"paused_at": None, "updated_at": None, "last_error": None},
|
||||
{"map_tile_url": None, "map_attribution": None},
|
||||
]
|
||||
conn.fetch.return_value = [{"alias": "tomtom"}]
|
||||
pool = MagicMock()
|
||||
pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
|
||||
pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
return pool, conn
|
||||
|
||||
|
||||
def _bbox_pairs(i, name, state="ID", cadence=None, **coords):
|
||||
c = {**_SMALL, **coords}
|
||||
p = [(f"bboxes-{i}-name", name), (f"bboxes-{i}-state_code", state),
|
||||
(f"bboxes-{i}-min_lon", c["min_lon"]), (f"bboxes-{i}-min_lat", c["min_lat"]),
|
||||
(f"bboxes-{i}-max_lon", c["max_lon"]), (f"bboxes-{i}-max_lat", c["max_lat"])]
|
||||
if cadence is not None:
|
||||
p.append((f"bboxes-{i}-cadence_s", cadence))
|
||||
return p
|
||||
|
||||
|
||||
async def _submit_expect_422(pairs, cadence_s="1800"):
|
||||
pool, _ = _pool()
|
||||
tmpl = MagicMock(); tmpl.TemplateResponse.return_value = MagicMock()
|
||||
with patch("central.gui.routes._get_templates", return_value=tmpl), \
|
||||
patch("central.gui.routes.get_pool", return_value=pool), \
|
||||
patch("central.gui.routes.adapter_has_resolved_api_key",
|
||||
new=AsyncMock(return_value=(True, "tomtom"))):
|
||||
await adapters_edit_submit(_post_req(pairs + [("api_key_alias", "tomtom")], cadence_s),
|
||||
"tomtom_incidents")
|
||||
ca = tmpl.TemplateResponse.call_args
|
||||
assert ca.kwargs["status_code"] == 422
|
||||
return ca.kwargs["context"]["errors"]
|
||||
|
||||
|
||||
class TestBboxValidation:
|
||||
@pytest.mark.asyncio
|
||||
async def test_valid_persists(self):
|
||||
pool, conn = _pool()
|
||||
captured = {}
|
||||
async def cap(q, *a):
|
||||
if "UPDATE config.adapters" in q:
|
||||
captured["s"] = a[2]
|
||||
conn.execute.side_effect = cap
|
||||
pairs = _bbox_pairs(0, "metro", cadence="3600") + [("api_key_alias", "tomtom")]
|
||||
with patch("central.gui.routes.get_pool", return_value=pool), \
|
||||
patch("central.gui.routes.write_audit", new=AsyncMock()):
|
||||
res = await adapters_edit_submit(_post_req(pairs), "tomtom_incidents")
|
||||
assert res.status_code == 302
|
||||
assert captured["s"]["bboxes"][0]["name"] == "metro"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_oversized_422(self):
|
||||
errs = await _submit_expect_422(
|
||||
_bbox_pairs(0, "big", min_lon="-120", min_lat="30", max_lon="-100", max_lat="45"))
|
||||
assert "bboxes" in errs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_degenerate_422(self):
|
||||
errs = await _submit_expect_422(_bbox_pairs(0, "bad", min_lon="-100", max_lon="-110"))
|
||||
assert "bboxes" in errs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_duplicate_names_422(self):
|
||||
errs = await _submit_expect_422(_bbox_pairs(0, "dup") + _bbox_pairs(1, "dup"))
|
||||
assert "bboxes" in errs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cadence_floor_422(self):
|
||||
errs = await _submit_expect_422(_bbox_pairs(0, "metro", cadence="30"))
|
||||
assert "bboxes" in errs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_quota_block_422(self):
|
||||
# top cadence 300, one bbox cadence 60 -> 8,640/mo > 2,500 cap
|
||||
errs = await _submit_expect_422(_bbox_pairs(0, "metro", cadence="60"), cadence_s="300")
|
||||
assert "bboxes" in errs and "free-tier" in errs["bboxes"]
|
||||
|
||||
|
||||
class TestGetRoute:
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_200_with_quota_in_context(self):
|
||||
pool, _ = _pool()
|
||||
tmpl = MagicMock(); tmpl.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
req = MagicMock()
|
||||
req.state.operator = SimpleNamespace(id=1, username="admin")
|
||||
req.state.csrf_token = "x"
|
||||
with patch("central.gui.routes._get_templates", return_value=tmpl), \
|
||||
patch("central.gui.routes.get_pool", return_value=pool), \
|
||||
patch("central.gui.routes.adapter_has_resolved_api_key",
|
||||
new=AsyncMock(return_value=(True, "tomtom"))):
|
||||
await adapters_edit_form(req, "tomtom_incidents")
|
||||
ctx = tmpl.TemplateResponse.call_args.kwargs["context"]
|
||||
assert ctx["quota"]["calls_per_month"] == 1920 # 720+480+720
|
||||
assert ctx["quota"]["blocked"] is False
|
||||
|
|
@ -98,7 +98,7 @@ def test_subject_for_idaho(adapter):
|
|||
|
||||
|
||||
def test_subject_unknown(adapter):
|
||||
e = adapter._build_event(INC[0], BBox(name="x", min_lon=0, min_lat=0, max_lon=1, max_lat=1, state_code=""))
|
||||
e = adapter._build_event(INC[0], BBox(name="x", min_lon=0, min_lat=0, max_lon=0.5, max_lat=0.5, state_code=""))
|
||||
assert adapter.subject_for(e) == "central.traffic.incident.unknown"
|
||||
|
||||
|
||||
|
|
@ -143,7 +143,7 @@ def test_inherits_dedup_mixin():
|
|||
# --- v0.9.5.1 per-bbox cadence -----------------------------------------------
|
||||
|
||||
def _b(name, cadence_s=None):
|
||||
return BBox(name=name, min_lon=0, min_lat=0, max_lon=1, max_lat=1,
|
||||
return BBox(name=name, min_lon=0, min_lat=0, max_lon=0.5, max_lat=0.5,
|
||||
state_code="ID", cadence_s=cadence_s)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue