feat(config): add migration framework and config schema

Add simple SQL migration runner tracking applied migrations in
schema_migrations table. First migration creates:

- config schema
- config.adapters table (name, enabled, cadence_s, settings JSONB)
- config.api_keys table (alias, encrypted_value BYTEA)
- NOTIFY triggers for real-time config change detection
- Seeds NWS adapter row from current TOML config

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-05-15 23:07:49 +00:00
commit a9b7dcab62
2 changed files with 189 additions and 0 deletions

View file

@ -0,0 +1,64 @@
-- Migration: 001_create_config_schema
-- Creates the config schema with adapters and api_keys tables.
-- Also seeds the NWS adapter row from current TOML config.
-- Create config schema
CREATE SCHEMA config;
-- Adapters configuration table
CREATE TABLE config.adapters (
name TEXT PRIMARY KEY,
enabled BOOLEAN NOT NULL DEFAULT true,
cadence_s INTEGER NOT NULL,
settings JSONB NOT NULL DEFAULT '{}'::jsonb,
paused_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- API keys table (encrypted values)
CREATE TABLE config.api_keys (
alias TEXT PRIMARY KEY,
encrypted_value BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
rotated_at TIMESTAMPTZ,
last_used_at TIMESTAMPTZ
);
-- Notify function for config changes
CREATE OR REPLACE FUNCTION config.notify_config_change()
RETURNS trigger AS $$
DECLARE
key_value TEXT;
BEGIN
-- Handle different table structures
IF TG_TABLE_NAME = 'adapters' THEN
key_value := COALESCE(NEW.name, OLD.name, '');
ELSIF TG_TABLE_NAME = 'api_keys' THEN
key_value := COALESCE(NEW.alias, OLD.alias, '');
ELSE
key_value := '';
END IF;
PERFORM pg_notify('config_changed', TG_TABLE_NAME || ':' || key_value);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Trigger for adapters table
CREATE TRIGGER adapters_notify
AFTER INSERT OR UPDATE OR DELETE ON config.adapters
FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
-- Trigger for api_keys table
CREATE TRIGGER api_keys_notify
AFTER INSERT OR UPDATE OR DELETE ON config.api_keys
FOR EACH ROW EXECUTE FUNCTION config.notify_config_change();
-- Seed NWS adapter from current TOML config values
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'nws',
true,
60,
'{"states": ["ID", "OR", "WA", "MT", "WY", "UT", "NV"], "contact_email": "mj@k7zvx.com"}'::jsonb
);

125
src/central/migrate.py Normal file
View file

@ -0,0 +1,125 @@
"""Simple database migration runner.
Tracks applied migrations in a `schema_migrations` table. Migrations are
plain SQL files in `sql/migrations/` named with numeric prefixes:
001_create_config_schema.sql
002_add_operators_table.sql
...
Usage:
central-migrate [--dry-run]
"""
import argparse
import asyncio
import sys
from pathlib import Path
import asyncpg
MIGRATIONS_DIR = Path(__file__).parent.parent.parent / "sql" / "migrations"
async def ensure_migrations_table(conn: asyncpg.Connection) -> None:
"""Create the schema_migrations table if it doesn't exist."""
await conn.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
""")
async def get_applied_migrations(conn: asyncpg.Connection) -> set[str]:
"""Return set of already-applied migration versions."""
rows = await conn.fetch("SELECT version FROM schema_migrations")
return {row["version"] for row in rows}
def discover_migrations(migrations_dir: Path) -> list[tuple[str, Path]]:
"""Find all .sql files in migrations directory, sorted by name.
Returns list of (version, path) tuples where version is the filename
without extension.
"""
if not migrations_dir.exists():
return []
migrations = []
for f in sorted(migrations_dir.glob("*.sql")):
version = f.stem # e.g., "001_create_config_schema"
migrations.append((version, f))
return migrations
async def apply_migration(
conn: asyncpg.Connection, version: str, sql_path: Path, dry_run: bool = False
) -> None:
"""Apply a single migration."""
sql = sql_path.read_text()
if dry_run:
print(f"[DRY RUN] Would apply: {version}")
print(f" SQL: {sql[:200]}..." if len(sql) > 200 else f" SQL: {sql}")
return
async with conn.transaction():
await conn.execute(sql)
await conn.execute(
"INSERT INTO schema_migrations (version) VALUES ($1)", version
)
print(f"Applied: {version}")
async def run_migrations(dsn: str, dry_run: bool = False) -> int:
"""Run all pending migrations.
Returns number of migrations applied.
"""
conn = await asyncpg.connect(dsn)
try:
await ensure_migrations_table(conn)
applied = await get_applied_migrations(conn)
pending = [
(v, p) for v, p in discover_migrations(MIGRATIONS_DIR) if v not in applied
]
if not pending:
print("No pending migrations.")
return 0
print(f"Found {len(pending)} pending migration(s).")
for version, path in pending:
await apply_migration(conn, version, path, dry_run)
return len(pending)
finally:
await conn.close()
async def async_main() -> None:
"""Async entry point."""
parser = argparse.ArgumentParser(description="Run database migrations")
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be applied without executing",
)
args = parser.parse_args()
from central.bootstrap_config import get_settings
settings = get_settings()
count = await run_migrations(settings.db_dsn, dry_run=args.dry_run)
if count > 0 and not args.dry_run:
print(f"Successfully applied {count} migration(s).")
def main() -> None:
"""Entry point."""
asyncio.run(async_main())
if __name__ == "__main__":
main()