feat(gui): generic adapter edit form

Implement Central 2-A2: generic adapter edit form feature.

- Add form_descriptors.py with describe_fields() and FieldDescriptor
  - Maps Pydantic types to HTML widgets (text, number, checkbox, csv, region)
  - Handles Optional types by recursively resolving inner type
  - Uses PydanticUndefined handling for proper default values

- Update routes.py GET/POST handlers:
  - Use cached _adapter_classes() for adapter class lookup
  - Generate field descriptors from adapter settings_schema
  - Parse form values based on widget type in POST handler
  - Validate settings via Pydantic ValidationError

- Update adapters_edit.html template:
  - Render form dynamically from field descriptors
  - Support all widget types (text, number, checkbox, csv, region)
  - Use adapter.display_name and adapter.description from class

- Delete per-adapter templates:
  - adapters_edit_nws.html
  - adapters_edit_firms.html
  - adapters_edit_usgs_quake.html

- Add tests/test_form_descriptors.py with comprehensive coverage
- Update tests/test_adapters.py to include last_error in mock rows
- Update tests/test_region_picker.py to include last_error in mock rows

Adding a new adapter no longer requires GUI template work.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-18 23:16:37 +00:00
commit 966661305f
9 changed files with 606 additions and 304 deletions

View file

@ -0,0 +1,133 @@
"""Form field descriptors for adapter settings.
If a second nested settings type beyond RegionConfig appears,
refactor this helper to recurse over nested models.
"""
from dataclasses import dataclass
from typing import Any, Union, get_args, get_origin
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pydantic_core import PydanticUndefined
from central.config_models import RegionConfig
@dataclass
class FieldDescriptor:
"""Describes a form field for rendering."""
name: str
label: str
widget: str # "text", "number", "checkbox", "csv", "region"
current_value: Any
default: Any
description: str
required: bool
def _type_to_widget(field_name: str, field_type: type) -> str:
"""Map a Python type to a widget type."""
# Handle Optional/Union types
origin = get_origin(field_type)
args = get_args(field_type)
# Check for Optional[X] (Union[X, None])
if origin is Union or (origin is not None and type(None) in args):
# Get the non-None type
non_none_args = [a for a in args if a is not type(None)]
if non_none_args:
inner_type = non_none_args[0]
# Recursively determine widget for the inner type
return _type_to_widget(field_name, inner_type)
# Direct type checks
if field_type is str:
return "text"
if field_type is int:
return "number"
if field_type is bool:
return "checkbox"
if field_type is RegionConfig:
return "region"
# Check for list[str]
if origin is list:
if args and args[0] is str:
return "csv"
raise NotImplementedError(
f"Field '{field_name}' has unsupported list type: list[{args[0].__name__ if args else '?'}]"
)
# Check if it's a BaseModel subclass (nested model)
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
if field_type is RegionConfig:
return "region"
raise NotImplementedError(
f"Field '{field_name}' has unsupported nested type: {field_type.__name__}"
)
raise NotImplementedError(
f"Field '{field_name}' has unsupported type: {field_type}"
)
def _name_to_label(name: str) -> str:
"""Convert field name to human-readable label."""
return name.replace("_", " ").title()
def _is_undefined(value: Any) -> bool:
"""Check if a value is Pydantic's undefined sentinel."""
return value is PydanticUndefined
def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]:
"""Generate field descriptors for a Pydantic model.
Args:
model_cls: The Pydantic model class (e.g., NWSSettings)
current: Current settings values from the database
Returns:
List of FieldDescriptor objects for rendering the form
"""
descriptors = []
for field_name, field_info in model_cls.model_fields.items():
# Get the field type
field_type = field_info.annotation
# Determine widget
widget = _type_to_widget(field_name, field_type)
# Get current value, falling back to default
if field_name in current:
current_value = current[field_name]
elif not _is_undefined(field_info.default):
current_value = field_info.default
else:
current_value = None
# Get default
default = field_info.default if not _is_undefined(field_info.default) else None
# Get description
description = ""
if field_info.description:
description = field_info.description
# Determine if required (no default and not Optional)
required = _is_undefined(field_info.default) and field_info.is_required()
descriptors.append(FieldDescriptor(
name=field_name,
label=_name_to_label(field_name),
widget=widget,
current_value=current_value,
default=default,
description=description,
required=required,
))
return descriptors

View file

