Merge pull request #3 from zvx-echo6/feature/1a-3-phase-c-toml-retirement

Phase 1a-3 Part C: Retire TOML config source
This commit is contained in:
malice 2026-05-15 22:00:02 -06:00 committed by GitHub
commit 0b23cc4572
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 201 additions and 465 deletions

View file

@ -193,3 +193,66 @@ All upstream alerts found in DB ✓
| 6 | Data integrity | ✅ All alerts in DB | | 6 | Data integrity | ✅ All alerts in DB |
**Phase B Complete.** System running stable on DB-backed config. **Phase B Complete.** System running stable on DB-backed config.
---
## Cadence Revert (Close-out)
**Timestamp:** 2026-05-16T03:54:14Z
### Issue Discovered
During close-out verification, polls were observed at 90s intervals despite
DB showing `cadence_s = 60`. Investigation revealed the live reschedule
from 90→60 (done at 03:23:08 during Phase B) didn't properly update the
in-flight scheduling.
### Resolution
Supervisor restart was required to clear stale state:
```bash
systemctl restart central-supervisor
```
### Post-Restart Verification
**DB State:**
```sql
SELECT name, cadence_s, updated_at FROM config.adapters WHERE name='nws';
```
```
name | cadence_s | updated_at
------+-----------+-------------------------------
nws | 60 | 2026-05-16 03:50:53.210963+00
```
**Poll Intervals After Restart:**
```
03:54:14.621376 - NWS poll completed (first poll after restart)
03:55:15.028963 - NWS poll completed (61s later) ✅
03:56:15.429013 - NWS poll completed (60s later) ✅
```
**Startup Log:**
```json
{"ts": "2026-05-16T03:54:14.318479+00:00", "msg": "Adapter started", "adapter": "nws", "cadence_s": 60}
```
### Bug Note
The cadence DECREASE (90→60) rate-limit test from Phase B showed correct
log output ("Rescheduled adapter" with new_cadence_s=60) but the actual
scheduling didn't update properly. The increase test (60→90) worked
correctly.
**Root cause:** Unknown - requires investigation. The `_reschedule_adapter`
method updates `state.config` and `state.adapter.cadence_s`, and signals
via `cancel_event`, but the scheduling loop may not be re-evaluating
correctly for decreases.
**Mitigation:** After any cadence change, verify actual poll intervals match
expected cadence. If not, restart supervisor.
**Result:** Cadence confirmed at 60s after restart. ✅

View file

@ -308,7 +308,7 @@ async def async_main() -> None:
"Archive starting", "Archive starting",
extra={ extra={
"nats_url": settings.nats_url, "nats_url": settings.nats_url,
"config_source": settings.config_source,
}, },
) )

View file

@ -1,61 +1,46 @@
"""Bootstrap configuration from environment variables. """Bootstrap configuration from environment variables.
This module provides early-stage configuration loading from environment This module provides early-stage configuration loading from environment
variables or a .env file. Used before the database-backed config store variables or a .env file. Used before the database-backed config store
is available. is available.
""" """
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Literal
from pydantic import Field, field_validator from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings): class Settings(BaseSettings):
"""Bootstrap settings loaded from environment or .env file.""" """Bootstrap settings loaded from environment or .env file."""
model_config = SettingsConfigDict( model_config = SettingsConfigDict(
env_prefix="CENTRAL_", env_prefix="CENTRAL_",
env_file=".env", env_file=".env",
env_file_encoding="utf-8", env_file_encoding="utf-8",
extra="ignore", extra="ignore",
) )
db_dsn: str = Field(description="PostgreSQL connection string") db_dsn: str = Field(description="PostgreSQL connection string")
nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL") nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL")
master_key_path: Path = Field( master_key_path: Path = Field(
default=Path("/etc/central/master.key"), default=Path("/etc/central/master.key"),
description="Path to AES-256 master key file", description="Path to AES-256 master key file",
) )
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field( log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
default="INFO", default="INFO",
description="Logging level", description="Logging level",
) )
config_source: Literal["toml", "db"] = Field(
default="toml",
description="Configuration source: 'toml' for TOML file, 'db' for database", @lru_cache
) def get_settings(env_file: Path | None = None) -> Settings:
config_toml_path: Path = Field( """Load settings, optionally from a specific .env file.
default=Path("/etc/central/central.toml"),
description="Path to TOML config file (when config_source=toml)", Results are cached. Call get_settings.cache_clear() to reload.
) """
if env_file is not None:
@field_validator("config_source") return Settings(_env_file=env_file)
@classmethod return Settings()
def validate_config_source(cls, v: str) -> str:
if v not in ("toml", "db"):
raise ValueError(f"config_source must be 'toml' or 'db', got {v!r}")
return v
@lru_cache
def get_settings(env_file: Path | None = None) -> Settings:
"""Load settings, optionally from a specific .env file.
Results are cached. Call get_settings.cache_clear() to reload.
"""
if env_file is not None:
return Settings(_env_file=env_file)
return Settings()

View file

@ -1,187 +1,80 @@
"""Configuration source abstraction. """Configuration source abstraction.
Provides a unified interface for loading adapter configuration from Provides a unified interface for loading adapter configuration from
either TOML files or the database-backed config store. the database-backed config store.
""" """
import logging import logging
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from pathlib import Path from typing import Protocol, runtime_checkable
from typing import Any, Protocol, runtime_checkable
from central.config_models import AdapterConfig
import tomllib from central.config_store import ConfigStore
from central.config import NWSAdapterConfig logger = logging.getLogger(__name__)
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
@runtime_checkable
logger = logging.getLogger(__name__) class ConfigSource(Protocol):
"""Protocol for configuration sources."""
@runtime_checkable async def list_enabled_adapters(self) -> list[AdapterConfig]:
class ConfigSource(Protocol): """List all enabled adapters."""
"""Protocol for configuration sources.""" ...
async def list_enabled_adapters(self) -> list[AdapterConfig]: async def get_adapter(self, name: str) -> AdapterConfig | None:
"""List all enabled adapters.""" """Get configuration for a specific adapter."""
... ...
async def get_adapter(self, name: str) -> AdapterConfig | None: async def watch_for_changes(
"""Get configuration for a specific adapter.""" self,
... callback: Callable[[str, str], Awaitable[None] | None],
) -> None:
async def watch_for_changes( """Watch for configuration changes.
self,
callback: Callable[[str, str], Awaitable[None] | None], Runs forever, calling callback(table, key) on changes.
) -> None: """
"""Watch for configuration changes. ...
For TOML source, this is a no-op (returns immediately). async def close(self) -> None:
For DB source, this runs forever, calling callback(table, key) on changes. """Clean up resources."""
""" ...
...
async def close(self) -> None: class DbConfigSource:
"""Clean up resources.""" """Configuration source backed by the Postgres config store.
...
Supports hot-reload via LISTEN/NOTIFY.
"""
class TomlConfigSource:
"""Configuration source backed by a TOML file. def __init__(self, config_store: ConfigStore) -> None:
self._store = config_store
This is the legacy configuration path. Does not support hot-reload.
""" @classmethod
async def create(cls, dsn: str) -> "DbConfigSource":
def __init__(self, toml_path: Path) -> None: """Create a DbConfigSource with a new ConfigStore."""
self._toml_path = toml_path store = await ConfigStore.create(dsn)
self._adapters: dict[str, AdapterConfig] = {} return cls(store)
self._loaded = False
async def list_enabled_adapters(self) -> list[AdapterConfig]:
def _load(self) -> None: """List all enabled adapters from database."""
"""Load configuration from TOML file.""" all_adapters = await self._store.list_adapters()
if self._loaded: return [a for a in all_adapters if a.enabled and not a.is_paused]
return
async def get_adapter(self, name: str) -> AdapterConfig | None:
with self._toml_path.open("rb") as f: """Get a specific adapter from database."""
data = tomllib.load(f) return await self._store.get_adapter(name)
adapters_raw = data.get("adapters", {}) async def watch_for_changes(
from datetime import datetime, timezone self,
callback: Callable[[str, str], Awaitable[None] | None],
now = datetime.now(timezone.utc) ) -> None:
"""Watch for changes via Postgres LISTEN/NOTIFY.
for name, adapter_data in adapters_raw.items():
# Convert TOML adapter config to unified AdapterConfig Runs forever, calling callback(table, key) on each change.
# TOML uses NWSAdapterConfig shape, we need to convert to AdapterConfig """
enabled = adapter_data.get("enabled", True) await self._store.listen_for_changes(callback)
cadence_s = adapter_data.get("cadence_s", 60)
async def close(self) -> None:
# Extract settings (everything except enabled/cadence_s) """Close the underlying config store."""
settings = { await self._store.close()
k: v
for k, v in adapter_data.items()
if k not in ("enabled", "cadence_s")
}
self._adapters[name] = AdapterConfig(
name=name,
enabled=enabled,
cadence_s=cadence_s,
settings=settings,
paused_at=None,
updated_at=now,
)
self._loaded = True
logger.info(
"Loaded TOML config",
extra={"path": str(self._toml_path), "adapters": list(self._adapters.keys())},
)
async def list_enabled_adapters(self) -> list[AdapterConfig]:
"""List all enabled adapters from TOML."""
self._load()
return [a for a in self._adapters.values() if a.enabled and not a.is_paused]
async def get_adapter(self, name: str) -> AdapterConfig | None:
"""Get a specific adapter from TOML."""
self._load()
return self._adapters.get(name)
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],
) -> None:
"""TOML does not support hot-reload. Returns immediately."""
logger.debug("TOML config source does not support hot-reload")
return
async def close(self) -> None:
"""No resources to clean up for TOML source."""
pass
class DbConfigSource:
"""Configuration source backed by the Postgres config store.
Supports hot-reload via LISTEN/NOTIFY.
"""
def __init__(self, config_store: ConfigStore) -> None:
self._store = config_store
@classmethod
async def create(cls, dsn: str) -> "DbConfigSource":
"""Create a DbConfigSource with a new ConfigStore."""
store = await ConfigStore.create(dsn)
return cls(store)
async def list_enabled_adapters(self) -> list[AdapterConfig]:
"""List all enabled adapters from database."""
all_adapters = await self._store.list_adapters()
return [a for a in all_adapters if a.enabled and not a.is_paused]
async def get_adapter(self, name: str) -> AdapterConfig | None:
"""Get a specific adapter from database."""
return await self._store.get_adapter(name)
async def watch_for_changes(
self,
callback: Callable[[str, str], Awaitable[None] | None],
) -> None:
"""Watch for changes via Postgres LISTEN/NOTIFY.
Runs forever, calling callback(table, key) on each change.
"""
await self._store.listen_for_changes(callback)
async def close(self) -> None:
"""Close the underlying config store."""
await self._store.close()
async def create_config_source(
source_type: str,
dsn: str | None = None,
toml_path: Path | None = None,
) -> ConfigSource:
"""Factory function to create the appropriate config source.
Args:
source_type: "toml" or "db"
dsn: PostgreSQL DSN (required for "db")
toml_path: Path to TOML file (required for "toml")
Returns:
ConfigSource implementation
"""
if source_type == "toml":
if toml_path is None:
raise ValueError("toml_path required for toml config source")
return TomlConfigSource(toml_path)
elif source_type == "db":
if dsn is None:
raise ValueError("dsn required for db config source")
return await DbConfigSource.create(dsn)
else:
raise ValueError(f"Unknown config source type: {source_type}")

View file

@ -15,9 +15,9 @@ from nats.js import JetStreamContext
from central.adapters.nws import NWSAdapter from central.adapters.nws import NWSAdapter
from central.cloudevents_wire import wrap_event from central.cloudevents_wire import wrap_event
from central.config import load_config, Config, NWSAdapterConfig from central.config import NWSAdapterConfig
from central.config_models import AdapterConfig from central.config_models import AdapterConfig
from central.config_source import ConfigSource, create_config_source from central.config_source import ConfigSource, DbConfigSource
from central.bootstrap_config import get_settings from central.bootstrap_config import get_settings
from central.models import subject_for_event from central.models import subject_for_event
@ -528,7 +528,7 @@ class Supervisor:
for config in enabled_adapters: for config in enabled_adapters:
await self._start_adapter(config) await self._start_adapter(config)
# Start config watcher (for DB source, this runs forever; for TOML, returns immediately) # Start config watcher (runs forever, calling callback on changes)
self._config_watch_task = asyncio.create_task( self._config_watch_task = asyncio.create_task(
self._config_source.watch_for_changes(self._on_config_change) self._config_source.watch_for_changes(self._on_config_change)
) )
@ -579,37 +579,22 @@ async def async_main() -> None:
settings = get_settings() settings = get_settings()
logger.info( logger.info(
"Config source: %s", "Config source: db",
settings.config_source, extra={"config_source": "db"},
extra={"config_source": settings.config_source},
) )
# Create config source based on setting # Create database config source
config_source = await create_config_source( config_source = await DbConfigSource.create(settings.db_dsn)
source_type=settings.config_source,
dsn=settings.db_dsn,
toml_path=settings.config_toml_path,
)
# CloudEvents config: try TOML first, fall back to code defaults
# (CloudEvents envelope format is protocol-level, not operator-configurable)
cloudevents_config = None
if settings.config_source == "toml":
try:
toml_config = load_config(str(settings.config_toml_path))
cloudevents_config = toml_config
except Exception:
pass # Will use defaults from cloudevents_constants
supervisor = Supervisor( supervisor = Supervisor(
config_source=config_source, config_source=config_source,
nats_url=settings.nats_url, nats_url=settings.nats_url,
cloudevents_config=cloudevents_config, # CloudEvents uses protocol-level defaults from cloudevents_constants
cloudevents_config=None,
) )
logger.info( logger.info(
"CloudEvents config: %s", "CloudEvents config: defaults",
"TOML" if cloudevents_config else "defaults", extra={"cloudevents_source": "defaults"},
extra={"cloudevents_source": "toml" if cloudevents_config else "defaults"},
) )
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()

View file

@ -1,9 +1,7 @@
"""Tests for configuration source abstraction.""" """Tests for configuration source abstraction."""
import asyncio
import base64 import base64
import os import os
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
import asyncpg import asyncpg
@ -12,11 +10,8 @@ import pytest_asyncio
from central.config_source import ( from central.config_source import (
ConfigSource, ConfigSource,
TomlConfigSource,
DbConfigSource, DbConfigSource,
create_config_source,
) )
from central.config_store import ConfigStore
from central.crypto import KEY_SIZE, clear_key_cache from central.crypto import KEY_SIZE, clear_key_cache
# Test database DSN # Test database DSN
@ -43,93 +38,6 @@ def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) ->
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path)) monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
class TestTomlConfigSource:
"""Tests for TOML-based config source."""
@pytest.fixture
def toml_file(self, tmp_path: Path) -> Path:
"""Create a test TOML config file."""
toml_content = """
[adapters.nws]
enabled = true
cadence_s = 60
states = ["ID", "MT"]
contact_email = "test@example.com"
[adapters.disabled_adapter]
enabled = false
cadence_s = 300
states = []
contact_email = "test@example.com"
[cloudevents]
type_prefix = "central"
source = "central.local"
schema_version = "1.0"
[nats]
url = "nats://localhost:4222"
[postgres]
dsn = "postgresql://user:pass@localhost/db"
"""
path = tmp_path / "central.toml"
path.write_text(toml_content)
return path
@pytest.mark.asyncio
async def test_list_enabled_adapters(self, toml_file: Path) -> None:
"""list_enabled_adapters returns only enabled adapters."""
source = TomlConfigSource(toml_file)
adapters = await source.list_enabled_adapters()
assert len(adapters) == 1
assert adapters[0].name == "nws"
assert adapters[0].enabled is True
assert adapters[0].cadence_s == 60
@pytest.mark.asyncio
async def test_get_adapter(self, toml_file: Path) -> None:
"""get_adapter returns correct adapter config."""
source = TomlConfigSource(toml_file)
adapter = await source.get_adapter("nws")
assert adapter is not None
assert adapter.name == "nws"
assert adapter.settings["states"] == ["ID", "MT"]
assert adapter.settings["contact_email"] == "test@example.com"
@pytest.mark.asyncio
async def test_get_nonexistent_adapter(self, toml_file: Path) -> None:
"""get_adapter returns None for nonexistent adapter."""
source = TomlConfigSource(toml_file)
adapter = await source.get_adapter("does_not_exist")
assert adapter is None
@pytest.mark.asyncio
async def test_watch_for_changes_returns_immediately(self, toml_file: Path) -> None:
"""watch_for_changes is a no-op for TOML source."""
source = TomlConfigSource(toml_file)
callback_called = False
async def callback(table: str, key: str) -> None:
nonlocal callback_called
callback_called = True
# Should return immediately without blocking
await asyncio.wait_for(
source.watch_for_changes(callback),
timeout=1.0,
)
assert not callback_called
@pytest.mark.asyncio
async def test_implements_protocol(self, toml_file: Path) -> None:
"""TomlConfigSource implements ConfigSource protocol."""
source = TomlConfigSource(toml_file)
assert isinstance(source, ConfigSource)
@pytest_asyncio.fixture @pytest_asyncio.fixture
async def db_conn() -> asyncpg.Connection: async def db_conn() -> asyncpg.Connection:
"""Get a direct database connection for setup/teardown.""" """Get a direct database connection for setup/teardown."""
@ -222,64 +130,3 @@ class TestDbConfigSource:
async def test_implements_protocol(self, db_source: DbConfigSource) -> None: async def test_implements_protocol(self, db_source: DbConfigSource) -> None:
"""DbConfigSource implements ConfigSource protocol.""" """DbConfigSource implements ConfigSource protocol."""
assert isinstance(db_source, ConfigSource) assert isinstance(db_source, ConfigSource)
class TestCreateConfigSource:
"""Tests for the config source factory function."""
@pytest.fixture
def toml_file(self, tmp_path: Path) -> Path:
"""Create a minimal TOML config file."""
toml_content = """
[adapters.nws]
enabled = true
cadence_s = 60
states = []
contact_email = "test@example.com"
[cloudevents]
[nats]
[postgres]
dsn = "postgresql://test@localhost/test"
"""
path = tmp_path / "central.toml"
path.write_text(toml_content)
return path
@pytest.mark.asyncio
async def test_create_toml_source(self, toml_file: Path) -> None:
"""create_config_source returns TomlConfigSource for 'toml' type."""
source = await create_config_source(
source_type="toml",
toml_path=toml_file,
)
assert isinstance(source, TomlConfigSource)
await source.close()
@pytest.mark.asyncio
async def test_create_db_source(self, clean_config_schema: None) -> None:
"""create_config_source returns DbConfigSource for 'db' type."""
source = await create_config_source(
source_type="db",
dsn=TEST_DB_DSN,
)
assert isinstance(source, DbConfigSource)
await source.close()
@pytest.mark.asyncio
async def test_create_toml_requires_path(self) -> None:
"""create_config_source raises for 'toml' without path."""
with pytest.raises(ValueError, match="toml_path required"):
await create_config_source(source_type="toml")
@pytest.mark.asyncio
async def test_create_db_requires_dsn(self) -> None:
"""create_config_source raises for 'db' without dsn."""
with pytest.raises(ValueError, match="dsn required"):
await create_config_source(source_type="db")
@pytest.mark.asyncio
async def test_create_unknown_type_raises(self) -> None:
"""create_config_source raises for unknown type."""
with pytest.raises(ValueError, match="Unknown config source type"):
await create_config_source(source_type="unknown")

View file

@ -355,40 +355,3 @@ class TestRateLimitGuarantee:
f"Multiple NOTIFYs should not cause extra polls." f"Multiple NOTIFYs should not cause extra polls."
) )
class TestBootstrapConfigFlag:
"""Tests for CENTRAL_CONFIG_SOURCE bootstrap flag."""
def test_default_is_toml(self) -> None:
"""Default config_source is 'toml'."""
from central.bootstrap_config import Settings
# Create settings with minimal required fields
settings = Settings(
db_dsn="postgresql://test@localhost/test",
_env_file=None,
)
assert settings.config_source == "toml"
def test_accepts_db(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""config_source accepts 'db' value."""
from central.bootstrap_config import Settings, get_settings
get_settings.cache_clear()
monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "db")
monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")
settings = get_settings()
assert settings.config_source == "db"
def test_rejects_invalid(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""config_source rejects invalid values."""
from pydantic import ValidationError
from central.bootstrap_config import Settings, get_settings
get_settings.cache_clear()
monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "invalid")
monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")
with pytest.raises(ValidationError):
get_settings()