From 04c1d07b3f0f827284934d63bf4beaab4a1ecac5 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Wed, 20 May 2026 18:52:22 +0000 Subject: [PATCH] feat(3-K.5): operator-settable EnrichmentConfig (config plumbing) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bridge PR for v0.5.0. PR J wired the supervisor with a hardcoded EnrichmentConfig() default; PR K added real backends to the registry but left no operator path to select one. K.5 closes that gap by mirroring the config.adapters storage + LISTEN/NOTIFY hot-reload pattern. config.enrichment (migration 024): single-row table (id BOOLEAN PK CHECK (id = true), mirroring config.system). Columns enricher_class, backend_class, backend_settings JSONB, cache_ttl_s, updated_at. Reuses the existing config.set_updated_at + config.notify_config_change triggers (the NOTIFY function's ELSE branch emits 'enrichment:' for this keyless single-row table). Seeds framework DEFAULTS ONLY — GeocoderEnricher + NoOpBackend, empty backend_settings, 24h TTL. NO URLs/IPs/auth in the seed; a fresh deploy runs NoOp out of the box. Idempotent (CREATE IF NOT EXISTS / DROP TRIGGER IF EXISTS / INSERT ON CONFLICT DO NOTHING). Supervisor: - Reads config.enrichment at startup (start() -> config_source .get_enrichment_config()), overriding the constructor default. - Hot-reloads via _on_config_change(table == "enrichment"): re-reads the row, rebuilds the enricher set, and invalidates the enrichment cache when the enricher/backend/settings changed (a new backend must not keep serving the old backend's cached bundles until TTL). TTL-only changes retain the cache. - build_enrichers now takes an explicit EnrichmentCache (the supervisor owns it so it can invalidate); cache no longer built inside build_enrichers. ConfigStore / ConfigSource: get_enrichment_config() (falls back to defaults if the row is somehow absent) + upsert_enrichment_config(). Mirrors the adapter accessors. cache.py: EnrichmentCache.invalidate(enricher_name=None) — DELETE all or enricher-scoped; returns rows deleted. GUI /enrichment: GET renders the EnrichmentConfig form via the generic describe_fields machinery (no enrichment-specific Jinja); POST validates via Pydantic, writes config.enrichment, and lets the NOTIFY trigger propagate the hot-reload. New enrichment.html + a nav link. backend_settings (a dict field) needed a generic "json" widget in describe_fields + the template — usable by any dict-typed settings field, not enrichment-specific. Necessary deviation (surfaced): PR K shipped a deployment-specific default DEFAULT_BASE_URL = "http://192.168.1.130:8440" in navi.py. Bar (b) forbids deployer IPs in src, and operator-settable base_url is exactly K.5's purpose, so the default is changed to http://localhost:8440 (matching Photon/Nominatim defaults). The live integration smoke (tests/, env-gated, skipped) now reads the endpoint from NAVI_BASE_URL — no IP anywhere in src. Tests (test_enrichment_config_plumbing.py, 10): ConfigStore read / default fallback / upsert-passes-dict; cache invalidate all + scoped; supervisor builds NaviBackend from config; hot-reload rebuilds + invalidates on backend change; no-invalidate on TTL-only change; describe_fields json widget; /enrichment GET render. test_firms updated for the build_enrichers signature change. Hot-reload mechanism mirrored: Postgres LISTEN/NOTIFY on channel 'config_changed' (payload 'table:key'), same path adapters/streams use; the supervisor's existing _on_config_change dispatch gains an "enrichment" branch. Verification: full pytest 535 passed, 1 skipped (was 525; +10). Migration applied cleanly on the live prod schema; SELECT * FROM config.enrichment returns the NoOp default row. grep subject_for_event/_ADAPTER_REGISTRY and grep 100.64.0./192.168.1. in src both empty. Does NOT activate NaviBackend (ships NoOp default; operator action) and does NOT declare enrichment_locations on other adapters (PR L scope). Co-Authored-By: Claude Opus 4.7 (1M context) --- sql/migrations/024_add_config_enrichment.sql | 44 ++++ src/central/config_source.py | 10 +- src/central/config_store.py | 44 +++- src/central/enrichment/backends/navi.py | 5 +- src/central/enrichment/cache.py | 24 +++ src/central/gui/form_descriptors.py | 5 + src/central/gui/routes.py | 136 ++++++++++++ src/central/gui/templates/base.html | 1 + src/central/gui/templates/enrichment.html | 54 +++++ src/central/supervisor.py | 73 ++++++- tests/test_enrichment_config_plumbing.py | 208 +++++++++++++++++++ tests/test_firms.py | 6 +- tests/test_navi_backend.py | 9 +- 13 files changed, 604 insertions(+), 15 deletions(-) create mode 100644 sql/migrations/024_add_config_enrichment.sql create mode 100644 src/central/gui/templates/enrichment.html create mode 100644 tests/test_enrichment_config_plumbing.py diff --git a/sql/migrations/024_add_config_enrichment.sql b/sql/migrations/024_add_config_enrichment.sql new file mode 100644 index 0000000..236c1a2 --- /dev/null +++ b/sql/migrations/024_add_config_enrichment.sql @@ -0,0 +1,44 @@ +-- Migration: 024_add_config_enrichment +-- Adds config.enrichment — the single-row, operator-settable enrichment config +-- the supervisor reads at startup and hot-reloads via LISTEN/NOTIFY. +-- +-- Single-row pattern mirrors config.system (id BOOLEAN PK CHECK (id = true)). +-- Seeds framework DEFAULTS ONLY: GeocoderEnricher + NoOpBackend, empty +-- backend_settings, 24h cache TTL. NO deployment-specific values (no URLs, +-- IPs, or auth) — operators set base_url / auth via the /enrichment GUI page +-- after this merges. +-- +-- The seed mirrors central.config_models.EnrichmentConfig() defaults. +-- Regenerate via: +-- sudo -u central .venv/bin/python -c \ +-- "from central.config_models import EnrichmentConfig; print(EnrichmentConfig().model_dump_json())" +-- +-- Idempotent per docs/migrations.md (CREATE TABLE IF NOT EXISTS, INSERT ... +-- ON CONFLICT DO NOTHING, DROP TRIGGER IF EXISTS before CREATE TRIGGER). + +CREATE TABLE IF NOT EXISTS config.enrichment ( + id BOOLEAN PRIMARY KEY DEFAULT true CHECK (id = true), + enricher_class TEXT NOT NULL DEFAULT 'GeocoderEnricher', + backend_class TEXT NOT NULL DEFAULT 'NoOpBackend', + backend_settings JSONB NOT NULL DEFAULT '{}'::jsonb, + cache_ttl_s INTEGER NOT NULL DEFAULT 86400, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Reuse the existing updated_at trigger function (migration 002). +DROP TRIGGER IF EXISTS enrichment_set_updated_at ON config.enrichment; +CREATE TRIGGER enrichment_set_updated_at + BEFORE UPDATE ON config.enrichment + FOR EACH ROW + EXECUTE FUNCTION config.set_updated_at(); + +-- Reuse the existing NOTIFY function (migration 001) so the supervisor's +-- LISTEN/NOTIFY hot-reload picks up enrichment changes. The function's ELSE +-- branch emits 'enrichment:' (empty key — single-row table has no natural key). +DROP TRIGGER IF EXISTS enrichment_notify ON config.enrichment; +CREATE TRIGGER enrichment_notify + AFTER INSERT OR UPDATE OR DELETE ON config.enrichment + FOR EACH ROW EXECUTE FUNCTION config.notify_config_change(); + +-- Seed the single framework-default row (NoOp; no deployment-specific values). +INSERT INTO config.enrichment (id) VALUES (true) ON CONFLICT DO NOTHING; diff --git a/src/central/config_source.py b/src/central/config_source.py index b70ddf1..04331f3 100644 --- a/src/central/config_source.py +++ b/src/central/config_source.py @@ -8,7 +8,7 @@ import logging from collections.abc import Awaitable, Callable from typing import Protocol, runtime_checkable -from central.config_models import AdapterConfig +from central.config_models import AdapterConfig, EnrichmentConfig from central.config_store import ConfigStore logger = logging.getLogger(__name__) @@ -26,6 +26,10 @@ class ConfigSource(Protocol): """Get configuration for a specific adapter.""" ... + async def get_enrichment_config(self) -> EnrichmentConfig: + """Get the enrichment configuration.""" + ... + async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], @@ -65,6 +69,10 @@ class DbConfigSource: """Get a specific adapter from database.""" return await self._store.get_adapter(name) + async def get_enrichment_config(self) -> EnrichmentConfig: + """Get the enrichment configuration from database.""" + return await self._store.get_enrichment_config() + async def watch_for_changes( self, callback: Callable[[str, str], Awaitable[None] | None], diff --git a/src/central/config_store.py b/src/central/config_store.py index 826f899..0d51658 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -12,7 +12,7 @@ from typing import Any import asyncpg -from central.config_models import AdapterConfig, StreamConfig +from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig from central.crypto import decrypt, encrypt logger = logging.getLogger(__name__) @@ -129,6 +129,48 @@ class ConfigStore: name, ) + # ------------------------------------------------------------------------- + # Enrichment configuration (single-row config.enrichment) + # ------------------------------------------------------------------------- + + async def get_enrichment_config(self) -> EnrichmentConfig: + """Read the single config.enrichment row. + + Falls back to EnrichmentConfig() framework defaults if the row is + somehow absent (it is migration-seeded, so this is belt-and-suspenders). + """ + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT enricher_class, backend_class, backend_settings, cache_ttl_s + FROM config.enrichment + WHERE id = true + """ + ) + if row is None: + return EnrichmentConfig() + return EnrichmentConfig(**dict(row)) + + async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None: + """Write the single config.enrichment row (id = true).""" + async with self._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO config.enrichment + (id, enricher_class, backend_class, backend_settings, cache_ttl_s) + VALUES (true, $1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + enricher_class = EXCLUDED.enricher_class, + backend_class = EXCLUDED.backend_class, + backend_settings = EXCLUDED.backend_settings, + cache_ttl_s = EXCLUDED.cache_ttl_s + """, + config.enricher_class, + config.backend_class, + config.backend_settings, # JSON-encoded by the codec + config.cache_ttl_s, + ) + # ------------------------------------------------------------------------- # Stream configuration # ------------------------------------------------------------------------- diff --git a/src/central/enrichment/backends/navi.py b/src/central/enrichment/backends/navi.py index 5335388..be409ff 100644 --- a/src/central/enrichment/backends/navi.py +++ b/src/central/enrichment/backends/navi.py @@ -21,7 +21,10 @@ from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle logger = logging.getLogger(__name__) -DEFAULT_BASE_URL = "http://192.168.1.130:8440" +# Generic default — operators point this at their Navi instance via the +# /enrichment config page (backend_settings.base_url). No deployment-specific +# host belongs in source. +DEFAULT_BASE_URL = "http://localhost:8440" # Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup. _WARMUP_LAT = 43.6150 _WARMUP_LON = -116.2023 diff --git a/src/central/enrichment/cache.py b/src/central/enrichment/cache.py index 06be7af..d36691e 100644 --- a/src/central/enrichment/cache.py +++ b/src/central/enrichment/cache.py @@ -124,3 +124,27 @@ class EnrichmentCache: ) -> None: """Cache a bundle (idempotent upsert on the rounded-coords key).""" await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload) + + def _invalidate_sync(self, enricher_name: str | None) -> int: + conn = self._connect() + try: + if enricher_name is None: + cur = conn.execute("DELETE FROM enrichment_cache") + else: + cur = conn.execute( + "DELETE FROM enrichment_cache WHERE enricher_name = ?", + (enricher_name,), + ) + conn.commit() + return cur.rowcount + finally: + conn.close() + + async def invalidate(self, enricher_name: str | None = None) -> int: + """Drop cached bundles. Scoped to one enricher when given, else all. + + Called when enrichment config changes — a new backend would otherwise + keep returning the previous backend's cached results until TTL expiry. + Returns the number of rows deleted. + """ + return await asyncio.to_thread(self._invalidate_sync, enricher_name) diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py index ef7588e..badfe60 100644 --- a/src/central/gui/form_descriptors.py +++ b/src/central/gui/form_descriptors.py @@ -88,6 +88,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" ) + # dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings). + # The form renders the value as JSON; the POST handler parses it back. + if field_type is dict or origin is dict: + return "json", None + # Check if it's a BaseModel subclass (nested model other than RegionConfig) if isinstance(field_type, type) and issubclass(field_type, BaseModel): raise NotImplementedError( diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index ef7ba49..0481d10 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1990,6 +1990,142 @@ async def streams_update( return RedirectResponse(url="/streams", status_code=302) +# ============================================================================= +# Enrichment config route +# ============================================================================= + + +def _enrichment_fields(current: dict) -> list[FieldDescriptor]: + """Field descriptors for the single-row EnrichmentConfig form (generic + machinery — same describe_fields used by adapter pages).""" + from central.config_models import EnrichmentConfig + + return describe_fields(EnrichmentConfig, current) + + +async def _read_enrichment_row(conn) -> dict: + row = await conn.fetchrow( + """ + SELECT enricher_class, backend_class, backend_settings, cache_ttl_s + FROM config.enrichment WHERE id = true + """ + ) + return dict(row) if row is not None else {} + + +@router.get("/enrichment", response_class=HTMLResponse) +async def enrichment_form(request: Request) -> HTMLResponse: + """Render the enrichment config form.""" + templates = _get_templates() + pool = get_pool() + operator = request.state.operator + + async with pool.acquire() as conn: + current = await _read_enrichment_row(conn) + + response = templates.TemplateResponse( + request=request, + name="enrichment.html", + context={ + "operator": operator, + "csrf_token": request.state.csrf_token, + "fields": _enrichment_fields(current), + "errors": None, + "form_data": None, + }, + ) + return response + + +@router.post("/enrichment") +async def enrichment_update(request: Request) -> Response: + """Validate + persist the enrichment config. Hot-reload picks it up via + the config.enrichment NOTIFY trigger.""" + from central.config_models import EnrichmentConfig + + templates = _get_templates() + pool = get_pool() + operator = request.state.operator + + form = await request.form() + if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token: + raise CsrfValidationError("Invalid CSRF token") + + errors: dict[str, str] = {} + form_data: dict[str, Any] = {} + parsed: dict[str, Any] = {} + + for field in _enrichment_fields({}): + raw = form.get(field.name, "") + form_data[field.name] = raw + if field.widget == "number": + try: + parsed[field.name] = int(raw) if raw else None + except ValueError: + errors[field.name] = f"{field.label} must be a number" + elif field.widget == "json": + if not raw or not raw.strip(): + parsed[field.name] = {} + else: + try: + loaded = json.loads(raw) + if not isinstance(loaded, dict): + errors[field.name] = f"{field.label} must be a JSON object" + else: + parsed[field.name] = loaded + except json.JSONDecodeError as e: + errors[field.name] = f"{field.label} is not valid JSON: {e}" + else: # text + parsed[field.name] = raw.strip() if raw else None + + if not errors: + try: + validated = EnrichmentConfig( + **{k: v for k, v in parsed.items() if v is not None} + ) + except ValidationError as e: + for err in e.errors(): + loc = err["loc"][0] if err["loc"] else "unknown" + errors[str(loc)] = err["msg"] + + if errors: + async with pool.acquire() as conn: + current = await _read_enrichment_row(conn) + response = templates.TemplateResponse( + request=request, + name="enrichment.html", + context={ + "operator": operator, + "csrf_token": request.state.csrf_token, + "fields": _enrichment_fields(current), + "errors": errors, + "form_data": form_data, + }, + status_code=200, + ) + return response + + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO config.enrichment + (id, enricher_class, backend_class, backend_settings, cache_ttl_s) + VALUES (true, $1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + enricher_class = EXCLUDED.enricher_class, + backend_class = EXCLUDED.backend_class, + backend_settings = EXCLUDED.backend_settings, + cache_ttl_s = EXCLUDED.cache_ttl_s + """, + validated.enricher_class, + validated.backend_class, + validated.backend_settings, # encoded as jsonb by the pool codec + validated.cache_ttl_s, + ) + + return RedirectResponse(url="/enrichment", status_code=302) + + # Alias validation regex ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$') diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index a7a667d..be7dbca 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -19,6 +19,7 @@
  • Adapters
  • Events
  • Streams
  • +
  • Enrichment
  • API Keys
  • {{ operator.username }}
  • Change Password
  • diff --git a/src/central/gui/templates/enrichment.html b/src/central/gui/templates/enrichment.html new file mode 100644 index 0000000..4afb58e --- /dev/null +++ b/src/central/gui/templates/enrichment.html @@ -0,0 +1,54 @@ +{% extends "base.html" %} + +{% block title %}Central — Enrichment{% endblock %} + +{% block content %} +

    Enrichment

    +

    + Central-side event enrichment. Results are attached to each event under + data._enriched.<enricher>. Changes hot-reload into the + supervisor; switching backend invalidates the enrichment cache. +

    + +
    + + +
    + Configuration + + {% for field in fields %} + {% if field.widget == "text" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "number" %} + + + {% if field.description %}{{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "json" %} + + + JSON object{% if field.description %} — {{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + {% endif %} + {% endfor %} +
    + + +
    +{% endblock %} diff --git a/src/central/supervisor.py b/src/central/supervisor.py index b3ec997..1ef7cdf 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -47,16 +47,16 @@ _BACKEND_REGISTRY: dict[str, type] = { def build_enrichers( enrichment_config: EnrichmentConfig, - cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH, + cache: EnrichmentCache, ) -> list[Enricher]: - """Instantiate the configured enricher(s) with their backend + cache. + """Instantiate the configured enricher(s) with their backend + the given cache. - Read once at supervisor startup — enrichment config is NOT hot-reloaded - in PR J (see EnrichmentConfig docstring). + The supervisor owns the cache (so it can invalidate it on a config change) + and passes it in. Enrichment config is read from config.enrichment at + startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change). """ backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class] backend = backend_cls(**enrichment_config.backend_settings) - cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s) enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class] return [enricher_cls(backend, cache=cache)] @@ -165,9 +165,15 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config - # Enrichment is read once at startup (no hot-reload in PR J). + # Enrichment: a valid default set is built now so the supervisor is + # never in a half-state; start() then overrides it from the live + # config.enrichment row, and _on_config_change hot-reloads it. + self._active_enrichment_config = enrichment_config or EnrichmentConfig() + self._enrichment_cache = EnrichmentCache( + ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s + ) self._enrichers: list[Enricher] = build_enrichers( - enrichment_config or EnrichmentConfig() + self._active_enrichment_config, self._enrichment_cache ) self._adapters = discover_adapters() self._nc: nats.NATS | None = None @@ -673,11 +679,52 @@ class Supervisor: extra={"stream": stream_config.name, "error": str(e)}, ) + def _rebuild_enrichers(self, config: EnrichmentConfig) -> None: + """Rebuild the active enricher set + cache from an EnrichmentConfig.""" + self._active_enrichment_config = config + self._enrichment_cache = EnrichmentCache( + ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s + ) + self._enrichers = build_enrichers(config, self._enrichment_cache) + + async def _handle_enrichment_change(self) -> None: + """Re-read config.enrichment and rebuild enrichers. Invalidate the cache + when the backend changed, so stale results from the previous backend + don't survive until TTL expiry.""" + new_config = await self._config_source.get_enrichment_config() + old_config = self._active_enrichment_config + backend_changed = ( + new_config.backend_class != old_config.backend_class + or new_config.backend_settings != old_config.backend_settings + or new_config.enricher_class != old_config.enricher_class + ) + self._rebuild_enrichers(new_config) + if backend_changed: + deleted = await self._enrichment_cache.invalidate() + logger.info( + "Enrichment backend changed; cache invalidated", + extra={ + "enricher_class": new_config.enricher_class, + "backend_class": new_config.backend_class, + "rows_deleted": deleted, + }, + ) + else: + logger.info( + "Enrichment config changed (cache retained)", + extra={"cache_ttl_s": new_config.cache_ttl_s}, + ) + async def _on_config_change(self, table: str, key: str) -> None: """Handle a configuration change notification. Called when NOTIFY fires for config changes. """ + # Handle enrichment config changes (single-row config.enrichment) + if table == "enrichment": + await self._handle_enrichment_change() + return + # Handle stream changes if table == "streams": stream_name = key @@ -769,6 +816,18 @@ class Supervisor: # Ensure streams exist with correct configuration await self._ensure_streams() + # Load the operator-set enrichment config (overrides the constructor + # default); hot-reloaded thereafter via _on_config_change. + enrichment_config = await self._config_source.get_enrichment_config() + self._rebuild_enrichers(enrichment_config) + logger.info( + "Enrichment configured", + extra={ + "enricher_class": enrichment_config.enricher_class, + "backend_class": enrichment_config.backend_class, + }, + ) + # Load and start enabled adapters enabled_adapters = await self._config_source.list_enabled_adapters() for config in enabled_adapters: diff --git a/tests/test_enrichment_config_plumbing.py b/tests/test_enrichment_config_plumbing.py new file mode 100644 index 0000000..5f3c0f1 --- /dev/null +++ b/tests/test_enrichment_config_plumbing.py @@ -0,0 +1,208 @@ +"""Tests for operator-settable EnrichmentConfig plumbing (PR K.5). + +Covers: ConfigStore DB read/upsert, supervisor startup read + hot-reload +rebuild, cache invalidation on backend change (but not on TTL-only change), +EnrichmentCache.invalidate, the generic json widget for backend_settings, and +the /enrichment GUI render. No real DB / NATS — pool, config_source, and the +EnrichmentCache class are mocked. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.config_models import EnrichmentConfig +from central.enrichment.cache import EnrichmentCache +from central.enrichment.backends.navi import NaviBackend +from central.enrichment.backends.no_op import NoOpBackend +from central.gui.form_descriptors import describe_fields + + +# --- mock pool/conn helpers ------------------------------------------------- + +def _mock_pool(conn: MagicMock) -> MagicMock: + pool = MagicMock() + acquire_cm = MagicMock() + acquire_cm.__aenter__ = AsyncMock(return_value=conn) + acquire_cm.__aexit__ = AsyncMock(return_value=None) + pool.acquire = MagicMock(return_value=acquire_cm) + return pool + + +# --- ConfigStore -------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_config_store_reads_enrichment_row(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "enricher_class": "GeocoderEnricher", + "backend_class": "NaviBackend", + "backend_settings": {"base_url": "http://example.test:8440"}, + "cache_ttl_s": 3600, + }) + store = ConfigStore(_mock_pool(conn)) + cfg = await store.get_enrichment_config() + assert isinstance(cfg, EnrichmentConfig) + assert cfg.backend_class == "NaviBackend" + assert cfg.backend_settings == {"base_url": "http://example.test:8440"} + assert cfg.cache_ttl_s == 3600 + + +@pytest.mark.asyncio +async def test_config_store_falls_back_to_defaults_when_row_absent(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value=None) + store = ConfigStore(_mock_pool(conn)) + cfg = await store.get_enrichment_config() + assert cfg == EnrichmentConfig() # framework defaults + assert cfg.backend_class == "NoOpBackend" + + +@pytest.mark.asyncio +async def test_config_store_upsert_passes_dict_settings(): + from central.config_store import ConfigStore + + conn = MagicMock() + conn.execute = AsyncMock() + store = ConfigStore(_mock_pool(conn)) + cfg = EnrichmentConfig(backend_class="NaviBackend", backend_settings={"base_url": "x"}) + await store.upsert_enrichment_config(cfg) + args = conn.execute.call_args.args + assert "INSERT INTO config.enrichment" in args[0] + # backend_settings passed as a dict (pool codec encodes to jsonb), not a str. + assert {"base_url": "x"} in args + + +# --- EnrichmentCache.invalidate ---------------------------------------------- + +@pytest.mark.asyncio +async def test_cache_invalidate_all(tmp_path): + cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) + await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) + await cache.set("geocoder", 3.0, 4.0, {"name": "y"}) + deleted = await cache.invalidate() + assert deleted == 2 + assert await cache.get("geocoder", 1.0, 2.0) is None + + +@pytest.mark.asyncio +async def test_cache_invalidate_scoped_to_enricher(tmp_path): + cache = EnrichmentCache(tmp_path / "c.db", ttl_s=3600) + await cache.set("geocoder", 1.0, 2.0, {"name": "x"}) + await cache.set("other", 1.0, 2.0, {"name": "z"}) + deleted = await cache.invalidate("geocoder") + assert deleted == 1 + assert await cache.get("geocoder", 1.0, 2.0) is None + assert await cache.get("other", 1.0, 2.0) == {"name": "z"} + + +# --- Supervisor startup read + hot-reload ------------------------------------ + +def _supervisor_with(enrichment_cfg: EnrichmentConfig): + """Build a Supervisor with mocked deps and a mocked EnrichmentCache class + (so no real /var/lib cache file is touched).""" + from central import supervisor as sup_mod + + config_source = MagicMock() + config_source.get_enrichment_config = AsyncMock(return_value=enrichment_cfg) + config_store = MagicMock() + sup = sup_mod.Supervisor( + config_source=config_source, + config_store=config_store, + nats_url="nats://localhost:4222", + ) + return sup + + +@pytest.mark.asyncio +async def test_supervisor_builds_navi_from_config(): + """Given a config naming NaviBackend, the supervisor's enricher set wraps a + NaviBackend — proves the registry resolution end-to-end.""" + with patch("central.supervisor.EnrichmentCache") as cache_cls: + cache_cls.return_value = MagicMock(invalidate=AsyncMock(return_value=0)) + sup = _supervisor_with( + EnrichmentConfig(backend_class="NaviBackend", + backend_settings={"base_url": "http://x:8440", "warmup": False}) + ) + cfg = await sup._config_source.get_enrichment_config() + sup._rebuild_enrichers(cfg) + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + + +@pytest.mark.asyncio +async def test_hot_reload_rebuilds_and_invalidates_on_backend_change(): + from central import supervisor as sup_mod + + with patch("central.supervisor.EnrichmentCache") as cache_cls: + invalidate = AsyncMock(return_value=5) + cache_cls.return_value = MagicMock(invalidate=invalidate) + # Start at NoOp. + sup = _supervisor_with(EnrichmentConfig()) + sup._rebuild_enrichers(EnrichmentConfig()) + assert isinstance(sup._enrichers[0]._backend, NoOpBackend) + # Config flips to Navi. + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig( + backend_class="NaviBackend", + backend_settings={"base_url": "http://x:8440", "warmup": False}, + ) + ) + await sup._handle_enrichment_change() + assert isinstance(sup._enrichers[0]._backend, NaviBackend) + invalidate.assert_awaited() # backend changed -> cache wiped + + +@pytest.mark.asyncio +async def test_hot_reload_does_not_invalidate_on_ttl_only_change(): + with patch("central.supervisor.EnrichmentCache") as cache_cls: + invalidate = AsyncMock(return_value=0) + cache_cls.return_value = MagicMock(invalidate=invalidate) + sup = _supervisor_with(EnrichmentConfig()) + sup._rebuild_enrichers(EnrichmentConfig()) + # Same backend, only TTL changes. + sup._config_source.get_enrichment_config = AsyncMock( + return_value=EnrichmentConfig(cache_ttl_s=3600) + ) + await sup._handle_enrichment_change() + invalidate.assert_not_awaited() + + +# --- generic json widget + GUI render ---------------------------------------- + +def test_describe_fields_renders_dict_as_json_widget(): + fields = {f.name: f.widget for f in describe_fields(EnrichmentConfig, {})} + assert fields["backend_settings"] == "json" + assert fields["enricher_class"] == "text" + assert fields["cache_ttl_s"] == "number" + + +@pytest.mark.asyncio +async def test_enrichment_form_renders(): + from central.gui.routes import enrichment_form + + request = MagicMock() + request.state.operator = MagicMock(username="op") + request.state.csrf_token = "tok" + + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "enricher_class": "GeocoderEnricher", + "backend_class": "NoOpBackend", + "backend_settings": {}, + "cache_ttl_s": 86400, + }) + templates = MagicMock() + templates.TemplateResponse.return_value = MagicMock() + + with patch("central.gui.routes._get_templates", return_value=templates), \ + patch("central.gui.routes.get_pool", return_value=_mock_pool(conn)): + await enrichment_form(request) + + ctx = templates.TemplateResponse.call_args.kwargs["context"] + field_widgets = {f.name: f.widget for f in ctx["fields"]} + assert field_widgets["backend_settings"] == "json" + assert ctx["csrf_token"] == "tok" diff --git a/tests/test_firms.py b/tests/test_firms.py index 2ab1e1d..6a974e2 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -456,6 +456,7 @@ class TestEnrichmentIntegration: """A FIRMS event run through the supervisor's enrichment stage emerges with data._enriched.geocoder populated (all-null under NoOpBackend).""" from central.config_models import EnrichmentConfig + from central.enrichment.cache import EnrichmentCache from central.enrichment.geocoder import all_null_bundle from central.supervisor import apply_enrichment, build_enrichers @@ -469,9 +470,8 @@ class TestEnrichmentIntegration: event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") assert "_enriched" not in event.data - enrichers = build_enrichers( - EnrichmentConfig(), cache_db_path=tmp_path / "enrichment_cache.db" - ) + cache = EnrichmentCache(tmp_path / "enrichment_cache.db") + enrichers = build_enrichers(EnrichmentConfig(), cache) await apply_enrichment(event, adapter.enrichment_locations, enrichers) assert "_enriched" in event.data diff --git a/tests/test_navi_backend.py b/tests/test_navi_backend.py index 98870ba..2adf90d 100644 --- a/tests/test_navi_backend.py +++ b/tests/test_navi_backend.py @@ -110,8 +110,13 @@ async def test_headers_passed_through_config(): reason="set NAVI_INTEGRATION_TEST=1 to hit the live Navi endpoint", ) async def test_live_navi_boise(): - """Integration smoke against the real endpoint (default skipped).""" - b = NaviBackend(warmup=False) # default base_url + """Integration smoke against the real endpoint (default skipped). + + The endpoint host is supplied via NAVI_BASE_URL so no deployment-specific + address lives in source; defaults to localhost when unset. + """ + base_url = os.environ.get("NAVI_BASE_URL", "http://localhost:8440") + b = NaviBackend(base_url=base_url, warmup=False) result = await b.reverse(43.6150, -116.2023) assert result["name"] == "Where you are" assert result["city"] == "Boise"