Merge pull request #3 from zvx-echo6/feature/1a-3-phase-c-toml-retirement

Phase 1a-3 Part C: Retire TOML config source
This commit is contained in:
malice 2026-05-15 22:00:02 -06:00 committed by GitHub
commit 0b23cc4572
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 201 additions and 465 deletions

View file

@ -193,3 +193,66 @@ All upstream alerts found in DB ✓
| 6 | Data integrity | ✅ All alerts in DB |
**Phase B Complete.** System running stable on DB-backed config.
---
## Cadence Revert (Close-out)
**Timestamp:** 2026-05-16T03:54:14Z
### Issue Discovered
During close-out verification, polls were observed at 90 s intervals even
though the DB showed `cadence_s = 60`. Investigation revealed that the live
reschedule from 90→60 (performed at 03:23:08 during Phase B) did not take
effect in the already-running scheduling loop.
### Resolution
Supervisor restart was required to clear stale state:
```bash
systemctl restart central-supervisor
```
### Post-Restart Verification
**DB State:**
```sql
SELECT name, cadence_s, updated_at FROM config.adapters WHERE name='nws';
```
```
name | cadence_s | updated_at
------+-----------+-------------------------------
nws | 60 | 2026-05-16 03:50:53.210963+00
```
**Poll Intervals After Restart:**
```
03:54:14.621376 - NWS poll completed (first poll after restart)
03:55:15.028963 - NWS poll completed (61s later) ✅
03:56:15.429013 - NWS poll completed (60s later) ✅
```
**Startup Log:**
```json
{"ts": "2026-05-16T03:54:14.318479+00:00", "msg": "Adapter started", "adapter": "nws", "cadence_s": 60}
```
### Bug Note
The cadence DECREASE (90→60) rate-limit test from Phase B showed correct
log output ("Rescheduled adapter" with new_cadence_s=60) but the actual
scheduling didn't update properly. The increase test (60→90) worked
correctly.
**Root cause:** Unknown — requires investigation. The `_reschedule_adapter`
method updates `state.config` and `state.adapter.cadence_s` and signals the
change via `cancel_event`, but the scheduling loop may not be re-evaluating
its next poll time correctly when the cadence decreases.
**Mitigation:** After any cadence change, verify actual poll intervals match
expected cadence. If not, restart supervisor.
**Result:** Cadence confirmed at 60s after restart. ✅

View file

@ -308,7 +308,7 @@ async def async_main() -> None:
"Archive starting",
extra={
"nats_url": settings.nats_url,
"config_source": settings.config_source,
},
)

View file

@ -1,61 +1,46 @@
"""Bootstrap configuration from environment variables.
This module provides early-stage configuration loading from environment
variables or a .env file. Used before the database-backed config store
is available.
"""
from functools import lru_cache
from pathlib import Path
from typing import Literal
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Bootstrap settings for the process.

    Configured to read ``CENTRAL_``-prefixed environment variables and a
    UTF-8 ``.env`` file, with ``extra="ignore"``.
    """

    model_config = SettingsConfigDict(
        extra="ignore",
        env_file_encoding="utf-8",
        env_file=".env",
        env_prefix="CENTRAL_",
    )

    # No default is declared for the DSN.
    db_dsn: str = Field(description="PostgreSQL connection string")
    nats_url: str = Field(
        description="NATS server URL",
        default="nats://localhost:4222",
    )
    master_key_path: Path = Field(
        description="Path to AES-256 master key file",
        default=Path("/etc/central/master.key"),
    )
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
        description="Logging level",
        default="INFO",
    )
    # Selects where adapter configuration is loaded from.
    config_source: Literal["toml", "db"] = Field(
        description="Configuration source: 'toml' for TOML file, 'db' for database",
        default="toml",
    )
    config_toml_path: Path = Field(
        description="Path to TOML config file (when config_source=toml)",
        default=Path("/etc/central/central.toml"),
    )

    @field_validator("config_source")
    @classmethod
    def validate_config_source(cls, v: str) -> str:
        """Return *v* unchanged if it is ``"toml"`` or ``"db"``.

        Raises:
            ValueError: for any other value.
        """
        if v in ("toml", "db"):
            return v
        raise ValueError(f"config_source must be 'toml' or 'db', got {v!r}")
@lru_cache
def get_settings(env_file: Path | None = None) -> Settings:
    """Build the bootstrap settings, memoized by ``lru_cache``.

    Args:
        env_file: Optional .env file; when given it is forwarded to
            ``Settings`` as ``_env_file``. When omitted, ``Settings`` is
            constructed with no arguments.

    Returns:
        The ``Settings`` instance for the given argument.

    Call ``get_settings.cache_clear()`` to force a reload.
    """
    if env_file is None:
        return Settings()
    return Settings(_env_file=env_file)
"""Bootstrap configuration from environment variables.
This module provides early-stage configuration loading from environment
variables or a .env file. Used before the database-backed config store
is available.
"""
from functools import lru_cache
from pathlib import Path
from typing import Literal
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Bootstrap settings for the process.

    Configured to read ``CENTRAL_``-prefixed environment variables and a
    UTF-8 ``.env`` file, with ``extra="ignore"``.
    """

    model_config = SettingsConfigDict(
        extra="ignore",
        env_file_encoding="utf-8",
        env_file=".env",
        env_prefix="CENTRAL_",
    )

    # No default is declared for the DSN.
    db_dsn: str = Field(description="PostgreSQL connection string")
    nats_url: str = Field(
        description="NATS server URL",
        default="nats://localhost:4222",
    )
    master_key_path: Path = Field(
        description="Path to AES-256 master key file",
        default=Path("/etc/central/master.key"),
    )
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
        description="Logging level",
        default="INFO",
    )
