diff --git a/sql/migrations/030_add_monitoring_area.sql b/sql/migrations/030_add_monitoring_area.sql new file mode 100644 index 0000000..e53457d --- /dev/null +++ b/sql/migrations/030_add_monitoring_area.sql @@ -0,0 +1,10 @@ +-- Migration 030: Add system-level monitoring-area bbox to config.system (v0.9.12) +-- Backs the archive-level safety-net filter: events whose geometry lies entirely +-- outside this bbox are dropped at INSERT time; null-geom events are always kept. +-- Default bounds = Idaho. Idempotent per docs/migrations.md. + +ALTER TABLE config.system + ADD COLUMN IF NOT EXISTS monitor_north DOUBLE PRECISION NOT NULL DEFAULT 44.5, + ADD COLUMN IF NOT EXISTS monitor_south DOUBLE PRECISION NOT NULL DEFAULT 41.8, + ADD COLUMN IF NOT EXISTS monitor_east DOUBLE PRECISION NOT NULL DEFAULT -111.0, + ADD COLUMN IF NOT EXISTS monitor_west DOUBLE PRECISION NOT NULL DEFAULT -117.5; diff --git a/src/central/archive.py b/src/central/archive.py index f26a104..f7765d0 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -9,11 +9,13 @@ import json import logging import signal import sys +from dataclasses import dataclass from datetime import datetime, timezone from typing import Any import asyncpg import nats +from shapely.geometry import box as _shapely_box, shape from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.errors import NotFoundError @@ -28,6 +30,9 @@ STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearin BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 ACK_WAIT = 30 +# How often the archive re-reads the monitoring-area bbox from config.system so +# GUI edits take effect without a service restart (~this many seconds latency). +MONITORING_AREA_REFRESH_S = 60 def consumer_name_for(stream: str) -> str: @@ -108,6 +113,45 @@ def _build_geom_sql(geo_data: dict[str, Any] | None) -> str | None: return None +@dataclass(frozen=True) +class MonitoringArea: + """System-level bounding box that events must intersect to be archived.""" + + north: float + south: float + east: float + west: float + + def as_box(self): + # shapely box(minx, miny, maxx, maxy) -> (west, south, east, north) + return _shapely_box(self.west, self.south, self.east, self.north) + + +def _classify_geom(geom_json: str | None, area: "MonitoringArea | None") -> str: + """Classify an event geometry against the monitoring area (archive bbox filter). + + Returns one of: + 'null-geom' -- no geometry; always archived (SWPC trio, .removed tombstones) + 'no-area' -- no monitoring area configured; archive everything + 'in-bounds' -- geometry intersects the area; archive + 'out-of-bounds' -- geometry lies entirely outside the area; drop + 'invalid-geom' -- geometry could not be evaluated; archived (fail-open) + warn + + Uses intersects() so border-straddlers are kept (matches PostGIS ST_Intersects). + The filter must never drop an event because of a parse failure: when in doubt, + keep it. + """ + if geom_json is None: + return "null-geom" + if area is None: + return "no-area" + try: + geom = shape(json.loads(geom_json)) + return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds" + except Exception: + return "invalid-geom" + + class ArchiveConsumer: """Archive consumer process.""" @@ -118,6 +162,8 @@ class ArchiveConsumer: self._js: JetStreamContext | None = None self._pool: asyncpg.Pool | None = None self._shutdown_event = asyncio.Event() + self._monitoring_area: MonitoringArea | None = None + self._dropped: dict[str, int] = {} async def connect(self) -> None: """Connect to NATS and PostgreSQL.""" @@ -144,6 +190,49 @@ class ArchiveConsumer: self._js = None logger.info("Disconnected") + async def _load_monitoring_area(self) -> None: + """Load (or refresh) the system monitoring-area bbox from config.system. + + On any error keep the last-known value and warn -- the filter must never + block archiving because a config read failed.""" + if not self._pool: + return + try: + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT monitor_north, monitor_south, monitor_east, monitor_west " + "FROM config.system WHERE id = true" + ) + cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west") + if row and all(row[c] is not None for c in cols): + self._monitoring_area = MonitoringArea( + north=row["monitor_north"], south=row["monitor_south"], + east=row["monitor_east"], west=row["monitor_west"], + ) + else: + self._monitoring_area = None + except Exception as e: + logger.warning( + "Could not load monitoring area; keeping previous value", + extra={"error": str(e)}, + ) + + async def _refresh_monitoring_area_loop(self) -> None: + """Periodically refresh the monitoring area so GUI edits propagate without + a restart, and log a rolling summary of dropped (out-of-bounds) events.""" + while not self._shutdown_event.is_set(): + try: + await asyncio.wait_for( + self._shutdown_event.wait(), timeout=MONITORING_AREA_REFRESH_S + ) + except asyncio.TimeoutError: + await self._load_monitoring_area() + if self._dropped: + logger.info( + "bbox filter drop summary (cumulative)", + extra={"dropped_by_adapter": dict(self._dropped)}, + ) + async def _cleanup_orphaned_consumer(self) -> None: """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists. @@ -234,6 +323,21 @@ class ArchiveConsumer: geom_json = _build_geom_sql(geo_data) + verdict = _classify_geom(geom_json, self._monitoring_area) + if verdict == "out-of-bounds": + self._dropped[adapter] = self._dropped.get(adapter, 0) + 1 + logger.debug( + "Dropped out-of-bounds event (archive bbox filter)", + extra={"id": event_id, "adapter": adapter, "category": category}, + ) + await msg.ack() + return + if verdict == "invalid-geom": + logger.warning( + "Geom could not be evaluated for bbox filter; archiving", + extra={"id": event_id, "adapter": adapter}, + ) + try: if geom_json: await conn.execute( @@ -335,11 +439,25 @@ class ArchiveConsumer: """Start the consumer.""" await self.connect() await self._cleanup_orphaned_consumer() - logger.info("Archive consumer ready") + await self._load_monitoring_area() + area = self._monitoring_area + logger.info( + "Archive consumer ready", + extra={"monitoring_area": ( + {"north": area.north, "south": area.south, + "east": area.east, "west": area.west} if area else None + )}, + ) async def run(self) -> None: """Run consume loops for all streams until shutdown.""" tasks = [] + tasks.append( + asyncio.create_task( + self._refresh_monitoring_area_loop(), + name="refresh-monitoring-area", + ) + ) for stream_name, subject_filter in STREAMS: consumer_name = consumer_name_for(stream_name) task = asyncio.create_task( diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index c8e5bd7..b5b5da9 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2267,6 +2267,134 @@ async def enrichment_update(request: Request) -> Response: return RedirectResponse(url="/enrichment", status_code=302) +# --- Monitoring area (system-level archive bbox filter) -------------------- + +_DEFAULT_MONITOR = {"north": 44.5, "south": 41.8, "east": -111.0, "west": -117.5} + + +async def _read_monitoring_area(conn) -> dict[str, Any]: + """Read the monitoring-area bbox + map tile settings from config.system.""" + row = await conn.fetchrow( + "SELECT monitor_north, monitor_south, monitor_east, monitor_west, " + "map_tile_url, map_attribution FROM config.system WHERE id = true" + ) + if row is None: + return { + **_DEFAULT_MONITOR, + "tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "tile_attribution": "© OpenStreetMap contributors", + } + return { + "north": row["monitor_north"], "south": row["monitor_south"], + "east": row["monitor_east"], "west": row["monitor_west"], + "tile_url": row["map_tile_url"], + "tile_attribution": row["map_attribution"], + } + + +@router.get("/monitoring-area", response_class=HTMLResponse) +async def monitoring_area_form(request: Request) -> HTMLResponse: + """Render the system monitoring-area editor (one draggable Leaflet rectangle). + + Events whose geometry falls entirely outside this box are dropped by the + archive-level bbox filter; null-geom events are always kept.""" + templates = _get_templates() + pool = get_pool() + async with pool.acquire() as conn: + area = await _read_monitoring_area(conn) + return templates.TemplateResponse( + request=request, + name="monitoring_area.html", + context={ + "operator": getattr(request.state, "operator", None), + "csrf_token": request.state.csrf_token, + "area": area, + "tile_url": area["tile_url"], + "tile_attribution": area["tile_attribution"], + }, + ) + + +@router.post("/monitoring-area") +async def monitoring_area_update(request: Request) -> Response: + """Validate + persist the monitoring-area bbox. The archive applies the new + bounds within ~60s via its background refresh (no restart needed).""" + templates = _get_templates() + pool = get_pool() + + form = await request.form() + if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token: + raise CsrfValidationError("Invalid CSRF token") + + errors: dict[str, str] = {} + vals: dict[str, float] = {} + for key, lo, hi in ( + ("north", -90.0, 90.0), ("south", -90.0, 90.0), + ("east", -180.0, 180.0), ("west", -180.0, 180.0), + ): + raw = form.get(f"monitor_{key}", "") + try: + v = float(raw) + except (TypeError, ValueError): + errors[key] = f"{key.title()} must be a number" + continue + if not (lo <= v <= hi): + errors[key] = f"{key.title()} must be between {lo:g} and {hi:g}" + else: + vals[key] = v + + if not errors: + if vals["north"] <= vals["south"]: + errors["north"] = "North must be greater than South" + if vals["east"] <= vals["west"]: + errors["east"] = "East must be greater than West" + + if errors: + async with pool.acquire() as conn: + saved = await _read_monitoring_area(conn) + render_area = { + "north": form.get("monitor_north") or saved["north"], + "south": form.get("monitor_south") or saved["south"], + "east": form.get("monitor_east") or saved["east"], + "west": form.get("monitor_west") or saved["west"], + } + return templates.TemplateResponse( + request=request, + name="monitoring_area.html", + context={ + "operator": getattr(request.state, "operator", None), + "csrf_token": request.state.csrf_token, + "area": render_area, + "tile_url": saved["tile_url"], + "tile_attribution": saved["tile_attribution"], + "errors": errors, + }, + status_code=200, + ) + + async with pool.acquire() as conn: + old = await conn.fetchrow( + "SELECT monitor_north, monitor_south, monitor_east, monitor_west " + "FROM config.system WHERE id = true" + ) + await conn.execute( + "UPDATE config.system SET monitor_north=$1, monitor_south=$2, " + "monitor_east=$3, monitor_west=$4 WHERE id = true", + vals["north"], vals["south"], vals["east"], vals["west"], + ) + operator = getattr(request.state, "operator", None) + await write_audit( + conn, SYSTEM_UPDATE, + operator_id=operator.id if operator else None, + target="monitoring_area", + before=dict(old) if old else None, + after={"monitor_north": vals["north"], "monitor_south": vals["south"], + "monitor_east": vals["east"], "monitor_west": vals["west"]}, + ) + + return RedirectResponse(url="/monitoring-area", status_code=302) + + # Alias validation regex ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$') diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index c2b829b..5732a68 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -19,6 +19,7 @@ Telemetry Streams Enrichment + Monitoring Area API Keys · {{ operator.username }} diff --git a/src/central/gui/templates/monitoring_area.html b/src/central/gui/templates/monitoring_area.html new file mode 100644 index 0000000..15c0b21 --- /dev/null +++ b/src/central/gui/templates/monitoring_area.html @@ -0,0 +1,145 @@ +{% extends "base.html" %} + +{% block title %}Central — Monitoring Area{% endblock %} + +{% block head %} + + + + +{% endblock %} + +{% block content %} +

