mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Compare commits
No commits in common. "4368c83613ea2fb6f0ab5d50a003803cd21e644d" and "4c9ca176a9d706df0478defd237f8946ff44575c" have entirely different histories.
4368c83613
...
4c9ca176a9
30 changed files with 87 additions and 2149 deletions
|
|
@ -15,10 +15,6 @@ Phase 0 — scaffold. Not yet operational.
|
||||||
- One archive consumer process persisting events to TimescaleDB
|
- One archive consumer process persisting events to TimescaleDB
|
||||||
- Both processes systemd-managed
|
- Both processes systemd-managed
|
||||||
|
|
||||||
## Testing
|
|
||||||
|
|
||||||
See [docs/test-database.md](docs/test-database.md) for test database setup.
|
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
MIT. See LICENSE.
|
MIT. See LICENSE.
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,5 @@
|
||||||
# Migration policy
|
# Migration policy
|
||||||
|
|
||||||
## Migrations are the sole source of truth
|
|
||||||
|
|
||||||
The `sql/migrations/` directory contains all schema definitions. There is
|
|
||||||
no separate schema.sql file; use `pg_dump -s central` to generate a
|
|
||||||
human-readable snapshot of the current schema when needed.
|
|
||||||
|
|
||||||
## Migrations must be idempotent
|
## Migrations must be idempotent
|
||||||
|
|
||||||
New migration files (007+) must use guards so they can be safely
|
New migration files (007+) must use guards so they can be safely
|
||||||
|
|
@ -26,24 +20,8 @@ Direct `psql` execution bypasses the `schema_migrations` tracker and
|
||||||
was the cause of the v0.2.0 reconcile. If a migration needs to be
|
was the cause of the v0.2.0 reconcile. If a migration needs to be
|
||||||
applied on the live system, run:
|
applied on the live system, run:
|
||||||
|
|
||||||
sudo -u central /opt/central/.venv/bin/python -m central.migrate
|
sudo -u central /opt/central/.venv/bin/python -m scripts.migrate
|
||||||
|
|
||||||
Never apply migration SQL directly via `psql`, even as a superuser,
|
Never apply migration SQL directly via `psql`, even as a superuser,
|
||||||
even "just to test." If migrate.py has a bug that's blocking you, fix
|
even "just to test." If migrate.py has a bug that's blocking you, fix
|
||||||
migrate.py.
|
migrate.py.
|
||||||
|
|
||||||
## Extensions are not in migrations
|
|
||||||
|
|
||||||
PostgreSQL extensions like PostGIS require superuser privileges to
|
|
||||||
install. The production `central` role is intentionally not a superuser.
|
|
||||||
Therefore, extensions live outside the migration system:
|
|
||||||
|
|
||||||
- **Production bootstrap:** A DBA runs `CREATE EXTENSION postgis` once
|
|
||||||
before the first `migrate.py` run.
|
|
||||||
- **Test database:** The `central_test` role is a superuser, allowing
|
|
||||||
test fixtures to self-bootstrap extensions.
|
|
||||||
|
|
||||||
This is documented in [docs/test-database.md](test-database.md).
|
|
||||||
|
|
||||||
Do not add `CREATE EXTENSION` statements to migrations — they will fail
|
|
||||||
in production where migrations run as the non-superuser `central` role.
|
|
||||||
|
|
|
||||||
|
|
@ -1,83 +0,0 @@
|
||||||
# Test Database Setup
|
|
||||||
|
|
||||||
Central's integration tests require a PostgreSQL database. This document
|
|
||||||
covers one-time setup and maintenance of the test database.
|
|
||||||
|
|
||||||
## DSN Convention
|
|
||||||
|
|
||||||
Tests default to:
|
|
||||||
|
|
||||||
```
|
|
||||||
postgresql://central_test:testpass@localhost/central_test
|
|
||||||
```
|
|
||||||
|
|
||||||
Override via the `CENTRAL_TEST_DB_DSN` environment variable:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
export CENTRAL_TEST_DB_DSN="postgresql://myuser:mypass@localhost/mydb"
|
|
||||||
```
|
|
||||||
|
|
||||||
## One-Time Setup
|
|
||||||
|
|
||||||
Run these commands once on a fresh PostgreSQL installation:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Create the test user (as postgres superuser)
|
|
||||||
sudo -u postgres createuser -s central_test
|
|
||||||
sudo -u postgres psql -c "ALTER USER central_test PASSWORD 'testpass'"
|
|
||||||
|
|
||||||
# Create the test database
|
|
||||||
sudo -u postgres createdb -O central_test central_test
|
|
||||||
|
|
||||||
# Install required extensions
|
|
||||||
sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis"
|
|
||||||
```
|
|
||||||
|
|
||||||
**Note:** The `central_test` role is created as a superuser (`-s` flag).
|
|
||||||
This allows test fixtures to self-bootstrap extensions like PostGIS via
|
|
||||||
`CREATE EXTENSION IF NOT EXISTS`. Production uses a non-superuser role.
|
|
||||||
|
|
||||||
## Required Extensions
|
|
||||||
|
|
||||||
| Extension | Version | Purpose |
|
|
||||||
|-----------|---------|---------|
|
|
||||||
| postgis | 3.4+ | Geometry types for geospatial event data |
|
|
||||||
|
|
||||||
## Why PostGIS Is Not in Migrations
|
|
||||||
|
|
||||||
PostGIS requires superuser privileges to install. The production `central`
|
|
||||||
role is intentionally not a superuser for security reasons. Therefore:
|
|
||||||
|
|
||||||
- **Production:** A DBA must run `CREATE EXTENSION postgis` before the
|
|
||||||
first `migrate.py` run. This is a one-time bootstrap step.
|
|
||||||
- **Test:** The `central_test` role is a superuser, so test fixtures can
|
|
||||||
self-bootstrap PostGIS via `CREATE EXTENSION IF NOT EXISTS`.
|
|
||||||
|
|
||||||
This divergence is documented rather than "fixed" because granting
|
|
||||||
superuser to production roles creates security risk, and the PostgreSQL
|
|
||||||
packaging on Ubuntu does not mark PostGIS as a trusted extension.
|
|
||||||
|
|
||||||
## Resetting the Test Database
|
|
||||||
|
|
||||||
If the test database gets into a bad state:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Drop and recreate
|
|
||||||
sudo -u postgres dropdb central_test
|
|
||||||
sudo -u postgres createdb -O central_test central_test
|
|
||||||
sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis"
|
|
||||||
```
|
|
||||||
|
|
||||||
Test fixtures handle their own table creation and cleanup, so this is
|
|
||||||
rarely needed.
|
|
||||||
|
|
||||||
## Running Tests
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd /opt/central
|
|
||||||
uv run pytest tests/ # all tests
|
|
||||||
uv run pytest tests/test_config_store.py -v # specific file
|
|
||||||
```
|
|
||||||
|
|
||||||
Tests that require the database will skip gracefully if the connection
|
|
||||||
fails, though most integration tests will fail without a working DB.
|
|
||||||
|
|
@ -1,46 +0,0 @@
|
||||||
-- Migration 011: Add adapter column to events, drop source column
|
|
||||||
-- Replaces module-path-based source with stable adapter identifier
|
|
||||||
|
|
||||||
-- Add adapter column (idempotent)
|
|
||||||
ALTER TABLE public.events ADD COLUMN IF NOT EXISTS adapter TEXT;
|
|
||||||
|
|
||||||
-- Backfill from existing source values
|
|
||||||
UPDATE public.events
|
|
||||||
SET adapter = REPLACE(source, 'central/adapters/', '')
|
|
||||||
WHERE adapter IS NULL AND source IS NOT NULL;
|
|
||||||
|
|
||||||
-- Make NOT NULL after backfill (idempotent check)
|
|
||||||
DO $$
|
|
||||||
BEGIN
|
|
||||||
IF EXISTS (
|
|
||||||
SELECT 1 FROM information_schema.columns
|
|
||||||
WHERE table_schema = 'public'
|
|
||||||
AND table_name = 'events'
|
|
||||||
AND column_name = 'adapter'
|
|
||||||
AND is_nullable = 'YES'
|
|
||||||
) THEN
|
|
||||||
ALTER TABLE public.events ALTER COLUMN adapter SET NOT NULL;
|
|
||||||
END IF;
|
|
||||||
END $$;
|
|
||||||
|
|
||||||
-- Add FK constraint (idempotent check)
|
|
||||||
DO $$
|
|
||||||
BEGIN
|
|
||||||
IF NOT EXISTS (
|
|
||||||
SELECT 1 FROM information_schema.table_constraints
|
|
||||||
WHERE constraint_name = 'events_adapter_fkey'
|
|
||||||
AND table_name = 'events'
|
|
||||||
) THEN
|
|
||||||
ALTER TABLE public.events
|
|
||||||
ADD CONSTRAINT events_adapter_fkey
|
|
||||||
FOREIGN KEY (adapter) REFERENCES config.adapters(name)
|
|
||||||
ON DELETE RESTRICT;
|
|
||||||
END IF;
|
|
||||||
END $$;
|
|
||||||
|
|
||||||
-- Add index for dashboard queries (idempotent)
|
|
||||||
CREATE INDEX IF NOT EXISTS events_adapter_received_idx
|
|
||||||
ON public.events (adapter, received DESC);
|
|
||||||
|
|
||||||
-- Drop deprecated source column (idempotent)
|
|
||||||
ALTER TABLE public.events DROP COLUMN IF EXISTS source;
|
|
||||||
30
sql/schema.sql
Normal file
30
sql/schema.sql
Normal file
|
|
@ -0,0 +1,30 @@
|
||||||
|
-- Central Data Hub schema
|
||||||
|
-- PostgreSQL 16 + TimescaleDB + PostGIS
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS events (
|
||||||
|
id TEXT NOT NULL, -- CloudEvent id
|
||||||
|
source TEXT NOT NULL, -- adapter identity
|
||||||
|
category TEXT NOT NULL, -- "wx.alert.<type>"
|
||||||
|
time TIMESTAMPTZ NOT NULL, -- event-time UTC
|
||||||
|
expires TIMESTAMPTZ,
|
||||||
|
severity SMALLINT, -- 0..4 or NULL
|
||||||
|
geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon
|
||||||
|
regions TEXT[], -- ["US-ID-Ada", ...]
|
||||||
|
primary_region TEXT,
|
||||||
|
payload JSONB NOT NULL, -- full Event as JSON
|
||||||
|
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY (id, time) -- composite PK for TimescaleDB
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_hypertable('events', 'time', if_not_exists => TRUE);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS events_category_time_idx
|
||||||
|
ON events (category, time DESC);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS events_geom_gist
|
||||||
|
ON events USING GIST (geom);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS events_regions_gin
|
||||||
|
ON events USING GIN (regions);
|
||||||
|
|
||||||
|
-- Dedup on insert via ON CONFLICT (id, time) in the archive consumer.
|
||||||
|
|
@ -330,7 +330,7 @@ class FIRMSAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=stable_id,
|
id=stable_id,
|
||||||
adapter="firms",
|
source="central/adapters/firms",
|
||||||
category=f"fire.hotspot.{satellite_short}.{confidence}",
|
category=f"fire.hotspot.{satellite_short}.{confidence}",
|
||||||
time=time,
|
time=time,
|
||||||
expires=None,
|
expires=None,
|
||||||
|
|
|
||||||
|
|
@ -475,7 +475,7 @@ class NWSAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=event_id,
|
id=event_id,
|
||||||
adapter="nws",
|
source="central/adapters/nws",
|
||||||
category=category,
|
category=category,
|
||||||
time=time,
|
time=time,
|
||||||
expires=expires,
|
expires=expires,
|
||||||
|
|
|
||||||
|
|
@ -348,7 +348,7 @@ class USGSQuakeAdapter(SourceAdapter):
|
||||||
|
|
||||||
return Event(
|
return Event(
|
||||||
id=event_id,
|
id=event_id,
|
||||||
adapter="usgs_quake",
|
source="central/adapters/usgs_quake",
|
||||||
category=f"quake.event.{tier}",
|
category=f"quake.event.{tier}",
|
||||||
time=event_time,
|
time=event_time,
|
||||||
expires=None,
|
expires=None,
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,4 @@
|
||||||
"""Central archive consumer - JetStream to TimescaleDB.
|
"""Central archive consumer - JetStream to TimescaleDB."""
|
||||||
|
|
||||||
Consumes events from multiple NATS JetStream streams and archives them
|
|
||||||
to TimescaleDB. One durable consumer per stream for independent ack tracking.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
|
@ -16,27 +12,17 @@ import asyncpg
|
||||||
import nats
|
import nats
|
||||||
from nats.js import JetStreamContext
|
from nats.js import JetStreamContext
|
||||||
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
|
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
|
||||||
from nats.js.errors import NotFoundError
|
|
||||||
|
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
|
|
||||||
# Event-bearing streams to consume (skip CENTRAL_META - status messages only)
|
CONSUMER_NAME = "archive"
|
||||||
STREAMS = [
|
STREAM_NAME = "CENTRAL_WX"
|
||||||
("CENTRAL_WX", "central.wx.>"),
|
SUBJECT_FILTER = "central.wx.>"
|
||||||
("CENTRAL_FIRE", "central.fire.>"),
|
|
||||||
("CENTRAL_QUAKE", "central.quake.>"),
|
|
||||||
]
|
|
||||||
|
|
||||||
BATCH_SIZE = 100
|
BATCH_SIZE = 100
|
||||||
FETCH_TIMEOUT = 5.0
|
FETCH_TIMEOUT = 5.0
|
||||||
ACK_WAIT = 30
|
ACK_WAIT = 30
|
||||||
|
|
||||||
|
|
||||||
def consumer_name_for(stream: str) -> str:
|
|
||||||
"""Generate consumer name for a stream."""
|
|
||||||
return f"archive-{stream.lower()}"
|
|
||||||
|
|
||||||
|
|
||||||
class JsonFormatter(logging.Formatter):
|
class JsonFormatter(logging.Formatter):
|
||||||
"""JSON log formatter for structured logging."""
|
"""JSON log formatter for structured logging."""
|
||||||
|
|
||||||
|
|
@ -139,49 +125,24 @@ class ArchiveConsumer:
|
||||||
self._js = None
|
self._js = None
|
||||||
logger.info("Disconnected")
|
logger.info("Disconnected")
|
||||||
|
|
||||||
async def _cleanup_orphaned_consumer(self) -> None:
|
async def _ensure_consumer(self) -> None:
|
||||||
"""Remove orphaned 'archive' consumer from CENTRAL_WX if it exists.
|
"""Ensure the durable consumer exists."""
|
||||||
|
|
||||||
The old single-stream code used a consumer named 'archive' on CENTRAL_WX.
|
|
||||||
Now we use 'archive-central_wx' instead. Clean up the old one.
|
|
||||||
"""
|
|
||||||
if not self._js:
|
if not self._js:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._js.consumer_info("CENTRAL_WX", "archive")
|
await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME)
|
||||||
await self._js.delete_consumer("CENTRAL_WX", "archive")
|
logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME})
|
||||||
logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX")
|
except nats.js.errors.NotFoundError:
|
||||||
except NotFoundError:
|
|
||||||
pass # Already gone or never existed
|
|
||||||
|
|
||||||
async def _ensure_consumer(
|
|
||||||
self, stream_name: str, subject_filter: str, consumer_name: str
|
|
||||||
) -> None:
|
|
||||||
"""Ensure the durable consumer exists for a stream."""
|
|
||||||
if not self._js:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await self._js.consumer_info(stream_name, consumer_name)
|
|
||||||
logger.info(
|
|
||||||
"Consumer exists",
|
|
||||||
extra={"stream": stream_name, "consumer": consumer_name}
|
|
||||||
)
|
|
||||||
except NotFoundError:
|
|
||||||
consumer_config = ConsumerConfig(
|
consumer_config = ConsumerConfig(
|
||||||
durable_name=consumer_name,
|
durable_name=CONSUMER_NAME,
|
||||||
deliver_policy=DeliverPolicy.ALL,
|
deliver_policy=DeliverPolicy.ALL,
|
||||||
ack_policy=AckPolicy.EXPLICIT,
|
ack_policy=AckPolicy.EXPLICIT,
|
||||||
ack_wait=ACK_WAIT,
|
ack_wait=ACK_WAIT,
|
||||||
max_deliver=5,
|
filter_subject=SUBJECT_FILTER,
|
||||||
filter_subject=subject_filter,
|
|
||||||
)
|
|
||||||
await self._js.add_consumer(stream_name, consumer_config)
|
|
||||||
logger.info(
|
|
||||||
"Consumer created",
|
|
||||||
extra={"stream": stream_name, "consumer": consumer_name}
|
|
||||||
)
|
)
|
||||||
|
await self._js.add_consumer(STREAM_NAME, consumer_config)
|
||||||
|
logger.info("Consumer created", extra={"consumer": CONSUMER_NAME})
|
||||||
|
|
||||||
async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None:
|
async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None:
|
||||||
"""Process a single message and insert into database."""
|
"""Process a single message and insert into database."""
|
||||||
|
|
@ -196,7 +157,7 @@ class ArchiveConsumer:
|
||||||
geo_data = event_data.get("geo")
|
geo_data = event_data.get("geo")
|
||||||
|
|
||||||
event_id = envelope.get("id")
|
event_id = envelope.get("id")
|
||||||
adapter = event_data.get("adapter", "")
|
source = event_data.get("source", "")
|
||||||
category = event_data.get("category", "")
|
category = event_data.get("category", "")
|
||||||
time_str = event_data.get("time")
|
time_str = event_data.get("time")
|
||||||
expires_str = event_data.get("expires")
|
expires_str = event_data.get("expires")
|
||||||
|
|
@ -233,12 +194,12 @@ class ArchiveConsumer:
|
||||||
if geom_json:
|
if geom_json:
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO events (id, adapter, category, time, expires, severity,
|
INSERT INTO events (id, source, category, time, expires, severity,
|
||||||
geom, regions, primary_region, payload)
|
geom, regions, primary_region, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6,
|
VALUES ($1, $2, $3, $4, $5, $6,
|
||||||
ST_GeomFromGeoJSON($7), $8, $9, $10)
|
ST_GeomFromGeoJSON($7), $8, $9, $10)
|
||||||
ON CONFLICT (id, time) DO UPDATE SET
|
ON CONFLICT (id, time) DO UPDATE SET
|
||||||
adapter = EXCLUDED.adapter,
|
source = EXCLUDED.source,
|
||||||
category = EXCLUDED.category,
|
category = EXCLUDED.category,
|
||||||
expires = EXCLUDED.expires,
|
expires = EXCLUDED.expires,
|
||||||
severity = EXCLUDED.severity,
|
severity = EXCLUDED.severity,
|
||||||
|
|
@ -247,17 +208,17 @@ class ArchiveConsumer:
|
||||||
primary_region = EXCLUDED.primary_region,
|
primary_region = EXCLUDED.primary_region,
|
||||||
payload = EXCLUDED.payload
|
payload = EXCLUDED.payload
|
||||||
""",
|
""",
|
||||||
event_id, adapter, category, event_time, expires_time, severity,
|
event_id, source, category, event_time, expires_time, severity,
|
||||||
geom_json, regions, primary_region, json.dumps(envelope)
|
geom_json, regions, primary_region, json.dumps(envelope)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO events (id, adapter, category, time, expires, severity,
|
INSERT INTO events (id, source, category, time, expires, severity,
|
||||||
geom, regions, primary_region, payload)
|
geom, regions, primary_region, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9)
|
VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9)
|
||||||
ON CONFLICT (id, time) DO UPDATE SET
|
ON CONFLICT (id, time) DO UPDATE SET
|
||||||
adapter = EXCLUDED.adapter,
|
source = EXCLUDED.source,
|
||||||
category = EXCLUDED.category,
|
category = EXCLUDED.category,
|
||||||
expires = EXCLUDED.expires,
|
expires = EXCLUDED.expires,
|
||||||
severity = EXCLUDED.severity,
|
severity = EXCLUDED.severity,
|
||||||
|
|
@ -266,7 +227,7 @@ class ArchiveConsumer:
|
||||||
primary_region = EXCLUDED.primary_region,
|
primary_region = EXCLUDED.primary_region,
|
||||||
payload = EXCLUDED.payload
|
payload = EXCLUDED.payload
|
||||||
""",
|
""",
|
||||||
event_id, adapter, category, event_time, expires_time, severity,
|
event_id, source, category, event_time, expires_time, severity,
|
||||||
regions, primary_region, json.dumps(envelope)
|
regions, primary_region, json.dumps(envelope)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -280,24 +241,22 @@ class ArchiveConsumer:
|
||||||
)
|
)
|
||||||
# Don't ack - let it be redelivered
|
# Don't ack - let it be redelivered
|
||||||
|
|
||||||
async def _consume_stream(
|
async def _consume_loop(self) -> None:
|
||||||
self, stream_name: str, subject_filter: str, consumer_name: str
|
"""Main consume loop."""
|
||||||
) -> None:
|
|
||||||
"""Consume loop for a single stream."""
|
|
||||||
if not self._js or not self._pool:
|
if not self._js or not self._pool:
|
||||||
return
|
return
|
||||||
|
|
||||||
await self._ensure_consumer(stream_name, subject_filter, consumer_name)
|
await self._ensure_consumer()
|
||||||
|
|
||||||
sub = await self._js.pull_subscribe(
|
sub = await self._js.pull_subscribe(
|
||||||
subject_filter,
|
SUBJECT_FILTER,
|
||||||
durable=consumer_name,
|
durable=CONSUMER_NAME,
|
||||||
stream=stream_name,
|
stream=STREAM_NAME,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Subscribed to stream",
|
"Subscribed to stream",
|
||||||
extra={"stream": stream_name, "filter": subject_filter}
|
extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER}
|
||||||
)
|
)
|
||||||
|
|
||||||
while not self._shutdown_event.is_set():
|
while not self._shutdown_event.is_set():
|
||||||
|
|
@ -318,62 +277,19 @@ class ArchiveConsumer:
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(
|
logger.exception("Error in consume loop", extra={"error": str(e)})
|
||||||
"Error in consume loop",
|
|
||||||
extra={"stream": stream_name, "error": str(e)}
|
|
||||||
)
|
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
logger.info("Consume loop stopped", extra={"stream": stream_name})
|
logger.info("Consume loop stopped")
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start the consumer."""
|
"""Start the consumer."""
|
||||||
await self.connect()
|
await self.connect()
|
||||||
await self._cleanup_orphaned_consumer()
|
|
||||||
logger.info("Archive consumer ready")
|
logger.info("Archive consumer ready")
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
"""Run consume loops for all streams until shutdown."""
|
"""Run the consume loop until shutdown."""
|
||||||
tasks = []
|
await self._consume_loop()
|
||||||
for stream_name, subject_filter in STREAMS:
|
|
||||||
consumer_name = consumer_name_for(stream_name)
|
|
||||||
task = asyncio.create_task(
|
|
||||||
self._consume_stream(stream_name, subject_filter, consumer_name),
|
|
||||||
name=f"consume-{stream_name}",
|
|
||||||
)
|
|
||||||
tasks.append(task)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Wait for all tasks; if one fails, cancel the others
|
|
||||||
done, pending = await asyncio.wait(
|
|
||||||
tasks,
|
|
||||||
return_when=asyncio.FIRST_EXCEPTION,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check for exceptions in completed tasks
|
|
||||||
for task in done:
|
|
||||||
if task.exception():
|
|
||||||
logger.error(
|
|
||||||
"Stream consumer failed",
|
|
||||||
extra={"task": task.get_name(), "error": str(task.exception())}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Cancel any remaining tasks
|
|
||||||
for task in pending:
|
|
||||||
task.cancel()
|
|
||||||
try:
|
|
||||||
await task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
# Shutdown requested, cancel all tasks
|
|
||||||
for task in tasks:
|
|
||||||
task.cancel()
|
|
||||||
try:
|
|
||||||
await task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop the consumer gracefully."""
|
"""Stop the consumer gracefully."""
|
||||||
|
|
@ -392,6 +308,7 @@ async def async_main() -> None:
|
||||||
"Archive starting",
|
"Archive starting",
|
||||||
extra={
|
extra={
|
||||||
"nats_url": settings.nats_url,
|
"nats_url": settings.nats_url,
|
||||||
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -80,16 +80,12 @@ async def lifespan(app: FastAPI):
|
||||||
|
|
||||||
from central.bootstrap_config import get_settings
|
from central.bootstrap_config import get_settings
|
||||||
from central.gui.db import close_pool, init_pool
|
from central.gui.db import close_pool, init_pool
|
||||||
from central.gui.nats import close_nats, init_nats
|
|
||||||
|
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
|
|
||||||
# Initialize database pool
|
# Initialize database pool
|
||||||
await init_pool(settings.db_dsn)
|
await init_pool(settings.db_dsn)
|
||||||
|
|
||||||
# Initialize NATS connection
|
|
||||||
await init_nats(settings.nats_url)
|
|
||||||
|
|
||||||
# Start session cleanup task
|
# Start session cleanup task
|
||||||
_shutdown_event = asyncio.Event()
|
_shutdown_event = asyncio.Event()
|
||||||
_cleanup_task = asyncio.create_task(_session_cleanup_loop())
|
_cleanup_task = asyncio.create_task(_session_cleanup_loop())
|
||||||
|
|
@ -107,7 +103,6 @@ async def lifespan(app: FastAPI):
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
_cleanup_task.cancel()
|
_cleanup_task.cancel()
|
||||||
|
|
||||||
await close_nats()
|
|
||||||
await close_pool()
|
await close_pool()
|
||||||
logger.info("Central GUI stopped")
|
logger.info("Central GUI stopped")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ AUTH_LOGIN_FAILED = "auth.login_failed"
|
||||||
AUTH_LOGOUT = "auth.logout"
|
AUTH_LOGOUT = "auth.logout"
|
||||||
AUTH_PASSWORD_CHANGE = "auth.password_change"
|
AUTH_PASSWORD_CHANGE = "auth.password_change"
|
||||||
OPERATOR_CREATE = "operator.create"
|
OPERATOR_CREATE = "operator.create"
|
||||||
ADAPTER_UPDATE = "adapter.update"
|
|
||||||
|
|
||||||
|
|
||||||
async def write_audit(
|
async def write_audit(
|
||||||
|
|
@ -21,15 +20,18 @@ async def write_audit(
|
||||||
after: dict[str, Any] | None = None,
|
after: dict[str, Any] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Write an audit log entry."""
|
"""Write an audit log entry."""
|
||||||
# asyncpg handles dict -> jsonb conversion automatically
|
# Serialize before/after as JSON strings if provided
|
||||||
|
before_json = json.dumps(before) if before else None
|
||||||
|
after_json = json.dumps(after) if after else None
|
||||||
|
|
||||||
await conn.execute(
|
await conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO config.audit_log (operator_id, action, target, before, after)
|
INSERT INTO config.audit_log (operator_id, action, target, before, after)
|
||||||
VALUES ($1, $2, $3, $4, $5)
|
VALUES ($1, $2, $3, $4::jsonb, $5::jsonb)
|
||||||
""",
|
""",
|
||||||
operator_id,
|
operator_id,
|
||||||
action,
|
action,
|
||||||
target,
|
target,
|
||||||
before,
|
before_json,
|
||||||
after,
|
after_json,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,46 +0,0 @@
|
||||||
"""NATS connection for GUI."""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
import nats
|
|
||||||
from nats.js import JetStreamContext
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
_nc: nats.NATS | None = None
|
|
||||||
_js: JetStreamContext | None = None
|
|
||||||
|
|
||||||
|
|
||||||
async def init_nats(url: str) -> JetStreamContext | None:
|
|
||||||
"""Initialize the NATS connection and JetStream context."""
|
|
||||||
global _nc, _js
|
|
||||||
if _nc is None:
|
|
||||||
try:
|
|
||||||
_nc = await nats.connect(url)
|
|
||||||
_js = _nc.jetstream()
|
|
||||||
logger.info("Connected to NATS", extra={"url": url})
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to connect to NATS", extra={"error": str(e)})
|
|
||||||
_nc = None
|
|
||||||
_js = None
|
|
||||||
return _js
|
|
||||||
|
|
||||||
|
|
||||||
def get_js() -> JetStreamContext | None:
|
|
||||||
"""Get the JetStream context. Returns None if not connected."""
|
|
||||||
return _js
|
|
||||||
|
|
||||||
|
|
||||||
async def close_nats() -> None:
|
|
||||||
"""Close the NATS connection."""
|
|
||||||
global _nc, _js
|
|
||||||
if _nc is not None:
|
|
||||||
try:
|
|
||||||
await _nc.drain()
|
|
||||||
await _nc.close()
|
|
||||||
logger.info("Disconnected from NATS")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Error closing NATS", extra={"error": str(e)})
|
|
||||||
finally:
|
|
||||||
_nc = None
|
|
||||||
_js = None
|
|
||||||
|
|
@ -1,9 +1,5 @@
|
||||||
"""Route handlers for Central GUI."""
|
"""Route handlers for Central GUI."""
|
||||||
|
|
||||||
import json
|
|
||||||
import re
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, Form, Request
|
from fastapi import APIRouter, Depends, Form, Request
|
||||||
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
||||||
from fastapi_csrf_protect import CsrfProtect
|
from fastapi_csrf_protect import CsrfProtect
|
||||||
|
|
@ -16,7 +12,6 @@ from central.gui.auth import (
|
||||||
verify_password,
|
verify_password,
|
||||||
)
|
)
|
||||||
from central.gui.audit import (
|
from central.gui.audit import (
|
||||||
ADAPTER_UPDATE,
|
|
||||||
AUTH_LOGIN,
|
AUTH_LOGIN,
|
||||||
AUTH_LOGIN_FAILED,
|
AUTH_LOGIN_FAILED,
|
||||||
AUTH_LOGOUT,
|
AUTH_LOGOUT,
|
||||||
|
|
@ -28,24 +23,6 @@ from central.gui.db import get_pool
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Streams to display on dashboard
|
|
||||||
DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"]
|
|
||||||
|
|
||||||
# Email validation regex (simple but effective)
|
|
||||||
EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
|
|
||||||
|
|
||||||
|
|
||||||
def _get_valid_satellites() -> list[str]:
|
|
||||||
"""Get valid satellite identifiers from firms adapter."""
|
|
||||||
from central.adapters.firms import SATELLITE_SHORT
|
|
||||||
return list(SATELLITE_SHORT.keys())
|
|
||||||
|
|
||||||
|
|
||||||
def _get_valid_feeds() -> set[str]:
|
|
||||||
"""Get valid feed values from usgs_quake adapter."""
|
|
||||||
from central.adapters.usgs_quake import VALID_FEEDS
|
|
||||||
return VALID_FEEDS
|
|
||||||
|
|
||||||
|
|
||||||
def _get_templates():
|
def _get_templates():
|
||||||
"""Get templates instance (deferred import to avoid circular)."""
|
"""Get templates instance (deferred import to avoid circular)."""
|
||||||
|
|
@ -53,15 +30,6 @@ def _get_templates():
|
||||||
return templates
|
return templates
|
||||||
|
|
||||||
|
|
||||||
def _format_bytes(size: int) -> str:
|
|
||||||
"""Format bytes as human-readable string."""
|
|
||||||
for unit in ["B", "KB", "MB", "GB", "TB"]:
|
|
||||||
if size < 1024:
|
|
||||||
return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}"
|
|
||||||
size /= 1024
|
|
||||||
return f"{size:.1f} PB"
|
|
||||||
|
|
||||||
|
|
||||||
def _set_session_cookie(
|
def _set_session_cookie(
|
||||||
response: Response,
|
response: Response,
|
||||||
token: str,
|
token: str,
|
||||||
|
|
@ -108,154 +76,6 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
@router.get("/dashboard/events", response_class=HTMLResponse)
|
|
||||||
async def dashboard_events(request: Request) -> HTMLResponse:
|
|
||||||
"""Get events by adapter for the last 24 hours."""
|
|
||||||
templates = _get_templates()
|
|
||||||
pool = get_pool()
|
|
||||||
|
|
||||||
events = []
|
|
||||||
error = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
rows = await conn.fetch(
|
|
||||||
"""
|
|
||||||
SELECT adapter, COUNT(*) as count
|
|
||||||
FROM events
|
|
||||||
WHERE received > NOW() - INTERVAL '24 hours'
|
|
||||||
GROUP BY adapter
|
|
||||||
ORDER BY count DESC
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows]
|
|
||||||
except Exception as e:
|
|
||||||
error = f"Database error: {str(e)}"
|
|
||||||
|
|
||||||
return templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="_dashboard_events.html",
|
|
||||||
context={"events": events, "error": error},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/dashboard/streams", response_class=HTMLResponse)
|
|
||||||
async def dashboard_streams(request: Request) -> HTMLResponse:
|
|
||||||
"""Get stream sizes from NATS JetStream."""
|
|
||||||
from central.gui.nats import get_js
|
|
||||||
|
|
||||||
templates = _get_templates()
|
|
||||||
js = get_js()
|
|
||||||
|
|
||||||
streams = None
|
|
||||||
error = None
|
|
||||||
|
|
||||||
if js is None:
|
|
||||||
error = "NATS unavailable"
|
|
||||||
else:
|
|
||||||
streams = []
|
|
||||||
for stream_name in DASHBOARD_STREAMS:
|
|
||||||
try:
|
|
||||||
info = await js.stream_info(stream_name)
|
|
||||||
streams.append({
|
|
||||||
"name": stream_name,
|
|
||||||
"messages": info.state.messages,
|
|
||||||
"size": _format_bytes(info.state.bytes),
|
|
||||||
"error": None,
|
|
||||||
})
|
|
||||||
except Exception:
|
|
||||||
streams.append({
|
|
||||||
"name": stream_name,
|
|
||||||
"messages": 0,
|
|
||||||
"size": "0 B",
|
|
||||||
"error": "unavailable",
|
|
||||||
})
|
|
||||||
|
|
||||||
return templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="_dashboard_streams.html",
|
|
||||||
context={"streams": streams, "error": error},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/dashboard/polls", response_class=HTMLResponse)
|
|
||||||
async def dashboard_polls(request: Request) -> HTMLResponse:
|
|
||||||
"""Get last poll times for each adapter."""
|
|
||||||
from central.gui.nats import get_js
|
|
||||||
|
|
||||||
templates = _get_templates()
|
|
||||||
pool = get_pool()
|
|
||||||
js = get_js()
|
|
||||||
|
|
||||||
adapters = []
|
|
||||||
error = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
rows = await conn.fetch(
|
|
||||||
"SELECT name FROM config.adapters ORDER BY name"
|
|
||||||
)
|
|
||||||
adapter_names = [row["name"] for row in rows]
|
|
||||||
except Exception as e:
|
|
||||||
error = f"Database error: {str(e)}"
|
|
||||||
return templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="_dashboard_polls.html",
|
|
||||||
context={"adapters": [], "error": error},
|
|
||||||
)
|
|
||||||
|
|
||||||
if js is None:
|
|
||||||
error = "NATS unavailable"
|
|
||||||
adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names]
|
|
||||||
else:
|
|
||||||
for name in adapter_names:
|
|
||||||
try:
|
|
||||||
# Get last message from CENTRAL_META for this adapter
|
|
||||||
sub = await js.pull_subscribe(
|
|
||||||
f"central.meta.{name}.status",
|
|
||||||
durable=f"dashboard-poll-{name}",
|
|
||||||
stream="CENTRAL_META",
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
msgs = await sub.fetch(1, timeout=1.0)
|
|
||||||
if msgs:
|
|
||||||
data = json.loads(msgs[0].data.decode())
|
|
||||||
last_poll = data.get("data", {}).get("time", "—")
|
|
||||||
adapters.append({
|
|
||||||
"name": name,
|
|
||||||
"last_poll": last_poll,
|
|
||||||
"status": "✓",
|
|
||||||
"error": None,
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
adapters.append({
|
|
||||||
"name": name,
|
|
||||||
"last_poll": None,
|
|
||||||
"status": None,
|
|
||||||
"error": None,
|
|
||||||
})
|
|
||||||
except Exception:
|
|
||||||
adapters.append({
|
|
||||||
"name": name,
|
|
||||||
"last_poll": None,
|
|
||||||
"status": None,
|
|
||||||
"error": None,
|
|
||||||
})
|
|
||||||
except Exception:
|
|
||||||
adapters.append({
|
|
||||||
"name": name,
|
|
||||||
"last_poll": None,
|
|
||||||
"status": None,
|
|
||||||
"error": "unavailable",
|
|
||||||
})
|
|
||||||
|
|
||||||
return templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="_dashboard_polls.html",
|
|
||||||
context={"adapters": adapters, "error": error},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/setup", response_class=HTMLResponse)
|
@router.get("/setup", response_class=HTMLResponse)
|
||||||
async def setup_form(
|
async def setup_form(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
|
@ -550,285 +370,3 @@ async def change_password_submit(
|
||||||
|
|
||||||
# Redirect to index
|
# Redirect to index
|
||||||
return RedirectResponse(url="/", status_code=302)
|
return RedirectResponse(url="/", status_code=302)
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Adapters routes
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/adapters", response_class=HTMLResponse)
|
|
||||||
async def adapters_list(
|
|
||||||
request: Request,
|
|
||||||
csrf_protect: CsrfProtect = Depends(),
|
|
||||||
) -> HTMLResponse:
|
|
||||||
"""List all adapters."""
|
|
||||||
templates = _get_templates()
|
|
||||||
pool = get_pool()
|
|
||||||
operator = request.state.operator
|
|
||||||
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
rows = await conn.fetch(
|
|
||||||
"""
|
|
||||||
SELECT name, enabled, cadence_s, settings, paused_at, updated_at
|
|
||||||
FROM config.adapters
|
|
||||||
ORDER BY name
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
adapters = []
|
|
||||||
for row in rows:
|
|
||||||
settings = row["settings"] or {}
|
|
||||||
adapters.append({
|
|
||||||
"name": row["name"],
|
|
||||||
"enabled": row["enabled"],
|
|
||||||
"cadence_s": row["cadence_s"],
|
|
||||||
"settings": settings,
|
|
||||||
"paused_at": row["paused_at"],
|
|
||||||
"updated_at": row["updated_at"],
|
|
||||||
})
|
|
||||||
|
|
||||||
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
|
|
||||||
response = templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="adapters_list.html",
|
|
||||||
context={
|
|
||||||
"operator": operator,
|
|
||||||
"csrf_token": csrf_token,
|
|
||||||
"adapters": adapters,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
csrf_protect.set_csrf_cookie(signed_token, response)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/adapters/{name}", response_class=HTMLResponse)
|
|
||||||
async def adapters_edit_form(
|
|
||||||
request: Request,
|
|
||||||
name: str,
|
|
||||||
csrf_protect: CsrfProtect = Depends(),
|
|
||||||
) -> Response:
|
|
||||||
"""Render the adapter edit form."""
|
|
||||||
templates = _get_templates()
|
|
||||||
pool = get_pool()
|
|
||||||
operator = request.state.operator
|
|
||||||
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
row = await conn.fetchrow(
|
|
||||||
"""
|
|
||||||
SELECT name, enabled, cadence_s, settings, paused_at, updated_at
|
|
||||||
FROM config.adapters
|
|
||||||
WHERE name = $1
|
|
||||||
""",
|
|
||||||
name,
|
|
||||||
)
|
|
||||||
|
|
||||||
if row is None:
|
|
||||||
return Response(status_code=404, content="Adapter not found")
|
|
||||||
|
|
||||||
# Get API keys for firms dropdown
|
|
||||||
api_keys = await conn.fetch(
|
|
||||||
"SELECT alias FROM config.api_keys ORDER BY alias"
|
|
||||||
)
|
|
||||||
|
|
||||||
settings = row["settings"] or {}
|
|
||||||
adapter = {
|
|
||||||
"name": row["name"],
|
|
||||||
"enabled": row["enabled"],
|
|
||||||
"cadence_s": row["cadence_s"],
|
|
||||||
"settings": settings,
|
|
||||||
"paused_at": row["paused_at"],
|
|
||||||
"updated_at": row["updated_at"],
|
|
||||||
}
|
|
||||||
|
|
||||||
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
|
|
||||||
response = templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="adapters_edit.html",
|
|
||||||
context={
|
|
||||||
"operator": operator,
|
|
||||||
"csrf_token": csrf_token,
|
|
||||||
"adapter": adapter,
|
|
||||||
"errors": None,
|
|
||||||
"form_data": None,
|
|
||||||
"api_keys": [{"alias": k["alias"]} for k in api_keys],
|
|
||||||
"valid_satellites": _get_valid_satellites(),
|
|
||||||
"valid_feeds": sorted(_get_valid_feeds()),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
csrf_protect.set_csrf_cookie(signed_token, response)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/adapters/{name}")
|
|
||||||
async def adapters_edit_submit(
|
|
||||||
request: Request,
|
|
||||||
name: str,
|
|
||||||
csrf_protect: CsrfProtect = Depends(),
|
|
||||||
) -> Response:
|
|
||||||
"""Process the adapter edit form."""
|
|
||||||
templates = _get_templates()
|
|
||||||
pool = get_pool()
|
|
||||||
operator = request.state.operator
|
|
||||||
|
|
||||||
# Validate CSRF
|
|
||||||
await csrf_protect.validate_csrf(request)
|
|
||||||
|
|
||||||
# Parse form data
|
|
||||||
form = await request.form()
|
|
||||||
enabled = "enabled" in form
|
|
||||||
cadence_s_str = form.get("cadence_s", "")
|
|
||||||
|
|
||||||
# Build form_data for re-render on error
|
|
||||||
form_data: dict[str, Any] = {
|
|
||||||
"enabled": enabled,
|
|
||||||
"cadence_s": cadence_s_str,
|
|
||||||
}
|
|
||||||
|
|
||||||
errors: dict[str, str] = {}
|
|
||||||
|
|
||||||
# Validate cadence_s
|
|
||||||
try:
|
|
||||||
cadence_s = int(cadence_s_str)
|
|
||||||
if cadence_s < 60 or cadence_s > 3600:
|
|
||||||
errors["cadence_s"] = "Cadence must be between 60 and 3600 seconds"
|
|
||||||
except ValueError:
|
|
||||||
errors["cadence_s"] = "Cadence must be a valid integer"
|
|
||||||
cadence_s = 0
|
|
||||||
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
# Get current adapter state
|
|
||||||
row = await conn.fetchrow(
|
|
||||||
"""
|
|
||||||
SELECT name, enabled, cadence_s, settings, paused_at, updated_at
|
|
||||||
FROM config.adapters
|
|
||||||
WHERE name = $1
|
|
||||||
""",
|
|
||||||
name,
|
|
||||||
)
|
|
||||||
|
|
||||||
if row is None:
|
|
||||||
return Response(status_code=404, content="Adapter not found")
|
|
||||||
|
|
||||||
current_settings = row["settings"] or {}
|
|
||||||
new_settings = dict(current_settings)
|
|
||||||
|
|
||||||
# Adapter-specific validation and settings update
|
|
||||||
if name == "nws":
|
|
||||||
contact_email = form.get("contact_email", "").strip()
|
|
||||||
form_data["contact_email"] = contact_email
|
|
||||||
if not contact_email:
|
|
||||||
errors["contact_email"] = "Contact email is required"
|
|
||||||
elif not EMAIL_REGEX.match(contact_email):
|
|
||||||
errors["contact_email"] = "Invalid email format"
|
|
||||||
else:
|
|
||||||
new_settings["contact_email"] = contact_email
|
|
||||||
|
|
||||||
elif name == "firms":
|
|
||||||
api_key_alias = form.get("api_key_alias", "").strip()
|
|
||||||
satellites = form.getlist("satellites")
|
|
||||||
form_data["api_key_alias"] = api_key_alias
|
|
||||||
form_data["satellites"] = satellites
|
|
||||||
|
|
||||||
# Validate api_key_alias if set
|
|
||||||
if api_key_alias:
|
|
||||||
key_exists = await conn.fetchrow(
|
|
||||||
"SELECT 1 FROM config.api_keys WHERE alias = $1",
|
|
||||||
api_key_alias,
|
|
||||||
)
|
|
||||||
if not key_exists:
|
|
||||||
errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist"
|
|
||||||
else:
|
|
||||||
new_settings["api_key_alias"] = api_key_alias
|
|
||||||
else:
|
|
||||||
new_settings["api_key_alias"] = None
|
|
||||||
|
|
||||||
# Validate satellites
|
|
||||||
valid_sats = set(_get_valid_satellites())
|
|
||||||
invalid_sats = [s for s in satellites if s not in valid_sats]
|
|
||||||
if invalid_sats:
|
|
||||||
errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}"
|
|
||||||
else:
|
|
||||||
new_settings["satellites"] = satellites
|
|
||||||
|
|
||||||
elif name == "usgs_quake":
|
|
||||||
feed = form.get("feed", "").strip()
|
|
||||||
form_data["feed"] = feed
|
|
||||||
valid_feeds = _get_valid_feeds()
|
|
||||||
if feed not in valid_feeds:
|
|
||||||
errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}"
|
|
||||||
else:
|
|
||||||
new_settings["feed"] = feed
|
|
||||||
|
|
||||||
# If there are errors, re-render the form
|
|
||||||
if errors:
|
|
||||||
adapter = {
|
|
||||||
"name": row["name"],
|
|
||||||
"enabled": row["enabled"],
|
|
||||||
"cadence_s": row["cadence_s"],
|
|
||||||
"settings": current_settings,
|
|
||||||
"paused_at": row["paused_at"],
|
|
||||||
"updated_at": row["updated_at"],
|
|
||||||
}
|
|
||||||
|
|
||||||
api_keys = await conn.fetch(
|
|
||||||
"SELECT alias FROM config.api_keys ORDER BY alias"
|
|
||||||
)
|
|
||||||
|
|
||||||
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
|
|
||||||
response = templates.TemplateResponse(
|
|
||||||
request=request,
|
|
||||||
name="adapters_edit.html",
|
|
||||||
context={
|
|
||||||
"operator": operator,
|
|
||||||
"csrf_token": csrf_token,
|
|
||||||
"adapter": adapter,
|
|
||||||
"errors": errors,
|
|
||||||
"form_data": form_data,
|
|
||||||
"api_keys": [{"alias": k["alias"]} for k in api_keys],
|
|
||||||
"valid_satellites": _get_valid_satellites(),
|
|
||||||
"valid_feeds": sorted(_get_valid_feeds()),
|
|
||||||
},
|
|
||||||
status_code=200,
|
|
||||||
)
|
|
||||||
csrf_protect.set_csrf_cookie(signed_token, response)
|
|
||||||
return response
|
|
||||||
|
|
||||||
# Build before state for audit
|
|
||||||
before = {
|
|
||||||
"enabled": row["enabled"],
|
|
||||||
"cadence_s": row["cadence_s"],
|
|
||||||
"settings": current_settings,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Build after state for audit
|
|
||||||
after = {
|
|
||||||
"enabled": enabled,
|
|
||||||
"cadence_s": cadence_s,
|
|
||||||
"settings": new_settings,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Update the adapter
|
|
||||||
await conn.execute(
|
|
||||||
"""
|
|
||||||
UPDATE config.adapters
|
|
||||||
SET enabled = $1, cadence_s = $2, settings = $3, updated_at = now()
|
|
||||||
WHERE name = $4
|
|
||||||
""",
|
|
||||||
enabled,
|
|
||||||
cadence_s,
|
|
||||||
new_settings,
|
|
||||||
name,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Write audit log
|
|
||||||
await write_audit(
|
|
||||||
conn,
|
|
||||||
ADAPTER_UPDATE,
|
|
||||||
operator_id=operator.id,
|
|
||||||
target=name,
|
|
||||||
before=before,
|
|
||||||
after=after,
|
|
||||||
)
|
|
||||||
|
|
||||||
return RedirectResponse(url="/adapters", status_code=302)
|
|
||||||
|
|
|
||||||
|
|
@ -1,22 +0,0 @@
|
||||||
{% if error %}
|
|
||||||
<p class="error">{{ error }}</p>
|
|
||||||
{% elif events %}
|
|
||||||
<table>
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>Adapter</th>
|
|
||||||
<th>Count</th>
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{% for event in events %}
|
|
||||||
<tr>
|
|
||||||
<td>{{ event.adapter }}</td>
|
|
||||||
<td>{{ event.count }}</td>
|
|
||||||
</tr>
|
|
||||||
{% endfor %}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
{% else %}
|
|
||||||
<p>No events in the last 24 hours.</p>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
{% if error %}
|
|
||||||
<p class="error">{{ error }}</p>
|
|
||||||
{% elif adapters %}
|
|
||||||
<table>
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>Adapter</th>
|
|
||||||
<th>Last Poll</th>
|
|
||||||
<th>Status</th>
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{% for adapter in adapters %}
|
|
||||||
<tr>
|
|
||||||
<td>{{ adapter.name }}</td>
|
|
||||||
{% if adapter.error %}
|
|
||||||
<td colspan="2" class="error">{{ adapter.error }}</td>
|
|
||||||
{% elif adapter.last_poll %}
|
|
||||||
<td>{{ adapter.last_poll }}</td>
|
|
||||||
<td>{{ adapter.status }}</td>
|
|
||||||
{% else %}
|
|
||||||
<td>—</td>
|
|
||||||
<td>—</td>
|
|
||||||
{% endif %}
|
|
||||||
</tr>
|
|
||||||
{% endfor %}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
{% else %}
|
|
||||||
<p>No adapter data available.</p>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,28 +0,0 @@
|
||||||
{% if error %}
|
|
||||||
<p class="error">{{ error }}</p>
|
|
||||||
{% elif streams %}
|
|
||||||
<table>
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>Stream</th>
|
|
||||||
<th>Messages</th>
|
|
||||||
<th>Size</th>
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{% for stream in streams %}
|
|
||||||
<tr>
|
|
||||||
<td>{{ stream.name }}</td>
|
|
||||||
{% if stream.error %}
|
|
||||||
<td colspan="2" class="error">{{ stream.error }}</td>
|
|
||||||
{% else %}
|
|
||||||
<td>{{ stream.messages }}</td>
|
|
||||||
<td>{{ stream.size }}</td>
|
|
||||||
{% endif %}
|
|
||||||
</tr>
|
|
||||||
{% endfor %}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
{% else %}
|
|
||||||
<p>No stream data available.</p>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,49 +0,0 @@
|
||||||
{% extends "base.html" %}
|
|
||||||
|
|
||||||
{% block title %}Central — Edit {{ adapter.name }}{% endblock %}
|
|
||||||
|
|
||||||
{% block content %}
|
|
||||||
<h1>Edit Adapter: {{ adapter.name }}</h1>
|
|
||||||
|
|
||||||
<form method="post" action="/adapters/{{ adapter.name }}">
|
|
||||||
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
|
|
||||||
|
|
||||||
<fieldset>
|
|
||||||
<legend>Universal Settings</legend>
|
|
||||||
|
|
||||||
<label>
|
|
||||||
<input type="checkbox" name="enabled" {% if adapter.enabled %}checked{% endif %}>
|
|
||||||
Enabled
|
|
||||||
</label>
|
|
||||||
|
|
||||||
<label for="cadence_s">Cadence (seconds)</label>
|
|
||||||
<input type="number" id="cadence_s" name="cadence_s" value="{{ form_data.cadence_s if form_data else adapter.cadence_s }}" min="60" max="3600" required>
|
|
||||||
{% if errors and errors.cadence_s %}
|
|
||||||
<small style="color: var(--pico-color-red-500);">{{ errors.cadence_s }}</small>
|
|
||||||
{% endif %}
|
|
||||||
</fieldset>
|
|
||||||
|
|
||||||
<fieldset>
|
|
||||||
<legend>Adapter-Specific Settings</legend>
|
|
||||||
{% include "adapters_edit_" + adapter.name + ".html" %}
|
|
||||||
</fieldset>
|
|
||||||
|
|
||||||
<fieldset>
|
|
||||||
<legend>Region (read-only)</legend>
|
|
||||||
{% if adapter.settings.region %}
|
|
||||||
<p>
|
|
||||||
<strong>North:</strong> {{ adapter.settings.region.north }}<br>
|
|
||||||
<strong>South:</strong> {{ adapter.settings.region.south }}<br>
|
|
||||||
<strong>East:</strong> {{ adapter.settings.region.east }}<br>
|
|
||||||
<strong>West:</strong> {{ adapter.settings.region.west }}
|
|
||||||
</p>
|
|
||||||
{% else %}
|
|
||||||
<p>No region configured.</p>
|
|
||||||
{% endif %}
|
|
||||||
<small>Region editing comes in 1b-5.</small>
|
|
||||||
</fieldset>
|
|
||||||
|
|
||||||
<button type="submit">Save Changes</button>
|
|
||||||
<a href="/adapters" role="button" class="outline">Cancel</a>
|
|
||||||
</form>
|
|
||||||
{% endblock %}
|
|
||||||
|
|
@ -1,21 +0,0 @@
|
||||||
<label for="api_key_alias">API Key Alias</label>
|
|
||||||
<select id="api_key_alias" name="api_key_alias">
|
|
||||||
<option value="" {% if not (form_data.api_key_alias if form_data else adapter.settings.api_key_alias) %}selected{% endif %}>(none)</option>
|
|
||||||
{% for key in api_keys %}
|
|
||||||
<option value="{{ key.alias }}" {% if (form_data.api_key_alias if form_data else adapter.settings.api_key_alias) == key.alias %}selected{% endif %}>{{ key.alias }}</option>
|
|
||||||
{% endfor %}
|
|
||||||
</select>
|
|
||||||
{% if errors and errors.api_key_alias %}
|
|
||||||
<small style="color: var(--pico-color-red-500);">{{ errors.api_key_alias }}</small>
|
|
||||||
{% endif %}
|
|
||||||
|
|
||||||
<label>Satellites</label>
|
|
||||||
{% for sat in valid_satellites %}
|
|
||||||
<label>
|
|
||||||
<input type="checkbox" name="satellites" value="{{ sat }}" {% if sat in (form_data.satellites if form_data else adapter.settings.satellites or []) %}checked{% endif %}>
|
|
||||||
{{ sat }}
|
|
||||||
</label>
|
|
||||||
{% endfor %}
|
|
||||||
{% if errors and errors.satellites %}
|
|
||||||
<small style="color: var(--pico-color-red-500);">{{ errors.satellites }}</small>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,5 +0,0 @@
|
||||||
<label for="contact_email">Contact Email</label>
|
|
||||||
<input type="email" id="contact_email" name="contact_email" value="{{ form_data.contact_email if form_data else adapter.settings.contact_email }}" required>
|
|
||||||
{% if errors and errors.contact_email %}
|
|
||||||
<small style="color: var(--pico-color-red-500);">{{ errors.contact_email }}</small>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
<label for="feed">Feed</label>
|
|
||||||
<select id="feed" name="feed" required>
|
|
||||||
{% for f in valid_feeds %}
|
|
||||||
<option value="{{ f }}" {% if (form_data.feed if form_data else adapter.settings.feed) == f %}selected{% endif %}>{{ f }}</option>
|
|
||||||
{% endfor %}
|
|
||||||
</select>
|
|
||||||
{% if errors and errors.feed %}
|
|
||||||
<small style="color: var(--pico-color-red-500);">{{ errors.feed }}</small>
|
|
||||||
{% endif %}
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
||||||
{% extends "base.html" %}
|
|
||||||
|
|
||||||
{% block title %}Central — Adapters{% endblock %}
|
|
||||||
|
|
||||||
{% block content %}
|
|
||||||
<h1>Adapters</h1>
|
|
||||||
<table>
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>Name</th>
|
|
||||||
<th>Enabled</th>
|
|
||||||
<th>Cadence</th>
|
|
||||||
<th>Last Updated</th>
|
|
||||||
<th></th>
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{% for adapter in adapters %}
|
|
||||||
<tr>
|
|
||||||
<td>{{ adapter.name }}</td>
|
|
||||||
<td>{% if adapter.enabled %}Yes{% else %}No{% endif %}</td>
|
|
||||||
<td>{{ adapter.cadence_s }}s</td>
|
|
||||||
<td>{{ adapter.updated_at.strftime('%Y-%m-%d %H:%M') if adapter.updated_at else '—' }}</td>
|
|
||||||
<td><a href="/adapters/{{ adapter.name }}">Edit</a></td>
|
|
||||||
</tr>
|
|
||||||
{% endfor %}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
{% endblock %}
|
|
||||||
|
|
@ -15,8 +15,6 @@
|
||||||
</ul>
|
</ul>
|
||||||
<ul>
|
<ul>
|
||||||
{% if operator %}
|
{% if operator %}
|
||||||
<li><a href="/">Dashboard</a></li>
|
|
||||||
<li><a href="/adapters">Adapters</a></li>
|
|
||||||
<li>{{ operator.username }}</li>
|
<li>{{ operator.username }}</li>
|
||||||
<li><a href="/change-password">Change Password</a></li>
|
<li><a href="/change-password">Change Password</a></li>
|
||||||
<li>
|
<li>
|
||||||
|
|
|
||||||
|
|
@ -1,27 +1,12 @@
|
||||||
{% extends "base.html" %}
|
{% extends "base.html" %}
|
||||||
|
|
||||||
{% block title %}Central — Dashboard{% endblock %}
|
{% block title %}Central — Coming Soon{% endblock %}
|
||||||
|
|
||||||
{% block content %}
|
{% block content %}
|
||||||
<h1>Dashboard</h1>
|
<article>
|
||||||
<div class="grid">
|
<header>
|
||||||
<article>
|
<h1>Central</h1>
|
||||||
<header>Events (24h)</header>
|
</header>
|
||||||
<div hx-get="/dashboard/events" hx-trigger="load, every 10s" hx-swap="innerHTML">
|
<p>Data hub GUI — coming soon.</p>
|
||||||
Loading...
|
</article>
|
||||||
</div>
|
|
||||||
</article>
|
|
||||||
<article>
|
|
||||||
<header>Stream Sizes</header>
|
|
||||||
<div hx-get="/dashboard/streams" hx-trigger="load, every 10s" hx-swap="innerHTML">
|
|
||||||
Loading...
|
|
||||||
</div>
|
|
||||||
</article>
|
|
||||||
<article>
|
|
||||||
<header>Last Poll Times</header>
|
|
||||||
<div hx-get="/dashboard/polls" hx-trigger="load, every 10s" hx-swap="innerHTML">
|
|
||||||
Loading...
|
|
||||||
</div>
|
|
||||||
</article>
|
|
||||||
</div>
|
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class Event(BaseModel):
|
||||||
model_config = ConfigDict(extra="forbid", frozen=True)
|
model_config = ConfigDict(extra="forbid", frozen=True)
|
||||||
|
|
||||||
id: str # unique, stable across republish
|
id: str # unique, stable across republish
|
||||||
adapter: str # adapter identity, e.g. "nws"
|
source: str # adapter identity, e.g. "central/adapters/nws"
|
||||||
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
|
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
|
||||||
time: datetime # event-time UTC, not processing-time
|
time: datetime # event-time UTC, not processing-time
|
||||||
expires: datetime | None = None
|
expires: datetime | None = None
|
||||||
|
|
|
||||||
|
|
@ -1,511 +0,0 @@
|
||||||
"""Tests for adapter list and edit routes."""
|
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
# Set required env vars before importing central modules
|
|
||||||
os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test")
|
|
||||||
os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab")
|
|
||||||
os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222")
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersListUnauthenticated:
|
|
||||||
"""Test adapters list without authentication."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_list_unauthenticated_redirects(self):
|
|
||||||
"""GET /adapters without auth redirects to /login."""
|
|
||||||
from central.gui.routes import adapters_list
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = None
|
|
||||||
|
|
||||||
# The middleware handles the redirect, so we test the route expects operator
|
|
||||||
# In practice, middleware returns 302 before route is called
|
|
||||||
# This test verifies the route structure expects authentication
|
|
||||||
assert adapters_list is not None
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersListAuthenticated:
|
|
||||||
"""Test adapters list with authentication."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_list_returns_all_adapters(self):
|
|
||||||
"""GET /adapters authenticated returns 200 with all adapters."""
|
|
||||||
from central.gui.routes import adapters_list
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetch.return_value = [
|
|
||||||
{"name": "firms", "enabled": True, "cadence_s": 300, "settings": {"api_key_alias": "firms"}, "paused_at": None, "updated_at": None},
|
|
||||||
{"name": "nws", "enabled": True, "cadence_s": 60, "settings": {"contact_email": "test@test.com"}, "paused_at": None, "updated_at": None},
|
|
||||||
{"name": "usgs_quake", "enabled": True, "cadence_s": 120, "settings": {"feed": "all_hour"}, "paused_at": None, "updated_at": None},
|
|
||||||
]
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
mock_csrf.set_csrf_cookie = MagicMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_list(mock_request, mock_csrf)
|
|
||||||
|
|
||||||
# Verify template was called with adapters
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert len(context["adapters"]) == 3
|
|
||||||
assert context["adapters"][0]["name"] == "firms"
|
|
||||||
assert context["adapters"][1]["name"] == "nws"
|
|
||||||
assert context["adapters"][2]["name"] == "usgs_quake"
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersEditForm:
|
|
||||||
"""Test adapter edit form GET."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_nws_shows_form(self):
|
|
||||||
"""GET /adapters/nws authenticated returns 200 with form."""
|
|
||||||
from central.gui.routes import adapters_edit_form
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "test@example.com", "region": {"north": 49, "south": 24, "east": -66, "west": -125}},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.fetch.return_value = [] # No API keys
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
mock_csrf.set_csrf_cookie = MagicMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_edit_form(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert context["adapter"]["name"] == "nws"
|
|
||||||
assert context["adapter"]["settings"]["contact_email"] == "test@example.com"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_nonexistent_returns_404(self):
|
|
||||||
"""GET /adapters/nonexistent returns 404."""
|
|
||||||
from central.gui.routes import adapters_edit_form
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = None
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_edit_form(mock_request, "nonexistent", mock_csrf)
|
|
||||||
|
|
||||||
assert result.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersEditSubmit:
|
|
||||||
"""Test adapter edit form POST."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_valid_changes_updates_db(self):
|
|
||||||
"""POST /adapters/nws with valid changes updates DB and redirects."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
mock_request.cookies = {}
|
|
||||||
|
|
||||||
# Mock form data
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "120",
|
|
||||||
"contact_email": "new@example.com",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "old@example.com"},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.execute = AsyncMock()
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
with patch("central.gui.routes.write_audit", new_callable=AsyncMock) as mock_audit:
|
|
||||||
result = await adapters_edit_submit(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
assert result.status_code == 302
|
|
||||||
assert result.headers["location"] == "/adapters"
|
|
||||||
mock_conn.execute.assert_called()
|
|
||||||
mock_audit.assert_called_once()
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_invalid_cadence_shows_error(self):
|
|
||||||
"""POST /adapters/nws with cadence_s=30 shows error, no DB update."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "30",
|
|
||||||
"contact_email": "test@example.com",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "test@example.com"},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.fetch.return_value = []
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_response.status_code = 200
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
mock_csrf.set_csrf_cookie = MagicMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_edit_submit(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
# Should re-render form with error
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert "cadence_s" in context["errors"]
|
|
||||||
assert "60" in context["errors"]["cadence_s"] or "3600" in context["errors"]["cadence_s"]
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_firms_unknown_api_key_shows_error(self):
|
|
||||||
"""POST /adapters/firms with unknown api_key_alias shows error."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "300",
|
|
||||||
"api_key_alias": "nonexistent_key",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = ["VIIRS_SNPP_NRT"]
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.side_effect = [
|
|
||||||
{ # First call: get adapter
|
|
||||||
"name": "firms",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 300,
|
|
||||||
"settings": {"api_key_alias": "firms", "satellites": ["VIIRS_SNPP_NRT"]},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
},
|
|
||||||
None, # Second call: check api_key exists - returns None
|
|
||||||
]
|
|
||||||
mock_conn.fetch.return_value = []
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_response.status_code = 200
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
mock_csrf.set_csrf_cookie = MagicMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_edit_submit(mock_request, "firms", mock_csrf)
|
|
||||||
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert "api_key_alias" in context["errors"]
|
|
||||||
assert "nonexistent_key" in context["errors"]["api_key_alias"]
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapters_edit_usgs_unknown_feed_shows_error(self):
|
|
||||||
"""POST /adapters/usgs_quake with unknown feed shows error."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "120",
|
|
||||||
"feed": "invalid_feed",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "usgs_quake",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 120,
|
|
||||||
"settings": {"feed": "all_hour"},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.fetch.return_value = []
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_response.status_code = 200
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
|
|
||||||
mock_csrf.set_csrf_cookie = MagicMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
result = await adapters_edit_submit(mock_request, "usgs_quake", mock_csrf)
|
|
||||||
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert "feed" in context["errors"]
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersAudit:
|
|
||||||
"""Test adapter audit logging."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_audit_row_has_before_after(self):
|
|
||||||
"""Audit row has before/after JSONB populated correctly."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "120",
|
|
||||||
"contact_email": "new@example.com",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "old@example.com"},
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.execute = AsyncMock()
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
|
|
||||||
captured_audit = {}
|
|
||||||
|
|
||||||
async def capture_audit(conn, action, operator_id=None, target=None, before=None, after=None):
|
|
||||||
captured_audit["action"] = action
|
|
||||||
captured_audit["target"] = target
|
|
||||||
captured_audit["before"] = before
|
|
||||||
captured_audit["after"] = after
|
|
||||||
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
with patch("central.gui.routes.write_audit", side_effect=capture_audit):
|
|
||||||
result = await adapters_edit_submit(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
assert captured_audit["action"] == "adapter.update"
|
|
||||||
assert captured_audit["target"] == "nws"
|
|
||||||
assert captured_audit["before"]["cadence_s"] == 60
|
|
||||||
assert captured_audit["after"]["cadence_s"] == 120
|
|
||||||
assert captured_audit["before"]["settings"]["contact_email"] == "old@example.com"
|
|
||||||
assert captured_audit["after"]["settings"]["contact_email"] == "new@example.com"
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptersJsonbRegression:
|
|
||||||
"""Regression tests for JSONB double-encoding bug."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_settings_passed_as_dict_not_string(self):
|
|
||||||
"""Verify settings is passed to UPDATE as dict, not json.dumps string.
|
|
||||||
|
|
||||||
Regression test for double-encoding bug where json.dumps() was called
|
|
||||||
on settings before passing to asyncpg, which already handles dict->jsonb.
|
|
||||||
"""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "120",
|
|
||||||
"contact_email": "test@example.com",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "old@example.com"}, # dict, as asyncpg returns
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.execute = AsyncMock()
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
with patch("central.gui.routes.write_audit", new_callable=AsyncMock):
|
|
||||||
await adapters_edit_submit(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
# Get the settings argument passed to execute (3rd positional arg after query)
|
|
||||||
call_args = mock_conn.execute.call_args
|
|
||||||
# args[0] is the query, args[1:] are the parameters
|
|
||||||
settings_arg = call_args[0][3] # enabled=$1, cadence=$2, settings=$3
|
|
||||||
|
|
||||||
# CRITICAL: settings must be a dict, NOT a string
|
|
||||||
# If json.dumps() was called, this would be a str like {contact_email: ...}
|
|
||||||
assert isinstance(settings_arg, dict), f"settings should be dict, got {type(settings_arg)}: {settings_arg}"
|
|
||||||
assert settings_arg["contact_email"] == "test@example.com"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_audit_before_after_passed_as_dict(self):
|
|
||||||
"""Verify audit before/after are passed as dicts, not json.dumps strings."""
|
|
||||||
from central.gui.routes import adapters_edit_submit
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock(id=1, username="testop")
|
|
||||||
|
|
||||||
mock_form = MagicMock()
|
|
||||||
mock_form.get.side_effect = lambda k, d="": {
|
|
||||||
"cadence_s": "120",
|
|
||||||
"contact_email": "new@example.com",
|
|
||||||
}.get(k, d)
|
|
||||||
mock_form.getlist.return_value = []
|
|
||||||
mock_form.__contains__ = lambda self, k: k == "enabled"
|
|
||||||
mock_request.form = AsyncMock(return_value=mock_form)
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetchrow.return_value = {
|
|
||||||
"name": "nws",
|
|
||||||
"enabled": True,
|
|
||||||
"cadence_s": 60,
|
|
||||||
"settings": {"contact_email": "old@example.com"}, # dict
|
|
||||||
"paused_at": None,
|
|
||||||
"updated_at": None,
|
|
||||||
}
|
|
||||||
mock_conn.execute = AsyncMock()
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_csrf = MagicMock()
|
|
||||||
mock_csrf.validate_csrf = AsyncMock()
|
|
||||||
|
|
||||||
captured_audit = {}
|
|
||||||
|
|
||||||
async def capture_audit(conn, action, operator_id=None, target=None, before=None, after=None):
|
|
||||||
captured_audit["before"] = before
|
|
||||||
captured_audit["after"] = after
|
|
||||||
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
with patch("central.gui.routes.write_audit", side_effect=capture_audit):
|
|
||||||
await adapters_edit_submit(mock_request, "nws", mock_csrf)
|
|
||||||
|
|
||||||
# CRITICAL: before and after must be dicts, NOT strings
|
|
||||||
assert isinstance(captured_audit["before"], dict), f"before should be dict, got {type(captured_audit['before'])}"
|
|
||||||
assert isinstance(captured_audit["after"], dict), f"after should be dict, got {type(captured_audit['after'])}"
|
|
||||||
assert isinstance(captured_audit["before"]["settings"], dict), "before.settings should be dict"
|
|
||||||
assert isinstance(captured_audit["after"]["settings"], dict), "after.settings should be dict"
|
|
||||||
|
|
@ -1,150 +0,0 @@
|
||||||
"""Tests for multi-stream archive consumer."""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
|
||||||
|
|
||||||
from central.archive import (
|
|
||||||
STREAMS,
|
|
||||||
consumer_name_for,
|
|
||||||
ArchiveConsumer,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestConsumerNaming:
|
|
||||||
"""Test consumer naming convention."""
|
|
||||||
|
|
||||||
def test_consumer_name_for_central_wx(self):
|
|
||||||
"""Consumer name for CENTRAL_WX is archive-central_wx."""
|
|
||||||
assert consumer_name_for("CENTRAL_WX") == "archive-central_wx"
|
|
||||||
|
|
||||||
def test_consumer_name_for_central_fire(self):
|
|
||||||
"""Consumer name for CENTRAL_FIRE is archive-central_fire."""
|
|
||||||
assert consumer_name_for("CENTRAL_FIRE") == "archive-central_fire"
|
|
||||||
|
|
||||||
def test_consumer_name_for_central_quake(self):
|
|
||||||
"""Consumer name for CENTRAL_QUAKE is archive-central_quake."""
|
|
||||||
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
|
|
||||||
|
|
||||||
|
|
||||||
class TestStreamsConfiguration:
|
|
||||||
"""Test streams configuration."""
|
|
||||||
|
|
||||||
def test_streams_list_has_three_entries(self):
|
|
||||||
"""STREAMS list has three event-bearing streams."""
|
|
||||||
assert len(STREAMS) == 3
|
|
||||||
|
|
||||||
def test_streams_contains_central_wx(self):
|
|
||||||
"""STREAMS contains CENTRAL_WX with correct filter."""
|
|
||||||
assert ("CENTRAL_WX", "central.wx.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_fire(self):
|
|
||||||
"""STREAMS contains CENTRAL_FIRE with correct filter."""
|
|
||||||
assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_contains_central_quake(self):
|
|
||||||
"""STREAMS contains CENTRAL_QUAKE with correct filter."""
|
|
||||||
assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
|
|
||||||
|
|
||||||
def test_streams_excludes_central_meta(self):
|
|
||||||
"""STREAMS does not contain CENTRAL_META (status messages only)."""
|
|
||||||
stream_names = [s[0] for s in STREAMS]
|
|
||||||
assert "CENTRAL_META" not in stream_names
|
|
||||||
|
|
||||||
|
|
||||||
class TestOrphanedConsumerCleanup:
|
|
||||||
"""Test cleanup of orphaned 'archive' consumer."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_cleanup_removes_orphaned_consumer_when_exists(self):
|
|
||||||
"""Cleanup removes 'archive' consumer from CENTRAL_WX when it exists."""
|
|
||||||
consumer = ArchiveConsumer(
|
|
||||||
nats_url="nats://localhost:4222",
|
|
||||||
postgres_dsn="postgresql://test:test@localhost/test",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js = AsyncMock()
|
|
||||||
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
|
|
||||||
mock_js.delete_consumer = AsyncMock()
|
|
||||||
consumer._js = mock_js
|
|
||||||
|
|
||||||
await consumer._cleanup_orphaned_consumer()
|
|
||||||
|
|
||||||
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
|
|
||||||
mock_js.delete_consumer.assert_called_once_with("CENTRAL_WX", "archive")
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_cleanup_handles_not_found_gracefully(self):
|
|
||||||
"""Cleanup handles NotFoundError when 'archive' consumer doesn't exist."""
|
|
||||||
from nats.js.errors import NotFoundError
|
|
||||||
|
|
||||||
consumer = ArchiveConsumer(
|
|
||||||
nats_url="nats://localhost:4222",
|
|
||||||
postgres_dsn="postgresql://test:test@localhost/test",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js = AsyncMock()
|
|
||||||
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
|
|
||||||
mock_js.delete_consumer = AsyncMock()
|
|
||||||
consumer._js = mock_js
|
|
||||||
|
|
||||||
# Should not raise
|
|
||||||
await consumer._cleanup_orphaned_consumer()
|
|
||||||
|
|
||||||
mock_js.consumer_info.assert_called_once_with("CENTRAL_WX", "archive")
|
|
||||||
mock_js.delete_consumer.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
class TestEnsureConsumer:
|
|
||||||
"""Test consumer creation for each stream."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_ensure_consumer_creates_when_not_exists(self):
|
|
||||||
"""_ensure_consumer creates consumer when it doesn't exist."""
|
|
||||||
from nats.js.errors import NotFoundError
|
|
||||||
|
|
||||||
consumer = ArchiveConsumer(
|
|
||||||
nats_url="nats://localhost:4222",
|
|
||||||
postgres_dsn="postgresql://test:test@localhost/test",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js = AsyncMock()
|
|
||||||
mock_js.consumer_info = AsyncMock(side_effect=NotFoundError())
|
|
||||||
mock_js.add_consumer = AsyncMock()
|
|
||||||
consumer._js = mock_js
|
|
||||||
|
|
||||||
await consumer._ensure_consumer(
|
|
||||||
"CENTRAL_FIRE", "central.fire.>", "archive-central_fire"
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js.consumer_info.assert_called_once_with(
|
|
||||||
"CENTRAL_FIRE", "archive-central_fire"
|
|
||||||
)
|
|
||||||
mock_js.add_consumer.assert_called_once()
|
|
||||||
# Verify the consumer config
|
|
||||||
call_args = mock_js.add_consumer.call_args
|
|
||||||
assert call_args[0][0] == "CENTRAL_FIRE"
|
|
||||||
config = call_args[0][1]
|
|
||||||
assert config.durable_name == "archive-central_fire"
|
|
||||||
assert config.filter_subject == "central.fire.>"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_ensure_consumer_skips_when_exists(self):
|
|
||||||
"""_ensure_consumer does nothing when consumer already exists."""
|
|
||||||
consumer = ArchiveConsumer(
|
|
||||||
nats_url="nats://localhost:4222",
|
|
||||||
postgres_dsn="postgresql://test:test@localhost/test",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js = AsyncMock()
|
|
||||||
mock_js.consumer_info = AsyncMock(return_value=MagicMock())
|
|
||||||
mock_js.add_consumer = AsyncMock()
|
|
||||||
consumer._js = mock_js
|
|
||||||
|
|
||||||
await consumer._ensure_consumer(
|
|
||||||
"CENTRAL_QUAKE", "central.quake.>", "archive-central_quake"
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_js.consumer_info.assert_called_once_with(
|
|
||||||
"CENTRAL_QUAKE", "archive-central_quake"
|
|
||||||
)
|
|
||||||
mock_js.add_consumer.assert_not_called()
|
|
||||||
|
|
@ -1,158 +0,0 @@
|
||||||
"""Tests for dashboard routes."""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from unittest.mock import MagicMock, AsyncMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
# Set required env vars before importing central modules
|
|
||||||
os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test")
|
|
||||||
os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab")
|
|
||||||
os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222")
|
|
||||||
|
|
||||||
|
|
||||||
class TestFormatBytes:
|
|
||||||
"""Test _format_bytes helper."""
|
|
||||||
|
|
||||||
def test_format_bytes_bytes(self):
|
|
||||||
"""Bytes are shown as B."""
|
|
||||||
from central.gui.routes import _format_bytes
|
|
||||||
assert _format_bytes(100) == "100 B"
|
|
||||||
|
|
||||||
def test_format_bytes_kilobytes(self):
|
|
||||||
"""KB formatting."""
|
|
||||||
from central.gui.routes import _format_bytes
|
|
||||||
assert _format_bytes(1024) == "1.0 KB"
|
|
||||||
|
|
||||||
def test_format_bytes_megabytes(self):
|
|
||||||
"""MB formatting."""
|
|
||||||
from central.gui.routes import _format_bytes
|
|
||||||
assert _format_bytes(1048576) == "1.0 MB"
|
|
||||||
|
|
||||||
def test_format_bytes_gigabytes(self):
|
|
||||||
"""GB formatting."""
|
|
||||||
from central.gui.routes import _format_bytes
|
|
||||||
assert _format_bytes(1073741824) == "1.0 GB"
|
|
||||||
|
|
||||||
|
|
||||||
class TestDashboardEventsSQL:
|
|
||||||
"""Test events query construction."""
|
|
||||||
|
|
||||||
def test_events_query_has_24h_filter(self):
|
|
||||||
"""Events query filters by received > NOW() - 24h."""
|
|
||||||
# We can't easily test the full route without mocking,
|
|
||||||
# but we can verify the query logic by inspecting the source
|
|
||||||
import inspect
|
|
||||||
from central.gui.routes import dashboard_events
|
|
||||||
source = inspect.getsource(dashboard_events)
|
|
||||||
assert "24 hours" in source
|
|
||||||
assert "received > NOW()" in source
|
|
||||||
|
|
||||||
|
|
||||||
class TestDashboardStreamsGracefulDegradation:
|
|
||||||
"""Test streams endpoint graceful degradation."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_nats_unavailable_returns_error_message(self):
|
|
||||||
"""When NATS is unavailable, streams returns error message not 500."""
|
|
||||||
from central.gui.routes import dashboard_streams
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock()
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.nats.get_js", return_value=None):
|
|
||||||
result = await dashboard_streams(mock_request)
|
|
||||||
|
|
||||||
# Should call template with error context
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert context["error"] == "NATS unavailable"
|
|
||||||
assert context["streams"] is None
|
|
||||||
|
|
||||||
|
|
||||||
class TestDashboardPollsGracefulDegradation:
|
|
||||||
"""Test polls endpoint graceful degradation."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_nats_unavailable_shows_all_adapters_with_error(self):
|
|
||||||
"""When NATS is unavailable, polls shows adapters with error message."""
|
|
||||||
from central.gui.routes import dashboard_polls
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock()
|
|
||||||
|
|
||||||
mock_conn = AsyncMock()
|
|
||||||
mock_conn.fetch.return_value = [{"name": "nws"}, {"name": "firms"}]
|
|
||||||
|
|
||||||
mock_pool = MagicMock()
|
|
||||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
||||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
|
||||||
with patch("central.gui.nats.get_js", return_value=None):
|
|
||||||
result = await dashboard_polls(mock_request)
|
|
||||||
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
assert context["error"] == "NATS unavailable"
|
|
||||||
assert len(context["adapters"]) == 2
|
|
||||||
assert context["adapters"][0]["error"] == "NATS unavailable"
|
|
||||||
|
|
||||||
|
|
||||||
class TestDashboardStreamsIsolation:
|
|
||||||
"""Test stream failure isolation."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_single_stream_failure_doesnt_crash_card(self):
|
|
||||||
"""A single stream failure shows error for that stream only."""
|
|
||||||
from central.gui.routes import dashboard_streams
|
|
||||||
|
|
||||||
mock_request = MagicMock()
|
|
||||||
mock_request.state.operator = MagicMock()
|
|
||||||
|
|
||||||
async def mock_stream_info(name):
|
|
||||||
if name == "CENTRAL_FIRE":
|
|
||||||
raise Exception("Not found")
|
|
||||||
state = MagicMock()
|
|
||||||
state.messages = 100
|
|
||||||
state.bytes = 1024
|
|
||||||
info = MagicMock()
|
|
||||||
info.state = state
|
|
||||||
return info
|
|
||||||
|
|
||||||
mock_js = AsyncMock()
|
|
||||||
mock_js.stream_info.side_effect = mock_stream_info
|
|
||||||
|
|
||||||
mock_templates = MagicMock()
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_templates.TemplateResponse.return_value = mock_response
|
|
||||||
|
|
||||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
|
||||||
with patch("central.gui.nats.get_js", return_value=mock_js):
|
|
||||||
result = await dashboard_streams(mock_request)
|
|
||||||
|
|
||||||
call_args = mock_templates.TemplateResponse.call_args
|
|
||||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
|
||||||
|
|
||||||
streams = context["streams"]
|
|
||||||
# Should have 4 streams
|
|
||||||
assert len(streams) == 4
|
|
||||||
|
|
||||||
# CENTRAL_FIRE should have error
|
|
||||||
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
|
|
||||||
assert fire_stream.get("error") == "unavailable"
|
|
||||||
|
|
||||||
# CENTRAL_WX should be fine
|
|
||||||
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
|
|
||||||
assert wx_stream.get("error") is None
|
|
||||||
assert wx_stream["messages"] == 100
|
|
||||||
|
|
@ -1,313 +0,0 @@
|
||||||
"""Tests for events.adapter column migration (011).
|
|
||||||
|
|
||||||
These tests exercise the actual migration SQL against a test database,
|
|
||||||
verifying backfill logic, FK constraints, NOT NULL enforcement, and
|
|
||||||
source column removal.
|
|
||||||
|
|
||||||
Requires CENTRAL_TEST_DB_DSN or uses default central_test database.
|
|
||||||
The test database must have PostGIS installed, or the central_test role
|
|
||||||
must be a superuser (which it is by default) to self-bootstrap PostGIS.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import asyncpg
|
|
||||||
import pytest
|
|
||||||
import pytest_asyncio
|
|
||||||
|
|
||||||
from central.models import Event, Geo
|
|
||||||
|
|
||||||
|
|
||||||
# Test database DSN - matches test_config_store.py pattern
|
|
||||||
TEST_DB_DSN = os.environ.get(
|
|
||||||
"CENTRAL_TEST_DB_DSN",
|
|
||||||
"postgresql://central_test:testpass@localhost/central_test",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Path to migration file
|
|
||||||
MIGRATION_011_PATH = Path(__file__).parent.parent / "sql" / "migrations" / "011_events_add_adapter_column.sql"
|
|
||||||
|
|
||||||
|
|
||||||
class TestEventModelAdapterField:
|
|
||||||
"""Test Event model has adapter field (not source)."""
|
|
||||||
|
|
||||||
def test_event_has_adapter_field(self):
|
|
||||||
"""Event model has adapter field."""
|
|
||||||
event = Event(
|
|
||||||
id="test-1",
|
|
||||||
adapter="nws",
|
|
||||||
category="wx.alert.test",
|
|
||||||
time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
|
|
||||||
geo=Geo(),
|
|
||||||
data={},
|
|
||||||
)
|
|
||||||
assert event.adapter == "nws"
|
|
||||||
|
|
||||||
def test_event_model_no_source_field(self):
|
|
||||||
"""Event model does not have source field."""
|
|
||||||
assert "source" not in Event.model_fields
|
|
||||||
assert "adapter" in Event.model_fields
|
|
||||||
|
|
||||||
def test_event_adapter_accepts_all_adapter_names(self):
|
|
||||||
"""Event adapter field accepts all known adapter names."""
|
|
||||||
for adapter_name in ["nws", "firms", "usgs_quake"]:
|
|
||||||
event = Event(
|
|
||||||
id=f"test-{adapter_name}",
|
|
||||||
adapter=adapter_name,
|
|
||||||
category="test.category",
|
|
||||||
time=datetime.now(timezone.utc),
|
|
||||||
geo=Geo(),
|
|
||||||
data={},
|
|
||||||
)
|
|
||||||
assert event.adapter == adapter_name
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def db_conn() -> asyncpg.Connection:
|
|
||||||
"""Get a database connection for migration tests."""
|
|
||||||
conn = await asyncpg.connect(TEST_DB_DSN)
|
|
||||||
yield conn
|
|
||||||
await conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def pre_migration_events_table(db_conn: asyncpg.Connection) -> None:
|
|
||||||
"""Create events table with pre-migration schema (source column, no adapter).
|
|
||||||
|
|
||||||
Also ensures config.adapters exists with test adapters.
|
|
||||||
Self-bootstraps PostGIS if not already installed (central_test is superuser).
|
|
||||||
"""
|
|
||||||
# Self-bootstrap PostGIS extension (central_test role is superuser)
|
|
||||||
await db_conn.execute("CREATE EXTENSION IF NOT EXISTS postgis")
|
|
||||||
|
|
||||||
# Ensure config schema and adapters table exist
|
|
||||||
await db_conn.execute("CREATE SCHEMA IF NOT EXISTS config")
|
|
||||||
await db_conn.execute("""
|
|
||||||
CREATE TABLE IF NOT EXISTS config.adapters (
|
|
||||||
name TEXT PRIMARY KEY,
|
|
||||||
enabled BOOLEAN NOT NULL DEFAULT true,
|
|
||||||
cadence_s INTEGER NOT NULL,
|
|
||||||
settings JSONB NOT NULL DEFAULT '{}'::jsonb,
|
|
||||||
paused_at TIMESTAMPTZ,
|
|
||||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
||||||
)
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Insert test adapters (idempotent)
|
|
||||||
await db_conn.execute("""
|
|
||||||
INSERT INTO config.adapters (name, cadence_s)
|
|
||||||
VALUES ('nws', 60), ('firms', 300), ('usgs_quake', 60)
|
|
||||||
ON CONFLICT (name) DO NOTHING
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Drop events table if exists (clean slate)
|
|
||||||
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
|
||||||
|
|
||||||
# Create events table with PRE-MIGRATION schema (has source, no adapter)
|
|
||||||
# Matches production schema including geom column
|
|
||||||
await db_conn.execute("""
|
|
||||||
CREATE TABLE public.events (
|
|
||||||
id TEXT NOT NULL,
|
|
||||||
source TEXT NOT NULL,
|
|
||||||
category TEXT NOT NULL,
|
|
||||||
time TIMESTAMPTZ NOT NULL,
|
|
||||||
expires TIMESTAMPTZ,
|
|
||||||
severity SMALLINT,
|
|
||||||
geom GEOMETRY(Geometry, 4326),
|
|
||||||
regions TEXT[],
|
|
||||||
primary_region TEXT,
|
|
||||||
payload JSONB NOT NULL,
|
|
||||||
received TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
||||||
PRIMARY KEY (id, time)
|
|
||||||
)
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Insert test rows with different source values
|
|
||||||
# geom is NULL (production schema permits this)
|
|
||||||
test_rows = [
|
|
||||||
("event-nws-1", "central/adapters/nws", "wx.alert.tornado_warning"),
|
|
||||||
("event-nws-2", "central/adapters/nws", "wx.alert.flood_warning"),
|
|
||||||
("event-firms-1", "central/adapters/firms", "fire.hotspot"),
|
|
||||||
("event-usgs-1", "central/adapters/usgs_quake", "seismic.earthquake"),
|
|
||||||
]
|
|
||||||
|
|
||||||
for event_id, source, category in test_rows:
|
|
||||||
await db_conn.execute("""
|
|
||||||
INSERT INTO public.events (id, source, category, time, payload)
|
|
||||||
VALUES ($1, $2, $3, $4, $5)
|
|
||||||
""", event_id, source, category, datetime.now(timezone.utc), "{}")
|
|
||||||
|
|
||||||
yield
|
|
||||||
|
|
||||||
# Cleanup
|
|
||||||
await db_conn.execute("DROP TABLE IF EXISTS public.events CASCADE")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def run_migration_011(
|
|
||||||
db_conn: asyncpg.Connection,
|
|
||||||
pre_migration_events_table: None,
|
|
||||||
) -> None:
|
|
||||||
"""Run migration 011 against the pre-migration events table."""
|
|
||||||
migration_sql = MIGRATION_011_PATH.read_text()
|
|
||||||
await db_conn.execute(migration_sql)
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
class TestMigration011Backfill:
|
|
||||||
"""Test migration 011 backfill logic."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_backfill_nws_source_to_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Backfill converts 'central/adapters/nws' to 'nws'."""
|
|
||||||
rows = await db_conn.fetch(
|
|
||||||
"SELECT id, adapter FROM public.events WHERE id LIKE 'event-nws-%'"
|
|
||||||
)
|
|
||||||
assert len(rows) == 2
|
|
||||||
for row in rows:
|
|
||||||
assert row["adapter"] == "nws"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_backfill_firms_source_to_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Backfill converts 'central/adapters/firms' to 'firms'."""
|
|
||||||
row = await db_conn.fetchrow(
|
|
||||||
"SELECT adapter FROM public.events WHERE id = 'event-firms-1'"
|
|
||||||
)
|
|
||||||
assert row["adapter"] == "firms"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_backfill_usgs_quake_source_to_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Backfill converts 'central/adapters/usgs_quake' to 'usgs_quake'."""
|
|
||||||
row = await db_conn.fetchrow(
|
|
||||||
"SELECT adapter FROM public.events WHERE id = 'event-usgs-1'"
|
|
||||||
)
|
|
||||||
assert row["adapter"] == "usgs_quake"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_backfill_all_rows_have_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""All rows have non-NULL adapter after backfill."""
|
|
||||||
count = await db_conn.fetchval(
|
|
||||||
"SELECT COUNT(*) FROM public.events WHERE adapter IS NULL"
|
|
||||||
)
|
|
||||||
assert count == 0
|
|
||||||
|
|
||||||
|
|
||||||
class TestMigration011Constraints:
|
|
||||||
"""Test migration 011 constraint enforcement."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_fk_restrict_prevents_adapter_deletion(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""FK RESTRICT prevents deleting adapter with existing events."""
|
|
||||||
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
|
||||||
await db_conn.execute(
|
|
||||||
"DELETE FROM config.adapters WHERE name = 'nws'"
|
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_not_null_rejects_null_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""NOT NULL constraint rejects NULL adapter on insert."""
|
|
||||||
with pytest.raises(asyncpg.NotNullViolationError):
|
|
||||||
await db_conn.execute("""
|
|
||||||
INSERT INTO public.events (id, adapter, category, time, payload)
|
|
||||||
VALUES ('null-test', NULL, 'test', now(), '{}')
|
|
||||||
""")
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_fk_rejects_unknown_adapter(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""FK constraint rejects unknown adapter values."""
|
|
||||||
with pytest.raises(asyncpg.ForeignKeyViolationError):
|
|
||||||
await db_conn.execute("""
|
|
||||||
INSERT INTO public.events (id, adapter, category, time, payload)
|
|
||||||
VALUES ('bad-adapter', 'nonexistent_adapter', 'test', now(), '{}')
|
|
||||||
""")
|
|
||||||
|
|
||||||
|
|
||||||
class TestMigration011SchemaChanges:
|
|
||||||
"""Test migration 011 schema changes."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_source_column_dropped(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Source column no longer exists after migration."""
|
|
||||||
row = await db_conn.fetchrow("""
|
|
||||||
SELECT COUNT(*) as count
|
|
||||||
FROM information_schema.columns
|
|
||||||
WHERE table_schema = 'public'
|
|
||||||
AND table_name = 'events'
|
|
||||||
AND column_name = 'source'
|
|
||||||
""")
|
|
||||||
assert row["count"] == 0
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapter_column_exists(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Adapter column exists after migration."""
|
|
||||||
row = await db_conn.fetchrow("""
|
|
||||||
SELECT COUNT(*) as count
|
|
||||||
FROM information_schema.columns
|
|
||||||
WHERE table_schema = 'public'
|
|
||||||
AND table_name = 'events'
|
|
||||||
AND column_name = 'adapter'
|
|
||||||
""")
|
|
||||||
assert row["count"] == 1
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapter_column_is_not_null(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Adapter column has NOT NULL constraint."""
|
|
||||||
row = await db_conn.fetchrow("""
|
|
||||||
SELECT is_nullable
|
|
||||||
FROM information_schema.columns
|
|
||||||
WHERE table_schema = 'public'
|
|
||||||
AND table_name = 'events'
|
|
||||||
AND column_name = 'adapter'
|
|
||||||
""")
|
|
||||||
assert row["is_nullable"] == "NO"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapter_fk_constraint_exists(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""FK constraint events_adapter_fkey exists."""
|
|
||||||
row = await db_conn.fetchrow("""
|
|
||||||
SELECT COUNT(*) as count
|
|
||||||
FROM information_schema.table_constraints
|
|
||||||
WHERE table_schema = 'public'
|
|
||||||
AND table_name = 'events'
|
|
||||||
AND constraint_name = 'events_adapter_fkey'
|
|
||||||
AND constraint_type = 'FOREIGN KEY'
|
|
||||||
""")
|
|
||||||
assert row["count"] == 1
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapter_received_index_exists(
|
|
||||||
self, db_conn: asyncpg.Connection, run_migration_011: None
|
|
||||||
):
|
|
||||||
"""Index events_adapter_received_idx exists."""
|
|
||||||
row = await db_conn.fetchrow("""
|
|
||||||
SELECT COUNT(*) as count
|
|
||||||
FROM pg_indexes
|
|
||||||
WHERE schemaname = 'public'
|
|
||||||
AND tablename = 'events'
|
|
||||||
AND indexname = 'events_adapter_received_idx'
|
|
||||||
""")
|
|
||||||
assert row["count"] == 1
|
|
||||||
|
|
@ -288,7 +288,7 @@ class TestSubjectGeneration:
|
||||||
def test_subject_format(self):
|
def test_subject_format(self):
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test",
|
id="test",
|
||||||
adapter="firms",
|
source="central/adapters/firms",
|
||||||
category="fire.hotspot.viirs_snpp.high",
|
category="fire.hotspot.viirs_snpp.high",
|
||||||
time=datetime.now(timezone.utc),
|
time=datetime.now(timezone.utc),
|
||||||
severity=3,
|
severity=3,
|
||||||
|
|
@ -302,7 +302,7 @@ class TestSubjectGeneration:
|
||||||
def test_subject_nominal_confidence(self):
|
def test_subject_nominal_confidence(self):
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test",
|
id="test",
|
||||||
adapter="firms",
|
source="central/adapters/firms",
|
||||||
category="fire.hotspot.viirs_noaa20.nominal",
|
category="fire.hotspot.viirs_noaa20.nominal",
|
||||||
time=datetime.now(timezone.utc),
|
time=datetime.now(timezone.utc),
|
||||||
severity=2,
|
severity=2,
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ def sample_event(sample_geo: Geo) -> Event:
|
||||||
"""Sample Event object for testing."""
|
"""Sample Event object for testing."""
|
||||||
return Event(
|
return Event(
|
||||||
id="urn:central:nws:alert:KBOI-202401151200-SVR",
|
id="urn:central:nws:alert:KBOI-202401151200-SVR",
|
||||||
adapter="nws",
|
source="central/adapters/nws",
|
||||||
category="wx.alert.severe_thunderstorm_warning",
|
category="wx.alert.severe_thunderstorm_warning",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
expires=datetime(2024, 1, 15, 13, 0, 0, tzinfo=timezone.utc),
|
expires=datetime(2024, 1, 15, 13, 0, 0, tzinfo=timezone.utc),
|
||||||
|
|
@ -75,7 +75,7 @@ class TestSubjectForEvent:
|
||||||
)
|
)
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-zone",
|
id="test-zone",
|
||||||
adapter="nws",
|
source="test",
|
||||||
category="wx.alert.winter_storm_warning",
|
category="wx.alert.winter_storm_warning",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
geo=geo,
|
geo=geo,
|
||||||
|
|
@ -89,7 +89,7 @@ class TestSubjectForEvent:
|
||||||
geo = Geo(regions=[], primary_region=None)
|
geo = Geo(regions=[], primary_region=None)
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-unknown",
|
id="test-unknown",
|
||||||
adapter="nws",
|
source="test",
|
||||||
category="wx.alert.test",
|
category="wx.alert.test",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
geo=geo,
|
geo=geo,
|
||||||
|
|
@ -144,7 +144,7 @@ class TestCloudEventsWire:
|
||||||
"""When severity is None, centralseverity is omitted entirely."""
|
"""When severity is None, centralseverity is omitted entirely."""
|
||||||
event = Event(
|
event = Event(
|
||||||
id="test-no-severity",
|
id="test-no-severity",
|
||||||
adapter="nws",
|
source="test",
|
||||||
category="wx.alert.test",
|
category="wx.alert.test",
|
||||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||||
severity=None, # Explicitly None
|
severity=None, # Explicitly None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue