diff --git a/README.md b/README.md index f057003..36bcb4c 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,6 @@ Phase 0 — scaffold. Not yet operational. - One archive consumer process persisting events to TimescaleDB - Both processes systemd-managed -## Testing - -See [docs/test-database.md](docs/test-database.md) for test database setup. - ## License MIT. See LICENSE. diff --git a/docs/migrations.md b/docs/migrations.md index ccf2e61..ca53862 100644 --- a/docs/migrations.md +++ b/docs/migrations.md @@ -1,11 +1,5 @@ # Migration policy -## Migrations are the sole source of truth - -The `sql/migrations/` directory contains all schema definitions. There is -no separate schema.sql file; use `pg_dump -s central` to generate a -human-readable snapshot of the current schema when needed. - ## Migrations must be idempotent New migration files (007+) must use guards so they can be safely @@ -26,24 +20,8 @@ Direct `psql` execution bypasses the `schema_migrations` tracker and was the cause of the v0.2.0 reconcile. If a migration needs to be applied on the live system, run: - sudo -u central /opt/central/.venv/bin/python -m central.migrate + sudo -u central /opt/central/.venv/bin/python -m scripts.migrate Never apply migration SQL directly via `psql`, even as a superuser, even "just to test." If migrate.py has a bug that's blocking you, fix migrate.py. - -## Extensions are not in migrations - -PostgreSQL extensions like PostGIS require superuser privileges to -install. The production `central` role is intentionally not a superuser. -Therefore, extensions live outside the migration system: - -- **Production bootstrap:** A DBA runs `CREATE EXTENSION postgis` once - before the first `migrate.py` run. -- **Test database:** The `central_test` role is a superuser, allowing - test fixtures to self-bootstrap extensions. - -This is documented in [docs/test-database.md](test-database.md). - -Do not add `CREATE EXTENSION` statements to migrations — they will fail -in production where migrations run as the non-superuser `central` role. diff --git a/docs/test-database.md b/docs/test-database.md deleted file mode 100644 index 71c71a4..0000000 --- a/docs/test-database.md +++ /dev/null @@ -1,83 +0,0 @@ -# Test Database Setup - -Central's integration tests require a PostgreSQL database. This document -covers one-time setup and maintenance of the test database. - -## DSN Convention - -Tests default to: - -``` -postgresql://central_test:testpass@localhost/central_test -``` - -Override via the `CENTRAL_TEST_DB_DSN` environment variable: - -```bash -export CENTRAL_TEST_DB_DSN="postgresql://myuser:mypass@localhost/mydb" -``` - -## One-Time Setup - -Run these commands once on a fresh PostgreSQL installation: - -```bash -# Create the test user (as postgres superuser) -sudo -u postgres createuser -s central_test -sudo -u postgres psql -c "ALTER USER central_test PASSWORD 'testpass'" - -# Create the test database -sudo -u postgres createdb -O central_test central_test - -# Install required extensions -sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis" -``` - -**Note:** The `central_test` role is created as a superuser (`-s` flag). -This allows test fixtures to self-bootstrap extensions like PostGIS via -`CREATE EXTENSION IF NOT EXISTS`. Production uses a non-superuser role. - -## Required Extensions - -| Extension | Version | Purpose | -|-----------|---------|---------| -| postgis | 3.4+ | Geometry types for geospatial event data | - -## Why PostGIS Is Not in Migrations - -PostGIS requires superuser privileges to install. The production `central` -role is intentionally not a superuser for security reasons. Therefore: - -- **Production:** A DBA must run `CREATE EXTENSION postgis` before the - first `migrate.py` run. This is a one-time bootstrap step. -- **Test:** The `central_test` role is a superuser, so test fixtures can - self-bootstrap PostGIS via `CREATE EXTENSION IF NOT EXISTS`. - -This divergence is documented rather than "fixed" because granting -superuser to production roles creates security risk, and the PostgreSQL -packaging on Ubuntu does not mark PostGIS as a trusted extension. - -## Resetting the Test Database - -If the test database gets into a bad state: - -```bash -# Drop and recreate -sudo -u postgres dropdb central_test -sudo -u postgres createdb -O central_test central_test -sudo -u postgres psql central_test -c "CREATE EXTENSION IF NOT EXISTS postgis" -``` - -Test fixtures handle their own table creation and cleanup, so this is -rarely needed. - -## Running Tests - -```bash -cd /opt/central -uv run pytest tests/ # all tests -uv run pytest tests/test_config_store.py -v # specific file -``` - -Tests that require the database will skip gracefully if the connection -fails, though most integration tests will fail without a working DB. diff --git a/sql/migrations/011_events_add_adapter_column.sql b/sql/migrations/011_events_add_adapter_column.sql deleted file mode 100644 index 14596dd..0000000 --- a/sql/migrations/011_events_add_adapter_column.sql +++ /dev/null @@ -1,46 +0,0 @@ --- Migration 011: Add adapter column to events, drop source column --- Replaces module-path-based source with stable adapter identifier - --- Add adapter column (idempotent) -ALTER TABLE public.events ADD COLUMN IF NOT EXISTS adapter TEXT; - --- Backfill from existing source values -UPDATE public.events -SET adapter = REPLACE(source, 'central/adapters/', '') -WHERE adapter IS NULL AND source IS NOT NULL; - --- Make NOT NULL after backfill (idempotent check) -DO $$ -BEGIN - IF EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_schema = 'public' - AND table_name = 'events' - AND column_name = 'adapter' - AND is_nullable = 'YES' - ) THEN - ALTER TABLE public.events ALTER COLUMN adapter SET NOT NULL; - END IF; -END $$; - --- Add FK constraint (idempotent check) -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.table_constraints - WHERE constraint_name = 'events_adapter_fkey' - AND table_name = 'events' - ) THEN - ALTER TABLE public.events - ADD CONSTRAINT events_adapter_fkey - FOREIGN KEY (adapter) REFERENCES config.adapters(name) - ON DELETE RESTRICT; - END IF; -END $$; - --- Add index for dashboard queries (idempotent) -CREATE INDEX IF NOT EXISTS events_adapter_received_idx -ON public.events (adapter, received DESC); - --- Drop deprecated source column (idempotent) -ALTER TABLE public.events DROP COLUMN IF EXISTS source; diff --git a/sql/schema.sql b/sql/schema.sql new file mode 100644 index 0000000..9a89e19 --- /dev/null +++ b/sql/schema.sql @@ -0,0 +1,30 @@ +-- Central Data Hub schema +-- PostgreSQL 16 + TimescaleDB + PostGIS + +CREATE TABLE IF NOT EXISTS events ( + id TEXT NOT NULL, -- CloudEvent id + source TEXT NOT NULL, -- adapter identity + category TEXT NOT NULL, -- "wx.alert." + time TIMESTAMPTZ NOT NULL, -- event-time UTC + expires TIMESTAMPTZ, + severity SMALLINT, -- 0..4 or NULL + geom GEOMETRY(Geometry, 4326), -- centroid or bbox as Polygon + regions TEXT[], -- ["US-ID-Ada", ...] + primary_region TEXT, + payload JSONB NOT NULL, -- full Event as JSON + received TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (id, time) -- composite PK for TimescaleDB +); + +SELECT create_hypertable('events', 'time', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS events_category_time_idx + ON events (category, time DESC); + +CREATE INDEX IF NOT EXISTS events_geom_gist + ON events USING GIST (geom); + +CREATE INDEX IF NOT EXISTS events_regions_gin + ON events USING GIN (regions); + +-- Dedup on insert via ON CONFLICT (id, time) in the archive consumer. diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 3f746fa..ee2ac30 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -330,7 +330,7 @@ class FIRMSAdapter(SourceAdapter): return Event( id=stable_id, - adapter="firms", + source="central/adapters/firms", category=f"fire.hotspot.{satellite_short}.{confidence}", time=time, expires=None, diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 129584f..fa12f98 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -475,7 +475,7 @@ class NWSAdapter(SourceAdapter): return Event( id=event_id, - adapter="nws", + source="central/adapters/nws", category=category, time=time, expires=expires, diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 601c52b..c4e871d 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -348,7 +348,7 @@ class USGSQuakeAdapter(SourceAdapter): return Event( id=event_id, - adapter="usgs_quake", + source="central/adapters/usgs_quake", category=f"quake.event.{tier}", time=event_time, expires=None, diff --git a/src/central/archive.py b/src/central/archive.py index b64a187..05cc7e0 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -1,8 +1,4 @@ -"""Central archive consumer - JetStream to TimescaleDB. - -Consumes events from multiple NATS JetStream streams and archives them -to TimescaleDB. One durable consumer per stream for independent ack tracking. -""" +"""Central archive consumer - JetStream to TimescaleDB.""" import asyncio import json @@ -16,27 +12,17 @@ import asyncpg import nats from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy -from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings -# Event-bearing streams to consume (skip CENTRAL_META - status messages only) -STREAMS = [ - ("CENTRAL_WX", "central.wx.>"), - ("CENTRAL_FIRE", "central.fire.>"), - ("CENTRAL_QUAKE", "central.quake.>"), -] - +CONSUMER_NAME = "archive" +STREAM_NAME = "CENTRAL_WX" +SUBJECT_FILTER = "central.wx.>" BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 ACK_WAIT = 30 -def consumer_name_for(stream: str) -> str: - """Generate consumer name for a stream.""" - return f"archive-{stream.lower()}" - - class JsonFormatter(logging.Formatter): """JSON log formatter for structured logging.""" @@ -139,49 +125,24 @@ class ArchiveConsumer: self._js = None logger.info("Disconnected") - async def _cleanup_orphaned_consumer(self) -> None: - """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists. - - The old single-stream code used a consumer named 'archive' on CENTRAL_WX. - Now we use 'archive-central_wx' instead. Clean up the old one. - """ + async def _ensure_consumer(self) -> None: + """Ensure the durable consumer exists.""" if not self._js: return try: - await self._js.consumer_info("CENTRAL_WX", "archive") - await self._js.delete_consumer("CENTRAL_WX", "archive") - logger.info("Removed orphaned 'archive' consumer from CENTRAL_WX") - except NotFoundError: - pass # Already gone or never existed - - async def _ensure_consumer( - self, stream_name: str, subject_filter: str, consumer_name: str - ) -> None: - """Ensure the durable consumer exists for a stream.""" - if not self._js: - return - - try: - await self._js.consumer_info(stream_name, consumer_name) - logger.info( - "Consumer exists", - extra={"stream": stream_name, "consumer": consumer_name} - ) - except NotFoundError: + await self._js.consumer_info(STREAM_NAME, CONSUMER_NAME) + logger.info("Consumer exists", extra={"consumer": CONSUMER_NAME}) + except nats.js.errors.NotFoundError: consumer_config = ConsumerConfig( - durable_name=consumer_name, + durable_name=CONSUMER_NAME, deliver_policy=DeliverPolicy.ALL, ack_policy=AckPolicy.EXPLICIT, ack_wait=ACK_WAIT, - max_deliver=5, - filter_subject=subject_filter, - ) - await self._js.add_consumer(stream_name, consumer_config) - logger.info( - "Consumer created", - extra={"stream": stream_name, "consumer": consumer_name} + filter_subject=SUBJECT_FILTER, ) + await self._js.add_consumer(STREAM_NAME, consumer_config) + logger.info("Consumer created", extra={"consumer": CONSUMER_NAME}) async def _process_message(self, msg: Any, conn: asyncpg.Connection) -> None: """Process a single message and insert into database.""" @@ -196,7 +157,7 @@ class ArchiveConsumer: geo_data = event_data.get("geo") event_id = envelope.get("id") - adapter = event_data.get("adapter", "") + source = event_data.get("source", "") category = event_data.get("category", "") time_str = event_data.get("time") expires_str = event_data.get("expires") @@ -233,12 +194,12 @@ class ArchiveConsumer: if geom_json: await conn.execute( """ - INSERT INTO events (id, adapter, category, time, expires, severity, + INSERT INTO events (id, source, category, time, expires, severity, geom, regions, primary_region, payload) VALUES ($1, $2, $3, $4, $5, $6, ST_GeomFromGeoJSON($7), $8, $9, $10) ON CONFLICT (id, time) DO UPDATE SET - adapter = EXCLUDED.adapter, + source = EXCLUDED.source, category = EXCLUDED.category, expires = EXCLUDED.expires, severity = EXCLUDED.severity, @@ -247,17 +208,17 @@ class ArchiveConsumer: primary_region = EXCLUDED.primary_region, payload = EXCLUDED.payload """, - event_id, adapter, category, event_time, expires_time, severity, + event_id, source, category, event_time, expires_time, severity, geom_json, regions, primary_region, json.dumps(envelope) ) else: await conn.execute( """ - INSERT INTO events (id, adapter, category, time, expires, severity, + INSERT INTO events (id, source, category, time, expires, severity, geom, regions, primary_region, payload) VALUES ($1, $2, $3, $4, $5, $6, NULL, $7, $8, $9) ON CONFLICT (id, time) DO UPDATE SET - adapter = EXCLUDED.adapter, + source = EXCLUDED.source, category = EXCLUDED.category, expires = EXCLUDED.expires, severity = EXCLUDED.severity, @@ -266,7 +227,7 @@ class ArchiveConsumer: primary_region = EXCLUDED.primary_region, payload = EXCLUDED.payload """, - event_id, adapter, category, event_time, expires_time, severity, + event_id, source, category, event_time, expires_time, severity, regions, primary_region, json.dumps(envelope) ) @@ -280,24 +241,22 @@ class ArchiveConsumer: ) # Don't ack - let it be redelivered - async def _consume_stream( - self, stream_name: str, subject_filter: str, consumer_name: str - ) -> None: - """Consume loop for a single stream.""" + async def _consume_loop(self) -> None: + """Main consume loop.""" if not self._js or not self._pool: return - await self._ensure_consumer(stream_name, subject_filter, consumer_name) + await self._ensure_consumer() sub = await self._js.pull_subscribe( - subject_filter, - durable=consumer_name, - stream=stream_name, + SUBJECT_FILTER, + durable=CONSUMER_NAME, + stream=STREAM_NAME, ) logger.info( "Subscribed to stream", - extra={"stream": stream_name, "filter": subject_filter} + extra={"stream": STREAM_NAME, "filter": SUBJECT_FILTER} ) while not self._shutdown_event.is_set(): @@ -318,62 +277,19 @@ class ArchiveConsumer: except asyncio.CancelledError: break except Exception as e: - logger.exception( - "Error in consume loop", - extra={"stream": stream_name, "error": str(e)} - ) + logger.exception("Error in consume loop", extra={"error": str(e)}) await asyncio.sleep(1) - logger.info("Consume loop stopped", extra={"stream": stream_name}) + logger.info("Consume loop stopped") async def start(self) -> None: """Start the consumer.""" await self.connect() - await self._cleanup_orphaned_consumer() logger.info("Archive consumer ready") async def run(self) -> None: - """Run consume loops for all streams until shutdown.""" - tasks = [] - for stream_name, subject_filter in STREAMS: - consumer_name = consumer_name_for(stream_name) - task = asyncio.create_task( - self._consume_stream(stream_name, subject_filter, consumer_name), - name=f"consume-{stream_name}", - ) - tasks.append(task) - - try: - # Wait for all tasks; if one fails, cancel the others - done, pending = await asyncio.wait( - tasks, - return_when=asyncio.FIRST_EXCEPTION, - ) - - # Check for exceptions in completed tasks - for task in done: - if task.exception(): - logger.error( - "Stream consumer failed", - extra={"task": task.get_name(), "error": str(task.exception())} - ) - - # Cancel any remaining tasks - for task in pending: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - except asyncio.CancelledError: - # Shutdown requested, cancel all tasks - for task in tasks: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + """Run the consume loop until shutdown.""" + await self._consume_loop() async def stop(self) -> None: """Stop the consumer gracefully.""" @@ -392,6 +308,7 @@ async def async_main() -> None: "Archive starting", extra={ "nats_url": settings.nats_url, + }, ) diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 87f1269..1907d44 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -80,16 +80,12 @@ async def lifespan(app: FastAPI): from central.bootstrap_config import get_settings from central.gui.db import close_pool, init_pool - from central.gui.nats import close_nats, init_nats settings = get_settings() # Initialize database pool await init_pool(settings.db_dsn) - # Initialize NATS connection - await init_nats(settings.nats_url) - # Start session cleanup task _shutdown_event = asyncio.Event() _cleanup_task = asyncio.create_task(_session_cleanup_loop()) @@ -107,7 +103,6 @@ async def lifespan(app: FastAPI): except asyncio.TimeoutError: _cleanup_task.cancel() - await close_nats() await close_pool() logger.info("Central GUI stopped") diff --git a/src/central/gui/audit.py b/src/central/gui/audit.py index 7d2f8f1..428275a 100644 --- a/src/central/gui/audit.py +++ b/src/central/gui/audit.py @@ -9,7 +9,6 @@ AUTH_LOGIN_FAILED = "auth.login_failed" AUTH_LOGOUT = "auth.logout" AUTH_PASSWORD_CHANGE = "auth.password_change" OPERATOR_CREATE = "operator.create" -ADAPTER_UPDATE = "adapter.update" async def write_audit( @@ -21,15 +20,18 @@ async def write_audit( after: dict[str, Any] | None = None, ) -> None: """Write an audit log entry.""" - # asyncpg handles dict -> jsonb conversion automatically + # Serialize before/after as JSON strings if provided + before_json = json.dumps(before) if before else None + after_json = json.dumps(after) if after else None + await conn.execute( """ INSERT INTO config.audit_log (operator_id, action, target, before, after) - VALUES ($1, $2, $3, $4, $5) + VALUES ($1, $2, $3, $4::jsonb, $5::jsonb) """, operator_id, action, target, - before, - after, + before_json, + after_json, ) diff --git a/src/central/gui/nats.py b/src/central/gui/nats.py deleted file mode 100644 index 393e1f9..0000000 --- a/src/central/gui/nats.py +++ /dev/null @@ -1,46 +0,0 @@ -"""NATS connection for GUI.""" - -import logging - -import nats -from nats.js import JetStreamContext - -logger = logging.getLogger(__name__) - -_nc: nats.NATS | None = None -_js: JetStreamContext | None = None - - -async def init_nats(url: str) -> JetStreamContext | None: - """Initialize the NATS connection and JetStream context.""" - global _nc, _js - if _nc is None: - try: - _nc = await nats.connect(url) - _js = _nc.jetstream() - logger.info("Connected to NATS", extra={"url": url}) - except Exception as e: - logger.warning("Failed to connect to NATS", extra={"error": str(e)}) - _nc = None - _js = None - return _js - - -def get_js() -> JetStreamContext | None: - """Get the JetStream context. Returns None if not connected.""" - return _js - - -async def close_nats() -> None: - """Close the NATS connection.""" - global _nc, _js - if _nc is not None: - try: - await _nc.drain() - await _nc.close() - logger.info("Disconnected from NATS") - except Exception as e: - logger.warning("Error closing NATS", extra={"error": str(e)}) - finally: - _nc = None - _js = None diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 755cb80..1b9dc24 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1,9 +1,5 @@ """Route handlers for Central GUI.""" -import json -import re -from typing import Any - from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi_csrf_protect import CsrfProtect @@ -16,7 +12,6 @@ from central.gui.auth import ( verify_password, ) from central.gui.audit import ( - ADAPTER_UPDATE, AUTH_LOGIN, AUTH_LOGIN_FAILED, AUTH_LOGOUT, @@ -28,24 +23,6 @@ from central.gui.db import get_pool router = APIRouter() -# Streams to display on dashboard -DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] - -# Email validation regex (simple but effective) -EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") - - -def _get_valid_satellites() -> list[str]: - """Get valid satellite identifiers from firms adapter.""" - from central.adapters.firms import SATELLITE_SHORT - return list(SATELLITE_SHORT.keys()) - - -def _get_valid_feeds() -> set[str]: - """Get valid feed values from usgs_quake adapter.""" - from central.adapters.usgs_quake import VALID_FEEDS - return VALID_FEEDS - def _get_templates(): """Get templates instance (deferred import to avoid circular).""" @@ -53,15 +30,6 @@ def _get_templates(): return templates -def _format_bytes(size: int) -> str: - """Format bytes as human-readable string.""" - for unit in ["B", "KB", "MB", "GB", "TB"]: - if size < 1024: - return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}" - size /= 1024 - return f"{size:.1f} PB" - - def _set_session_cookie( response: Response, token: str, @@ -108,154 +76,6 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML return response -@router.get("/dashboard/events", response_class=HTMLResponse) -async def dashboard_events(request: Request) -> HTMLResponse: - """Get events by adapter for the last 24 hours.""" - templates = _get_templates() - pool = get_pool() - - events = [] - error = None - - try: - async with pool.acquire() as conn: - rows = await conn.fetch( - """ - SELECT adapter, COUNT(*) as count - FROM events - WHERE received > NOW() - INTERVAL '24 hours' - GROUP BY adapter - ORDER BY count DESC - """ - ) - events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows] - except Exception as e: - error = f"Database error: {str(e)}" - - return templates.TemplateResponse( - request=request, - name="_dashboard_events.html", - context={"events": events, "error": error}, - ) - - -@router.get("/dashboard/streams", response_class=HTMLResponse) -async def dashboard_streams(request: Request) -> HTMLResponse: - """Get stream sizes from NATS JetStream.""" - from central.gui.nats import get_js - - templates = _get_templates() - js = get_js() - - streams = None - error = None - - if js is None: - error = "NATS unavailable" - else: - streams = [] - for stream_name in DASHBOARD_STREAMS: - try: - info = await js.stream_info(stream_name) - streams.append({ - "name": stream_name, - "messages": info.state.messages, - "size": _format_bytes(info.state.bytes), - "error": None, - }) - except Exception: - streams.append({ - "name": stream_name, - "messages": 0, - "size": "0 B", - "error": "unavailable", - }) - - return templates.TemplateResponse( - request=request, - name="_dashboard_streams.html", - context={"streams": streams, "error": error}, - ) - - -@router.get("/dashboard/polls", response_class=HTMLResponse) -async def dashboard_polls(request: Request) -> HTMLResponse: - """Get last poll times for each adapter.""" - from central.gui.nats import get_js - - templates = _get_templates() - pool = get_pool() - js = get_js() - - adapters = [] - error = None - - try: - async with pool.acquire() as conn: - rows = await conn.fetch( - "SELECT name FROM config.adapters ORDER BY name" - ) - adapter_names = [row["name"] for row in rows] - except Exception as e: - error = f"Database error: {str(e)}" - return templates.TemplateResponse( - request=request, - name="_dashboard_polls.html", - context={"adapters": [], "error": error}, - ) - - if js is None: - error = "NATS unavailable" - adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names] - else: - for name in adapter_names: - try: - # Get last message from CENTRAL_META for this adapter - sub = await js.pull_subscribe( - f"central.meta.{name}.status", - durable=f"dashboard-poll-{name}", - stream="CENTRAL_META", - ) - try: - msgs = await sub.fetch(1, timeout=1.0) - if msgs: - data = json.loads(msgs[0].data.decode()) - last_poll = data.get("data", {}).get("time", "—") - adapters.append({ - "name": name, - "last_poll": last_poll, - "status": "✓", - "error": None, - }) - else: - adapters.append({ - "name": name, - "last_poll": None, - "status": None, - "error": None, - }) - except Exception: - adapters.append({ - "name": name, - "last_poll": None, - "status": None, - "error": None, - }) - except Exception: - adapters.append({ - "name": name, - "last_poll": None, - "status": None, - "error": "unavailable", - }) - - return templates.TemplateResponse( - request=request, - name="_dashboard_polls.html", - context={"adapters": adapters, "error": error}, - ) - - @router.get("/setup", response_class=HTMLResponse) async def setup_form( request: Request, @@ -550,285 +370,3 @@ async def change_password_submit( # Redirect to index return RedirectResponse(url="/", status_code=302) - - -# ============================================================================= -# Adapters routes -# ============================================================================= - - -@router.get("/adapters", response_class=HTMLResponse) -async def adapters_list( - request: Request, - csrf_protect: CsrfProtect = Depends(), -) -> HTMLResponse: - """List all adapters.""" - templates = _get_templates() - pool = get_pool() - operator = request.state.operator - - async with pool.acquire() as conn: - rows = await conn.fetch( - """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at - FROM config.adapters - ORDER BY name - """ - ) - - adapters = [] - for row in rows: - settings = row["settings"] or {} - adapters.append({ - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - }) - - csrf_token, signed_token = csrf_protect.generate_csrf_tokens() - response = templates.TemplateResponse( - request=request, - name="adapters_list.html", - context={ - "operator": operator, - "csrf_token": csrf_token, - "adapters": adapters, - }, - ) - csrf_protect.set_csrf_cookie(signed_token, response) - return response - - -@router.get("/adapters/{name}", response_class=HTMLResponse) -async def adapters_edit_form( - request: Request, - name: str, - csrf_protect: CsrfProtect = Depends(), -) -> Response: - """Render the adapter edit form.""" - templates = _get_templates() - pool = get_pool() - operator = request.state.operator - - async with pool.acquire() as conn: - row = await conn.fetchrow( - """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at - FROM config.adapters - WHERE name = $1 - """, - name, - ) - - if row is None: - return Response(status_code=404, content="Adapter not found") - - # Get API keys for firms dropdown - api_keys = await conn.fetch( - "SELECT alias FROM config.api_keys ORDER BY alias" - ) - - settings = row["settings"] or {} - adapter = { - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - } - - csrf_token, signed_token = csrf_protect.generate_csrf_tokens() - response = templates.TemplateResponse( - request=request, - name="adapters_edit.html", - context={ - "operator": operator, - "csrf_token": csrf_token, - "adapter": adapter, - "errors": None, - "form_data": None, - "api_keys": [{"alias": k["alias"]} for k in api_keys], - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), - }, - ) - csrf_protect.set_csrf_cookie(signed_token, response) - return response - - -@router.post("/adapters/{name}") -async def adapters_edit_submit( - request: Request, - name: str, - csrf_protect: CsrfProtect = Depends(), -) -> Response: - """Process the adapter edit form.""" - templates = _get_templates() - pool = get_pool() - operator = request.state.operator - - # Validate CSRF - await csrf_protect.validate_csrf(request) - - # Parse form data - form = await request.form() - enabled = "enabled" in form - cadence_s_str = form.get("cadence_s", "") - - # Build form_data for re-render on error - form_data: dict[str, Any] = { - "enabled": enabled, - "cadence_s": cadence_s_str, - } - - errors: dict[str, str] = {} - - # Validate cadence_s - try: - cadence_s = int(cadence_s_str) - if cadence_s < 60 or cadence_s > 3600: - errors["cadence_s"] = "Cadence must be between 60 and 3600 seconds" - except ValueError: - errors["cadence_s"] = "Cadence must be a valid integer" - cadence_s = 0 - - async with pool.acquire() as conn: - # Get current adapter state - row = await conn.fetchrow( - """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at - FROM config.adapters - WHERE name = $1 - """, - name, - ) - - if row is None: - return Response(status_code=404, content="Adapter not found") - - current_settings = row["settings"] or {} - new_settings = dict(current_settings) - - # Adapter-specific validation and settings update - if name == "nws": - contact_email = form.get("contact_email", "").strip() - form_data["contact_email"] = contact_email - if not contact_email: - errors["contact_email"] = "Contact email is required" - elif not EMAIL_REGEX.match(contact_email): - errors["contact_email"] = "Invalid email format" - else: - new_settings["contact_email"] = contact_email - - elif name == "firms": - api_key_alias = form.get("api_key_alias", "").strip() - satellites = form.getlist("satellites") - form_data["api_key_alias"] = api_key_alias - form_data["satellites"] = satellites - - # Validate api_key_alias if set - if api_key_alias: - key_exists = await conn.fetchrow( - "SELECT 1 FROM config.api_keys WHERE alias = $1", - api_key_alias, - ) - if not key_exists: - errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist" - else: - new_settings["api_key_alias"] = api_key_alias - else: - new_settings["api_key_alias"] = None - - # Validate satellites - valid_sats = set(_get_valid_satellites()) - invalid_sats = [s for s in satellites if s not in valid_sats] - if invalid_sats: - errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}" - else: - new_settings["satellites"] = satellites - - elif name == "usgs_quake": - feed = form.get("feed", "").strip() - form_data["feed"] = feed - valid_feeds = _get_valid_feeds() - if feed not in valid_feeds: - errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}" - else: - new_settings["feed"] = feed - - # If there are errors, re-render the form - if errors: - adapter = { - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": current_settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - } - - api_keys = await conn.fetch( - "SELECT alias FROM config.api_keys ORDER BY alias" - ) - - csrf_token, signed_token = csrf_protect.generate_csrf_tokens() - response = templates.TemplateResponse( - request=request, - name="adapters_edit.html", - context={ - "operator": operator, - "csrf_token": csrf_token, - "adapter": adapter, - "errors": errors, - "form_data": form_data, - "api_keys": [{"alias": k["alias"]} for k in api_keys], - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), - }, - status_code=200, - ) - csrf_protect.set_csrf_cookie(signed_token, response) - return response - - # Build before state for audit - before = { - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": current_settings, - } - - # Build after state for audit - after = { - "enabled": enabled, - "cadence_s": cadence_s, - "settings": new_settings, - } - - # Update the adapter - await conn.execute( - """ - UPDATE config.adapters - SET enabled = $1, cadence_s = $2, settings = $3, updated_at = now() - WHERE name = $4 - """, - enabled, - cadence_s, - new_settings, - name, - ) - - # Write audit log - await write_audit( - conn, - ADAPTER_UPDATE, - operator_id=operator.id, - target=name, - before=before, - after=after, - ) - - return RedirectResponse(url="/adapters", status_code=302) diff --git a/src/central/gui/templates/_dashboard_events.html b/src/central/gui/templates/_dashboard_events.html deleted file mode 100644 index 690b6ce..0000000 --- a/src/central/gui/templates/_dashboard_events.html +++ /dev/null @@ -1,22 +0,0 @@ -{% if error %} -

{{ error }}

-{% elif events %} - - - - - - - - - {% for event in events %} - - - - - {% endfor %} - -
AdapterCount
{{ event.adapter }}{{ event.count }}
-{% else %} -

No events in the last 24 hours.

-{% endif %} diff --git a/src/central/gui/templates/_dashboard_polls.html b/src/central/gui/templates/_dashboard_polls.html deleted file mode 100644 index c80e98b..0000000 --- a/src/central/gui/templates/_dashboard_polls.html +++ /dev/null @@ -1,31 +0,0 @@ -{% if error %} -

{{ error }}

-{% elif adapters %} - - - - - - - - - - {% for adapter in adapters %} - - - {% if adapter.error %} - - {% elif adapter.last_poll %} - - - {% else %} - - - {% endif %} - - {% endfor %} - -
AdapterLast PollStatus
{{ adapter.name }}{{ adapter.error }}{{ adapter.last_poll }}{{ adapter.status }}
-{% else %} -

No adapter data available.

-{% endif %} diff --git a/src/central/gui/templates/_dashboard_streams.html b/src/central/gui/templates/_dashboard_streams.html deleted file mode 100644 index 246cbca..0000000 --- a/src/central/gui/templates/_dashboard_streams.html +++ /dev/null @@ -1,28 +0,0 @@ -{% if error %} -

{{ error }}

-{% elif streams %} - - - - - - - - - - {% for stream in streams %} - - - {% if stream.error %} - - {% else %} - - - {% endif %} - - {% endfor %} - -
StreamMessagesSize
{{ stream.name }}{{ stream.error }}{{ stream.messages }}{{ stream.size }}
-{% else %} -

No stream data available.

-{% endif %} diff --git a/src/central/gui/templates/adapters_edit.html b/src/central/gui/templates/adapters_edit.html deleted file mode 100644 index fe7e093..0000000 --- a/src/central/gui/templates/adapters_edit.html +++ /dev/null @@ -1,49 +0,0 @@ -{% extends "base.html" %} - -{% block title %}Central — Edit {{ adapter.name }}{% endblock %} - -{% block content %} -

Edit Adapter: {{ adapter.name }}

- -
- - -
- Universal Settings - - - - - - {% if errors and errors.cadence_s %} - {{ errors.cadence_s }} - {% endif %} -
- -
- Adapter-Specific Settings - {% include "adapters_edit_" + adapter.name + ".html" %} -
- -
- Region (read-only) - {% if adapter.settings.region %} -

- North: {{ adapter.settings.region.north }}
- South: {{ adapter.settings.region.south }}
- East: {{ adapter.settings.region.east }}
- West: {{ adapter.settings.region.west }} -

- {% else %} -

No region configured.

- {% endif %} - Region editing comes in 1b-5. -
- - - Cancel -
-{% endblock %} diff --git a/src/central/gui/templates/adapters_edit_firms.html b/src/central/gui/templates/adapters_edit_firms.html deleted file mode 100644 index a2a339a..0000000 --- a/src/central/gui/templates/adapters_edit_firms.html +++ /dev/null @@ -1,21 +0,0 @@ - - -{% if errors and errors.api_key_alias %} -{{ errors.api_key_alias }} -{% endif %} - - -{% for sat in valid_satellites %} - -{% endfor %} -{% if errors and errors.satellites %} -{{ errors.satellites }} -{% endif %} diff --git a/src/central/gui/templates/adapters_edit_nws.html b/src/central/gui/templates/adapters_edit_nws.html deleted file mode 100644 index e655a41..0000000 --- a/src/central/gui/templates/adapters_edit_nws.html +++ /dev/null @@ -1,5 +0,0 @@ - - -{% if errors and errors.contact_email %} -{{ errors.contact_email }} -{% endif %} diff --git a/src/central/gui/templates/adapters_edit_usgs_quake.html b/src/central/gui/templates/adapters_edit_usgs_quake.html deleted file mode 100644 index 0c3b7ee..0000000 --- a/src/central/gui/templates/adapters_edit_usgs_quake.html +++ /dev/null @@ -1,9 +0,0 @@ - - -{% if errors and errors.feed %} -{{ errors.feed }} -{% endif %} diff --git a/src/central/gui/templates/adapters_list.html b/src/central/gui/templates/adapters_list.html deleted file mode 100644 index b97ae88..0000000 --- a/src/central/gui/templates/adapters_list.html +++ /dev/null @@ -1,29 +0,0 @@ -{% extends "base.html" %} - -{% block title %}Central — Adapters{% endblock %} - -{% block content %} -

Adapters

- - - - - - - - - - - - {% for adapter in adapters %} - - - - - - - - {% endfor %} - -
NameEnabledCadenceLast Updated
{{ adapter.name }}{% if adapter.enabled %}Yes{% else %}No{% endif %}{{ adapter.cadence_s }}s{{ adapter.updated_at.strftime('%Y-%m-%d %H:%M') if adapter.updated_at else '—' }}Edit
-{% endblock %} diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index 1d2e24b..631c542 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -15,8 +15,6 @@