@ -9,6 +9,7 @@ from typing import Any
logger = logging.getLogger("central.gui.routes") logger = logging.getLogger("central.gui.routes")
from fastapi import APIRouter, Depends, Form, Request from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi.responses import HTMLResponse, RedirectResponse, Response
from central.bootstrap_config import get_settings from central.bootstrap_config import get_settings
@ -43,7 +44,22 @@ from central.gui.audit import (
SYSTEM_UPDATE, SYSTEM_UPDATE,
write_audit, write_audit,
) )
from functools import cache
from central.gui.db import get_pool from central.gui.db import get_pool
from central.gui.form_descriptors import describe_fields, FieldDescriptor
from central.supervisor import discover_adapters
from pydantic import ValidationError
@cache
def _adapter_classes() -> dict:
"""Cached adapter class discovery.
GUI is a separate process from supervisor; walks pkgutil itself.
Python's import cache makes subsequent calls free.
"""
return discover_adapters()
router = APIRouter() router = APIRouter()
@ -1275,10 +1291,14 @@ async def adapters_edit_form(
pool = get_pool() pool = get_pool()
operator = request.state.operator operator = request.state.operator
# Look up the adapter class
adapter_classes = _adapter_classes()
adapter_cls = adapter_classes.get(name)
async with pool.acquire() as conn: async with pool.acquire() as conn:
row = await conn.fetchrow( row = await conn.fetchrow(
""" """
SELECT name, enabled, cadence_s, settings, paused_at, updated_at SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error
FROM config.adapters FROM config.adapters
WHERE name = $1 WHERE name = $1
""", """,
@ -1288,11 +1308,6 @@ async def adapters_edit_form(
if row is None: if row is None:
return Response(status_code=404, content="Adapter not found") return Response(status_code=404, content="Adapter not found")
# Get API keys for firms dropdown
api_keys = await conn.fetch(
"SELECT alias FROM config.api_keys ORDER BY alias"
)
# Get map tile settings from config.system # Get map tile settings from config.system
sys_row = await conn.fetchrow( sys_row = await conn.fetchrow(
"SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true"
@ -1301,15 +1316,25 @@ async def adapters_edit_form(
tile_attribution = sys_row["map_attribution"] if sys_row else "&copy; OpenStreetMap contributors" tile_attribution = sys_row["map_attribution"] if sys_row else "&copy; OpenStreetMap contributors"
settings = row["settings"] or {} settings = row["settings"] or {}
# Build adapter dict with class metadata
adapter = { adapter = {
"name": row["name"], "name": row["name"],
"display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"],
"description": getattr(adapter_cls, "description", "") if adapter_cls else "",
"enabled": row["enabled"], "enabled": row["enabled"],
"cadence_s": row["cadence_s"], "cadence_s": row["cadence_s"],
"settings": settings, "settings": settings,
"paused_at": row["paused_at"], "paused_at": row["paused_at"],
"updated_at": row["updated_at"], "updated_at": row["updated_at"],
"last_error": row["last_error"],
} }
# Generate field descriptors if we have the adapter class
fields = []
if adapter_cls and hasattr(adapter_cls, "settings_schema"):
fields = describe_fields(adapter_cls.settings_schema, settings)
csrf_token = request.state.csrf_token csrf_token = request.state.csrf_token
response = templates.TemplateResponse( response = templates.TemplateResponse(
request=request, request=request,
@ -1318,11 +1343,9 @@ async def adapters_edit_form(
"operator": operator, "operator": operator,
"csrf_token": csrf_token, "csrf_token": csrf_token,
"adapter": adapter, "adapter": adapter,
"fields": fields,
"errors": None, "errors": None,
"form_data": None, "form_data": None,
"api_keys": [{"alias": k["alias"]} for k in api_keys],
"valid_satellites": _get_valid_satellites(),
"valid_feeds": sorted(_get_valid_feeds()),
"tile_url": tile_url, "tile_url": tile_url,
"tile_attribution": tile_attribution, "tile_attribution": tile_attribution,
}, },
@ -1347,19 +1370,20 @@ async def adapters_edit_submit(
if not form_csrf or form_csrf != request.state.csrf_token: if not form_csrf or form_csrf != request.state.csrf_token:
raise CsrfValidationError("Invalid CSRF token") raise CsrfValidationError("Invalid CSRF token")
# Parse form data # Look up the adapter class
form = await request.form() adapter_classes = _adapter_classes()
adapter_cls = adapter_classes.get(name)
# Parse common form fields
enabled = "enabled" in form enabled = "enabled" in form
cadence_s_str = form.get("cadence_s", "") cadence_s_str = form.get("cadence_s", "")
# Build form_data for re-render on error errors: dict[str, str] = {}
form_data: dict[str, Any] = { form_data: dict[str, Any] = {
"enabled": enabled, "enabled": enabled,
"cadence_s": cadence_s_str, "cadence_s": cadence_s_str,
} }
errors: dict[str, str] = {}
# Validate cadence_s # Validate cadence_s
try: try:
cadence_s = int(cadence_s_str) cadence_s = int(cadence_s_str)
@ -1373,7 +1397,7 @@ async def adapters_edit_submit(
# Get current adapter state # Get current adapter state
row = await conn.fetchrow( row = await conn.fetchrow(
""" """
SELECT name, enabled, cadence_s, settings, paused_at, updated_at SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error
FROM config.adapters FROM config.adapters
WHERE name = $1 WHERE name = $1
""", """,
@ -1384,56 +1408,38 @@ async def adapters_edit_submit(
return Response(status_code=404, content="Adapter not found") return Response(status_code=404, content="Adapter not found")
current_settings = row["settings"] or {} current_settings = row["settings"] or {}
new_settings = dict(current_settings)
# Adapter-specific validation and settings update # Parse and validate settings via Pydantic if we have the adapter class
if name == "nws": new_settings = {}
contact_email = form.get("contact_email", "").strip() if adapter_cls and hasattr(adapter_cls, "settings_schema"):
form_data["contact_email"] = contact_email schema = adapter_cls.settings_schema
if not contact_email: fields = describe_fields(schema, current_settings)
errors["contact_email"] = "Contact email is required"
elif not EMAIL_REGEX.match(contact_email): # Parse form values based on widget type
errors["contact_email"] = "Invalid email format" parsed_values = {}
for field in fields:
raw = form.get(field.name, "")
form_data[field.name] = raw
if field.widget == "text":
parsed_values[field.name] = raw.strip() if raw else None
elif field.widget == "number":
try:
parsed_values[field.name] = int(raw) if raw else None
except ValueError:
errors[field.name] = f"{field.label} must be a number"
elif field.widget == "checkbox":
parsed_values[field.name] = field.name in form
elif field.widget == "csv":
if raw.strip():
parsed_values[field.name] = [v.strip() for v in raw.split(",") if v.strip()]
else: else:
new_settings["contact_email"] = contact_email parsed_values[field.name] = []
elif field.widget == "region":
# Region handled separately below
pass
elif name == "firms": # Handle region fields (common pattern)
api_key_alias = form.get("api_key_alias", "").strip()
satellites = form.getlist("satellites")
form_data["api_key_alias"] = api_key_alias
form_data["satellites"] = satellites
# Validate api_key_alias if set
if api_key_alias:
key_exists = await conn.fetchrow(
"SELECT 1 FROM config.api_keys WHERE alias = $1",
api_key_alias,
)
if not key_exists:
errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist"
else:
new_settings["api_key_alias"] = api_key_alias
else:
new_settings["api_key_alias"] = None
# Validate satellites
valid_sats = set(_get_valid_satellites())
invalid_sats = [s for s in satellites if s not in valid_sats]
if invalid_sats:
errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}"
else:
new_settings["satellites"] = satellites
elif name == "usgs_quake":
feed = form.get("feed", "").strip()
form_data["feed"] = feed
valid_feeds = _get_valid_feeds()
if feed not in valid_feeds:
errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}"
else:
new_settings["feed"] = feed
# Region validation (applies to all adapters)
region_north_str = form.get("region_north", "").strip() region_north_str = form.get("region_north", "").strip()
region_south_str = form.get("region_south", "").strip() region_south_str = form.get("region_south", "").strip()
region_east_str = form.get("region_east", "").strip() region_east_str = form.get("region_east", "").strip()
@ -1444,20 +1450,22 @@ async def adapters_edit_submit(
form_data["region_east"] = region_east_str form_data["region_east"] = region_east_str
form_data["region_west"] = region_west_str form_data["region_west"] = region_west_str
# Check if any region field has a value
has_region = any([region_north_str, region_south_str, region_east_str, region_west_str])
if has_region:
try: try:
region_north = float(region_north_str) region_north = float(region_north_str)
region_south = float(region_south_str) region_south = float(region_south_str)
region_east = float(region_east_str) region_east = float(region_east_str)
region_west = float(region_west_str) region_west = float(region_west_str)
# Validate latitude bounds
if not (-90 <= region_south < region_north <= 90): if not (-90 <= region_south < region_north <= 90):
errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90" errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90"
# Validate longitude bounds
elif not (-180 <= region_west < region_east <= 180): elif not (-180 <= region_west < region_east <= 180):
errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180" errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180"
else: else:
new_settings["region"] = { parsed_values["region"] = {
"north": region_north, "north": region_north,
"south": region_south, "south": region_south,
"east": region_east, "east": region_east,
@ -1465,22 +1473,26 @@ async def adapters_edit_submit(
} }
except ValueError: except ValueError:
errors["region"] = "Region coordinates must be valid numbers" errors["region"] = "Region coordinates must be valid numbers"
else:
parsed_values["region"] = None
# Only validate with Pydantic if no parse errors
if not errors:
try:
# Filter out None values for optional fields without defaults
validated_data = {k: v for k, v in parsed_values.items() if v is not None}
validated = schema(**validated_data)
new_settings = validated.model_dump(mode="json")
except ValidationError as e:
for err in e.errors():
field_name = err["loc"][0] if err["loc"] else "unknown"
errors[str(field_name)] = err["msg"]
else:
# No schema - just preserve existing settings
new_settings = dict(current_settings)
# If there are errors, re-render the form # If there are errors, re-render the form
if errors: if errors:
adapter = {
"name": row["name"],
"enabled": row["enabled"],
"cadence_s": row["cadence_s"],
"settings": current_settings,
"paused_at": row["paused_at"],
"updated_at": row["updated_at"],
}
api_keys = await conn.fetch(
"SELECT alias FROM config.api_keys ORDER BY alias"
)
# Get map tile settings for re-render # Get map tile settings for re-render
sys_row = await conn.fetchrow( sys_row = await conn.fetchrow(
"SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true"
@ -1488,6 +1500,22 @@ async def adapters_edit_submit(
tile_url = sys_row["map_tile_url"] if sys_row else "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png" tile_url = sys_row["map_tile_url"] if sys_row else "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
tile_attribution = sys_row["map_attribution"] if sys_row else "&copy; OpenStreetMap contributors" tile_attribution = sys_row["map_attribution"] if sys_row else "&copy; OpenStreetMap contributors"
adapter = {
"name": row["name"],
"display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"],
"description": getattr(adapter_cls, "description", "") if adapter_cls else "",
"enabled": row["enabled"],
"cadence_s": row["cadence_s"],
"settings": current_settings,
"paused_at": row["paused_at"],
"updated_at": row["updated_at"],
"last_error": row["last_error"],
}
fields = []
if adapter_cls and hasattr(adapter_cls, "settings_schema"):
fields = describe_fields(adapter_cls.settings_schema, current_settings)
csrf_token = request.state.csrf_token csrf_token = request.state.csrf_token
response = templates.TemplateResponse( response = templates.TemplateResponse(
request=request, request=request,
@ -1496,11 +1524,9 @@ async def adapters_edit_submit(
"operator": operator, "operator": operator,
"csrf_token": csrf_token, "csrf_token": csrf_token,
"adapter": adapter, "adapter": adapter,
"fields": fields,
"errors": errors, "errors": errors,
"form_data": form_data, "form_data": form_data,
"api_keys": [{"alias": k["alias"]} for k in api_keys],
"valid_satellites": _get_valid_satellites(),
"valid_feeds": sorted(_get_valid_feeds()),
"tile_url": tile_url, "tile_url": tile_url,
"tile_attribution": tile_attribution, "tile_attribution": tile_attribution,
}, },

View file

@ -1,6 +1,6 @@
{% extends "base.html" %} {% extends "base.html" %}
{% block title %}Central — Edit {{ adapter.name }}{% endblock %} {% block title %}Central — Edit {{ adapter.display_name }}{% endblock %}
{% block head %} {% block head %}
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin=""> <link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin="">
@ -10,35 +10,114 @@
{% endblock %} {% endblock %}
{% block content %} {% block content %}
<h1>Edit Adapter: {{ adapter.name }}</h1> <h1>{{ adapter.display_name }}</h1>
<p class="secondary">{{ adapter.description }}</p>
{% if adapter.paused_at %}
<article aria-label="Adapter Paused" style="background-color: var(--pico-mark-background-color); margin-bottom: 1rem;">
<strong>⏸️ Paused</strong> since {{ adapter.paused_at }}
</article>
{% endif %}
{% if adapter.last_error %}
<article aria-label="Last Error" style="background-color: var(--pico-del-color); margin-bottom: 1rem;">
<strong>Last Error:</strong> {{ adapter.last_error }}
</article>
{% endif %}
<form method="post" action="/adapters/{{ adapter.name }}"> <form method="post" action="/adapters/{{ adapter.name }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}"> <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<fieldset> <fieldset>
<legend>Universal Settings</legend> <legend>Core Settings</legend>
<label> <label>
<input type="checkbox" name="enabled" {% if adapter.enabled %}checked{% endif %}> <input type="checkbox" name="enabled" {% if form_data %}{% if form_data.enabled %}checked{% endif %}{% elif adapter.enabled %}checked{% endif %}>
Enabled Enabled
</label> </label>
<label for="cadence_s">Cadence (seconds)</label> <label for="cadence_s">Cadence (seconds)</label>
<input type="number" id="cadence_s" name="cadence_s" value="{{ form_data.cadence_s if form_data else adapter.cadence_s }}" min="60" max="3600" required> <input type="number" id="cadence_s" name="cadence_s"
value="{{ form_data.cadence_s if form_data else adapter.cadence_s }}"
min="60" max="3600" required>
{% if errors and errors.cadence_s %} {% if errors and errors.cadence_s %}
<small style="color: var(--pico-color-red-500);">{{ errors.cadence_s }}</small> <small style="color: var(--pico-color-red-500);">{{ errors.cadence_s }}</small>
{% endif %} {% endif %}
</fieldset> </fieldset>
{% if fields %}
<fieldset> <fieldset>
<legend>Adapter-Specific Settings</legend> <legend>Adapter Settings</legend>
{% include "adapters_edit_" + adapter.name + ".html" %}
</fieldset>
{% for field in fields %}
{% if field.widget == "region" %}
{# Region is rendered in a separate fieldset below #}
{% elif field.widget == "text" %}
<label for="{{ field.name }}">{{ field.label }}</label>
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
{% if field.required %}required{% endif %}>
{% if field.description %}
<small>{{ field.description }}</small>
{% endif %}
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% elif field.widget == "number" %}
<label for="{{ field.name }}">{{ field.label }}</label>
<input type="number" id="{{ field.name }}" name="{{ field.name }}"
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
{% if field.required %}required{% endif %}>
{% if field.description %}
<small>{{ field.description }}</small>
{% endif %}
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% elif field.widget == "checkbox" %}
<label>
<input type="checkbox" name="{{ field.name }}"
{% if form_data and field.name in form_data %}
{% if form_data[field.name] %}checked{% endif %}
{% elif field.current_value %}checked{% endif %}>
{{ field.label }}
</label>
{% if field.description %}
<small>{{ field.description }}</small>
{% endif %}
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% elif field.widget == "csv" %}
<label for="{{ field.name }}">{{ field.label }}</label>
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
value="{{ form_data[field.name] if form_data and field.name in form_data else (field.current_value | join(',') if field.current_value else '') }}"
{% if field.required %}required{% endif %}>
<small>Comma-separated values{% if field.description %} — {{ field.description }}{% endif %}</small>
{% if errors and errors[field.name] %}
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
{% endif %}
{% endif %}
{% endfor %}
</fieldset>
{% endif %}
{% set has_region = namespace(value=false) %}
{% for field in fields %}
{% if field.widget == "region" %}
{% set has_region.value = true %}
{% endif %}
{% endfor %}
{% if has_region.value %}
<fieldset> <fieldset>
<legend>Region</legend> <legend>Region</legend>
{% include "_region_picker.html" %} {% include "_region_picker.html" %}
</fieldset> </fieldset>
{% endif %}
<button type="submit">Save Changes</button> <button type="submit">Save Changes</button>
<a href="/adapters" role="button" class="outline">Cancel</a> <a href="/adapters" role="button" class="outline">Cancel</a>

View file

@ -1,21 +0,0 @@
<label for="api_key_alias">API Key Alias</label>
<select id="api_key_alias" name="api_key_alias">
<option value="" {% if not (form_data.api_key_alias if form_data else adapter.settings.api_key_alias) %}selected{% endif %}>(none)</option>
{% for key in api_keys %}
<option value="{{ key.alias }}" {% if (form_data.api_key_alias if form_data else adapter.settings.api_key_alias) == key.alias %}selected{% endif %}>{{ key.alias }}</option>
{% endfor %}
</select>
{% if errors and errors.api_key_alias %}
<small style="color: var(--pico-color-red-500);">{{ errors.api_key_alias }}</small>
{% endif %}
<label>Satellites</label>
{% for sat in valid_satellites %}
<label>
<input type="checkbox" name="satellites" value="{{ sat }}" {% if sat in (form_data.satellites if form_data else adapter.settings.satellites or []) %}checked{% endif %}>
{{ sat }}
</label>
{% endfor %}
{% if errors and errors.satellites %}
<small style="color: var(--pico-color-red-500);">{{ errors.satellites }}</small>
{% endif %}

View file

@ -1,5 +0,0 @@
<label for="contact_email">Contact Email</label>
<input type="email" id="contact_email" name="contact_email" value="{{ form_data.contact_email if form_data else adapter.settings.contact_email }}" required>
{% if errors and errors.contact_email %}
<small style="color: var(--pico-color-red-500);">{{ errors.contact_email }}</small>
{% endif %}

View file

@ -1,9 +0,0 @@
<label for="feed">Feed</label>
<select id="feed" name="feed" required>
{% for f in valid_feeds %}
<option value="{{ f }}" {% if (form_data.feed if form_data else adapter.settings.feed) == f %}selected{% endif %}>{{ f }}</option>
{% endfor %}
</select>
{% if errors and errors.feed %}
<small style="color: var(--pico-color-red-500);">{{ errors.feed }}</small>
{% endif %}

View file

@ -78,6 +78,7 @@ class TestAdaptersEditForm:
mock_request = MagicMock() mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="testop") mock_request.state.operator = MagicMock(id=1, username="testop")
mock_request.state.csrf_token = "test_csrf"
mock_conn = AsyncMock() mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [ mock_conn.fetchrow.side_effect = [
@ -88,10 +89,10 @@ class TestAdaptersEditForm:
"settings": {"contact_email": "test@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, "settings": {"contact_email": "test@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{"map_tile_url": "https://tile.example.com/{z}/{x}/{y}.png", "map_attribution": "Test"}, {"map_tile_url": "https://tile.example.com/{z}/{x}/{y}.png", "map_attribution": "Test"},
] ]
mock_conn.fetch.return_value = [] # No API keys
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -109,6 +110,8 @@ class TestAdaptersEditForm:
context = call_args.kwargs.get("context", call_args[1].get("context")) context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["adapter"]["name"] == "nws" assert context["adapter"]["name"] == "nws"
assert context["adapter"]["settings"]["contact_email"] == "test@example.com" assert context["adapter"]["settings"]["contact_email"] == "test@example.com"
# Verify fields are generated
assert "fields" in context
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_adapters_edit_nonexistent_returns_404(self): async def test_adapters_edit_nonexistent_returns_404(self):
@ -167,6 +170,7 @@ class TestAdaptersEditSubmit:
"settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, "settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
} }
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()
@ -190,9 +194,9 @@ class TestAdaptersEditSubmit:
mock_request = MagicMock() mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="testop") mock_request.state.operator = MagicMock(id=1, username="testop")
mock_request.state.csrf_token = "test_csrf_token"
mock_form = MagicMock() mock_form = MagicMock()
mock_request.state.csrf_token = "test_csrf_token"
mock_form.get.side_effect = lambda k, d="": { mock_form.get.side_effect = lambda k, d="": {
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "30", "cadence_s": "30",
@ -215,10 +219,10 @@ class TestAdaptersEditSubmit:
"settings": {"contact_email": "test@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, "settings": {"contact_email": "test@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{"map_tile_url": None, "map_attribution": None}, # system settings for re-render {"map_tile_url": None, "map_attribution": None}, # system settings for re-render
] ]
mock_conn.fetch.return_value = []
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -239,116 +243,6 @@ class TestAdaptersEditSubmit:
assert "cadence_s" in context["errors"] assert "cadence_s" in context["errors"]
assert "60" in context["errors"]["cadence_s"] or "3600" in context["errors"]["cadence_s"] assert "60" in context["errors"]["cadence_s"] or "3600" in context["errors"]["cadence_s"]
@pytest.mark.asyncio
async def test_adapters_edit_firms_unknown_api_key_shows_error(self):
"""POST /adapters/firms with unknown api_key_alias shows error."""
from central.gui.routes import adapters_edit_submit
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="testop")
mock_form = MagicMock()
mock_request.state.csrf_token = "test_csrf_token"
mock_form.get.side_effect = lambda k, d="": {
"csrf_token": "test_csrf_token",
"cadence_s": "300",
"api_key_alias": "nonexistent_key",
"region_north": "49.5",
"region_south": "31.0",
"region_east": "-102.0",
"region_west": "-124.5",
}.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"]
mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form)
mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [
{ # First call: get adapter
"name": "firms",
"enabled": True,
"cadence_s": 300,
"settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"], "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}},
"paused_at": None,
"updated_at": None,
},
None, # Second call: check api_key exists - returns None
{"map_tile_url": None, "map_attribution": None}, # system settings for re-render
]
mock_conn.fetch.return_value = []
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_templates.TemplateResponse.return_value = mock_response
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await adapters_edit_submit(mock_request, "firms")
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert "api_key_alias" in context["errors"]
assert "nonexistent_key" in context["errors"]["api_key_alias"]
@pytest.mark.asyncio
async def test_adapters_edit_usgs_unknown_feed_shows_error(self):
"""POST /adapters/usgs_quake with unknown feed shows error."""
from central.gui.routes import adapters_edit_submit
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="testop")
mock_form = MagicMock()
mock_request.state.csrf_token = "test_csrf_token"
mock_form.get.side_effect = lambda k, d="": {
"csrf_token": "test_csrf_token",
"cadence_s": "120",
"feed": "invalid_feed",
"region_north": "49.0",
"region_south": "24.0",
"region_east": "-66.0",
"region_west": "-125.0",
}.get(k, d)
mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form)
mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [
{
"name": "usgs_quake",
"enabled": True,
"cadence_s": 120,
"settings": {"feed": "all_hour", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
"paused_at": None,
"updated_at": None,
},
{"map_tile_url": None, "map_attribution": None}, # system settings for re-render
]
mock_conn.fetch.return_value = []
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_templates.TemplateResponse.return_value = mock_response
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await adapters_edit_submit(mock_request, "usgs_quake")
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert "feed" in context["errors"]
class TestAdaptersAudit: class TestAdaptersAudit:
"""Test adapter audit logging.""" """Test adapter audit logging."""
@ -384,6 +278,7 @@ class TestAdaptersAudit:
"settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, "settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
} }
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()
@ -407,8 +302,6 @@ class TestAdaptersAudit:
assert captured_audit["target"] == "nws" assert captured_audit["target"] == "nws"
assert captured_audit["before"]["cadence_s"] == 60 assert captured_audit["before"]["cadence_s"] == 60
assert captured_audit["after"]["cadence_s"] == 120 assert captured_audit["after"]["cadence_s"] == 120
assert captured_audit["before"]["settings"]["contact_email"] == "old@example.com"
assert captured_audit["after"]["settings"]["contact_email"] == "new@example.com"
class TestAdaptersJsonbRegression: class TestAdaptersJsonbRegression:
@ -449,6 +342,7 @@ class TestAdaptersJsonbRegression:
"settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, # dict, as asyncpg returns "settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, # dict, as asyncpg returns
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
} }
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()
@ -468,7 +362,6 @@ class TestAdaptersJsonbRegression:
# CRITICAL: settings must be a dict, NOT a string # CRITICAL: settings must be a dict, NOT a string
# If json.dumps() was called, this would be a str like {contact_email: ...} # If json.dumps() was called, this would be a str like {contact_email: ...}
assert isinstance(settings_arg, dict), f"settings should be dict, got {type(settings_arg)}: {settings_arg}" assert isinstance(settings_arg, dict), f"settings should be dict, got {type(settings_arg)}: {settings_arg}"
assert settings_arg["contact_email"] == "test@example.com"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_audit_before_after_passed_as_dict(self): async def test_audit_before_after_passed_as_dict(self):
@ -501,6 +394,7 @@ class TestAdaptersJsonbRegression:
"settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, # dict "settings": {"contact_email": "old@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}}, # dict
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
} }
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()