@lru_cache
def get_settings(env_file: Path | None = None) -> Settings:
    """Build the bootstrap settings, memoized by ``lru_cache``.

    Args:
        env_file: Optional .env file; when given it is forwarded to
            ``Settings`` as ``_env_file``. When omitted, ``Settings`` is
            constructed with no arguments.

    Returns:
        The ``Settings`` instance for the given argument.

    Call ``get_settings.cache_clear()`` to force a reload.
    """
    if env_file is None:
        return Settings()
    return Settings(_env_file=env_file)

View file

@ -1,187 +1,80 @@
"""Configuration source abstraction.
Provides a unified interface for loading adapter configuration from
either TOML files or the database-backed config store.
"""
import logging
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
import tomllib
from central.config import NWSAdapterConfig
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
logger = logging.getLogger(__name__)
@runtime_checkable
class ConfigSource(Protocol):
    """Structural interface that every configuration source provides."""

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the adapters that are currently enabled."""
        ...

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return the configuration of the adapter called *name*, if any."""
        ...

    async def watch_for_changes(
        self,
        callback: Callable[[str, str], Awaitable[None] | None],
    ) -> None:
        """Observe configuration changes and report them to *callback*.

        A TOML-backed source does nothing and returns immediately.
        A DB-backed source runs forever and invokes ``callback(table, key)``
        for each change.
        """
        ...

    async def close(self) -> None:
        """Release any resources held by the source."""
        ...
class TomlConfigSource:
    """Legacy configuration source that reads adapters from a TOML file.

    The file is parsed on first use and the result is kept in memory;
    hot-reload is not supported.
    """

    def __init__(self, toml_path: Path) -> None:
        self._toml_path = toml_path
        self._adapters: dict[str, AdapterConfig] = {}
        self._loaded = False

    def _load(self) -> None:
        """Parse the TOML file into ``AdapterConfig`` objects (first call only)."""
        if self._loaded:
            return

        from datetime import datetime, timezone

        with self._toml_path.open("rb") as fh:
            document = tomllib.load(fh)

        # One timestamp is shared by every adapter loaded in this pass.
        loaded_at = datetime.now(timezone.utc)
        reserved = ("enabled", "cadence_s")

        for adapter_name, raw in document.get("adapters", {}).items():
            # The TOML table is flat: the two reserved keys map to dedicated
            # AdapterConfig fields, everything else becomes `settings`.
            is_enabled = raw.get("enabled", True)
            cadence = raw.get("cadence_s", 60)
            extras = {
                key: value
                for key, value in raw.items()
                if key not in reserved
            }
            self._adapters[adapter_name] = AdapterConfig(
                name=adapter_name,
                enabled=is_enabled,
                cadence_s=cadence,
                settings=extras,
                paused_at=None,
                updated_at=loaded_at,
            )

        self._loaded = True
        logger.info(
            "Loaded TOML config",
            extra={
                "path": str(self._toml_path),
                "adapters": list(self._adapters),
            },
        )

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the adapters that are enabled and not paused."""
        self._load()
        active: list[AdapterConfig] = []
        for adapter in self._adapters.values():
            if adapter.enabled and not adapter.is_paused:
                active.append(adapter)
        return active

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return the adapter called *name*, or ``None`` when it is absent."""
        self._load()
        return self._adapters.get(name)

    async def watch_for_changes(
        self,
        callback: Callable[[str, str], Awaitable[None] | None],
    ) -> None:
        """Log that hot-reload is unsupported and return at once.

        *callback* is never invoked.
        """
        logger.debug("TOML config source does not support hot-reload")

    async def close(self) -> None:
        """Do nothing; this source holds no resources."""
class DbConfigSource:
    """Configuration source that delegates to a Postgres ``ConfigStore``.

    Supports hot-reload via LISTEN/NOTIFY.
    """

    def __init__(self, config_store: ConfigStore) -> None:
        self._store = config_store

    @classmethod
    async def create(cls, dsn: str) -> "DbConfigSource":
        """Build a source around a ``ConfigStore`` newly created for *dsn*."""
        return cls(await ConfigStore.create(dsn))

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the stored adapters that are enabled and not paused."""
        active: list[AdapterConfig] = []
        for adapter in await self._store.list_adapters():
            if adapter.enabled and not adapter.is_paused:
                active.append(adapter)
        return active

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return whatever the store holds for the adapter called *name*."""
        adapter = await self._store.get_adapter(name)
        return adapter

    async def watch_for_changes(
        self,
        callback: Callable[[str, str], Awaitable[None] | None],
    ) -> None:
        """Hand *callback* to the store's LISTEN/NOTIFY listener.

        Runs forever; ``callback(table, key)`` is called on each change.
        """
        await self._store.listen_for_changes(callback)

    async def close(self) -> None:
        """Close the wrapped config store."""
        await self._store.close()
async def create_config_source(
    source_type: str,
    dsn: str | None = None,
    toml_path: Path | None = None,
) -> ConfigSource:
    """Build the config source selected by *source_type*.

    Args:
        source_type: Either "toml" or "db".
        dsn: PostgreSQL DSN; mandatory when *source_type* is "db".
        toml_path: TOML file location; mandatory when *source_type* is "toml".

    Returns:
        A ``TomlConfigSource`` or ``DbConfigSource``.

    Raises:
        ValueError: if the argument required by the chosen type is missing,
            or if *source_type* is not recognised.
    """
    if source_type == "toml":
        if toml_path is None:
            raise ValueError("toml_path required for toml config source")
        return TomlConfigSource(toml_path)

    if source_type == "db":
        if dsn is None:
            raise ValueError("dsn required for db config source")
        return await DbConfigSource.create(dsn)

    raise ValueError(f"Unknown config source type: {source_type}")
"""Configuration source abstraction.
Provides a unified interface for loading adapter configuration from
the database-backed config store.
"""
import logging
from collections.abc import Awaitable, Callable
from typing import Protocol, runtime_checkable
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
logger = logging.getLogger(__name__)
@runtime_checkable
class ConfigSource(Protocol):
    """Structural interface that every configuration source provides."""

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the adapters that are currently enabled."""
        ...

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return the configuration of the adapter called *name*, if any."""
        ...

    async def watch_for_changes(
        self,
        callback: Callable[[str, str], Awaitable[None] | None],
    ) -> None:
        """Observe configuration changes and report them to *callback*.

        Runs forever and invokes ``callback(table, key)`` for each change.
        """
        ...

    async def close(self) -> None:
        """Release any resources held by the source."""
        ...
class DbConfigSource:
    """Configuration source that delegates to a Postgres ``ConfigStore``.

    Supports hot-reload via LISTEN/NOTIFY.
    """

    def __init__(self, config_store: ConfigStore) -> None:
        self._store = config_store

    @classmethod
    async def create(cls, dsn: str) -> "DbConfigSource":
        """Build a source around a ``ConfigStore`` newly created for *dsn*."""
        return cls(await ConfigStore.create(dsn))

    async def list_enabled_adapters(self) -> list[AdapterConfig]:
        """Return the stored adapters that are enabled and not paused."""
        active: list[AdapterConfig] = []
        for adapter in await self._store.list_adapters():
            if adapter.enabled and not adapter.is_paused:
                active.append(adapter)
        return active

    async def get_adapter(self, name: str) -> AdapterConfig | None:
        """Return whatever the store holds for the adapter called *name*."""
        adapter = await self._store.get_adapter(name)
        return adapter

    async def watch_for_changes(
        self,
        callback: Callable[[str, str], Awaitable[None] | None],
    ) -> None:
        """Hand *callback* to the store's LISTEN/NOTIFY listener.

        Runs forever; ``callback(table, key)`` is called on each change.
        """
        await self._store.listen_for_changes(callback)

    async def close(self) -> None:
        """Close the wrapped config store."""
        await self._store.close()

View file

