mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-22 18:44:40 +02:00
Merge pull request #13 from zvx-echo6/feature/1b-3a-events-adapter-column
feat(schema): add adapter column to events, drop source
This commit is contained in:
commit
71c73b4eb1
11 changed files with 374 additions and 47 deletions
|
|
@ -1,5 +1,11 @@
|
||||||
# Migration policy
|
# Migration policy
|
||||||
|
|
||||||
|
## Migrations are the sole source of truth
|
||||||
|
|
||||||
|
The `sql/migrations/` directory contains all schema definitions. There is
|
||||||
|
no separate schema.sql file; use `pg_dump -s central` to generate a
|
||||||
|
human-readable snapshot of the current schema when needed.
|
||||||
|
|
||||||
## Migrations must be idempotent
|
## Migrations must be idempotent
|
||||||
|
|
||||||
New migration files (007+) must use guards so they can be safely
|
New migration files (007+) must use guards so they can be safely
|
||||||
|
|
|
||||||
46
sql/migrations/011_events_add_adapter_column.sql
Normal file
46
sql/migrations/011_events_add_adapter_column.sql
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
-- Migration 011: Add adapter column to events, drop source column
|
||||||
|
-- Replaces module-path-based source with stable adapter identifier
|
||||||
|
|
||||||
|
-- Add adapter column (idempotent)
|
||||||
|
ALTER TABLE public.events ADD COLUMN IF NOT EXISTS adapter TEXT;
|
||||||
|
|
||||||
|
-- Backfill from existing source values
|
||||||
|
UPDATE public.events
|
||||||
|
SET adapter = REPLACE(source, 'central/adapters/', '')
|
||||||
|
WHERE adapter IS NULL AND source IS NOT NULL;
|
||||||
|
|
||||||
|
-- Make NOT NULL after backfill (idempotent check)
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (
|
||||||
|
SELECT 1 FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'events'
|
||||||
|
AND column_name = 'adapter'
|
||||||
|
AND is_nullable = 'YES'
|
||||||
|
) THEN
|
||||||
|
ALTER TABLE public.events ALTER COLUMN adapter SET NOT NULL;
|
||||||
|
END IF;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
-- Add FK constraint (idempotent check)
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (
|
||||||
|
SELECT 1 FROM information_schema.table_constraints
|
||||||
|
WHERE constraint_name = 'events_adapter_fkey'
|
||||||
|
AND table_name = 'events'
|
||||||
|
) THEN
|
||||||
|
ALTER TABLE public.events
|
||||||
|
ADD CONSTRAINT events_adapter_fkey
|
||||||
|
FOREIGN KEY (adapter) REFERENCES config.adapters(name)
|
||||||
|
ON DELETE RESTRICT;
|
||||||
|
END IF;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
-- Add index for dashboard queries (idempotent)
|
||||||
|
CREATE INDEX IF NOT EXISTS events_adapter_received_idx
|
||||||
|
ON public.events (adapter, received DESC);
|
||||||
|
|
||||||
|
-- Drop deprecated source column (idempotent)
|
||||||
|
ALTER TABLE public.events DROP COLUMN IF EXISTS source;
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
-- Central Data Hub schema
|
|
||||||
-- PostgreSQL 16 + TimescaleDB + PostGIS
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS events (
|
|
||||||
id TEXT NOT NULL, -- CloudEvent id
|
|
||||||
source TEXT NOT NULL, -- adapter identity
|
|
||||||
category TEXT NOT NULL, -- "wx.alert.<type>"
|
|
||||||
time TIMESTAMPTZ NOT NULL, -- event-time UTC
|
|
||||||
expires TIMESTAMPTZ,
|
|
||||||
severity SMALLINT, -- 0..4 or NULL
|
|
||||||
geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon
|
|
||||||
regions TEXT[], -- ["US-ID-Ada", ...]
|
|
||||||
primary_region TEXT,
|
|
||||||
payload JSONB NOT NULL, -- full Event as JSON
|
|
||||||
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
||||||
PRIMARY KEY (id, time) -- composite PK for TimescaleDB
|
|
||||||
);
|
|
||||||
|
|
||||||
SELECT create_hypertable('events', 'time', if_not_exists => TRUE);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS events_category_time_idx
|
|
||||||
ON events (category, time DESC);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS events_geom_gist
|
|
||||||
ON events USING GIST (geom);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS events_regions_gin
|
|
||||||
ON events USING GIN (regions);
|
|
||||||
|
|
||||||
-- Dedup on insert via ON CONFLICT (id, time) in the archive consumer.
|
|
||||||
|
|
@ -330,7 +330,7 @@ class FIRMSAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=stable_id,
|
id=stable_id,
|
||||||
source="central/adapters/firms",
|
adapter="firms",
|
||||||
category=f"fire.hotspot.{satellite_short}.{confidence}",
|
category=f"fire.hotspot.{satellite_short}.{confidence}",
|
||||||
time=time,
|
time=time,
|
||||||
expires=None,
|
expires=None,
|
||||||
|
|
|
||||||
|
|
@ -475,7 +475,7 @@ class NWSAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=event_id,
|
id=event_id,
|
||||||
source="central/adapters/nws",
|
adapter="nws",
|
||||||
category=category,
|
category=category,
|
||||||
time=time,
|
time=time,
|
||||||
expires=expires,
|
expires=expires,
|
||||||
|
|
|
||||||
|
|
@ -348,7 +348,7 @@ class USGSQuakeAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=event_id,
|
id=event_id,
|
||||||
source="central/adapters/usgs_quake",
|
adapter="usgs_quake",
|
||||||
category=f"quake.event.{tier}",
|
category=f"quake.event.{tier}",
|
||||||
time=event_time,
|
time=event_time,
|
||||||
expires=None,
|
expires=None,
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ class ArchiveConsumer:
|
||||||
geo_data = event_data.get("geo")
|
geo_data = event_data.get("geo")
|
||||||
|
|
||||||
event_id = envelope.get("id")
|
event_id = envelope.get("id")
|
||||||
source = event_data.get("source", "")
|
adapter = event_data.get("adapter", "")
|
||||||
category = event_data.get("category", "")
|
category = event_data.get("category", "")
|
||||||
time_str = event_data.get("time")
|
time_str = event_data.get("time")
|
||||||
expires_str = event_data.get("expires")
|
expires_str = event_data.get("expires")
|
||||||
|
|
@ -194,12 +194,12 @@ class ArchiveConsumer:
|
||||||
if geom_json:
|
if geom_json:
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO events (id, source, category, time, expires, severity,
|
INSERT INTO events (id, adapter, category, time, expires, severity,
|
||||||
geom, regions, primary_region, payload)
|
geom, regions, primary_region, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6,
|
VALUES ($1, $2, $3, $4, $5, $6,
|
||||||
ST_GeomFromGeoJSON($7), $8, $9, $10)
|
ST_GeomFromGeoJSON($7), $8, $9, $10)
|
||||||
ON CONFLICT (id, time) DO UPDATE SET
|
ON CONFLICT (id, time) DO UPDATE SET
|
||||||
source = EXCLUDED.source,
|
adapter = EXCLUDED.adapter,
|
||||||
category = EXCLUDED.category,
|
category = EXCLUDED.category,
|
||||||
expires = EXCLUDED.expires,
|
expires = EXCLUDED.expires,
|
||||||
severity = EXCLUDED.severity,
|
severity = EXCLUDED.severity,
|
||||||
|
|
@ -208,17 +208,17 @@ class ArchiveConsumer:
|
||||||
primary_region = EXCLUDED.primary_region,
|
primary_region = EXCLUDED.primary_region,
|
||||||
payload = EXCLUDED.payload
|
payload = EXCLUDED.payload
|
||||||
""",
|
""",
|
||||||
event_id, source, category, event_time, expires_time, severity,
|
event_id, adapter, category, event_time, expires_time, severity,
|
||||||
geom_json, regions, primary_region, json.dumps(envelope)
|
geom_json, regions, primary_region, json.dumps(envelope)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO events (id, source, category, time, expires, severity,
|
INSERT INTO events (id, adapter, category, time, expires, severity,
|
||||||
geom, regions, primary_region, payload)
|
geom, regions, primary_region, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9)
|
VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9)
|
||||||
ON CONFLICT (id, time) DO UPDATE SET
|
ON CONFLICT (id, time) DO UPDATE SET
|
||||||
source = EXCLUDED.source,
|
adapter = EXCLUDED.adapter,
|
||||||
category = EXCLUDED.category,
|
category = EXCLUDED.category,
|
||||||
expires = EXCLUDED.expires,
|
expires = EXCLUDED.expires,
|
||||||
severity = EXCLUDED.severity,
|
severity = EXCLUDED.severity,
|
||||||
|
|
@ -227,7 +227,7 @@ class ArchiveConsumer:
|
||||||
primary_region = EXCLUDED.primary_region,
|
primary_region = EXCLUDED.primary_region,
|
||||||
payload = EXCLUDED.payload
|
payload = EXCLUDED.payload
|
||||||
""",
|
""",
|
||||||
event_id, source, category, event_time, expires_time, severity,
|
event_id, adapter, category, event_time, expires_time, severity,
|
||||||
regions, primary_region, json.dumps(envelope)
|
regions, primary_region, json.dumps(envelope)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class Event(BaseModel):
|
||||||
model_config = ConfigDict(extra="forbid", frozen=True)
|
model_config = ConfigDict(extra="forbid", frozen=True)
|
||||||
|
|
||||||
id: str # unique, stable across republish
|
id: str # unique, stable across republish
|
||||||
source: str # adapter identity, e.g. "central/adapters/nws"
|
adapter: str # adapter identity, e.g. "nws"
|
||||||
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
|
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
|
||||||
time: datetime # event-time UTC, not processing-time
|
time: datetime # event-time UTC, not processing-time
|
||||||
expires: datetime | None = None
|
expires: datetime | None = None
|
||||||
|
|
|
||||||
305
tests/test_events_adapter_column.py
Normal file
305
tests/test_events_adapter_column.py
Normal file
|
|
@ -0,0 +1,305 @@
|
||||||
|
"""Tests for events.adapter column migration (011).
|
||||||
|
|
||||||
|
These tests exercise the actual migration SQL against a test database,
|
||||||
|
verifying backfill logic, FK constraints, NOT NULL enforcement, and
|
||||||
|
source column removal.
|
||||||
|
|
||||||
|
Requires CENTRAL_TEST_DB_DSN or uses default central_test database.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import asyncpg
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
|
||||||
|
from central.models import Event, Geo
|
||||||
|
|
||||||
|
|
||||||
|
# Test database DSN - matches test_config_store.py pattern
|
||||||
|
TEST_DB_DSN = os.environ.get(
|
||||||
|
"CENTRAL_TEST_DB_DSN",
|
||||||
|
"postgresql://central_test:testpass@localhost/central_test",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Path to migration file
|
||||||
|
MIGRATION_011_PATH = Path(__file__).parent.parent / "sql" / "migrations" / "011_events_add_adapter_column.sql"
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventModelAdapterField:
|
||||||
|
"""Test Event model has adapter field (not source)."""
|
||||||
|
|
||||||
|
def test_event_has_adapter_field(self):
|
||||||
|
"""Event model has adapter field."""
|
||||||
|
event = Event(
|
||||||
|
id="test-1",
|
||||||
|
adapter="nws",
|
||||||
|
category="wx.alert.test",
|
||||||
|
time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
|
geo=Geo(),
|
||||||
|
data={},
|
||||||
|
)
|
||||||
|
assert event.adapter == "nws"
|
||||||
|
|
||||||
|
def test_event_model_no_source_field(self):
|
||||||
|
"""Event model does not have source field."""
|
||||||
|
assert "source" not in Event.model_fields
|
||||||
|
assert "adapter" in Event.model_fields
|
||||||
|
|
||||||
|
def test_event_adapter_accepts_all_adapter_names(self):
|
||||||
|
"""Event adapter field accepts all known adapter names."""
|
||||||
|
for adapter_name in ["nws", "firms", "usgs_quake"]:
|
||||||
|
event = Event(
|
||||||
|
id=f"test-{adapter_name}",
|
||||||
|
adapter=adapter_name,
|
||||||
|
category="test.category",
|
||||||
|
time=datetime.now(timezone.utc),
|
||||||
|
geo=Geo(),
|
||||||
|
data={},
|
||||||
|
)
|
||||||
|
assert event.adapter == adapter_name
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def db_conn() -> asyncpg.Connection:
|
||||||
|
"""Get a database connection for migration tests."""
|
||||||
|
conn = await asyncpg.connect(TEST_DB_DSN)
|
||||||
|
yield conn
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def pre_migration_events_table(db_conn: asyncpg.Connection) -> None:
|
||||||
|
"""Create events table with pre-migration schema (source column, no adapter).
|
||||||
|
|
||||||
|
Also ensures config.adapters exists with test adapters.
|
||||||
|
"""
|
||||||
|
# Ensure config schema and adapters table exist
|
||||||
|
await db_conn.execute("CREATE SCHEMA IF NOT EXISTS config")
|
||||||
|
await db_conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS config.adapters (
|
||||||
|
name TEXT PRIMARY KEY,
|
||||||
|
enabled BOOLEAN NOT NULL DEFAULT true,
|
||||||
|
cadence_s INTEGER NOT NULL,
|
||||||
|
settings JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||||
|
paused_at TIMESTAMPTZ,
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Insert test adapters (idempotent)
|
||||||
|
await db_conn.execute("""
|
||||||
|
INSERT INTO config.adapters (name, cadence_s)
|
||||||
|
VALUES ('nws', 60), ('firms', 300), ('usgs_quake', 60)
|
||||||
|
ON CONFLICT (name) DO NOTHING
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Drop events table if exists (clean slate)
|
||||||
|
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
||||||
|
|
||||||
|
# Create events table with PRE-MIGRATION schema (has source, no adapter)
|
||||||
|
# Note: geom column omitted since test DB lacks PostGIS extension
|
||||||
|
await db_conn.execute("""
|
||||||
|
CREATE TABLE public.events (
|
||||||
|
id TEXT NOT NULL,
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
category TEXT NOT NULL,
|
||||||
|
time TIMESTAMPTZ NOT NULL,
|
||||||
|
expires TIMESTAMPTZ,
|
||||||
|
severity SMALLINT,
|
||||||
|
regions TEXT[],
|
||||||
|
primary_region TEXT,
|
||||||
|
payload JSONB NOT NULL,
|
||||||
|
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY (id, time)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Insert test rows with different source values
|
||||||
|
test_rows = [
|
||||||
|
("event-nws-1", "central/adapters/nws", "wx.alert.tornado_warning"),
|
||||||
|
("event-nws-2", "central/adapters/nws", "wx.alert.flood_warning"),
|
||||||
|
("event-firms-1", "central/adapters/firms", "fire.hotspot"),
|
||||||
|
("event-usgs-1", "central/adapters/usgs_quake", "seismic.earthquake"),
|
||||||
|
]
|
||||||
|
|
||||||
|
for event_id, source, category in test_rows:
|
||||||
|
await db_conn.execute("""
|
||||||
|
INSERT INTO public.events (id, source, category, time, payload)
|
||||||
|
VALUES ($1, $2, $3, $4, $5)
|
||||||
|
""", event_id, source, category, datetime.now(timezone.utc), "{}")
|
||||||
|
|
||||||
|
yield
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def run_migration_011(
|
||||||
|
db_conn: asyncpg.Connection,
|
||||||
|
pre_migration_events_table: None,
|
||||||
|
) -> None:
|
||||||
|
"""Run migration 011 against the pre-migration events table."""
|
||||||
|
migration_sql = MIGRATION_011_PATH.read_text()
|
||||||
|
await db_conn.execute(migration_sql)
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
class TestMigration011Backfill:
|
||||||
|
"""Test migration 011 backfill logic."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backfill_nws_source_to_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Backfill converts 'central/adapters/nws' to 'nws'."""
|
||||||
|
rows = await db_conn.fetch(
|
||||||
|
"SELECT id, adapter FROM public.events WHERE id LIKE 'event-nws-%'"
|
||||||
|
)
|
||||||
|
assert len(rows) == 2
|
||||||
|
for row in rows:
|
||||||
|
assert row["adapter"] == "nws"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backfill_firms_source_to_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Backfill converts 'central/adapters/firms' to 'firms'."""
|
||||||
|
row = await db_conn.fetchrow(
|
||||||
|
"SELECT adapter FROM public.events WHERE id = 'event-firms-1'"
|
||||||
|
)
|
||||||
|
assert row["adapter"] == "firms"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backfill_usgs_quake_source_to_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Backfill converts 'central/adapters/usgs_quake' to 'usgs_quake'."""
|
||||||
|
row = await db_conn.fetchrow(
|
||||||
|
"SELECT adapter FROM public.events WHERE id = 'event-usgs-1'"
|
||||||
|
)
|
||||||
|
assert row["adapter"] == "usgs_quake"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backfill_all_rows_have_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""All rows have non-NULL adapter after backfill."""
|
||||||
|
count = await db_conn.fetchval(
|
||||||
|
"SELECT COUNT(*) FROM public.events WHERE adapter IS NULL"
|
||||||
|
)
|
||||||
|
assert count == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestMigration011Constraints:
|
||||||
|
"""Test migration 011 constraint enforcement."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fk_restrict_prevents_adapter_deletion(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""FK RESTRICT prevents deleting adapter with existing events."""
|
||||||
|
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
||||||
|
await db_conn.execute(
|
||||||
|
"DELETE FROM config.adapters WHERE name = 'nws'"
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_not_null_rejects_null_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""NOT NULL constraint rejects NULL adapter on insert."""
|
||||||
|
with pytest.raises(asyncpg.NotNullViolationError):
|
||||||
|
await db_conn.execute("""
|
||||||
|
INSERT INTO public.events (id, adapter, category, time, payload)
|
||||||
|
VALUES ('null-test', NULL, 'test', now(), '{}')
|
||||||
|
""")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fk_rejects_unknown_adapter(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""FK constraint rejects unknown adapter values."""
|
||||||
|
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
||||||
|
await db_conn.execute("""
|
||||||
|
INSERT INTO public.events (id, adapter, category, time, payload)
|
||||||
|
VALUES ('bad-adapter', 'nonexistent_adapter', 'test', now(), '{}')
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
class TestMigration011SchemaChanges:
|
||||||
|
"""Test migration 011 schema changes."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_source_column_dropped(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Source column no longer exists after migration."""
|
||||||
|
row = await db_conn.fetchrow("""
|
||||||
|
SELECT COUNT(*) as count
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'events'
|
||||||
|
AND column_name = 'source'
|
||||||
|
""")
|
||||||
|
assert row["count"] == 0
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_adapter_column_exists(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Adapter column exists after migration."""
|
||||||
|
row = await db_conn.fetchrow("""
|
||||||
|
SELECT COUNT(*) as count
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'events'
|
||||||
|
AND column_name = 'adapter'
|
||||||
|
""")
|
||||||
|
assert row["count"] == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_adapter_column_is_not_null(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Adapter column has NOT NULL constraint."""
|
||||||
|
row = await db_conn.fetchrow("""
|
||||||
|
SELECT is_nullable
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'events'
|
||||||
|
AND column_name = 'adapter'
|
||||||
|
""")
|
||||||
|
assert row["is_nullable"] == "NO"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_adapter_fk_constraint_exists(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""FK constraint events_adapter_fkey exists."""
|
||||||
|
row = await db_conn.fetchrow("""
|
||||||
|
SELECT COUNT(*) as count
|
||||||
|
FROM information_schema.table_constraints
|
||||||
|
WHERE table_schema = 'public'
|
||||||
|
AND table_name = 'events'
|
||||||
|
AND constraint_name = 'events_adapter_fkey'
|
||||||
|
AND constraint_type = 'FOREIGN KEY'
|
||||||
|
""")
|
||||||
|
assert row["count"] == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_adapter_received_index_exists(
|
||||||
|
self, db_conn: asyncpg.Connection, run_migration_011: None
|
||||||
|
):
|
||||||
|
"""Index events_adapter_received_idx exists."""
|
||||||
|
row = await db_conn.fetchrow("""
|
||||||
|
SELECT COUNT(*) as count
|
||||||
|
FROM pg_indexes
|
||||||
|
WHERE schemaname = 'public'
|
||||||
|
AND tablename = 'events'
|
||||||
|
AND indexname = 'events_adapter_received_idx'
|
||||||
|
""")
|
||||||
|
assert row["count"] == 1
|
||||||
|
|
@ -288,7 +288,7 @@ class TestSubjectGeneration:
|
||||||
def test_subject_format(self):
|
def test_subject_format(self):
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test",
|
id="test",
|
||||||
source="central/adapters/firms",
|
adapter="firms",
|
||||||
category="fire.hotspot.viirs_snpp.high",
|
category="fire.hotspot.viirs_snpp.high",
|
||||||
time=datetime.now(timezone.utc),
|
time=datetime.now(timezone.utc),
|
||||||
severity=3,
|
severity=3,
|
||||||
|
|
@ -302,7 +302,7 @@ class TestSubjectGeneration:
|
||||||
def test_subject_nominal_confidence(self):
|
def test_subject_nominal_confidence(self):
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test",
|
id="test",
|
||||||
source="central/adapters/firms",
|
adapter="firms",
|
||||||
category="fire.hotspot.viirs_noaa20.nominal",
|
category="fire.hotspot.viirs_noaa20.nominal",
|
||||||
time=datetime.now(timezone.utc),
|
time=datetime.now(timezone.utc),
|
||||||
severity=2,
|
severity=2,
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ def sample_event(sample_geo: Geo) -> Event:
|
||||||
"""Sample Event object for testing."""
|
"""Sample Event object for testing."""
|
||||||
return Event(
|
return Event(
|
||||||
id="urn:central:nws:alert:KBOI-202401151200-SVR",
|
id="urn:central:nws:alert:KBOI-202401151200-SVR",
|
||||||
source="central/adapters/nws",
|
adapter="nws",
|
||||||
category="wx.alert.severe_thunderstorm_warning",
|
category="wx.alert.severe_thunderstorm_warning",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
expires=datetime(2024, 1, 15, 13, 0, 0, tzinfo=timezone.utc),
|
expires=datetime(2024, 1, 15, 13, 0, 0, tzinfo=timezone.utc),
|
||||||
|
|
@ -75,7 +75,7 @@ class TestSubjectForEvent:
|
||||||
)
|
)
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-zone",
|
id="test-zone",
|
||||||
source="test",
|
adapter="nws",
|
||||||
category="wx.alert.winter_storm_warning",
|
category="wx.alert.winter_storm_warning",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
geo=geo,
|
geo=geo,
|
||||||
|
|
@ -89,7 +89,7 @@ class TestSubjectForEvent:
|
||||||
geo = Geo(regions=[], primary_region=None)
|
geo = Geo(regions=[], primary_region=None)
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-unknown",
|
id="test-unknown",
|
||||||
source="test",
|
adapter="nws",
|
||||||
category="wx.alert.test",
|
category="wx.alert.test",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
geo=geo,
|
geo=geo,
|
||||||
|
|
@ -144,7 +144,7 @@ class TestCloudEventsWire:
|
||||||
"""When severity is None, centralseverity is omitted entirely."""
|
"""When severity is None, centralseverity is omitted entirely."""
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-no-severity",
|
id="test-no-severity",
|
||||||
source="test",
|
adapter="nws",
|
||||||
category="wx.alert.test",
|
category="wx.alert.test",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
severity=None, # Explicitly None
|
severity=None, # Explicitly None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue