diff --git a/sql/migrations/024_add_config_enrichment.sql b/sql/migrations/024_add_config_enrichment.sql
new file mode 100644
index 0000000..236c1a2
--- /dev/null
+++ b/sql/migrations/024_add_config_enrichment.sql
@@ -0,0 +1,44 @@
+-- Migration: 024_add_config_enrichment
+-- Adds config.enrichment — the single-row, operator-settable enrichment config
+-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY.
+--
+-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)).
+-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty
+-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs,
+-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page
+-- after this merges.
+--
+-- The seed mirrors central.config_models.EnrichmentConfig() defaults.
+-- Regenerate via:
+-- sudo -u central .venv/bin/python -c \
+-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())"
+--
+-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ...
+-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER).
+
+CREATE TABLE IF NOT EXISTS config.enrichment (
+ id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true),
+ enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher',
+ backend_class TEXT NOT NULL DEFAULT 'NoOpBackend',
+ backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb,
+ cache_ttl_s INTEGER NOT NULL DEFAULT 86400,
+ updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+
+-- Reuse the existing updated_at trigger function (migration 002).
+DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment;
+CREATE TRIGGER enrichment_set_updated_at
+ BEFORE UPDATE ON config.enrichment
+ FOR EACH ROW
+ EXECUTE FUNCTION config.set_updated_at();
+
+-- Reuse the existing NOTIFY function (migration 001) so the supervisor's
+-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE
+-- branch emits 'enrichment:' (empty key — single-row table has no natural key).
+DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment;
+CREATE TRIGGER enrichment_notify
+ AFTER INSERT OR UPDATE OR DELETE ON config.enrichment
+ FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
+
+-- Seed the single framework-default row (NoOp; no deployment-specific values).
+INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING;
diff --git a/src/central/config_source.py b/src/central/config_source.py
index b70ddf1..04331f3 100644
--- a/src/central/config_source.py
+++ b/src/central/config_source.py
@@ -8,7 +8,7 @@ import logging
from collections.abc import Awaitable, Callable
from typing import Protocol, runtime_checkable
-from central.config_models import AdapterConfig
+from central.config_models import AdapterConfig, EnrichmentConfig
from central.config_store import ConfigStore
logger = logging.getLogger(__name__)
@@ -26,6 +26,10 @@ class ConfigSource(Protocol):
"""Get configuration for a specific adapter."""
...
+ async def get_enrichment_config(self) -> EnrichmentConfig:
+ """Get the enrichment configuration."""
+ ...
+
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],
@@ -65,6 +69,10 @@ class DbConfigSource:
"""Get a specific adapter from database."""
return await self._store.get_adapter(name)
+ async def get_enrichment_config(self) -> EnrichmentConfig:
+ """Get the enrichment configuration from database."""
+ return await self._store.get_enrichment_config()
+
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],
diff --git a/src/central/config_store.py b/src/central/config_store.py
index 826f899..0d51658 100644
--- a/src/central/config_store.py
+++ b/src/central/config_store.py
@@ -12,7 +12,7 @@ from typing import Any
import asyncpg
-from central.config_models import AdapterConfig, StreamConfig
+from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
from central.crypto import decrypt, encrypt
logger = logging.getLogger(__name__)
@@ -129,6 +129,48 @@ class ConfigStore:
name,
)
+ # -------------------------------------------------------------------------
+ # Enrichment configuration (single-row config.enrichment)
+ # -------------------------------------------------------------------------
+
+ async def get_enrichment_config(self) -> EnrichmentConfig:
+ """Read the single config.enrichment row.
+
+ Falls back to EnrichmentConfig() framework defaults if the row is
+ somehow absent (it is migration-seeded, so this is belt-and-suspenders).
+ """
+ async with self._pool.acquire() as conn:
+ row = await conn.fetchrow(
+ """
+ SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
+ FROM config.enrichment
+ WHERE id = true
+ """
+ )
+ if row is None:
+ return EnrichmentConfig()
+ return EnrichmentConfig(**dict(row))
+
+ async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None:
+ """Write the single config.enrichment row (id = true)."""
+ async with self._pool.acquire() as conn:
+ await conn.execute(
+ """
+ INSERT INTO config.enrichment
+ (id, enricher_class, backend_class, backend_settings, cache_ttl_s)
+ VALUES (true, $1, $2, $3, $4)
+ ON CONFLICT (id) DO UPDATE SET
+ enricher_class = EXCLUDED.enricher_class,
+ backend_class = EXCLUDED.backend_class,
+ backend_settings = EXCLUDED.backend_settings,
+ cache_ttl_s = EXCLUDED.cache_ttl_s
+ """,
+ config.enricher_class,
+ config.backend_class,
+ config.backend_settings, # JSON-encoded by the codec
+ config.cache_ttl_s,
+ )
+
# -------------------------------------------------------------------------
# Stream configuration
# -------------------------------------------------------------------------
diff --git a/src/central/enrichment/backends/navi.py b/src/central/enrichment/backends/navi.py
index 5335388..be409ff 100644
--- a/src/central/enrichment/backends/navi.py
+++ b/src/central/enrichment/backends/navi.py
@@ -21,7 +21,10 @@ from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
logger = logging.getLogger(__name__)
-DEFAULT_BASE_URL = "http://192.168.1.130:8440"
+# Generic default — operators point this at their Navi instance via the
+# /enrichment config page (backend_settings.base_url). No deployment-specific
+# host belongs in source.
+DEFAULT_BASE_URL = "http://localhost:8440"
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
_WARMUP_LAT = 43.6150
_WARMUP_LON = -116.2023
diff --git a/src/central/enrichment/cache.py b/src/central/enrichment/cache.py
index 06be7af..d36691e 100644
--- a/src/central/enrichment/cache.py
+++ b/src/central/enrichment/cache.py
@@ -124,3 +124,27 @@ class EnrichmentCache:
) -> None:
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
+
+ def _invalidate_sync(self, enricher_name: str | None) -> int:
+ conn = self._connect()
+ try:
+ if enricher_name is None:
+ cur = conn.execute("DELETE FROM enrichment_cache")
+ else:
+ cur = conn.execute(
+ "DELETE FROM enrichment_cache WHERE enricher_name = ?",
+ (enricher_name,),
+ )
+ conn.commit()
+ return cur.rowcount
+ finally:
+ conn.close()
+
+ async def invalidate(self, enricher_name: str | None = None) -> int:
+ """Drop cached bundles. Scoped to one enricher when given, else all.
+
+ Called when enrichment config changes — a new backend would otherwise
+ keep returning the previous backend's cached results until TTL expiry.
+ Returns the number of rows deleted.
+ """
+ return await asyncio.to_thread(self._invalidate_sync, enricher_name)
diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py
index ef7588e..badfe60 100644
--- a/src/central/gui/form_descriptors.py
+++ b/src/central/gui/form_descriptors.py
@@ -88,6 +88,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
)
+ # dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings).
+ # The form renders the value as JSON; the POST handler parses it back.
+ if field_type is dict or origin is dict:
+ return "json", None
+
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
raise NotImplementedError(
diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py
index ef7ba49..0481d10 100644
--- a/src/central/gui/routes.py
+++ b/src/central/gui/routes.py
@@ -1990,6 +1990,142 @@ async def streams_update(
return RedirectResponse(url="/streams", status_code=302)
+# =============================================================================
+# Enrichment config route
+# =============================================================================
+
+
+def _enrichment_fields(current: dict) -> list[FieldDescriptor]:
+ """Field descriptors for the single-row EnrichmentConfig form (generic
+ machinery — same describe_fields used by adapter pages)."""
+ from central.config_models import EnrichmentConfig
+
+ return describe_fields(EnrichmentConfig, current)
+
+
+async def _read_enrichment_row(conn) -> dict:
+ row = await conn.fetchrow(
+ """
+ SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
+ FROM config.enrichment WHERE id = true
+ """
+ )
+ return dict(row) if row is not None else {}
+
+
+@router.get("/enrichment", response_class=HTMLResponse)
+async def enrichment_form(request: Request) -> HTMLResponse:
+ """Render the enrichment config form."""
+ templates = _get_templates()
+ pool = get_pool()
+ operator = request.state.operator
+
+ async with pool.acquire() as conn:
+ current = await _read_enrichment_row(conn)
+
+ response = templates.TemplateResponse(
+ request=request,
+ name="enrichment.html",
+ context={
+ "operator": operator,
+ "csrf_token": request.state.csrf_token,
+ "fields": _enrichment_fields(current),
+ "errors": None,
+ "form_data": None,
+ },
+ )
+ return response
+
+
+@router.post("/enrichment")
+async def enrichment_update(request: Request) -> Response:
+ """Validate + persist the enrichment config. Hot-reload picks it up via
+ the config.enrichment NOTIFY trigger."""
+ from central.config_models import EnrichmentConfig
+
+ templates = _get_templates()
+ pool = get_pool()
+ operator = request.state.operator
+
+ form = await request.form()
+ if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
+ raise CsrfValidationError("Invalid CSRF token")
+
+ errors: dict[str, str] = {}
+ form_data: dict[str, Any] = {}
+ parsed: dict[str, Any] = {}
+
+ for field in _enrichment_fields({}):
+ raw = form.get(field.name, "")
+ form_data[field.name] = raw
+ if field.widget == "number":
+ try:
+ parsed[field.name] = int(raw) if raw else None
+ except ValueError:
+ errors[field.name] = f"{field.label} must be a number"
+ elif field.widget == "json":
+ if not raw or not raw.strip():
+ parsed[field.name] = {}
+ else:
+ try:
+ loaded = json.loads(raw)
+ if not isinstance(loaded, dict):
+ errors[field.name] = f"{field.label} must be a JSON object"
+ else:
+ parsed[field.name] = loaded
+ except json.JSONDecodeError as e:
+ errors[field.name] = f"{field.label} is not valid JSON: {e}"
+ else: # text
+ parsed[field.name] = raw.strip() if raw else None
+
+ if not errors:
+ try:
+ validated = EnrichmentConfig(
+ **{k: v for k, v in parsed.items() if v is not None}
+ )
+ except ValidationError as e:
+ for err in e.errors():
+ loc = err["loc"][0] if err["loc"] else "unknown"
+ errors[str(loc)] = err["msg"]
+
+ if errors:
+ async with pool.acquire() as conn:
+ current = await _read_enrichment_row(conn)
+ response = templates.TemplateResponse(
+ request=request,
+ name="enrichment.html",
+ context={
+ "operator": operator,
+ "csrf_token": request.state.csrf_token,
+ "fields": _enrichment_fields(current),
+ "errors": errors,
+ "form_data": form_data,
+ },
+ status_code=200,
+ )
+ return response
+
+ async with pool.acquire() as conn:
+ await conn.execute(
+ """
+ INSERT INTO config.enrichment
+ (id, enricher_class, backend_class, backend_settings, cache_ttl_s)
+ VALUES (true, $1, $2, $3, $4)
+ ON CONFLICT (id) DO UPDATE SET
+ enricher_class = EXCLUDED.enricher_class,
+ backend_class = EXCLUDED.backend_class,
+ backend_settings = EXCLUDED.backend_settings,
+ cache_ttl_s = EXCLUDED.cache_ttl_s
+ """,
+ validated.enricher_class,
+ validated.backend_class,
+ validated.backend_settings, # encoded as jsonb by the pool codec
+ validated.cache_ttl_s,
+ )
+
+ return RedirectResponse(url="/enrichment", status_code=302)
+
+
# Alias validation regex
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')
diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html
index a7a667d..be7dbca 100644
--- a/src/central/gui/templates/base.html
+++ b/src/central/gui/templates/base.html
@@ -19,6 +19,7 @@
Adapters
Events
Streams
+ Enrichment
API Keys
{{ operator.username }}
Change Password
diff --git a/src/central/gui/templates/enrichment.html b/src/central/gui/templates/enrichment.html
new file mode 100644
index 0000000..4afb58e
--- /dev/null
+++ b/src/central/gui/templates/enrichment.html
@@ -0,0 +1,54 @@
+{% extends "base.html" %}
+
+{% block title %}Central — Enrichment{% endblock %}
+
+{% block content %}
+Enrichment
+
+ Central-side event enrichment. Results are attached to each event under
+ data._enriched.<enricher>. Changes hot-reload into the
+ supervisor; switching backend invalidates the enrichment cache.
+
+
+
+{% endblock %}
diff --git a/src/central/supervisor.py b/src/central/supervisor.py
index b3ec997..1ef7cdf 100644
--- a/src/central/supervisor.py
+++ b/src/central/supervisor.py
@@ -47,16 +47,16 @@ _BACKEND_REGISTRY: dict[str, type] = {
def build_enrichers(
enrichment_config: EnrichmentConfig,
- cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH,
+ cache: EnrichmentCache,
) -> list[Enricher]:
- """Instantiate the configured enricher(s) with their backend + cache.
+ """Instantiate the configured enricher(s) with their backend + the given cache.
- Read once at supervisor startup — enrichment config is NOT hot-reloaded
- in PR J (see EnrichmentConfig docstring).
+ The supervisor owns the cache (so it can invalidate it on a config change)
+ and passes it in. Enrichment config is read from config.enrichment at
+ startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
"""
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
backend = backend_cls(**enrichment_config.backend_settings)
- cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s)
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
return [enricher_cls(backend, cache=cache)]
@@ -165,9 +165,15 @@ class Supervisor:
self._config_store = config_store
self._nats_url = nats_url
self._cloudevents_config = cloudevents_config
- # Enrichment is read once at startup (no hot-reload in PR J).
+ # Enrichment: a valid default set is built now so the supervisor is
+ # never in a half-state; start() then overrides it from the live
+ # config.enrichment row, and _on_config_change hot-reloads it.
+ self._active_enrichment_config = enrichment_config or EnrichmentConfig()
+ self._enrichment_cache = EnrichmentCache(
+ ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s
+ )
self._enrichers: list[Enricher] = build_enrichers(
- enrichment_config or EnrichmentConfig()
+ self._active_enrichment_config, self._enrichment_cache
)
self._adapters = discover_adapters()
self._nc: nats.NATS | None = None
@@ -673,11 +679,52 @@ class Supervisor:
extra={"stream": stream_config.name, "error": str(e)},
)
+ def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
+ """Rebuild the active enricher set + cache from an EnrichmentConfig."""
+ self._active_enrichment_config = config
+ self._enrichment_cache = EnrichmentCache(
+ ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s
+ )
+ self._enrichers = build_enrichers(config, self._enrichment_cache)
+
+ async def _handle_enrichment_change(self) -> None:
+ """Re-read config.enrichment and rebuild enrichers. Invalidate the cache
+ when the backend changed, so stale results from the previous backend
+ don't survive until TTL expiry."""
+ new_config = await self._config_source.get_enrichment_config()
+ old_config = self._active_enrichment_config
+ backend_changed = (
+ new_config.backend_class != old_config.backend_class
+ or new_config.backend_settings != old_config.backend_settings
+ or new_config.enricher_class != old_config.enricher_class
+ )
+ self._rebuild_enrichers(new_config)
+ if backend_changed:
+ deleted = await self._enrichment_cache.invalidate()
+ logger.info(
+ "Enrichment backend changed; cache invalidated",
+ extra={
+ "enricher_class": new_config.enricher_class,
+ "backend_class": new_config.backend_class,
+ "rows_deleted": deleted,
+ },
+ )
+ else:
+ logger.info(
+ "Enrichment config changed (cache retained)",
+ extra={"cache_ttl_s": new_config.cache_ttl_s},
+ )
+
async def _on_config_change(self, table: str, key: str) -> None:
"""Handle a configuration change notification.
Called when NOTIFY fires for config changes.
"""
+ # Handle enrichment config changes (single-row config.enrichment)
+ if table == "enrichment":
+ await self._handle_enrichment_change()
+ return
+
# Handle stream changes
if table == "streams":
stream_name = key
@@ -769,6 +816,18 @@ class Supervisor:
# Ensure streams exist with correct configuration
await self._ensure_streams()
+ # Load the operator-set enrichment config (overrides the constructor
+ # default); hot-reloaded thereafter via _on_config_change.
+ enrichment_config = await self._config_source.get_enrichment_config()
+ self._rebuild_enrichers(enrichment_config)
+ logger.info(
+ "Enrichment configured",
+ extra={
+ "enricher_class": enrichment_config.enricher_class,
+ "backend_class": enrichment_config.backend_class,
+ },
+ )
+
# Load and start enabled adapters
enabled_adapters = await self._config_source.list_enabled_adapters()
for config in enabled_adapters:
diff --git a/tests/test_enrichment_config_plumbing.py b/tests/test_enrichment_config_plumbing.py
new file mode 100644
index 0000000..5f3c0f1
--- /dev/null
+++ b/tests/test_enrichment_config_plumbing.py
@@ -0,0 +1,208 @@
+"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5).
+
+Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload
+rebuild, cache invalidation on backend change (but not on TTL-only change),
+EnrichmentCache.invalidate, the generic json widget for backend_settings, and
+the /enrichment GUI render. No real DB / NATS — pool, config_source, and the
+EnrichmentCache class are mocked.
+"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from central.config_models import EnrichmentConfig
+from central.enrichment.cache import EnrichmentCache
+from central.enrichment.backends.navi import NaviBackend
+from central.enrichment.backends.no_op import NoOpBackend
+from central.gui.form_descriptors import describe_fields
+
+
+# --- mock pool/conn helpers -------------------------------------------------
+
+def _mock_pool(conn: MagicMock) -> MagicMock:
+ pool = MagicMock()
+ acquire_cm = MagicMock()
+ acquire_cm.__aenter__ = AsyncMock(return_value=conn)
+ acquire_cm.__aexit__ = AsyncMock(return_value=None)
+ pool.acquire = MagicMock(return_value=acquire_cm)
+ return pool
+
+
+# --- ConfigStore --------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_config_store_reads_enrichment_row():
+ from central.config_store import ConfigStore
+
+ conn = MagicMock()
+ conn.fetchrow = AsyncMock(return_value={
+ "enricher_class": "GeocoderEnricher",
+ "backend_class": "NaviBackend",
+ "backend_settings": {"base_url": "http://example.test:8440"},
+ "cache_ttl_s": 3600,
+ })
+ store = ConfigStore(_mock_pool(conn))
+ cfg = await store.get_enrichment_config()
+ assert isinstance(cfg, EnrichmentConfig)
+ assert cfg.backend_class == "NaviBackend"
+ assert cfg.backend_settings == {"base_url": "http://example.test:8440"}
+ assert cfg.cache_ttl_s == 3600
+
+
+@pytest.mark.asyncio
+async def test_config_store_falls_back_to_defaults_when_row_absent():
+ from central.config_store import ConfigStore
+
+ conn = MagicMock()
+ conn.fetchrow = AsyncMock(return_value=None)
+ store = ConfigStore(_mock_pool(conn))
+ cfg = await store.get_enrichment_config()
+ assert cfg == EnrichmentConfig() # framework defaults
+ assert cfg.backend_class == "NoOpBackend"
+
+
+@pytest.mark.asyncio
+async def test_config_store_upsert_passes_dict_settings():
+ from central.config_store import ConfigStore
+
+ conn = MagicMock()
+ conn.execute = AsyncMock()
+ store = ConfigStore(_mock_pool(conn))
+ cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"})
+ await store.upsert_enrichment_config(cfg)
+ args = conn.execute.call_args.args
+ assert "INSERT INTO config.enrichment" in args[0]
+ # backend_settings passed as a dict (pool codec encodes to jsonb), not a str.
+ assert {"base_url": "x"} in args
+
+
+# --- EnrichmentCache.invalidate ----------------------------------------------
+
+@pytest.mark.asyncio
+async def test_cache_invalidate_all(tmp_path):
+ cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
+ await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
+ await cache.set("geocoder", 3.0, 4.0, {"name": "y"})
+ deleted = await cache.invalidate()
+ assert deleted == 2
+ assert await cache.get("geocoder", 1.0, 2.0) is None
+
+
+@pytest.mark.asyncio
+async def test_cache_invalidate_scoped_to_enricher(tmp_path):
+ cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600)
+ await cache.set("geocoder", 1.0, 2.0, {"name": "x"})
+ await cache.set("other", 1.0, 2.0, {"name": "z"})
+ deleted = await cache.invalidate("geocoder")
+ assert deleted == 1
+ assert await cache.get("geocoder", 1.0, 2.0) is None
+ assert await cache.get("other", 1.0, 2.0) == {"name": "z"}
+
+
+# --- Supervisor startup read + hot-reload ------------------------------------
+
+def _supervisor_with(enrichment_cfg: EnrichmentConfig):
+ """Build a Supervisor with mocked deps and a mocked EnrichmentCache class
+ (so no real /var/lib cache file is touched)."""
+ from central import supervisor as sup_mod
+
+ config_source = MagicMock()
+ config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg)
+ config_store = MagicMock()
+ sup = sup_mod.Supervisor(
+ config_source=config_source,
+ config_store=config_store,
+ nats_url="nats://localhost:4222",
+ )
+ return sup
+
+
+@pytest.mark.asyncio
+async def test_supervisor_builds_navi_from_config():
+ """Given a config naming NaviBackend, the supervisor's enricher set wraps a
+ NaviBackend — proves the registry resolution end-to-end."""
+ with patch("central.supervisor.EnrichmentCache") as cache_cls:
+ cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0))
+ sup = _supervisor_with(
+ EnrichmentConfig(backend_class="NaviBackend",
+ backend_settings={"base_url": "http://x:8440", "warmup": False})
+ )
+ cfg = await sup._config_source.get_enrichment_config()
+ sup._rebuild_enrichers(cfg)
+ assert isinstance(sup._enrichers[0]._backend, NaviBackend)
+
+
+@pytest.mark.asyncio
+async def test_hot_reload_rebuilds_and_invalidates_on_backend_change():
+ from central import supervisor as sup_mod
+
+ with patch("central.supervisor.EnrichmentCache") as cache_cls:
+ invalidate = AsyncMock(return_value=5)
+ cache_cls.return_value = MagicMock(invalidate=invalidate)
+ # Start at NoOp.
+ sup = _supervisor_with(EnrichmentConfig())
+ sup._rebuild_enrichers(EnrichmentConfig())
+ assert isinstance(sup._enrichers[0]._backend, NoOpBackend)
+ # Config flips to Navi.
+ sup._config_source.get_enrichment_config = AsyncMock(
+ return_value=EnrichmentConfig(
+ backend_class="NaviBackend",
+ backend_settings={"base_url": "http://x:8440", "warmup": False},
+ )
+ )
+ await sup._handle_enrichment_change()
+ assert isinstance(sup._enrichers[0]._backend, NaviBackend)
+ invalidate.assert_awaited() # backend changed -> cache wiped
+
+
+@pytest.mark.asyncio
+async def test_hot_reload_does_not_invalidate_on_ttl_only_change():
+ with patch("central.supervisor.EnrichmentCache") as cache_cls:
+ invalidate = AsyncMock(return_value=0)
+ cache_cls.return_value = MagicMock(invalidate=invalidate)
+ sup = _supervisor_with(EnrichmentConfig())
+ sup._rebuild_enrichers(EnrichmentConfig())
+ # Same backend, only TTL changes.
+ sup._config_source.get_enrichment_config = AsyncMock(
+ return_value=EnrichmentConfig(cache_ttl_s=3600)
+ )
+ await sup._handle_enrichment_change()
+ invalidate.assert_not_awaited()
+
+
+# --- generic json widget + GUI render ----------------------------------------
+
+def test_describe_fields_renders_dict_as_json_widget():
+ fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})}
+ assert fields["backend_settings"] == "json"
+ assert fields["enricher_class"] == "text"
+ assert fields["cache_ttl_s"] == "number"
+
+
+@pytest.mark.asyncio
+async def test_enrichment_form_renders():
+ from central.gui.routes import enrichment_form
+
+ request = MagicMock()
+ request.state.operator = MagicMock(username="op")
+ request.state.csrf_token = "tok"
+
+ conn = MagicMock()
+ conn.fetchrow = AsyncMock(return_value={
+ "enricher_class": "GeocoderEnricher",
+ "backend_class": "NoOpBackend",
+ "backend_settings": {},
+ "cache_ttl_s": 86400,
+ })
+ templates = MagicMock()
+ templates.TemplateResponse.return_value = MagicMock()
+
+ with patch("central.gui.routes._get_templates", return_value=templates), \
+ patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)):
+ await enrichment_form(request)
+
+ ctx = templates.TemplateResponse.call_args.kwargs["context"]
+ field_widgets = {f.name: f.widget for f in ctx["fields"]}
+ assert field_widgets["backend_settings"] == "json"
+ assert ctx["csrf_token"] == "tok"
diff --git a/tests/test_firms.py b/tests/test_firms.py
index 2ab1e1d..6a974e2 100644
--- a/tests/test_firms.py
+++ b/tests/test_firms.py
@@ -456,6 +456,7 @@ class TestEnrichmentIntegration:
"""A FIRMS event run through the supervisor's enrichment stage emerges
with data._enriched.geocoder populated (all-null under NoOpBackend)."""
from central.config_models import EnrichmentConfig
+ from central.enrichment.cache import EnrichmentCache
from central.enrichment.geocoder import all_null_bundle
from central.supervisor import apply_enrichment, build_enrichers
@@ -469,9 +470,8 @@ class TestEnrichmentIntegration:
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
assert "_enriched" not in event.data
- enrichers = build_enrichers(
- EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db"
- )
+ cache = EnrichmentCache(tmp_path / "enrichment_cache.db")
+ enrichers = build_enrichers(EnrichmentConfig(), cache)
await apply_enrichment(event, adapter.enrichment_locations, enrichers)
assert "_enriched" in event.data
diff --git a/tests/test_navi_backend.py b/tests/test_navi_backend.py
index 98870ba..2adf90d 100644
--- a/tests/test_navi_backend.py
+++ b/tests/test_navi_backend.py
@@ -110,8 +110,13 @@ async def test_headers_passed_through_config():
reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint",
)
async def test_live_navi_boise():
- """Integration smoke against the real endpoint (default skipped)."""
- b = NaviBackend(warmup=False) # default base_url
+ """Integration smoke against the real endpoint (default skipped).
+
+ The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific
+ address lives in source; defaults to localhost when unset.
+ """
+ base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440")
+ b = NaviBackend(base_url=base_url, warmup=False)
result = await b.reverse(43.6150, -116.2023)
assert result["name"] == "Where you are"
assert result["city"] == "Boise"