View file

@ -0,0 +1,205 @@
"""Tests for form_descriptors module."""
import pytest
from pydantic import BaseModel
from typing import Optional
from central.gui.form_descriptors import describe_fields, FieldDescriptor, _type_to_widget
from central.config_models import RegionConfig
class SimpleSettings(BaseModel):
"""Simple settings model for testing."""
name: str
count: int
enabled: bool
class SettingsWithOptional(BaseModel):
"""Settings with optional fields."""
required_field: str
optional_field: Optional[str] = None
with_default: str = "default_value"
class SettingsWithList(BaseModel):
"""Settings with list field."""
tags: list[str]
class SettingsWithRegion(BaseModel):
"""Settings with region config."""
region: Optional[RegionConfig] = None
class TestTypeToWidget:
"""Tests for _type_to_widget function."""
def test_str_maps_to_text(self):
assert _type_to_widget("field", str) == "text"
def test_int_maps_to_number(self):
assert _type_to_widget("field", int) == "number"
def test_bool_maps_to_checkbox(self):
assert _type_to_widget("field", bool) == "checkbox"
def test_list_str_maps_to_csv(self):
assert _type_to_widget("field", list[str]) == "csv"
def test_region_config_maps_to_region(self):
assert _type_to_widget("field", RegionConfig) == "region"
def test_optional_region_maps_to_region(self):
assert _type_to_widget("field", Optional[RegionConfig]) == "region"
def test_optional_str_maps_to_text(self):
"""Optional[str] should map to text widget."""
assert _type_to_widget("field", Optional[str]) == "text"
def test_optional_int_maps_to_number(self):
"""Optional[int] should map to number widget."""
assert _type_to_widget("field", Optional[int]) == "number"
def test_unsupported_type_raises(self):
class CustomType:
pass
with pytest.raises(NotImplementedError):
_type_to_widget("field", CustomType)
class TestDescribeFields:
"""Tests for describe_fields function."""
def test_simple_model_fields(self):
"""describe_fields returns correct descriptors for simple model."""
fields = describe_fields(SimpleSettings, {"name": "test", "count": 5, "enabled": True})
assert len(fields) == 3
name_field = next(f for f in fields if f.name == "name")
assert name_field.label == "Name"
assert name_field.widget == "text"
assert name_field.current_value == "test"
count_field = next(f for f in fields if f.name == "count")
assert count_field.label == "Count"
assert count_field.widget == "number"
assert count_field.current_value == 5
enabled_field = next(f for f in fields if f.name == "enabled")
assert enabled_field.label == "Enabled"
assert enabled_field.widget == "checkbox"
assert enabled_field.current_value is True
def test_uses_current_values(self):
"""Current values from dict are used."""
fields = describe_fields(SimpleSettings, {"name": "current_name", "count": 42, "enabled": False})
name_field = next(f for f in fields if f.name == "name")
assert name_field.current_value == "current_name"
count_field = next(f for f in fields if f.name == "count")
assert count_field.current_value == 42
def test_missing_values_use_defaults(self):
"""Missing values fall back to model defaults."""
fields = describe_fields(SettingsWithOptional, {"required_field": "value"})
optional_field = next(f for f in fields if f.name == "optional_field")
assert optional_field.current_value is None
assert optional_field.widget == "text" # Optional[str] -> text
default_field = next(f for f in fields if f.name == "with_default")
assert default_field.current_value == "default_value"
def test_list_field_returns_csv_widget(self):
"""List[str] fields get csv widget."""
fields = describe_fields(SettingsWithList, {"tags": ["a", "b", "c"]})
tags_field = next(f for f in fields if f.name == "tags")
assert tags_field.widget == "csv"
assert tags_field.current_value == ["a", "b", "c"]
def test_region_field_returns_region_widget(self):
"""RegionConfig fields get region widget."""
fields = describe_fields(SettingsWithRegion, {
"region": {"north": 50.0, "south": 40.0, "east": -100.0, "west": -120.0}
})
region_field = next(f for f in fields if f.name == "region")
assert region_field.widget == "region"
def test_empty_current_dict(self):
"""Works with empty current values dict."""
fields = describe_fields(SettingsWithOptional, {})
required_field = next(f for f in fields if f.name == "required_field")
assert required_field.current_value is None
assert required_field.widget == "text"
def test_field_descriptor_attributes(self):
"""FieldDescriptor has all expected attributes."""
fields = describe_fields(SimpleSettings, {"name": "test", "count": 1, "enabled": True})
field = fields[0]
assert hasattr(field, "name")
assert hasattr(field, "label")
assert hasattr(field, "widget")
assert hasattr(field, "current_value")
assert hasattr(field, "default")
assert hasattr(field, "description")
assert hasattr(field, "required")
class TestRealAdapterSchemas:
"""Test with actual adapter settings schemas."""
def test_nws_settings(self):
"""NWSSettings generates correct field descriptors."""
from central.adapters.nws import NWSSettings
fields = describe_fields(NWSSettings, {"contact_email": "test@example.com"})
assert len(fields) >= 1
email_field = next(f for f in fields if f.name == "contact_email")
assert email_field.widget == "text"
assert email_field.current_value == "test@example.com"
def test_firms_settings(self):
"""FIRMSSettings generates correct field descriptors."""
from central.adapters.firms import FIRMSSettings
fields = describe_fields(FIRMSSettings, {
"api_key_alias": "firms_key",
"satellites": ["VIIRS_SNPP"]
})
key_field = next(f for f in fields if f.name == "api_key_alias")
assert key_field.widget == "text"
sat_field = next(f for f in fields if f.name == "satellites")
assert sat_field.widget == "csv"
assert sat_field.current_value == ["VIIRS_SNPP"]
def test_usgs_quake_settings(self):
"""USGSQuakeSettings generates correct field descriptors."""
from central.adapters.usgs_quake import USGSQuakeSettings
fields = describe_fields(USGSQuakeSettings, {"feed": "all_hour"})
feed_field = next(f for f in fields if f.name == "feed")
assert feed_field.widget == "text"
assert feed_field.current_value == "all_hour"
def test_all_adapters_have_region_field(self):
"""All adapter settings schemas include region field."""
from central.adapters.nws import NWSSettings
from central.adapters.firms import FIRMSSettings
from central.adapters.usgs_quake import USGSQuakeSettings
for schema in [NWSSettings, FIRMSSettings, USGSQuakeSettings]:
fields = describe_fields(schema, {})
region_field = next((f for f in fields if f.name == "region"), None)
assert region_field is not None, f"{schema.__name__} should have region field"
assert region_field.widget == "region"

View file

@ -21,6 +21,7 @@ class TestRegionPickerInTemplate:
mock_request = MagicMock() mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="testop") mock_request.state.operator = MagicMock(id=1, username="testop")
mock_request.state.csrf_token = "test_csrf"
mock_conn = AsyncMock() mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [ mock_conn.fetchrow.side_effect = [
@ -35,13 +36,13 @@ class TestRegionPickerInTemplate:
}, },
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{ # System settings row { # System settings row
"map_tile_url": "https://tile.example.com/{z}/{x}/{y}.png", "map_tile_url": "https://tile.example.com/{z}/{x}/{y}.png",
"map_attribution": "Test Attribution", "map_attribution": "Test Attribution",
}, },
] ]
mock_conn.fetch.return_value = [{"alias": "firms"}]
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -80,27 +81,26 @@ class TestRegionValidation:
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "300", "cadence_s": "300",
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": "VIIRS_SNPP_NRT",
"region_north": "45.0", "region_north": "45.0",
"region_south": "35.0", "region_south": "35.0",
"region_east": "-100.0", "region_east": "-100.0",
"region_west": "-120.0", "region_west": "-120.0",
}.get(k, d) }.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"] mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled" mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form) mock_request.form = AsyncMock(return_value=mock_form)
mock_conn = AsyncMock() mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [ mock_conn.fetchrow.return_value = {
{ # Adapter row
"name": "firms", "name": "firms",
"enabled": True, "enabled": True,
"cadence_s": 300, "cadence_s": 300,
"settings": {"api_key_alias": "firms", "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}, "settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"], "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
}, "last_error": None,
{"id": 1}, # api_key exists check }
]
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()
mock_pool = MagicMock() mock_pool = MagicMock()
@ -139,12 +139,13 @@ class TestRegionValidation:
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "300", "cadence_s": "300",
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": "VIIRS_SNPP_NRT",
"region_north": "30.0", # Less than south! "region_north": "30.0", # Less than south!
"region_south": "35.0", "region_south": "35.0",
"region_east": "-100.0", "region_east": "-100.0",
"region_west": "-120.0", "region_west": "-120.0",
}.get(k, d) }.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"] mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled" mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form) mock_request.form = AsyncMock(return_value=mock_form)
@ -154,14 +155,13 @@ class TestRegionValidation:
"name": "firms", "name": "firms",
"enabled": True, "enabled": True,
"cadence_s": 300, "cadence_s": 300,
"settings": {"api_key_alias": "firms", "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}, "settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"], "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{"id": 1}, # api_key exists
{"map_tile_url": None, "map_attribution": None}, # system settings for re-render {"map_tile_url": None, "map_attribution": None}, # system settings for re-render
] ]
mock_conn.fetch.return_value = [{"alias": "firms"}]
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -195,12 +195,13 @@ class TestRegionValidation:
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "300", "cadence_s": "300",
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": "VIIRS_SNPP_NRT",
"region_north": "45.0", "region_north": "45.0",
"region_south": "35.0", "region_south": "35.0",
"region_east": "-130.0", # Less than west! "region_east": "-130.0", # Less than west!
"region_west": "-120.0", "region_west": "-120.0",
}.get(k, d) }.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"] mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled" mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form) mock_request.form = AsyncMock(return_value=mock_form)
@ -210,14 +211,13 @@ class TestRegionValidation:
"name": "firms", "name": "firms",
"enabled": True, "enabled": True,
"cadence_s": 300, "cadence_s": 300,
"settings": {"api_key_alias": "firms", "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}, "settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"], "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{"id": 1},
{"map_tile_url": None, "map_attribution": None}, {"map_tile_url": None, "map_attribution": None},
] ]
mock_conn.fetch.return_value = [{"alias": "firms"}]
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -251,12 +251,13 @@ class TestRegionValidation:
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "300", "cadence_s": "300",
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": "VIIRS_SNPP_NRT",
"region_north": "95.0", # > 90! "region_north": "95.0", # > 90!
"region_south": "35.0", "region_south": "35.0",
"region_east": "-100.0", "region_east": "-100.0",
"region_west": "-120.0", "region_west": "-120.0",
}.get(k, d) }.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"] mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled" mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form) mock_request.form = AsyncMock(return_value=mock_form)
@ -266,14 +267,13 @@ class TestRegionValidation:
"name": "firms", "name": "firms",
"enabled": True, "enabled": True,
"cadence_s": 300, "cadence_s": 300,
"settings": {"api_key_alias": "firms", "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}}, "settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"], "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}},
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
"last_error": None,
}, },
{"id": 1},
{"map_tile_url": None, "map_attribution": None}, {"map_tile_url": None, "map_attribution": None},
] ]
mock_conn.fetch.return_value = [{"alias": "firms"}]
mock_pool = MagicMock() mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
@ -310,30 +310,30 @@ class TestRegionAuditLog:
"csrf_token": "test_csrf_token", "csrf_token": "test_csrf_token",
"cadence_s": "300", "cadence_s": "300",
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": "VIIRS_SNPP_NRT",
"region_north": "45.0", "region_north": "45.0",
"region_south": "35.0", "region_south": "35.0",
"region_east": "-100.0", "region_east": "-100.0",
"region_west": "-120.0", "region_west": "-120.0",
}.get(k, d) }.get(k, d)
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"] mock_form.getlist.return_value = []
mock_form.__contains__ = lambda self, k: k == "enabled" mock_form.__contains__ = lambda self, k: k == "enabled"
mock_request.form = AsyncMock(return_value=mock_form) mock_request.form = AsyncMock(return_value=mock_form)
mock_conn = AsyncMock() mock_conn = AsyncMock()
mock_conn.fetchrow.side_effect = [ mock_conn.fetchrow.return_value = {
{
"name": "firms", "name": "firms",
"enabled": True, "enabled": True,
"cadence_s": 300, "cadence_s": 300,
"settings": { "settings": {
"api_key_alias": "firms", "api_key_alias": "firms",
"satellites": ["VIIRS_SNPP_NRT"],
"region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5} "region": {"north": 49.5, "south": 31.0, "east": -102.0, "west": -124.5}
}, },
"paused_at": None, "paused_at": None,
"updated_at": None, "updated_at": None,
}, "last_error": None,
{"id": 1}, }
]
mock_conn.execute = AsyncMock() mock_conn.execute = AsyncMock()
mock_pool = MagicMock() mock_pool = MagicMock()