diff --git a/README.md b/README.md index 36bcb4c..f057003 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,10 @@ Phase 0 — scaffold. Not yet operational. - One archive consumer process persisting events to TimescaleDB - Both processes systemd-managed +## Testing + +See [docs/test-database.md](docs/test-database.md) for test database setup. + ## License MIT. See LICENSE. diff --git a/docs/migrations.md b/docs/migrations.md index ca53862..ccf2e61 100644 --- a/docs/migrations.md +++ b/docs/migrations.md @@ -1,5 +1,11 @@ # Migration policy +## Migrations are the sole source of truth + +The `sql/migrations/` directory contains all schema definitions. There is +no separate schema.sql file; use `pg_dump -s central` to generate a +human-readable snapshot of the current schema when needed. + ## Migrations must be idempotent New migration files (007+) must use guards so they can be safely @@ -20,8 +26,24 @@ Direct `psql` execution bypasses the `schema_migrations` tracker and was the cause of the v0.2.0 reconcile. If a migration needs to be applied on the live system, run: - sudo -u central /opt/central/.venv/bin/python -m scripts.migrate + sudo -u central /opt/central/.venv/bin/python -m central.migrate Never apply migration SQL directly via `psql`, even as a superuser, even "just to test." If migrate.py has a bug that's blocking you, fix migrate.py. + +## Extensions are not in migrations + +PostgreSQL extensions like PostGIS require superuser privileges to +install. The production `central` role is intentionally not a superuser. +Therefore, extensions live outside the migration system: + +- **Production bootstrap:** A DBA runs `CREATE EXTENSION postgis` once + before the first `migrate.py` run. +- **Test database:** The `central_test` role is a superuser, allowing + test fixtures to self-bootstrap extensions. + +This is documented in [docs/test-database.md](test-database.md). + +Do not add `CREATE EXTENSION` statements to migrations — they will fail +in production where migrations run as the non-superuser `central` role. diff --git a/docs/test-database.md b/docs/test-database.md new file mode 100644 index 0000000..71c71a4 --- /dev/null +++ b/docs/test-database.md @@ -0,0 +1,83 @@ +# Test Database Setup + +Central's integration tests require a PostgreSQL database. This document +covers one-time setup and maintenance of the test database. + +## DSN Convention + +Tests default to: + +``` +postgresql://central_test:testpass@localhost/central_test +``` + +Override via the `CENTRAL_TEST_DB_DSN` environment variable: + +```bash +export CENTRAL_TEST_DB_DSN="postgresql://myuser:mypass@localhost/mydb" +``` + +## One-Time Setup + +Run these commands once on a fresh PostgreSQL installation: + +```bash +# Create the test user (as postgres superuser) +sudo -u postgres createuser -s central_test +sudo -u postgres psql -c "ALTER USER central_test PASSWORD 'testpass'" + +# Create the test database +sudo -u postgres createdb -O central_test central_test + +# Install required extensions +sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis" +``` + +**Note:** The `central_test` role is created as a superuser (`-s` flag). +This allows test fixtures to self-bootstrap extensions like PostGIS via +`CREATE EXTENSION IF NOT EXISTS`. Production uses a non-superuser role. + +## Required Extensions + +| Extension | Version | Purpose | +|-----------|---------|---------| +| postgis | 3.4+ | Geometry types for geospatial event data | + +## Why PostGIS Is Not in Migrations + +PostGIS requires superuser privileges to install. The production `central` +role is intentionally not a superuser for security reasons. Therefore: + +- **Production:** A DBA must run `CREATE EXTENSION postgis` before the + first `migrate.py` run. This is a one-time bootstrap step. +- **Test:** The `central_test` role is a superuser, so test fixtures can + self-bootstrap PostGIS via `CREATE EXTENSION IF NOT EXISTS`. + +This divergence is documented rather than "fixed" because granting +superuser to production roles creates security risk, and the PostgreSQL +packaging on Ubuntu does not mark PostGIS as a trusted extension. + +## Resetting the Test Database + +If the test database gets into a bad state: + +```bash +# Drop and recreate +sudo -u postgres dropdb central_test +sudo -u postgres createdb -O central_test central_test +sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis" +``` + +Test fixtures handle their own table creation and cleanup, so this is +rarely needed. + +## Running Tests + +```bash +cd /opt/central +uv run pytest tests/ # all tests +uv run pytest tests/test_config_store.py -v # specific file +``` + +Tests that require the database will skip gracefully if the connection +fails, though most integration tests will fail without a working DB. diff --git a/sql/migrations/011_events_add_adapter_column.sql b/sql/migrations/011_events_add_adapter_column.sql new file mode 100644 index 0000000..14596dd --- /dev/null +++ b/sql/migrations/011_events_add_adapter_column.sql @@ -0,0 +1,46 @@ +-- Migration 011: Add adapter column to events, drop source column +-- Replaces module-path-based source with stable adapter identifier + +-- Add adapter column (idempotent) +ALTER TABLE public.events ADD COLUMN IF NOT EXISTS adapter TEXT; + +-- Backfill from existing source values +UPDATE public.events +SET adapter = REPLACE(source, 'central/adapters/', '') +WHERE adapter IS NULL AND source IS NOT NULL; + +-- Make NOT NULL after backfill (idempotent check) +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'events' + AND column_name = 'adapter' + AND is_nullable = 'YES' + ) THEN + ALTER TABLE public.events ALTER COLUMN adapter SET NOT NULL; + END IF; +END $$; + +-- Add FK constraint (idempotent check) +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'events_adapter_fkey' + AND table_name = 'events' + ) THEN + ALTER TABLE public.events + ADD CONSTRAINT events_adapter_fkey + FOREIGN KEY (adapter) REFERENCES config.adapters(name) + ON DELETE RESTRICT; + END IF; +END $$; + +-- Add index for dashboard queries (idempotent) +CREATE INDEX IF NOT EXISTS events_adapter_received_idx +ON public.events (adapter, received DESC); + +-- Drop deprecated source column (idempotent) +ALTER TABLE public.events DROP COLUMN IF EXISTS source; diff --git a/sql/schema.sql b/sql/schema.sql deleted file mode 100644 index 9a89e19..0000000 --- a/sql/schema.sql +++ /dev/null @@ -1,30 +0,0 @@ --- Central Data Hub schema --- PostgreSQL 16 + TimescaleDB + PostGIS - -CREATE TABLE IF NOT EXISTS events ( - id TEXT NOT NULL, -- CloudEvent id - source TEXT NOT NULL, -- adapter identity - category TEXT NOT NULL, -- "wx.alert." - time TIMESTAMPTZ NOT NULL, -- event-time UTC - expires TIMESTAMPTZ, - severity SMALLINT, -- 0..4 or NULL - geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon - regions TEXT[], -- ["US-ID-Ada", ...] - primary_region TEXT, - payload JSONB NOT NULL, -- full Event as JSON - received TIMESTAMPTZ NOT NULL DEFAULT now(), - PRIMARY KEY (id, time) -- composite PK for TimescaleDB -); - -SELECT create_hypertable('events', 'time', if_not_exists => TRUE); - -CREATE INDEX IF NOT EXISTS events_category_time_idx - ON events (category, time DESC); - -CREATE INDEX IF NOT EXISTS events_geom_gist - ON events USING GIST (geom); - -CREATE INDEX IF NOT EXISTS events_regions_gin - ON events USING GIN (regions); - --- Dedup on insert via ON CONFLICT (id, time) in the archive consumer. diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index ee2ac30..3f746fa 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -330,7 +330,7 @@ class FIRMSAdapter(SourceAdapter): return Event( id=stable_id, - source="central/adapters/firms", + adapter="firms", category=f"fire.hotspot.{satellite_short}.{confidence}", time=time, expires=None, diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index fa12f98..129584f 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -475,7 +475,7 @@ class NWSAdapter(SourceAdapter): return Event( id=event_id, - source="central/adapters/nws", + adapter="nws", category=category, time=time, expires=expires, diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index c4e871d..601c52b 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -348,7 +348,7 @@ class USGSQuakeAdapter(SourceAdapter): return Event( id=event_id, - source="central/adapters/usgs_quake", + adapter="usgs_quake", category=f"quake.event.{tier}", time=event_time, expires=None, diff --git a/src/central/archive.py b/src/central/archive.py index 05cc7e0..b64a187 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -1,4 +1,8 @@ -"""Central archive consumer - JetStream to TimescaleDB.""" +"""Central archive consumer - JetStream to TimescaleDB. + +Consumes events from multiple NATS JetStream streams and archives them +to TimescaleDB. One durable consumer per stream for independent ack tracking. +""" import asyncio import json @@ -12,17 +16,27 @@ import asyncpg import nats from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy +from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings -CONSUMER_NAME = "archive" -STREAM_NAME = "CENTRAL_WX" -SUBJECT_FILTER = "central.wx.>" +# Event-bearing streams to consume (skip CENTRAL_META - status messages only) +STREAMS = [ + ("CENTRAL_WX", "central.wx.>"), + ("CENTRAL_FIRE", "central.fire.>"), + ("CENTRAL_QUAKE", "central.quake.>"), +] + BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 ACK_WAIT = 30 +def consumer_name_for(stream: str) -> str: + """Generate consumer name for a stream.""" + return f"archive-{stream.lower()}" + + class JsonFormatter(logging.Formatter): """JSON log formatter for structured logging.""" @@ -125,24 +139,49 @@ class ArchiveConsumer: self._js = None logger.info("Disconnected") - async def _ensure_consumer(self) -> None: - """Ensure the durable consumer exists.""" + async def _cleanup_orphaned_consumer(self) -> None: + """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists. + + The old single-stream code used a consumer named 'archive' on CENTRAL_WX. + Now we use 'archive-central_wx' instead. Clean up the old one. + """ if not self._js: return try: - await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME) - logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME}) - except nats.js.errors.NotFoundError: + await self._js.consumer_info("CENTRAL_WX", "archive") + await self._js.delete_consumer("CENTRAL_WX", "archive") + logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX") + except NotFoundError: + pass # Already gone or never existed + + async def _ensure_consumer( + self, stream_name: str, subject_filter: str, consumer_name: str + ) -> None: + """Ensure the durable consumer exists for a stream.""" + if not self._js: + return + + try: + await self._js.consumer_info(stream_name, consumer_name) + logger.info( + "Consumer exists", + extra={"stream": stream_name, "consumer": consumer_name} + ) + except NotFoundError: consumer_config = ConsumerConfig( - durable_name=CONSUMER_NAME, + durable_name=consumer_name, deliver_policy=DeliverPolicy.ALL, ack_policy=AckPolicy.EXPLICIT, ack_wait=ACK_WAIT, - filter_subject=SUBJECT_FILTER, + max_deliver=5, + filter_subject=subject_filter, + ) + await self._js.add_consumer(stream_name, consumer_config) + logger.info( + "Consumer created", + extra={"stream": stream_name, "consumer": consumer_name} ) - await self._js.add_consumer(STREAM_NAME, consumer_config) - logger.info("Consumer created", extra={"consumer": CONSUMER_NAME}) async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None: """Process a single message and insert into database.""" @@ -157,7 +196,7 @@ class ArchiveConsumer: geo_data = event_data.get("geo") event_id = envelope.get("id") - source = event_data.get("source", "") + adapter = event_data.get("adapter", "") category = event_data.get("category", "") time_str = event_data.get("time") expires_str = event_data.get("expires") @@ -194,12 +233,12 @@ class ArchiveConsumer: if geom_json: await conn.execute( """ - INSERT INTO events (id, source, category, time, expires, severity, + INSERT INTO events (id, adapter, category, time, expires, severity, geom, regions, primary_region, payload) VALUES ($1, $2, $3, $4, $5, $6, ST_GeomFromGeoJSON($7), $8, $9, $10) ON CONFLICT (id, time) DO UPDATE SET - source = EXCLUDED.source, + adapter = EXCLUDED.adapter, category = EXCLUDED.category, expires = EXCLUDED.expires, severity = EXCLUDED.severity, @@ -208,17 +247,17 @@ class ArchiveConsumer: primary_region = EXCLUDED.primary_region, payload = EXCLUDED.payload """, - event_id, source, category, event_time, expires_time, severity, + event_id, adapter, category, event_time, expires_time, severity, geom_json, regions, primary_region, json.dumps(envelope) ) else: await conn.execute( """ - INSERT INTO events (id, source, category, time, expires, severity, + INSERT INTO events (id, adapter, category, time, expires, severity, geom, regions, primary_region, payload) VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9) ON CONFLICT (id, time) DO UPDATE SET - source = EXCLUDED.source, + adapter = EXCLUDED.adapter, category = EXCLUDED.category, expires = EXCLUDED.expires, severity = EXCLUDED.severity, @@ -227,7 +266,7 @@ class ArchiveConsumer: primary_region = EXCLUDED.primary_region, payload = EXCLUDED.payload """, - event_id, source, category, event_time, expires_time, severity, + event_id, adapter, category, event_time, expires_time, severity, regions, primary_region, json.dumps(envelope) ) @@ -241,22 +280,24 @@ class ArchiveConsumer: ) # Don't ack - let it be redelivered - async def _consume_loop(self) -> None: - """Main consume loop.""" + async def _consume_stream( + self, stream_name: str, subject_filter: str, consumer_name: str + ) -> None: + """Consume loop for a single stream.""" if not self._js or not self._pool: return - await self._ensure_consumer() + await self._ensure_consumer(stream_name, subject_filter, consumer_name) sub = await self._js.pull_subscribe( - SUBJECT_FILTER, - durable=CONSUMER_NAME, - stream=STREAM_NAME, + subject_filter, + durable=consumer_name, + stream=stream_name, ) logger.info( "Subscribed to stream", - extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER} + extra={"stream": stream_name, "filter": subject_filter} ) while not self._shutdown_event.is_set(): @@ -277,19 +318,62 @@ class ArchiveConsumer: except asyncio.CancelledError: break except Exception as e: - logger.exception("Error in consume loop", extra={"error": str(e)}) + logger.exception( + "Error in consume loop", + extra={"stream": stream_name, "error": str(e)} + ) await asyncio.sleep(1) - logger.info("Consume loop stopped") + logger.info("Consume loop stopped", extra={"stream": stream_name}) async def start(self) -> None: """Start the consumer.""" await self.connect() + await self._cleanup_orphaned_consumer() logger.info("Archive consumer ready") async def run(self) -> None: - """Run the consume loop until shutdown.""" - await self._consume_loop() + """Run consume loops for all streams until shutdown.""" + tasks = [] + for stream_name, subject_filter in STREAMS: + consumer_name = consumer_name_for(stream_name) + task = asyncio.create_task( + self._consume_stream(stream_name, subject_filter, consumer_name), + name=f"consume-{stream_name}", + ) + tasks.append(task) + + try: + # Wait for all tasks; if one fails, cancel the others + done, pending = await asyncio.wait( + tasks, + return_when=asyncio.FIRST_EXCEPTION, + ) + + # Check for exceptions in completed tasks + for task in done: + if task.exception(): + logger.error( + "Stream consumer failed", + extra={"task": task.get_name(), "error": str(task.exception())} + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + except asyncio.CancelledError: + # Shutdown requested, cancel all tasks + for task in tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass async def stop(self) -> None: """Stop the consumer gracefully.""" @@ -308,7 +392,6 @@ async def async_main() -> None: "Archive starting", extra={ "nats_url": settings.nats_url, - }, ) diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 1907d44..87f1269 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -80,12 +80,16 @@ async def lifespan(app: FastAPI): from central.bootstrap_config import get_settings from central.gui.db import close_pool, init_pool + from central.gui.nats import close_nats, init_nats settings = get_settings() # Initialize database pool await init_pool(settings.db_dsn) + # Initialize NATS connection + await init_nats(settings.nats_url) + # Start session cleanup task _shutdown_event = asyncio.Event() _cleanup_task = asyncio.create_task(_session_cleanup_loop()) @@ -103,6 +107,7 @@ async def lifespan(app: FastAPI): except asyncio.TimeoutError: _cleanup_task.cancel() + await close_nats() await close_pool() logger.info("Central GUI stopped") diff --git a/src/central/gui/audit.py b/src/central/gui/audit.py index 428275a..7d2f8f1 100644 --- a/src/central/gui/audit.py +++ b/src/central/gui/audit.py @@ -9,6 +9,7 @@ AUTH_LOGIN_FAILED = "auth.login_failed" AUTH_LOGOUT = "auth.logout" AUTH_PASSWORD_CHANGE = "auth.password_change" OPERATOR_CREATE = "operator.create" +ADAPTER_UPDATE = "adapter.update" async def write_audit( @@ -20,18 +21,15 @@ async def write_audit( after: dict[str, Any] | None = None, ) -> None: """Write an audit log entry.""" - # Serialize before/after as JSON strings if provided - before_json = json.dumps(before) if before else None - after_json = json.dumps(after) if after else None - + # asyncpg handles dict -> jsonb conversion automatically await conn.execute( """ INSERT INTO config.audit_log (operator_id, action, target, before, after) - VALUES ($1, $2, $3, $4::jsonb, $5::jsonb) + VALUES ($1, $2, $3, $4, $5) """, operator_id, action, target, - before_json, - after_json, + before, + after, ) diff --git a/src/central/gui/nats.py b/src/central/gui/nats.py new file mode 100644 index 0000000..393e1f9 --- /dev/null +++ b/src/central/gui/nats.py @@ -0,0 +1,46 @@ +"""NATS connection for GUI.""" + +import logging + +import nats +from nats.js import JetStreamContext + +logger = logging.getLogger(__name__) + +_nc: nats.NATS | None = None +_js: JetStreamContext | None = None + + +async def init_nats(url: str) -> JetStreamContext | None: + """Initialize the NATS connection and JetStream context.""" + global _nc, _js + if _nc is None: + try: + _nc = await nats.connect(url) + _js = _nc.jetstream() + logger.info("Connected to NATS", extra={"url": url}) + except Exception as e: + logger.warning("Failed to connect to NATS", extra={"error": str(e)}) + _nc = None + _js = None + return _js + + +def get_js() -> JetStreamContext | None: + """Get the JetStream context. Returns None if not connected.""" + return _js + + +async def close_nats() -> None: + """Close the NATS connection.""" + global _nc, _js + if _nc is not None: + try: + await _nc.drain() + await _nc.close() + logger.info("Disconnected from NATS") + except Exception as e: + logger.warning("Error closing NATS", extra={"error": str(e)}) + finally: + _nc = None + _js = None diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 1b9dc24..755cb80 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1,5 +1,9 @@ """Route handlers for Central GUI.""" +import json +import re +from typing import Any + from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi_csrf_protect import CsrfProtect @@ -12,6 +16,7 @@ from central.gui.auth import ( verify_password, ) from central.gui.audit import ( + ADAPTER_UPDATE, AUTH_LOGIN, AUTH_LOGIN_FAILED, AUTH_LOGOUT, @@ -23,6 +28,24 @@ from central.gui.db import get_pool router = APIRouter() +# Streams to display on dashboard +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] + +# Email validation regex (simple but effective) +EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") + + +def _get_valid_satellites() -> list[str]: + """Get valid satellite identifiers from firms adapter.""" + from central.adapters.firms import SATELLITE_SHORT + return list(SATELLITE_SHORT.keys()) + + +def _get_valid_feeds() -> set[str]: + """Get valid feed values from usgs_quake adapter.""" + from central.adapters.usgs_quake import VALID_FEEDS + return VALID_FEEDS + def _get_templates(): """Get templates instance (deferred import to avoid circular).""" @@ -30,6 +53,15 @@ def _get_templates(): return templates +def _format_bytes(size: int) -> str: + """Format bytes as human-readable string.""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size < 1024: + return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}" + size /= 1024 + return f"{size:.1f} PB" + + def _set_session_cookie( response: Response, token: str, @@ -76,6 +108,154 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML return response +@router.get("/dashboard/events", response_class=HTMLResponse) +async def dashboard_events(request: Request) -> HTMLResponse: + """Get events by adapter for the last 24 hours.""" + templates = _get_templates() + pool = get_pool() + + events = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT adapter, COUNT(*) as count + FROM events + WHERE received > NOW() - INTERVAL '24 hours' + GROUP BY adapter + ORDER BY count DESC + """ + ) + events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + + return templates.TemplateResponse( + request=request, + name="_dashboard_events.html", + context={"events": events, "error": error}, + ) + + +@router.get("/dashboard/streams", response_class=HTMLResponse) +async def dashboard_streams(request: Request) -> HTMLResponse: + """Get stream sizes from NATS JetStream.""" + from central.gui.nats import get_js + + templates = _get_templates() + js = get_js() + + streams = None + error = None + + if js is None: + error = "NATS unavailable" + else: + streams = [] + for stream_name in DASHBOARD_STREAMS: + try: + info = await js.stream_info(stream_name) + streams.append({ + "name": stream_name, + "messages": info.state.messages, + "size": _format_bytes(info.state.bytes), + "error": None, + }) + except Exception: + streams.append({ + "name": stream_name, + "messages": 0, + "size": "0 B", + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_streams.html", + context={"streams": streams, "error": error}, + ) + + +@router.get("/dashboard/polls", response_class=HTMLResponse) +async def dashboard_polls(request: Request) -> HTMLResponse: + """Get last poll times for each adapter.""" + from central.gui.nats import get_js + + templates = _get_templates() + pool = get_pool() + js = get_js() + + adapters = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT name FROM config.adapters ORDER BY name" + ) + adapter_names = [row["name"] for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": [], "error": error}, + ) + + if js is None: + error = "NATS unavailable" + adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names] + else: + for name in adapter_names: + try: + # Get last message from CENTRAL_META for this adapter + sub = await js.pull_subscribe( + f"central.meta.{name}.status", + durable=f"dashboard-poll-{name}", + stream="CENTRAL_META", + ) + try: + msgs = await sub.fetch(1, timeout=1.0) + if msgs: + data = json.loads(msgs[0].data.decode()) + last_poll = data.get("data", {}).get("time", "—") + adapters.append({ + "name": name, + "last_poll": last_poll, + "status": "✓", + "error": None, + }) + else: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": adapters, "error": error}, + ) + + @router.get("/setup", response_class=HTMLResponse) async def setup_form( request: Request, @@ -370,3 +550,285 @@ async def change_password_submit( # Redirect to index return RedirectResponse(url="/", status_code=302) + + +# ============================================================================= +# Adapters routes +# ============================================================================= + + +@router.get("/adapters", response_class=HTMLResponse) +async def adapters_list( + request: Request, + csrf_protect: CsrfProtect = Depends(), +) -> HTMLResponse: + """List all adapters.""" + templates = _get_templates() + pool = get_pool() + operator = request.state.operator + + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT name, enabled, cadence_s, settings, paused_at, updated_at + FROM config.adapters + ORDER BY name + """ + ) + + adapters = [] + for row in rows: + settings = row["settings"] or {} + adapters.append({ + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + }) + + csrf_token, signed_token = csrf_protect.generate_csrf_tokens() + response = templates.TemplateResponse( + request=request, + name="adapters_list.html", + context={ + "operator": operator, + "csrf_token": csrf_token, + "adapters": adapters, + }, + ) + csrf_protect.set_csrf_cookie(signed_token, response) + return response + + +@router.get("/adapters/{name}", response_class=HTMLResponse) +async def adapters_edit_form( + request: Request, + name: str, + csrf_protect: CsrfProtect = Depends(), +) -> Response: + """Render the adapter edit form.""" + templates = _get_templates() + pool = get_pool() + operator = request.state.operator + + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT name, enabled, cadence_s, settings, paused_at, updated_at + FROM config.adapters + WHERE name = $1 + """, + name, + ) + + if row is None: + return Response(status_code=404, content="Adapter not found") + + # Get API keys for firms dropdown + api_keys = await conn.fetch( + "SELECT alias FROM config.api_keys ORDER BY alias" + ) + + settings = row["settings"] or {} + adapter = { + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + } + + csrf_token, signed_token = csrf_protect.generate_csrf_tokens() + response = templates.TemplateResponse( + request=request, + name="adapters_edit.html", + context={ + "operator": operator, + "csrf_token": csrf_token, + "adapter": adapter, + "errors": None, + "form_data": None, + "api_keys": [{"alias": k["alias"]} for k in api_keys], + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), + }, + ) + csrf_protect.set_csrf_cookie(signed_token, response) + return response + + +@router.post("/adapters/{name}") +async def adapters_edit_submit( + request: Request, + name: str, + csrf_protect: CsrfProtect = Depends(), +) -> Response: + """Process the adapter edit form.""" + templates = _get_templates() + pool = get_pool() + operator = request.state.operator + + # Validate CSRF + await csrf_protect.validate_csrf(request) + + # Parse form data + form = await request.form() + enabled = "enabled" in form + cadence_s_str = form.get("cadence_s", "") + + # Build form_data for re-render on error + form_data: dict[str, Any] = { + "enabled": enabled, + "cadence_s": cadence_s_str, + } + + errors: dict[str, str] = {} + + # Validate cadence_s + try: + cadence_s = int(cadence_s_str) + if cadence_s < 60 or cadence_s > 3600: + errors["cadence_s"] = "Cadence must be between 60 and 3600 seconds" + except ValueError: + errors["cadence_s"] = "Cadence must be a valid integer" + cadence_s = 0 + + async with pool.acquire() as conn: + # Get current adapter state + row = await conn.fetchrow( + """ + SELECT name, enabled, cadence_s, settings, paused_at, updated_at + FROM config.adapters + WHERE name = $1 + """, + name, + ) + + if row is None: + return Response(status_code=404, content="Adapter not found") + + current_settings = row["settings"] or {} + new_settings = dict(current_settings) + + # Adapter-specific validation and settings update + if name == "nws": + contact_email = form.get("contact_email", "").strip() + form_data["contact_email"] = contact_email + if not contact_email: + errors["contact_email"] = "Contact email is required" + elif not EMAIL_REGEX.match(contact_email): + errors["contact_email"] = "Invalid email format" + else: + new_settings["contact_email"] = contact_email + + elif name == "firms": + api_key_alias = form.get("api_key_alias", "").strip() + satellites = form.getlist("satellites") + form_data["api_key_alias"] = api_key_alias + form_data["satellites"] = satellites + + # Validate api_key_alias if set + if api_key_alias: + key_exists = await conn.fetchrow( + "SELECT 1 FROM config.api_keys WHERE alias = $1", + api_key_alias, + ) + if not key_exists: + errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist" + else: + new_settings["api_key_alias"] = api_key_alias + else: + new_settings["api_key_alias"] = None + + # Validate satellites + valid_sats = set(_get_valid_satellites()) + invalid_sats = [s for s in satellites if s not in valid_sats] + if invalid_sats: + errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}" + else: + new_settings["satellites"] = satellites + + elif name == "usgs_quake": + feed = form.get("feed", "").strip() + form_data["feed"] = feed + valid_feeds = _get_valid_feeds() + if feed not in valid_feeds: + errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}" + else: + new_settings["feed"] = feed + + # If there are errors, re-render the form + if errors: + adapter = { + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": current_settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + } + + api_keys = await conn.fetch( + "SELECT alias FROM config.api_keys ORDER BY alias" + ) + + csrf_token, signed_token = csrf_protect.generate_csrf_tokens() + response = templates.TemplateResponse( + request=request, + name="adapters_edit.html", + context={ + "operator": operator, + "csrf_token": csrf_token, + "adapter": adapter, + "errors": errors, + "form_data": form_data, + "api_keys": [{"alias": k["alias"]} for k in api_keys], + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), + }, + status_code=200, + ) + csrf_protect.set_csrf_cookie(signed_token, response) + return response + + # Build before state for audit + before = { + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": current_settings, + } + + # Build after state for audit + after = { + "enabled": enabled, + "cadence_s": cadence_s, + "settings": new_settings, + } + + # Update the adapter + await conn.execute( + """ + UPDATE config.adapters + SET enabled = $1, cadence_s = $2, settings = $3, updated_at = now() + WHERE name = $4 + """, + enabled, + cadence_s, + new_settings, + name, + ) + + # Write audit log + await write_audit( + conn, + ADAPTER_UPDATE, + operator_id=operator.id, + target=name, + before=before, + after=after, + ) + + return RedirectResponse(url="/adapters", status_code=302) diff --git a/src/central/gui/templates/_dashboard_events.html b/src/central/gui/templates/_dashboard_events.html new file mode 100644 index 0000000..690b6ce --- /dev/null +++ b/src/central/gui/templates/_dashboard_events.html @@ -0,0 +1,22 @@ +{% if error %} +

{{ error }}

+{% elif events %} + + + + + + + + + {% for event in events %} + + + + + {% endfor %} + +
AdapterCount
{{ event.adapter }}{{ event.count }}
+{% else %} +

No events in the last 24 hours.

+{% endif %} diff --git a/src/central/gui/templates/_dashboard_polls.html b/src/central/gui/templates/_dashboard_polls.html new file mode 100644 index 0000000..c80e98b --- /dev/null +++ b/src/central/gui/templates/_dashboard_polls.html @@ -0,0 +1,31 @@ +{% if error %} +

{{ error }}

+{% elif adapters %} + + + + + + + + + + {% for adapter in adapters %} + + + {% if adapter.error %} + + {% elif adapter.last_poll %} + + + {% else %} + + + {% endif %} + + {% endfor %} + +
AdapterLast PollStatus
{{ adapter.name }}{{ adapter.error }}{{ adapter.last_poll }}{{ adapter.status }}
+{% else %} +

No adapter data available.

+{% endif %} diff --git a/src/central/gui/templates/_dashboard_streams.html b/src/central/gui/templates/_dashboard_streams.html new file mode 100644 index 0000000..246cbca --- /dev/null +++ b/src/central/gui/templates/_dashboard_streams.html @@ -0,0 +1,28 @@ +{% if error %} +

{{ error }}

+{% elif streams %} + + + + + + + + + + {% for stream in streams %} + + + {% if stream.error %} + + {% else %} + + + {% endif %} + + {% endfor %} + +
StreamMessagesSize
{{ stream.name }}{{ stream.error }}{{ stream.messages }}{{ stream.size }}
+{% else %} +

No stream data available.

+{% endif %} diff --git a/src/central/gui/templates/adapters_edit.html b/src/central/gui/templates/adapters_edit.html new file mode 100644 index 0000000..fe7e093 --- /dev/null +++ b/src/central/gui/templates/adapters_edit.html @@ -0,0 +1,49 @@ +{% extends "base.html" %} + +{% block title %}Central — Edit {{ adapter.name }}{% endblock %} + +{% block content %} +

Edit Adapter: {{ adapter.name }}

+ +
+ + +
+ Universal Settings + + + + + + {% if errors and errors.cadence_s %} + {{ errors.cadence_s }} + {% endif %} +
+ +
+ Adapter-Specific Settings + {% include "adapters_edit_" + adapter.name + ".html" %} +
+ +
+ Region (read-only) + {% if adapter.settings.region %} +

+ North: {{ adapter.settings.region.north }}
+ South: {{ adapter.settings.region.south }}
+ East: {{ adapter.settings.region.east }}
+ West: {{ adapter.settings.region.west }} +

