diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index beeac1d..0526acb 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -1870,6 +1870,38 @@ at parameter `00060`, gage height (ft) at `00065`, water temperature (°C) at \ --- +### satpass_predict — server-side satellite pass alerts (v0.11.1) + +- **Source:** the `events` table itself — reads the latest TLE per `norad_id` + emitted by `celestrak_tle` within the last 14 days, then propagates each + one with SGP4 against every configured fixed observer. +- **Stream:** `CENTRAL_SAT` (same stream as TLEs; v0.11.1 extends + `STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"]` to `("tle", "pass")`). +- **Subject:** `central.sat.pass.us..` — one + subject per observer. Multiple satellites passing the same observer + collapse to the same subject; the category-discriminated `Nats-Msg-Id` + (v0.10.8) keeps each pass distinct in JetStream's dedup window. +- **Dedup key shape:** `::` — re-running + the same poll within an hour computes the same passes and dedups; new + TLEs landing between polls produce slightly different propagation paths + and hence different AOS times, naturally triggering republishes. +- **Severity bucket** from peak elevation: `>=60°` = 4 (zenith pass), + `>=30°` = 3 (high), `>=10°` = 2 (low; default gate threshold). +- **Geo:** `centroid = (observer.lon, observer.lat)` so the GUI map plots + the alert at the observer point, not at the satellite track. +- **Event.data fields:** `observer_name`, `observer_slug`, `observer_state`, + `norad_id`, `satellite_name`, `aos_time`, `los_time`, `peak_time`, + `max_elevation_deg`, `azimuth_at_aos`, `azimuth_at_los`, `duration_s`, + `tle_epoch` (the TLE epoch used for this prediction). +- **Cadence:** 1h. The adapter recomputes the 24h horizon every hour; + new TLEs landing between polls are picked up at the next poll. +- **Empty-TLE behaviour:** if no `celestrak_tle` events are in the table + (adapter still disabled, or hasn't polled yet), the adapter logs at + INFO and yields zero events — no exception. + +\ +--- + ## 7. Fall-off / removal semantics Central adapters fall into three buckets for handling upstream events that diff --git a/pyproject.toml b/pyproject.toml index 22c50a7..a9d4cad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "shapely>=2.0", "tenacity>=9.1.4", "uvicorn[standard]>=0.34.0", + "sgp4>=2.25", ] [project.scripts] diff --git a/sql/migrations/038_add_satpass_predict_adapter.sql b/sql/migrations/038_add_satpass_predict_adapter.sql new file mode 100644 index 0000000..d8c8581 --- /dev/null +++ b/sql/migrations/038_add_satpass_predict_adapter.sql @@ -0,0 +1,35 @@ +-- Migration 038: register satpass_predict adapter row (v0.11.1) +-- +-- Server-side complement to meshAI's per-user client-side pass computation. +-- Reads the latest TLE per norad_id from the events table (celestrak_tle +-- adapter, v0.11.0) and emits one Event per (observer, satellite, AOS) +-- tuple within a 24h horizon. Publishes on the existing CENTRAL_SAT +-- stream via the supervisor's STREAM_CATEGORY_DOMAINS extension +-- ("CENTRAL_SAT": ("tle", "pass")) -- no new stream is needed and +-- migration does NOT touch config.streams. +-- +-- Ships disabled (`enabled=false`) -- operator-configures observers via +-- GUI, then enables. Default settings include one Treasure Valley observer +-- as a worked example operators can edit/extend in place. +-- +-- Cadence 3600s (1h): the adapter recomputes the 24h horizon every hour. +-- New TLEs landing between polls are naturally picked up at the next poll. +-- +-- Idempotent: ON CONFLICT (name) DO NOTHING preserves any operator-tuned +-- state (settings changed by hand, enabled flag flipped, cadence override). + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'satpass_predict', + false, + 3600, + '{ + "observers": [ + {"name": "Treasure Valley", "slug": "treasure-valley", + "state": "ID", "lat": 43.6, "lon": -116.2, "elev_m": 0} + ], + "min_elevation_deg": 10, + "horizon_hours": 24 + }'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/satpass_predict.py b/src/central/adapters/satpass_predict.py new file mode 100644 index 0000000..d2f9662 --- /dev/null +++ b/src/central/adapters/satpass_predict.py @@ -0,0 +1,433 @@ +"""Satellite pass predictor — server-side complement to client-side satpass. + +Polls the ``events`` table for the latest TLE per ``norad_id`` (within the last +14 days), then propagates each one with SGP4 against every configured fixed +observer over a 24-hour horizon. Emits one Event per upcoming pass per +(observer, satellite) tuple. Dedup id is ``{observer_slug}:{norad_id}:{aos_iso}`` +so re-running the same poll within the hour produces identical ids and is +swallowed by the dedup mixin; new TLEs landing between polls produce slightly +different propagation paths and hence different AOS times, naturally triggering +a republish. + +Severity bucket from peak elevation: + + >= 60° (zenith) -> 4 + >= 30° (high) -> 3 + >= 10° (low) -> 2 + < 10° -> 1 (gated: not emitted) + +Subject: ``central.sat.pass.us..`` -- one subject per +observer. Multiple satellites passing the same observer collapse to the same +subject; the dedup-discriminated Nats-Msg-Id (v0.10.8: ``id:category``) keeps +each pass distinct in JetStream's dedup window. + +Math: SGP4 propagation gives ECI; we rotate to ECEF via GMST (Vallado mean +sidereal formula) then to topocentric east-north-up using the observer's +geodetic position (spherical earth, 6378.137 km equatorial radius -- fine for +horizon/elevation determination, error << 0.1° in azimuth). Pass detection +walks a 60-second grid looking for elevation-crossing events at the configured +``min_elevation_deg`` threshold. +""" + +from __future__ import annotations + +import logging +import math +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +from pydantic import BaseModel +from sgp4.api import Satrec, jday + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +# Earth equatorial radius (WGS-84). Used for observer ECEF position; we treat +# the earth as spherical for topocentric look-angle math -- ellipsoidal effects +# matter for centimeter-level GPS work but are well below 0.1° in elevation, +# more than enough for "is the satellite above the horizon" decisions. +_EARTH_RADIUS_KM = 6378.137 + +_PASS_STEP_S = 60 # 60-second grid for elevation sampling +_DEDUP_DDL = ( + "CREATE TABLE IF NOT EXISTS published_ids (" + "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " + "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" +) + +_LATEST_TLES_SQL = """ +SELECT DISTINCT ON (payload->'data'->'data'->>'norad_id') + (payload->'data'->'data'->>'norad_id')::int AS norad_id, + payload->'data'->'data'->>'satellite_name' AS satellite_name, + payload->'data'->'data'->>'tle_line1' AS tle_line1, + payload->'data'->'data'->>'tle_line2' AS tle_line2, + payload->'data'->'data'->>'epoch' AS tle_epoch +FROM events +WHERE adapter = 'celestrak_tle' + AND time > now() - interval '14 days' +ORDER BY payload->'data'->'data'->>'norad_id', time DESC +""" + + +# --- Pure math helpers (no I/O) --------------------------------------------- + + +def _gmst_rad(jd: float, fr: float) -> float: + """Greenwich Mean Sidereal Time in radians (Vallado, simplified). + + Accurate to within milliseconds for any post-1900 epoch -- plenty for + horizon/elevation work. + """ + t = (jd + fr - 2451545.0) / 36525.0 + gmst_sec = ( + 67310.54841 + + (876600.0 * 3600.0 + 8640184.812866) * t + + 0.093104 * t * t + - 6.2e-6 * t * t * t + ) + return (gmst_sec % 86400.0) * (2.0 * math.pi / 86400.0) + + +def _eci_to_ecef(pos_eci_km: tuple[float, float, float], theta: float) -> tuple[float, float, float]: + """Rotate ECI coordinates to ECEF by GMST angle theta (radians).""" + x, y, z = pos_eci_km + ct = math.cos(theta) + st = math.sin(theta) + return (ct * x + st * y, -st * x + ct * y, z) + + +def _observer_ecef(lat_deg: float, lon_deg: float, elev_m: float) -> tuple[float, float, float]: + """Observer position in ECEF km (spherical earth, sub-0.1° precision).""" + lat_r = math.radians(lat_deg) + lon_r = math.radians(lon_deg) + r = _EARTH_RADIUS_KM + elev_m / 1000.0 + return ( + r * math.cos(lat_r) * math.cos(lon_r), + r * math.cos(lat_r) * math.sin(lon_r), + r * math.sin(lat_r), + ) + + +def _topocentric_az_el( + sat_ecef_km: tuple[float, float, float], + obs_ecef_km: tuple[float, float, float], + obs_lat_deg: float, + obs_lon_deg: float, +) -> tuple[float, float]: + """Return ``(azimuth_deg, elevation_deg)`` from observer to satellite. + + Azimuth measured from north, clockwise (0 = N, 90 = E). Elevation is the + angle above the local horizon (0 = horizon, 90 = zenith, negative = below). + """ + dx = sat_ecef_km[0] - obs_ecef_km[0] + dy = sat_ecef_km[1] - obs_ecef_km[1] + dz = sat_ecef_km[2] - obs_ecef_km[2] + + lat_r = math.radians(obs_lat_deg) + lon_r = math.radians(obs_lon_deg) + sl, cl = math.sin(lat_r), math.cos(lat_r) + slo, clo = math.sin(lon_r), math.cos(lon_r) + + east = -slo * dx + clo * dy + north = -sl * clo * dx - sl * slo * dy + cl * dz + up = cl * clo * dx + cl * slo * dy + sl * dz + + horizontal = math.sqrt(east * east + north * north) + elevation = math.degrees(math.atan2(up, horizontal)) + azimuth = math.degrees(math.atan2(east, north)) % 360.0 + return azimuth, elevation + + +def _elev_at( + sat: Satrec, + t: datetime, + obs_ecef_km: tuple[float, float, float], + obs_lat_deg: float, + obs_lon_deg: float, +) -> tuple[float, float] | None: + """Compute ``(azimuth_deg, elevation_deg)`` at instant ``t`` (UTC datetime). + + Returns ``None`` if SGP4 reports a propagation error (decayed orbit, bad + epoch, etc.). + """ + jd, fr = jday(t.year, t.month, t.day, t.hour, t.minute, t.second + t.microsecond / 1e6) + err, pos_eci, _ = sat.sgp4(jd, fr) + if err: + return None + sat_ecef = _eci_to_ecef(pos_eci, _gmst_rad(jd, fr)) + return _topocentric_az_el(sat_ecef, obs_ecef_km, obs_lat_deg, obs_lon_deg) + + +def _severity_from_elev(max_elev_deg: float) -> int: + """Pass severity per the v0.11.1 bucketing rule.""" + if max_elev_deg >= 60.0: + return 4 + if max_elev_deg >= 30.0: + return 3 + if max_elev_deg >= 10.0: + return 2 + return 1 # below the gate; never emitted in practice + + +def _next_passes( + tle_line1: str, + tle_line2: str, + observer: "Observer", + ref_time: datetime, + horizon_hours: float, + min_elevation_deg: float, +) -> list[dict[str, Any]]: + """Walk a 60-second grid; return all passes >= min_elevation_deg in window.""" + try: + sat = Satrec.twoline2rv(tle_line1, tle_line2) + except Exception: + return [] + obs_ecef = _observer_ecef(observer.lat, observer.lon, observer.elev_m) + + passes: list[dict[str, Any]] = [] + in_pass = False + aos_t: datetime | None = None + aos_az: float | None = None + peak_t: datetime | None = None + peak_e: float = -180.0 + peak_az: float | None = None + + t = ref_time + end = ref_time + timedelta(hours=horizon_hours) + step = timedelta(seconds=_PASS_STEP_S) + while t < end: + sample = _elev_at(sat, t, obs_ecef, observer.lat, observer.lon) + if sample is None: + t += step + continue + az, e = sample + if e >= min_elevation_deg: + if not in_pass: + in_pass = True + aos_t = t + aos_az = az + peak_t = t + peak_e = e + peak_az = az + elif e > peak_e: + peak_t = t + peak_e = e + peak_az = az + elif in_pass: + # threshold-crossing on the way down -> close the pass + passes.append({ + "aos": aos_t, "aos_az": aos_az, + "peak": peak_t, "peak_az": peak_az, "max_elev_deg": peak_e, + "los": t, "los_az": az, + }) + in_pass = False + aos_t = aos_az = peak_t = peak_az = None + peak_e = -180.0 + t += step + + # Pass still in progress at the horizon edge -- close it at the boundary. + if in_pass and aos_t and peak_t: + passes.append({ + "aos": aos_t, "aos_az": aos_az, + "peak": peak_t, "peak_az": peak_az, "max_elev_deg": peak_e, + "los": end, "los_az": None, + }) + + return passes + + +# --- Settings + adapter ----------------------------------------------------- + + +class Observer(BaseModel): + """Fixed observer location for server-side pass prediction.""" + + name: str + slug: str + state: str + lat: float + lon: float + elev_m: float = 0.0 + + +class SatpassPredictSettings(BaseModel): + """Per-observer list + threshold + horizon. Default observer ships disabled + until the operator edits the list to their site(s).""" + + observers: list[Observer] = [ + Observer(name="Treasure Valley", slug="treasure-valley", + state="ID", lat=43.6, lon=-116.2, elev_m=0.0), + ] + min_elevation_deg: float = 10.0 + horizon_hours: int = 24 + + +class SatpassPredictAdapter(SourceAdapter): + """Server-side satellite pass alerts for fixed observers.""" + + name = "satpass_predict" + display_name = "Satellite Pass Predictions" + description = ( + "Predicts upcoming satellite passes over fixed observer locations " + "by propagating the latest TLE for each NORAD ID via SGP4. Reads " + "TLEs from the events table (celestrak_tle adapter); emits one " + "Event per (observer, satellite, AOS) tuple within a 24h horizon." + ) + settings_schema = SatpassPredictSettings + requires_api_key = None + wizard_order = None + default_cadence_s = 3600 # 1h + data_class = "event" + enrichment_locations = [] + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._db: sqlite3.Connection | None = None + self._apply_settings(config.settings or {}) + + def _apply_settings(self, settings: dict[str, Any]) -> None: + raw_observers = settings.get("observers") or [] + self._observers: list[Observer] = [ + o if isinstance(o, Observer) else Observer(**o) for o in raw_observers + ] + self._min_elev: float = float(settings.get("min_elevation_deg") or 10.0) + self._horizon_h: float = float(settings.get("horizon_hours") or 24) + + async def startup(self) -> None: + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(_DEDUP_DDL) + self._db.execute( + "CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)" + ) + self._db.commit() + logger.info( + "satpass_predict adapter started", + extra={ + "observers": [o.slug for o in self._observers], + "min_elevation_deg": self._min_elev, + "horizon_hours": self._horizon_h, + }, + ) + + async def shutdown(self) -> None: + if self._db: + self._db.close() + self._db = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + self._apply_settings(new_config.settings or {}) + logger.info( + "satpass_predict config updated", + extra={ + "observers": [o.slug for o in self._observers], + "min_elevation_deg": self._min_elev, + "horizon_hours": self._horizon_h, + }, + ) + + async def _fetch_latest_tles(self) -> list[dict[str, Any]]: + """Return rows: ``{norad_id, satellite_name, tle_line1, tle_line2, tle_epoch}``. + + Empty list if no TLEs in the events table within the 14-day window. + Never raises -- caller handles empty. + """ + pool = self._config_store.get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch(_LATEST_TLES_SQL) + return [dict(r) for r in rows] + + def _pass_to_event( + self, + p: dict[str, Any], + row: dict[str, Any], + observer: Observer, + ) -> Event: + aos: datetime = p["aos"] + return Event( + id=f"{observer.slug}:{row['norad_id']}:{aos.isoformat()}", + adapter=self.name, + category="pass.satpass_predict", + time=p["peak"], + severity=_severity_from_elev(p["max_elev_deg"]), + geo=Geo( + centroid=(observer.lon, observer.lat), + regions=[f"US-{observer.state}"], + primary_region=f"US-{observer.state}", + ), + data={ + "observer_name": observer.name, + "observer_slug": observer.slug, + "observer_state": observer.state, + "norad_id": row["norad_id"], + "satellite_name": row["satellite_name"], + "aos_time": aos.isoformat(), + "los_time": p["los"].isoformat() if p["los"] else None, + "peak_time": p["peak"].isoformat(), + "max_elevation_deg": round(p["max_elev_deg"], 2), + "azimuth_at_aos": round(p["aos_az"], 1) if p["aos_az"] is not None else None, + "azimuth_at_los": round(p["los_az"], 1) if p["los_az"] is not None else None, + "duration_s": (p["los"] - aos).total_seconds() if p["los"] else None, + "tle_epoch": row["tle_epoch"], + }, + ) + + async def poll(self) -> AsyncIterator[Event]: + if not self._observers: + logger.info("satpass_predict: no observers configured; nothing to predict") + return + rows = await self._fetch_latest_tles() + if not rows: + logger.info( + "satpass_predict: no TLEs available; nothing to predict " + "(is celestrak_tle enabled and has it polled at least once?)" + ) + return + + ref_time = datetime.now(timezone.utc) + yielded = 0 + for observer in self._observers: + for row in rows: + try: + passes = _next_passes( + row["tle_line1"], row["tle_line2"], observer, + ref_time, self._horizon_h, self._min_elev, + ) + except Exception: + logger.exception( + "satpass_predict pass computation failed", + extra={"norad_id": row["norad_id"], "observer": observer.slug}, + ) + continue + for p in passes: + yield self._pass_to_event(p, row, observer) + yielded += 1 + + self.sweep_old_ids() + logger.info( + "satpass_predict poll completed", + extra={ + "observers": [o.slug for o in self._observers], + "tles_considered": len(rows), + "events_yielded": yielded, + }, + ) + + def subject_for(self, event: Event) -> str: + state = (event.data.get("observer_state") or "").lower() or "unknown" + slug = event.data.get("observer_slug") or "unknown" + return f"central.sat.pass.us.{state}.{slug}" diff --git a/src/central/config_store.py b/src/central/config_store.py index 0044845..cb949df 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -50,6 +50,18 @@ class ConfigStore: """Close the connection pool.""" await self._pool.close() + def get_pool(self) -> asyncpg.Pool: + """Return the underlying connection pool. + + v0.11.1: introduced for the ``satpass_predict`` adapter which needs + ad-hoc reads against the ``events`` table to fetch the latest TLE + per ``norad_id`` (the supervisor's adapter scheduler doesn't pipe + Postgres access through any of the existing ConfigStore methods). + Single-method-surface escape hatch -- adapters acquire from the + pool themselves, no celestrak/tle-specific shape leaks into + ConfigStore.""" + return self._pool + # ------------------------------------------------------------------------- # System configuration # ------------------------------------------------------------------------- diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index d0a7671..af51b52 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2938,7 +2938,7 @@ DEFAULT_TIME = "last_24h" ADAPTER_GROUPS = { "Disasters": ["gdacs", "firms", "inciweb", "wfigs_incidents", "wfigs_perimeters"], "Weather": ["nws"], - "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons", "celestrak_tle"], + "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons", "celestrak_tle", "satpass_predict"], "Geophysical": ["usgs_quake", "nwis"], "Earth Observation": ["eonet"], "Transportation": ["wzdx", "tomtom_flow", "tomtom_incidents", "itd_511", "itd_511_cameras"], diff --git a/src/central/gui/templates/_event_rows/satpass_predict.html b/src/central/gui/templates/_event_rows/satpass_predict.html new file mode 100644 index 0000000..641e234 --- /dev/null +++ b/src/central/gui/templates/_event_rows/satpass_predict.html @@ -0,0 +1,9 @@ +{# satpass_predict server-side pass alert. Fields from payload->data->data. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{% if d.get('satellite_name') is not none %}
Satellite
{{ d.satellite_name }} (NORAD {{ d.norad_id }})
{% endif %} +{% if d.get('observer_name') is not none %}
Observer
{{ d.observer_name }}{% if d.get('observer_state') %} ({{ d.observer_state }}){% endif %}
{% endif %} +{% if d.get('aos_time') is not none %}
AOS (rise)
{{ d.aos_time }}{% if d.get('azimuth_at_aos') is not none %} — azimuth {{ "%.0f"|format(d.azimuth_at_aos) }}°{% endif %}
{% endif %} +{% if d.get('peak_time') is not none %}
Peak
{{ d.peak_time }}{% if d.get('max_elevation_deg') is not none %} — max elevation {{ "%.0f"|format(d.max_elevation_deg) }}°{% endif %}
{% endif %} +{% if d.get('los_time') is not none %}
LOS (set)
{{ d.los_time }}{% if d.get('azimuth_at_los') is not none %} — azimuth {{ "%.0f"|format(d.azimuth_at_los) }}°{% endif %}
{% endif %} +{% if d.get('duration_s') is not none %}
Duration
{{ "%.0f"|format(d.duration_s) }} sec
{% endif %} +{% if d.get('tle_epoch') is not none %}
TLE epoch
{{ d.tle_epoch }}
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/satpass_predict.html b/src/central/gui/templates/_event_summaries/satpass_predict.html new file mode 100644 index 0000000..e85d0ec --- /dev/null +++ b/src/central/gui/templates/_event_summaries/satpass_predict.html @@ -0,0 +1,4 @@ +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +{%- if d.get('satellite_name') and d.get('peak_time') and d.get('max_elevation_deg') is not none -%} +{{ d.satellite_name }} passes overhead at {{ d.peak_time[11:16] }} UTC — max elevation {{ "%.0f"|format(d.max_elevation_deg) }}° +{%- endif -%} diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 40984e4..2f79c28 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -141,7 +141,7 @@ STREAM_CATEGORY_DOMAINS: dict[str, tuple[str, ...]] = { "CENTRAL_TRAFFIC_FLOW": ("flow",), "CENTRAL_TRAFFIC_CAMERAS": ("camera",), "CENTRAL_AVY": ("avy",), - "CENTRAL_SAT": ("tle",), + "CENTRAL_SAT": ("tle", "pass"), } diff --git a/tests/test_celestrak_tle.py b/tests/test_celestrak_tle.py index 902a340..cf94556 100644 --- a/tests/test_celestrak_tle.py +++ b/tests/test_celestrak_tle.py @@ -444,8 +444,10 @@ def test_central_sat_registered_in_streams(): def test_central_sat_in_supervisor_family_map(): + """v0.11.0 set this to ('tle',); v0.11.1 extended to ('tle', 'pass') so + satpass_predict events also route to CENTRAL_SAT.""" from central.supervisor import STREAM_CATEGORY_DOMAINS - assert STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"] == ("tle",) + assert "tle" in STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"] def test_celestrak_tle_in_space_adapter_group(): diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py index 3715d69..656e817 100644 --- a/tests/test_events_feed_frontend.py +++ b/tests/test_events_feed_frontend.py @@ -1156,6 +1156,12 @@ _SAMPLE_INNER = { "mean_motion_rev_per_day": 15.49672912, }}, }, + "satpass_predict": { + "satellite_name": "ISS (ZARYA)", + "norad_id": 25544, + "peak_time": "2026-06-09T15:39:37+00:00", + "max_elevation_deg": 40.3, + }, } # Exact expected subjects for the deterministic adapters. swpc_alerts is omitted @@ -1181,6 +1187,7 @@ _EXPECTED_SUBJECT = { "itd_511_cameras": "Camera: I-84 Mountain Home", "avalanche_org": "Avalanche advisory — Banner Summit (Considerable)", "celestrak_tle": "TLE update: ISS (ZARYA) (NORAD 25544) — 92.9min orbit at 51.6°", + "satpass_predict": "ISS (ZARYA) passes overhead at 15:39 UTC — max elevation 40°", } diff --git a/tests/test_satpass_predict.py b/tests/test_satpass_predict.py new file mode 100644 index 0000000..c50e233 --- /dev/null +++ b/tests/test_satpass_predict.py @@ -0,0 +1,377 @@ +"""Tests for the v0.11.1 satpass_predict adapter. + +Deterministic via a fixed ISS TLE + fixed observer + pinned reference time. +The TLE comes from the v0.11.0 stations fixture (epoch 2026-06-08T19:17 UTC); +reference time pinned at 2026-06-09T07:00 UTC; observer is Treasure Valley +(43.6, -116.2, 0m elev). This combination produces a known ISS pass starting +at ~15:36 UTC the same day (verified via the sgp4 sanity script during Phase +A of v0.11.1). +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.adapter import SourceAdapter +from central.adapters.satpass_predict import ( + Observer, + SatpassPredictAdapter, + SatpassPredictSettings, + _gmst_rad, + _next_passes, + _observer_ecef, + _severity_from_elev, + _topocentric_az_el, +) +from central.config_models import AdapterConfig + + +# Live TLE from the v0.11.0 stations fixture, ISS (NORAD 25544). +_ISS_L1 = "1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999" +_ISS_L2 = "2 25544 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570453" + +# Pinned observer + reference time. +_OBS = Observer(name="Treasure Valley", slug="treasure-valley", + state="ID", lat=43.6, lon=-116.2, elev_m=0.0) +_REF = datetime(2026, 6, 9, 7, 0, 0, tzinfo=timezone.utc) + + +@pytest.fixture +def adapter(tmp_path: Path) -> SatpassPredictAdapter: + cfg = AdapterConfig( + name="satpass_predict", + enabled=True, + cadence_s=3600, + settings={"observers": [_OBS.model_dump()], + "min_elevation_deg": 10.0, "horizon_hours": 24}, + updated_at=datetime.now(timezone.utc), + ) + return SatpassPredictAdapter(cfg, MagicMock(), tmp_path / "cursors.db") + + +# --- Pure math helpers ------------------------------------------------------ + + +def test_gmst_rad_returns_radians_in_canonical_range(): + """GMST output must wrap into [0, 2π).""" + import math as m + val = _gmst_rad(2460835.0, 0.5) # arbitrary post-2000 JD + assert 0.0 <= val < 2.0 * m.pi + + +def test_observer_ecef_for_north_pole_and_equator(): + """Sanity: north pole sits on z-axis; equator at lon=0 sits on x-axis.""" + pole = _observer_ecef(90.0, 0.0, 0.0) + assert abs(pole[0]) < 1e-6 and abs(pole[1]) < 1e-6 + assert pole[2] > 6378.0 # ~6378.137 km + + eq_zero = _observer_ecef(0.0, 0.0, 0.0) + assert eq_zero[0] > 6378.0 and abs(eq_zero[1]) < 1e-6 and abs(eq_zero[2]) < 1e-6 + + +def test_topocentric_zenith_satellite_returns_90_elevation(): + """A satellite directly overhead must read elevation 90°, any azimuth.""" + obs_lat, obs_lon = 43.6, -116.2 + obs = _observer_ecef(obs_lat, obs_lon, 0.0) + # 400km straight up = scale observer position vector by (R+400)/R + import math as m + r_obs = m.sqrt(sum(c * c for c in obs)) + r_sat = r_obs + 400.0 + scale = r_sat / r_obs + sat_ecef = (obs[0] * scale, obs[1] * scale, obs[2] * scale) + az, el = _topocentric_az_el(sat_ecef, obs, obs_lat, obs_lon) + assert abs(el - 90.0) < 0.01, f"expected zenith elevation, got {el}" + + +def test_topocentric_below_horizon_returns_negative_elevation(): + """Satellite on the opposite side of the earth = below horizon.""" + obs = _observer_ecef(0.0, 0.0, 0.0) # equator, prime meridian + antipode = (-obs[0] * 2.0, 0.0, 0.0) # other side, well below + _, el = _topocentric_az_el(antipode, obs, 0.0, 0.0) + assert el < -10.0 + + +# --- Severity bucketing ----------------------------------------------------- + + +@pytest.mark.parametrize("max_elev, expected", [ + (90.0, 4), # zenith + (60.0, 4), # boundary -> 4 + (59.99, 3), + (30.0, 3), # boundary -> 3 + (29.99, 2), + (10.0, 2), # boundary -> 2 (gate threshold; emit) + (9.99, 1), # below gate -> 1 (should never emit in practice) + (0.0, 1), +]) +def test_severity_from_elev_buckets(max_elev, expected): + assert _severity_from_elev(max_elev) == expected + + +# --- Pass detection (the load-bearing math test) --------------------------- + + +def test_iss_next_pass_over_treasure_valley_is_chronologically_sane(): + """Pinned TLE + observer + ref time produces ONE known ISS pass in 24h. + AOS < peak < LOS, max_elev in (10, 90), positive duration.""" + passes = _next_passes( + _ISS_L1, _ISS_L2, _OBS, + ref_time=_REF, horizon_hours=24, min_elevation_deg=10.0, + ) + assert len(passes) > 0, "expected at least one ISS pass over Boise in next 24h" + p = passes[0] + assert p["aos"] < p["peak"] <= p["los"] + assert 10.0 < p["max_elev_deg"] < 90.0 + assert (p["los"] - p["aos"]).total_seconds() > 0 + # And the pass must lie inside the 24h horizon (ref + 24h = 2026-06-10T07:00 UTC). + horizon_end = datetime(2026, 6, 10, 7, 0, 0, tzinfo=timezone.utc) + assert p["aos"] >= _REF + assert p["los"] <= horizon_end + + +def test_iss_pass_has_plausible_azimuths(): + """Azimuth at AOS and LOS should be valid 0-360° readings.""" + passes = _next_passes( + _ISS_L1, _ISS_L2, _OBS, + ref_time=_REF, horizon_hours=24, min_elevation_deg=10.0, + ) + p = passes[0] + assert 0.0 <= p["aos_az"] < 360.0 + # los_az may be None if the pass ran to the horizon edge, but for ISS + # against the pinned ref it completes within 24h. + if p["los_az"] is not None: + assert 0.0 <= p["los_az"] < 360.0 + + +def test_min_elevation_gate_filters_lower_passes(): + """Same TLE, raise the gate to 80° -- now zero passes (ISS at 51.6° + inclination from latitude 43.6° can't reach 80° often).""" + passes_low = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + passes_high = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 80.0) + assert len(passes_low) > 0 + # No 80°+ passes today (would require near-overhead crossing). + for p in passes_high: + assert p["max_elev_deg"] >= 80.0 + + +def test_malformed_tle_returns_empty_pass_list(): + """A garbage TLE must not crash; just yield no passes.""" + passes = _next_passes("not a tle", "also not", _OBS, _REF, 24, 10.0) + assert passes == [] + + +# --- _build_event / _pass_to_event ------------------------------------------ + + +def _row_for_iss(): + return { + "norad_id": 25544, "satellite_name": "ISS (ZARYA)", + "tle_line1": _ISS_L1, "tle_line2": _ISS_L2, + "tle_epoch": "2026-06-08T19:17:55+00:00", + } + + +def test_pass_event_shape(adapter): + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + assert passes + ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS) + # Identity + assert ev.adapter == "satpass_predict" + assert ev.category == "pass.satpass_predict" + # Dedup id shape: {observer_slug}:{norad_id}:{aos_iso} + assert ev.id.startswith("treasure-valley:25544:") + assert ":2026-06-" in ev.id # AOS within the same UTC day window + # Severity bucket maps from peak elevation + assert ev.severity == _severity_from_elev(passes[0]["max_elev_deg"]) + # Geo: centroid at the observer point + assert ev.geo.centroid == (-116.2, 43.6) + assert ev.geo.primary_region == "US-ID" + # data fields per spec + assert ev.data["observer_name"] == "Treasure Valley" + assert ev.data["observer_slug"] == "treasure-valley" + assert ev.data["observer_state"] == "ID" + assert ev.data["norad_id"] == 25544 + assert ev.data["satellite_name"] == "ISS (ZARYA)" + assert ev.data["max_elevation_deg"] == round(passes[0]["max_elev_deg"], 2) + assert ev.data["duration_s"] > 0 + assert ev.data["tle_epoch"] == "2026-06-08T19:17:55+00:00" + + +def test_subject_for_uses_observer_state_and_slug(adapter): + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS) + assert adapter.subject_for(ev) == "central.sat.pass.us.id.treasure-valley" + + +def test_subject_for_falls_back_when_state_or_slug_missing(adapter): + from central.models import Event, Geo + ev = Event( + id="x", adapter="satpass_predict", category="pass.satpass_predict", + time=datetime.now(timezone.utc), severity=2, geo=Geo(), data={}, + ) + assert adapter.subject_for(ev) == "central.sat.pass.us.unknown.unknown" + + +# --- poll() integration with mocked pool ------------------------------------ + + +def _mock_pool_returning(rows): + """Build a MagicMock pool that yields ``rows`` from any SELECT.""" + pool = MagicMock() + conn = MagicMock() + conn.fetch = AsyncMock(return_value=rows) + pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn) + pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + return pool + + +@pytest.mark.asyncio +async def test_poll_empty_tles_table_logs_and_yields_zero(tmp_path): + """v0.11.1 spec: empty TLE table -> 0 events, INFO log, no exception.""" + cfg = AdapterConfig( + name="satpass_predict", enabled=True, cadence_s=3600, + settings={"observers": [_OBS.model_dump()], + "min_elevation_deg": 10.0, "horizon_hours": 24}, + updated_at=datetime.now(timezone.utc), + ) + config_store = MagicMock() + config_store.get_pool.return_value = _mock_pool_returning([]) + adapter = SatpassPredictAdapter(cfg, config_store, tmp_path / "cursors.db") + await adapter.startup() + try: + events = [e async for e in adapter.poll()] + assert events == [] + finally: + await adapter.shutdown() + + +@pytest.mark.asyncio +async def test_poll_multi_observer_yields_per_observer_pass_list(tmp_path): + """Two observers in settings → each observer gets its own pass list against + the same TLE. Boise (43.6, -116.2) and Salt Lake City (40.76, -111.89) + both see ISS but with slightly different AOS times -> different events.""" + boise = _OBS + slc = Observer(name="Salt Lake City", slug="slc", + state="UT", lat=40.76, lon=-111.89, elev_m=0.0) + cfg = AdapterConfig( + name="satpass_predict", enabled=True, cadence_s=3600, + settings={"observers": [boise.model_dump(), slc.model_dump()], + "min_elevation_deg": 10.0, "horizon_hours": 24}, + updated_at=datetime.now(timezone.utc), + ) + config_store = MagicMock() + config_store.get_pool.return_value = _mock_pool_returning([_row_for_iss()]) + adapter = SatpassPredictAdapter(cfg, config_store, tmp_path / "cursors.db") + await adapter.startup() + try: + events = [e async for e in adapter.poll()] + # We don't pin counts (number of passes per 24h varies with the pinned + # ref time), but each observer must have at least one event distinct + # from the other. + boise_evs = [e for e in events if e.data["observer_slug"] == "treasure-valley"] + slc_evs = [e for e in events if e.data["observer_slug"] == "slc"] + assert boise_evs, "no Boise passes" + assert slc_evs, "no Salt Lake City passes" + # Subject routing differs by state. + assert adapter.subject_for(boise_evs[0]) == "central.sat.pass.us.id.treasure-valley" + assert adapter.subject_for(slc_evs[0]) == "central.sat.pass.us.ut.slc" + finally: + await adapter.shutdown() + + +# --- Settings / apply_config / dedup-mixin regression ---------------------- + + +def test_default_settings_match_spec(): + s = SatpassPredictSettings() + assert s.min_elevation_deg == 10.0 + assert s.horizon_hours == 24 + assert len(s.observers) == 1 + assert s.observers[0].slug == "treasure-valley" + + +def test_inherits_dedup_mixin_from_source_adapter(tmp_path): + """v0.9.1 regression guard.""" + assert issubclass(SatpassPredictAdapter, SourceAdapter) + a = SatpassPredictAdapter( + AdapterConfig( + name="satpass_predict", enabled=False, cadence_s=3600, + settings={}, updated_at=datetime.now(timezone.utc), + ), + MagicMock(), + tmp_path / "cursors.db", + ) + assert callable(a.is_published) + assert callable(a.mark_published) + assert callable(a.sweep_old_ids) + + +@pytest.mark.asyncio +async def test_apply_config_updates_observers_and_threshold(adapter): + new_obs = Observer(name="Sandpoint", slug="sandpoint", + state="ID", lat=48.27, lon=-116.55, elev_m=600.0) + new_cfg = AdapterConfig( + name="satpass_predict", enabled=True, cadence_s=3600, + settings={"observers": [new_obs.model_dump()], + "min_elevation_deg": 25.0, "horizon_hours": 12}, + updated_at=datetime.now(timezone.utc), + ) + await adapter.apply_config(new_cfg) + assert len(adapter._observers) == 1 + assert adapter._observers[0].slug == "sandpoint" + assert adapter._min_elev == 25.0 + assert adapter._horizon_h == 12.0 + + +# --- Stream registry + family map + GUI wiring ---------------------------- + + +def test_central_sat_family_includes_pass_token(): + """v0.11.1: pass.* categories also route to CENTRAL_SAT.""" + from central.supervisor import STREAM_CATEGORY_DOMAINS + assert STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"] == ("tle", "pass") + + +def test_satpass_predict_in_space_adapter_group(): + from central.gui.routes import ADAPTER_GROUPS + assert "satpass_predict" in ADAPTER_GROUPS["Space"] + + +# --- Partials render cleanly (v0.10.0 pattern) ------------------------------ + + +def test_summary_partial_renders_cleanly_with_real_pass(adapter): + from jinja2 import Environment, FileSystemLoader + templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates" + env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True) + tmpl = env.get_template("_event_summaries/satpass_predict.html") + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS) + rendered = tmpl.render(event={ + "data": {"data": {"data": ev.model_dump(mode="json")["data"]}} + }).strip() + assert "ISS (ZARYA)" in rendered, f"got: {rendered!r}" + assert "max elevation" in rendered + assert "UTC" in rendered + + +def test_row_partial_renders_cleanly(adapter): + from jinja2 import Environment, FileSystemLoader + templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates" + env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True) + tmpl = env.get_template("_event_rows/satpass_predict.html") + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS) + rendered = tmpl.render(event={ + "data": {"data": {"data": ev.model_dump(mode="json")["data"]}} + }) + assert "
Satellite
" in rendered and "ISS (ZARYA)" in rendered + assert "
Observer
" in rendered and "Treasure Valley" in rendered + assert "
AOS (rise)
" in rendered + assert "
Peak
" in rendered + assert "
LOS (set)
" in rendered + assert "
Duration
" in rendered diff --git a/uv.lock b/uv.lock index 2a5ae75..cda0ba2 100644 --- a/uv.lock +++ b/uv.lock @@ -189,6 +189,7 @@ dependencies = [ { name = "pydantic" }, { name = "pydantic-settings" }, { name = "python-multipart" }, + { name = "sgp4" }, { name = "shapely" }, { name = "tenacity" }, { name = "uvicorn", extra = ["standard"] }, @@ -219,6 +220,7 @@ requires-dist = [ { name = "pydantic", specifier = ">=2,<3" }, { name = "pydantic-settings", specifier = ">=2.7.0" }, { name = "python-multipart", specifier = ">=0.0.20" }, + { name = "sgp4", specifier = ">=2.25" }, { name = "shapely", specifier = ">=2.0" }, { name = "tenacity", specifier = ">=9.1.4" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" }, @@ -893,6 +895,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9b/36/9c015cd052fca743dae8cb2aeb16b551444787467db42ceab0fc968865af/ruff-0.15.13-py3-none-win_arm64.whl", hash = "sha256:2471da9bd1068c8c064b5fd9c0c4b6dddffd6369cb1cd68b29993b1709ff1b21", size = 11179336, upload-time = "2026-05-14T13:44:33.026Z" }, ] +[[package]] +name = "sgp4" +version = "2.25" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6e/d0/fc467010d17742321f73b16a71acac88439a88f2b166641942a6566c9b2a/sgp4-2.25.tar.gz", hash = "sha256:e19edc6dcc25d69fb8fde0a267b8f0c44d7e915c7bcbeacf5d3a8b595baf0674", size = 181016, upload-time = "2025-08-04T18:02:33.765Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0a/71/864524bde46a02e636cc5de47b9a4e1f1ed18c7acc3f1319cf9fe1db3c7a/sgp4-2.25-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:170ec2882cd166ff9d8dccfb8018f86d5cc033ea8a07c27a1825999c62439f05", size = 162985, upload-time = "2025-08-04T18:01:55.646Z" }, + { url = "https://files.pythonhosted.org/packages/e3/cd/022aa419d9570d494dafd5103a71dda64c6ffc956a1c7f5b096a58a23a6a/sgp4-2.25-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:64c7597a60b770caac51566b1f621d1cd74df0409ef19c5e7ea3505d0dfbc677", size = 161951, upload-time = "2025-08-04T18:01:56.745Z" }, + { url = "https://files.pythonhosted.org/packages/3a/1c/76dbf2190d30a770fe8ac57474d212e005f56f47e65dd6fcecdb546d454f/sgp4-2.25-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0e1d18b8972643dd29e758e67c062cfb68fbe2421fe3f6398f1957a9825119f6", size = 236340, upload-time = "2025-08-04T18:01:57.778Z" }, + { url = "https://files.pythonhosted.org/packages/97/a4/2fc9bf9cb75571222bd453407317e91193a3db1c559333c5e46ce7a014c9/sgp4-2.25-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:35649388a06cbee7def24cbb789f452c31d42ed9e87bddd89935ed78f19451ed", size = 233080, upload-time = "2025-08-04T18:01:58.812Z" }, + { url = "https://files.pythonhosted.org/packages/fc/40/50ecdc518edd3a85ad74bda7a2196b53d5901256e3d7ab34225c96e8edc8/sgp4-2.25-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:911460477f1c52dcda2b3eb20538435b89b0a43668bcb5edd1e7700b7a1a0225", size = 235729, upload-time = "2025-08-04T18:01:59.83Z" }, + { url = "https://files.pythonhosted.org/packages/1b/dd/c1ee8571828debfd3e0f2297379a2a2af75024062c70cf76bdc121e77623/sgp4-2.25-cp312-cp312-win32.whl", hash = "sha256:128edd3d6061e833600d93e77d4c08d1a5002293997e368256b0b777ea525dda", size = 161899, upload-time = "2025-08-04T18:02:00.882Z" }, + { url = "https://files.pythonhosted.org/packages/c8/f8/7dae15af520dfe5def1f8620c2817203cbbf1a1bf154b2079add1200acd3/sgp4-2.25-cp312-cp312-win_amd64.whl", hash = "sha256:979eb60e74aff5dc318cfe1a6c817db884486bdfc8496d2c5bc07b05fe833280", size = 164137, upload-time = "2025-08-04T18:02:01.817Z" }, + { url = "https://files.pythonhosted.org/packages/3a/47/8231e3d4a88341316ec8d0eb98d3a8a972477d8b038555259522735a8371/sgp4-2.25-py3-none-any.whl", hash = "sha256:4f39ecf6c2663109fed04adfe9982815ac83893271b521d92d5b186820f8c78e", size = 137376, upload-time = "2026-04-27T18:29:23.71Z" }, +] + [[package]] name = "shapely" version = "2.1.2"