Monitoring Area

+

+ Events whose geometry falls entirely outside this box are dropped by the + archive before they reach the events table. Events with no geometry (e.g. + space-weather alerts, removal tombstones) are always kept. Changes apply + within about a minute — no restart required. +

+ +
+ + +
+ +
+ +
+
+ + + {% if errors and errors.north %}{{ errors.north }}{% endif %} +
+
+ + + {% if errors and errors.south %}{{ errors.south }}{% endif %} +
+
+ + + {% if errors and errors.east %}{{ errors.east }}{% endif %} +
+
+ + + {% if errors and errors.west %}{{ errors.west }}{% endif %} +
+
+ + +
+ +
+ +
+
+ + +{% endblock %} diff --git a/tests/test_archive_bbox_filter.py b/tests/test_archive_bbox_filter.py new file mode 100644 index 0000000..14908cd --- /dev/null +++ b/tests/test_archive_bbox_filter.py @@ -0,0 +1,115 @@ +"""Archive-level monitoring-area bbox filter (v0.9.12). + +Events whose geometry falls entirely outside the system monitoring area are +dropped at archive INSERT time; null-geom events and border-straddlers are kept. +The filter is fail-open: an unparseable geometry is archived (with a warning), +never dropped. +""" +import json + +import pytest +from unittest.mock import AsyncMock, MagicMock + +from central.archive import ArchiveConsumer, MonitoringArea, _classify_geom + +IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5) + + +def _pt(lon, lat): + return json.dumps({"type": "Point", "coordinates": [lon, lat]}) + + +class TestClassifyGeom: + def test_null_geom_always_kept(self): + assert _classify_geom(None, IDAHO) == "null-geom" + + def test_no_area_keeps_everything(self): + assert _classify_geom(_pt(-114.0, 43.5), None) == "no-area" + + def test_in_bounds_kept(self): + assert _classify_geom(_pt(-114.0, 43.5), IDAHO) == "in-bounds" + + def test_out_of_bounds_dropped(self): + assert _classify_geom(_pt(-74.0, 40.7), IDAHO) == "out-of-bounds" + + def test_border_straddling_polygon_kept(self): + # Spans the western edge (west=-117.5): partly out, partly in -> kept. + poly = json.dumps({ + "type": "Polygon", + "coordinates": [[[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]], + }) + assert _classify_geom(poly, IDAHO) == "in-bounds" + + def test_point_exactly_on_border_kept(self): + assert _classify_geom(_pt(-117.5, 43.0), IDAHO) == "in-bounds" + + def test_unparseable_geom_kept(self): + assert _classify_geom("{not valid json", IDAHO) == "invalid-geom" + + def test_unknown_geom_type_kept(self): + assert _classify_geom(json.dumps({"type": "Nonsense"}), IDAHO) == "invalid-geom" + + +def _make_msg(envelope): + msg = MagicMock() + msg.data = json.dumps(envelope).encode() + msg.ack = AsyncMock() + return msg + + +def _envelope(adapter, lon, lat): + return { + "id": f"{adapter}:evt1", + "data": { + "adapter": adapter, + "category": "test", + "time": "2026-05-26T00:00:00Z", + "geo": {"centroid": [lon, lat]}, + }, + } + + +class TestProcessMessageFilter: + @pytest.mark.asyncio + async def test_out_of_bounds_dropped_and_counted(self): + c = ArchiveConsumer("nats://x", "postgresql://x") + c._monitoring_area = IDAHO + conn = AsyncMock() + msg = _make_msg(_envelope("wzdx", -74.0, 40.7)) + await c._process_message(msg, conn) + conn.execute.assert_not_called() + msg.ack.assert_awaited_once() + assert c._dropped == {"wzdx": 1} + + @pytest.mark.asyncio + async def test_in_bounds_inserted(self): + c = ArchiveConsumer("nats://x", "postgresql://x") + c._monitoring_area = IDAHO + conn = AsyncMock() + msg = _make_msg(_envelope("nws", -114.0, 43.5)) + await c._process_message(msg, conn) + conn.execute.assert_awaited_once() + msg.ack.assert_awaited_once() + assert c._dropped == {} + + @pytest.mark.asyncio + async def test_null_geom_inserted(self): + c = ArchiveConsumer("nats://x", "postgresql://x") + c._monitoring_area = IDAHO + conn = AsyncMock() + env = {"id": "swpc:1", "data": { + "adapter": "swpc_alerts", "category": "space", + "time": "2026-05-26T00:00:00Z"}} + await c._process_message(_make_msg(env), conn) + conn.execute.assert_awaited_once() + assert c._dropped == {} + + @pytest.mark.asyncio + async def test_no_area_keeps_out_of_bounds(self): + c = ArchiveConsumer("nats://x", "postgresql://x") + c._monitoring_area = None + conn = AsyncMock() + msg = _make_msg(_envelope("wzdx", -74.0, 40.7)) + await c._process_message(msg, conn) + conn.execute.assert_awaited_once() + assert c._dropped == {}