diff --git a/src/central/adapters/satpass_predict.py b/src/central/adapters/satpass_predict.py index d2f9662..6fdbda3 100644 --- a/src/central/adapters/satpass_predict.py +++ b/src/central/adapters/satpass_predict.py @@ -147,6 +147,28 @@ def _topocentric_az_el( return azimuth, elevation +def _sample_at( + sat: Satrec, + t: datetime, + obs_ecef_km: tuple[float, float, float], + obs_lat_deg: float, + obs_lon_deg: float, +) -> tuple[float, float, tuple[float, float, float]] | None: + """Sample SGP4 at instant ``t`` and return ``(azimuth_deg, elevation_deg, sat_ecef_km)``. + + Single sgp4 call per sample -- the satellite ECEF position is returned so + the caller can also compute the sub-satellite point without a second + propagation. Returns ``None`` if SGP4 reports a propagation error. + """ + jd, fr = jday(t.year, t.month, t.day, t.hour, t.minute, t.second + t.microsecond / 1e6) + err, pos_eci, _ = sat.sgp4(jd, fr) + if err: + return None + sat_ecef = _eci_to_ecef(pos_eci, _gmst_rad(jd, fr)) + az, el = _topocentric_az_el(sat_ecef, obs_ecef_km, obs_lat_deg, obs_lon_deg) + return az, el, sat_ecef + + def _elev_at( sat: Satrec, t: datetime, @@ -154,17 +176,104 @@ def _elev_at( obs_lat_deg: float, obs_lon_deg: float, ) -> tuple[float, float] | None: - """Compute ``(azimuth_deg, elevation_deg)`` at instant ``t`` (UTC datetime). + """Back-compat wrapper retained for v0.11.1 tests that expected ``(az, el)``. - Returns ``None`` if SGP4 reports a propagation error (decayed orbit, bad - epoch, etc.). + Prefer ``_sample_at`` in new code so the ECEF position is available for + sub-satellite point computation without a second propagation. """ - jd, fr = jday(t.year, t.month, t.day, t.hour, t.minute, t.second + t.microsecond / 1e6) - err, pos_eci, _ = sat.sgp4(jd, fr) - if err: + sample = _sample_at(sat, t, obs_ecef_km, obs_lat_deg, obs_lon_deg) + return None if sample is None else (sample[0], sample[1]) + + +def _subsatellite_point(pos_ecef_km: tuple[float, float, float]) -> tuple[float, float, float]: + """ECEF (km) -> ``(lon_deg, lat_deg, alt_km)``. + + Sub-satellite point is the ground location directly beneath the satellite + on a spherical earth. Longitude normalised to [-180, 180]. Altitude is + geocentric height above the equatorial radius (not WGS-84 height above + ellipsoid -- close enough for footprint-radius math). + """ + x, y, z = pos_ecef_km + horizontal = math.sqrt(x * x + y * y) + lat = math.degrees(math.atan2(z, horizontal)) + lon = math.degrees(math.atan2(y, x)) + if lon > 180.0: + lon -= 360.0 + elif lon < -180.0: + lon += 360.0 + alt = math.sqrt(x * x + y * y + z * z) - _EARTH_RADIUS_KM + return lon, lat, alt + + +def _visibility_footprint( + lon_deg: float, lat_deg: float, alt_km: float, n_vertices: int = 32, +) -> dict[str, Any] | None: + """Geodesic circle visible from a satellite at altitude ``alt_km``. + + Radius is the horizon distance on a spherical earth: + ``r = R * acos(R / (R + alt))``. ISS at 408km -> ~2253km; GEO at 35786km + -> ~9055km. Returns a GeoJSON Polygon (single closed ring of + ``n_vertices + 1`` ``[lon, lat]`` pairs); ``None`` if ``alt_km <= 0`` + (decayed orbit / parse error). + + Antimeridian behaviour: the longitude of each vertex is normalised to + [-180, 180] independently. For sub-satellite points well clear of the + dateline (which includes all Idaho-overhead passes for ISS-class + altitudes) the resulting polygon is well-formed in Leaflet. Polar-orbit + crossings near ±180° will produce a polygon that visually wraps the + "wrong way" around the globe -- documented limitation, not handled. + """ + if alt_km <= 0: return None - sat_ecef = _eci_to_ecef(pos_eci, _gmst_rad(jd, fr)) - return _topocentric_az_el(sat_ecef, obs_ecef_km, obs_lat_deg, obs_lon_deg) + r_earth = _EARTH_RADIUS_KM + radius_km = r_earth * math.acos(r_earth / (r_earth + alt_km)) + angular = radius_km / r_earth + lat1 = math.radians(lat_deg) + lon1 = math.radians(lon_deg) + sin_lat1, cos_lat1 = math.sin(lat1), math.cos(lat1) + sin_d, cos_d = math.sin(angular), math.cos(angular) + + ring: list[list[float]] = [] + for i in range(n_vertices): + bearing = 2.0 * math.pi * i / n_vertices + lat2 = math.asin(sin_lat1 * cos_d + cos_lat1 * sin_d * math.cos(bearing)) + lon2 = lon1 + math.atan2( + math.sin(bearing) * sin_d * cos_lat1, + cos_d - sin_lat1 * math.sin(lat2), + ) + lon2_deg = math.degrees(lon2) + # Wrap each vertex into [-180, 180]. + lon2_deg = ((lon2_deg + 180.0) % 360.0) - 180.0 + ring.append([lon2_deg, math.degrees(lat2)]) + ring.append(ring[0]) # close + return {"type": "Polygon", "coordinates": [ring]} + + +def _build_pass_geometry(p: dict[str, Any]) -> dict[str, Any] | None: + """Assemble the GeoJSON GeometryCollection for one pass (v0.11.2). + + Combines the ground-track LineString (AOS -> LOS) and the visibility- + footprint Polygon at peak time. Returns ``None`` if neither is buildable + (degenerate pass, propagation error, etc.) so the caller can omit + ``geo.geometry`` entirely rather than write a ``{"type": ..., ...}`` + placeholder. + """ + geometries: list[dict[str, Any]] = [] + track = p.get("ground_track") or [] + if len(track) >= 2: + geometries.append({ + "type": "LineString", + "coordinates": [[lon, lat] for lon, lat in track], + }) + peak_subsat = p.get("peak_subsat") + if peak_subsat: + lon, lat, alt = peak_subsat + footprint = _visibility_footprint(lon, lat, alt) + if footprint: + geometries.append(footprint) + if not geometries: + return None + return {"type": "GeometryCollection", "geometries": geometries} def _severity_from_elev(max_elev_deg: float) -> int: @@ -186,7 +295,15 @@ def _next_passes( horizon_hours: float, min_elevation_deg: float, ) -> list[dict[str, Any]]: - """Walk a 60-second grid; return all passes >= min_elevation_deg in window.""" + """Walk a 60-second grid; return all passes >= min_elevation_deg in window. + + Each returned dict now (v0.11.2) also carries: + ``peak_subsat``: ``(lon_deg, lat_deg, alt_km)`` at the moment of peak + elevation, for visibility-footprint construction. + ``ground_track``: list of ``(lon_deg, lat_deg)`` sub-satellite points + sampled at the same grid from AOS through LOS, for the + ground-track polyline. + """ try: sat = Satrec.twoline2rv(tle_line1, tle_line2) except Exception: @@ -200,16 +317,19 @@ def _next_passes( peak_t: datetime | None = None peak_e: float = -180.0 peak_az: float | None = None + peak_subsat: tuple[float, float, float] | None = None + track: list[tuple[float, float]] = [] t = ref_time end = ref_time + timedelta(hours=horizon_hours) step = timedelta(seconds=_PASS_STEP_S) while t < end: - sample = _elev_at(sat, t, obs_ecef, observer.lat, observer.lon) + sample = _sample_at(sat, t, obs_ecef, observer.lat, observer.lon) if sample is None: t += step continue - az, e = sample + az, e, sat_ecef = sample + subsat = _subsatellite_point(sat_ecef) # (lon, lat, alt) if e >= min_elevation_deg: if not in_pass: in_pass = True @@ -218,28 +338,43 @@ def _next_passes( peak_t = t peak_e = e peak_az = az - elif e > peak_e: - peak_t = t - peak_e = e - peak_az = az + peak_subsat = subsat + track = [(subsat[0], subsat[1])] + else: + track.append((subsat[0], subsat[1])) + if e > peak_e: + peak_t = t + peak_e = e + peak_az = az + peak_subsat = subsat elif in_pass: - # threshold-crossing on the way down -> close the pass + # threshold-crossing on the way down -> close the pass; include + # the descending boundary point in the track so the polyline + # ends at LOS rather than at the last sample above min_elev. + track.append((subsat[0], subsat[1])) passes.append({ "aos": aos_t, "aos_az": aos_az, "peak": peak_t, "peak_az": peak_az, "max_elev_deg": peak_e, "los": t, "los_az": az, + "peak_subsat": peak_subsat, + "ground_track": list(track), }) in_pass = False aos_t = aos_az = peak_t = peak_az = None peak_e = -180.0 + peak_subsat = None + track = [] t += step - # Pass still in progress at the horizon edge -- close it at the boundary. + # Pass still in progress at the horizon edge -- close it at the boundary + # (los_az=None signals the pass extended beyond the horizon). if in_pass and aos_t and peak_t: passes.append({ "aos": aos_t, "aos_az": aos_az, "peak": peak_t, "peak_az": peak_az, "max_elev_deg": peak_e, "los": end, "los_az": None, + "peak_subsat": peak_subsat, + "ground_track": list(track), }) return passes @@ -358,6 +493,13 @@ class SatpassPredictAdapter(SourceAdapter): observer: Observer, ) -> Event: aos: datetime = p["aos"] + # v0.11.2: enrich geo.geometry with a GeometryCollection so the + # events-list map (Leaflet L.geoJSON handles GeometryCollection + # natively) renders BOTH the ground-track LineString from AOS->LOS + # AND the visibility-footprint Polygon at peak time. centroid stays + # at the observer point -- the alert is logically AT the observer, + # the added geometry is supplementary spatial context. + geometry = _build_pass_geometry(p) return Event( id=f"{observer.slug}:{row['norad_id']}:{aos.isoformat()}", adapter=self.name, @@ -366,6 +508,7 @@ class SatpassPredictAdapter(SourceAdapter): severity=_severity_from_elev(p["max_elev_deg"]), geo=Geo( centroid=(observer.lon, observer.lat), + geometry=geometry, regions=[f"US-{observer.state}"], primary_region=f"US-{observer.state}", ), diff --git a/tests/test_satpass_predict.py b/tests/test_satpass_predict.py index c50e233..5404554 100644 --- a/tests/test_satpass_predict.py +++ b/tests/test_satpass_predict.py @@ -21,11 +21,14 @@ from central.adapters.satpass_predict import ( Observer, SatpassPredictAdapter, SatpassPredictSettings, + _build_pass_geometry, _gmst_rad, _next_passes, _observer_ecef, _severity_from_elev, + _subsatellite_point, _topocentric_az_el, + _visibility_footprint, ) from central.config_models import AdapterConfig @@ -375,3 +378,192 @@ def test_row_partial_renders_cleanly(adapter): assert "
Peak
" in rendered assert "
LOS (set)
" in rendered assert "
Duration
" in rendered + + +# --- v0.11.2: sub-satellite point + visibility footprint + GeometryCollection + + +def test_subsatellite_point_at_north_pole_returns_polar_coords(): + """Sat at +z over geocentre -> lat=90, lon undefined (atan2 returns 0).""" + lon, lat, alt = _subsatellite_point((0.0, 0.0, 7000.0)) + assert abs(lat - 90.0) < 1e-6 + assert abs(alt - (7000.0 - 6378.137)) < 1e-6 + + +def test_subsatellite_point_over_equator_lon_zero(): + """Sat on +x axis at altitude 400km over (lon=0, lat=0).""" + lon, lat, alt = _subsatellite_point((6378.137 + 400.0, 0.0, 0.0)) + assert abs(lon - 0.0) < 1e-6 + assert abs(lat - 0.0) < 1e-6 + assert abs(alt - 400.0) < 1e-6 + + +def test_subsatellite_point_over_equator_at_lon_90(): + """Sat on +y axis over (lon=90, lat=0).""" + lon, lat, alt = _subsatellite_point((0.0, 6778.137, 0.0)) + assert abs(lon - 90.0) < 1e-6 + assert abs(lat - 0.0) < 1e-6 + + +def test_subsatellite_point_lon_normalised_into_180_range(): + """Sat at lon=-90 (Pacific) -> lon=-90, not 270.""" + lon, _, _ = _subsatellite_point((0.0, -6778.137, 0.0)) + assert -180.0 <= lon <= 180.0 + assert abs(lon - (-90.0)) < 1e-6 + + +def test_subsatellite_point_real_iss_sample_via_sgp4(): + """End-to-end against sgp4: ISS at TLE epoch -- sub-sat point should be + on a 51.6° inclination orbit (lat in [-52, 52]). Bit-deterministic.""" + from sgp4.api import Satrec, jday + sat = Satrec.twoline2rv(_ISS_L1, _ISS_L2) + # Propagate at TLE epoch itself for a clean reference point. + jd, fr = jday(2026, 6, 8, 19, 17, 55.071168) + err, pos_eci, _ = sat.sgp4(jd, fr) + assert err == 0 + from central.adapters.satpass_predict import _eci_to_ecef, _gmst_rad as gmst + sat_ecef = _eci_to_ecef(pos_eci, gmst(jd, fr)) + lon, lat, alt = _subsatellite_point(sat_ecef) + # ISS inclination is 51.6° so sub-sat latitude must stay within ±52°. + assert -52.0 < lat < 52.0, f"ISS sub-sat lat {lat}° outside inclination envelope" + # ISS altitude is ~408 km nominally; allow generous range for SGP4 noise. + assert 350.0 < alt < 500.0, f"ISS altitude {alt}km outside expected range" + assert -180.0 <= lon <= 180.0 + + +# --- Visibility footprint -------------------------------------------------- + + +def test_visibility_footprint_returns_closed_32_vertex_polygon(): + poly = _visibility_footprint(lon_deg=-116.2, lat_deg=43.6, alt_km=408.0) + assert poly is not None + assert poly["type"] == "Polygon" + ring = poly["coordinates"][0] + # 32 vertices + closing duplicate = 33 points in the ring. + assert len(ring) == 33 + # First == last (closed polygon). + assert ring[0] == ring[-1] + + +def test_visibility_footprint_iss_radius_approximation(): + """ISS at 408km -> horizon ~2253km (spec says ~2200km).""" + poly = _visibility_footprint(lon_deg=0.0, lat_deg=0.0, alt_km=408.0) + ring = poly["coordinates"][0] + # At the equator with sub-sat at (0,0), the easternmost vertex is at + # bearing 90° (pure east), so its longitude equals the angular distance + # in degrees. radius_km / R_earth = angular_dist in rad; *180/pi for deg. + import math as m + r_earth = 6378.137 + expected_angular_deg = m.degrees(r_earth * m.acos(r_earth / (r_earth + 408.0)) / r_earth) + # 2200km / 6378km ≈ 0.345 rad ≈ 19.76°. Expect lons in ring around ±19.76. + max_lon = max(p[0] for p in ring) + assert 18.0 < max_lon < 22.0, f"ISS east-vertex lon {max_lon}, expected ~20° (radius ~2200km)" + assert abs(max_lon - expected_angular_deg) < 0.5 + + +def test_visibility_footprint_geo_radius_approximation(): + """GEO at 35786km -> horizon ~9000km (spec).""" + poly = _visibility_footprint(lon_deg=0.0, lat_deg=0.0, alt_km=35786.0) + ring = poly["coordinates"][0] + max_lon = max(p[0] for p in ring) + # 9000km / 6378km ≈ 1.41 rad ≈ 80.85°. Expect lons in ring spanning ±81. + assert 78.0 < max_lon < 83.0, f"GEO east-vertex lon {max_lon}, expected ~81°" + + +def test_visibility_footprint_none_for_decayed_altitude(): + """Negative or zero altitude -> None (orbit decayed, garbage in).""" + assert _visibility_footprint(0.0, 0.0, 0.0) is None + assert _visibility_footprint(0.0, 0.0, -100.0) is None + + +def test_visibility_footprint_near_antimeridian_does_not_crash(): + """Polar-orbit-style sub-sat at lon=179° -- vertices wrap across the + dateline. Documented limitation: each vertex is normalised independently + so the polygon may visually wrap the "wrong way" in Leaflet for sats + crossing ±180°. Per-vertex normalisation is the simplest approach and + Idaho-overhead passes stay well clear of this case. + """ + poly = _visibility_footprint(lon_deg=179.0, lat_deg=0.0, alt_km=400.0) + assert poly is not None + ring = poly["coordinates"][0] + for lon, lat in ring: + # Every vertex's lon stays within [-180, 180]; no NaN / Inf. + assert -180.0 <= lon <= 180.0 + assert -90.0 <= lat <= 90.0 + import math as m + assert m.isfinite(lon) and m.isfinite(lat) + + +# --- Ground track + GeometryCollection assembly -------------------------- + + +def test_ground_track_collected_during_real_iss_pass(): + """The pinned-ref ISS pass over Treasure Valley collects multiple sub-sat + points from AOS through LOS. Track must be a non-empty list of (lon, lat).""" + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + assert passes + track = passes[0]["ground_track"] + assert isinstance(track, list) + assert len(track) >= 2 # at least AOS + LOS samples + for lon, lat in track: + assert -180.0 <= lon <= 180.0 + assert -90.0 <= lat <= 90.0 + + +def test_peak_subsat_captured_at_peak_time(): + """peak_subsat is (lon, lat, alt) of the satellite at peak elevation.""" + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + p = passes[0] + assert p["peak_subsat"] is not None + lon, lat, alt = p["peak_subsat"] + assert -180.0 <= lon <= 180.0 + # ISS inclination 51.6° → sub-sat lat in [-52, 52] always. + assert -52.0 < lat < 52.0 + # ISS altitude ~400-450km. + assert 350.0 < alt < 500.0 + + +def test_build_pass_geometry_returns_geometrycollection_with_both_shapes(): + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + geom = _build_pass_geometry(passes[0]) + assert geom is not None + assert geom["type"] == "GeometryCollection" + types = [g["type"] for g in geom["geometries"]] + assert "LineString" in types + assert "Polygon" in types + # LineString must have at least 2 vertices. + ls = next(g for g in geom["geometries"] if g["type"] == "LineString") + assert len(ls["coordinates"]) >= 2 + # Polygon must be closed. + poly = next(g for g in geom["geometries"] if g["type"] == "Polygon") + ring = poly["coordinates"][0] + assert ring[0] == ring[-1] + + +def test_build_pass_geometry_returns_none_when_inputs_missing(): + """Defensive: pass dict with no track + no peak_subsat -> None (don't + write an empty GeometryCollection to the wire).""" + assert _build_pass_geometry({}) is None + assert _build_pass_geometry({"ground_track": [], "peak_subsat": None}) is None + + +def test_build_pass_geometry_polygon_only_when_track_too_short(): + """A single-sample track (only 1 vertex) is below LineString minimum; + we omit the LineString but keep the footprint Polygon.""" + geom = _build_pass_geometry({ + "ground_track": [(-116.2, 43.6)], + "peak_subsat": (-116.2, 43.6, 400.0), + }) + assert geom is not None + types = [g["type"] for g in geom["geometries"]] + assert types == ["Polygon"] + + +def test_pass_event_includes_geometry_collection(adapter): + """End-to-end: built Event has the GeometryCollection attached.""" + passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0) + ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS) + assert ev.geo.geometry is not None + assert ev.geo.geometry["type"] == "GeometryCollection" + # centroid stays at observer (unchanged contract). + assert ev.geo.centroid == (-116.2, 43.6)