@ -15,9 +15,9 @@ from nats.js import JetStreamContext
from central.adapters.nws import NWSAdapter
from central.cloudevents_wire import wrap_event
from central.config import load_config, Config, NWSAdapterConfig
from central.config import NWSAdapterConfig
from central.config_models import AdapterConfig
from central.config_source import ConfigSource, create_config_source
from central.config_source import ConfigSource, DbConfigSource
from central.bootstrap_config import get_settings
from central.models import subject_for_event
@ -528,7 +528,7 @@ class Supervisor:
for config in enabled_adapters:
await self._start_adapter(config)
# Start config watcher (for DB source, this runs forever; for TOML, returns immediately)
# Start config watcher (runs forever, calling callback on changes)
self._config_watch_task = asyncio.create_task(
self._config_source.watch_for_changes(self._on_config_change)
)
@ -579,37 +579,22 @@ async def async_main() -> None:
settings = get_settings()
logger.info(
"Config source: %s",
settings.config_source,
extra={"config_source": settings.config_source},
"Config source: db",
extra={"config_source": "db"},
)
# Create config source based on setting
config_source = await create_config_source(
source_type=settings.config_source,
dsn=settings.db_dsn,
toml_path=settings.config_toml_path,
)
# CloudEvents config: try TOML first, fall back to code defaults
# (CloudEvents envelope format is protocol-level, not operator-configurable)
cloudevents_config = None
if settings.config_source == "toml":
try:
toml_config = load_config(str(settings.config_toml_path))
cloudevents_config = toml_config
except Exception:
pass # Will use defaults from cloudevents_constants
# Create database config source
config_source = await DbConfigSource.create(settings.db_dsn)
supervisor = Supervisor(
config_source=config_source,
nats_url=settings.nats_url,
cloudevents_config=cloudevents_config,
# CloudEvents uses protocol-level defaults from cloudevents_constants
cloudevents_config=None,
)
logger.info(
"CloudEvents config: %s",
"TOML" if cloudevents_config else "defaults",
extra={"cloudevents_source": "toml" if cloudevents_config else "defaults"},
"CloudEvents config: defaults",
extra={"cloudevents_source": "defaults"},
)
loop = asyncio.get_running_loop()

View file

@ -1,9 +1,7 @@
"""Tests for configuration source abstraction."""
import asyncio
import base64
import os
from datetime import datetime, timezone
from pathlib import Path
import asyncpg
@ -12,11 +10,8 @@ import pytest_asyncio
from central.config_source import (
ConfigSource,
TomlConfigSource,
DbConfigSource,
create_config_source,
)
from central.config_store import ConfigStore
from central.crypto import KEY_SIZE, clear_key_cache
# Test database DSN
@ -43,93 +38,6 @@ def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) ->
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
class TestTomlConfigSource:
    """Behavioural tests for the TOML-backed config source."""

    @pytest.fixture
    def toml_file(self, tmp_path: Path) -> Path:
        """Write a sample TOML config into *tmp_path* and return its path."""
        toml_content = """
[adapters.nws]
enabled = true
cadence_s = 60
states = ["ID", "MT"]
contact_email = "test@example.com"
[adapters.disabled_adapter]
enabled = false
cadence_s = 300
states = []
contact_email = "test@example.com"
[cloudevents]
type_prefix = "central"
source = "central.local"
schema_version = "1.0"
[nats]
url = "nats://localhost:4222"
[postgres]
dsn = "postgresql://user:pass@localhost/db"
"""
        config_path = tmp_path / "central.toml"
        config_path.write_text(toml_content)
        return config_path

    @pytest.mark.asyncio
    async def test_list_enabled_adapters(self, toml_file: Path) -> None:
        """Only the enabled adapter is listed."""
        listed = await TomlConfigSource(toml_file).list_enabled_adapters()

        assert len(listed) == 1
        nws = listed[0]
        assert nws.name == "nws"
        assert nws.enabled is True
        assert nws.cadence_s == 60

    @pytest.mark.asyncio
    async def test_get_adapter(self, toml_file: Path) -> None:
        """A known adapter is returned with its extra keys under settings."""
        nws = await TomlConfigSource(toml_file).get_adapter("nws")

        assert nws is not None
        assert nws.name == "nws"
        assert nws.settings["states"] == ["ID", "MT"]
        assert nws.settings["contact_email"] == "test@example.com"

    @pytest.mark.asyncio
    async def test_get_nonexistent_adapter(self, toml_file: Path) -> None:
        """An unknown adapter name yields None."""
        missing = await TomlConfigSource(toml_file).get_adapter("does_not_exist")
        assert missing is None

    @pytest.mark.asyncio
    async def test_watch_for_changes_returns_immediately(self, toml_file: Path) -> None:
        """Watching a TOML source finishes promptly and never calls back."""
        source = TomlConfigSource(toml_file)
        notifications: list[tuple[str, str]] = []

        async def callback(table: str, key: str) -> None:
            notifications.append((table, key))

        # The timeout turns an unexpectedly blocking watcher into a failure.
        await asyncio.wait_for(
            source.watch_for_changes(callback),
            timeout=1.0,
        )
        assert not notifications

    @pytest.mark.asyncio
    async def test_implements_protocol(self, toml_file: Path) -> None:
        """TomlConfigSource passes the runtime ConfigSource protocol check."""
        assert isinstance(TomlConfigSource(toml_file), ConfigSource)
