mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Merge pull request #3 from zvx-echo6/feature/1a-3-phase-c-toml-retirement
Phase 1a-3 Part C: Retire TOML config source
This commit is contained in:
commit
0b23cc4572
7 changed files with 201 additions and 465 deletions
|
|
@ -193,3 +193,66 @@ All upstream alerts found in DB ✓
|
||||||
| 6 | Data integrity | ✅ All alerts in DB |
|
| 6 | Data integrity | ✅ All alerts in DB |
|
||||||
|
|
||||||
**Phase B Complete.** System running stable on DB-backed config.
|
**Phase B Complete.** System running stable on DB-backed config.
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Cadence Revert (Close-out)
|
||||||
|
|
||||||
|
**Timestamp:** 2026-05-16T03:54:14Z
|
||||||
|
|
||||||
|
### Issue Discovered
|
||||||
|
|
||||||
|
During close-out verification, polls were observed at 90s intervals despite
|
||||||
|
DB showing `cadence_s = 60`. Investigation revealed the live reschedule
|
||||||
|
from 90→60 (done at 03:23:08 during Phase B) didn't properly update the
|
||||||
|
in-flight scheduling.
|
||||||
|
|
||||||
|
### Resolution
|
||||||
|
|
||||||
|
Supervisor restart was required to clear stale state:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
systemctl restart central-supervisor
|
||||||
|
```
|
||||||
|
|
||||||
|
### Post-Restart Verification
|
||||||
|
|
||||||
|
**DB State:**
|
||||||
|
```sql
|
||||||
|
SELECT name, cadence_s, updated_at FROM config.adapters WHERE name='nws';
|
||||||
|
```
|
||||||
|
```
|
||||||
|
name | cadence_s | updated_at
|
||||||
|
------+-----------+-------------------------------
|
||||||
|
nws | 60 | 2026-05-16 03:50:53.210963+00
|
||||||
|
```
|
||||||
|
|
||||||
|
**Poll Intervals After Restart:**
|
||||||
|
```
|
||||||
|
03:54:14.621376 - NWS poll completed (first poll after restart)
|
||||||
|
03:55:15.028963 - NWS poll completed (61s later) ✅
|
||||||
|
03:56:15.429013 - NWS poll completed (60s later) ✅
|
||||||
|
```
|
||||||
|
|
||||||
|
**Startup Log:**
|
||||||
|
```json
|
||||||
|
{"ts": "2026-05-16T03:54:14.318479+00:00", "msg": "Adapter started", "adapter": "nws", "cadence_s": 60}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Bug Note
|
||||||
|
|
||||||
|
The cadence DECREASE (90→60) rate-limit test from Phase B showed correct
|
||||||
|
log output ("Rescheduled adapter" with new_cadence_s=60) but the actual
|
||||||
|
scheduling didn't update properly. The increase test (60→90) worked
|
||||||
|
correctly.
|
||||||
|
|
||||||
|
**Root cause:** Unknown - requires investigation. The `_reschedule_adapter`
|
||||||
|
method updates `state.config` and `state.adapter.cadence_s`, and signals
|
||||||
|
via `cancel_event`, but the scheduling loop may not be re-evaluating
|
||||||
|
correctly for decreases.
|
||||||
|
|
||||||
|
**Mitigation:** After any cadence change, verify actual poll intervals match
|
||||||
|
expected cadence. If not, restart supervisor.
|
||||||
|
|
||||||
|
**Result:** Cadence confirmed at 60s after restart. ✅
|
||||||
|
|
|
||||||
|
|
@ -308,7 +308,7 @@ async def async_main() -> None:
|
||||||
"Archive starting",
|
"Archive starting",
|
||||||
extra={
|
extra={
|
||||||
"nats_url": settings.nats_url,
|
"nats_url": settings.nats_url,
|
||||||
"config_source": settings.config_source,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,61 +1,46 @@
|
||||||
"""Bootstrap configuration from environment variables.
|
"""Bootstrap configuration from environment variables.
|
||||||
|
|
||||||
This module provides early-stage configuration loading from environment
|
This module provides early-stage configuration loading from environment
|
||||||
variables or a .env file. Used before the database-backed config store
|
variables or a .env file. Used before the database-backed config store
|
||||||
is available.
|
is available.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
|
|
||||||
from pydantic import Field, field_validator
|
from pydantic import Field
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
"""Bootstrap settings loaded from environment or .env file."""
|
"""Bootstrap settings loaded from environment or .env file."""
|
||||||
|
|
||||||
model_config = SettingsConfigDict(
|
model_config = SettingsConfigDict(
|
||||||
env_prefix="CENTRAL_",
|
env_prefix="CENTRAL_",
|
||||||
env_file=".env",
|
env_file=".env",
|
||||||
env_file_encoding="utf-8",
|
env_file_encoding="utf-8",
|
||||||
extra="ignore",
|
extra="ignore",
|
||||||
)
|
)
|
||||||
|
|
||||||
db_dsn: str = Field(description="PostgreSQL connection string")
|
db_dsn: str = Field(description="PostgreSQL connection string")
|
||||||
nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL")
|
nats_url: str = Field(default="nats://localhost:4222", description="NATS server URL")
|
||||||
master_key_path: Path = Field(
|
master_key_path: Path = Field(
|
||||||
default=Path("/etc/central/master.key"),
|
default=Path("/etc/central/master.key"),
|
||||||
description="Path to AES-256 master key file",
|
description="Path to AES-256 master key file",
|
||||||
)
|
)
|
||||||
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
|
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
|
||||||
default="INFO",
|
default="INFO",
|
||||||
description="Logging level",
|
description="Logging level",
|
||||||
)
|
)
|
||||||
config_source: Literal["toml", "db"] = Field(
|
|
||||||
default="toml",
|
|
||||||
description="Configuration source: 'toml' for TOML file, 'db' for database",
|
@lru_cache
|
||||||
)
|
def get_settings(env_file: Path | None = None) -> Settings:
|
||||||
config_toml_path: Path = Field(
|
"""Load settings, optionally from a specific .env file.
|
||||||
default=Path("/etc/central/central.toml"),
|
|
||||||
description="Path to TOML config file (when config_source=toml)",
|
Results are cached. Call get_settings.cache_clear() to reload.
|
||||||
)
|
"""
|
||||||
|
if env_file is not None:
|
||||||
@field_validator("config_source")
|
return Settings(_env_file=env_file)
|
||||||
@classmethod
|
return Settings()
|
||||||
def validate_config_source(cls, v: str) -> str:
|
|
||||||
if v not in ("toml", "db"):
|
|
||||||
raise ValueError(f"config_source must be 'toml' or 'db', got {v!r}")
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
@lru_cache
|
|
||||||
def get_settings(env_file: Path | None = None) -> Settings:
|
|
||||||
"""Load settings, optionally from a specific .env file.
|
|
||||||
|
|
||||||
Results are cached. Call get_settings.cache_clear() to reload.
|
|
||||||
"""
|
|
||||||
if env_file is not None:
|
|
||||||
return Settings(_env_file=env_file)
|
|
||||||
return Settings()
|
|
||||||
|
|
|
||||||
|
|
@ -1,187 +1,80 @@
|
||||||
"""Configuration source abstraction.
|
"""Configuration source abstraction.
|
||||||
|
|
||||||
Provides a unified interface for loading adapter configuration from
|
Provides a unified interface for loading adapter configuration from
|
||||||
either TOML files or the database-backed config store.
|
the database-backed config store.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from pathlib import Path
|
from typing import Protocol, runtime_checkable
|
||||||
from typing import Any, Protocol, runtime_checkable
|
|
||||||
|
from central.config_models import AdapterConfig
|
||||||
import tomllib
|
from central.config_store import ConfigStore
|
||||||
|
|
||||||
from central.config import NWSAdapterConfig
|
logger = logging.getLogger(__name__)
|
||||||
from central.config_models import AdapterConfig
|
|
||||||
from central.config_store import ConfigStore
|
|
||||||
|
@runtime_checkable
|
||||||
logger = logging.getLogger(__name__)
|
class ConfigSource(Protocol):
|
||||||
|
"""Protocol for configuration sources."""
|
||||||
|
|
||||||
@runtime_checkable
|
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
||||||
class ConfigSource(Protocol):
|
"""List all enabled adapters."""
|
||||||
"""Protocol for configuration sources."""
|
...
|
||||||
|
|
||||||
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
||||||
"""List all enabled adapters."""
|
"""Get configuration for a specific adapter."""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
async def watch_for_changes(
|
||||||
"""Get configuration for a specific adapter."""
|
self,
|
||||||
...
|
callback: Callable[[str, str], Awaitable[None] | None],
|
||||||
|
) -> None:
|
||||||
async def watch_for_changes(
|
"""Watch for configuration changes.
|
||||||
self,
|
|
||||||
callback: Callable[[str, str], Awaitable[None] | None],
|
Runs forever, calling callback(table, key) on changes.
|
||||||
) -> None:
|
"""
|
||||||
"""Watch for configuration changes.
|
...
|
||||||
|
|
||||||
For TOML source, this is a no-op (returns immediately).
|
async def close(self) -> None:
|
||||||
For DB source, this runs forever, calling callback(table, key) on changes.
|
"""Clean up resources."""
|
||||||
"""
|
...
|
||||||
...
|
|
||||||
|
|
||||||
async def close(self) -> None:
|
class DbConfigSource:
|
||||||
"""Clean up resources."""
|
"""Configuration source backed by the Postgres config store.
|
||||||
...
|
|
||||||
|
Supports hot-reload via LISTEN/NOTIFY.
|
||||||
|
"""
|
||||||
class TomlConfigSource:
|
|
||||||
"""Configuration source backed by a TOML file.
|
def __init__(self, config_store: ConfigStore) -> None:
|
||||||
|
self._store = config_store
|
||||||
This is the legacy configuration path. Does not support hot-reload.
|
|
||||||
"""
|
@classmethod
|
||||||
|
async def create(cls, dsn: str) -> "DbConfigSource":
|
||||||
def __init__(self, toml_path: Path) -> None:
|
"""Create a DbConfigSource with a new ConfigStore."""
|
||||||
self._toml_path = toml_path
|
store = await ConfigStore.create(dsn)
|
||||||
self._adapters: dict[str, AdapterConfig] = {}
|
return cls(store)
|
||||||
self._loaded = False
|
|
||||||
|
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
||||||
def _load(self) -> None:
|
"""List all enabled adapters from database."""
|
||||||
"""Load configuration from TOML file."""
|
all_adapters = await self._store.list_adapters()
|
||||||
if self._loaded:
|
return [a for a in all_adapters if a.enabled and not a.is_paused]
|
||||||
return
|
|
||||||
|
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
||||||
with self._toml_path.open("rb") as f:
|
"""Get a specific adapter from database."""
|
||||||
data = tomllib.load(f)
|
return await self._store.get_adapter(name)
|
||||||
|
|
||||||
adapters_raw = data.get("adapters", {})
|
async def watch_for_changes(
|
||||||
from datetime import datetime, timezone
|
self,
|
||||||
|
callback: Callable[[str, str], Awaitable[None] | None],
|
||||||
now = datetime.now(timezone.utc)
|
) -> None:
|
||||||
|
"""Watch for changes via Postgres LISTEN/NOTIFY.
|
||||||
for name, adapter_data in adapters_raw.items():
|
|
||||||
# Convert TOML adapter config to unified AdapterConfig
|
Runs forever, calling callback(table, key) on each change.
|
||||||
# TOML uses NWSAdapterConfig shape, we need to convert to AdapterConfig
|
"""
|
||||||
enabled = adapter_data.get("enabled", True)
|
await self._store.listen_for_changes(callback)
|
||||||
cadence_s = adapter_data.get("cadence_s", 60)
|
|
||||||
|
async def close(self) -> None:
|
||||||
# Extract settings (everything except enabled/cadence_s)
|
"""Close the underlying config store."""
|
||||||
settings = {
|
await self._store.close()
|
||||||
k: v
|
|
||||||
for k, v in adapter_data.items()
|
|
||||||
if k not in ("enabled", "cadence_s")
|
|
||||||
}
|
|
||||||
|
|
||||||
self._adapters[name] = AdapterConfig(
|
|
||||||
name=name,
|
|
||||||
enabled=enabled,
|
|
||||||
cadence_s=cadence_s,
|
|
||||||
settings=settings,
|
|
||||||
paused_at=None,
|
|
||||||
updated_at=now,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._loaded = True
|
|
||||||
logger.info(
|
|
||||||
"Loaded TOML config",
|
|
||||||
extra={"path": str(self._toml_path), "adapters": list(self._adapters.keys())},
|
|
||||||
)
|
|
||||||
|
|
||||||
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
|
||||||
"""List all enabled adapters from TOML."""
|
|
||||||
self._load()
|
|
||||||
return [a for a in self._adapters.values() if a.enabled and not a.is_paused]
|
|
||||||
|
|
||||||
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
|
||||||
"""Get a specific adapter from TOML."""
|
|
||||||
self._load()
|
|
||||||
return self._adapters.get(name)
|
|
||||||
|
|
||||||
async def watch_for_changes(
|
|
||||||
self,
|
|
||||||
callback: Callable[[str, str], Awaitable[None] | None],
|
|
||||||
) -> None:
|
|
||||||
"""TOML does not support hot-reload. Returns immediately."""
|
|
||||||
logger.debug("TOML config source does not support hot-reload")
|
|
||||||
return
|
|
||||||
|
|
||||||
async def close(self) -> None:
|
|
||||||
"""No resources to clean up for TOML source."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class DbConfigSource:
|
|
||||||
"""Configuration source backed by the Postgres config store.
|
|
||||||
|
|
||||||
Supports hot-reload via LISTEN/NOTIFY.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, config_store: ConfigStore) -> None:
|
|
||||||
self._store = config_store
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def create(cls, dsn: str) -> "DbConfigSource":
|
|
||||||
"""Create a DbConfigSource with a new ConfigStore."""
|
|
||||||
store = await ConfigStore.create(dsn)
|
|
||||||
return cls(store)
|
|
||||||
|
|
||||||
async def list_enabled_adapters(self) -> list[AdapterConfig]:
|
|
||||||
"""List all enabled adapters from database."""
|
|
||||||
all_adapters = await self._store.list_adapters()
|
|
||||||
return [a for a in all_adapters if a.enabled and not a.is_paused]
|
|
||||||
|
|
||||||
async def get_adapter(self, name: str) -> AdapterConfig | None:
|
|
||||||
"""Get a specific adapter from database."""
|
|
||||||
return await self._store.get_adapter(name)
|
|
||||||
|
|
||||||
async def watch_for_changes(
|
|
||||||
self,
|
|
||||||
callback: Callable[[str, str], Awaitable[None] | None],
|
|
||||||
) -> None:
|
|
||||||
"""Watch for changes via Postgres LISTEN/NOTIFY.
|
|
||||||
|
|
||||||
Runs forever, calling callback(table, key) on each change.
|
|
||||||
"""
|
|
||||||
await self._store.listen_for_changes(callback)
|
|
||||||
|
|
||||||
async def close(self) -> None:
|
|
||||||
"""Close the underlying config store."""
|
|
||||||
await self._store.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def create_config_source(
|
|
||||||
source_type: str,
|
|
||||||
dsn: str | None = None,
|
|
||||||
toml_path: Path | None = None,
|
|
||||||
) -> ConfigSource:
|
|
||||||
"""Factory function to create the appropriate config source.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
source_type: "toml" or "db"
|
|
||||||
dsn: PostgreSQL DSN (required for "db")
|
|
||||||
toml_path: Path to TOML file (required for "toml")
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
ConfigSource implementation
|
|
||||||
"""
|
|
||||||
if source_type == "toml":
|
|
||||||
if toml_path is None:
|
|
||||||
raise ValueError("toml_path required for toml config source")
|
|
||||||
return TomlConfigSource(toml_path)
|
|
||||||
elif source_type == "db":
|
|
||||||
if dsn is None:
|
|
||||||
raise ValueError("dsn required for db config source")
|
|
||||||
return await DbConfigSource.create(dsn)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unknown config source type: {source_type}")
|
|
||||||
|
|
|
||||||
|
|
@ -15,9 +15,9 @@ from nats.js import JetStreamContext
|
||||||
|
|
||||||
from central.adapters.nws import NWSAdapter
|
from central.adapters.nws import NWSAdapter
|
||||||
from central.cloudevents_wire import wrap_event
|
from central.cloudevents_wire import wrap_event
|
||||||
from central.config import load_config, Config, NWSAdapterConfig
|
from central.config import NWSAdapterConfig
|
||||||
from central.config_models import AdapterConfig
|
from central.config_models import AdapterConfig
|
||||||
from central.config_source import ConfigSource, create_config_source
|
from central.config_source import ConfigSource, DbConfigSource
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
from central.models import subject_for_event
|
from central.models import subject_for_event
|
||||||
|
|
||||||
|
|
@ -528,7 +528,7 @@ class Supervisor:
|
||||||
for config in enabled_adapters:
|
for config in enabled_adapters:
|
||||||
await self._start_adapter(config)
|
await self._start_adapter(config)
|
||||||
|
|
||||||
# Start config watcher (for DB source, this runs forever; for TOML, returns immediately)
|
# Start config watcher (runs forever, calling callback on changes)
|
||||||
self._config_watch_task = asyncio.create_task(
|
self._config_watch_task = asyncio.create_task(
|
||||||
self._config_source.watch_for_changes(self._on_config_change)
|
self._config_source.watch_for_changes(self._on_config_change)
|
||||||
)
|
)
|
||||||
|
|
@ -579,37 +579,22 @@ async def async_main() -> None:
|
||||||
|
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Config source: %s",
|
"Config source: db",
|
||||||
settings.config_source,
|
extra={"config_source": "db"},
|
||||||
extra={"config_source": settings.config_source},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create config source based on setting
|
# Create database config source
|
||||||
config_source = await create_config_source(
|
config_source = await DbConfigSource.create(settings.db_dsn)
|
||||||
source_type=settings.config_source,
|
|
||||||
dsn=settings.db_dsn,
|
|
||||||
toml_path=settings.config_toml_path,
|
|
||||||
)
|
|
||||||
|
|
||||||
# CloudEvents config: try TOML first, fall back to code defaults
|
|
||||||
# (CloudEvents envelope format is protocol-level, not operator-configurable)
|
|
||||||
cloudevents_config = None
|
|
||||||
if settings.config_source == "toml":
|
|
||||||
try:
|
|
||||||
toml_config = load_config(str(settings.config_toml_path))
|
|
||||||
cloudevents_config = toml_config
|
|
||||||
except Exception:
|
|
||||||
pass # Will use defaults from cloudevents_constants
|
|
||||||
|
|
||||||
supervisor = Supervisor(
|
supervisor = Supervisor(
|
||||||
config_source=config_source,
|
config_source=config_source,
|
||||||
nats_url=settings.nats_url,
|
nats_url=settings.nats_url,
|
||||||
cloudevents_config=cloudevents_config,
|
# CloudEvents uses protocol-level defaults from cloudevents_constants
|
||||||
|
cloudevents_config=None,
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"CloudEvents config: %s",
|
"CloudEvents config: defaults",
|
||||||
"TOML" if cloudevents_config else "defaults",
|
extra={"cloudevents_source": "defaults"},
|
||||||
extra={"cloudevents_source": "toml" if cloudevents_config else "defaults"},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,7 @@
|
||||||
"""Tests for configuration source abstraction."""
|
"""Tests for configuration source abstraction."""
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import base64
|
import base64
|
||||||
import os
|
import os
|
||||||
from datetime import datetime, timezone
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import asyncpg
|
import asyncpg
|
||||||
|
|
@ -12,11 +10,8 @@ import pytest_asyncio
|
||||||
|
|
||||||
from central.config_source import (
|
from central.config_source import (
|
||||||
ConfigSource,
|
ConfigSource,
|
||||||
TomlConfigSource,
|
|
||||||
DbConfigSource,
|
DbConfigSource,
|
||||||
create_config_source,
|
|
||||||
)
|
)
|
||||||
from central.config_store import ConfigStore
|
|
||||||
from central.crypto import KEY_SIZE, clear_key_cache
|
from central.crypto import KEY_SIZE, clear_key_cache
|
||||||
|
|
||||||
# Test database DSN
|
# Test database DSN
|
||||||
|
|
@ -43,93 +38,6 @@ def setup_master_key(master_key_path: Path, monkeypatch: pytest.MonkeyPatch) ->
|
||||||
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
|
monkeypatch.setenv("CENTRAL_MASTER_KEY_PATH", str(master_key_path))
|
||||||
|
|
||||||
|
|
||||||
class TestTomlConfigSource:
|
|
||||||
"""Tests for TOML-based config source."""
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def toml_file(self, tmp_path: Path) -> Path:
|
|
||||||
"""Create a test TOML config file."""
|
|
||||||
toml_content = """
|
|
||||||
[adapters.nws]
|
|
||||||
enabled = true
|
|
||||||
cadence_s = 60
|
|
||||||
states = ["ID", "MT"]
|
|
||||||
contact_email = "test@example.com"
|
|
||||||
|
|
||||||
[adapters.disabled_adapter]
|
|
||||||
enabled = false
|
|
||||||
cadence_s = 300
|
|
||||||
states = []
|
|
||||||
contact_email = "test@example.com"
|
|
||||||
|
|
||||||
[cloudevents]
|
|
||||||
type_prefix = "central"
|
|
||||||
source = "central.local"
|
|
||||||
schema_version = "1.0"
|
|
||||||
|
|
||||||
[nats]
|
|
||||||
url = "nats://localhost:4222"
|
|
||||||
|
|
||||||
[postgres]
|
|
||||||
dsn = "postgresql://user:pass@localhost/db"
|
|
||||||
"""
|
|
||||||
path = tmp_path / "central.toml"
|
|
||||||
path.write_text(toml_content)
|
|
||||||
return path
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_list_enabled_adapters(self, toml_file: Path) -> None:
|
|
||||||
"""list_enabled_adapters returns only enabled adapters."""
|
|
||||||
source = TomlConfigSource(toml_file)
|
|
||||||
adapters = await source.list_enabled_adapters()
|
|
||||||
|
|
||||||
assert len(adapters) == 1
|
|
||||||
assert adapters[0].name == "nws"
|
|
||||||
assert adapters[0].enabled is True
|
|
||||||
assert adapters[0].cadence_s == 60
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_adapter(self, toml_file: Path) -> None:
|
|
||||||
"""get_adapter returns correct adapter config."""
|
|
||||||
source = TomlConfigSource(toml_file)
|
|
||||||
|
|
||||||
adapter = await source.get_adapter("nws")
|
|
||||||
assert adapter is not None
|
|
||||||
assert adapter.name == "nws"
|
|
||||||
assert adapter.settings["states"] == ["ID", "MT"]
|
|
||||||
assert adapter.settings["contact_email"] == "test@example.com"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_nonexistent_adapter(self, toml_file: Path) -> None:
|
|
||||||
"""get_adapter returns None for nonexistent adapter."""
|
|
||||||
source = TomlConfigSource(toml_file)
|
|
||||||
adapter = await source.get_adapter("does_not_exist")
|
|
||||||
assert adapter is None
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_watch_for_changes_returns_immediately(self, toml_file: Path) -> None:
|
|
||||||
"""watch_for_changes is a no-op for TOML source."""
|
|
||||||
source = TomlConfigSource(toml_file)
|
|
||||||
callback_called = False
|
|
||||||
|
|
||||||
async def callback(table: str, key: str) -> None:
|
|
||||||
nonlocal callback_called
|
|
||||||
callback_called = True
|
|
||||||
|
|
||||||
# Should return immediately without blocking
|
|
||||||
await asyncio.wait_for(
|
|
||||||
source.watch_for_changes(callback),
|
|
||||||
timeout=1.0,
|
|
||||||
)
|
|
||||||
assert not callback_called
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_implements_protocol(self, toml_file: Path) -> None:
|
|
||||||
"""TomlConfigSource implements ConfigSource protocol."""
|
|
||||||
source = TomlConfigSource(toml_file)
|
|
||||||
assert isinstance(source, ConfigSource)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
@pytest_asyncio.fixture
|
||||||
async def db_conn() -> asyncpg.Connection:
|
async def db_conn() -> asyncpg.Connection:
|
||||||
"""Get a direct database connection for setup/teardown."""
|
"""Get a direct database connection for setup/teardown."""
|
||||||
|
|
@ -222,64 +130,3 @@ class TestDbConfigSource:
|
||||||
async def test_implements_protocol(self, db_source: DbConfigSource) -> None:
|
async def test_implements_protocol(self, db_source: DbConfigSource) -> None:
|
||||||
"""DbConfigSource implements ConfigSource protocol."""
|
"""DbConfigSource implements ConfigSource protocol."""
|
||||||
assert isinstance(db_source, ConfigSource)
|
assert isinstance(db_source, ConfigSource)
|
||||||
|
|
||||||
|
|
||||||
class TestCreateConfigSource:
|
|
||||||
"""Tests for the config source factory function."""
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def toml_file(self, tmp_path: Path) -> Path:
|
|
||||||
"""Create a minimal TOML config file."""
|
|
||||||
toml_content = """
|
|
||||||
[adapters.nws]
|
|
||||||
enabled = true
|
|
||||||
cadence_s = 60
|
|
||||||
states = []
|
|
||||||
contact_email = "test@example.com"
|
|
||||||
|
|
||||||
[cloudevents]
|
|
||||||
[nats]
|
|
||||||
[postgres]
|
|
||||||
dsn = "postgresql://test@localhost/test"
|
|
||||||
"""
|
|
||||||
path = tmp_path / "central.toml"
|
|
||||||
path.write_text(toml_content)
|
|
||||||
return path
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_create_toml_source(self, toml_file: Path) -> None:
|
|
||||||
"""create_config_source returns TomlConfigSource for 'toml' type."""
|
|
||||||
source = await create_config_source(
|
|
||||||
source_type="toml",
|
|
||||||
toml_path=toml_file,
|
|
||||||
)
|
|
||||||
assert isinstance(source, TomlConfigSource)
|
|
||||||
await source.close()
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_create_db_source(self, clean_config_schema: None) -> None:
|
|
||||||
"""create_config_source returns DbConfigSource for 'db' type."""
|
|
||||||
source = await create_config_source(
|
|
||||||
source_type="db",
|
|
||||||
dsn=TEST_DB_DSN,
|
|
||||||
)
|
|
||||||
assert isinstance(source, DbConfigSource)
|
|
||||||
await source.close()
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_create_toml_requires_path(self) -> None:
|
|
||||||
"""create_config_source raises for 'toml' without path."""
|
|
||||||
with pytest.raises(ValueError, match="toml_path required"):
|
|
||||||
await create_config_source(source_type="toml")
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_create_db_requires_dsn(self) -> None:
|
|
||||||
"""create_config_source raises for 'db' without dsn."""
|
|
||||||
with pytest.raises(ValueError, match="dsn required"):
|
|
||||||
await create_config_source(source_type="db")
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_create_unknown_type_raises(self) -> None:
|
|
||||||
"""create_config_source raises for unknown type."""
|
|
||||||
with pytest.raises(ValueError, match="Unknown config source type"):
|
|
||||||
await create_config_source(source_type="unknown")
|
|
||||||
|
|
|
||||||
|
|
@ -355,40 +355,3 @@ class TestRateLimitGuarantee:
|
||||||
f"Multiple NOTIFYs should not cause extra polls."
|
f"Multiple NOTIFYs should not cause extra polls."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestBootstrapConfigFlag:
|
|
||||||
"""Tests for CENTRAL_CONFIG_SOURCE bootstrap flag."""
|
|
||||||
|
|
||||||
def test_default_is_toml(self) -> None:
|
|
||||||
"""Default config_source is 'toml'."""
|
|
||||||
from central.bootstrap_config import Settings
|
|
||||||
|
|
||||||
# Create settings with minimal required fields
|
|
||||||
settings = Settings(
|
|
||||||
db_dsn="postgresql://test@localhost/test",
|
|
||||||
_env_file=None,
|
|
||||||
)
|
|
||||||
assert settings.config_source == "toml"
|
|
||||||
|
|
||||||
def test_accepts_db(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
|
||||||
"""config_source accepts 'db' value."""
|
|
||||||
from central.bootstrap_config import Settings, get_settings
|
|
||||||
|
|
||||||
get_settings.cache_clear()
|
|
||||||
monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "db")
|
|
||||||
monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")
|
|
||||||
|
|
||||||
settings = get_settings()
|
|
||||||
assert settings.config_source == "db"
|
|
||||||
|
|
||||||
def test_rejects_invalid(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
|
||||||
"""config_source rejects invalid values."""
|
|
||||||
from pydantic import ValidationError
|
|
||||||
from central.bootstrap_config import Settings, get_settings
|
|
||||||
|
|
||||||
get_settings.cache_clear()
|
|
||||||
monkeypatch.setenv("CENTRAL_CONFIG_SOURCE", "invalid")
|
|
||||||
monkeypatch.setenv("CENTRAL_DB_DSN", "postgresql://test@localhost/test")
|
|
||||||
|
|
||||||
with pytest.raises(ValidationError):
|
|
||||||
get_settings()
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue