From 1bebf2570be9e3b0ca879b0c2b7cea63a14bc710 Mon Sep 17 00:00:00 2001 From: malice Date: Fri, 5 Jun 2026 20:34:10 -0600 Subject: [PATCH] v0.10.2: monitoring-area bbox enforced at supervisor publish (was archive-only) (#PR_NUMBER_PLACEHOLDER) Closes #86 Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/adapters/firms.py | 2 +- src/central/archive.py | 106 ++------------ src/central/config_store.py | 15 ++ src/central/monitoring_area.py | 130 +++++++++++++++++ src/central/supervisor.py | 91 ++++++++++++ tests/test_archive_bbox_filter.py | 49 +------ tests/test_firms.py | 8 +- tests/test_monitoring_area.py | 142 ++++++++++++++++++ tests/test_supervisor_publish_filter.py | 186 ++++++++++++++++++++++++ tests/test_tomtom_flow.py | 6 +- 10 files changed, 590 insertions(+), 145 deletions(-) create mode 100644 src/central/monitoring_area.py create mode 100644 tests/test_monitoring_area.py create mode 100644 tests/test_supervisor_publish_filter.py diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 014a74b..52ee5b1 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -366,7 +366,7 @@ class FIRMSAdapter(SourceAdapter): # Construct the satellite pixel footprint as a GeoJSON Polygon from # scan/track. Falls back to centroid-only Geo if either dimension is - # missing/invalid — _build_geom_sql then stores a Point. We keep + # missing/invalid — build_geom_json then stores a Point. We keep # centroid alongside geometry so consumers that read only centroid # still work. 
geometry = _pixel_polygon(lat, lon, row.get("scan"), row.get("track")) diff --git a/src/central/archive.py b/src/central/archive.py index f7765d0..a28494f 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -9,18 +9,23 @@ import json import logging import signal import sys -from dataclasses import dataclass from datetime import datetime, timezone from typing import Any import asyncpg import nats -from shapely.geometry import box as _shapely_box, shape from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings +from central.monitoring_area import ( + MONITORING_AREA_REFRESH_S, + MonitoringArea, + build_geom_json, + classify_geom, + load_monitoring_area, +) from central.streams import STREAMS as STREAM_REGISTRY # Event-bearing streams to consume -- derived from the registry's event_bearing flag. @@ -30,9 +35,6 @@ STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearin BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 ACK_WAIT = 30 -# How often the archive re-reads the monitoring-area bbox from config.system so -# GUI edits take effect without a service restart (~this many seconds latency). -MONITORING_AREA_REFRESH_S = 60 def consumer_name_for(stream: str) -> str: @@ -75,83 +77,6 @@ def setup_logging() -> None: logger = logging.getLogger("central.archive") -def _build_geom_sql(geo_data: dict[str, Any] | None) -> str | None: - """Build PostGIS geometry from event geo data.""" - if not geo_data: - return None - - # A full GeoJSON geometry (e.g. flow-segment LineString) wins over the - # bbox/centroid fallbacks so the map renders the real shape. Inert for - # adapters that don't set geo.geometry. 
- geometry = geo_data.get("geometry") - if geometry: - return json.dumps(geometry) - - bbox = geo_data.get("bbox") - centroid = geo_data.get("centroid") - - if bbox and len(bbox) == 4: - # Create polygon from bbox - min_lon, min_lat, max_lon, max_lat = bbox - return json.dumps({ - "type": "Polygon", - "coordinates": [[ - [min_lon, min_lat], - [max_lon, min_lat], - [max_lon, max_lat], - [min_lon, max_lat], - [min_lon, min_lat], - ]] - }) - elif centroid and len(centroid) == 2: - # Create point from centroid - return json.dumps({ - "type": "Point", - "coordinates": centroid - }) - - return None - - -@dataclass(frozen=True) -class MonitoringArea: - """System-level bounding box that events must intersect to be archived.""" - - north: float - south: float - east: float - west: float - - def as_box(self): - # shapely box(minx, miny, maxx, maxy) -> (west, south, east, north) - return _shapely_box(self.west, self.south, self.east, self.north) - - -def _classify_geom(geom_json: str | None, area: "MonitoringArea | None") -> str: - """Classify an event geometry against the monitoring area (archive bbox filter). - - Returns one of: - 'null-geom' -- no geometry; always archived (SWPC trio, .removed tombstones) - 'no-area' -- no monitoring area configured; archive everything - 'in-bounds' -- geometry intersects the area; archive - 'out-of-bounds' -- geometry lies entirely outside the area; drop - 'invalid-geom' -- geometry could not be evaluated; archived (fail-open) + warn - - Uses intersects() so border-straddlers are kept (matches PostGIS ST_Intersects). - The filter must never drop an event because of a parse failure: when in doubt, - keep it. 
- """ - if geom_json is None: - return "null-geom" - if area is None: - return "no-area" - try: - geom = shape(json.loads(geom_json)) - return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds" - except Exception: - return "invalid-geom" - - class ArchiveConsumer: """Archive consumer process.""" @@ -199,18 +124,7 @@ class ArchiveConsumer: return try: async with self._pool.acquire() as conn: - row = await conn.fetchrow( - "SELECT monitor_north, monitor_south, monitor_east, monitor_west " - "FROM config.system WHERE id = true" - ) - cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west") - if row and all(row[c] is not None for c in cols): - self._monitoring_area = MonitoringArea( - north=row["monitor_north"], south=row["monitor_south"], - east=row["monitor_east"], west=row["monitor_west"], - ) - else: - self._monitoring_area = None + self._monitoring_area = await load_monitoring_area(conn) except Exception as e: logger.warning( "Could not load monitoring area; keeping previous value", @@ -321,9 +235,9 @@ class ArchiveConsumer: await msg.ack() return - geom_json = _build_geom_sql(geo_data) + geom_json = build_geom_json(geo_data) - verdict = _classify_geom(geom_json, self._monitoring_area) + verdict = classify_geom(geom_json, self._monitoring_area) if verdict == "out-of-bounds": self._dropped[adapter] = self._dropped.get(adapter, 0) + 1 logger.debug( diff --git a/src/central/config_store.py b/src/central/config_store.py index bd0b199..0044845 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -14,6 +14,7 @@ import asyncpg from central.config_models import AdapterConfig, EnrichmentConfig, StreamConfig from central.crypto import decrypt, encrypt +from central.monitoring_area import MonitoringArea, load_monitoring_area logger = logging.getLogger(__name__) @@ -49,6 +50,20 @@ class ConfigStore: """Close the connection pool.""" await self._pool.close() + # 
------------------------------------------------------------------------- + # System configuration + # ------------------------------------------------------------------------- + + async def get_monitoring_area(self) -> MonitoringArea | None: + """Read the system monitoring-area bbox from ``config.system``. + + Returns ``None`` if no row is set or any monitor_* column is NULL. + Used by the supervisor's publish-time filter (v0.10.2); archive calls + ``load_monitoring_area`` on its own pool for the INSERT-time filter. + """ + async with self._pool.acquire() as conn: + return await load_monitoring_area(conn) + # ------------------------------------------------------------------------- # Adapter configuration # ------------------------------------------------------------------------- diff --git a/src/central/monitoring_area.py b/src/central/monitoring_area.py new file mode 100644 index 0000000..c52dc39 --- /dev/null +++ b/src/central/monitoring_area.py @@ -0,0 +1,130 @@ +"""System monitoring-area bbox: shared geometry/classification helpers. + +Originally lived inside ``central.archive`` (v0.9.12); lifted here in v0.10.2 so +the supervisor can enforce the same bbox at publish time and NATS subscribers +(meshai, Navi) stop seeing out-of-area events. Archive keeps using it as the +defense-in-depth layer between NATS and Postgres. + +Behavior MUST stay byte-identical to archive's v0.9.12-v0.10.1 implementation: +``intersects()`` semantics (border-straddlers kept), fail-open on parse failure, +missing row or any NULL monitor_* column -> ``None`` area (filter no-ops). +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from typing import Any + +import asyncpg +from shapely.geometry import box as _shapely_box +from shapely.geometry import shape + +logger = logging.getLogger(__name__) + +# How often archive / supervisor re-read the monitoring-area bbox from +# config.system so GUI edits propagate without a service restart.
+MONITORING_AREA_REFRESH_S = 60 + + +@dataclass(frozen=True) +class MonitoringArea: + """System-level bounding box that events must intersect to be archived/published.""" + + north: float + south: float + east: float + west: float + + def as_box(self): + # shapely box(minx, miny, maxx, maxy) -> (west, south, east, north) + return _shapely_box(self.west, self.south, self.east, self.north) + + +def build_geom_json(geo_data: dict[str, Any] | None) -> str | None: + """Build a GeoJSON string from an event's ``Geo`` dict. + + Priority matches archive's v0.9.8+ rule: a full ``geometry`` dict wins over + ``bbox`` (rendered as a 4-corner Polygon) over ``centroid`` (rendered as a + Point). Returns ``None`` if no usable shape is present. + """ + if not geo_data: + return None + + geometry = geo_data.get("geometry") + if geometry: + return json.dumps(geometry) + + bbox = geo_data.get("bbox") + centroid = geo_data.get("centroid") + + if bbox and len(bbox) == 4: + min_lon, min_lat, max_lon, max_lat = bbox + return json.dumps({ + "type": "Polygon", + "coordinates": [[ + [min_lon, min_lat], + [max_lon, min_lat], + [max_lon, max_lat], + [min_lon, max_lat], + [min_lon, min_lat], + ]] + }) + if centroid and len(centroid) == 2: + return json.dumps({ + "type": "Point", + "coordinates": centroid, + }) + + return None + + +def classify_geom(geom_json: str | None, area: MonitoringArea | None) -> str: + """Classify a GeoJSON string against the monitoring area. + + Returns one of: + 'null-geom' -- no geometry; always kept (SWPC trio, .removed tombstones) + 'no-area' -- no monitoring area configured; keep everything + 'in-bounds' -- geometry intersects the area; keep + 'out-of-bounds' -- geometry lies entirely outside the area; drop + 'invalid-geom' -- geometry could not be evaluated; kept (fail-open) + warn + + Uses ``intersects()`` so border-straddlers and points-on-edge are kept + (matches PostGIS ``ST_Intersects``). 
The filter must never drop an event + because of a parse failure -- when in doubt, keep it. + """ + if geom_json is None: + return "null-geom" + if area is None: + return "no-area" + try: + geom = shape(json.loads(geom_json)) + return "in-bounds" if geom.intersects(area.as_box()) else "out-of-bounds" + except Exception: + return "invalid-geom" + + +async def load_monitoring_area(conn: asyncpg.Connection) -> MonitoringArea | None: + """Read the system monitoring area from ``config.system``. + + Returns ``None`` when: + - no row exists (``id = true`` not set), or + - any of monitor_{north,south,east,west} is NULL. + + Callers are expected to handle their own pool acquisition AND any exception + (archive's pattern: keep the last-known value on read failure and warn). + """ + row = await conn.fetchrow( + "SELECT monitor_north, monitor_south, monitor_east, monitor_west " + "FROM config.system WHERE id = true" + ) + cols = ("monitor_north", "monitor_south", "monitor_east", "monitor_west") + if row and all(row[c] is not None for c in cols): + return MonitoringArea( + north=row["monitor_north"], + south=row["monitor_south"], + east=row["monitor_east"], + west=row["monitor_west"], + ) + return None diff --git a/src/central/supervisor.py b/src/central/supervisor.py index b54cf1b..55dd8c6 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -31,6 +31,12 @@ from central.enrichment.backends.no_op import NoOpBackend from central.enrichment.backends.photon import PhotonBackend from central.enrichment.geocoder import GeocoderEnricher from central.models import Event +from central.monitoring_area import ( + MONITORING_AREA_REFRESH_S, + MonitoringArea, + build_geom_json, + classify_geom, +) from central.stream_manager import StreamManager from central.streams import STREAMS as STREAM_REGISTRY CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -231,6 +237,12 @@ class Supervisor: self._start_time = datetime.now(timezone.utc) self._config_watch_task: 
asyncio.Task[None] | None = None self._lock = asyncio.Lock() + # v0.10.2 publish-time monitoring-area filter: mirror archive's + # ACK-but-don't-insert behavior at the supervisor->NATS hop so + # subscribers (meshai, Navi) never see out-of-area events. Refreshed + # every MONITORING_AREA_REFRESH_S from config.system. + self._monitoring_area: MonitoringArea | None = None + self._dropped_publish: dict[str, int] = {} async def connect(self) -> None: """Connect to NATS.""" @@ -239,6 +251,25 @@ class Supervisor: self._stream_manager = StreamManager(self._js) logger.info("Connected to NATS", extra={"url": self._nats_url}) + # Load the publish-time monitoring-area bbox (v0.10.2). Mirrors archive: + # on failure keep the last value and warn -- never block startup over a + # config read. + try: + self._monitoring_area = await self._config_store.get_monitoring_area() + except Exception as e: + logger.warning( + "Could not load monitoring area at startup; publish filter no-ops until refresh", + extra={"error": str(e)}, + ) + area = self._monitoring_area + logger.info( + "Publish-time monitoring area loaded", + extra={"monitoring_area": ( + {"north": area.north, "south": area.south, + "east": area.east, "west": area.west} if area else None + )}, + ) + async def disconnect(self) -> None: """Disconnect from NATS.""" if self._nc: @@ -267,6 +298,31 @@ class Supervisor: headers={"Nats-Msg-Id": msg_id}, ) + async def _refresh_monitoring_area_loop(self) -> None: + """Periodically refresh the publish-time monitoring-area bbox so GUI + edits propagate within ~MONITORING_AREA_REFRESH_S without a restart, and + log a rolling INFO summary of per-adapter drops. 
Mirrors archive's + ``_refresh_monitoring_area_loop`` pattern -- same cadence, same fail- + open behavior (keep last value on read failure).""" + while not self._shutdown_event.is_set(): + try: + await asyncio.wait_for( + self._shutdown_event.wait(), timeout=MONITORING_AREA_REFRESH_S + ) + except asyncio.TimeoutError: + try: + self._monitoring_area = await self._config_store.get_monitoring_area() + except Exception as e: + logger.warning( + "Could not refresh monitoring area; keeping previous value", + extra={"error": str(e)}, + ) + if self._dropped_publish: + logger.info( + "publish bbox filter drop summary (cumulative)", + extra={"dropped_by_adapter": dict(self._dropped_publish)}, + ) + def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: """Create an adapter instance based on config name.""" cls = self._adapters.get(config.name) @@ -352,6 +408,36 @@ class Supervisor: subject = state.adapter.subject_for(event) + # v0.10.2 publish-time monitoring-area filter. Mirrors + # archive's classify->ACK pattern but here we just `continue` + # without mark_published -- if the area widens later the + # next poll re-yields the same id and we'll publish it + # naturally. Marking published on drop would be a forward- + # only blackhole. 
+ geom_json = build_geom_json( + event.geo.model_dump() if event.geo else None + ) + verdict = classify_geom(geom_json, self._monitoring_area) + if verdict == "out-of-bounds": + self._dropped_publish[state.name] = ( + self._dropped_publish.get(state.name, 0) + 1 + ) + logger.debug( + "Dropped out-of-bounds event at publish (monitoring-area filter)", + extra={ + "id": event.id, + "adapter": state.name, + "category": event.category, + "subject": subject, + }, + ) + continue + if verdict == "invalid-geom": + logger.warning( + "Geom could not be evaluated for publish-time bbox filter; publishing", + extra={"id": event.id, "adapter": state.name}, + ) + # Publish await self._publish_event(subject, envelope, msg_id) state.adapter.mark_published(event.id) @@ -978,6 +1064,11 @@ class Supervisor: # Start archived-events retention sweep loop (v0.9.13) self._tasks.append(asyncio.create_task(self._events_retention_loop())) + # Start publish-time monitoring-area refresh loop (v0.10.2) + self._tasks.append( + asyncio.create_task(self._refresh_monitoring_area_loop()) + ) + logger.info( "Supervisor started", extra={"adapters": list(self._adapter_states.keys())}, diff --git a/tests/test_archive_bbox_filter.py b/tests/test_archive_bbox_filter.py index 14908cd..be25e68 100644 --- a/tests/test_archive_bbox_filter.py +++ b/tests/test_archive_bbox_filter.py @@ -1,55 +1,22 @@ -"""Archive-level monitoring-area bbox filter (v0.9.12). +"""Archive-level monitoring-area bbox filter integration (v0.9.12). -Events whose geometry falls entirely outside the system monitoring area are -dropped at archive INSERT time; null-geom events and border-straddlers are kept. -The filter is fail-open: an unparseable geometry is archived (with a warning), -never dropped. +After v0.10.2 the ``MonitoringArea`` / ``classify_geom`` / ``build_geom_json`` +primitives moved to ``central.monitoring_area`` -- their pure-unit tests live in +``test_monitoring_area.py``. 
This file keeps the end-to-end check that archive's +``_process_message`` still drops out-of-bounds events (ACK + counter, no INSERT) +and that null-geom / no-area paths still archive. """ import json import pytest from unittest.mock import AsyncMock, MagicMock -from central.archive import ArchiveConsumer, MonitoringArea, _classify_geom +from central.archive import ArchiveConsumer +from central.monitoring_area import MonitoringArea IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5) -def _pt(lon, lat): - return json.dumps({"type": "Point", "coordinates": [lon, lat]}) - - -class TestClassifyGeom: - def test_null_geom_always_kept(self): - assert _classify_geom(None, IDAHO) == "null-geom" - - def test_no_area_keeps_everything(self): - assert _classify_geom(_pt(-114.0, 43.5), None) == "no-area" - - def test_in_bounds_kept(self): - assert _classify_geom(_pt(-114.0, 43.5), IDAHO) == "in-bounds" - - def test_out_of_bounds_dropped(self): - assert _classify_geom(_pt(-74.0, 40.7), IDAHO) == "out-of-bounds" - - def test_border_straddling_polygon_kept(self): - # Spans the western edge (west=-117.5): partly out, partly in -> kept. 
- poly = json.dumps({ - "type": "Polygon", - "coordinates": [[[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]], - }) - assert _classify_geom(poly, IDAHO) == "in-bounds" - - def test_point_exactly_on_border_kept(self): - assert _classify_geom(_pt(-117.5, 43.0), IDAHO) == "in-bounds" - - def test_unparseable_geom_kept(self): - assert _classify_geom("{not valid json", IDAHO) == "invalid-geom" - - def test_unknown_geom_type_kept(self): - assert _classify_geom(json.dumps({"type": "Nonsense"}), IDAHO) == "invalid-geom" - - def _make_msg(envelope): msg = MagicMock() msg.data = json.dumps(envelope).encode() diff --git a/tests/test_firms.py b/tests/test_firms.py index 55f7d15..2e2fb74 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -14,7 +14,7 @@ from central.adapters.firms import ( SATELLITE_SHORT, _pixel_polygon, ) -from central.archive import _build_geom_sql +from central.monitoring_area import build_geom_json from central.config_models import AdapterConfig from central.models import Event, Geo @@ -553,7 +553,7 @@ class TestPixelPolygon: class TestGeoGeometryRoundTripsThroughArchive: - """Regression guard: FIRMS geo.geometry must reach _build_geom_sql as Polygon.""" + """Regression guard: FIRMS geo.geometry must reach build_geom_json as Polygon.""" @pytest.mark.asyncio async def test_geo_geometry_round_trips_through_archive_path( @@ -572,7 +572,7 @@ class TestGeoGeometryRoundTripsThroughArchive: # Simulate what archive does: serialize geo to dict and run it through # the same helper that produces the PostGIS geom clause. 
geo_dict = event.geo.model_dump() - sql_clause = _build_geom_sql(geo_dict) + sql_clause = build_geom_json(geo_dict) assert sql_clause is not None decoded = json.loads(sql_clause) assert decoded["type"] == "Polygon" @@ -600,6 +600,6 @@ class TestGeoGeometryRoundTripsThroughArchive: assert event.geo.geometry is None geo_dict = event.geo.model_dump() - sql_clause = _build_geom_sql(geo_dict) + sql_clause = build_geom_json(geo_dict) assert sql_clause is not None assert json.loads(sql_clause)["type"] == "Point" diff --git a/tests/test_monitoring_area.py b/tests/test_monitoring_area.py new file mode 100644 index 0000000..1ec91db --- /dev/null +++ b/tests/test_monitoring_area.py @@ -0,0 +1,142 @@ +"""Unit tests for the shared monitoring-area module (v0.10.2). + +Covers the four exports: + - ``MonitoringArea.as_box`` (shapely box construction) + - ``build_geom_json`` (all five Geo shapes the archive sees) + - ``classify_geom`` (the five verdicts) + - ``load_monitoring_area`` (DB read; None on missing-row / NULL-column) + +The classify_geom + as_box assertions are the v0.9.12 archive bbox tests +lifted out of ``test_archive_bbox_filter.py`` and expanded with edge cases. 
+""" + +import json + +import pytest +from unittest.mock import AsyncMock, MagicMock +from shapely.geometry import Polygon, box as shapely_box + +from central.monitoring_area import ( + MonitoringArea, + build_geom_json, + classify_geom, + load_monitoring_area, +) + +IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5) + + +def _pt(lon, lat): + return json.dumps({"type": "Point", "coordinates": [lon, lat]}) + + +class TestMonitoringAreaAsBox: + def test_as_box_returns_shapely_polygon(self): + b = IDAHO.as_box() + assert isinstance(b, Polygon) + + def test_as_box_corners_match_west_south_east_north(self): + # shapely box(minx, miny, maxx, maxy) -> envelope (west, south, east, north) + expected = shapely_box(-117.5, 41.8, -111.0, 44.5) + assert IDAHO.as_box().equals(expected) + + +class TestBuildGeomJson: + def test_none_input_returns_none(self): + assert build_geom_json(None) is None + + def test_empty_dict_returns_none(self): + assert build_geom_json({}) is None + + def test_full_geometry_wins_over_bbox(self): + # If a real geometry is present it MUST be used verbatim (this is the + # v0.9.8 wfigs/tomtom invariant -- the map needs the real shape). 
+ geom = {"type": "LineString", "coordinates": [[-114, 43], [-115, 44]]} + out = build_geom_json({ + "geometry": geom, + "bbox": [-115, 43, -114, 44], + "centroid": [-114.5, 43.5], + }) + assert json.loads(out) == geom + + def test_bbox_rendered_as_closed_polygon(self): + out = build_geom_json({"bbox": [-117, 42, -111, 44]}) + parsed = json.loads(out) + assert parsed["type"] == "Polygon" + coords = parsed["coordinates"][0] + # 5 points: 4 corners + closing duplicate + assert len(coords) == 5 + assert coords[0] == coords[-1] + assert coords[0] == [-117, 42] + + def test_centroid_rendered_as_point(self): + out = build_geom_json({"centroid": [-114.5, 43.5]}) + assert json.loads(out) == {"type": "Point", "coordinates": [-114.5, 43.5]} + + def test_partial_bbox_falls_through(self): + # An invalid 3-element bbox should not produce a 3-corner polygon; + # caller is expected to fall through to centroid or return None. + assert build_geom_json({"bbox": [-117, 42, -111]}) is None + + def test_centroid_wins_when_bbox_invalid(self): + out = build_geom_json({"bbox": [-117, 42, -111], "centroid": [-114, 43]}) + assert json.loads(out) == {"type": "Point", "coordinates": [-114, 43]} + + +class TestClassifyGeom: + def test_null_geom_always_kept(self): + assert classify_geom(None, IDAHO) == "null-geom" + + def test_null_geom_kept_even_without_area(self): + assert classify_geom(None, None) == "null-geom" + + def test_no_area_keeps_everything(self): + assert classify_geom(_pt(-114.0, 43.5), None) == "no-area" + + def test_in_bounds_kept(self): + assert classify_geom(_pt(-114.0, 43.5), IDAHO) == "in-bounds" + + def test_out_of_bounds_dropped(self): + assert classify_geom(_pt(-74.0, 40.7), IDAHO) == "out-of-bounds" + + def test_border_straddling_polygon_kept(self): + # Spans the western edge (west=-117.5): partly out, partly in -> kept. 
+ poly = json.dumps({ + "type": "Polygon", + "coordinates": [[[-119, 42], [-116, 42], [-116, 43], [-119, 43], [-119, 42]]], + }) + assert classify_geom(poly, IDAHO) == "in-bounds" + + def test_point_exactly_on_border_kept(self): + assert classify_geom(_pt(-117.5, 43.0), IDAHO) == "in-bounds" + + def test_unparseable_geom_keeps_failopen(self): + assert classify_geom("{not valid json", IDAHO) == "invalid-geom" + + def test_unknown_geom_type_keeps_failopen(self): + assert classify_geom(json.dumps({"type": "Nonsense"}), IDAHO) == "invalid-geom" + + +@pytest.mark.asyncio +class TestLoadMonitoringArea: + async def test_returns_area_when_all_columns_set(self): + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "monitor_north": 44.5, "monitor_south": 41.8, + "monitor_east": -111.0, "monitor_west": -117.5, + }) + area = await load_monitoring_area(conn) + assert area == IDAHO + + async def test_returns_none_when_no_row(self): + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value=None) + assert await load_monitoring_area(conn) is None + + async def test_returns_none_when_any_column_null(self): + conn = MagicMock() + conn.fetchrow = AsyncMock(return_value={ + "monitor_north": 44.5, "monitor_south": None, + "monitor_east": -111.0, "monitor_west": -117.5, + }) + assert await load_monitoring_area(conn) is None diff --git a/tests/test_supervisor_publish_filter.py b/tests/test_supervisor_publish_filter.py new file mode 100644 index 0000000..bb58145 --- /dev/null +++ b/tests/test_supervisor_publish_filter.py @@ -0,0 +1,186 @@ +"""Supervisor publish-time monitoring-area filter (v0.10.2). + +Covers the wire-up between ``subject_for`` and ``_publish_event`` in +``Supervisor._run_adapter_loop``: out-of-area drops, in-area publishes, +null-geom passes, invalid-geom fails open, and the refresh-loop reload. 
+""" +import asyncio +import logging +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central import supervisor as sup_mod +from central.config_models import AdapterConfig, EnrichmentConfig +from central.models import Event, Geo +from central.monitoring_area import MonitoringArea + +IDAHO = MonitoringArea(north=44.5, south=41.8, east=-111.0, west=-117.5) +SPOKANE = (-117.4, 47.6) +BOISE = (-114.0, 43.5) +NYC = (-74.0, 40.7) + + +def _ev(eid: str, geo: Geo) -> Event: + return Event( + id=eid, adapter="mock", category="mock.test", + time=datetime.now(timezone.utc), geo=geo, data={}, + ) + + +class _MockAdapter: + requires_api_key = None + enrichment_locations = () + + def __init__(self, config, *_args) -> None: + self.config = config + self.cadence_s = config.cadence_s + self.events: list[Event] = [] + self.published: set[str] = set() + self.done = asyncio.Event() + self._called = False + + async def startup(self): ... + async def shutdown(self): ... + async def apply_config(self, c): ... + + async def poll(self): + if not self._called: + self._called = True + for e in self.events: + yield e + self.done.set() + + def is_published(self, eid): return eid in self.published + def mark_published(self, eid): self.published.add(eid) + def bump_last_seen(self, eid): ... 
+ def sweep_old_ids(self): return 0 + def subject_for(self, e): return f"central.test.{e.id}" + + +@pytest.fixture +def sup_factory(): + """Build a supervisor with mocked NATS + ConfigStore; spy on _publish_event.""" + def _build(area: MonitoringArea | None): + nc = AsyncMock(); js = AsyncMock(); js.publish = AsyncMock() + nc.jetstream = MagicMock(return_value=js) + store = MagicMock() + store.list_streams = AsyncMock(return_value=[]) + store.get_stream = AsyncMock(return_value=None) + store.set_adapter_last_error = AsyncMock() + store.get_api_key = AsyncMock(return_value=None) + store.get_monitoring_area = AsyncMock(return_value=area) + config_source = MagicMock() + config_source.get_enrichment_config = AsyncMock(return_value=EnrichmentConfig()) + sup = sup_mod.Supervisor( + config_source=config_source, config_store=store, + nats_url="nats://x:4222", cloudevents_config=None, + enrichment_config=EnrichmentConfig(), + ) + sup._nc = nc; sup._js = nc.jetstream() + sup._monitoring_area = area + sup._publish_event = AsyncMock() + sup._publish_meta = AsyncMock() + sup._adapters["mock"] = _MockAdapter + return sup + return _build + + +async def _drive(sup, events): + adapter = _MockAdapter(MagicMock(cadence_s=3600)) + adapter.events = events + config = AdapterConfig( + name="mock", enabled=True, cadence_s=3600, settings={}, + paused_at=None, updated_at=datetime.now(timezone.utc), + ) + state = sup_mod.AdapterState(name="mock", config=config, adapter=adapter) + task = asyncio.create_task(sup._run_adapter_loop(state)) + try: + await asyncio.wait_for(adapter.done.wait(), timeout=5.0) + await asyncio.sleep(0) + finally: + # Cancel ONLY this loop -- never touch sup._shutdown_event so callers + # can drive the same supervisor through multiple poll cycles. 
+ task.cancel() + try: await task + except (asyncio.CancelledError, Exception): pass + return adapter + + +# --- verdict matrix ----------------------------------------------------------- + +@pytest.mark.asyncio +@pytest.mark.parametrize("area,geo,should_publish,should_drop", [ + pytest.param(None, Geo(centroid=NYC), True, False, id="no-area-publishes-out-of-bbox"), + pytest.param(IDAHO, Geo(centroid=BOISE), True, False, id="in-bounds-publishes"), + pytest.param(IDAHO, Geo(centroid=NYC), False, True, id="out-of-bounds-NY-drops"), + pytest.param(IDAHO, Geo(centroid=SPOKANE), False, True, id="out-of-bounds-Spokane-drops"), + pytest.param(IDAHO, Geo(), True, False, id="null-geom-publishes"), + pytest.param(IDAHO, Geo(geometry={"type": "Nonsense"}), True, False, id="invalid-geom-fail-open"), +]) +async def test_verdict(sup_factory, area, geo, should_publish, should_drop): + sup = sup_factory(area) + a = await _drive(sup, [_ev("e1", geo)]) + if should_publish: + assert sup._publish_event.await_count == 1 + assert a.published == {"e1"} + else: + sup._publish_event.assert_not_called() + # CRITICAL forward-only: drops MUST NOT mark_published, else widening + # the bbox can't re-deliver -- see [[feedback_dedup_forward_only]]. 
+ assert a.published == set() + assert (sup._dropped_publish == {"mock": 1}) is should_drop + + +@pytest.mark.asyncio +async def test_mixed_batch_partitions_correctly(sup_factory): + sup = sup_factory(IDAHO) + a = await _drive(sup, [ + _ev("in1", Geo(centroid=BOISE)), + _ev("out1", Geo(centroid=NYC)), + _ev("null1", Geo()), + _ev("out2", Geo(centroid=SPOKANE)), + ]) + assert sup._publish_event.await_count == 2 + assert a.published == {"in1", "null1"} + assert sup._dropped_publish == {"mock": 2} + + +@pytest.mark.asyncio +async def test_widening_area_re_publishes_previously_dropped(sup_factory): + """Forward-only invariant: drops are reversible -- never mark_published.""" + sup = sup_factory(IDAHO) + spokane = _ev("spokane", Geo(centroid=SPOKANE)) + # The same supervisor drives two polls so the second sees the same id + # AFTER the bbox widens. Need a fresh adapter each call because _drive's + # MockAdapter signals done once per instance. + a1 = await _drive(sup, [spokane]) + sup._publish_event.assert_not_called() + assert a1.published == set() + sup._monitoring_area = MonitoringArea( + north=48.5, south=41.8, east=-111.0, west=-118.0 + ) + a2 = await _drive(sup, [spokane]) + assert sup._publish_event.await_count == 1 + assert a2.published == {"spokane"} + + +@pytest.mark.asyncio +async def test_refresh_loop_reloads_area_and_logs_summary( + sup_factory, caplog, monkeypatch +): + sup = sup_factory(None) + sup._dropped_publish = {"mock": 7} + monkeypatch.setattr(sup_mod, "MONITORING_AREA_REFRESH_S", 0.05) + sup._config_store.get_monitoring_area = AsyncMock(return_value=IDAHO) + with caplog.at_level(logging.INFO): + task = asyncio.create_task(sup._refresh_monitoring_area_loop()) + await asyncio.sleep(0.15) + sup._shutdown_event.set() + try: await asyncio.wait_for(task, timeout=1.0) + except (asyncio.TimeoutError, asyncio.CancelledError): task.cancel() + assert sup._monitoring_area == IDAHO + assert any( + "publish bbox filter drop summary" in r.message for r in 
caplog.records + ) diff --git a/tests/test_tomtom_flow.py b/tests/test_tomtom_flow.py index 957fe07..ccf0b93 100644 --- a/tests/test_tomtom_flow.py +++ b/tests/test_tomtom_flow.py @@ -17,7 +17,7 @@ import pytest from central.adapter import SourceAdapter from central.adapters.tomtom_flow import TomTomFlowAdapter -from central.archive import _build_geom_sql +from central.monitoring_area import build_geom_json from central.config_models import AdapterConfig from central.tomtom_flow_parse import ( _local_to_lonlat, @@ -83,10 +83,10 @@ def test_subject_for(): def test_archive_prefers_geo_geometry(): line = {"type": "LineString", "coordinates": [[-116.2, 43.6], [-116.1, 43.7]]} # geometry present -> returned verbatim (not bbox/centroid) - out = _build_geom_sql({"geometry": line, "centroid": [-116.2, 43.6], "bbox": [-116.3, 43.5, -116.0, 43.8]}) + out = build_geom_json({"geometry": line, "centroid": [-116.2, 43.6], "bbox": [-116.3, 43.5, -116.0, 43.8]}) assert json.loads(out) == line # no geometry -> falls back to centroid Point (regression guard) - out2 = _build_geom_sql({"centroid": [-116.2, 43.6]}) + out2 = build_geom_json({"centroid": [-116.2, 43.6]}) assert json.loads(out2)["type"] == "Point"