@pytest_asyncio.fixture
async def db_conn() -> asyncpg.Connection:
"""Get a direct database connection for setup/teardown."""
@ -222,64 +130,3 @@ class TestDbConfigSource:
async def test_implements_protocol(self, db_source: DbConfigSource) -> None:
"""DbConfigSource implements ConfigSource protocol."""
assert isinstance(db_source, ConfigSource)
class TestCreateConfigSource:
    """Behavioural tests for the ``create_config_source`` factory."""

    @pytest.fixture
    def toml_file(self, tmp_path: Path) -> Path:
        """Write a minimal TOML config into *tmp_path* and return its path."""
        toml_content = """
[adapters.nws]
enabled = true
cadence_s = 60
states = []
contact_email = "test@example.com"
[cloudevents]
[nats]
[postgres]
dsn = "postgresql://test@localhost/test"
"""
        config_path = tmp_path / "central.toml"
        config_path.write_text(toml_content)
        return config_path

    @pytest.mark.asyncio
    async def test_create_toml_source(self, toml_file: Path) -> None:
        """The 'toml' type produces a TomlConfigSource."""
        created = await create_config_source(
            source_type="toml",
            toml_path=toml_file,
        )
        assert isinstance(created, TomlConfigSource)
        await created.close()

    @pytest.mark.asyncio
    async def test_create_db_source(self, clean_config_schema: None) -> None:
        """The 'db' type produces a DbConfigSource."""
        created = await create_config_source(
            source_type="db",
            dsn=TEST_DB_DSN,
        )
        assert isinstance(created, DbConfigSource)
        await created.close()

    @pytest.mark.asyncio
    async def test_create_toml_requires_path(self) -> None:
        """Requesting 'toml' without a path is rejected."""
        with pytest.raises(ValueError, match="toml_path required"):
            await create_config_source(source_type="toml")

    @pytest.mark.asyncio
    async def test_create_db_requires_dsn(self) -> None:
        """Requesting 'db' without a DSN is rejected."""
        with pytest.raises(ValueError, match="dsn required"):
            await create_config_source(source_type="db")

    @pytest.mark.asyncio
    async def test_create_unknown_type_raises(self) -> None:
        """An unrecognised source type is rejected."""
        with pytest.raises(ValueError, match="Unknown config source type"):
            await create_config_source(source_type="unknown")

View file

@ -355,40 +355,3 @@ class TestRateLimitGuarantee:
f"Multiple NOTIFYs should not cause extra polls."
)
class TestBootstrapConfigFlag:
    """Tests for CENTRAL_CONFIG_SOURCE bootstrap flag."""

    @pytest.fixture(autouse=True)
    def _isolated_settings_cache(self):
        """Clear the ``get_settings`` cache before and after every test.

        ``get_settings`` is memoized with ``lru_cache``. Without the
        trailing clear, a ``Settings`` object built from monkeypatched
        environment variables would stay cached after monkeypatch restores
        the environment and could leak into unrelated tests.
        """
        from central.bootstrap_config import get_settings

        get_settings.cache_clear()
        yield
        get_settings.cache_clear()

    def test_default_is_toml(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Default config_source is 'toml'."""
        from central.bootstrap_config import Settings

        # An ambient CENTRAL_CONFIG_SOURCE (e.g. on a host already switched
        # to the DB source) would override the default under test.
        monkeypatch.delenv("CENTRAL_CONFIG_SOURCE", raising=False)

        # Create settings with minimal required fields
        settings = Settings(
            db_dsn="postgresql://test@localhost/test",
            _env_file=None,
        )
        assert settings.config_source == "toml"

    def test_accepts_db(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """config_source accepts 'db' value."""
        from central.bootstrap_config import get_settings

        monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "db")
        monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")

        settings = get_settings()
        assert settings.config_source == "db"

    def test_rejects_invalid(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """config_source rejects invalid values."""
        from pydantic import ValidationError

        from central.bootstrap_config import get_settings

        monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "invalid")
        monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")

        with pytest.raises(ValidationError):
            get_settings()