mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
feat(3-K.5): operator-settable EnrichmentConfig (config plumbing)
Bridge PR for v0.5.0. PR J wired the supervisor with a hardcoded EnrichmentConfig() default; PR K added real backends to the registry but left no operator path to select one. K.5 closes that gap by mirroring the config.adapters storage + LISTEN/NOTIFY hot-reload pattern. config.enrichment (migration 024): single-row table (id BOOLEAN PK CHECK (id = true), mirroring config.system). Columns enricher_class, backend_class, backend_settings JSONB, cache_ttl_s, updated_at. Reuses the existing config.set_updated_at + config.notify_config_change triggers (the NOTIFY function's ELSE branch emits 'enrichment:' for this keyless single-row table). Seeds framework DEFAULTS ONLY — GeocoderEnricher + NoOpBackend, empty backend_settings, 24h TTL. NO URLs/IPs/auth in the seed; a fresh deploy runs NoOp out of the box. Idempotent (CREATE IF NOT EXISTS / DROP TRIGGER IF EXISTS / INSERT ON CONFLICT DO NOTHING). Supervisor: - Reads config.enrichment at startup (start() -> config_source .get_enrichment_config()), overriding the constructor default. - Hot-reloads via _on_config_change(table == "enrichment"): re-reads the row, rebuilds the enricher set, and invalidates the enrichment cache when the enricher/backend/settings changed (a new backend must not keep serving the old backend's cached bundles until TTL). TTL-only changes retain the cache. - build_enrichers now takes an explicit EnrichmentCache (the supervisor owns it so it can invalidate); cache no longer built inside build_enrichers. ConfigStore / ConfigSource: get_enrichment_config() (falls back to defaults if the row is somehow absent) + upsert_enrichment_config(). Mirrors the adapter accessors. cache.py: EnrichmentCache.invalidate(enricher_name=None) — DELETE all or enricher-scoped; returns rows deleted. GUI /enrichment: GET renders the EnrichmentConfig form via the generic describe_fields machinery (no enrichment-specific Jinja); POST validates via Pydantic, writes config.enrichment, and lets the NOTIFY trigger propagate the hot-reload. New enrichment.html + a nav link. backend_settings (a dict field) needed a generic "json" widget in describe_fields + the template — usable by any dict-typed settings field, not enrichment-specific. Necessary deviation (surfaced): PR K shipped a deployment-specific default DEFAULT_BASE_URL = "http://192.168.1.130:8440" in navi.py. Bar (b) forbids deployer IPs in src, and operator-settable base_url is exactly K.5's purpose, so the default is changed to http://localhost:8440 (matching Photon/Nominatim defaults). The live integration smoke (tests/, env-gated, skipped) now reads the endpoint from NAVI_BASE_URL — no IP anywhere in src. Tests (test_enrichment_config_plumbing.py, 10): ConfigStore read / default fallback / upsert-passes-dict; cache invalidate all + scoped; supervisor builds NaviBackend from config; hot-reload rebuilds + invalidates on backend change; no-invalidate on TTL-only change; describe_fields json widget; /enrichment GET render. test_firms updated for the build_enrichers signature change. Hot-reload mechanism mirrored: Postgres LISTEN/NOTIFY on channel 'config_changed' (payload 'table:key'), same path adapters/streams use; the supervisor's existing _on_config_change dispatch gains an "enrichment" branch. Verification: full pytest 535 passed, 1 skipped (was 525; +10). Migration applied cleanly on the live prod schema; SELECT * FROM config.enrichment returns the NoOp default row. grep subject_for_event/_ADAPTER_REGISTRY and grep 100.64.0./192.168.1. in src both empty. Does NOT activate NaviBackend (ships NoOp default; operator action) and does NOT declare enrichment_locations on other adapters (PR L scope). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
54238093a5
commit
04c1d07b3f
13 changed files with 604 additions and 15 deletions
|
|
@ -8,7 +8,7 @@ import logging
|
|||
from collections.abc import Awaitable, Callable
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from central.config_models import AdapterConfig
|
||||
from central.config_models import AdapterConfig, EnrichmentConfig
|
||||
from central.config_store import ConfigStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -26,6 +26,10 @@ class ConfigSource(Protocol):
|
|||
"""Get configuration for a specific adapter."""
|
||||
...
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Get the enrichment configuration."""
|
||||
...
|
||||
|
||||
async def watch_for_changes(
|
||||
self,
|
||||
callback: Callable[[str, str], Awaitable[None] | None],
|
||||
|
|
@ -65,6 +69,10 @@ class DbConfigSource:
|
|||
"""Get a specific adapter from database."""
|
||||
return await self._store.get_adapter(name)
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Get the enrichment configuration from database."""
|
||||
return await self._store.get_enrichment_config()
|
||||
|
||||
async def watch_for_changes(
|
||||
self,
|
||||
callback: Callable[[str, str], Awaitable[None] | None],
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ from typing import Any
|
|||
|
||||
import asyncpg
|
||||
|
||||
from central.config_models import AdapterConfig, StreamConfig
|
||||
from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
|
||||
from central.crypto import decrypt, encrypt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -129,6 +129,48 @@ class ConfigStore:
|
|||
name,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Enrichment configuration (single-row config.enrichment)
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def get_enrichment_config(self) -> EnrichmentConfig:
|
||||
"""Read the single config.enrichment row.
|
||||
|
||||
Falls back to EnrichmentConfig() framework defaults if the row is
|
||||
somehow absent (it is migration-seeded, so this is belt-and-suspenders).
|
||||
"""
|
||||
async with self._pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||
FROM config.enrichment
|
||||
WHERE id = true
|
||||
"""
|
||||
)
|
||||
if row is None:
|
||||
return EnrichmentConfig()
|
||||
return EnrichmentConfig(**dict(row))
|
||||
|
||||
async def upsert_enrichment_config(self, config: EnrichmentConfig) -> None:
|
||||
"""Write the single config.enrichment row (id = true)."""
|
||||
async with self._pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO config.enrichment
|
||||
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||
VALUES (true, $1, $2, $3, $4)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
enricher_class = EXCLUDED.enricher_class,
|
||||
backend_class = EXCLUDED.backend_class,
|
||||
backend_settings = EXCLUDED.backend_settings,
|
||||
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||
""",
|
||||
config.enricher_class,
|
||||
config.backend_class,
|
||||
config.backend_settings, # JSON-encoded by the codec
|
||||
config.cache_ttl_s,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Stream configuration
|
||||
# -------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -21,7 +21,10 @@ from central.enrichment.geocoder import GEOCODER_FIELDS, all_null_bundle
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_BASE_URL = "http://192.168.1.130:8440"
|
||||
# Generic default — operators point this at their Navi instance via the
|
||||
# /enrichment config page (backend_settings.base_url). No deployment-specific
|
||||
# host belongs in source.
|
||||
DEFAULT_BASE_URL = "http://localhost:8440"
|
||||
# Boise — warmup coordinate, amortizes Photon/DEM cold-connection cost at startup.
|
||||
_WARMUP_LAT = 43.6150
|
||||
_WARMUP_LON = -116.2023
|
||||
|
|
|
|||
|
|
@ -124,3 +124,27 @@ class EnrichmentCache:
|
|||
) -> None:
|
||||
"""Cache a bundle (idempotent upsert on the rounded-coords key)."""
|
||||
await asyncio.to_thread(self._set_sync, enricher_name, lat, lon, payload)
|
||||
|
||||
def _invalidate_sync(self, enricher_name: str | None) -> int:
|
||||
conn = self._connect()
|
||||
try:
|
||||
if enricher_name is None:
|
||||
cur = conn.execute("DELETE FROM enrichment_cache")
|
||||
else:
|
||||
cur = conn.execute(
|
||||
"DELETE FROM enrichment_cache WHERE enricher_name = ?",
|
||||
(enricher_name,),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
async def invalidate(self, enricher_name: str | None = None) -> int:
|
||||
"""Drop cached bundles. Scoped to one enricher when given, else all.
|
||||
|
||||
Called when enrichment config changes — a new backend would otherwise
|
||||
keep returning the previous backend's cached results until TTL expiry.
|
||||
Returns the number of rows deleted.
|
||||
"""
|
||||
return await asyncio.to_thread(self._invalidate_sync, enricher_name)
|
||||
|
|
|
|||
|
|
@ -88,6 +88,11 @@ def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str,
|
|||
f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]"
|
||||
)
|
||||
|
||||
# dict -> json textarea (generic; e.g. EnrichmentConfig.backend_settings).
|
||||
# The form renders the value as JSON; the POST handler parses it back.
|
||||
if field_type is dict or origin is dict:
|
||||
return "json", None
|
||||
|
||||
# Check if it's a BaseModel subclass (nested model other than RegionConfig)
|
||||
if isinstance(field_type, type) and issubclass(field_type, BaseModel):
|
||||
raise NotImplementedError(
|
||||
|
|
|
|||
|
|
@ -1990,6 +1990,142 @@ async def streams_update(
|
|||
return RedirectResponse(url="/streams", status_code=302)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Enrichment config route
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _enrichment_fields(current: dict) -> list[FieldDescriptor]:
|
||||
"""Field descriptors for the single-row EnrichmentConfig form (generic
|
||||
machinery — same describe_fields used by adapter pages)."""
|
||||
from central.config_models import EnrichmentConfig
|
||||
|
||||
return describe_fields(EnrichmentConfig, current)
|
||||
|
||||
|
||||
async def _read_enrichment_row(conn) -> dict:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT enricher_class, backend_class, backend_settings, cache_ttl_s
|
||||
FROM config.enrichment WHERE id = true
|
||||
"""
|
||||
)
|
||||
return dict(row) if row is not None else {}
|
||||
|
||||
|
||||
@router.get("/enrichment", response_class=HTMLResponse)
|
||||
async def enrichment_form(request: Request) -> HTMLResponse:
|
||||
"""Render the enrichment config form."""
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
operator = request.state.operator
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
current = await _read_enrichment_row(conn)
|
||||
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
name="enrichment.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": request.state.csrf_token,
|
||||
"fields": _enrichment_fields(current),
|
||||
"errors": None,
|
||||
"form_data": None,
|
||||
},
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
@router.post("/enrichment")
|
||||
async def enrichment_update(request: Request) -> Response:
|
||||
"""Validate + persist the enrichment config. Hot-reload picks it up via
|
||||
the config.enrichment NOTIFY trigger."""
|
||||
from central.config_models import EnrichmentConfig
|
||||
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
operator = request.state.operator
|
||||
|
||||
form = await request.form()
|
||||
if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
|
||||
raise CsrfValidationError("Invalid CSRF token")
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
form_data: dict[str, Any] = {}
|
||||
parsed: dict[str, Any] = {}
|
||||
|
||||
for field in _enrichment_fields({}):
|
||||
raw = form.get(field.name, "")
|
||||
form_data[field.name] = raw
|
||||
if field.widget == "number":
|
||||
try:
|
||||
parsed[field.name] = int(raw) if raw else None
|
||||
except ValueError:
|
||||
errors[field.name] = f"{field.label} must be a number"
|
||||
elif field.widget == "json":
|
||||
if not raw or not raw.strip():
|
||||
parsed[field.name] = {}
|
||||
else:
|
||||
try:
|
||||
loaded = json.loads(raw)
|
||||
if not isinstance(loaded, dict):
|
||||
errors[field.name] = f"{field.label} must be a JSON object"
|
||||
else:
|
||||
parsed[field.name] = loaded
|
||||
except json.JSONDecodeError as e:
|
||||
errors[field.name] = f"{field.label} is not valid JSON: {e}"
|
||||
else: # text
|
||||
parsed[field.name] = raw.strip() if raw else None
|
||||
|
||||
if not errors:
|
||||
try:
|
||||
validated = EnrichmentConfig(
|
||||
**{k: v for k, v in parsed.items() if v is not None}
|
||||
)
|
||||
except ValidationError as e:
|
||||
for err in e.errors():
|
||||
loc = err["loc"][0] if err["loc"] else "unknown"
|
||||
errors[str(loc)] = err["msg"]
|
||||
|
||||
if errors:
|
||||
async with pool.acquire() as conn:
|
||||
current = await _read_enrichment_row(conn)
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
name="enrichment.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": request.state.csrf_token,
|
||||
"fields": _enrichment_fields(current),
|
||||
"errors": errors,
|
||||
"form_data": form_data,
|
||||
},
|
||||
status_code=200,
|
||||
)
|
||||
return response
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO config.enrichment
|
||||
(id, enricher_class, backend_class, backend_settings, cache_ttl_s)
|
||||
VALUES (true, $1, $2, $3, $4)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
enricher_class = EXCLUDED.enricher_class,
|
||||
backend_class = EXCLUDED.backend_class,
|
||||
backend_settings = EXCLUDED.backend_settings,
|
||||
cache_ttl_s = EXCLUDED.cache_ttl_s
|
||||
""",
|
||||
validated.enricher_class,
|
||||
validated.backend_class,
|
||||
validated.backend_settings, # encoded as jsonb by the pool codec
|
||||
validated.cache_ttl_s,
|
||||
)
|
||||
|
||||
return RedirectResponse(url="/enrichment", status_code=302)
|
||||
|
||||
|
||||
# Alias validation regex
|
||||
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@
|
|||
<li><a href="/adapters">Adapters</a></li>
|
||||
<li><a href="/events">Events</a></li>
|
||||
<li><a href="/streams">Streams</a></li>
|
||||
<li><a href="/enrichment">Enrichment</a></li>
|
||||
<li><a href="/api-keys">API Keys</a></li>
|
||||
<li>{{ operator.username }}</li>
|
||||
<li><a href="/change-password">Change Password</a></li>
|
||||
|
|
|
|||
54
src/central/gui/templates/enrichment.html
Normal file
54
src/central/gui/templates/enrichment.html
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Central — Enrichment{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Enrichment</h1>
|
||||
<p class="secondary">
|
||||
Central-side event enrichment. Results are attached to each event under
|
||||
<code>data._enriched.<enricher></code>. Changes hot-reload into the
|
||||
supervisor; switching backend invalidates the enrichment cache.
|
||||
</p>
|
||||
|
||||
<form method="post" action="/enrichment">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
|
||||
|
||||
<fieldset>
|
||||
<legend>Configuration</legend>
|
||||
|
||||
{% for field in fields %}
|
||||
{% if field.widget == "text" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="text" id="{{ field.name }}" name="{{ field.name }}"
|
||||
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||
{% if field.required %}required{% endif %}>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
|
||||
{% elif field.widget == "number" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<input type="number" id="{{ field.name }}" name="{{ field.name }}"
|
||||
value="{{ form_data[field.name] if form_data and field.name in form_data else field.current_value or '' }}"
|
||||
{% if field.required %}required{% endif %}>
|
||||
{% if field.description %}<small>{{ field.description }}</small>{% endif %}
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
|
||||
{% elif field.widget == "json" %}
|
||||
<label for="{{ field.name }}">{{ field.label }}</label>
|
||||
<textarea id="{{ field.name }}" name="{{ field.name }}" rows="6"
|
||||
placeholder="{}">{{ form_data[field.name] if form_data and field.name in form_data else (field.current_value | tojson if field.current_value else '{}') }}</textarea>
|
||||
<small>JSON object{% if field.description %} — {{ field.description }}{% endif %}</small>
|
||||
{% if errors and errors[field.name] %}
|
||||
<small style="color: var(--pico-color-red-500);">{{ errors[field.name] }}</small>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</fieldset>
|
||||
|
||||
<button type="submit">Save Changes</button>
|
||||
</form>
|
||||
{% endblock %}
|
||||
|
|
@ -47,16 +47,16 @@ _BACKEND_REGISTRY: dict[str, type] = {
|
|||
|
||||
def build_enrichers(
|
||||
enrichment_config: EnrichmentConfig,
|
||||
cache_db_path: Path = ENRICHMENT_CACHE_DB_PATH,
|
||||
cache: EnrichmentCache,
|
||||
) -> list[Enricher]:
|
||||
"""Instantiate the configured enricher(s) with their backend + cache.
|
||||
"""Instantiate the configured enricher(s) with their backend + the given cache.
|
||||
|
||||
Read once at supervisor startup — enrichment config is NOT hot-reloaded
|
||||
in PR J (see EnrichmentConfig docstring).
|
||||
The supervisor owns the cache (so it can invalidate it on a config change)
|
||||
and passes it in. Enrichment config is read from config.enrichment at
|
||||
startup and hot-reloaded via LISTEN/NOTIFY (see Supervisor._on_config_change).
|
||||
"""
|
||||
backend_cls = _BACKEND_REGISTRY[enrichment_config.backend_class]
|
||||
backend = backend_cls(**enrichment_config.backend_settings)
|
||||
cache = EnrichmentCache(cache_db_path, ttl_s=enrichment_config.cache_ttl_s)
|
||||
enricher_cls = _ENRICHER_REGISTRY[enrichment_config.enricher_class]
|
||||
return [enricher_cls(backend, cache=cache)]
|
||||
|
||||
|
|
@ -165,9 +165,15 @@ class Supervisor:
|
|||
self._config_store = config_store
|
||||
self._nats_url = nats_url
|
||||
self._cloudevents_config = cloudevents_config
|
||||
# Enrichment is read once at startup (no hot-reload in PR J).
|
||||
# Enrichment: a valid default set is built now so the supervisor is
|
||||
# never in a half-state; start() then overrides it from the live
|
||||
# config.enrichment row, and _on_config_change hot-reloads it.
|
||||
self._active_enrichment_config = enrichment_config or EnrichmentConfig()
|
||||
self._enrichment_cache = EnrichmentCache(
|
||||
ENRICHMENT_CACHE_DB_PATH, ttl_s=self._active_enrichment_config.cache_ttl_s
|
||||
)
|
||||
self._enrichers: list[Enricher] = build_enrichers(
|
||||
enrichment_config or EnrichmentConfig()
|
||||
self._active_enrichment_config, self._enrichment_cache
|
||||
)
|
||||
self._adapters = discover_adapters()
|
||||
self._nc: nats.NATS | None = None
|
||||
|
|
@ -673,11 +679,52 @@ class Supervisor:
|
|||
extra={"stream": stream_config.name, "error": str(e)},
|
||||
)
|
||||
|
||||
def _rebuild_enrichers(self, config: EnrichmentConfig) -> None:
|
||||
"""Rebuild the active enricher set + cache from an EnrichmentConfig."""
|
||||
self._active_enrichment_config = config
|
||||
self._enrichment_cache = EnrichmentCache(
|
||||
ENRICHMENT_CACHE_DB_PATH, ttl_s=config.cache_ttl_s
|
||||
)
|
||||
self._enrichers = build_enrichers(config, self._enrichment_cache)
|
||||
|
||||
async def _handle_enrichment_change(self) -> None:
|
||||
"""Re-read config.enrichment and rebuild enrichers. Invalidate the cache
|
||||
when the backend changed, so stale results from the previous backend
|
||||
don't survive until TTL expiry."""
|
||||
new_config = await self._config_source.get_enrichment_config()
|
||||
old_config = self._active_enrichment_config
|
||||
backend_changed = (
|
||||
new_config.backend_class != old_config.backend_class
|
||||
or new_config.backend_settings != old_config.backend_settings
|
||||
or new_config.enricher_class != old_config.enricher_class
|
||||
)
|
||||
self._rebuild_enrichers(new_config)
|
||||
if backend_changed:
|
||||
deleted = await self._enrichment_cache.invalidate()
|
||||
logger.info(
|
||||
"Enrichment backend changed; cache invalidated",
|
||||
extra={
|
||||
"enricher_class": new_config.enricher_class,
|
||||
"backend_class": new_config.backend_class,
|
||||
"rows_deleted": deleted,
|
||||
},
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Enrichment config changed (cache retained)",
|
||||
extra={"cache_ttl_s": new_config.cache_ttl_s},
|
||||
)
|
||||
|
||||
async def _on_config_change(self, table: str, key: str) -> None:
|
||||
"""Handle a configuration change notification.
|
||||
|
||||
Called when NOTIFY fires for config changes.
|
||||
"""
|
||||
# Handle enrichment config changes (single-row config.enrichment)
|
||||
if table == "enrichment":
|
||||
await self._handle_enrichment_change()
|
||||
return
|
||||
|
||||
# Handle stream changes
|
||||
if table == "streams":
|
||||
stream_name = key
|
||||
|
|
@ -769,6 +816,18 @@ class Supervisor:
|
|||
# Ensure streams exist with correct configuration
|
||||
await self._ensure_streams()
|
||||
|
||||
# Load the operator-set enrichment config (overrides the constructor
|
||||
# default); hot-reloaded thereafter via _on_config_change.
|
||||
enrichment_config = await self._config_source.get_enrichment_config()
|
||||
self._rebuild_enrichers(enrichment_config)
|
||||
logger.info(
|
||||
"Enrichment configured",
|
||||
extra={
|
||||
"enricher_class": enrichment_config.enricher_class,
|
||||
"backend_class": enrichment_config.backend_class,
|
||||
},
|
||||
)
|
||||
|
||||
# Load and start enabled adapters
|
||||
enabled_adapters = await self._config_source.list_enabled_adapters()
|
||||
for config in enabled_adapters:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue