mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
v0.9.18: reconcile schema_migrations drift + add --check drift detection
Migrations 025-029 (wzdx + traffic-family adapters/streams) were applied
out-of-band via direct psql during their deploys and never recorded in
schema_migrations. Result: a fresh restore would replay them, and an audit of
the tracking table understated what was actually live. All five are pure
additive INSERT ... ON CONFLICT DO NOTHING (verified idempotent by inspection),
so they were back-filled directly into schema_migrations (5 rows, applied_at =
now() = the reconciliation event; the original out-of-band apply dates are
unrecoverable and are documented as such rather than guessed).
Adds `central-migrate --check` to catch this class of drift going forward:
- find_drift(): pure function comparing schema_migrations rows vs *.sql files,
returning (untracked, orphan). Unit-tested, no DB dependency.
- check_drift(): CI-assertion form -- exit 1 if any file is untracked, else 0.
- log_drift(): WARNs per drifted entry, called on every migrate run too.
No migration 031 for the v0.9.17 wzdx state default: that default lives in
adapter code (_read_states falls back to _DEFAULT_STATES), so the live row's
explicit 7-state array is behaviorally identical to a fresh-install null.
Materializing it into SQL would freeze a snapshot of _DEFAULT_STATES and create
a new drift vector. Deferred to a future "show effective defaults" UI PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d41c418276
commit
ce66ff9361
2 changed files with 139 additions and 3 deletions
|
|
@ -8,15 +8,19 @@ plain SQL files in `sql/migrations/` named with numeric prefixes:
|
|||
|
||||
Usage:
|
||||
central-migrate [--dry-run]
|
||||
central-migrate --check # report drift; exit 1 if any file is untracked
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import asyncpg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MIGRATIONS_DIR = Path(__file__).parent.parent.parent / "sql" / "migrations"
|
||||
|
||||
|
||||
|
|
@ -52,6 +56,47 @@ def discover_migrations(migrations_dir: Path) -> list[tuple[str, Path]]:
|
|||
return migrations
|
||||
|
||||
|
||||
def find_drift(
    applied: set[str], discovered: list[tuple[str, Path]]
) -> tuple[list[str], list[str]]:
    """Diff the recorded migration versions against the files found on disk.

    Args:
        applied: versions that have a row in schema_migrations.
        discovered: (version, path) pairs for the migration files on disk;
            only the version half of each pair is consulted.

    Returns:
        A pair ``(untracked, orphan)`` of sorted version lists:

        - untracked: versions with a file on disk but no schema_migrations
          row. This is the v0.9.18 failure mode -- a migration applied
          out-of-band (direct psql) and never recorded, so a fresh restore
          would replay it and the tracking table understates what is live.
        - orphan: versions recorded in schema_migrations with no matching
          file on disk (e.g. the file was removed from the repo).

    Performs no I/O, so the drift logic can be unit-tested without a database.
    """
    on_disk = set()
    for version, _path in discovered:
        on_disk.add(version)

    untracked = [version for version in on_disk if version not in applied]
    untracked.sort()

    orphan = [version for version in applied if version not in on_disk]
    orphan.sort()

    return untracked, orphan
|
||||
|
||||
|
||||
def log_drift(untracked: list[str], orphan: list[str]) -> None:
    """Log one WARNING per drifted migration version.

    Untracked versions (file on disk, no schema_migrations row) are logged
    first, followed by orphan versions (row present, no file on disk). Nothing
    is logged when both lists are empty.

    Intended to be called on every central-migrate run so that divergence
    between the tracking table and the repo cannot accumulate silently
    (the v0.9.18 failure mode).
    """
    untracked_msg = (
        "migration file %s is not recorded in schema_migrations "
        "(pending apply, or applied out-of-band -- investigate)"
    )
    orphan_msg = (
        "schema_migrations records %s but no matching .sql file exists on disk"
    )

    # Order matters for log readability: untracked entries before orphans.
    for message, versions in ((untracked_msg, untracked), (orphan_msg, orphan)):
        for version in versions:
            logger.warning(message, version)
|
||||
|
||||
|
||||
async def apply_migration(
|
||||
conn: asyncpg.Connection, version: str, sql_path: Path, dry_run: bool = False
|
||||
) -> None:
|
||||
|
|
@ -80,9 +125,12 @@ async def run_migrations(dsn: str, dry_run: bool = False) -> int:
|
|||
try:
|
||||
await ensure_migrations_table(conn)
|
||||
applied = await get_applied_migrations(conn)
|
||||
pending = [
|
||||
(v, p) for v, p in discover_migrations(MIGRATIONS_DIR) if v not in applied
|
||||
]
|
||||
discovered = discover_migrations(MIGRATIONS_DIR)
|
||||
|
||||
untracked, orphan = find_drift(applied, discovered)
|
||||
log_drift(untracked, orphan)
|
||||
|
||||
pending = [(v, p) for v, p in discovered if v not in applied]
|
||||
|
||||
if not pending:
|
||||
print("No pending migrations.")
|
||||
|
|
@ -97,19 +145,57 @@ async def run_migrations(dsn: str, dry_run: bool = False) -> int:
|
|||
await conn.close()
|
||||
|
||||
|
||||
async def check_drift(dsn: str) -> int:
    """Report migration drift and return a process exit code.

    The CI-assertion counterpart of log_drift: run it against a database that
    is expected to be fully reconciled (after a restore, or in CI). It opens a
    connection to ``dsn``, calls ensure_migrations_table and
    get_applied_migrations, and compares the result with the files found by
    discover_migrations(MIGRATIONS_DIR) via find_drift. No migrations are
    applied.

    Each drifted version is printed on its own ``DRIFT:`` line, followed by a
    one-line summary.

    Args:
        dsn: database connection string passed to asyncpg.connect.

    Returns:
        1 if at least one migration file on disk has no schema_migrations row
        (untracked -- the v0.9.18 drift), otherwise 0. Orphan rows (recorded
        versions with no file on disk) are reported but never change the exit
        code.
    """
    conn = await asyncpg.connect(dsn)
    try:
        await ensure_migrations_table(conn)
        applied = await get_applied_migrations(conn)
        untracked, orphan = find_drift(applied, discover_migrations(MIGRATIONS_DIR))
    finally:
        # Release the connection before reporting; the report needs no DB access.
        await conn.close()

    for version in untracked:
        print(f"DRIFT: {version} present on disk but not in schema_migrations")
    for version in orphan:
        print(f"DRIFT: {version} in schema_migrations but no .sql file on disk")

    if untracked:
        print(f"{len(untracked)} untracked migration file(s) -- drift detected.")
        return 1

    if orphan:
        # Orphans alone do not fail the check, but claiming "no drift" right
        # after printing DRIFT lines would contradict the report above.
        print(
            f"{len(orphan)} orphan schema_migrations row(s) -- "
            "reported only, no untracked files."
        )
        return 0

    print("No migration drift detected.")
    return 0
|
||||
|
||||
|
||||
async def async_main() -> None:
|
||||
"""Async entry point."""
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
parser = argparse.ArgumentParser(description="Run database migrations")
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Show what would be applied without executing",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--check",
|
||||
action="store_true",
|
||||
help="Report drift and exit 1 if any migration file is untracked "
|
||||
"(does not apply migrations)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
from central.bootstrap_config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
if args.check:
|
||||
sys.exit(await check_drift(settings.db_dsn))
|
||||
|
||||
count = await run_migrations(settings.db_dsn, dry_run=args.dry_run)
|
||||
|
||||
if count > 0 and not args.dry_run:
|
||||
|
|
|
|||
50
tests/test_migrate.py
Normal file
50
tests/test_migrate.py
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
"""Drift-detection tests for the migration runner (v0.9.18).
|
||||
|
||||
`find_drift` is a pure function so the divergence logic is verified without a
|
||||
database -- the suite runs identically as `zvx` or `central`.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from central.migrate import find_drift
|
||||
|
||||
|
||||
def _discovered(*versions: str) -> list[tuple[str, Path]]:
|
||||
"""Build a discover_migrations-shaped list from version stems."""
|
||||
return [(v, Path(f"sql/migrations/{v}.sql")) for v in versions]
|
||||
|
||||
|
||||
def test_find_drift_untracked_file():
    """A file on disk without a schema_migrations row is reported as untracked.

    Mirrors the v0.9.18 case, where 025-029 existed on disk but were never
    recorded in the tracking table.
    """
    recorded = {"001_a", "002_b"}
    on_disk = _discovered("001_a", "002_b", "003_c")

    result = find_drift(recorded, on_disk)

    # Only the unrecorded file is flagged; there are no orphan rows.
    assert result[0] == ["003_c"]
    assert result[1] == []
|
||||
|
||||
|
||||
def test_find_drift_orphan_row():
    """A recorded version with no .sql file on disk is reported as an orphan."""
    recorded = {"001_a", "002_b", "099_removed"}
    on_disk = _discovered("001_a", "002_b")

    result = find_drift(recorded, on_disk)

    # Every file is tracked; only the row without a file is flagged.
    assert result[0] == []
    assert result[1] == ["099_removed"]
|
||||
|
||||
|
||||
def test_find_drift_clean():
    """Identical version sets on disk and in schema_migrations yield no drift."""
    stems = ("001_a", "002_b", "003_c")
    recorded = set(stems)
    on_disk = _discovered(*stems)

    result = find_drift(recorded, on_disk)

    assert result[0] == []
    assert result[1] == []
|
||||
Loading…
Add table
Add a link
Reference in a new issue