Merge pull request #73 from zvx-echo6/v0_9_12_archive_bbox_filter

v0.9.12: archive-level monitoring-area bbox filter
This commit is contained in:
malice 2026-05-26 17:40:59 -06:00 committed by GitHub
commit 3ff6f1ebc0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 518 additions and 1 deletions

View file

@ -0,0 +1,10 @@
-- Migration 030: Add system-level monitoring-area bbox to config.system (v0.9.12)
-- Backs the archive-level safety-net filter: events whose geometry lies entirely
-- outside this bbox are dropped at INSERT time; null-geom events are always kept.
-- Default bounds: 41.8..44.5 N, 117.5..111.0 W (roughly the southern part of
-- Idaho; widen monitor_north if the whole state must be covered).
-- Idempotent per docs/migrations.md: every column uses ADD COLUMN IF NOT EXISTS,
-- so re-running the migration is a no-op.
ALTER TABLE config.system
ADD COLUMN IF NOT EXISTS monitor_north DOUBLE PRECISION NOT NULL DEFAULT 44.5,  -- northern edge (latitude)
ADD COLUMN IF NOT EXISTS monitor_south DOUBLE PRECISION NOT NULL DEFAULT 41.8,  -- southern edge (latitude)
ADD COLUMN IF NOT EXISTS monitor_east DOUBLE PRECISION NOT NULL DEFAULT -111.0,  -- eastern edge (longitude)
ADD COLUMN IF NOT EXISTS monitor_west DOUBLE PRECISION NOT NULL DEFAULT -117.5;  -- western edge (longitude)

View file

@ -9,11 +9,13 @@ import json
import logging import logging
import signal import signal
import sys import sys
from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any from typing import Any
import asyncpg import asyncpg
import nats import nats
from shapely.geometry import box as _shapely_box, shape
from nats.js import JetStreamContext from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
from nats.js.errors import NotFoundError from nats.js.errors import NotFoundError
@ -28,6 +30,9 @@ STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearin
BATCH_SIZE = 100 BATCH_SIZE = 100
FETCH_TIMEOUT = 5.0 FETCH_TIMEOUT = 5.0
ACK_WAIT = 30 ACK_WAIT = 30
# How often the archive re-reads the monitoring-area bbox from config.system so
# GUI edits take effect without a service restart (~this many seconds latency).
MONITORING_AREA_REFRESH_S = 60
def consumer_name_for(stream: str) -> str: def consumer_name_for(stream: str) -> str:
@ -108,6 +113,45 @@ def _build_geom_sql(geo_data: dict[str, Any] | None) -> str | None:
return None return None
@dataclass(frozen=True)
class MonitoringArea:
    """Immutable bounding box an event geometry must intersect to be archived.

    The four edges are stored as plain floats; ``north``/``south`` are the
    y-extent and ``east``/``west`` the x-extent of the rectangle.
    """

    north: float
    south: float
    east: float
    west: float

    def as_box(self):
        """Return the area as a shapely rectangle polygon."""
        # shapely's box() takes (minx, miny, maxx, maxy), i.e. the lower-left
        # corner first and the upper-right corner second.
        min_x, min_y = self.west, self.south
        max_x, max_y = self.east, self.north
        return _shapely_box(min_x, min_y, max_x, max_y)
def _classify_geom(geom_json: str | None, area: "MonitoringArea | None") -> str:
"""Classify an event geometry against the monitoring area (archive bbox filter).
Returns one of:
'null-geom' -- no geometry; always archived (SWPC trio, .removed tombstones)
'no-area' -- no monitoring area configured; archive everything
'in-bounds' -- geometry intersects the area; archive
'out-of-bounds' -- geometry lies entirely outside the area; drop
'invalid-geom' -- geometry could not be evaluated; archived (fail-open) + warn
Uses intersects() so border-straddlers are kept (matches PostGIS ST_Intersects).
The filter must never drop an event because of a parse failure: when in doubt,
keep it.
"""
if geom_json is None:
return "null-geom"
if area is None:
return "no-area"
try:
geom = shape(json.loads(geom_json))
return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds"
except Exception:
return "invalid-geom"
class ArchiveConsumer: class ArchiveConsumer:
"""Archive consumer process.""" """Archive consumer process."""
@ -118,6 +162,8 @@ class ArchiveConsumer:
self._js: JetStreamContext | None = None self._js: JetStreamContext | None = None
self._pool: asyncpg.Pool | None = None self._pool: asyncpg.Pool | None = None
self._shutdown_event = asyncio.Event() self._shutdown_event = asyncio.Event()
self._monitoring_area: MonitoringArea | None = None
self._dropped: dict[str, int] = {}
async def connect(self) -> None: async def connect(self) -> None:
"""Connect to NATS and PostgreSQL.""" """Connect to NATS and PostgreSQL."""
@ -144,6 +190,49 @@ class ArchiveConsumer:
self._js = None self._js = None
logger.info("Disconnected") logger.info("Disconnected")
async def _load_monitoring_area(self) -> None:
"""Load (or refresh) the system monitoring-area bbox from config.system.
On any error keep the last-known value and warn -- the filter must never
block archiving because a config read failed."""
if not self._pool:
return
try:
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT monitor_north, monitor_south, monitor_east, monitor_west "
"FROM config.system WHERE id = true"
)
cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west")
if row and all(row[c] is not None for c in cols):
self._monitoring_area = MonitoringArea(
north=row["monitor_north"], south=row["monitor_south"],
east=row["monitor_east"], west=row["monitor_west"],
)
else:
self._monitoring_area = None
except Exception as e:
logger.warning(
"Could not load monitoring area; keeping previous value",
extra={"error": str(e)},
)
async def _refresh_monitoring_area_loop(self) -> None:
"""Periodically refresh the monitoring area so GUI edits propagate without
a restart, and log a rolling summary of dropped (out-of-bounds) events."""
while not self._shutdown_event.is_set():
try:
await asyncio.wait_for(
self._shutdown_event.wait(), timeout=MONITORING_AREA_REFRESH_S
)
except asyncio.TimeoutError:
await self._load_monitoring_area()
if self._dropped:
logger.info(
"bbox filter drop summary (cumulative)",
extra={"dropped_by_adapter": dict(self._dropped)},
)
async def _cleanup_orphaned_consumer(self) -> None: async def _cleanup_orphaned_consumer(self) -> None:
"""Remove orphaned 'archive' consumer from CENTRAL_WX if it exists. """Remove orphaned 'archive' consumer from CENTRAL_WX if it exists.
@ -234,6 +323,21 @@ class ArchiveConsumer:
geom_json = _build_geom_sql(geo_data) geom_json = _build_geom_sql(geo_data)
verdict = _classify_geom(geom_json, self._monitoring_area)
if verdict == "out-of-bounds":
self._dropped[adapter] = self._dropped.get(adapter, 0) + 1
logger.debug(
"Dropped out-of-bounds event (archive bbox filter)",
extra={"id": event_id, "adapter": adapter, "category": category},
)
await msg.ack()
return
if verdict == "invalid-geom":
logger.warning(
"Geom could not be evaluated for bbox filter; archiving",
extra={"id": event_id, "adapter": adapter},
)
try: try:
if geom_json: if geom_json:
await conn.execute( await conn.execute(
@ -335,11 +439,25 @@ class ArchiveConsumer:
"""Start the consumer.""" """Start the consumer."""
await self.connect() await self.connect()
await self._cleanup_orphaned_consumer() await self._cleanup_orphaned_consumer()
logger.info("Archive consumer ready") await self._load_monitoring_area()
area = self._monitoring_area
logger.info(
"Archive consumer ready",
extra={"monitoring_area": (
{"north": area.north, "south": area.south,
"east": area.east, "west": area.west} if area else None
)},
)
async def run(self) -> None: async def run(self) -> None:
"""Run consume loops for all streams until shutdown.""" """Run consume loops for all streams until shutdown."""
tasks = [] tasks = []
tasks.append(
asyncio.create_task(
self._refresh_monitoring_area_loop(),
name="refresh-monitoring-area",
)
)
for stream_name, subject_filter in STREAMS: for stream_name, subject_filter in STREAMS:
consumer_name = consumer_name_for(stream_name) consumer_name = consumer_name_for(stream_name)
task = asyncio.create_task( task = asyncio.create_task(

View file

@ -2267,6 +2267,134 @@ async def enrichment_update(request: Request) -> Response:
return RedirectResponse(url="/enrichment", status_code=302) return RedirectResponse(url="/enrichment", status_code=302)
# --- Monitoring area (system-level archive bbox filter) --------------------
_DEFAULT_MONITOR = {"north": 44.5, "south": 41.8, "east": -111.0, "west": -117.5}
async def _read_monitoring_area(conn) -> dict[str, Any]:
"""Read the monitoring-area bbox + map tile settings from config.system."""
row = await conn.fetchrow(
"SELECT monitor_north, monitor_south, monitor_east, monitor_west, "
"map_tile_url, map_attribution FROM config.system WHERE id = true"
)
if row is None:
return {
**_DEFAULT_MONITOR,
"tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"tile_attribution": "© OpenStreetMap contributors",
}
return {
"north": row["monitor_north"], "south": row["monitor_south"],
"east": row["monitor_east"], "west": row["monitor_west"],
"tile_url": row["map_tile_url"],
"tile_attribution": row["map_attribution"],
}
@router.get("/monitoring-area", response_class=HTMLResponse)
async def monitoring_area_form(request: Request) -> HTMLResponse:
    """Show the system monitoring-area editor (one draggable Leaflet rectangle).

    The page is pre-filled with the currently saved bbox and tile settings.
    Events whose geometry falls entirely outside this box are dropped by the
    archive-level bbox filter; null-geom events are always kept.
    """
    templates = _get_templates()
    pool = get_pool()
    async with pool.acquire() as conn:
        area = await _read_monitoring_area(conn)

    context = {
        "operator": getattr(request.state, "operator", None),
        "csrf_token": request.state.csrf_token,
        "area": area,
        "tile_url": area["tile_url"],
        "tile_attribution": area["tile_attribution"],
    }
    return templates.TemplateResponse(
        request=request,
        name="monitoring_area.html",
        context=context,
    )
@router.post("/monitoring-area")
async def monitoring_area_update(request: Request) -> Response:
    """Validate + persist the monitoring-area bbox.

    The archive applies the new bounds within ~60s via its background refresh
    (no restart needed).

    Validation: each of monitor_north/south must be a number in [-90, 90] and
    each of monitor_east/west a number in [-180, 180]; north must be greater
    than south and east greater than west.

    Returns:
        A 302 redirect to /monitoring-area on success, or the editor page
        re-rendered (status 200) with per-field ``errors`` on validation
        failure.

    Raises:
        CsrfValidationError: if the CSRF token is missing or does not match.
    """
    templates = _get_templates()
    pool = get_pool()
    form = await request.form()

    if not form.get("csrf_token") or form.get("csrf_token") != request.state.csrf_token:
        raise CsrfValidationError("Invalid CSRF token")

    errors: dict[str, str] = {}
    vals: dict[str, float] = {}
    # Submitted values that are finite numbers and therefore safe to echo back
    # into the page. The template feeds them to parseFloat() and Leaflet; a
    # non-numeric, NaN or infinite value would break the map on the error page.
    echo: dict[str, float] = {}
    for key, lo, hi in (
        ("north", -90.0, 90.0), ("south", -90.0, 90.0),
        ("east", -180.0, 180.0), ("west", -180.0, 180.0),
    ):
        raw = form.get(f"monitor_{key}", "")
        try:
            v = float(raw)
        except (TypeError, ValueError):
            errors[key] = f"{key.title()} must be a number"
            continue
        if not (lo <= v <= hi):
            errors[key] = f"{key.title()} must be between {lo:g} and {hi:g}"
            # Out-of-range but finite: still show the user what they sent.
            # NaN fails both comparisons, so it is excluded here as well.
            if float("-inf") < v < float("inf"):
                echo[key] = v
        else:
            vals[key] = v
            echo[key] = v

    # Cross-field checks only make sense once all four values are valid.
    if not errors:
        if vals["north"] <= vals["south"]:
            errors["north"] = "North must be greater than South"
        if vals["east"] <= vals["west"]:
            errors["east"] = "East must be greater than West"

    if errors:
        async with pool.acquire() as conn:
            saved = await _read_monitoring_area(conn)
        # Echo usable submitted numbers; fall back to the saved bound for
        # anything missing or unusable.
        render_area = {
            key: echo.get(key, saved[key])
            for key in ("north", "south", "east", "west")
        }
        return templates.TemplateResponse(
            request=request,
            name="monitoring_area.html",
            context={
                "operator": getattr(request.state, "operator", None),
                "csrf_token": request.state.csrf_token,
                "area": render_area,
                "tile_url": saved["tile_url"],
                "tile_attribution": saved["tile_attribution"],
                "errors": errors,
            },
            status_code=200,
        )

    async with pool.acquire() as conn:
        # One transaction so the "before" snapshot, the update and the audit
        # record are committed (or rolled back) together.
        async with conn.transaction():
            old = await conn.fetchrow(
                "SELECT monitor_north, monitor_south, monitor_east, monitor_west "
                "FROM config.system WHERE id = true"
            )
            await conn.execute(
                "UPDATE config.system SET monitor_north=$1, monitor_south=$2, "
                "monitor_east=$3, monitor_west=$4 WHERE id = true",
                vals["north"], vals["south"], vals["east"], vals["west"],
            )
            operator = getattr(request.state, "operator", None)
            await write_audit(
                conn, SYSTEM_UPDATE,
                operator_id=operator.id if operator else None,
                target="monitoring_area",
                before=dict(old) if old else None,
                after={"monitor_north": vals["north"], "monitor_south": vals["south"],
                       "monitor_east": vals["east"], "monitor_west": vals["west"]},
            )

    return RedirectResponse(url="/monitoring-area", status_code=302)
# Alias validation regex # Alias validation regex
ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$') ALIAS_REGEX = re.compile(r'^[a-zA-Z0-9_]+$')

View file

@ -19,6 +19,7 @@
<a href="/telemetry">Telemetry</a> <a href="/telemetry">Telemetry</a>
<a href="/streams">Streams</a> <a href="/streams">Streams</a>
<a href="/enrichment">Enrichment</a> <a href="/enrichment">Enrichment</a>
<a href="/monitoring-area">Monitoring Area</a>
<a href="/api-keys">API Keys</a> <a href="/api-keys">API Keys</a>
<span class="sep">·</span> <span class="sep">·</span>
<span class="who">{{ operator.username }}</span> <span class="who">{{ operator.username }}</span>

View file

@ -0,0 +1,145 @@
{% extends "base.html" %}
{% block title %}Central — Monitoring Area{% endblock %}
{% block head %}
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin="">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet.draw/1.0.4/leaflet.draw.css" integrity="sha512-gc3xjCmIy673V6MyOAZhIW93xhM9ei1I+gLbmFjUHIjocENRsLX/QUE1htk5q1XV2D/iie/VQ8DXI6Vu8bexvQ==" crossorigin="anonymous">
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js" integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=" crossorigin=""></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/leaflet.draw/1.0.4/leaflet.draw.js" integrity="sha512-ozq8xQKq6urvuU6jNgkfqAmT7jKN2XumbrX1JiB3TnF7tI48DPI4Gy1GXKD/V3EExgAs1V+pRO7vwtS1LHg0Gw==" crossorigin="anonymous"></script>
{% endblock %}
{% block content %}
<h1>Monitoring Area</h1>
<p class="muted">
Events whose geometry falls entirely outside this box are dropped by the
archive before they reach the events table. Events with no geometry (e.g.
space-weather alerts, removal tombstones) are always kept. Changes apply
within about a minute — no restart required.
</p>
<form method="post" action="/monitoring-area">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<div id="region-picker-container"
data-north="{{ area.north }}"
data-south="{{ area.south }}"
data-east="{{ area.east }}"
data-west="{{ area.west }}"
data-tile-url="{{ tile_url }}"
data-tile-attr="{{ tile_attribution }}">
<div id="region-map" style="height: 420px; margin-bottom: 1rem; border: 1px solid var(--rule); border-radius: var(--radius);"></div>
<div class="cols">
<div>
<label for="monitor_north">North</label>
<input type="number" id="monitor_north" name="monitor_north" step="0.0001" min="-90" max="90" readonly value="{{ area.north }}">
{% if errors and errors.north %}<small class="field-error">{{ errors.north }}</small>{% endif %}
</div>
<div>
<label for="monitor_south">South</label>
<input type="number" id="monitor_south" name="monitor_south" step="0.0001" min="-90" max="90" readonly value="{{ area.south }}">
{% if errors and errors.south %}<small class="field-error">{{ errors.south }}</small>{% endif %}
</div>
<div>
<label for="monitor_east">East</label>
<input type="number" id="monitor_east" name="monitor_east" step="0.0001" min="-180" max="180" readonly value="{{ area.east }}">
{% if errors and errors.east %}<small class="field-error">{{ errors.east }}</small>{% endif %}
</div>
<div>
<label for="monitor_west">West</label>
<input type="number" id="monitor_west" name="monitor_west" step="0.0001" min="-180" max="180" readonly value="{{ area.west }}">
{% if errors and errors.west %}<small class="field-error">{{ errors.west }}</small>{% endif %}
</div>
</div>
<button type="button" id="region-reset-btn" class="btn-secondary" style="margin-top: 12px;">Reset to Saved</button>
</div>
<div style="margin-top: 1rem;">
<button type="submit" class="btn-primary">Save Monitoring Area</button>
</div>
</form>
<script>
(function() {
  // Region picker for the monitoring-area form: shows the saved bbox as an
  // editable Leaflet rectangle and mirrors its bounds into the four
  // monitor_* form inputs that get submitted.

  // The saved bbox and tile settings are passed in via data-* attributes
  // on the container element.
  const container = document.getElementById('region-picker-container');
  const savedNorth = parseFloat(container.dataset.north);
  const savedSouth = parseFloat(container.dataset.south);
  const savedEast = parseFloat(container.dataset.east);
  const savedWest = parseFloat(container.dataset.west);
  // Fall back to OpenStreetMap when no tile URL / attribution was rendered.
  const tileUrl = container.dataset.tileUrl || 'https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png';
  const tileAttr = container.dataset.tileAttr || '&copy; OpenStreetMap contributors';

  // Start centred on the middle of the saved box.
  const centerLat = (savedNorth + savedSouth) / 2;
  const centerLng = (savedEast + savedWest) / 2;

  const map = L.map('region-map').setView([centerLat, centerLng], 5);
  L.tileLayer(tileUrl, { attribution: tileAttr, maxZoom: 18 }).addTo(map);
  // Re-measure the map container shortly after creation (presumably to
  // cover layout that settles after this script runs).
  setTimeout(function() { map.invalidateSize(); }, 100);

  // Zoom to the saved box with a 10% margin around it.
  const bounds = L.latLngBounds(
    L.latLng(savedSouth, savedWest),
    L.latLng(savedNorth, savedEast)
  );
  map.fitBounds(bounds.pad(0.1));

  // `rectangle` always refers to the single rectangle currently on the map;
  // it is reassigned when the user draws a new one or resets.
  let rectangle = L.rectangle(bounds, { color: '#3388ff', weight: 2, fillOpacity: 0.2 });
  const drawnItems = new L.FeatureGroup();
  drawnItems.addLayer(rectangle);
  map.addLayer(drawnItems);

  // Only the rectangle draw tool is offered; the toolbar's edit and remove
  // buttons are switched off because editing is enabled directly below.
  const drawControl = new L.Control.Draw({
    draw: {
      rectangle: { shapeOptions: { color: '#3388ff', weight: 2, fillOpacity: 0.2 } },
      polyline: false, polygon: false, circle: false, marker: false, circlemarker: false
    },
    edit: { featureGroup: drawnItems, edit: false, remove: false }
  });
  map.addControl(drawControl);
  rectangle.editing.enable();

  const northInput = document.getElementById('monitor_north');
  const southInput = document.getElementById('monitor_south');
  const eastInput = document.getElementById('monitor_east');
  const westInput = document.getElementById('monitor_west');

  // Copy the current rectangle's bounds into the form inputs, rounded to
  // four decimal places.
  function updateInputs() {
    const b = rectangle.getBounds();
    northInput.value = b.getNorth().toFixed(4);
    southInput.value = b.getSouth().toFixed(4);
    eastInput.value = b.getEast().toFixed(4);
    westInput.value = b.getWest().toFixed(4);
  }

  rectangle.on('edit', updateInputs);

  // A newly drawn rectangle replaces the existing one.
  map.on(L.Draw.Event.CREATED, function(e) {
    drawnItems.clearLayers();
    rectangle = e.layer;
    rectangle.setStyle({ color: '#3388ff', weight: 2, fillOpacity: 0.2 });
    drawnItems.addLayer(rectangle);
    rectangle.editing.enable();
    rectangle.on('edit', updateInputs);
    updateInputs();
  });

  // "Reset to Saved": discard the current rectangle and rebuild it from the
  // bounds the page was rendered with.
  document.getElementById('region-reset-btn').addEventListener('click', function() {
    const originalBounds = L.latLngBounds(
      L.latLng(savedSouth, savedWest),
      L.latLng(savedNorth, savedEast)
    );
    drawnItems.clearLayers();
    rectangle = L.rectangle(originalBounds, { color: '#3388ff', weight: 2, fillOpacity: 0.2 });
    drawnItems.addLayer(rectangle);
    rectangle.editing.enable();
    rectangle.on('edit', updateInputs);
    updateInputs();
  });

  // Fill the inputs once on load so they match the initial rectangle.
  updateInputs();
})();
</script>
{% endblock %}

View file

@ -0,0 +1,115 @@
"""Archive-level monitoring-area bbox filter (v0.9.12).
Events whose geometry falls entirely outside the system monitoring area are
dropped at archive INSERT time; null-geom events and border-straddlers are kept.
The filter is fail-open: an unparseable geometry is archived (with a warning),
never dropped.
"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from central.archive import ArchiveConsumer, MonitoringArea, _classify_geom
IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5)
def _pt(lon, lat):
return json.dumps({"type": "Point", "coordinates": [lon, lat]})
class TestClassifyGeom:
    """Verdicts returned by _classify_geom for each kind of input."""

    def test_null_geom_always_kept(self):
        verdict = _classify_geom(None, IDAHO)
        assert verdict == "null-geom"

    def test_no_area_keeps_everything(self):
        verdict = _classify_geom(_pt(-114.0, 43.5), None)
        assert verdict == "no-area"

    def test_in_bounds_kept(self):
        verdict = _classify_geom(_pt(-114.0, 43.5), IDAHO)
        assert verdict == "in-bounds"

    def test_out_of_bounds_dropped(self):
        verdict = _classify_geom(_pt(-74.0, 40.7), IDAHO)
        assert verdict == "out-of-bounds"

    def test_border_straddling_polygon_kept(self):
        # The ring crosses the western edge (west=-117.5): part of it is
        # outside, part inside, so the event must be kept.
        ring = [[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]
        straddler = json.dumps({"type": "Polygon", "coordinates": [ring]})
        assert _classify_geom(straddler, IDAHO) == "in-bounds"

    def test_point_exactly_on_border_kept(self):
        on_edge = _pt(-117.5, 43.0)
        assert _classify_geom(on_edge, IDAHO) == "in-bounds"

    def test_unparseable_geom_kept(self):
        verdict = _classify_geom("{not valid json", IDAHO)
        assert verdict == "invalid-geom"

    def test_unknown_geom_type_kept(self):
        bogus = json.dumps({"type": "Nonsense"})
        assert _classify_geom(bogus, IDAHO) == "invalid-geom"
def _make_msg(envelope):
msg = MagicMock()
msg.data = json.dumps(envelope).encode()
msg.ack = AsyncMock()
return msg
def _envelope(adapter, lon, lat):
return {
"id": f"{adapter}:evt1",
"data": {
"adapter": adapter,
"category": "test",
"time": "2026-05-26T00:00:00Z",
"geo": {"centroid": [lon, lat]},
},
}
class TestProcessMessageFilter:
    """End-to-end checks of the bbox filter inside ArchiveConsumer._process_message."""

    @pytest.mark.asyncio
    async def test_out_of_bounds_dropped_and_counted(self):
        consumer = ArchiveConsumer("nats://x", "postgresql://x")
        consumer._monitoring_area = IDAHO
        db = AsyncMock()
        message = _make_msg(_envelope("wzdx", -74.0, 40.7))

        await consumer._process_message(message, db)

        # Dropped: nothing written, but the message is still acked and counted.
        db.execute.assert_not_called()
        message.ack.assert_awaited_once()
        assert consumer._dropped == {"wzdx": 1}

    @pytest.mark.asyncio
    async def test_in_bounds_inserted(self):
        consumer = ArchiveConsumer("nats://x", "postgresql://x")
        consumer._monitoring_area = IDAHO
        db = AsyncMock()
        message = _make_msg(_envelope("nws", -114.0, 43.5))

        await consumer._process_message(message, db)

        db.execute.assert_awaited_once()
        message.ack.assert_awaited_once()
        assert consumer._dropped == {}

    @pytest.mark.asyncio
    async def test_null_geom_inserted(self):
        consumer = ArchiveConsumer("nats://x", "postgresql://x")
        consumer._monitoring_area = IDAHO
        db = AsyncMock()
        # No "geo" key at all: the event has no geometry to classify.
        payload = {
            "adapter": "swpc_alerts",
            "category": "space",
            "time": "2026-05-26T00:00:00Z",
        }
        envelope = {"id": "swpc:1", "data": payload}

        await consumer._process_message(_make_msg(envelope), db)

        db.execute.assert_awaited_once()
        assert consumer._dropped == {}

    @pytest.mark.asyncio
    async def test_no_area_keeps_out_of_bounds(self):
        consumer = ArchiveConsumer("nats://x", "postgresql://x")
        consumer._monitoring_area = None
        db = AsyncMock()
        message = _make_msg(_envelope("wzdx", -74.0, 40.7))

        await consumer._process_message(message, db)

        db.execute.assert_awaited_once()
        assert consumer._dropped == {}