mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
fix(tests): replace stub tests with real DB migration tests
- Replace pytest.skip stubs with actual DB tests against central_test - Test backfill for all three adapters (nws, firms, usgs_quake) - Test FK RESTRICT, NOT NULL, and FK validation constraints - Test schema changes (source dropped, adapter exists with constraints) - Delete stale sql/schema.sql (migrations are sole source of truth) - Update docs/migrations.md with schema.sql removal note Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
8601a19f60
commit
98e9d95810
3 changed files with 273 additions and 78 deletions
|
|
@ -1,5 +1,11 @@
|
|||
# Migration policy
|
||||
|
||||
## Migrations are the sole source of truth
|
||||
|
||||
The `sql/migrations/` directory contains all schema definitions. There is
|
||||
no separate schema.sql file; use `pg_dump -s central` to generate a
|
||||
human-readable snapshot of the current schema when needed.
|
||||
|
||||
## Migrations must be idempotent
|
||||
|
||||
New migration files (007+) must use guards so they can be safely
|
||||
|
|
|
|||
|
|
@ -1,36 +0,0 @@
|
|||
-- Central Data Hub schema
|
||||
-- PostgreSQL 16 + TimescaleDB + PostGIS
|
||||
-- NOTE: Migrations in sql/migrations/ are the source of truth.
|
||||
-- This file is for reference and initial setup only.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id TEXT NOT NULL, -- CloudEvent id
|
||||
adapter TEXT NOT NULL -- adapter identity (FK to config.adapters.name)
|
||||
REFERENCES config.adapters(name) ON DELETE RESTRICT,
|
||||
category TEXT NOT NULL, -- "wx.alert.<type>"
|
||||
time TIMESTAMPTZ NOT NULL, -- event-time UTC
|
||||
expires TIMESTAMPTZ,
|
||||
severity SMALLINT, -- 0..4 or NULL
|
||||
geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon
|
||||
regions TEXT[], -- ["US-ID-Ada", ...]
|
||||
primary_region TEXT,
|
||||
payload JSONB NOT NULL, -- full Event as JSON
|
||||
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (id, time) -- composite PK for TimescaleDB
|
||||
);
|
||||
|
||||
SELECT create_hypertable('events', 'time', if_not_exists => TRUE);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS events_category_time_idx
|
||||
ON events (category, time DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS events_adapter_received_idx
|
||||
ON events (adapter, received DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS events_geom_gist
|
||||
ON events USING GIST (geom);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS events_regions_gin
|
||||
ON events USING GIN (regions);
|
||||
|
||||
-- Dedup on insert via ON CONFLICT (id, time) in the archive consumer.
|
||||
|
|
@ -1,15 +1,38 @@
|
|||
"""Tests for events.adapter column migration."""
|
||||
"""Tests for events.adapter column migration (011).
|
||||
|
||||
import pytest
|
||||
These tests exercise the actual migration SQL against a test database,
|
||||
verifying backfill logic, FK constraints, NOT NULL enforcement, and
|
||||
source column removal.
|
||||
|
||||
Requires CENTRAL_TEST_DB_DSN or uses default central_test database.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from central.models import Event, Geo
|
||||
|
||||
|
||||
class TestEventAdapterField:
|
||||
"""Test Event model adapter field."""
|
||||
# Test database DSN - matches test_config_store.py pattern
|
||||
TEST_DB_DSN = os.environ.get(
|
||||
"CENTRAL_TEST_DB_DSN",
|
||||
"postgresql://central_test:testpass@localhost/central_test",
|
||||
)
|
||||
|
||||
# Path to migration file
|
||||
MIGRATION_011_PATH = Path(__file__).parent.parent / "sql" / "migrations" / "011_events_add_adapter_column.sql"
|
||||
|
||||
|
||||
class TestEventModelAdapterField:
|
||||
"""Test Event model has adapter field (not source)."""
|
||||
|
||||
def test_event_has_adapter_field(self):
|
||||
"""Event model has adapter field instead of source."""
|
||||
"""Event model has adapter field."""
|
||||
event = Event(
|
||||
id="test-1",
|
||||
adapter="nws",
|
||||
|
|
@ -19,10 +42,14 @@ class TestEventAdapterField:
|
|||
data={},
|
||||
)
|
||||
assert event.adapter == "nws"
|
||||
assert not hasattr(event, "source") or "source" not in event.model_fields
|
||||
|
||||
def test_event_adapter_values(self):
|
||||
"""Event adapter field accepts valid adapter names."""
|
||||
def test_event_model_no_source_field(self):
|
||||
"""Event model does not have source field."""
|
||||
assert "source" not in Event.model_fields
|
||||
assert "adapter" in Event.model_fields
|
||||
|
||||
def test_event_adapter_accepts_all_adapter_names(self):
|
||||
"""Event adapter field accepts all known adapter names."""
|
||||
for adapter_name in ["nws", "firms", "usgs_quake"]:
|
||||
event = Event(
|
||||
id=f"test-{adapter_name}",
|
||||
|
|
@ -35,46 +62,244 @@ class TestEventAdapterField:
|
|||
assert event.adapter == adapter_name
|
||||
|
||||
|
||||
class TestAdapterColumnMigration:
|
||||
"""Tests for migration behavior (run against test DB)."""
|
||||
|
||||
@pytest.fixture
|
||||
def db_connection(self):
|
||||
"""Skip if no test DB available."""
|
||||
pytest.skip("Requires test database - verified manually in CT104 verification")
|
||||
|
||||
def test_backfill_transforms_source_to_adapter(self, db_connection):
|
||||
"""REPLACE(source, 'central/adapters/', '') produces correct adapter values."""
|
||||
# This logic is tested via SQL:
|
||||
# REPLACE('central/adapters/nws', 'central/adapters/', '') = 'nws'
|
||||
assert "central/adapters/nws".replace("central/adapters/", "") == "nws"
|
||||
assert "central/adapters/firms".replace("central/adapters/", "") == "firms"
|
||||
assert "central/adapters/usgs_quake".replace("central/adapters/", "") == "usgs_quake"
|
||||
|
||||
def test_fk_restrict_behavior(self, db_connection):
|
||||
"""FK constraint prevents deleting adapter with existing events."""
|
||||
# Verified manually: DELETE FROM config.adapters WHERE name = 'nws'
|
||||
# raises foreign key violation error
|
||||
pytest.skip("Verified manually in CT104 verification step 10")
|
||||
@pytest_asyncio.fixture
|
||||
async def db_conn() -> asyncpg.Connection:
|
||||
"""Get a database connection for migration tests."""
|
||||
conn = await asyncpg.connect(TEST_DB_DSN)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
class TestSourceColumnRemoval:
|
||||
"""Test that source column is removed post-migration."""
|
||||
@pytest_asyncio.fixture
|
||||
async def pre_migration_events_table(db_conn: asyncpg.Connection) -> None:
|
||||
"""Create events table with pre-migration schema (source column, no adapter).
|
||||
|
||||
def test_event_model_no_source_field(self):
|
||||
"""Event model does not have source field."""
|
||||
assert "source" not in Event.model_fields
|
||||
assert "adapter" in Event.model_fields
|
||||
Also ensures config.adapters exists with test adapters.
|
||||
"""
|
||||
# Ensure config schema and adapters table exist
|
||||
await db_conn.execute("CREATE SCHEMA IF NOT EXISTS config")
|
||||
await db_conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS config.adapters (
|
||||
name TEXT PRIMARY KEY,
|
||||
enabled BOOLEAN NOT NULL DEFAULT true,
|
||||
cadence_s INTEGER NOT NULL,
|
||||
settings JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
paused_at TIMESTAMPTZ,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)
|
||||
""")
|
||||
|
||||
def test_source_column_dropped(self):
|
||||
"""Source column should not exist in events table post-migration."""
|
||||
# Verified via: \d public.events - no source column present
|
||||
# See CT104 verification step 9
|
||||
pass # Schema verification done via psql
|
||||
# Insert test adapters (idempotent)
|
||||
await db_conn.execute("""
|
||||
INSERT INTO config.adapters (name, cadence_s)
|
||||
VALUES ('nws', 60), ('firms', 300), ('usgs_quake', 60)
|
||||
ON CONFLICT (name) DO NOTHING
|
||||
""")
|
||||
|
||||
# Drop events table if exists (clean slate)
|
||||
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
||||
|
||||
# Create events table with PRE-MIGRATION schema (has source, no adapter)
|
||||
await db_conn.execute("""
|
||||
CREATE TABLE public.events (
|
||||
id TEXT NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
time TIMESTAMPTZ NOT NULL,
|
||||
expires TIMESTAMPTZ,
|
||||
severity SMALLINT,
|
||||
geom GEOMETRY(Geometry, 4326),
|
||||
regions TEXT[],
|
||||
primary_region TEXT,
|
||||
payload JSONB NOT NULL,
|
||||
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (id, time)
|
||||
)
|
||||
""")
|
||||
|
||||
# Insert test rows with different source values
|
||||
test_rows = [
|
||||
("event-nws-1", "central/adapters/nws", "wx.alert.tornado_warning"),
|
||||
("event-nws-2", "central/adapters/nws", "wx.alert.flood_warning"),
|
||||
("event-firms-1", "central/adapters/firms", "fire.hotspot"),
|
||||
("event-usgs-1", "central/adapters/usgs_quake", "seismic.earthquake"),
|
||||
]
|
||||
|
||||
for event_id, source, category in test_rows:
|
||||
await db_conn.execute("""
|
||||
INSERT INTO public.events (id, source, category, time, payload)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
""", event_id, source, category, datetime.now(timezone.utc), "{}")
|
||||
|
||||
yield
|
||||
|
||||
# Cleanup
|
||||
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
||||
|
||||
|
||||
# No smoke tests - all assertions are differentiating:
|
||||
# - test_event_has_adapter_field: verifies model field rename
|
||||
# - test_event_adapter_values: verifies valid adapter names accepted
|
||||
# - test_backfill_transforms_source_to_adapter: verifies string transformation
|
||||
# - test_event_model_no_source_field: verifies source field removed from model
|
||||
@pytest_asyncio.fixture
|
||||
async def run_migration_011(
|
||||
db_conn: asyncpg.Connection,
|
||||
pre_migration_events_table: None,
|
||||
) -> None:
|
||||
"""Run migration 011 against the pre-migration events table."""
|
||||
migration_sql = MIGRATION_011_PATH.read_text()
|
||||
await db_conn.execute(migration_sql)
|
||||
yield
|
||||
|
||||
|
||||
class TestMigration011Backfill:
|
||||
"""Test migration 011 backfill logic."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_nws_source_to_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Backfill converts 'central/adapters/nws' to 'nws'."""
|
||||
rows = await db_conn.fetch(
|
||||
"SELECT id, adapter FROM public.events WHERE id LIKE 'event-nws-%'"
|
||||
)
|
||||
assert len(rows) == 2
|
||||
for row in rows:
|
||||
assert row["adapter"] == "nws"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_firms_source_to_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Backfill converts 'central/adapters/firms' to 'firms'."""
|
||||
row = await db_conn.fetchrow(
|
||||
"SELECT adapter FROM public.events WHERE id = 'event-firms-1'"
|
||||
)
|
||||
assert row["adapter"] == "firms"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_usgs_quake_source_to_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Backfill converts 'central/adapters/usgs_quake' to 'usgs_quake'."""
|
||||
row = await db_conn.fetchrow(
|
||||
"SELECT adapter FROM public.events WHERE id = 'event-usgs-1'"
|
||||
)
|
||||
assert row["adapter"] == "usgs_quake"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_all_rows_have_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""All rows have non-NULL adapter after backfill."""
|
||||
count = await db_conn.fetchval(
|
||||
"SELECT COUNT(*) FROM public.events WHERE adapter IS NULL"
|
||||
)
|
||||
assert count == 0
|
||||
|
||||
|
||||
class TestMigration011Constraints:
|
||||
"""Test migration 011 constraint enforcement."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fk_restrict_prevents_adapter_deletion(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""FK RESTRICT prevents deleting adapter with existing events."""
|
||||
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
||||
await db_conn.execute(
|
||||
"DELETE FROM config.adapters WHERE name = 'nws'"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_not_null_rejects_null_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""NOT NULL constraint rejects NULL adapter on insert."""
|
||||
with pytest.raises(asyncpg.NotNullViolationError):
|
||||
await db_conn.execute("""
|
||||
INSERT INTO public.events (id, adapter, category, time, payload)
|
||||
VALUES ('null-test', NULL, 'test', now(), '{}')
|
||||
""")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fk_rejects_unknown_adapter(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""FK constraint rejects unknown adapter values."""
|
||||
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
||||
await db_conn.execute("""
|
||||
INSERT INTO public.events (id, adapter, category, time, payload)
|
||||
VALUES ('bad-adapter', 'nonexistent_adapter', 'test', now(), '{}')
|
||||
""")
|
||||
|
||||
|
||||
class TestMigration011SchemaChanges:
|
||||
"""Test migration 011 schema changes."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_source_column_dropped(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Source column no longer exists after migration."""
|
||||
row = await db_conn.fetchrow("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'events'
|
||||
AND column_name = 'source'
|
||||
""")
|
||||
assert row["count"] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapter_column_exists(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Adapter column exists after migration."""
|
||||
row = await db_conn.fetchrow("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'events'
|
||||
AND column_name = 'adapter'
|
||||
""")
|
||||
assert row["count"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapter_column_is_not_null(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Adapter column has NOT NULL constraint."""
|
||||
row = await db_conn.fetchrow("""
|
||||
SELECT is_nullable
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'events'
|
||||
AND column_name = 'adapter'
|
||||
""")
|
||||
assert row["is_nullable"] == "NO"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapter_fk_constraint_exists(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""FK constraint events_adapter_fkey exists."""
|
||||
row = await db_conn.fetchrow("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM information_schema.table_constraints
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'events'
|
||||
AND constraint_name = 'events_adapter_fkey'
|
||||
AND constraint_type = 'FOREIGN KEY'
|
||||
""")
|
||||
assert row["count"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_adapter_received_index_exists(
|
||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||
):
|
||||
"""Index events_adapter_received_idx exists."""
|
||||
row = await db_conn.fetchrow("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM pg_indexes
|
||||
WHERE schemaname = 'public'
|
||||
AND tablename = 'events'
|
||||
AND indexname = 'events_adapter_received_idx'
|
||||
""")
|
||||
assert row["count"] == 1
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue