mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Merge pull request #43 from zvx-echo6/feature/3-k5-enrichment-config-plumbing
feat(3-K.5): operator-settable EnrichmentConfig (config plumbing)
This commit is contained in:
commit
bd809846ea
13 changed files with 604 additions and 15 deletions
44
sql/migrations/024_add_config_enrichment.sql
Normal file
44
sql/migrations/024_add_config_enrichment.sql
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
-- Migration: 024_add_config_enrichment
|
||||||
|
-- Adds config.enrichment — the single-row, operator-settable enrichment config
|
||||||
|
-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY.
|
||||||
|
--
|
||||||
|
-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)).
|
||||||
|
-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty
|
||||||
|
-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs,
|
||||||
|
-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page
|
||||||
|
-- after this merges.
|
||||||
|
--
|
||||||
|
-- The seed mirrors central.config_models.EnrichmentConfig() defaults.
|
||||||
|
-- Regenerate via:
|
||||||
|
-- sudo -u central .venv/bin/python -c \
|
||||||
|
-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())"
|
||||||
|
--
|
||||||
|
-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ...
|
||||||
|
-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER).
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS config.enrichment (
|
||||||
|
id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true),
|
||||||
|
enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher',
|
||||||
|
backend_class TEXT NOT NULL DEFAULT 'NoOpBackend',
|
||||||
|
backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||||
|
cache_ttl_s INTEGER NOT NULL DEFAULT 86400,
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Reuse the existing updated_at trigger function (migration 002).
|
||||||
|
DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment;
|
||||||
|
CREATE TRIGGER enrichment_set_updated_at
|
||||||
|
BEFORE UPDATE ON config.enrichment
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE FUNCTION config.set_updated_at();
|
||||||
|
|
||||||
|
-- Reuse the existing NOTIFY function (migration 001) so the supervisor's
|
||||||
|
-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE
|
||||||
|
-- branch emits 'enrichment:' (empty key — single-row table has no natural key).
|
||||||
|
DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment;
|
||||||
|
CREATE TRIGGER enrichment_notify
|
||||||
|
AFTER INSERT OR UPDATE OR DELETE ON config.enrichment
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
|
||||||
|
|
||||||
|
-- Seed the single framework-default row (NoOp; no deployment-specific values).
|
||||||
|
INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING;
|
||||||
|
|
@ -8,7 +8,7 @@ import logging
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from typing import Protocol, runtime_checkable
|
from typing import Protocol, runtime_checkable
|
||||||
|
|
||||||
from central.config_models import AdapterConfig
|
from central.config_models import AdapterConfig, EnrichmentConfig
|
||||||
from central.config_store import ConfigStore
|
from central.config_store import ConfigStore
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -26,6 +26,10 @@ class ConfigSource(Protocol):
|
||||||
"""Get configuration for a specific adapter."""
|
"""Get configuration for a specific adapter."""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||||
|
"""Get the enrichment configuration."""
|
||||||
|
...
|
||||||
|
|
||||||
async def watch_for_changes(
|
async def watch_for_changes(
|
||||||
self,
|
self,
|
||||||
callback: Callable[[str, str], Awaitable[None] | None],
|
callback: Callable[[str, str], Awaitable[None] | None],
|
||||||
|
|
@ -65,6 +69,10 @@ class DbConfigSource:
|
||||||
"""Get a specific adapter from database."""
|
"""Get a specific adapter from database."""
|
||||||
return await self._store.get_adapter(name)
|
return await self._store.get_adapter(name)
|
||||||
|
|
||||||
|
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||||
|
"""Get the enrichment configuration from database."""
|
||||||
|
return await self._store.get_enrichment_config()
|
||||||
|
|
||||||
async def watch_for_changes(
|
async def watch_for_changes(
|
||||||
self,
|
self,
|
||||||
callback: Callable[[str, str], Awaitable[None] | None],
|
callback: Callable[[str, str], Awaitable[None] | None],
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ from typing import Any
|
||||||
|
|
||||||
import asyncpg
|
import asyncpg
|
||||||
|
|
||||||
from central.config_models import AdapterConfig, StreamConfig
|
from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
|
||||||
from central.crypto import decrypt, encrypt
|
from central.crypto import decrypt, encrypt
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -129,6 +129,48 @@ class ConfigStore:
|
||||||
name,
|
name,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
# Enrichment configuration (single-row config.enrichment)
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||||
|
"""Read the single config.enrichment row.
|
||||||
|
|
||||||
|
Falls back to EnrichmentConfig() framework defaults if the row is
|
||||||
|
somehow absent (it is migration-seeded, so this is belt-and-suspenders).
|
||||||
|
"""
|
||||||
|
async with self._pool.acquire() as conn:
|
||||||
|
row = await conn.fetchrow(
|
||||||
|
"""
|
||||||
|
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||||
|
FROM config.enrichment
|
||||||
|
WHERE id = true
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
if row is None:
|
||||||
|
return EnrichmentConfig()
|
||||||
|
return EnrichmentConfig(**dict(row))
|
||||||
|
|
||||||
|
async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None:
|
||||||
|
"""Write the single config.enrichment row (id = true)."""
|
||||||
|
async with self._pool.acquire() as conn:
|
||||||
|
await conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO config.enrichment
|
||||||
|
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||||
|
VALUES (true, $1, $2, $3, $4)
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
enricher_class = EXCLUDED.enricher_class,
|
||||||
|
backend_class = EXCLUDED.backend_class,
|
||||||
|
backend_settings = EXCLUDED.backend_settings,
|
||||||
|
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||||
|
""",
|
||||||
|
config.enricher_class,
|
||||||
|
config.backend_class,
|
||||||
|
config.backend_settings, # JSON-encoded by the codec
|
||||||
|
config.cache_ttl_s,
|
||||||
|
)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Stream configuration
|
# Stream configuration
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,10 @@ from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
DEFAULT_BASE_URL = "http://192.168.1.130:8440"
|
# Generic default — operators point this at their Navi instance via the
|
||||||
|
# /enrichment config page (backend_settings.base_url). No deployment-specific
|
||||||
|
# host belongs in source.
|
||||||
|
DEFAULT_BASE_URL = "http://localhost:8440"
|
||||||
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
|
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
|
||||||
_WARMUP_LAT = 43.6150
|
_WARMUP_LAT = 43.6150
|
||||||
_WARMUP_LON = -116.2023
|
_WARMUP_LON = -116.2023
|
||||||
|
|
|
||||||
|
|
@ -124,3 +124,27 @@ class EnrichmentCache:
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
|
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
|
||||||
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
|
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
|
||||||
|
|
||||||
|
def _invalidate_sync(self, enricher_name: str | None) -> int:
|
||||||
|
conn = self._connect()
|
||||||
|
try:
|
||||||
|
if enricher_name is None:
|
||||||
|
cur = conn.execute("DELETE FROM enrichment_cache")
|
||||||
|
else:
|
||||||
|
cur = conn.execute(
|
||||||
|
"DELETE FROM enrichment_cache WHERE enricher_name = ?",
|
||||||
|
(enricher_name,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
return cur.rowcount
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
async def invalidate(self, enricher_name: str | None = None) -> int:
|
||||||
|
"""Drop cached bundles. Scoped to one enricher when given, else all.
|
||||||
|
|
||||||
|
Called when enrichment config changes — a new backend would otherwise
|
||||||
|
keep returning the previous backend's cached results until TTL expiry.
|
||||||
|
Returns the number of rows deleted.
|
||||||
|
"""
|
||||||
|
return await asyncio.to_thread(self._invalidate_sync, enricher_name)
|
||||||
|
|
|
||||||
|
|
@ -88,6 +88,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
||||||
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
|
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings).
|
||||||
|
# The form renders the value as JSON; the POST handler parses it back.
|
||||||
|
if field_type is dict or origin is dict:
|
||||||
|
return "json", None
|
||||||
|
|
||||||
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
|
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
|
||||||
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
|
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
|
|
|
||||||
|
|
@ -1990,6 +1990,142 @@ async def streams_update(
|
||||||
return RedirectResponse(url="/streams", status_code=302)
|
return RedirectResponse(url="/streams", status_code=302)
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Enrichment config route
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def _enrichment_fields(current: dict) -> list[FieldDescriptor]:
|
||||||
|
"""Field descriptors for the single-row EnrichmentConfig form (generic
|
||||||
|
machinery — same describe_fields used by adapter pages)."""
|
||||||
|
from central.config_models import EnrichmentConfig
|
||||||
|
|
||||||
|
return describe_fields(EnrichmentConfig, current)
|
||||||
|
|
||||||
|
|
||||||
|
async def _read_enrichment_row(conn) -> dict:
|
||||||
|
row = await conn.fetchrow(
|
||||||
|
"""
|
||||||
|
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||||
|
FROM config.enrichment WHERE id = true
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
return dict(row) if row is not None else {}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/enrichment", response_class=HTMLResponse)
|
||||||
|
async def enrichment_form(request: Request) -> HTMLResponse:
|
||||||
|
"""Render the enrichment config form."""
|
||||||
|
templates = _get_templates()
|
||||||
|
pool = get_pool()
|
||||||
|
operator = request.state.operator
|
||||||
|
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
current = await _read_enrichment_row(conn)
|
||||||
|
|
||||||
|
response = templates.TemplateResponse(
|
||||||
|
request=request,
|
||||||
|
name="enrichment.html",
|
||||||
|
context={
|
||||||
|
"operator": operator,
|
||||||
|
"csrf_token": request.state.csrf_token,
|
||||||
|
"fields": _enrichment_fields(current),
|
||||||
|
"errors": None,
|
||||||
|
"form_data": None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/enrichment")
|
||||||
|
async def enrichment_update(request: Request) -> Response:
|
||||||
|
"""Validate + persist the enrichment config. Hot-reload picks it up via
|
||||||
|
the config.enrichment NOTIFY trigger."""
|
||||||
|
from central.config_models import EnrichmentConfig
|
||||||
|
|
||||||
|
templates = _get_templates()
|
||||||
|
pool = get_pool()
|
||||||
|
operator = request.state.operator
|
||||||
|
|
||||||
|
form = await request.form()
|
||||||
|
if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
|
||||||
|
raise CsrfValidationError("Invalid CSRF token")
|
||||||
|
|
||||||
|
errors: dict[str, str] = {}
|
||||||
|
form_data: dict[str, Any] = {}
|
||||||
|
parsed: dict[str, Any] = {}
|
||||||
|
|
||||||
|
for field in _enrichment_fields({}):
|
||||||
|
raw = form.get(field.name, "")
|
||||||
|
form_data[field.name] = raw
|
||||||
|
if field.widget == "number":
|
||||||
|
try:
|
||||||
|
parsed[field.name] = int(raw) if raw else None
|
||||||
|
except ValueError:
|
||||||
|
errors[field.name] = f"{field.label} must be a number"
|
||||||
|
elif field.widget == "json":
|
||||||
|
if not raw or not raw.strip():
|
||||||
|
parsed[field.name] = {}
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
loaded = json.loads(raw)
|
||||||
|
if not isinstance(loaded, dict):
|
||||||
|
errors[field.name] = f"{field.label} must be a JSON object"
|
||||||
|
else:
|
||||||
|
parsed[field.name] = loaded
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
errors[field.name] = f"{field.label} is not valid JSON: {e}"
|
||||||
|
else: # text
|
||||||
|
parsed[field.name] = raw.strip() if raw else None
|
||||||
|
|
||||||
|
if not errors:
|
||||||
|
try:
|
||||||
|
validated = EnrichmentConfig(
|
||||||
|
**{k: v for k, v in parsed.items() if v is not None}
|
||||||
|
)
|
||||||
|
except ValidationError as e:
|
||||||
|
for err in e.errors():
|
||||||
|
loc = err["loc"][0] if err["loc"] else "unknown"
|
||||||
|
errors[str(loc)] = err["msg"]
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
current = await _read_enrichment_row(conn)
|
||||||
|
response = templates.TemplateResponse(
|
||||||
|
request=request,
|
||||||
|
name="enrichment.html",
|
||||||
|
context={
|
||||||
|
"operator": operator,
|
||||||
|
"csrf_token": request.state.csrf_token,
|
||||||
|
"fields": _enrichment_fields(current),
|
||||||
|
"errors": errors,
|
||||||
|
"form_data": form_data,
|
||||||
|
},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
await conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO config.enrichment
|
||||||
|
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||||
|
VALUES (true, $1, $2, $3, $4)
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
enricher_class = EXCLUDED.enricher_class,
|
||||||
|
backend_class = EXCLUDED.backend_class,
|
||||||
|
backend_settings = EXCLUDED.backend_settings,
|
||||||
|
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||||
|
""",
|
||||||
|
validated.enricher_class,
|
||||||
|
validated.backend_class,
|
||||||
|
validated.backend_settings, # encoded as jsonb by the pool codec
|
||||||
|
validated.cache_ttl_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
return RedirectResponse(url="/enrichment", status_code=302)
|
||||||
|
|
||||||
|
|
||||||
# Alias validation regex
|
# Alias validation regex
|
||||||
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')
|
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@
|
||||||
<li><a href="/adapters">Adapters</a></li>
|
<li><a href="/adapters">Adapters</a></li>
|
||||||
<li><a href="/events">Events</a></li>
|
<li><a href="/events">Events</a></li>
|
||||||
<li><a href="/streams">Streams</a></li>
|
<li><a href="/streams">Streams</a></li>
|
||||||
|
<li><a href="/enrichment">Enrichment</a></li>
|
||||||
<li><a href="/api-keys">API Keys</a></li>
|
<li><a href="/api-keys">API Keys</a></li>
|
||||||
<li>{{ operator.username }}</li>
|
<li>{{ operator.username }}</li>
|
||||||
<li><a href="/change-password">Change Password</a></li>
|
<li><a href="/change-password">Change Password</a></li>
|
||||||
|
|
|
||||||
54
src/central/gui/templates/enrichment.html
Normal file
54
src/central/gui/templates/enrichment.html
Normal file
|
|
@ -0,0 +1,54 @@
|
||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Central — Enrichment{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<h1>Enrichment</h1>
|
||||||
|
<p class="secondary">
|
||||||
|
Central-side event enrichment. Results are attached to each event under
|
||||||
|
<code>data._enriched.<enricher></code>. Changes hot-reload into the
|
||||||
|
supervisor; switching backend invalidates the enrichment cache.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<form method="post" action="/enrichment">
|
||||||
|
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
|
||||||
|
|
||||||
|
<fieldset>
|
||||||
|
<legend>Configuration</legend>
|
||||||
|
|
||||||
|
{% for field in fields %}
|
||||||
|
{% if field.widget == "text" %}
|
||||||
|
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||||
|
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
|
||||||
|
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||||
|
{% if field.required %}required{% endif %}>
|
||||||
|
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||||
|
{% if errors and errors[field.name] %}
|
||||||
|
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% elif field.widget == "number" %}
|
||||||
|
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||||
|
<input type="number" id="{{ field.name }}" name="{{ field.name }}"
|
||||||
|
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||||
|
{% if field.required %}required{% endif %}>
|
||||||
|
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||||
|
{% if errors and errors[field.name] %}
|
||||||
|
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% elif field.widget == "json" %}
|
||||||
|
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||||
|
<textarea id="{{ field.name }}" name="{{ field.name }}" rows="6"
|
||||||
|
placeholder="{}">{{ form_data[field.name] if form_data and field.name in form_data else (field.current_value | tojson if field.current_value else '{}') }}</textarea>
|
||||||
|
<small>JSON object{% if field.description %} — {{ field.description }}{% endif %}</small>
|
||||||
|
{% if errors and errors[field.name] %}
|
||||||
|
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||||
|
{% endif %}
|
||||||
|
{% endif %}
|
||||||
|
{% endfor %}
|
||||||
|
</fieldset>
|
||||||
|
|
||||||
|
<button type="submit">Save Changes</button>
|
||||||
|
</form>
|
||||||
|
{% endblock %}
|
||||||
|
|
@ -47,16 +47,16 @@ _BACKEND_REGISTRY: dict[str, type] = {
|
||||||
|
|
||||||
def build_enrichers(
|
def build_enrichers(
|
||||||
enrichment_config: EnrichmentConfig,
|
enrichment_config: EnrichmentConfig,
|
||||||
cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH,
|
cache: EnrichmentCache,
|
||||||
) -> list[Enricher]:
|
) -> list[Enricher]:
|
||||||
"""Instantiate the configured enricher(s) with their backend + cache.
|
"""Instantiate the configured enricher(s) with their backend + the given cache.
|
||||||
|
|
||||||
Read once at supervisor startup — enrichment config is NOT hot-reloaded
|
The supervisor owns the cache (so it can invalidate it on a config change)
|
||||||
in PR J (see EnrichmentConfig docstring).
|
and passes it in. Enrichment config is read from config.enrichment at
|
||||||
|
startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
|
||||||
"""
|
"""
|
||||||
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
|
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
|
||||||
backend = backend_cls(**enrichment_config.backend_settings)
|
backend = backend_cls(**enrichment_config.backend_settings)
|
||||||
cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s)
|
|
||||||
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
|
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
|
||||||
return [enricher_cls(backend, cache=cache)]
|
return [enricher_cls(backend, cache=cache)]
|
||||||
|
|
||||||
|
|
@ -165,9 +165,15 @@ class Supervisor:
|
||||||
self._config_store = config_store
|
self._config_store = config_store
|
||||||
self._nats_url = nats_url
|
self._nats_url = nats_url
|
||||||
self._cloudevents_config = cloudevents_config
|
self._cloudevents_config = cloudevents_config
|
||||||
# Enrichment is read once at startup (no hot-reload in PR J).
|
# Enrichment: a valid default set is built now so the supervisor is
|
||||||
|
# never in a half-state; start() then overrides it from the live
|
||||||
|
# config.enrichment row, and _on_config_change hot-reloads it.
|
||||||
|
self._active_enrichment_config = enrichment_config or EnrichmentConfig()
|
||||||
|
self._enrichment_cache = EnrichmentCache(
|
||||||
|
ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s
|
||||||
|
)
|
||||||
self._enrichers: list[Enricher] = build_enrichers(
|
self._enrichers: list[Enricher] = build_enrichers(
|
||||||
enrichment_config or EnrichmentConfig()
|
self._active_enrichment_config, self._enrichment_cache
|
||||||
)
|
)
|
||||||
self._adapters = discover_adapters()
|
self._adapters = discover_adapters()
|
||||||
self._nc: nats.NATS | None = None
|
self._nc: nats.NATS | None = None
|
||||||
|
|
@ -673,11 +679,52 @@ class Supervisor:
|
||||||
extra={"stream": stream_config.name, "error": str(e)},
|
extra={"stream": stream_config.name, "error": str(e)},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
|
||||||
|
"""Rebuild the active enricher set + cache from an EnrichmentConfig."""
|
||||||
|
self._active_enrichment_config = config
|
||||||
|
self._enrichment_cache = EnrichmentCache(
|
||||||
|
ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s
|
||||||
|
)
|
||||||
|
self._enrichers = build_enrichers(config, self._enrichment_cache)
|
||||||
|
|
||||||
|
async def _handle_enrichment_change(self) -> None:
|
||||||
|
"""Re-read config.enrichment and rebuild enrichers. Invalidate the cache
|
||||||
|
when the backend changed, so stale results from the previous backend
|
||||||
|
don't survive until TTL expiry."""
|
||||||
|
new_config = await self._config_source.get_enrichment_config()
|
||||||
|
old_config = self._active_enrichment_config
|
||||||
|
backend_changed = (
|
||||||
|
new_config.backend_class != old_config.backend_class
|
||||||
|
or new_config.backend_settings != old_config.backend_settings
|
||||||
|
or new_config.enricher_class != old_config.enricher_class
|
||||||
|
)
|
||||||
|
self._rebuild_enrichers(new_config)
|
||||||
|
if backend_changed:
|
||||||
|
deleted = await self._enrichment_cache.invalidate()
|
||||||
|
logger.info(
|
||||||
|
"Enrichment backend changed; cache invalidated",
|
||||||
|
extra={
|
||||||
|
"enricher_class": new_config.enricher_class,
|
||||||
|
"backend_class": new_config.backend_class,
|
||||||
|
"rows_deleted": deleted,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"Enrichment config changed (cache retained)",
|
||||||
|
extra={"cache_ttl_s": new_config.cache_ttl_s},
|
||||||
|
)
|
||||||
|
|
||||||
async def _on_config_change(self, table: str, key: str) -> None:
|
async def _on_config_change(self, table: str, key: str) -> None:
|
||||||
"""Handle a configuration change notification.
|
"""Handle a configuration change notification.
|
||||||
|
|
||||||
Called when NOTIFY fires for config changes.
|
Called when NOTIFY fires for config changes.
|
||||||
"""
|
"""
|
||||||
|
# Handle enrichment config changes (single-row config.enrichment)
|
||||||
|
if table == "enrichment":
|
||||||
|
await self._handle_enrichment_change()
|
||||||
|
return
|
||||||
|
|
||||||
# Handle stream changes
|
# Handle stream changes
|
||||||
if table == "streams":
|
if table == "streams":
|
||||||
stream_name = key
|
stream_name = key
|
||||||
|
|
@ -769,6 +816,18 @@ class Supervisor:
|
||||||
# Ensure streams exist with correct configuration
|
# Ensure streams exist with correct configuration
|
||||||
await self._ensure_streams()
|
await self._ensure_streams()
|
||||||
|
|
||||||
|
# Load the operator-set enrichment config (overrides the constructor
|
||||||
|
# default); hot-reloaded thereafter via _on_config_change.
|
||||||
|
enrichment_config = await self._config_source.get_enrichment_config()
|
||||||
|
self._rebuild_enrichers(enrichment_config)
|
||||||
|
logger.info(
|
||||||
|
"Enrichment configured",
|
||||||
|
extra={
|
||||||
|
"enricher_class": enrichment_config.enricher_class,
|
||||||
|
"backend_class": enrichment_config.backend_class,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# Load and start enabled adapters
|
# Load and start enabled adapters
|
||||||
enabled_adapters = await self._config_source.list_enabled_adapters()
|
enabled_adapters = await self._config_source.list_enabled_adapters()
|
||||||
for config in enabled_adapters:
|
for config in enabled_adapters:
|
||||||
|
|
|
||||||
208
tests/test_enrichment_config_plumbing.py
Normal file
208
tests/test_enrichment_config_plumbing.py
Normal file
|
|
@ -0,0 +1,208 @@
|
||||||
|
"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5).
|
||||||
|
|
||||||
|
Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload
|
||||||
|
rebuild, cache invalidation on backend change (but not on TTL-only change),
|
||||||
|
EnrichmentCache.invalidate, the generic json widget for backend_settings, and
|
||||||
|
the /enrichment GUI render. No real DB / NATS — pool, config_source, and the
|
||||||
|
EnrichmentCache class are mocked.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from central.config_models import EnrichmentConfig
|
||||||
|
from central.enrichment.cache import EnrichmentCache
|
||||||
|
from central.enrichment.backends.navi import NaviBackend
|
||||||
|
from central.enrichment.backends.no_op import NoOpBackend
|
||||||
|
from central.gui.form_descriptors import describe_fields
|
||||||
|
|
||||||
|
|
||||||
|
# --- mock pool/conn helpers -------------------------------------------------
|
||||||
|
|
||||||
|
def _mock_pool(conn: MagicMock) -> MagicMock:
|
||||||
|
pool = MagicMock()
|
||||||
|
acquire_cm = MagicMock()
|
||||||
|
acquire_cm.__aenter__ = AsyncMock(return_value=conn)
|
||||||
|
acquire_cm.__aexit__ = AsyncMock(return_value=None)
|
||||||
|
pool.acquire = MagicMock(return_value=acquire_cm)
|
||||||
|
return pool
|
||||||
|
|
||||||
|
|
||||||
|
# --- ConfigStore --------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_config_store_reads_enrichment_row():
|
||||||
|
from central.config_store import ConfigStore
|
||||||
|
|
||||||
|
conn = MagicMock()
|
||||||
|
conn.fetchrow = AsyncMock(return_value={
|
||||||
|
"enricher_class": "GeocoderEnricher",
|
||||||
|
"backend_class": "NaviBackend",
|
||||||
|
"backend_settings": {"base_url": "http://example.test:8440"},
|
||||||
|
"cache_ttl_s": 3600,
|
||||||
|
})
|
||||||
|
store = ConfigStore(_mock_pool(conn))
|
||||||
|
cfg = await store.get_enrichment_config()
|
||||||
|
assert isinstance(cfg, EnrichmentConfig)
|
||||||
|
assert cfg.backend_class == "NaviBackend"
|
||||||
|
assert cfg.backend_settings == {"base_url": "http://example.test:8440"}
|
||||||
|
assert cfg.cache_ttl_s == 3600
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_config_store_falls_back_to_defaults_when_row_absent():
|
||||||
|
from central.config_store import ConfigStore
|
||||||
|
|
||||||
|
conn = MagicMock()
|
||||||
|
conn.fetchrow = AsyncMock(return_value=None)
|
||||||
|
store = ConfigStore(_mock_pool(conn))
|
||||||
|
cfg = await store.get_enrichment_config()
|
||||||
|
assert cfg == EnrichmentConfig() # framework defaults
|
||||||
|
assert cfg.backend_class == "NoOpBackend"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_config_store_upsert_passes_dict_settings():
|
||||||
|
from central.config_store import ConfigStore
|
||||||
|
|
||||||
|
conn = MagicMock()
|
||||||
|
conn.execute = AsyncMock()
|
||||||
|
store = ConfigStore(_mock_pool(conn))
|
||||||
|
cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"})
|
||||||
|
await store.upsert_enrichment_config(cfg)
|
||||||
|
args = conn.execute.call_args.args
|
||||||
|
assert "INSERT INTO config.enrichment" in args[0]
|
||||||
|
# backend_settings passed as a dict (pool codec encodes to jsonb), not a str.
|
||||||
|
assert {"base_url": "x"} in args
|
||||||
|
|
||||||
|
|
||||||
|
# --- EnrichmentCache.invalidate ----------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cache_invalidate_all(tmp_path):
|
||||||
|
cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
|
||||||
|
await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
|
||||||
|
await cache.set("geocoder", 3.0, 4.0, {"name": "y"})
|
||||||
|
deleted = await cache.invalidate()
|
||||||
|
assert deleted == 2
|
||||||
|
assert await cache.get("geocoder", 1.0, 2.0) is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cache_invalidate_scoped_to_enricher(tmp_path):
|
||||||
|
cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
|
||||||
|
await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
|
||||||
|
await cache.set("other", 1.0, 2.0, {"name": "z"})
|
||||||
|
deleted = await cache.invalidate("geocoder")
|
||||||
|
assert deleted == 1
|
||||||
|
assert await cache.get("geocoder", 1.0, 2.0) is None
|
||||||
|
assert await cache.get("other", 1.0, 2.0) == {"name": "z"}
|
||||||
|
|
||||||
|
|
||||||
|
# --- Supervisor startup read + hot-reload ------------------------------------
|
||||||
|
|
||||||
|
def _supervisor_with(enrichment_cfg: EnrichmentConfig):
|
||||||
|
"""Build a Supervisor with mocked deps and a mocked EnrichmentCache class
|
||||||
|
(so no real /var/lib cache file is touched)."""
|
||||||
|
from central import supervisor as sup_mod
|
||||||
|
|
||||||
|
config_source = MagicMock()
|
||||||
|
config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg)
|
||||||
|
config_store = MagicMock()
|
||||||
|
sup = sup_mod.Supervisor(
|
||||||
|
config_source=config_source,
|
||||||
|
config_store=config_store,
|
||||||
|
nats_url="nats://localhost:4222",
|
||||||
|
)
|
||||||
|
return sup
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_supervisor_builds_navi_from_config():
|
||||||
|
"""Given a config naming NaviBackend, the supervisor's enricher set wraps a
|
||||||
|
NaviBackend — proves the registry resolution end-to-end."""
|
||||||
|
with patch("central.supervisor.EnrichmentCache") as cache_cls:
|
||||||
|
cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
|
||||||
|
sup = _supervisor_with(
|
||||||
|
EnrichmentConfig(backend_class="NaviBackend",
|
||||||
|
backend_settings={"base_url": "http://x:8440", "warmup": False})
|
||||||
|
)
|
||||||
|
cfg = await sup._config_source.get_enrichment_config()
|
||||||
|
sup._rebuild_enrichers(cfg)
|
||||||
|
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_hot_reload_rebuilds_and_invalidates_on_backend_change():
|
||||||
|
from central import supervisor as sup_mod
|
||||||
|
|
||||||
|
with patch("central.supervisor.EnrichmentCache") as cache_cls:
|
||||||
|
invalidate = AsyncMock(return_value=5)
|
||||||
|
cache_cls.return_value = MagicMock(invalidate=invalidate)
|
||||||
|
# Start at NoOp.
|
||||||
|
sup = _supervisor_with(EnrichmentConfig())
|
||||||
|
sup._rebuild_enrichers(EnrichmentConfig())
|
||||||
|
assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
|
||||||
|
# Config flips to Navi.
|
||||||
|
sup._config_source.get_enrichment_config = AsyncMock(
|
||||||
|
return_value=EnrichmentConfig(
|
||||||
|
backend_class="NaviBackend",
|
||||||
|
backend_settings={"base_url": "http://x:8440", "warmup": False},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await sup._handle_enrichment_change()
|
||||||
|
assert isinstance(sup._enrichers[0]._backend, NaviBackend)
|
||||||
|
invalidate.assert_awaited() # backend changed -> cache wiped
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_hot_reload_does_not_invalidate_on_ttl_only_change():
|
||||||
|
with patch("central.supervisor.EnrichmentCache") as cache_cls:
|
||||||
|
invalidate = AsyncMock(return_value=0)
|
||||||
|
cache_cls.return_value = MagicMock(invalidate=invalidate)
|
||||||
|
sup = _supervisor_with(EnrichmentConfig())
|
||||||
|
sup._rebuild_enrichers(EnrichmentConfig())
|
||||||
|
# Same backend, only TTL changes.
|
||||||
|
sup._config_source.get_enrichment_config = AsyncMock(
|
||||||
|
return_value=EnrichmentConfig(cache_ttl_s=3600)
|
||||||
|
)
|
||||||
|
await sup._handle_enrichment_change()
|
||||||
|
invalidate.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
# --- generic json widget + GUI render ----------------------------------------
|
||||||
|
|
||||||
|
def test_describe_fields_renders_dict_as_json_widget():
|
||||||
|
fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})}
|
||||||
|
assert fields["backend_settings"] == "json"
|
||||||
|
assert fields["enricher_class"] == "text"
|
||||||
|
assert fields["cache_ttl_s"] == "number"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_enrichment_form_renders():
|
||||||
|
from central.gui.routes import enrichment_form
|
||||||
|
|
||||||
|
request = MagicMock()
|
||||||
|
request.state.operator = MagicMock(username="op")
|
||||||
|
request.state.csrf_token = "tok"
|
||||||
|
|
||||||
|
conn = MagicMock()
|
||||||
|
conn.fetchrow = AsyncMock(return_value={
|
||||||
|
"enricher_class": "GeocoderEnricher",
|
||||||
|
"backend_class": "NoOpBackend",
|
||||||
|
"backend_settings": {},
|
||||||
|
"cache_ttl_s": 86400,
|
||||||
|
})
|
||||||
|
templates = MagicMock()
|
||||||
|
templates.TemplateResponse.return_value = MagicMock()
|
||||||
|
|
||||||
|
with patch("central.gui.routes._get_templates", return_value=templates), \
|
||||||
|
patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
|
||||||
|
await enrichment_form(request)
|
||||||
|
|
||||||
|
ctx = templates.TemplateResponse.call_args.kwargs["context"]
|
||||||
|
field_widgets = {f.name: f.widget for f in ctx["fields"]}
|
||||||
|
assert field_widgets["backend_settings"] == "json"
|
||||||
|
assert ctx["csrf_token"] == "tok"
|
||||||
|
|
@ -456,6 +456,7 @@ class TestEnrichmentIntegration:
|
||||||
"""A FIRMS event run through the supervisor's enrichment stage emerges
|
"""A FIRMS event run through the supervisor's enrichment stage emerges
|
||||||
with data._enriched.geocoder populated (all-null under NoOpBackend)."""
|
with data._enriched.geocoder populated (all-null under NoOpBackend)."""
|
||||||
from central.config_models import EnrichmentConfig
|
from central.config_models import EnrichmentConfig
|
||||||
|
from central.enrichment.cache import EnrichmentCache
|
||||||
from central.enrichment.geocoder import all_null_bundle
|
from central.enrichment.geocoder import all_null_bundle
|
||||||
from central.supervisor import apply_enrichment, build_enrichers
|
from central.supervisor import apply_enrichment, build_enrichers
|
||||||
|
|
||||||
|
|
@ -469,9 +470,8 @@ class TestEnrichmentIntegration:
|
||||||
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
|
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
|
||||||
assert "_enriched" not in event.data
|
assert "_enriched" not in event.data
|
||||||
|
|
||||||
enrichers = build_enrichers(
|
cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
|
||||||
EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db"
|
enrichers = build_enrichers(EnrichmentConfig(), cache)
|
||||||
)
|
|
||||||
await apply_enrichment(event, adapter.enrichment_locations, enrichers)
|
await apply_enrichment(event, adapter.enrichment_locations, enrichers)
|
||||||
|
|
||||||
assert "_enriched" in event.data
|
assert "_enriched" in event.data
|
||||||
|
|
|
||||||
|
|
@ -110,8 +110,13 @@ async def test_headers_passed_through_config():
|
||||||
reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
|
reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
|
||||||
)
|
)
|
||||||
async def test_live_navi_boise():
|
async def test_live_navi_boise():
|
||||||
"""Integration smoke against the real endpoint (default skipped)."""
|
"""Integration smoke against the real endpoint (default skipped).
|
||||||
b = NaviBackend(warmup=False) # default base_url
|
|
||||||
|
The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific
|
||||||
|
address lives in source; defaults to localhost when unset.
|
||||||
|
"""
|
||||||
|
base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440")
|
||||||
|
b = NaviBackend(base_url=base_url, warmup=False)
|
||||||
result = await b.reverse(43.6150, -116.2023)
|
result = await b.reverse(43.6150, -116.2023)
|
||||||
assert result["name"] == "Where you are"
|
assert result["name"] == "Where you are"
|
||||||
assert result["city"] == "Boise"
|
assert result["city"] == "Boise"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue