diff --git a/docs/migrations.md b/docs/migrations.md index ca53862..ddac19a 100644 --- a/docs/migrations.md +++ b/docs/migrations.md @@ -1,5 +1,11 @@ # Migration policy +## Migrations are the sole source of truth + +The `sql/migrations/` directory contains all schema definitions. There is +no separate schema.sql file; use `pg_dump -s central` to generate a +human-readable snapshot of the current schema when needed. + ## Migrations must be idempotent New migration files (007+) must use guards so they can be safely diff --git a/sql/schema.sql b/sql/schema.sql deleted file mode 100644 index 1bdda55..0000000 --- a/sql/schema.sql +++ /dev/null @@ -1,36 +0,0 @@ --- Central Data Hub schema --- PostgreSQL 16 + TimescaleDB + PostGIS --- NOTE: Migrations in sql/migrations/ are the source of truth. --- This file is for reference and initial setup only. - -CREATE TABLE IF NOT EXISTS events ( - id TEXT NOT NULL, -- CloudEvent id - adapter TEXT NOT NULL -- adapter identity (FK to config.adapters.name) - REFERENCES config.adapters(name) ON DELETE RESTRICT, - category TEXT NOT NULL, -- "wx.alert." - time TIMESTAMPTZ NOT NULL, -- event-time UTC - expires TIMESTAMPTZ, - severity SMALLINT, -- 0..4 or NULL - geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon - regions TEXT[], -- ["US-ID-Ada", ...] - primary_region TEXT, - payload JSONB NOT NULL, -- full Event as JSON - received TIMESTAMPTZ NOT NULL DEFAULT now(), - PRIMARY KEY (id, time) -- composite PK for TimescaleDB -); - -SELECT create_hypertable('events', 'time', if_not_exists => TRUE); - -CREATE INDEX IF NOT EXISTS events_category_time_idx - ON events (category, time DESC); - -CREATE INDEX IF NOT EXISTS events_adapter_received_idx - ON events (adapter, received DESC); - -CREATE INDEX IF NOT EXISTS events_geom_gist - ON events USING GIST (geom); - -CREATE INDEX IF NOT EXISTS events_regions_gin - ON events USING GIN (regions); - --- Dedup on insert via ON CONFLICT (id, time) in the archive consumer. diff --git a/tests/test_events_adapter_column.py b/tests/test_events_adapter_column.py index 1e9a72e..82ab085 100644 --- a/tests/test_events_adapter_column.py +++ b/tests/test_events_adapter_column.py @@ -1,15 +1,38 @@ -"""Tests for events.adapter column migration.""" +"""Tests for events.adapter column migration (011). -import pytest +These tests exercise the actual migration SQL against a test database, +verifying backfill logic, FK constraints, NOT NULL enforcement, and +source column removal. + +Requires CENTRAL_TEST_DB_DSN or uses default central_test database. +""" + +import os from datetime import datetime, timezone +from pathlib import Path + +import asyncpg +import pytest +import pytest_asyncio + from central.models import Event, Geo -class TestEventAdapterField: - """Test Event model adapter field.""" +# Test database DSN - matches test_config_store.py pattern +TEST_DB_DSN = os.environ.get( + "CENTRAL_TEST_DB_DSN", + "postgresql://central_test:testpass@localhost/central_test", +) + +# Path to migration file +MIGRATION_011_PATH = Path(__file__).parent.parent / "sql" / "migrations" / "011_events_add_adapter_column.sql" + + +class TestEventModelAdapterField: + """Test Event model has adapter field (not source).""" def test_event_has_adapter_field(self): - """Event model has adapter field instead of source.""" + """Event model has adapter field.""" event = Event( id="test-1", adapter="nws", @@ -19,10 +42,14 @@ class TestEventAdapterField: data={}, ) assert event.adapter == "nws" - assert not hasattr(event, "source") or "source" not in event.model_fields - def test_event_adapter_values(self): - """Event adapter field accepts valid adapter names.""" + def test_event_model_no_source_field(self): + """Event model does not have source field.""" + assert "source" not in Event.model_fields + assert "adapter" in Event.model_fields + + def test_event_adapter_accepts_all_adapter_names(self): + """Event adapter field accepts all known adapter names.""" for adapter_name in ["nws", "firms", "usgs_quake"]: event = Event( id=f"test-{adapter_name}", @@ -35,46 +62,244 @@ class TestEventAdapterField: assert event.adapter == adapter_name -class TestAdapterColumnMigration: - """Tests for migration behavior (run against test DB).""" - - @pytest.fixture - def db_connection(self): - """Skip if no test DB available.""" - pytest.skip("Requires test database - verified manually in CT104 verification") - - def test_backfill_transforms_source_to_adapter(self, db_connection): - """REPLACE(source, 'central/adapters/', '') produces correct adapter values.""" - # This logic is tested via SQL: - # REPLACE('central/adapters/nws', 'central/adapters/', '') = 'nws' - assert "central/adapters/nws".replace("central/adapters/", "") == "nws" - assert "central/adapters/firms".replace("central/adapters/", "") == "firms" - assert "central/adapters/usgs_quake".replace("central/adapters/", "") == "usgs_quake" - - def test_fk_restrict_behavior(self, db_connection): - """FK constraint prevents deleting adapter with existing events.""" - # Verified manually: DELETE FROM config.adapters WHERE name = 'nws' - # raises foreign key violation error - pytest.skip("Verified manually in CT104 verification step 10") +@pytest_asyncio.fixture +async def db_conn() -> asyncpg.Connection: + """Get a database connection for migration tests.""" + conn = await asyncpg.connect(TEST_DB_DSN) + yield conn + await conn.close() -class TestSourceColumnRemoval: - """Test that source column is removed post-migration.""" +@pytest_asyncio.fixture +async def pre_migration_events_table(db_conn: asyncpg.Connection) -> None: + """Create events table with pre-migration schema (source column, no adapter). - def test_event_model_no_source_field(self): - """Event model does not have source field.""" - assert "source" not in Event.model_fields - assert "adapter" in Event.model_fields + Also ensures config.adapters exists with test adapters. + """ + # Ensure config schema and adapters table exist + await db_conn.execute("CREATE SCHEMA IF NOT EXISTS config") + await db_conn.execute(""" + CREATE TABLE IF NOT EXISTS config.adapters ( + name TEXT PRIMARY KEY, + enabled BOOLEAN NOT NULL DEFAULT true, + cadence_s INTEGER NOT NULL, + settings JSONB NOT NULL DEFAULT '{}'::jsonb, + paused_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + ) + """) - def test_source_column_dropped(self): - """Source column should not exist in events table post-migration.""" - # Verified via: \d public.events - no source column present - # See CT104 verification step 9 - pass # Schema verification done via psql + # Insert test adapters (idempotent) + await db_conn.execute(""" + INSERT INTO config.adapters (name, cadence_s) + VALUES ('nws', 60), ('firms', 300), ('usgs_quake', 60) + ON CONFLICT (name) DO NOTHING + """) + + # Drop events table if exists (clean slate) + await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE") + + # Create events table with PRE-MIGRATION schema (has source, no adapter) + await db_conn.execute(""" + CREATE TABLE public.events ( + id TEXT NOT NULL, + source TEXT NOT NULL, + category TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, + expires TIMESTAMPTZ, + severity SMALLINT, + geom GEOMETRY(Geometry, 4326), + regions TEXT[], + primary_region TEXT, + payload JSONB NOT NULL, + received TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (id, time) + ) + """) + + # Insert test rows with different source values + test_rows = [ + ("event-nws-1", "central/adapters/nws", "wx.alert.tornado_warning"), + ("event-nws-2", "central/adapters/nws", "wx.alert.flood_warning"), + ("event-firms-1", "central/adapters/firms", "fire.hotspot"), + ("event-usgs-1", "central/adapters/usgs_quake", "seismic.earthquake"), + ] + + for event_id, source, category in test_rows: + await db_conn.execute(""" + INSERT INTO public.events (id, source, category, time, payload) + VALUES ($1, $2, $3, $4, $5) + """, event_id, source, category, datetime.now(timezone.utc), "{}") + + yield + + # Cleanup + await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE") -# No smoke tests - all assertions are differentiating: -# - test_event_has_adapter_field: verifies model field rename -# - test_event_adapter_values: verifies valid adapter names accepted -# - test_backfill_transforms_source_to_adapter: verifies string transformation -# - test_event_model_no_source_field: verifies source field removed from model +@pytest_asyncio.fixture +async def run_migration_011( + db_conn: asyncpg.Connection, + pre_migration_events_table: None, +) -> None: + """Run migration 011 against the pre-migration events table.""" + migration_sql = MIGRATION_011_PATH.read_text() + await db_conn.execute(migration_sql) + yield + + +class TestMigration011Backfill: + """Test migration 011 backfill logic.""" + + @pytest.mark.asyncio + async def test_backfill_nws_source_to_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Backfill converts 'central/adapters/nws' to 'nws'.""" + rows = await db_conn.fetch( + "SELECT id, adapter FROM public.events WHERE id LIKE 'event-nws-%'" + ) + assert len(rows) == 2 + for row in rows: + assert row["adapter"] == "nws" + + @pytest.mark.asyncio + async def test_backfill_firms_source_to_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Backfill converts 'central/adapters/firms' to 'firms'.""" + row = await db_conn.fetchrow( + "SELECT adapter FROM public.events WHERE id = 'event-firms-1'" + ) + assert row["adapter"] == "firms" + + @pytest.mark.asyncio + async def test_backfill_usgs_quake_source_to_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Backfill converts 'central/adapters/usgs_quake' to 'usgs_quake'.""" + row = await db_conn.fetchrow( + "SELECT adapter FROM public.events WHERE id = 'event-usgs-1'" + ) + assert row["adapter"] == "usgs_quake" + + @pytest.mark.asyncio + async def test_backfill_all_rows_have_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """All rows have non-NULL adapter after backfill.""" + count = await db_conn.fetchval( + "SELECT COUNT(*) FROM public.events WHERE adapter IS NULL" + ) + assert count == 0 + + +class TestMigration011Constraints: + """Test migration 011 constraint enforcement.""" + + @pytest.mark.asyncio + async def test_fk_restrict_prevents_adapter_deletion( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """FK RESTRICT prevents deleting adapter with existing events.""" + with pytest.raises(asyncpg.ForeignKeyViolationError): + await db_conn.execute( + "DELETE FROM config.adapters WHERE name = 'nws'" + ) + + @pytest.mark.asyncio + async def test_not_null_rejects_null_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """NOT NULL constraint rejects NULL adapter on insert.""" + with pytest.raises(asyncpg.NotNullViolationError): + await db_conn.execute(""" + INSERT INTO public.events (id, adapter, category, time, payload) + VALUES ('null-test', NULL, 'test', now(), '{}') + """) + + @pytest.mark.asyncio + async def test_fk_rejects_unknown_adapter( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """FK constraint rejects unknown adapter values.""" + with pytest.raises(asyncpg.ForeignKeyViolationError): + await db_conn.execute(""" + INSERT INTO public.events (id, adapter, category, time, payload) + VALUES ('bad-adapter', 'nonexistent_adapter', 'test', now(), '{}') + """) + + +class TestMigration011SchemaChanges: + """Test migration 011 schema changes.""" + + @pytest.mark.asyncio + async def test_source_column_dropped( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Source column no longer exists after migration.""" + row = await db_conn.fetchrow(""" + SELECT COUNT(*) as count + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'events' + AND column_name = 'source' + """) + assert row["count"] == 0 + + @pytest.mark.asyncio + async def test_adapter_column_exists( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Adapter column exists after migration.""" + row = await db_conn.fetchrow(""" + SELECT COUNT(*) as count + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'events' + AND column_name = 'adapter' + """) + assert row["count"] == 1 + + @pytest.mark.asyncio + async def test_adapter_column_is_not_null( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Adapter column has NOT NULL constraint.""" + row = await db_conn.fetchrow(""" + SELECT is_nullable + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'events' + AND column_name = 'adapter' + """) + assert row["is_nullable"] == "NO" + + @pytest.mark.asyncio + async def test_adapter_fk_constraint_exists( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """FK constraint events_adapter_fkey exists.""" + row = await db_conn.fetchrow(""" + SELECT COUNT(*) as count + FROM information_schema.table_constraints + WHERE table_schema = 'public' + AND table_name = 'events' + AND constraint_name = 'events_adapter_fkey' + AND constraint_type = 'FOREIGN KEY' + """) + assert row["count"] == 1 + + @pytest.mark.asyncio + async def test_adapter_received_index_exists( + self, db_conn: asyncpg.Connection, run_migration_011: None + ): + """Index events_adapter_received_idx exists.""" + row = await db_conn.fetchrow(""" + SELECT COUNT(*) as count + FROM pg_indexes + WHERE schemaname = 'public' + AND tablename = 'events' + AND indexname = 'events_adapter_received_idx' + """) + assert row["count"] == 1