diff --git a/docs/PHASE-1a-3-VERIFICATION.md b/docs/PHASE-1a-3-VERIFICATION.md index 0774379..bd8b6f7 100644 --- a/docs/PHASE-1a-3-VERIFICATION.md +++ b/docs/PHASE-1a-3-VERIFICATION.md @@ -193,3 +193,66 @@ All upstream alerts found in DB ✓ | 6 | Data integrity | ✅ All alerts in DB | **Phase B Complete.** System running stable on DB-backed config. + + +--- + +## Cadence Revert (Close-out) + +**Timestamp:** 2026-05-16T03:54:14Z + +### Issue Discovered + +During close-out verification, polls were observed at 90s intervals despite +DB showing `cadence_s = 60`. Investigation revealed the live reschedule +from 90→60 (done at 03:23:08 during Phase B) didn't properly update the +in-flight scheduling. + +### Resolution + +Supervisor restart was required to clear stale state: + +```bash +systemctl restart central-supervisor +``` + +### Post-Restart Verification + +**DB State:** +```sql +SELECT name, cadence_s, updated_at FROM config.adapters WHERE name='nws'; +``` +``` + name | cadence_s | updated_at +------+-----------+------------------------------- + nws | 60 | 2026-05-16 03:50:53.210963+00 +``` + +**Poll Intervals After Restart:** +``` +03:54:14.621376 - NWS poll completed (first poll after restart) +03:55:15.028963 - NWS poll completed (61s later) ✅ +03:56:15.429013 - NWS poll completed (60s later) ✅ +``` + +**Startup Log:** +```json +{"ts": "2026-05-16T03:54:14.318479+00:00", "msg": "Adapter started", "adapter": "nws", "cadence_s": 60} +``` + +### Bug Note + +The cadence DECREASE (90→60) rate-limit test from Phase B showed correct +log output ("Rescheduled adapter" with new_cadence_s=60) but the actual +scheduling didn't update properly. The increase test (60→90) worked +correctly. + +**Root cause:** Unknown - requires investigation. The `_reschedule_adapter` +method updates `state.config` and `state.adapter.cadence_s`, and signals +via `cancel_event`, but the scheduling loop may not be re-evaluating +correctly for decreases. + +**Mitigation:** After any cadence change, verify actual poll intervals match +expected cadence. If not, restart supervisor. + +**Result:** Cadence confirmed at 60s after restart. ✅ diff --git a/src/central/archive.py b/src/central/archive.py index 86cfabd..1b9e7c2 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -308,7 +308,7 @@ async def async_main() -> None: "Archive starting", extra={ "nats_url": settings.nats_url, - "config_source": settings.config_source, + }, ) diff --git a/src/central/bootstrap_config.py b/src/central/bootstrap_config.py index d0e36ea..f5e46bc 100644 --- a/src/central/bootstrap_config.py +++ b/src/central/bootstrap_config.py @@ -1,61 +1,46 @@ -"""Bootstrap configuration from environment variables. - -This module provides early-stage configuration loading from environment -variables or a .env file. Used before the database-backed config store -is available. -""" - -from functools import lru_cache -from pathlib import Path -from typing import Literal - -from pydantic import Field, field_validator -from pydantic_settings import BaseSettings, SettingsConfigDict - - -class Settings(BaseSettings): - """Bootstrap settings loaded from environment or .env file.""" - - model_config = SettingsConfigDict( - env_prefix="CENTRAL_", - env_file=".env", - env_file_encoding="utf-8", - extra="ignore", - ) - - db_dsn: str = Field(description="PostgreSQL connection string") - nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL") - master_key_path: Path = Field( - default=Path("/etc/central/master.key"), - description="Path to AES-256 master key file", - ) - log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field( - default="INFO", - description="Logging level", - ) - config_source: Literal["toml", "db"] = Field( - default="toml", - description="Configuration source: 'toml' for TOML file, 'db' for database", - ) - config_toml_path: Path = Field( - default=Path("/etc/central/central.toml"), - description="Path to TOML config file (when config_source=toml)", - ) - - @field_validator("config_source") - @classmethod - def validate_config_source(cls, v: str) -> str: - if v not in ("toml", "db"): - raise ValueError(f"config_source must be 'toml' or 'db', got {v!r}") - return v - - -@lru_cache -def get_settings(env_file: Path | None = None) -> Settings: - """Load settings, optionally from a specific .env file. - - Results are cached. Call get_settings.cache_clear() to reload. - """ - if env_file is not None: - return Settings(_env_file=env_file) - return Settings() +"""Bootstrap configuration from environment variables. + +This module provides early-stage configuration loading from environment +variables or a .env file. Used before the database-backed config store +is available. +""" + +from functools import lru_cache +from pathlib import Path +from typing import Literal + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """Bootstrap settings loaded from environment or .env file.""" + + model_config = SettingsConfigDict( + env_prefix="CENTRAL_", + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", + ) + + db_dsn: str = Field(description="PostgreSQL connection string") + nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL") + master_key_path: Path = Field( + default=Path("/etc/central/master.key"), + description="Path to AES-256 master key file", + ) + log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field( + default="INFO", + description="Logging level", + ) + + +@lru_cache +def get_settings(env_file: Path | None = None) -> Settings: + """Load settings, optionally from a specific .env file. + + Results are cached. Call get_settings.cache_clear() to reload. + """ + if env_file is not None: + return Settings(_env_file=env_file) + return Settings() diff --git a/src/central/config_source.py b/src/central/config_source.py index c430ad0..b70ddf1 100644 --- a/src/central/config_source.py +++ b/src/central/config_source.py @@ -1,187 +1,80 @@ -"""Configuration source abstraction. - -Provides a unified interface for loading adapter configuration from -either TOML files or the database-backed config store. -""" - -import logging -from collections.abc import Awaitable, Callable -from pathlib import Path -from typing import Any, Protocol, runtime_checkable - -import tomllib - -from central.config import NWSAdapterConfig -from central.config_models import AdapterConfig -from central.config_store import ConfigStore - -logger = logging.getLogger(__name__) - - -@runtime_checkable -class ConfigSource(Protocol): - """Protocol for configuration sources.""" - - async def list_enabled_adapters(self) -> list[AdapterConfig]: - """List all enabled adapters.""" - ... - - async def get_adapter(self, name: str) -> AdapterConfig | None: - """Get configuration for a specific adapter.""" - ... - - async def watch_for_changes( - self, - callback: Callable[[str, str], Awaitable[None] | None], - ) -> None: - """Watch for configuration changes. - - For TOML source, this is a no-op (returns immediately). - For DB source, this runs forever, calling callback(table, key) on changes. - """ - ... - - async def close(self) -> None: - """Clean up resources.""" - ... - - -class TomlConfigSource: - """Configuration source backed by a TOML file. - - This is the legacy configuration path. Does not support hot-reload. - """ - - def __init__(self, toml_path: Path) -> None: - self._toml_path = toml_path - self._adapters: dict[str, AdapterConfig] = {} - self._loaded = False - - def _load(self) -> None: - """Load configuration from TOML file.""" - if self._loaded: - return - - with self._toml_path.open("rb") as f: - data = tomllib.load(f) - - adapters_raw = data.get("adapters", {}) - from datetime import datetime, timezone - - now = datetime.now(timezone.utc) - - for name, adapter_data in adapters_raw.items(): - # Convert TOML adapter config to unified AdapterConfig - # TOML uses NWSAdapterConfig shape, we need to convert to AdapterConfig - enabled = adapter_data.get("enabled", True) - cadence_s = adapter_data.get("cadence_s", 60) - - # Extract settings (everything except enabled/cadence_s) - settings = { - k: v - for k, v in adapter_data.items() - if k not in ("enabled", "cadence_s") - } - - self._adapters[name] = AdapterConfig( - name=name, - enabled=enabled, - cadence_s=cadence_s, - settings=settings, - paused_at=None, - updated_at=now, - ) - - self._loaded = True - logger.info( - "Loaded TOML config", - extra={"path": str(self._toml_path), "adapters": list(self._adapters.keys())}, - ) - - async def list_enabled_adapters(self) -> list[AdapterConfig]: - """List all enabled adapters from TOML.""" - self._load() - return [a for a in self._adapters.values() if a.enabled and not a.is_paused] - - async def get_adapter(self, name: str) -> AdapterConfig | None: - """Get a specific adapter from TOML.""" - self._load() - return self._adapters.get(name) - - async def watch_for_changes( - self, - callback: Callable[[str, str], Awaitable[None] | None], - ) -> None: - """TOML does not support hot-reload. Returns immediately.""" - logger.debug("TOML config source does not support hot-reload") - return - - async def close(self) -> None: - """No resources to clean up for TOML source.""" - pass - - -class DbConfigSource: - """Configuration source backed by the Postgres config store. - - Supports hot-reload via LISTEN/NOTIFY. - """ - - def __init__(self, config_store: ConfigStore) -> None: - self._store = config_store - - @classmethod - async def create(cls, dsn: str) -> "DbConfigSource": - """Create a DbConfigSource with a new ConfigStore.""" - store = await ConfigStore.create(dsn) - return cls(store) - - async def list_enabled_adapters(self) -> list[AdapterConfig]: - """List all enabled adapters from database.""" - all_adapters = await self._store.list_adapters() - return [a for a in all_adapters if a.enabled and not a.is_paused] - - async def get_adapter(self, name: str) -> AdapterConfig | None: - """Get a specific adapter from database.""" - return await self._store.get_adapter(name) - - async def watch_for_changes( - self, - callback: Callable[[str, str], Awaitable[None] | None], - ) -> None: - """Watch for changes via Postgres LISTEN/NOTIFY. - - Runs forever, calling callback(table, key) on each change. - """ - await self._store.listen_for_changes(callback) - - async def close(self) -> None: - """Close the underlying config store.""" - await self._store.close() - - -async def create_config_source( - source_type: str, - dsn: str | None = None, - toml_path: Path | None = None, -) -> ConfigSource: - """Factory function to create the appropriate config source. - - Args: - source_type: "toml" or "db" - dsn: PostgreSQL DSN (required for "db") - toml_path: Path to TOML file (required for "toml") - - Returns: - ConfigSource implementation - """ - if source_type == "toml": - if toml_path is None: - raise ValueError("toml_path required for toml config source") - return TomlConfigSource(toml_path) - elif source_type == "db": - if dsn is None: - raise ValueError("dsn required for db config source") - return await DbConfigSource.create(dsn) - else: - raise ValueError(f"Unknown config source type: {source_type}") +"""Configuration source abstraction. + +Provides a unified interface for loading adapter configuration from +the database-backed config store. +""" + +import logging +from collections.abc import Awaitable, Callable +from typing import Protocol, runtime_checkable + +from central.config_models import AdapterConfig +from central.config_store import ConfigStore + +logger = logging.getLogger(__name__) + + +@runtime_checkable +class ConfigSource(Protocol): + """Protocol for configuration sources.""" + + async def list_enabled_adapters(self) -> list[AdapterConfig]: + """List all enabled adapters.""" + ... + + async def get_adapter(self, name: str) -> AdapterConfig | None: + """Get configuration for a specific adapter.""" + ... + + async def watch_for_changes( + self, + callback: Callable[[str, str], Awaitable[None] | None], + ) -> None: + """Watch for configuration changes. + + Runs forever, calling callback(table, key) on changes. + """ + ... + + async def close(self) -> None: + """Clean up resources.""" + ... + + +class DbConfigSource: + """Configuration source backed by the Postgres config store. + + Supports hot-reload via LISTEN/NOTIFY. + """ + + def __init__(self, config_store: ConfigStore) -> None: + self._store = config_store + + @classmethod + async def create(cls, dsn: str) -> "DbConfigSource": + """Create a DbConfigSource with a new ConfigStore.""" + store = await ConfigStore.create(dsn) + return cls(store) + + async def list_enabled_adapters(self) -> list[AdapterConfig]: + """List all enabled adapters from database.""" + all_adapters = await self._store.list_adapters() + return [a for a in all_adapters if a.enabled and not a.is_paused] + + async def get_adapter(self, name: str) -> AdapterConfig | None: + """Get a specific adapter from database.""" + return await self._store.get_adapter(name) + + async def watch_for_changes( + self, + callback: Callable[[str, str], Awaitable[None] | None], + ) -> None: + """Watch for changes via Postgres LISTEN/NOTIFY. + + Runs forever, calling callback(table, key) on each change. + """ + await self._store.listen_for_changes(callback) + + async def close(self) -> None: + """Close the underlying config store.""" + await self._store.close() diff --git a/src/central/supervisor.py b/src/central/supervisor.py index b7466fb..06d7c72 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -15,9 +15,9 @@ from nats.js import JetStreamContext from central.adapters.nws import NWSAdapter from central.cloudevents_wire import wrap_event -from central.config import load_config, Config, NWSAdapterConfig +from central.config import NWSAdapterConfig from central.config_models import AdapterConfig -from central.config_source import ConfigSource, create_config_source +from central.config_source import ConfigSource, DbConfigSource from central.bootstrap_config import get_settings from central.models import subject_for_event @@ -528,7 +528,7 @@ class Supervisor: for config in enabled_adapters: await self._start_adapter(config) - # Start config watcher (for DB source, this runs forever; for TOML, returns immediately) + # Start config watcher (runs forever, calling callback on changes) self._config_watch_task = asyncio.create_task( self._config_source.watch_for_changes(self._on_config_change) ) @@ -579,37 +579,22 @@ async def async_main() -> None: settings = get_settings() logger.info( - "Config source: %s", - settings.config_source, - extra={"config_source": settings.config_source}, + "Config source: db", + extra={"config_source": "db"}, ) - # Create config source based on setting - config_source = await create_config_source( - source_type=settings.config_source, - dsn=settings.db_dsn, - toml_path=settings.config_toml_path, - ) - - # CloudEvents config: try TOML first, fall back to code defaults - # (CloudEvents envelope format is protocol-level, not operator-configurable) - cloudevents_config = None - if settings.config_source == "toml": - try: - toml_config = load_config(str(settings.config_toml_path)) - cloudevents_config = toml_config - except Exception: - pass # Will use defaults from cloudevents_constants + # Create database config source + config_source = await DbConfigSource.create(settings.db_dsn) supervisor = Supervisor( config_source=config_source, nats_url=settings.nats_url, - cloudevents_config=cloudevents_config, + # CloudEvents uses protocol-level defaults from cloudevents_constants + cloudevents_config=None, ) logger.info( - "CloudEvents config: %s", - "TOML" if cloudevents_config else "defaults", - extra={"cloudevents_source": "toml" if cloudevents_config else "defaults"}, + "CloudEvents config: defaults", + extra={"cloudevents_source": "defaults"}, ) loop = asyncio.get_running_loop() diff --git a/tests/test_config_source.py b/tests/test_config_source.py index a87cccb..0c49788 100644 --- a/tests/test_config_source.py +++ b/tests/test_config_source.py @@ -1,9 +1,7 @@ """Tests for configuration source abstraction.""" -import asyncio import base64 import os -from datetime import datetime, timezone from pathlib import Path import asyncpg @@ -12,11 +10,8 @@ import pytest_asyncio from central.config_source import ( ConfigSource, - TomlConfigSource, DbConfigSource, - create_config_source, ) -from central.config_store import ConfigStore from central.crypto import KEY_SIZE, clear_key_cache # Test database DSN @@ -43,93 +38,6 @@ def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) -> monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path)) -class TestTomlConfigSource: - """Tests for TOML-based config source.""" - - @pytest.fixture - def toml_file(self, tmp_path: Path) -> Path: - """Create a test TOML config file.""" - toml_content = """ -[adapters.nws] -enabled = true -cadence_s = 60 -states = ["ID", "MT"] -contact_email = "test@example.com" - -[adapters.disabled_adapter] -enabled = false -cadence_s = 300 -states = [] -contact_email = "test@example.com" - -[cloudevents] -type_prefix = "central" -source = "central.local" -schema_version = "1.0" - -[nats] -url = "nats://localhost:4222" - -[postgres] -dsn = "postgresql://user:pass@localhost/db" -""" - path = tmp_path / "central.toml" - path.write_text(toml_content) - return path - - @pytest.mark.asyncio - async def test_list_enabled_adapters(self, toml_file: Path) -> None: - """list_enabled_adapters returns only enabled adapters.""" - source = TomlConfigSource(toml_file) - adapters = await source.list_enabled_adapters() - - assert len(adapters) == 1 - assert adapters[0].name == "nws" - assert adapters[0].enabled is True - assert adapters[0].cadence_s == 60 - - @pytest.mark.asyncio - async def test_get_adapter(self, toml_file: Path) -> None: - """get_adapter returns correct adapter config.""" - source = TomlConfigSource(toml_file) - - adapter = await source.get_adapter("nws") - assert adapter is not None - assert adapter.name == "nws" - assert adapter.settings["states"] == ["ID", "MT"] - assert adapter.settings["contact_email"] == "test@example.com" - - @pytest.mark.asyncio - async def test_get_nonexistent_adapter(self, toml_file: Path) -> None: - """get_adapter returns None for nonexistent adapter.""" - source = TomlConfigSource(toml_file) - adapter = await source.get_adapter("does_not_exist") - assert adapter is None - - @pytest.mark.asyncio - async def test_watch_for_changes_returns_immediately(self, toml_file: Path) -> None: - """watch_for_changes is a no-op for TOML source.""" - source = TomlConfigSource(toml_file) - callback_called = False - - async def callback(table: str, key: str) -> None: - nonlocal callback_called - callback_called = True - - # Should return immediately without blocking - await asyncio.wait_for( - source.watch_for_changes(callback), - timeout=1.0, - ) - assert not callback_called - - @pytest.mark.asyncio - async def test_implements_protocol(self, toml_file: Path) -> None: - """TomlConfigSource implements ConfigSource protocol.""" - source = TomlConfigSource(toml_file) - assert isinstance(source, ConfigSource) - - @pytest_asyncio.fixture async def db_conn() -> asyncpg.Connection: """Get a direct database connection for setup/teardown.""" @@ -222,64 +130,3 @@ class TestDbConfigSource: async def test_implements_protocol(self, db_source: DbConfigSource) -> None: """DbConfigSource implements ConfigSource protocol.""" assert isinstance(db_source, ConfigSource) - - -class TestCreateConfigSource: - """Tests for the config source factory function.""" - - @pytest.fixture - def toml_file(self, tmp_path: Path) -> Path: - """Create a minimal TOML config file.""" - toml_content = """ -[adapters.nws] -enabled = true -cadence_s = 60 -states = [] -contact_email = "test@example.com" - -[cloudevents] -[nats] -[postgres] -dsn = "postgresql://test@localhost/test" -""" - path = tmp_path / "central.toml" - path.write_text(toml_content) - return path - - @pytest.mark.asyncio - async def test_create_toml_source(self, toml_file: Path) -> None: - """create_config_source returns TomlConfigSource for 'toml' type.""" - source = await create_config_source( - source_type="toml", - toml_path=toml_file, - ) - assert isinstance(source, TomlConfigSource) - await source.close() - - @pytest.mark.asyncio - async def test_create_db_source(self, clean_config_schema: None) -> None: - """create_config_source returns DbConfigSource for 'db' type.""" - source = await create_config_source( - source_type="db", - dsn=TEST_DB_DSN, - ) - assert isinstance(source, DbConfigSource) - await source.close() - - @pytest.mark.asyncio - async def test_create_toml_requires_path(self) -> None: - """create_config_source raises for 'toml' without path.""" - with pytest.raises(ValueError, match="toml_path required"): - await create_config_source(source_type="toml") - - @pytest.mark.asyncio - async def test_create_db_requires_dsn(self) -> None: - """create_config_source raises for 'db' without dsn.""" - with pytest.raises(ValueError, match="dsn required"): - await create_config_source(source_type="db") - - @pytest.mark.asyncio - async def test_create_unknown_type_raises(self) -> None: - """create_config_source raises for unknown type.""" - with pytest.raises(ValueError, match="Unknown config source type"): - await create_config_source(source_type="unknown") diff --git a/tests/test_supervisor_hotreload.py b/tests/test_supervisor_hotreload.py index 4579ee6..7e1090f 100644 --- a/tests/test_supervisor_hotreload.py +++ b/tests/test_supervisor_hotreload.py @@ -355,40 +355,3 @@ class TestRateLimitGuarantee: f"Multiple NOTIFYs should not cause extra polls." ) - -class TestBootstrapConfigFlag: - """Tests for CENTRAL_CONFIG_SOURCE bootstrap flag.""" - - def test_default_is_toml(self) -> None: - """Default config_source is 'toml'.""" - from central.bootstrap_config import Settings - - # Create settings with minimal required fields - settings = Settings( - db_dsn="postgresql://test@localhost/test", - _env_file=None, - ) - assert settings.config_source == "toml" - - def test_accepts_db(self, monkeypatch: pytest.MonkeyPatch) -> None: - """config_source accepts 'db' value.""" - from central.bootstrap_config import Settings, get_settings - - get_settings.cache_clear() - monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "db") - monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test") - - settings = get_settings() - assert settings.config_source == "db" - - def test_rejects_invalid(self, monkeypatch: pytest.MonkeyPatch) -> None: - """config_source rejects invalid values.""" - from pydantic import ValidationError - from central.bootstrap_config import Settings, get_settings - - get_settings.cache_clear() - monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "invalid") - monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test") - - with pytest.raises(ValidationError): - get_settings()