mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Merge pull request #44 from zvx-echo6/feature/3-l5-backend-settings-schema
fix(3-L.5): per-backend settings schemas (fixes build_enrichers TypeError)
This commit is contained in:
commit
3c27534e9e
11 changed files with 493 additions and 63 deletions
|
|
@ -16,6 +16,7 @@ import logging
|
|||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
|
||||
|
||||
|
|
@ -30,9 +31,24 @@ _WARMUP_LAT = 43.6150
|
|||
_WARMUP_LON = -116.2023
|
||||
|
||||
|
||||
class NaviBackendSettings(BaseModel):
    """Validated settings accepted by NaviBackend.

    Unknown keys are rejected (extra="forbid"). The defaults are intended to
    mirror the defaults of NaviBackend.__init__.
    """

    model_config = ConfigDict(extra="forbid")

    # Field order is kept as declared; it drives dump/form ordering.
    base_url: str = Field(
        DEFAULT_BASE_URL,
        description="Navi /api/reverse base URL",
    )
    timeout_s: float = Field(
        10.0,
        description="Per-request timeout in seconds",
    )
    headers: dict[str, str] | None = Field(
        None,
        description="Extra request headers (e.g. Authorization)",
    )
    warmup: bool = Field(
        True,
        description="Fire a warmup ping on construction",
    )
|
||||
|
||||
|
||||
class NaviBackend:
|
||||
"""GeocoderBackend backed by the composed Navi /api/reverse endpoint."""
|
||||
|
||||
settings_schema = NaviBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
|
|
|
|||
|
|
@ -7,11 +7,23 @@ which satisfies the GeocoderBackend contract while resolving nothing.
|
|||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
|
||||
class NoOpBackendSettings(BaseModel):
    """Empty settings model for NoOpBackend.

    The no-op backend accepts no settings at all. Because extra="forbid" is
    set, stale backend_settings left behind by another backend (e.g. a
    base_url) fail validation with a clean ValidationError instead of
    surfacing as a TypeError when the backend is constructed.
    """

    model_config = ConfigDict(extra="forbid")
|
||||
|
||||
|
||||
class NoOpBackend:
    """GeocoderBackend implementation that never resolves any field."""

    # Schema used to validate backend_settings before construction.
    settings_schema = NoOpBackendSettings

    async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
        """Return the all-null bundle; lat and lon are accepted but unused."""
        bundle = all_null_bundle()
        return bundle
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from typing import Any
|
|||
from urllib.parse import urlencode
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
|
|
@ -25,6 +26,21 @@ DEFAULT_BASE_URL = "https://nominatim.openstreetmap.org"
|
|||
DEFAULT_USER_AGENT = "central-enrichment/0.5 (https://github.com/zvx-echo6/central)"
|
||||
|
||||
|
||||
class NominatimBackendSettings(BaseModel):
    """Validated settings accepted by NominatimBackend.

    Unknown keys are rejected (extra="forbid"). The defaults are intended to
    mirror the defaults of NominatimBackend.__init__.
    """

    model_config = ConfigDict(extra="forbid")

    # Field order is kept as declared; it drives dump/form ordering.
    base_url: str = Field(
        DEFAULT_BASE_URL,
        description="Nominatim /reverse base URL",
    )
    user_agent: str = Field(
        DEFAULT_USER_AGENT,
        description="User-Agent (public OSM requires one)",
    )
    rate_limit_per_sec: float = Field(
        1.0,
        description="Outbound request cap; 0 disables (self-hosted)",
    )
    timeout_s: float = Field(
        10.0,
        description="Per-request timeout in seconds",
    )
|
||||
|
||||
|
||||
class NominatimBackend:
|
||||
"""GeocoderBackend backed by an OSM Nominatim /reverse endpoint.
|
||||
|
||||
|
|
@ -32,6 +48,8 @@ class NominatimBackend:
|
|||
set it to 0 to disable for self-hosted instances.
|
||||
"""
|
||||
|
||||
settings_schema = NominatimBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ from typing import Any
|
|||
from urllib.parse import urlencode
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from central.enrichment.geocoder import all_null_bundle
|
||||
|
||||
|
|
@ -23,9 +24,21 @@ logger = logging.getLogger(__name__)
|
|||
DEFAULT_BASE_URL = "http://localhost:2322"
|
||||
|
||||
|
||||
class PhotonBackendSettings(BaseModel):
    """Validated settings accepted by PhotonBackend.

    Unknown keys are rejected (extra="forbid"). The defaults are intended to
    mirror the defaults of PhotonBackend.__init__.
    """

    model_config = ConfigDict(extra="forbid")

    # Field order is kept as declared; it drives dump/form ordering.
    base_url: str = Field(
        DEFAULT_BASE_URL,
        description="Photon /reverse base URL",
    )
    timeout_s: float = Field(
        10.0,
        description="Per-request timeout in seconds",
    )
    headers: dict[str, str] | None = Field(
        None,
        description="Extra request headers",
    )
|
||||
|
||||
|
||||
class PhotonBackend:
|
||||
"""GeocoderBackend backed by a raw Photon /reverse endpoint."""
|
||||
|
||||
settings_schema = PhotonBackendSettings
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ land in PR K.
|
|||
import logging
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -38,6 +40,12 @@ def all_null_bundle() -> dict[str, Any]:
|
|||
class GeocoderBackend(Protocol):
|
||||
"""The pluggable reverse-geocoding layer beneath GeocoderEnricher."""
|
||||
|
||||
# Pydantic model (extra='forbid') describing this backend's accepted
|
||||
# settings. The supervisor validates config.enrichment.backend_settings
|
||||
# against it before instantiating, turning a config/settings mismatch into
|
||||
# a clean ValidationError instead of a constructor TypeError.
|
||||
settings_schema: type[BaseModel]
|
||||
|
||||
async def reverse(self, lat: float, lon: float) -> dict[str, Any]:
|
||||
"""Return canonical geocoder fields (see GEOCODER_FIELDS).
|
||||
|
||||
|
|
|
|||
|
|
@ -66,6 +66,8 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
|||
return "text", None
|
||||
if field_type is int:
|
||||
return "number", None
|
||||
if field_type is float:
|
||||
return "number", None
|
||||
if field_type is bool:
|
||||
return "checkbox", None
|
||||
if field_type is RegionConfig:
|
||||
|
|
|
|||
|
|
@ -1995,12 +1995,26 @@ async def streams_update(
|
|||
# =============================================================================
|
||||
|
||||
|
||||
def _enrichment_fields(current: dict) -> list[FieldDescriptor]:
|
||||
"""Field descriptors for the single-row EnrichmentConfig form (generic
|
||||
machinery — same describe_fields used by adapter pages)."""
|
||||
def _outer_enrichment_fields(current: dict) -> list[FieldDescriptor]:
    """Describe every EnrichmentConfig form field except backend_settings.

    backend_settings is not rendered as an outer field; it is rendered as a
    per-backend <fieldset> via _backend_fields().

    Args:
        current: Current enrichment row values, passed to describe_fields.

    Returns:
        The describe_fields() descriptors for EnrichmentConfig with the
        "backend_settings" entry filtered out.
    """
    # Imported locally, as in the original function body.
    from central.config_models import EnrichmentConfig

    # A single return: the unfiltered describe_fields() result must not be
    # returned first, or the backend_settings exclusion below never runs.
    return [
        f for f in describe_fields(EnrichmentConfig, current)
        if f.name != "backend_settings"
    ]
|
||||
|
||||
|
||||
def _backend_fields(backend_class: str | None, current_bs: dict) -> list[FieldDescriptor]:
    """Describe the form fields of the selected backend's settings_schema.

    Uses the same generic describe_fields machinery as the outer form.
    Returns an empty list when backend_class is not in the backend registry
    (including when it is None or empty).
    """
    from central.supervisor import _BACKEND_REGISTRY

    registered = _BACKEND_REGISTRY.get(backend_class or "")
    if registered is not None:
        # A falsy current_bs (None / empty) is described as an empty dict.
        return describe_fields(registered.settings_schema, current_bs or {})
    return []
|
||||
|
||||
|
||||
async def _read_enrichment_row(conn) -> dict:
|
||||
|
|
@ -2013,39 +2027,55 @@ async def _read_enrichment_row(conn) -> dict:
|
|||
return dict(row) if row is not None else {}
|
||||
|
||||
|
||||
def _enrichment_context(request, *, outer_fields, backend_fields, backend_class,
|
||||
errors=None, form_data=None, backend_form_data=None):
|
||||
return {
|
||||
"operator": request.state.operator,
|
||||
"csrf_token": request.state.csrf_token,
|
||||
"outer_fields": outer_fields,
|
||||
"backend_fields": backend_fields,
|
||||
"backend_class": backend_class,
|
||||
"errors": errors,
|
||||
"form_data": form_data,
|
||||
"backend_form_data": backend_form_data,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/enrichment", response_class=HTMLResponse)
async def enrichment_form(request: Request) -> HTMLResponse:
    """Render the enrichment config form.

    The page has two parts: the outer EnrichmentConfig fields and a
    per-backend fieldset for the currently-selected backend_class.

    Args:
        request: Incoming request; request.state supplies the operator and
            CSRF token through _enrichment_context().

    Returns:
        The rendered "enrichment.html" template response.
    """
    templates = _get_templates()
    pool = get_pool()

    async with pool.acquire() as conn:
        current = await _read_enrichment_row(conn)

    # A missing row (empty dict) or empty value falls back to the no-op
    # backend with no settings.
    backend_class = current.get("backend_class") or "NoOpBackend"
    current_bs = current.get("backend_settings") or {}

    return templates.TemplateResponse(
        request=request,
        name="enrichment.html",
        context=_enrichment_context(
            request,
            outer_fields=_outer_enrichment_fields(current),
            backend_fields=_backend_fields(backend_class, current_bs),
            backend_class=backend_class,
        ),
    )
|
||||
|
||||
|
||||
@router.post("/enrichment")
|
||||
async def enrichment_update(request: Request) -> Response:
|
||||
"""Validate + persist the enrichment config. Hot-reload picks it up via
|
||||
the config.enrichment NOTIFY trigger."""
|
||||
"""Validate + persist the enrichment config. Hot-reload picks it up via the
|
||||
config.enrichment NOTIFY trigger. backend_settings is validated against the
|
||||
SUBMITTED backend_class's settings_schema."""
|
||||
from central.config_models import EnrichmentConfig
|
||||
from central.supervisor import _BACKEND_REGISTRY
|
||||
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
operator = request.state.operator
|
||||
|
||||
form = await request.form()
|
||||
if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
|
||||
|
|
@ -2053,9 +2083,11 @@ async def enrichment_update(request: Request) -> Response:
|
|||
|
||||
errors: dict[str, str] = {}
|
||||
form_data: dict[str, Any] = {}
|
||||
backend_form_data: dict[str, Any] = {}
|
||||
parsed: dict[str, Any] = {}
|
||||
|
||||
for field in _enrichment_fields({}):
|
||||
# --- outer EnrichmentConfig fields (backend_settings excluded) ---
|
||||
for field in _outer_enrichment_fields({}):
|
||||
raw = form.get(field.name, "")
|
||||
form_data[field.name] = raw
|
||||
if field.widget == "number":
|
||||
|
|
@ -2063,25 +2095,49 @@ async def enrichment_update(request: Request) -> Response:
|
|||
parsed[field.name] = int(raw) if raw else None
|
||||
except ValueError:
|
||||
errors[field.name] = f"{field.label} must be a number"
|
||||
elif field.widget == "json":
|
||||
if not raw or not raw.strip():
|
||||
parsed[field.name] = {}
|
||||
else:
|
||||
try:
|
||||
loaded = json.loads(raw)
|
||||
if not isinstance(loaded, dict):
|
||||
errors[field.name] = f"{field.label} must be a JSON object"
|
||||
else:
|
||||
parsed[field.name] = loaded
|
||||
except json.JSONDecodeError as e:
|
||||
errors[field.name] = f"{field.label} is not valid JSON: {e}"
|
||||
else: # text
|
||||
parsed[field.name] = raw.strip() if raw else None
|
||||
|
||||
submitted_backend_class = parsed.get("backend_class")
|
||||
|
||||
# --- backend settings fieldset, validated against the SUBMITTED backend ---
|
||||
backend_settings: dict[str, Any] = {}
|
||||
backend_cls = _BACKEND_REGISTRY.get(submitted_backend_class or "")
|
||||
if backend_cls is None and submitted_backend_class:
|
||||
errors["backend_class"] = f"Unknown backend: {submitted_backend_class}"
|
||||
elif backend_cls is not None:
|
||||
for f in describe_fields(backend_cls.settings_schema, {}):
|
||||
formkey = f"bs_{f.name}"
|
||||
raw = form.get(formkey, "")
|
||||
backend_form_data[formkey] = raw
|
||||
if f.widget == "checkbox":
|
||||
backend_settings[f.name] = formkey in form
|
||||
elif f.widget == "json":
|
||||
if raw and raw.strip():
|
||||
try:
|
||||
backend_settings[f.name] = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
errors[formkey] = f"{f.label} is not valid JSON: {e}"
|
||||
# blank -> omit, schema default applies
|
||||
else: # text / number — let pydantic coerce, omit blanks for defaults
|
||||
if raw.strip() != "":
|
||||
backend_settings[f.name] = raw.strip()
|
||||
if not errors:
|
||||
try:
|
||||
backend_settings = backend_cls.settings_schema.model_validate(
|
||||
backend_settings
|
||||
).model_dump()
|
||||
except ValidationError as e:
|
||||
for err in e.errors():
|
||||
loc = err["loc"][0] if err["loc"] else "unknown"
|
||||
errors[f"bs_{loc}"] = err["msg"]
|
||||
|
||||
# --- outer EnrichmentConfig validation ---
|
||||
if not errors:
|
||||
try:
|
||||
validated = EnrichmentConfig(
|
||||
**{k: v for k, v in parsed.items() if v is not None}
|
||||
**{k: v for k, v in parsed.items() if v is not None},
|
||||
backend_settings=backend_settings,
|
||||
)
|
||||
except ValidationError as e:
|
||||
for err in e.errors():
|
||||
|
|
@ -2089,21 +2145,22 @@ async def enrichment_update(request: Request) -> Response:
|
|||
errors[str(loc)] = err["msg"]
|
||||
|
||||
if errors:
|
||||
async with pool.acquire() as conn:
|
||||
current = await _read_enrichment_row(conn)
|
||||
response = templates.TemplateResponse(
|
||||
# Re-render against the SUBMITTED backend_class so field errors attach
|
||||
# to the right schema (operator may be mid-switch with a typo).
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="enrichment.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": request.state.csrf_token,
|
||||
"fields": _enrichment_fields(current),
|
||||
"errors": errors,
|
||||
"form_data": form_data,
|
||||
},
|
||||
context=_enrichment_context(
|
||||
request,
|
||||
outer_fields=_outer_enrichment_fields({}),
|
||||
backend_fields=_backend_fields(submitted_backend_class, backend_settings),
|
||||
backend_class=submitted_backend_class,
|
||||
errors=errors,
|
||||
form_data=form_data,
|
||||
backend_form_data=backend_form_data,
|
||||
),
|
||||
status_code=200,
|
||||
)
|
||||
return response
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
|
|
|
|||
|
|
@ -7,7 +7,9 @@
|
|||
<p class="secondary">
|
||||
Central-side event enrichment. Results are attached to each event under
|
||||
<code>data._enriched.&lt;enricher&gt;</code>. Changes hot-reload into the
|
||||
supervisor; switching backend invalidates the enrichment cache.
|
||||
supervisor; switching backend invalidates the enrichment cache. Backend
|
||||
settings below are validated against the selected backend; switching
|
||||
backend then saving re-renders this form with the new backend's fields.
|
||||
</p>
|
||||
|
||||
<form method="post" action="/enrichment">
|
||||
|
|
@ -15,8 +17,7 @@
|
|||
|
||||
<fieldset>
|
||||
<legend>Configuration</legend>
|
||||
|
||||
{% for field in fields %}
|
||||
{% for field in outer_fields %}
|
||||
{% if field.widget == "text" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
|
||||
|
|
@ -26,24 +27,59 @@
|
|||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
|
||||
{% elif field.widget == "number" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="number" id="{{ field.name }}" name="{{ field.name }}"
|
||||
<input type="number" step="any" id="{{ field.name }}" name="{{ field.name }}"
|
||||
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||
{% if field.required %}required{% endif %}>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</fieldset>
|
||||
|
||||
<fieldset>
|
||||
<legend>Backend settings — {{ backend_class }}</legend>
|
||||
{% if not backend_fields %}
|
||||
<small>This backend takes no settings.</small>
|
||||
{% endif %}
|
||||
{% for field in backend_fields %}
|
||||
{% set fk = "bs_" ~ field.name %}
|
||||
{% if field.widget == "text" %}
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<input type="text" id="{{ fk }}" name="{{ fk }}"
|
||||
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "number" %}
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<input type="number" step="any" id="{{ fk }}" name="{{ fk }}"
|
||||
value="{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else field.current_value or '' }}">
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "checkbox" %}
|
||||
<label>
|
||||
<input type="checkbox" name="{{ fk }}"
|
||||
{% if backend_form_data %}{% if backend_form_data[fk] %}checked{% endif %}{% elif field.current_value %}checked{% endif %}>
|
||||
{{ field.label }}
|
||||
</label>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% elif field.widget == "json" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<textarea id="{{ field.name }}" name="{{ field.name }}" rows="6"
|
||||
placeholder="{}">{{ form_data[field.name] if form_data and field.name in form_data else (field.current_value | tojson if field.current_value else '{}') }}</textarea>
|
||||
<label for="{{ fk }}">{{ field.label }}</label>
|
||||
<textarea id="{{ fk }}" name="{{ fk }}" rows="4"
|
||||
placeholder="{}">{{ backend_form_data[fk] if backend_form_data and fk in backend_form_data else (field.current_value | tojson if field.current_value else '') }}</textarea>
|
||||
<small>JSON object{% if field.description %} — {{ field.description }}{% endif %}</small>
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% if errors and errors[fk] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[fk] }}</small>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from typing import Any
|
|||
|
||||
import nats
|
||||
from nats.js import JetStreamContext
|
||||
from pydantic import ValidationError
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapter_discovery import discover_adapters
|
||||
|
|
@ -56,7 +57,12 @@ def build_enrichers(
|
|||
startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
|
||||
"""
|
||||
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
|
||||
backend = backend_cls(**enrichment_config.backend_settings)
|
||||
# Validate backend_settings against the backend's settings_schema BEFORE
|
||||
# constructing. A mismatch (e.g. a stale base_url left in the row after
|
||||
# switching to NoOpBackend) raises a clean pydantic ValidationError here
|
||||
# instead of a TypeError inside the backend constructor.
|
||||
validated = backend_cls.settings_schema.model_validate(enrichment_config.backend_settings)
|
||||
backend = backend_cls(**validated.model_dump())
|
||||
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
|
||||
return [enricher_cls(backend, cache=cache)]
|
||||
|
||||
|
|
@ -680,17 +686,27 @@ class Supervisor:
|
|||
)
|
||||
|
||||
def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
    """Rebuild the active enricher set + cache from an EnrichmentConfig.

    Builds into locals first and commits to instance state only on success,
    so a ValidationError (bad backend_settings) leaves the previously-active
    enrichers/config/cache untouched and propagates to the caller.

    Args:
        config: The enrichment configuration to activate.

    Raises:
        ValidationError: Propagated from build_enrichers() when
            config.backend_settings do not validate.
    """
    cache = EnrichmentCache(ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s)
    enrichers = build_enrichers(config, cache)  # may raise ValidationError

    # Commit point: nothing on self is touched before this line, and the
    # cache/enrichers are built exactly once.
    self._active_enrichment_config = config
    self._enrichment_cache = cache
    self._enrichers = enrichers
|
||||
|
||||
async def _handle_enrichment_change(self) -> None:
|
||||
"""Re-read config.enrichment and rebuild enrichers. Invalidate the cache
|
||||
when the backend changed, so stale results from the previous backend
|
||||
don't survive until TTL expiry."""
|
||||
don't survive until TTL expiry.
|
||||
|
||||
Invalid backend_settings (ValidationError) leave the previous backend
|
||||
running — the supervisor stays up; the operator fixes the row and the
|
||||
next NOTIFY brings it in cleanly.
|
||||
"""
|
||||
new_config = await self._config_source.get_enrichment_config()
|
||||
old_config = self._active_enrichment_config
|
||||
backend_changed = (
|
||||
|
|
@ -698,7 +714,19 @@ class Supervisor:
|
|||
or new_config.backend_settings != old_config.backend_settings
|
||||
or new_config.enricher_class != old_config.enricher_class
|
||||
)
|
||||
self._rebuild_enrichers(new_config)
|
||||
try:
|
||||
self._rebuild_enrichers(new_config)
|
||||
except ValidationError as e:
|
||||
logger.error(
|
||||
"Enrichment config invalid; keeping previous backend",
|
||||
extra={
|
||||
"enricher_class": new_config.enricher_class,
|
||||
"backend_class": new_config.backend_class,
|
||||
"backend_settings": new_config.backend_settings,
|
||||
"errors": e.errors(),
|
||||
},
|
||||
)
|
||||
return
|
||||
if backend_changed:
|
||||
deleted = await self._enrichment_cache.invalidate()
|
||||
logger.info(
|
||||
|
|
|
|||
235
tests/test_backend_settings_schema.py
Normal file
235
tests/test_backend_settings_schema.py
Normal file
|
|
@ -0,0 +1,235 @@
|
|||
"""Tests for per-backend settings schemas (PR L.5).
|
||||
|
||||
Each GeocoderBackend declares a Pydantic settings_schema (extra='forbid').
|
||||
build_enrichers validates backend_settings against it BEFORE constructing, so
|
||||
a config/settings mismatch is a clean ValidationError, not a TypeError. The
|
||||
supervisor's hot-reload keeps the previous backend running on ValidationError;
|
||||
the GUI POST re-renders with field errors and does not write the DB row.
|
||||
|
||||
Regression guard for the 2026-05-20 incident: switching backend_class to
|
||||
NoOpBackend while backend_settings still held {"base_url": ...} crashed
|
||||
_rebuild_enrichers with `TypeError: NoOpBackend() takes no arguments`.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from central.config_models import EnrichmentConfig
|
||||
from central.enrichment.cache import EnrichmentCache
|
||||
from central.supervisor import _BACKEND_REGISTRY, build_enrichers
|
||||
|
||||
|
||||
# --- schemas exist + extra='forbid' -----------------------------------------
|
||||
|
||||
def test_every_backend_declares_a_settings_schema():
    """Every registered backend exposes a BaseModel subclass as settings_schema."""
    for backend_name, backend_cls in _BACKEND_REGISTRY.items():
        schema = getattr(backend_cls, "settings_schema", None)
        assert schema is not None, f"{backend_name} has no settings_schema"
        is_model_class = isinstance(schema, type) and issubclass(schema, BaseModel)
        assert is_model_class, backend_name
|
||||
|
||||
|
||||
def test_every_settings_schema_forbids_extra():
    """Every registered backend's settings_schema rejects unknown keys.

    All backends are checked before asserting, so a failure names every
    backend whose schema accepted the unknown key instead of stopping at the
    first one with an anonymous "DID NOT RAISE".
    """
    lenient = []
    for name, cls in _BACKEND_REGISTRY.items():
        try:
            cls.settings_schema.model_validate({"definitely_not_a_field": 1})
        except ValidationError:
            continue  # expected: the unknown key was rejected
        lenient.append(name)
    assert not lenient, f"settings schemas accepting unknown keys: {lenient}"
|
||||
|
||||
|
||||
def test_navi_schema_accepts_valid_kwargs_and_preserves_defaults():
    """NaviBackendSettings keeps its declared defaults and accepts overrides."""
    from central.enrichment.backends.navi import NaviBackendSettings

    defaults = NaviBackendSettings()
    assert defaults.base_url == "http://localhost:8440"
    assert defaults.timeout_s == 10.0  # preserved from __init__, NOT 5.0
    assert defaults.headers is None
    assert defaults.warmup is True

    overridden = NaviBackendSettings(
        base_url="http://navi:8440", timeout_s=3.0, warmup=False
    )
    # One assert per field so a failure points at the exact setting; the
    # warmup override is checked too, since it is passed above.
    assert overridden.base_url == "http://navi:8440"
    assert overridden.timeout_s == 3.0
    assert overridden.warmup is False
|
||||
|
||||
|
||||
def test_noop_schema_has_no_fields():
    """NoOpBackendSettings declares no model fields."""
    from central.enrichment.backends.no_op import NoOpBackendSettings

    declared = list(NoOpBackendSettings.model_fields)
    assert declared == []
|
||||
|
||||
|
||||
# --- build_enrichers validation ----------------------------------------------
|
||||
|
||||
def _cache(tmp_path) -> EnrichmentCache:
    """Build an EnrichmentCache stored at "c.db" under tmp_path."""
    db_path = tmp_path / "c.db"
    return EnrichmentCache(db_path)
|
||||
|
||||
|
||||
def test_build_enrichers_raises_validation_error_not_typeerror(tmp_path):
    """Regression for the 2026-05-20 bug: NoOpBackend with a stale base_url.

    build_enrichers must fail with a clean ValidationError, never with a
    TypeError raised from the backend constructor.
    """
    stale_settings = {"base_url": "http://x"}
    cfg = EnrichmentConfig(backend_class="NoOpBackend", backend_settings=stale_settings)
    cache = _cache(tmp_path)
    with pytest.raises(ValidationError):
        build_enrichers(cfg, cache)
|
||||
|
||||
|
||||
def test_build_enrichers_navi_valid_settings(tmp_path):
    """Valid NaviBackend settings yield an enricher backed by NaviBackend."""
    from central.enrichment.backends.navi import NaviBackend

    settings = {"base_url": "http://navi:8440", "warmup": False}
    cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings=settings)

    built = build_enrichers(cfg, _cache(tmp_path))

    first = built[0]
    assert isinstance(first._backend, NaviBackend)
