diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 5d7cbc0..014a74b 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -2,6 +2,7 @@ import csv import logging +import math import sqlite3 from collections.abc import AsyncIterator from datetime import datetime, timezone @@ -51,6 +52,56 @@ SEVERITY_MAP = { "low": 1, } +# 1° latitude ≈ 111 km everywhere; longitude scales by cos(lat). +_KM_PER_DEG_LAT = 111.0 +# Defensive clamp: keeps cos(lat) bounded away from 0 at the poles. FIRMS +# coverage stops well short of ±60°, so this only fires for malformed input. +_POLE_CLAMP_DEG = 89.0 + + +def _pixel_polygon( + lat: float, + lon: float, + scan: float | None, + track: float | None, +) -> dict[str, Any] | None: + """Build a rectangular GeoJSON Polygon for a FIRMS satellite pixel footprint. + + scan and track are pixel dimensions in km from the FIRMS CSV. scan is the + along-track (~latitude) extent, track is the cross-track (~longitude) + extent; the cross-track dimension is scaled by cos(lat) to convert km to + degrees of longitude. Pixels are typically ~0.5×0.66 km at nadir and grow + off-nadir. + + Returns None if scan or track is missing, non-numeric, or non-positive — + the caller falls back to centroid-only Geo. + """ + if scan is None or track is None: + return None + try: + scan_km = float(scan) + track_km = float(track) + except (TypeError, ValueError): + return None + if scan_km <= 0 or track_km <= 0: + return None + + clamped_lat = max(-_POLE_CLAMP_DEG, min(_POLE_CLAMP_DEG, lat)) + half_scan_deg = (scan_km / 2.0) / _KM_PER_DEG_LAT + half_track_deg = (track_km / 2.0) / ( + _KM_PER_DEG_LAT * math.cos(math.radians(clamped_lat)) + ) + + # CCW ring with the closing vertex duplicated (GeoJSON requirement). + ring = [ + [lon - half_track_deg, lat - half_scan_deg], + [lon + half_track_deg, lat - half_scan_deg], + [lon + half_track_deg, lat + half_scan_deg], + [lon - half_track_deg, lat + half_scan_deg], + [lon - half_track_deg, lat - half_scan_deg], + ] + return {"type": "Polygon", "coordinates": [ring]} + class FIRMSSettings(BaseModel): """Settings schema for FIRMS adapter.""" @@ -313,9 +364,15 @@ class FIRMSAdapter(SourceAdapter): # Build stable ID stable_id = self._build_stable_id(satellite, acq_date, acq_time, lat, lon) + # Construct the satellite pixel footprint as a GeoJSON Polygon from + # scan/track. Falls back to centroid-only Geo if either dimension is + # missing/invalid — _build_geom_sql then stores a Point. We keep + # centroid alongside geometry so consumers that read only centroid + # still work. + geometry = _pixel_polygon(lat, lon, row.get("scan"), row.get("track")) geo = Geo( centroid=(lon, lat), # GeoJSON order: lon, lat - bbox=(lon, lat, lon, lat), # Point bbox + geometry=geometry, regions=[], primary_region=None, ) @@ -353,6 +410,10 @@ class FIRMSAdapter(SourceAdapter): total_features = 0 total_new = 0 + # Count emitted records that fell back to centroid-only Geo because + # scan/track were missing. Warned once at the end of the poll cycle + # rather than per-record to avoid log spam. + total_missing_dims = 0 for satellite in self._satellites: url = self._build_url(satellite) @@ -379,6 +440,8 @@ class FIRMSAdapter(SourceAdapter): continue event = self._row_to_event(row, satellite) + if event.geo.geometry is None: + total_missing_dims += 1 yield event self.mark_published(stable_id) new_count += 1 @@ -400,6 +463,15 @@ class FIRMSAdapter(SourceAdapter): ) continue + if total_missing_dims: + logger.warning( + "FIRMS records missing scan/track — falling back to centroid-only Geo", + extra={ + "missing_count": total_missing_dims, + "total_new": total_new, + }, + ) + logger.info( "FIRMS poll completed", extra={ diff --git a/tests/test_firms.py b/tests/test_firms.py index aafead2..55f7d15 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -1,5 +1,7 @@ """Tests for FIRMS adapter.""" +import json +import math import pytest from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch @@ -10,8 +12,10 @@ from central.adapters.firms import ( FIRMSAdapter, CONFIDENCE_MAP, SATELLITE_SHORT, + _pixel_polygon, ) -from central.config_models import AdapterConfig, RegionConfig +from central.archive import _build_geom_sql +from central.config_models import AdapterConfig from central.models import Event, Geo @@ -224,9 +228,12 @@ class TestEventGeneration: rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") - # GeoJSON order: lon, lat + # GeoJSON order: lon, lat. v0.9.21: degenerate point-bbox was dropped; + # the pixel footprint now ships as geo.geometry (Polygon). assert event.geo.centroid == (-116.456, 45.123) - assert event.geo.bbox == (-116.456, 45.123, -116.456, 45.123) + assert event.geo.bbox is None + assert event.geo.geometry is not None + assert event.geo.geometry["type"] == "Polygon" class TestDeduplication: @@ -476,3 +483,123 @@ class TestEnrichmentIntegration: assert "_enriched" in event.data assert event.data["_enriched"]["geocoder"] == all_null_bundle() + + +class TestPixelPolygon: + """v0.9.21: rectangular GeoJSON Polygon from FIRMS scan/track.""" + + def test_polygon_geometry_constructed(self): + """Happy path: realistic VIIRS scan/track -> CCW Polygon with closing vertex.""" + # VIIRS at nadir: ~0.375km square; centroid should lie inside the ring. + geom = _pixel_polygon(lat=42.0, lon=-114.0, scan=0.375, track=0.375) + assert geom is not None + assert geom["type"] == "Polygon" + ring = geom["coordinates"][0] + # 4 corners + duplicated closing vertex + assert len(ring) == 5 + assert ring[0] == ring[-1], "ring must close on its first vertex" + # CCW: signed area of the ring should be positive (lon=x, lat=y). + signed_area = sum( + (ring[i][0] * ring[i + 1][1]) - (ring[i + 1][0] * ring[i][1]) + for i in range(len(ring) - 1) + ) + assert signed_area > 0, "ring must wind counter-clockwise" + # Centroid is inside (axis-aligned rectangle, so min/max bounds check it). + lons = [pt[0] for pt in ring] + lats = [pt[1] for pt in ring] + assert min(lons) < -114.0 < max(lons) + assert min(lats) < 42.0 < max(lats) + + def test_polygon_at_high_latitude(self): + """At lat=70°, longitude degrees stretch by 1/cos(70°) ≈ 2.92x.""" + low = _pixel_polygon(lat=0.0, lon=0.0, scan=0.5, track=0.5) + high = _pixel_polygon(lat=70.0, lon=0.0, scan=0.5, track=0.5) + assert low is not None and high is not None + + # half_track_deg at lat=70 should be ≈ 1/cos(70°) larger than at lat=0. + half_track_low = max(pt[0] for pt in low["coordinates"][0]) + half_track_high = max(pt[0] for pt in high["coordinates"][0]) + ratio = half_track_high / half_track_low + expected = 1.0 / math.cos(math.radians(70.0)) + assert ratio == pytest.approx(expected, rel=1e-6) + + # Latitude (scan) extent is independent of lat — same at both. + half_scan_low = max(pt[1] for pt in low["coordinates"][0]) + half_scan_high = max(pt[1] for pt in high["coordinates"][0]) - 70.0 + assert half_scan_low == pytest.approx(half_scan_high, rel=1e-9) + + def test_pole_clamp(self): + """lat=89.99 must clamp to 89.0 instead of divide-by-near-zero.""" + # Without clamping, cos(89.99°) ≈ 1.7e-4, blowing half_track_deg up + # to ~3000 deg. With the clamp at 89.0, cos(89°) ≈ 0.01745 keeps the + # polygon finite. + geom = _pixel_polygon(lat=89.99, lon=10.0, scan=0.5, track=0.5) + assert geom is not None + ring = geom["coordinates"][0] + half_track = max(pt[0] for pt in ring) - 10.0 + expected = (0.5 / 2.0) / (111.0 * math.cos(math.radians(89.0))) + assert half_track == pytest.approx(expected, rel=1e-9) + + def test_missing_scan_falls_back_to_point(self): + """scan=None must return None geometry (caller falls back to centroid).""" + assert _pixel_polygon(42.0, -114.0, None, 0.5) is None + assert _pixel_polygon(42.0, -114.0, 0.5, None) is None + + def test_invalid_dims_fall_back(self): + """Non-numeric / non-positive scan or track must return None.""" + assert _pixel_polygon(42.0, -114.0, "not-a-number", 0.5) is None + assert _pixel_polygon(42.0, -114.0, 0.0, 0.5) is None + assert _pixel_polygon(42.0, -114.0, -0.5, 0.5) is None + + +class TestGeoGeometryRoundTripsThroughArchive: + """Regression guard: FIRMS geo.geometry must reach _build_geom_sql as Polygon.""" + + @pytest.mark.asyncio + async def test_geo_geometry_round_trips_through_archive_path( + self, temp_db_path, mock_config_store + ): + """Event with scan/track produces a Polygon SQL clause (mirrors tomtom_flow).""" + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") + + # Simulate what archive does: serialize geo to dict and run it through + # the same helper that produces the PostGIS geom clause. + geo_dict = event.geo.model_dump() + sql_clause = _build_geom_sql(geo_dict) + assert sql_clause is not None + decoded = json.loads(sql_clause) + assert decoded["type"] == "Polygon" + # Real footprint, not a degenerate point — first and third corners + # bracket the centroid in both axes. + ring = decoded["coordinates"][0] + assert ring[0] != ring[2] + + @pytest.mark.asyncio + async def test_missing_dims_round_trips_as_point( + self, temp_db_path, mock_config_store + ): + """Without scan/track, archive still gets a usable Point geometry.""" + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT") + # Strip scan/track to force fallback path. + rows[0]["scan"] = None + rows[0]["track"] = None + event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT") + assert event.geo.geometry is None + + geo_dict = event.geo.model_dump() + sql_clause = _build_geom_sql(geo_dict) + assert sql_clause is not None + assert json.loads(sql_clause)["type"] == "Point"