v0.10.2: monitoring-area bbox enforced at supervisor publish (was archive-only) (#PR_NUMBER_PLACEHOLDER)

Closes #86

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
malice 2026-06-05 20:34:10 -06:00 committed by GitHub
commit 1bebf2570b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 590 additions and 145 deletions

View file

@ -366,7 +366,7 @@ class FIRMSAdapter(SourceAdapter):
# Construct the satellite pixel footprint as a GeoJSON Polygon from
# scan/track. Falls back to centroid-only Geo if either dimension is
# missing/invalid — _build_geom_sql then stores a Point. We keep
# missing/invalid — build_geom_json then stores a Point. We keep
# centroid alongside geometry so consumers that read only centroid
# still work.
geometry = _pixel_polygon(lat, lon, row.get("scan"), row.get("track"))

View file

@ -9,18 +9,23 @@ import json
import logging
import signal
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import asyncpg
import nats
from shapely.geometry import box as _shapely_box, shape
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
from nats.js.errors import NotFoundError
from central.bootstrap_config import get_settings
from central.monitoring_area import (
MONITORING_AREA_REFRESH_S,
MonitoringArea,
build_geom_json,
classify_geom,
load_monitoring_area,
)
from central.streams import STREAMS as STREAM_REGISTRY
# Event-bearing streams to consume -- derived from the registry's event_bearing flag.
@ -30,9 +35,6 @@ STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearin
BATCH_SIZE = 100
FETCH_TIMEOUT = 5.0
ACK_WAIT = 30
# How often the archive re-reads the monitoring-area bbox from config.system so
# GUI edits take effect without a service restart (~this many seconds latency).
MONITORING_AREA_REFRESH_S = 60
def consumer_name_for(stream: str) -> str:
@ -75,83 +77,6 @@ def setup_logging() -> None:
logger = logging.getLogger("central.archive")
def _build_geom_sql(geo_data: dict[str, Any] | None) -> str | None:
"""Build PostGIS geometry from event geo data."""
if not geo_data:
return None
# A full GeoJSON geometry (e.g. flow-segment LineString) wins over the
# bbox/centroid fallbacks so the map renders the real shape. Inert for
# adapters that don't set geo.geometry.
geometry = geo_data.get("geometry")
if geometry:
return json.dumps(geometry)
bbox = geo_data.get("bbox")
centroid = geo_data.get("centroid")
if bbox and len(bbox) == 4:
# Create polygon from bbox
min_lon, min_lat, max_lon, max_lat = bbox
return json.dumps({
"type": "Polygon",
"coordinates": [[
[min_lon, min_lat],
[max_lon, min_lat],
[max_lon, max_lat],
[min_lon, max_lat],
[min_lon, min_lat],
]]
})
elif centroid and len(centroid) == 2:
# Create point from centroid
return json.dumps({
"type": "Point",
"coordinates": centroid
})
return None
@dataclass(frozen=True)
class MonitoringArea:
"""System-level bounding box that events must intersect to be archived."""
north: float
south: float
east: float
west: float
def as_box(self):
# shapely box(minx, miny, maxx, maxy) -> (west, south, east, north)
return _shapely_box(self.west, self.south, self.east, self.north)
def _classify_geom(geom_json: str | None, area: "MonitoringArea | None") -> str:
"""Classify an event geometry against the monitoring area (archive bbox filter).
Returns one of:
'null-geom' -- no geometry; always archived (SWPC trio, .removed tombstones)
'no-area' -- no monitoring area configured; archive everything
'in-bounds' -- geometry intersects the area; archive
'out-of-bounds' -- geometry lies entirely outside the area; drop
'invalid-geom' -- geometry could not be evaluated; archived (fail-open) + warn
Uses intersects() so border-straddlers are kept (matches PostGIS ST_Intersects).
The filter must never drop an event because of a parse failure: when in doubt,
keep it.
"""
if geom_json is None:
return "null-geom"
if area is None:
return "no-area"
try:
geom = shape(json.loads(geom_json))
return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds"
except Exception:
return "invalid-geom"
class ArchiveConsumer:
"""Archive consumer process."""
@ -199,18 +124,7 @@ class ArchiveConsumer:
return
try:
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT monitor_north, monitor_south, monitor_east, monitor_west "
"FROM config.system WHERE id = true"
)
cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west")
if row and all(row[c] is not None for c in cols):
self._monitoring_area = MonitoringArea(
north=row["monitor_north"], south=row["monitor_south"],
east=row["monitor_east"], west=row["monitor_west"],
)
else:
self._monitoring_area = None
self._monitoring_area = await load_monitoring_area(conn)
except Exception as e:
logger.warning(
"Could not load monitoring area; keeping previous value",
@ -321,9 +235,9 @@ class ArchiveConsumer:
await msg.ack()
return
geom_json = _build_geom_sql(geo_data)
geom_json = build_geom_json(geo_data)
verdict = _classify_geom(geom_json, self._monitoring_area)
verdict = classify_geom(geom_json, self._monitoring_area)
if verdict == "out-of-bounds":
self._dropped[adapter] = self._dropped.get(adapter, 0) + 1
logger.debug(

View file

@ -14,6 +14,7 @@ import asyncpg
from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig
from central.crypto import decrypt, encrypt
from central.monitoring_area import MonitoringArea, load_monitoring_area
logger = logging.getLogger(__name__)
@ -49,6 +50,20 @@ class ConfigStore:
"""Close the connection pool."""
await self._pool.close()
# -------------------------------------------------------------------------
# System configuration
# -------------------------------------------------------------------------
async def get_monitoring_area(self) -> MonitoringArea | None:
"""Read the system monitoring-area bbox from ``config.system``.
Returns ``None`` if no row is set or any monitor_* column is NULL.
Used by both archive (for the INSERT-time filter) and supervisor (for
the publish-time filter, v0.10.2).
"""
async with self._pool.acquire() as conn:
return await load_monitoring_area(conn)
# -------------------------------------------------------------------------
# Adapter configuration
# -------------------------------------------------------------------------

View file

@ -0,0 +1,130 @@
"""System monitoring-area bbox: shared geometry/classification helpers.
Originally lived inside ``central.archive`` (v0.9.12); lifted here in v0.10.2 so
the supervisor can enforce the same bbox at publish time and NATS subscribers
(meshai, Navi) stop seeing out-of-area events. Archive keeps using it as the
defense-in-depth layer between NATS and Postgres.
Behavior MUST stay byte-identical to archive's v0.9.12-v0.10.1 implementation:
``intersects()`` semantics (border-straddlers kept), fail-open on parse failure,
all-NULL column row -> ``None`` area (filter no-ops).
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import Any
import asyncpg
from shapely.geometry import box as _shapely_box
from shapely.geometry import shape
logger = logging.getLogger(__name__)
# How often archive / supervisor re-read the monitoring-area bbox from
# config.system so GUI edits propagate without a service restart.
MONITORING_AREA_REFRESH_S = 60
@dataclass(frozen=True)
class MonitoringArea:
"""System-level bounding box that events must intersect to be archived/published."""
north: float
south: float
east: float
west: float
def as_box(self):
# shapely box(minx, miny, maxx, maxy) -> (west, south, east, north)
return _shapely_box(self.west, self.south, self.east, self.north)
def build_geom_json(geo_data: dict[str, Any] | None) -> str | None:
"""Build a GeoJSON string from an event's ``Geo`` dict.
Priority matches archive's v0.9.8+ rule: a full ``geometry`` dict wins over
``bbox`` (rendered as a 4-corner Polygon) over ``centroid`` (rendered as a
Point). Returns ``None`` if no usable shape is present.
"""
if not geo_data:
return None
geometry = geo_data.get("geometry")
if geometry:
return json.dumps(geometry)
bbox = geo_data.get("bbox")
centroid = geo_data.get("centroid")
if bbox and len(bbox) == 4:
min_lon, min_lat, max_lon, max_lat = bbox
return json.dumps({
"type": "Polygon",
"coordinates": [[
[min_lon, min_lat],
[max_lon, min_lat],
[max_lon, max_lat],
[min_lon, max_lat],
[min_lon, min_lat],
]]
})
if centroid and len(centroid) == 2:
return json.dumps({
"type": "Point",
"coordinates": centroid,
})
return None
def classify_geom(geom_json: str | None, area: MonitoringArea | None) -> str:
"""Classify a GeoJSON string against the monitoring area.
Returns one of:
'null-geom' -- no geometry; always kept (SWPC trio, .removed tombstones)
'no-area' -- no monitoring area configured; keep everything
'in-bounds' -- geometry intersects the area; keep
'out-of-bounds' -- geometry lies entirely outside the area; drop
'invalid-geom' -- geometry could not be evaluated; kept (fail-open) + warn
Uses ``intersects()`` so border-straddlers and points-on-edge are kept
(matches PostGIS ``ST_Intersects``). The filter must never drop an event
because of a parse failure -- when in doubt, keep it.
"""
if geom_json is None:
return "null-geom"
if area is None:
return "no-area"
try:
geom = shape(json.loads(geom_json))
return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds"
except Exception:
return "invalid-geom"
async def load_monitoring_area(conn: asyncpg.Connection) -> MonitoringArea | None:
"""Read the system monitoring area from ``config.system``.
Returns ``None`` when:
- no row exists (``id = true`` not set), or
- any of monitor_{north,south,east,west} is NULL.
Callers are expected to handle their own pool acquisition AND any exception
(archive's pattern: keep the last-known value on read failure and warn).
"""
row = await conn.fetchrow(
"SELECT monitor_north, monitor_south, monitor_east, monitor_west "
"FROM config.system WHERE id = true"
)
cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west")
if row and all(row[c] is not None for c in cols):
return MonitoringArea(
north=row["monitor_north"],
south=row["monitor_south"],
east=row["monitor_east"],
west=row["monitor_west"],
)
return None

View file

@ -31,6 +31,12 @@ from central.enrichment.backends.no_op import NoOpBackend
from central.enrichment.backends.photon import PhotonBackend
from central.enrichment.geocoder import GeocoderEnricher
from central.models import Event
from central.monitoring_area import (
MONITORING_AREA_REFRESH_S,
MonitoringArea,
build_geom_json,
classify_geom,
)
from central.stream_manager import StreamManager
from central.streams import STREAMS as STREAM_REGISTRY
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
@ -231,6 +237,12 @@ class Supervisor:
self._start_time = datetime.now(timezone.utc)
self._config_watch_task: asyncio.Task[None] | None = None
self._lock = asyncio.Lock()
# v0.10.2 publish-time monitoring-area filter: mirror archive's
# ACK-but-don't-insert behavior at the supervisor->NATS hop so
# subscribers (meshai, Navi) never see out-of-area events. Refreshed
# every MONITORING_AREA_REFRESH_S from config.system.
self._monitoring_area: MonitoringArea | None = None
self._dropped_publish: dict[str, int] = {}
async def connect(self) -> None:
"""Connect to NATS."""
@ -239,6 +251,25 @@ class Supervisor:
self._stream_manager = StreamManager(self._js)
logger.info("Connected to NATS", extra={"url": self._nats_url})
# Load the publish-time monitoring-area bbox (v0.10.2). Mirrors archive:
# on failure keep the last value and warn -- never block startup over a
# config read.
try:
self._monitoring_area = await self._config_store.get_monitoring_area()
except Exception as e:
logger.warning(
"Could not load monitoring area at startup; publish filter no-ops until refresh",
extra={"error": str(e)},
)
area = self._monitoring_area
logger.info(
"Publish-time monitoring area loaded",
extra={"monitoring_area": (
{"north": area.north, "south": area.south,
"east": area.east, "west": area.west} if area else None
)},
)
async def disconnect(self) -> None:
"""Disconnect from NATS."""
if self._nc:
@ -267,6 +298,31 @@ class Supervisor:
headers={"Nats-Msg-Id": msg_id},
)
async def _refresh_monitoring_area_loop(self) -> None:
"""Periodically refresh the publish-time monitoring-area bbox so GUI
edits propagate within ~MONITORING_AREA_REFRESH_S without a restart, and
log a rolling INFO summary of per-adapter drops. Mirrors archive's
``_refresh_monitoring_area_loop`` pattern -- same cadence, same fail-
open behavior (keep last value on read failure)."""
while not self._shutdown_event.is_set():
try:
await asyncio.wait_for(
self._shutdown_event.wait(), timeout=MONITORING_AREA_REFRESH_S
)
except asyncio.TimeoutError:
try:
self._monitoring_area = await self._config_store.get_monitoring_area()
except Exception as e:
logger.warning(
"Could not refresh monitoring area; keeping previous value",
extra={"error": str(e)},
)
if self._dropped_publish:
logger.info(
"publish bbox filter drop summary (cumulative)",
extra={"dropped_by_adapter": dict(self._dropped_publish)},
)
def _create_adapter(self, config: AdapterConfig) -> SourceAdapter:
"""Create an adapter instance based on config name."""
cls = self._adapters.get(config.name)
@ -352,6 +408,36 @@ class Supervisor:
subject = state.adapter.subject_for(event)
# v0.10.2 publish-time monitoring-area filter. Mirrors
# archive's classify->ACK pattern but here we just `continue`
# without mark_published -- if the area widens later the
# next poll re-yields the same id and we'll publish it
# naturally. Marking published on drop would be a forward-
# only blackhole.
geom_json = build_geom_json(
event.geo.model_dump() if event.geo else None
)
verdict = classify_geom(geom_json, self._monitoring_area)
if verdict == "out-of-bounds":
self._dropped_publish[state.name] = (
self._dropped_publish.get(state.name, 0) + 1
)
logger.debug(
"Dropped out-of-bounds event at publish (monitoring-area filter)",
extra={
"id": event.id,
"adapter": state.name,
"category": event.category,
"subject": subject,
},
)
continue
if verdict == "invalid-geom":
logger.warning(
"Geom could not be evaluated for publish-time bbox filter; publishing",
extra={"id": event.id, "adapter": state.name},
)
# Publish
await self._publish_event(subject, envelope, msg_id)
state.adapter.mark_published(event.id)
@ -978,6 +1064,11 @@ class Supervisor:
# Start archived-events retention sweep loop (v0.9.13)
self._tasks.append(asyncio.create_task(self._events_retention_loop()))
# Start publish-time monitoring-area refresh loop (v0.10.2)
self._tasks.append(
asyncio.create_task(self._refresh_monitoring_area_loop())
)
logger.info(
"Supervisor started",
extra={"adapters": list(self._adapter_states.keys())},

View file

@ -1,55 +1,22 @@
"""Archive-level monitoring-area bbox filter (v0.9.12).
"""Archive-level monitoring-area bbox filter integration (v0.9.12).
Events whose geometry falls entirely outside the system monitoring area are
dropped at archive INSERT time; null-geom events and border-straddlers are kept.
The filter is fail-open: an unparseable geometry is archived (with a warning),
never dropped.
After v0.10.2 the ``MonitoringArea`` / ``classify_geom`` / ``build_geom_json``
primitives moved to ``central.monitoring_area`` -- their pure-unit tests live in
``test_monitoring_area.py``. This file keeps the end-to-end check that archive's
``_process_message`` still drops out-of-bounds events (ACK + counter, no INSERT)
and that null-geom / no-area paths still archive.
"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from central.archive import ArchiveConsumer, MonitoringArea, _classify_geom
from central.archive import ArchiveConsumer
from central.monitoring_area import MonitoringArea
IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5)
def _pt(lon, lat):
return json.dumps({"type": "Point", "coordinates": [lon, lat]})
class TestClassifyGeom:
def test_null_geom_always_kept(self):
assert _classify_geom(None, IDAHO) == "null-geom"
def test_no_area_keeps_everything(self):
assert _classify_geom(_pt(-114.0, 43.5), None) == "no-area"
def test_in_bounds_kept(self):
assert _classify_geom(_pt(-114.0, 43.5), IDAHO) == "in-bounds"
def test_out_of_bounds_dropped(self):
assert _classify_geom(_pt(-74.0, 40.7), IDAHO) == "out-of-bounds"
def test_border_straddling_polygon_kept(self):
# Spans the western edge (west=-117.5): partly out, partly in -> kept.
poly = json.dumps({
"type": "Polygon",
"coordinates": [[[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]],
})
assert _classify_geom(poly, IDAHO) == "in-bounds"
def test_point_exactly_on_border_kept(self):
assert _classify_geom(_pt(-117.5, 43.0), IDAHO) == "in-bounds"
def test_unparseable_geom_kept(self):
assert _classify_geom("{not valid json", IDAHO) == "invalid-geom"
def test_unknown_geom_type_kept(self):
assert _classify_geom(json.dumps({"type": "Nonsense"}), IDAHO) == "invalid-geom"
def _make_msg(envelope):
msg = MagicMock()
msg.data = json.dumps(envelope).encode()

View file

@ -14,7 +14,7 @@ from central.adapters.firms import (
SATELLITE_SHORT,
_pixel_polygon,
)
from central.archive import _build_geom_sql
from central.monitoring_area import build_geom_json
from central.config_models import AdapterConfig
from central.models import Event, Geo
@ -553,7 +553,7 @@ class TestPixelPolygon:
class TestGeoGeometryRoundTripsThroughArchive:
"""Regression guard: FIRMS geo.geometry must reach _build_geom_sql as Polygon."""
"""Regression guard: FIRMS geo.geometry must reach build_geom_json as Polygon."""
@pytest.mark.asyncio
async def test_geo_geometry_round_trips_through_archive_path(
@ -572,7 +572,7 @@ class TestGeoGeometryRoundTripsThroughArchive:
# Simulate what archive does: serialize geo to dict and run it through
# the same helper that produces the PostGIS geom clause.
geo_dict = event.geo.model_dump()
sql_clause = _build_geom_sql(geo_dict)
sql_clause = build_geom_json(geo_dict)
assert sql_clause is not None
decoded = json.loads(sql_clause)
assert decoded["type"] == "Polygon"
@ -600,6 +600,6 @@ class TestGeoGeometryRoundTripsThroughArchive:
assert event.geo.geometry is None
geo_dict = event.geo.model_dump()
sql_clause = _build_geom_sql(geo_dict)
sql_clause = build_geom_json(geo_dict)
assert sql_clause is not None
assert json.loads(sql_clause)["type"] == "Point"

View file

@ -0,0 +1,142 @@
"""Unit tests for the shared monitoring-area module (v0.10.2).
Covers the four exports:
- ``MonitoringArea.as_box`` (shapely box construction)
- ``build_geom_json`` (all five Geo shapes the archive sees)
- ``classify_geom`` (the five verdicts)
- ``load_monitoring_area`` (DB read; None on missing-row / NULL-column)
The classify_geom + as_box assertions are the v0.9.12 archive bbox tests
lifted out of ``test_archive_bbox_filter.py`` and expanded with edge cases.
"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from shapely.geometry import Polygon, box as shapely_box
from central.monitoring_area import (
MonitoringArea,
build_geom_json,
classify_geom,
load_monitoring_area,
)
IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5)
def _pt(lon, lat):
return json.dumps({"type": "Point", "coordinates": [lon, lat]})
class TestMonitoringAreaAsBox:
def test_as_box_returns_shapely_polygon(self):
b = IDAHO.as_box()
assert isinstance(b, Polygon)
def test_as_box_corners_match_west_south_east_north(self):
# shapely box(minx, miny, maxx, maxy) -> envelope (west, south, east, north)
expected = shapely_box(-117.5, 41.8, -111.0, 44.5)
assert IDAHO.as_box().equals(expected)
class TestBuildGeomJson:
def test_none_input_returns_none(self):
assert build_geom_json(None) is None
def test_empty_dict_returns_none(self):
assert build_geom_json({}) is None
def test_full_geometry_wins_over_bbox(self):
# If a real geometry is present it MUST be used verbatim (this is the
# v0.9.8 wfigs/tomtom invariant -- the map needs the real shape).
geom = {"type": "LineString", "coordinates": [[-114, 43], [-115, 44]]}
out = build_geom_json({
"geometry": geom,
"bbox": [-115, 43, -114, 44],
"centroid": [-114.5, 43.5],
})
assert json.loads(out) == geom
def test_bbox_rendered_as_closed_polygon(self):
out = build_geom_json({"bbox": [-117, 42, -111, 44]})
parsed = json.loads(out)
assert parsed["type"] == "Polygon"
coords = parsed["coordinates"][0]
# 5 points: 4 corners + closing duplicate
assert len(coords) == 5
assert coords[0] == coords[-1]
assert coords[0] == [-117, 42]
def test_centroid_rendered_as_point(self):
out = build_geom_json({"centroid": [-114.5, 43.5]})
assert json.loads(out) == {"type": "Point", "coordinates": [-114.5, 43.5]}
def test_partial_bbox_falls_through(self):
# An invalid 3-element bbox should not produce a 3-corner polygon;
# caller is expected to fall through to centroid or return None.
assert build_geom_json({"bbox": [-117, 42, -111]}) is None
def test_centroid_wins_when_bbox_invalid(self):
out = build_geom_json({"bbox": [-117, 42, -111], "centroid": [-114, 43]})
assert json.loads(out) == {"type": "Point", "coordinates": [-114, 43]}
class TestClassifyGeom:
def test_null_geom_always_kept(self):
assert classify_geom(None, IDAHO) == "null-geom"
def test_null_geom_kept_even_without_area(self):
assert classify_geom(None, None) == "null-geom"
def test_no_area_keeps_everything(self):
assert classify_geom(_pt(-114.0, 43.5), None) == "no-area"
def test_in_bounds_kept(self):
assert classify_geom(_pt(-114.0, 43.5), IDAHO) == "in-bounds"
def test_out_of_bounds_dropped(self):
assert classify_geom(_pt(-74.0, 40.7), IDAHO) == "out-of-bounds"
def test_border_straddling_polygon_kept(self):
# Spans the western edge (west=-117.5): partly out, partly in -> kept.
poly = json.dumps({
"type": "Polygon",
"coordinates": [[[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]],
})
assert classify_geom(poly, IDAHO) == "in-bounds"
def test_point_exactly_on_border_kept(self):
assert classify_geom(_pt(-117.5, 43.0), IDAHO) == "in-bounds"
def test_unparseable_geom_keeps_failopen(self):
assert classify_geom("{not valid json", IDAHO) == "invalid-geom"
def test_unknown_geom_type_keeps_failopen(self):
assert classify_geom(json.dumps({"type": "Nonsense"}), IDAHO) == "invalid-geom"
@pytest.mark.asyncio
class TestLoadMonitoringArea:
async def test_returns_area_when_all_columns_set(self):
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value={
"monitor_north": 44.5, "monitor_south": 41.8,
"monitor_east": -111.0, "monitor_west": -117.5,
})
area = await load_monitoring_area(conn)
assert area == IDAHO
async def test_returns_none_when_no_row(self):
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value=None)
assert await load_monitoring_area(conn) is None
async def test_returns_none_when_any_column_null(self):
conn = MagicMock()
conn.fetchrow = AsyncMock(return_value={
"monitor_north": 44.5, "monitor_south": None,
"monitor_east": -111.0, "monitor_west": -117.5,
})
assert await load_monitoring_area(conn) is None

View file

@ -0,0 +1,186 @@
"""Supervisor publish-time monitoring-area filter (v0.10.2).
Covers the wire-up between ``subject_for`` and ``_publish_event`` in
``Supervisor._run_adapter_loop``: out-of-area drops, in-area publishes,
null-geom passes, invalid-geom fails open, and the refresh-loop reload.
"""
import asyncio
import logging
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from central import supervisor as sup_mod
from central.config_models import AdapterConfig, EnrichmentConfig
from central.models import Event, Geo
from central.monitoring_area import MonitoringArea
IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5)
SPOKANE = (-117.4, 47.6)
BOISE = (-114.0, 43.5)
NYC = (-74.0, 40.7)
def _ev(eid: str, geo: Geo) -> Event:
return Event(
id=eid, adapter="mock", category="mock.test",
time=datetime.now(timezone.utc), geo=geo, data={},
)
class _MockAdapter:
requires_api_key = None
enrichment_locations = ()
def __init__(self, config, *_args) -> None:
self.config = config
self.cadence_s = config.cadence_s
self.events: list[Event] = []
self.published: set[str] = set()
self.done = asyncio.Event()
self._called = False
async def startup(self): ...
async def shutdown(self): ...
async def apply_config(self, c): ...
async def poll(self):
if not self._called:
self._called = True
for e in self.events:
yield e
self.done.set()
def is_published(self, eid): return eid in self.published
def mark_published(self, eid): self.published.add(eid)
def bump_last_seen(self, eid): ...
def sweep_old_ids(self): return 0
def subject_for(self, e): return f"central.test.{e.id}"
@pytest.fixture
def sup_factory():
"""Build a supervisor with mocked NATS + ConfigStore; spy on _publish_event."""
def _build(area: MonitoringArea | None):
nc = AsyncMock(); js = AsyncMock(); js.publish = AsyncMock()
nc.jetstream = MagicMock(return_value=js)
store = MagicMock()
store.list_streams = AsyncMock(return_value=[])
store.get_stream = AsyncMock(return_value=None)
store.set_adapter_last_error = AsyncMock()
store.get_api_key = AsyncMock(return_value=None)
store.get_monitoring_area = AsyncMock(return_value=area)
config_source = MagicMock()
config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig())
sup = sup_mod.Supervisor(
config_source=config_source, config_store=store,
nats_url="nats://x:4222", cloudevents_config=None,
enrichment_config=EnrichmentConfig(),
)
sup._nc = nc; sup._js = nc.jetstream()
sup._monitoring_area = area
sup._publish_event = AsyncMock()
sup._publish_meta = AsyncMock()
sup._adapters["mock"] = _MockAdapter
return sup
return _build
async def _drive(sup, events):
adapter = _MockAdapter(MagicMock(cadence_s=3600))
adapter.events = events
config = AdapterConfig(
name="mock", enabled=True, cadence_s=3600, settings={},
paused_at=None, updated_at=datetime.now(timezone.utc),
)
state = sup_mod.AdapterState(name="mock", config=config, adapter=adapter)
task = asyncio.create_task(sup._run_adapter_loop(state))
try:
await asyncio.wait_for(adapter.done.wait(), timeout=5.0)
await asyncio.sleep(0)
finally:
# Cancel ONLY this loop -- never touch sup._shutdown_event so callers
# can drive the same supervisor through multiple poll cycles.
task.cancel()
try: await task
except (asyncio.CancelledError, Exception): pass
return adapter
# --- verdict matrix -----------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize("area,geo,should_publish,should_drop", [
pytest.param(None, Geo(centroid=NYC), True, False, id="no-area-publishes-out-of-bbox"),
pytest.param(IDAHO, Geo(centroid=BOISE), True, False, id="in-bounds-publishes"),
pytest.param(IDAHO, Geo(centroid=NYC), False, True, id="out-of-bounds-NY-drops"),
pytest.param(IDAHO, Geo(centroid=SPOKANE), False, True, id="out-of-bounds-Spokane-drops"),
pytest.param(IDAHO, Geo(), True, False, id="null-geom-publishes"),
pytest.param(IDAHO, Geo(geometry={"type": "Nonsense"}), True, False, id="invalid-geom-fail-open"),
])
async def test_verdict(sup_factory, area, geo, should_publish, should_drop):
sup = sup_factory(area)
a = await _drive(sup, [_ev("e1", geo)])
if should_publish:
assert sup._publish_event.await_count == 1
assert a.published == {"e1"}
else:
sup._publish_event.assert_not_called()
# CRITICAL forward-only: drops MUST NOT mark_published, else widening
# the bbox can't re-deliver -- see [[feedback_dedup_forward_only]].
assert a.published == set()
assert (sup._dropped_publish == {"mock": 1}) is should_drop
@pytest.mark.asyncio
async def test_mixed_batch_partitions_correctly(sup_factory):
sup = sup_factory(IDAHO)
a = await _drive(sup, [
_ev("in1", Geo(centroid=BOISE)),
_ev("out1", Geo(centroid=NYC)),
_ev("null1", Geo()),
_ev("out2", Geo(centroid=SPOKANE)),
])
assert sup._publish_event.await_count == 2
assert a.published == {"in1", "null1"}
assert sup._dropped_publish == {"mock": 2}
@pytest.mark.asyncio
async def test_widening_area_re_publishes_previously_dropped(sup_factory):
"""Forward-only invariant: drops are reversible -- never mark_published."""
sup = sup_factory(IDAHO)
spokane = _ev("spokane", Geo(centroid=SPOKANE))
# The same supervisor drives two polls so the second sees the same id
# AFTER the bbox widens. Need a fresh adapter each call because _drive's
# MockAdapter signals done once per instance.
a1 = await _drive(sup, [spokane])
sup._publish_event.assert_not_called()
assert a1.published == set()
sup._monitoring_area = MonitoringArea(
north=48.5, south=41.8, east=-111.0, west=-118.0
)
a2 = await _drive(sup, [spokane])
assert sup._publish_event.await_count == 1
assert a2.published == {"spokane"}
@pytest.mark.asyncio
async def test_refresh_loop_reloads_area_and_logs_summary(
sup_factory, caplog, monkeypatch
):
sup = sup_factory(None)
sup._dropped_publish = {"mock": 7}
monkeypatch.setattr(sup_mod, "MONITORING_AREA_REFRESH_S", 0.05)
sup._config_store.get_monitoring_area = AsyncMock(return_value=IDAHO)
with caplog.at_level(logging.INFO):
task = asyncio.create_task(sup._refresh_monitoring_area_loop())
await asyncio.sleep(0.15)
sup._shutdown_event.set()
try: await asyncio.wait_for(task, timeout=1.0)
except (asyncio.TimeoutError, asyncio.CancelledError): task.cancel()
assert sup._monitoring_area == IDAHO
assert any(
"publish bbox filter drop summary" in r.message for r in caplog.records
)

View file

@ -17,7 +17,7 @@ import pytest
from central.adapter import SourceAdapter
from central.adapters.tomtom_flow import TomTomFlowAdapter
from central.archive import _build_geom_sql
from central.monitoring_area import build_geom_json
from central.config_models import AdapterConfig
from central.tomtom_flow_parse import (
_local_to_lonlat,
@ -83,10 +83,10 @@ def test_subject_for():
def test_archive_prefers_geo_geometry():
line = {"type": "LineString", "coordinates": [[-116.2, 43.6], [-116.1, 43.7]]}
# geometry present -> returned verbatim (not bbox/centroid)
out = _build_geom_sql({"geometry": line, "centroid": [-116.2, 43.6], "bbox": [-116.3, 43.5, -116.0, 43.8]})
out = build_geom_json({"geometry": line, "centroid": [-116.2, 43.6], "bbox": [-116.3, 43.5, -116.0, 43.8]})
assert json.loads(out) == line
# no geometry -> falls back to centroid Point (regression guard)
out2 = _build_geom_sql({"centroid": [-116.2, 43.6]})
out2 = build_geom_json({"centroid": [-116.2, 43.6]})
assert json.loads(out2)["type"] == "Point"