v0.9.21: FIRMS satellite pixel polygons via Geo.geometry (#84)

FIRMS hotspots rendered as tiny dots on /events because the adapter shipped
`bbox=(lon, lat, lon, lat)` and no `geometry`. `_build_geom_sql` collapsed
that into a 5-vertex zero-area `ST_Polygon`. Fix: construct a real rectangular
GeoJSON Polygon from `(latitude, longitude, scan, track)` per record and ship
it via `Geo.geometry`, mirroring the v0.9.3 pattern used by `tomtom_flow`.

- `_pixel_polygon()` builds a CCW 4-corner ring with the closing vertex
  duplicated. Cross-track (longitude) extent scales by `cos(lat)`. Latitude
  is pole-clamped to ±89° defensively.
- `_row_to_event` drops the degenerate bbox and sets `geo.geometry`;
  centroid stays alongside for consumers that read centroid only.
- `poll()` emits a single per-cycle warn if any record fell back to
  centroid-only Geo (missing scan/track).
- Forward-only at the geometry layer: same dedup keys, new payload shape
  for new records only. Existing PostGIS rows retain their degenerate
  polygons until republished or aged out (48h dedup window).
- Side-quest: removed a pre-existing dead RegionConfig import in
  tests/test_firms.py since the imports block was touched.

Tests: 5 new `_pixel_polygon` unit tests + 2 archive round-trip regression
guards mirroring `test_archive_prefers_geo_geometry`. Full suite 930 passed
/ 1 skipped under both `central` and `zvx` users.

CAVEAT — axis mapping convention: the `_pixel_polygon` docstring describes
`scan` as the along-track (latitude) extent and `track` as the cross-track
(longitude) extent. This matches the v0.9.21 prompt and is internally
consistent. NASA FIRMS convention may have these inverted — `scan` is often
documented as the cross-track scan width and `track` as the along-track
extent. If deploy-time visual inspection shows polygons rotated 90° from
expected, swap the assignments inside `_pixel_polygon`:

    half_scan_deg → multiply by cos(lat) (treat as longitude offset)
    half_track_deg → divide by 111.0 only (treat as latitude offset)

Clickability is the first-order goal and is unaffected — the polygon will
still have non-zero area in both axes either way; only the aspect ratio
flips. Tracked as a fast-follow if visual inspection requires it.

PR #84.
This commit is contained in:
malice 2026-06-03 20:51:31 -06:00 committed by GitHub
commit 1f7bccaac6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 203 additions and 4 deletions

View file

@ -2,6 +2,7 @@
import csv
import logging
import math
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
@ -51,6 +52,56 @@ SEVERITY_MAP = {
"low": 1,
}
# 1° latitude ≈ 111 km everywhere; longitude scales by cos(lat).
_KM_PER_DEG_LAT = 111.0
# Defensive clamp: keeps cos(lat) bounded away from 0 at the poles. FIRMS
# coverage stops well short of ±60°, so this only fires for malformed input.
_POLE_CLAMP_DEG = 89.0
def _pixel_polygon(
lat: float,
lon: float,
scan: float | None,
track: float | None,
) -> dict[str, Any] | None:
"""Build a rectangular GeoJSON Polygon for a FIRMS satellite pixel footprint.
scan and track are pixel dimensions in km from the FIRMS CSV. scan is the
along-track (~latitude) extent, track is the cross-track (~longitude)
extent; the cross-track dimension is scaled by cos(lat) to convert km to
degrees of longitude. Pixels are typically ~0.5×0.66 km at nadir and grow
off-nadir.
Returns None if scan or track is missing, non-numeric, or non-positive
the caller falls back to centroid-only Geo.
"""
if scan is None or track is None:
return None
try:
scan_km = float(scan)
track_km = float(track)
except (TypeError, ValueError):
return None
if scan_km <= 0 or track_km <= 0:
return None
clamped_lat = max(-_POLE_CLAMP_DEG, min(_POLE_CLAMP_DEG, lat))
half_scan_deg = (scan_km / 2.0) / _KM_PER_DEG_LAT
half_track_deg = (track_km / 2.0) / (
_KM_PER_DEG_LAT * math.cos(math.radians(clamped_lat))
)
# CCW ring with the closing vertex duplicated (GeoJSON requirement).
ring = [
[lon - half_track_deg, lat - half_scan_deg],
[lon + half_track_deg, lat - half_scan_deg],
[lon + half_track_deg, lat + half_scan_deg],
[lon - half_track_deg, lat + half_scan_deg],
[lon - half_track_deg, lat - half_scan_deg],
]
return {"type": "Polygon", "coordinates": [ring]}
class FIRMSSettings(BaseModel):
"""Settings schema for FIRMS adapter."""
@ -313,9 +364,15 @@ class FIRMSAdapter(SourceAdapter):
# Build stable ID
stable_id = self._build_stable_id(satellite, acq_date, acq_time, lat, lon)
# Construct the satellite pixel footprint as a GeoJSON Polygon from
# scan/track. Falls back to centroid-only Geo if either dimension is
# missing/invalid — _build_geom_sql then stores a Point. We keep
# centroid alongside geometry so consumers that read only centroid
# still work.
geometry = _pixel_polygon(lat, lon, row.get("scan"), row.get("track"))
geo = Geo(
centroid=(lon, lat), # GeoJSON order: lon, lat
bbox=(lon, lat, lon, lat), # Point bbox
geometry=geometry,
regions=[],
primary_region=None,
)
@ -353,6 +410,10 @@ class FIRMSAdapter(SourceAdapter):
total_features = 0
total_new = 0
# Count emitted records that fell back to centroid-only Geo because
# scan/track were missing. Warned once at the end of the poll cycle
# rather than per-record to avoid log spam.
total_missing_dims = 0
for satellite in self._satellites:
url = self._build_url(satellite)
@ -379,6 +440,8 @@ class FIRMSAdapter(SourceAdapter):
continue
event = self._row_to_event(row, satellite)
if event.geo.geometry is None:
total_missing_dims += 1
yield event
self.mark_published(stable_id)
new_count += 1
@ -400,6 +463,15 @@ class FIRMSAdapter(SourceAdapter):
)
continue
if total_missing_dims:
logger.warning(
"FIRMS records missing scan/track — falling back to centroid-only Geo",
extra={
"missing_count": total_missing_dims,
"total_new": total_new,
},
)
logger.info(
"FIRMS poll completed",
extra={

View file

@ -1,5 +1,7 @@
"""Tests for FIRMS adapter."""
import json
import math
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
@ -10,8 +12,10 @@ from central.adapters.firms import (
FIRMSAdapter,
CONFIDENCE_MAP,
SATELLITE_SHORT,
_pixel_polygon,
)
from central.config_models import AdapterConfig, RegionConfig
from central.archive import _build_geom_sql
from central.config_models import AdapterConfig
from central.models import Event, Geo
@ -224,9 +228,12 @@ class TestEventGeneration:
rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT")
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
# GeoJSON order: lon, lat
# GeoJSON order: lon, lat. v0.9.21: degenerate point-bbox was dropped;
# the pixel footprint now ships as geo.geometry (Polygon).
assert event.geo.centroid == (-116.456, 45.123)
assert event.geo.bbox == (-116.456, 45.123, -116.456, 45.123)
assert event.geo.bbox is None
assert event.geo.geometry is not None
assert event.geo.geometry["type"] == "Polygon"
class TestDeduplication:
@ -476,3 +483,123 @@ class TestEnrichmentIntegration:
assert "_enriched" in event.data
assert event.data["_enriched"]["geocoder"] == all_null_bundle()
class TestPixelPolygon:
"""v0.9.21: rectangular GeoJSON Polygon from FIRMS scan/track."""
def test_polygon_geometry_constructed(self):
"""Happy path: realistic VIIRS scan/track -> CCW Polygon with closing vertex."""
# VIIRS at nadir: ~0.375km square; centroid should lie inside the ring.
geom = _pixel_polygon(lat=42.0, lon=-114.0, scan=0.375, track=0.375)
assert geom is not None
assert geom["type"] == "Polygon"
ring = geom["coordinates"][0]
# 4 corners + duplicated closing vertex
assert len(ring) == 5
assert ring[0] == ring[-1], "ring must close on its first vertex"
# CCW: signed area of the ring should be positive (lon=x, lat=y).
signed_area = sum(
(ring[i][0] * ring[i + 1][1]) - (ring[i + 1][0] * ring[i][1])
for i in range(len(ring) - 1)
)
assert signed_area > 0, "ring must wind counter-clockwise"
# Centroid is inside (axis-aligned rectangle, so min/max bounds check it).
lons = [pt[0] for pt in ring]
lats = [pt[1] for pt in ring]
assert min(lons) < -114.0 < max(lons)
assert min(lats) < 42.0 < max(lats)
def test_polygon_at_high_latitude(self):
"""At lat=70°, longitude degrees stretch by 1/cos(70°) ≈ 2.92x."""
low = _pixel_polygon(lat=0.0, lon=0.0, scan=0.5, track=0.5)
high = _pixel_polygon(lat=70.0, lon=0.0, scan=0.5, track=0.5)
assert low is not None and high is not None
# half_track_deg at lat=70 should be ≈ 1/cos(70°) larger than at lat=0.
half_track_low = max(pt[0] for pt in low["coordinates"][0])
half_track_high = max(pt[0] for pt in high["coordinates"][0])
ratio = half_track_high / half_track_low
expected = 1.0 / math.cos(math.radians(70.0))
assert ratio == pytest.approx(expected, rel=1e-6)
# Latitude (scan) extent is independent of lat — same at both.
half_scan_low = max(pt[1] for pt in low["coordinates"][0])
half_scan_high = max(pt[1] for pt in high["coordinates"][0]) - 70.0
assert half_scan_low == pytest.approx(half_scan_high, rel=1e-9)
def test_pole_clamp(self):
"""lat=89.99 must clamp to 89.0 instead of divide-by-near-zero."""
# Without clamping, cos(89.99°) ≈ 1.7e-4, blowing half_track_deg up
# to ~3000 deg. With the clamp at 89.0, cos(89°) ≈ 0.01745 keeps the
# polygon finite.
geom = _pixel_polygon(lat=89.99, lon=10.0, scan=0.5, track=0.5)
assert geom is not None
ring = geom["coordinates"][0]
half_track = max(pt[0] for pt in ring) - 10.0
expected = (0.5 / 2.0) / (111.0 * math.cos(math.radians(89.0)))
assert half_track == pytest.approx(expected, rel=1e-9)
def test_missing_scan_falls_back_to_point(self):
"""scan=None must return None geometry (caller falls back to centroid)."""
assert _pixel_polygon(42.0, -114.0, None, 0.5) is None
assert _pixel_polygon(42.0, -114.0, 0.5, None) is None
def test_invalid_dims_fall_back(self):
"""Non-numeric / non-positive scan or track must return None."""
assert _pixel_polygon(42.0, -114.0, "not-a-number", 0.5) is None
assert _pixel_polygon(42.0, -114.0, 0.0, 0.5) is None
assert _pixel_polygon(42.0, -114.0, -0.5, 0.5) is None
class TestGeoGeometryRoundTripsThroughArchive:
"""Regression guard: FIRMS geo.geometry must reach _build_geom_sql as Polygon."""
@pytest.mark.asyncio
async def test_geo_geometry_round_trips_through_archive_path(
self, temp_db_path, mock_config_store
):
"""Event with scan/track produces a Polygon SQL clause (mirrors tomtom_flow)."""
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT")
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
# Simulate what archive does: serialize geo to dict and run it through
# the same helper that produces the PostGIS geom clause.
geo_dict = event.geo.model_dump()
sql_clause = _build_geom_sql(geo_dict)
assert sql_clause is not None
decoded = json.loads(sql_clause)
assert decoded["type"] == "Polygon"
# Real footprint, not a degenerate point — first and third corners
# bracket the centroid in both axes.
ring = decoded["coordinates"][0]
assert ring[0] != ring[2]
@pytest.mark.asyncio
async def test_missing_dims_round_trips_as_point(
self, temp_db_path, mock_config_store
):
"""Without scan/track, archive still gets a usable Point geometry."""
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
rows = adapter._parse_csv(SAMPLE_CSV, "VIIRS_SNPP_NRT")
# Strip scan/track to force fallback path.
rows[0]["scan"] = None
rows[0]["track"] = None
event = adapter._row_to_event(rows[0], "VIIRS_SNPP_NRT")
assert event.geo.geometry is None
geo_dict = event.geo.model_dump()
sql_clause = _build_geom_sql(geo_dict)
assert sql_clause is not None
assert json.loads(sql_clause)["type"] == "Point"