|
||||
|
||||
|
||||
def test_build_enrichers_navi_unknown_setting_rejected(tmp_path):
    """An unknown key in NaviBackend's backend_settings is a ValidationError."""
    settings = {"base_url": "http://navi:8440", "bogus": 1}
    cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings=settings)
    cache = _cache(tmp_path)
    with pytest.raises(ValidationError):
        build_enrichers(cfg, cache)
|
||||
|
||||
|
||||
# --- supervisor hot-reload keeps previous backend on ValidationError ---------
|
||||
|
||||
def _supervisor():
    """Build a Supervisor wired to mocks.

    The config source's get_enrichment_config is an AsyncMock returning a
    default EnrichmentConfig; the config store is a plain MagicMock.
    """
    from central import supervisor as sup_mod

    source = MagicMock()
    source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig())

    init_kwargs = {
        "config_source": source,
        "config_store": MagicMock(),
        "nats_url": "nats://localhost:4222",
    }
    return sup_mod.Supervisor(**init_kwargs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_handle_enrichment_change_keeps_previous_on_validation_error():
    """Navi active, then a bad NOTIFY (NoOp + leftover base_url) arrives.

    The supervisor must keep NaviBackend running and not crash.
    """
    # Only NaviBackend is needed for the isinstance checks below; the bad
    # config names NoOpBackend by string, so that class is not imported.
    from central.enrichment.backends.navi import NaviBackend

    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
        sup = _supervisor()

        # Establish a valid NaviBackend active state.
        sup._rebuild_enrichers(EnrichmentConfig(
            backend_class="NaviBackend",
            backend_settings={"base_url": "http://navi:8440", "warmup": False},
        ))
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)
        active_before = sup._active_enrichment_config

        # Bad config arrives via NOTIFY: NoOp + stale base_url -> ValidationError.
        sup._config_source.get_enrichment_config = AsyncMock(
            return_value=EnrichmentConfig(
                backend_class="NoOpBackend",
                backend_settings={"base_url": "http://navi:8440"},
            )
        )
        await sup._handle_enrichment_change()  # must NOT raise

        # Previous backend still active; config unchanged; cache NOT invalidated.
        assert isinstance(sup._enrichers[0]._backend, NaviBackend)
        assert sup._active_enrichment_config is active_before
        cache_cls.return_value.invalidate.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_handle_enrichment_change_applies_valid_noop_with_empty_settings():
    """Switching to NoOp with empty backend_settings applies cleanly.

    This is the correct way to switch; the cache must be invalidated.
    """
    from central.enrichment.backends.no_op import NoOpBackend

    navi_config = EnrichmentConfig(
        backend_class="NaviBackend",
        backend_settings={"base_url": "http://navi:8440", "warmup": False},
    )
    noop_config = EnrichmentConfig(backend_class="NoOpBackend", backend_settings={})

    with patch("central.supervisor.EnrichmentCache") as cache_cls:
        cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=3))
        sup = _supervisor()

        # Start from an active NaviBackend, then deliver the NoOp config.
        sup._rebuild_enrichers(navi_config)
        sup._config_source.get_enrichment_config = AsyncMock(return_value=noop_config)
        await sup._handle_enrichment_change()

        active_backend = sup._enrichers[0]._backend
        assert isinstance(active_backend, NoOpBackend)
        cache_cls.return_value.invalidate.assert_awaited()