+ {% else %} +

No region configured.

+ {% endif %} + Region editing comes in 1b-5. +
+ + + Cancel +
+{% endblock %} diff --git a/src/central/gui/templates/adapters_edit_firms.html b/src/central/gui/templates/adapters_edit_firms.html new file mode 100644 index 0000000..a2a339a --- /dev/null +++ b/src/central/gui/templates/adapters_edit_firms.html @@ -0,0 +1,21 @@ + + +{% if errors and errors.api_key_alias %} +{{ errors.api_key_alias }} +{% endif %} + + +{% for sat in valid_satellites %} + +{% endfor %} +{% if errors and errors.satellites %} +{{ errors.satellites }} +{% endif %} diff --git a/src/central/gui/templates/adapters_edit_nws.html b/src/central/gui/templates/adapters_edit_nws.html new file mode 100644 index 0000000..e655a41 --- /dev/null +++ b/src/central/gui/templates/adapters_edit_nws.html @@ -0,0 +1,5 @@ + + +{% if errors and errors.contact_email %} +{{ errors.contact_email }} +{% endif %} diff --git a/src/central/gui/templates/adapters_edit_usgs_quake.html b/src/central/gui/templates/adapters_edit_usgs_quake.html new file mode 100644 index 0000000..0c3b7ee --- /dev/null +++ b/src/central/gui/templates/adapters_edit_usgs_quake.html @@ -0,0 +1,9 @@ + + +{% if errors and errors.feed %} +{{ errors.feed }} +{% endif %} diff --git a/src/central/gui/templates/adapters_list.html b/src/central/gui/templates/adapters_list.html new file mode 100644 index 0000000..b97ae88 --- /dev/null +++ b/src/central/gui/templates/adapters_list.html @@ -0,0 +1,29 @@ +{% extends "base.html" %} + +{% block title %}Central — Adapters{% endblock %} + +{% block content %} +

Adapters

+ + + + + + + + + + + + {% for adapter in adapters %} + + + + + + + + {% endfor %} + +
NameEnabledCadenceLast Updated
{{ adapter.name }}{% if adapter.enabled %}Yes{% else %}No{% endif %}{{ adapter.cadence_s }}s{{ adapter.updated_at.strftime('%Y-%m-%d %H:%M') if adapter.updated_at else '—' }}Edit
+{% endblock %} diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index 631c542..1d2e24b 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -15,6 +15,8 @@