|
||||
|
||||
|
||||
# --- GUI POST validates backend_settings + does not write on error ----------
|
||||
|
||||
def _mock_pool(conn):
|
||||
pool = MagicMock()
|
||||
cm = MagicMock()
|
||||
cm.__aenter__ = AsyncMock(return_value=conn)
|
||||
cm.__aexit__ = AsyncMock(return_value=None)
|
||||
pool.acquire = MagicMock(return_value=cm)
|
||||
return pool
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_gui_post_rejects_bad_backend_settings_without_writing():
    """A POST with a non-numeric timeout_s is rejected without a DB write.

    The form is re-rendered with a field error keyed bs_timeout_s, against
    the SUBMITTED backend class, and the DB upsert is never executed.
    """
    from central.gui.routes import enrichment_update

    submitted = {
        "csrf_token": "tok",
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NaviBackend",
        "cache_ttl_s": "86400",
        "bs_base_url": "http://navi:8440",
        "bs_timeout_s": "not-a-number",
    }

    request = MagicMock()
    request.state.operator = MagicMock(username="op")
    request.state.csrf_token = "tok"
    request.form = AsyncMock(return_value=submitted)

    conn = MagicMock()
    conn.execute = AsyncMock()
    conn.fetchrow = AsyncMock(return_value={})

    templates = MagicMock()
    templates.TemplateResponse.return_value = MagicMock()

    templates_patch = patch("central.gui.routes._get_templates", return_value=templates)
    pool_patch = patch("central.gui.routes.get_pool", return_value=_mock_pool(conn))
    with templates_patch, pool_patch:
        await enrichment_update(request)

    conn.execute.assert_not_awaited()  # no DB write on validation failure
    rendered_context = templates.TemplateResponse.call_args.kwargs["context"]
    assert "bs_timeout_s" in rendered_context["errors"]
    # re-rendered against SUBMITTED backend
    assert rendered_context["backend_class"] == "NaviBackend"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_gui_post_writes_on_valid_settings():
    """A POST with valid backend settings runs the DB upsert and redirects."""
    from central.gui.routes import enrichment_update

    submitted = {
        "csrf_token": "tok",
        "enricher_class": "GeocoderEnricher",
        "backend_class": "NaviBackend",
        "cache_ttl_s": "86400",
        "bs_base_url": "http://navi:8440",
        "bs_timeout_s": "10",
    }

    request = MagicMock()
    request.state.operator = MagicMock(username="op")
    request.state.csrf_token = "tok"
    request.form = AsyncMock(return_value=submitted)

    conn = MagicMock()
    conn.execute = AsyncMock()
    templates = MagicMock()

    templates_patch = patch("central.gui.routes._get_templates", return_value=templates)
    pool_patch = patch("central.gui.routes.get_pool", return_value=_mock_pool(conn))
    with templates_patch, pool_patch:
        resp = await enrichment_update(request)

    conn.execute.assert_awaited()  # DB upsert ran
    # First positional argument of the execute call is the SQL statement.
    sql = conn.execute.call_args.args[0]
    assert "INSERT INTO config.enrichment" in sql
    assert resp.status_code == 302
|
||||
|
|
@ -203,6 +203,11 @@ async def test_enrichment_form_renders():
|
|||
await enrichment_form(request)
|
||||
|
||||
ctx = templates.TemplateResponse.call_args.kwargs["context"]
|
||||
field_widgets = {f.name: f.widget for f in ctx["fields"]}
|
||||
assert field_widgets["backend_settings"] == "json"
|
||||
# PR L.5: outer fields exclude backend_settings (now a per-backend fieldset);
|
||||
# NoOpBackend's fieldset has zero fields.
|
||||
outer = {f.name for f in ctx["outer_fields"]}
|
||||
assert "backend_settings" not in outer
|
||||
assert "backend_class" in outer
|
||||
assert ctx["backend_class"] == "NoOpBackend"
|
||||
assert ctx["backend_fields"] == []
|
||||
assert ctx["csrf_token"] == "tok"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue