central/tests/test_satpass_predict.py
malice 3f1fec9846
v0.13.0: sat_orbits adapter (forward-orbit-track per satellite) + antimeridian splitter
## Matt's "each sat's path" framing

After enabling the satellite family in v0.12.1, the `/events` map showed overlapping orange visibility-footprint circles from satpass_predict + a polar-orbit ground track wrapping the wrong way across the antimeridian (the v0.11.2 documented limitation). Matt's ask:

> honestly i just want each sats path.

Interpreted as: one continuous orbital track per satellite, color-coded, no observer-specific clutter, no visibility-footprint overlays. Six tracked sats = six distinguishable lines on the map.

## Family placement — global line counterpart to global points

| Adapter | What it publishes | Geometry | Cadence |
|---|---|---|---|
| satpass_predict (v0.11.1) | Observer-anchored pass alerts | LineString ground-track + Polygon footprint per pass | 1h |
| sat_positions (v0.12.0) | Current sub-sat POINT per sat | Point centroid only | 60s |
| **sat_orbits (this PR)** | Forward-orbit LINE per sat | LineString / MultiLineString, 90min horizon | 5min |

Each answers a different question; they complement.

## Antimeridian splitter — shared sat_common primitive

`split_antimeridian(coords)` lives in `sat_common.py` next to `gmst_rad` / `eci_to_ecef` / `subsatellite_point`. Returns `None` for <2 vertices, a `LineString` dict for the common no-crossing case, or a `MultiLineString` dict when one or more ±180° crossings exist. Each crossing closes the current segment at `sign(prev_lon)*180` with a linearly-interpolated latitude and starts the next at `sign(cur_lon)*180` with the same lat (sub-0.1° error at LEO orbital speeds, well below Leaflet rendering precision).

**Sibling concern fixed:** `satpass_predict._build_pass_geometry` now routes its `ground_track` through `split_antimeridian` too. This was the v0.11.2 documented limitation ("polar-orbit crossings near ±180° will produce a polygon that visually wraps the wrong way"). Sat_orbits and satpass_predict share the helper because the antimeridian problem is identical for both — and **44/44 existing satpass_predict tests still pass** because the splitter returns a LineString identical in shape to the prior inline construction when there's no crossing (which is the case for every CONUS-observer ISS-fixture test).

New test specifically for the splitter inside `_build_pass_geometry`: synthesized polar-orbit `ground_track` produces a `GeometryCollection` whose linear-geometry component is a `MultiLineString` with 2 segments (first ends at +180, second starts at -180).

## GUI per-NORAD-ID color helper

20-line addition to `events_list.html`:

```js
function orbitColorForNoradId(norad) {
    var hue = (norad * 137.508) % 360;  // golden-angle hue distribution
    return "hsl(" + hue.toFixed(1) + ", 70%, 50%)";
}
function getRowColor(adapter, row) {
    if (adapter === "tomtom_flow") return flowColor(row.dataset.severity);
    if (adapter === "sat_orbits") {
        var norad = parseInt((row.dataset.eventId || "").split(":")[0], 10);
        if (!isNaN(norad)) return orbitColorForNoradId(norad);
    }
    return getAdapterColor(adapter);
}
```

`event_id` shape is `<norad_id>:<iso>` (same as sat_positions), so JS reads the first colon-token. **Additive**: tomtom_flow keeps its severity-based color, every other adapter keeps its per-adapter palette color, sat_orbits gets per-satellite distinguishable lines.

## Phase A sanity (per spec)

```
vertices = 91                                      ✓ (90min @ 60s + 1 endpoint)
first vertex = (170.66°, -17.15°, 417.4km)        ✓ matches v0.11.1 ISS pin
last vertex  = (140.52°, -8.60°, 415.9km)         ✓ geographically distinct
antimeridian crossings in 90min track = 1
geometry type = MultiLineString, 2 segments        ✓ splitter integrates
```

## Diff size

**+838 / −9 = +829 net** across 15 files. Spec budget was ≤800 lines. **29 over** — much tighter than v0.12.0 (894) or v0.12.1 (848). Adapter LoC 275 (well under 350 cap). sat_common splitter 51 LoC (~budget).

Test breakdown: 285 (sat_orbits) + 60 (sat_common splitter) + 26 (satpass regression) + 12 (events_feed) + 4 (telemetry-separation) = 387 LoC tests. Production: 275 + 51 + 37 (migration) + 41 (doc) + 16 (partials) + 21 (JS) + 15 (satpass refactor) + 2 (wiring) = 458 LoC.

## Test plan

- [x] `pytest tests/test_sat_orbits.py` — 19 new tests, all pass.
- [x] `pytest tests/test_sat_common.py` — 7 new splitter tests, 16 total pass.
- [x] `pytest tests/test_satpass_predict.py` — **45/45 pass** (44 existing regression-guard + 1 new polar-orbit splitter integration test). The `_build_pass_geometry` rewire is byte-identical for non-crossing tracks.
- [x] `pytest tests/test_events_feed_frontend.py` — 125/125 pass (sat_orbits sample + expected subject extended).
- [x] `pytest tests/test_telemetry_separation.py` — 9/9 pass (`_TELEMETRY` pin extended with `sat_orbits`).
- [x] `pytest tests/test_consumer_doc.py` — 6/6 pass (new `### sat_orbits` subsection accepted).
- [x] Full sweep `pytest tests/` (excluding postgres-dep files): **1274 passed, 1 skipped, 0 failures**.
- [x] Ruff: clean on all new + touched satellite-family code.

## Deploy plan

1. Squash-merge PR #N → tag v0.13.0 at merge SHA → push tag.
2. `ssh central`, `git pull` on `/opt/central`. **No `uv sync`** (no new dep).
3. Apply migration 041 manually via psql (per option C):
   `sudo -u postgres psql central -f /opt/central/sql/migrations/041_add_sat_orbits_adapter.sql`
4. `sudo systemctl restart central-supervisor` (picks up new adapter + STREAM_CATEGORY_DOMAINS extension) + `sudo systemctl restart central-gui` (picks up new partials + ADAPTER_GROUPS extension + JS color helper).
5. **No** `central-archive` restart (CENTRAL_SAT pre-existed; only the category-domain tuple grew, archive already covers `central.sat.>`).
6. Verify: `config.adapters` has `sat_orbits` row with `enabled=false`; supervisor log shows discovery; no polling until Matt flips it.
7. Matt enables via `/adapters/sat_orbits/edit` when ready. First poll happens within 5min; orbit-track LineStrings surface at `/telemetry` filtered by adapter=sat_orbits, color-coded per NORAD ID.

## Halt acknowledgment

Per spec acceptance bar #6: **squash-merge NOT authorized**. Branch + PR open. Halting for line-by-line review.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-06-09 18:50:47 -06:00

598 lines
24 KiB
Python

"""Tests for the v0.11.1 satpass_predict adapter.
Deterministic via a fixed ISS TLE + fixed observer + pinned reference time.
The TLE comes from the v0.11.0 stations fixture (epoch 2026-06-08T19:17 UTC);
reference time pinned at 2026-06-09T07:00 UTC; observer is Treasure Valley
(43.6, -116.2, 0m elev). This combination produces a known ISS pass starting
at ~15:36 UTC the same day (verified via the sgp4 sanity script during Phase
A of v0.11.1).
"""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.adapter import SourceAdapter
from central.adapters.sat_common import (
eci_to_ecef as _eci_to_ecef,
gmst_rad as _gmst_rad,
subsatellite_point as _subsatellite_point,
)
from central.adapters.satpass_predict import (
Observer,
SatpassPredictAdapter,
SatpassPredictSettings,
_build_pass_geometry,
_next_passes,
_observer_ecef,
_severity_from_elev,
_topocentric_az_el,
_visibility_footprint,
)
from central.config_models import AdapterConfig
# Live TLE from the v0.11.0 stations fixture, ISS (NORAD 25544).
_ISS_L1 = "1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999"
_ISS_L2 = "2 25544 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570453"
# Pinned observer + reference time.
_OBS = Observer(name="Treasure Valley", slug="treasure-valley",
state="ID", lat=43.6, lon=-116.2, elev_m=0.0)
_REF = datetime(2026, 6, 9, 7, 0, 0, tzinfo=timezone.utc)
@pytest.fixture
def adapter(tmp_path: Path) -> SatpassPredictAdapter:
cfg = AdapterConfig(
name="satpass_predict",
enabled=True,
cadence_s=3600,
settings={"observers": [_OBS.model_dump()],
"min_elevation_deg": 10.0, "horizon_hours": 24},
updated_at=datetime.now(timezone.utc),
)
return SatpassPredictAdapter(cfg, MagicMock(), tmp_path / "cursors.db")
# --- Pure math helpers ------------------------------------------------------
def test_gmst_rad_returns_radians_in_canonical_range():
"""GMST output must wrap into [0, 2π)."""
import math as m
val = _gmst_rad(2460835.0, 0.5) # arbitrary post-2000 JD
assert 0.0 <= val < 2.0 * m.pi
def test_observer_ecef_for_north_pole_and_equator():
"""Sanity: north pole sits on z-axis; equator at lon=0 sits on x-axis."""
pole = _observer_ecef(90.0, 0.0, 0.0)
assert abs(pole[0]) < 1e-6 and abs(pole[1]) < 1e-6
assert pole[2] > 6378.0 # ~6378.137 km
eq_zero = _observer_ecef(0.0, 0.0, 0.0)
assert eq_zero[0] > 6378.0 and abs(eq_zero[1]) < 1e-6 and abs(eq_zero[2]) < 1e-6
def test_topocentric_zenith_satellite_returns_90_elevation():
"""A satellite directly overhead must read elevation 90°, any azimuth."""
obs_lat, obs_lon = 43.6, -116.2
obs = _observer_ecef(obs_lat, obs_lon, 0.0)
# 400km straight up = scale observer position vector by (R+400)/R
import math as m
r_obs = m.sqrt(sum(c * c for c in obs))
r_sat = r_obs + 400.0
scale = r_sat / r_obs
sat_ecef = (obs[0] * scale, obs[1] * scale, obs[2] * scale)
az, el = _topocentric_az_el(sat_ecef, obs, obs_lat, obs_lon)
assert abs(el - 90.0) < 0.01, f"expected zenith elevation, got {el}"
def test_topocentric_below_horizon_returns_negative_elevation():
"""Satellite on the opposite side of the earth = below horizon."""
obs = _observer_ecef(0.0, 0.0, 0.0) # equator, prime meridian
antipode = (-obs[0] * 2.0, 0.0, 0.0) # other side, well below
_, el = _topocentric_az_el(antipode, obs, 0.0, 0.0)
assert el < -10.0
# --- Severity bucketing -----------------------------------------------------
@pytest.mark.parametrize("max_elev, expected", [
(90.0, 4), # zenith
(60.0, 4), # boundary -> 4
(59.99, 3),
(30.0, 3), # boundary -> 3
(29.99, 2),
(10.0, 2), # boundary -> 2 (gate threshold; emit)
(9.99, 1), # below gate -> 1 (should never emit in practice)
(0.0, 1),
])
def test_severity_from_elev_buckets(max_elev, expected):
assert _severity_from_elev(max_elev) == expected
# --- Pass detection (the load-bearing math test) ---------------------------
def test_iss_next_pass_over_treasure_valley_is_chronologically_sane():
"""Pinned TLE + observer + ref time produces ONE known ISS pass in 24h.
AOS < peak < LOS, max_elev in (10, 90), positive duration."""
passes = _next_passes(
_ISS_L1, _ISS_L2, _OBS,
ref_time=_REF, horizon_hours=24, min_elevation_deg=10.0,
)
assert len(passes) > 0, "expected at least one ISS pass over Boise in next 24h"
p = passes[0]
assert p["aos"] < p["peak"] <= p["los"]
assert 10.0 < p["max_elev_deg"] < 90.0
assert (p["los"] - p["aos"]).total_seconds() > 0
# And the pass must lie inside the 24h horizon (ref + 24h = 2026-06-10T07:00 UTC).
horizon_end = datetime(2026, 6, 10, 7, 0, 0, tzinfo=timezone.utc)
assert p["aos"] >= _REF
assert p["los"] <= horizon_end
def test_iss_pass_has_plausible_azimuths():
"""Azimuth at AOS and LOS should be valid 0-360° readings."""
passes = _next_passes(
_ISS_L1, _ISS_L2, _OBS,
ref_time=_REF, horizon_hours=24, min_elevation_deg=10.0,
)
p = passes[0]
assert 0.0 <= p["aos_az"] < 360.0
# los_az may be None if the pass ran to the horizon edge, but for ISS
# against the pinned ref it completes within 24h.
if p["los_az"] is not None:
assert 0.0 <= p["los_az"] < 360.0
def test_min_elevation_gate_filters_lower_passes():
"""Same TLE, raise the gate to 80° -- now zero passes (ISS at 51.6°
inclination from latitude 43.6° can't reach 80° often)."""
passes_low = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
passes_high = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 80.0)
assert len(passes_low) > 0
# No 80°+ passes today (would require near-overhead crossing).
for p in passes_high:
assert p["max_elev_deg"] >= 80.0
def test_malformed_tle_returns_empty_pass_list():
"""A garbage TLE must not crash; just yield no passes."""
passes = _next_passes("not a tle", "also not", _OBS, _REF, 24, 10.0)
assert passes == []
# --- _build_event / _pass_to_event ------------------------------------------
def _row_for_iss():
return {
"norad_id": 25544, "satellite_name": "ISS (ZARYA)",
"tle_line1": _ISS_L1, "tle_line2": _ISS_L2,
"tle_epoch": "2026-06-08T19:17:55+00:00",
}
def test_pass_event_shape(adapter):
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
assert passes
ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS)
# Identity
assert ev.adapter == "satpass_predict"
assert ev.category == "pass.satpass_predict"
# Dedup id shape: {observer_slug}:{norad_id}:{aos_iso}
assert ev.id.startswith("treasure-valley:25544:")
assert ":2026-06-" in ev.id # AOS within the same UTC day window
# Severity bucket maps from peak elevation
assert ev.severity == _severity_from_elev(passes[0]["max_elev_deg"])
# Geo: centroid at the observer point
assert ev.geo.centroid == (-116.2, 43.6)
assert ev.geo.primary_region == "US-ID"
# data fields per spec
assert ev.data["observer_name"] == "Treasure Valley"
assert ev.data["observer_slug"] == "treasure-valley"
assert ev.data["observer_state"] == "ID"
assert ev.data["norad_id"] == 25544
assert ev.data["satellite_name"] == "ISS (ZARYA)"
assert ev.data["max_elevation_deg"] == round(passes[0]["max_elev_deg"], 2)
assert ev.data["duration_s"] > 0
assert ev.data["tle_epoch"] == "2026-06-08T19:17:55+00:00"
def test_subject_for_uses_observer_state_and_slug(adapter):
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS)
assert adapter.subject_for(ev) == "central.sat.pass.us.id.treasure-valley"
def test_subject_for_falls_back_when_state_or_slug_missing(adapter):
from central.models import Event, Geo
ev = Event(
id="x", adapter="satpass_predict", category="pass.satpass_predict",
time=datetime.now(timezone.utc), severity=2, geo=Geo(), data={},
)
assert adapter.subject_for(ev) == "central.sat.pass.us.unknown.unknown"
# --- poll() integration with mocked pool ------------------------------------
def _mock_pool_returning(rows):
"""Build a MagicMock pool that yields ``rows`` from any SELECT."""
pool = MagicMock()
conn = MagicMock()
conn.fetch = AsyncMock(return_value=rows)
pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
return pool
@pytest.mark.asyncio
async def test_poll_empty_tles_table_logs_and_yields_zero(tmp_path):
"""v0.11.1 spec: empty TLE table -> 0 events, INFO log, no exception."""
cfg = AdapterConfig(
name="satpass_predict", enabled=True, cadence_s=3600,
settings={"observers": [_OBS.model_dump()],
"min_elevation_deg": 10.0, "horizon_hours": 24},
updated_at=datetime.now(timezone.utc),
)
config_store = MagicMock()
config_store.get_pool.return_value = _mock_pool_returning([])
adapter = SatpassPredictAdapter(cfg, config_store, tmp_path / "cursors.db")
await adapter.startup()
try:
events = [e async for e in adapter.poll()]
assert events == []
finally:
await adapter.shutdown()
@pytest.mark.asyncio
async def test_poll_multi_observer_yields_per_observer_pass_list(tmp_path):
"""Two observers in settings → each observer gets its own pass list against
the same TLE. Boise (43.6, -116.2) and Salt Lake City (40.76, -111.89)
both see ISS but with slightly different AOS times -> different events."""
boise = _OBS
slc = Observer(name="Salt Lake City", slug="slc",
state="UT", lat=40.76, lon=-111.89, elev_m=0.0)
cfg = AdapterConfig(
name="satpass_predict", enabled=True, cadence_s=3600,
settings={"observers": [boise.model_dump(), slc.model_dump()],
"min_elevation_deg": 10.0, "horizon_hours": 24},
updated_at=datetime.now(timezone.utc),
)
config_store = MagicMock()
config_store.get_pool.return_value = _mock_pool_returning([_row_for_iss()])
adapter = SatpassPredictAdapter(cfg, config_store, tmp_path / "cursors.db")
await adapter.startup()
try:
events = [e async for e in adapter.poll()]
# We don't pin counts (number of passes per 24h varies with the pinned
# ref time), but each observer must have at least one event distinct
# from the other.
boise_evs = [e for e in events if e.data["observer_slug"] == "treasure-valley"]
slc_evs = [e for e in events if e.data["observer_slug"] == "slc"]
assert boise_evs, "no Boise passes"
assert slc_evs, "no Salt Lake City passes"
# Subject routing differs by state.
assert adapter.subject_for(boise_evs[0]) == "central.sat.pass.us.id.treasure-valley"
assert adapter.subject_for(slc_evs[0]) == "central.sat.pass.us.ut.slc"
finally:
await adapter.shutdown()
# --- Settings / apply_config / dedup-mixin regression ----------------------
def test_default_settings_match_spec():
s = SatpassPredictSettings()
assert s.min_elevation_deg == 10.0
assert s.horizon_hours == 24
assert len(s.observers) == 1
assert s.observers[0].slug == "treasure-valley"
def test_inherits_dedup_mixin_from_source_adapter(tmp_path):
"""v0.9.1 regression guard."""
assert issubclass(SatpassPredictAdapter, SourceAdapter)
a = SatpassPredictAdapter(
AdapterConfig(
name="satpass_predict", enabled=False, cadence_s=3600,
settings={}, updated_at=datetime.now(timezone.utc),
),
MagicMock(),
tmp_path / "cursors.db",
)
assert callable(a.is_published)
assert callable(a.mark_published)
assert callable(a.sweep_old_ids)
@pytest.mark.asyncio
async def test_apply_config_updates_observers_and_threshold(adapter):
new_obs = Observer(name="Sandpoint", slug="sandpoint",
state="ID", lat=48.27, lon=-116.55, elev_m=600.0)
new_cfg = AdapterConfig(
name="satpass_predict", enabled=True, cadence_s=3600,
settings={"observers": [new_obs.model_dump()],
"min_elevation_deg": 25.0, "horizon_hours": 12},
updated_at=datetime.now(timezone.utc),
)
await adapter.apply_config(new_cfg)
assert len(adapter._observers) == 1
assert adapter._observers[0].slug == "sandpoint"
assert adapter._min_elev == 25.0
assert adapter._horizon_h == 12.0
# --- Stream registry + family map + GUI wiring ----------------------------
def test_central_sat_family_includes_pass_token():
"""v0.11.1: pass.* categories also route to CENTRAL_SAT.
v0.12.0: position.* extends the family for sat_positions telemetry."""
from central.supervisor import STREAM_CATEGORY_DOMAINS
assert "pass" in STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"]
def test_satpass_predict_in_space_adapter_group():
from central.gui.routes import ADAPTER_GROUPS
assert "satpass_predict" in ADAPTER_GROUPS["Space"]
# --- Partials render cleanly (v0.10.0 pattern) ------------------------------
def test_summary_partial_renders_cleanly_with_real_pass(adapter):
from jinja2 import Environment, FileSystemLoader
templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates"
env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True)
tmpl = env.get_template("_event_summaries/satpass_predict.html")
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS)
rendered = tmpl.render(event={
"data": {"data": {"data": ev.model_dump(mode="json")["data"]}}
}).strip()
assert "ISS (ZARYA)" in rendered, f"got: {rendered!r}"
assert "max elevation" in rendered
assert "UTC" in rendered
def test_row_partial_renders_cleanly(adapter):
from jinja2 import Environment, FileSystemLoader
templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates"
env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True)
tmpl = env.get_template("_event_rows/satpass_predict.html")
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS)
rendered = tmpl.render(event={
"data": {"data": {"data": ev.model_dump(mode="json")["data"]}}
})
assert "<dt>Satellite</dt>" in rendered and "ISS (ZARYA)" in rendered
assert "<dt>Observer</dt>" in rendered and "Treasure Valley" in rendered
assert "<dt>AOS (rise)</dt>" in rendered
assert "<dt>Peak</dt>" in rendered
assert "<dt>LOS (set)</dt>" in rendered
assert "<dt>Duration</dt>" in rendered
# --- v0.11.2: sub-satellite point + visibility footprint + GeometryCollection
def test_subsatellite_point_at_north_pole_returns_polar_coords():
"""Sat at +z over geocentre -> lat=90, lon undefined (atan2 returns 0)."""
lon, lat, alt = _subsatellite_point((0.0, 0.0, 7000.0))
assert abs(lat - 90.0) < 1e-6
assert abs(alt - (7000.0 - 6378.137)) < 1e-6
def test_subsatellite_point_over_equator_lon_zero():
"""Sat on +x axis at altitude 400km over (lon=0, lat=0)."""
lon, lat, alt = _subsatellite_point((6378.137 + 400.0, 0.0, 0.0))
assert abs(lon - 0.0) < 1e-6
assert abs(lat - 0.0) < 1e-6
assert abs(alt - 400.0) < 1e-6
def test_subsatellite_point_over_equator_at_lon_90():
"""Sat on +y axis over (lon=90, lat=0)."""
lon, lat, alt = _subsatellite_point((0.0, 6778.137, 0.0))
assert abs(lon - 90.0) < 1e-6
assert abs(lat - 0.0) < 1e-6
def test_subsatellite_point_lon_normalised_into_180_range():
"""Sat at lon=-90 (Pacific) -> lon=-90, not 270."""
lon, _, _ = _subsatellite_point((0.0, -6778.137, 0.0))
assert -180.0 <= lon <= 180.0
assert abs(lon - (-90.0)) < 1e-6
def test_subsatellite_point_real_iss_sample_via_sgp4():
"""End-to-end against sgp4: ISS at TLE epoch -- sub-sat point should be
on a 51.6° inclination orbit (lat in [-52, 52]). Bit-deterministic."""
from sgp4.api import Satrec, jday
sat = Satrec.twoline2rv(_ISS_L1, _ISS_L2)
# Propagate at TLE epoch itself for a clean reference point.
jd, fr = jday(2026, 6, 8, 19, 17, 55.071168)
err, pos_eci, _ = sat.sgp4(jd, fr)
assert err == 0
sat_ecef = _eci_to_ecef(pos_eci, _gmst_rad(jd, fr))
lon, lat, alt = _subsatellite_point(sat_ecef)
# ISS inclination is 51.6° so sub-sat latitude must stay within ±52°.
assert -52.0 < lat < 52.0, f"ISS sub-sat lat {lat}° outside inclination envelope"
# ISS altitude is ~408 km nominally; allow generous range for SGP4 noise.
assert 350.0 < alt < 500.0, f"ISS altitude {alt}km outside expected range"
assert -180.0 <= lon <= 180.0
# --- Visibility footprint --------------------------------------------------
def test_visibility_footprint_returns_closed_32_vertex_polygon():
poly = _visibility_footprint(lon_deg=-116.2, lat_deg=43.6, alt_km=408.0)
assert poly is not None
assert poly["type"] == "Polygon"
ring = poly["coordinates"][0]
# 32 vertices + closing duplicate = 33 points in the ring.
assert len(ring) == 33
# First == last (closed polygon).
assert ring[0] == ring[-1]
def test_visibility_footprint_iss_radius_approximation():
"""ISS at 408km -> horizon ~2253km (spec says ~2200km)."""
poly = _visibility_footprint(lon_deg=0.0, lat_deg=0.0, alt_km=408.0)
ring = poly["coordinates"][0]
# At the equator with sub-sat at (0,0), the easternmost vertex is at
# bearing 90° (pure east), so its longitude equals the angular distance
# in degrees. radius_km / R_earth = angular_dist in rad; *180/pi for deg.
import math as m
r_earth = 6378.137
expected_angular_deg = m.degrees(r_earth * m.acos(r_earth / (r_earth + 408.0)) / r_earth)
# 2200km / 6378km ≈ 0.345 rad ≈ 19.76°. Expect lons in ring around ±19.76.
max_lon = max(p[0] for p in ring)
assert 18.0 < max_lon < 22.0, f"ISS east-vertex lon {max_lon}, expected ~20° (radius ~2200km)"
assert abs(max_lon - expected_angular_deg) < 0.5
def test_visibility_footprint_geo_radius_approximation():
"""GEO at 35786km -> horizon ~9000km (spec)."""
poly = _visibility_footprint(lon_deg=0.0, lat_deg=0.0, alt_km=35786.0)
ring = poly["coordinates"][0]
max_lon = max(p[0] for p in ring)
# 9000km / 6378km ≈ 1.41 rad ≈ 80.85°. Expect lons in ring spanning ±81.
assert 78.0 < max_lon < 83.0, f"GEO east-vertex lon {max_lon}, expected ~81°"
def test_visibility_footprint_none_for_decayed_altitude():
"""Negative or zero altitude -> None (orbit decayed, garbage in)."""
assert _visibility_footprint(0.0, 0.0, 0.0) is None
assert _visibility_footprint(0.0, 0.0, -100.0) is None
def test_visibility_footprint_near_antimeridian_does_not_crash():
"""Polar-orbit-style sub-sat at lon=179° -- vertices wrap across the
dateline. Documented limitation: each vertex is normalised independently
so the polygon may visually wrap the "wrong way" in Leaflet for sats
crossing ±180°. Per-vertex normalisation is the simplest approach and
Idaho-overhead passes stay well clear of this case.
"""
poly = _visibility_footprint(lon_deg=179.0, lat_deg=0.0, alt_km=400.0)
assert poly is not None
ring = poly["coordinates"][0]
for lon, lat in ring:
# Every vertex's lon stays within [-180, 180]; no NaN / Inf.
assert -180.0 <= lon <= 180.0
assert -90.0 <= lat <= 90.0
import math as m
assert m.isfinite(lon) and m.isfinite(lat)
# --- Ground track + GeometryCollection assembly --------------------------
def test_ground_track_collected_during_real_iss_pass():
"""The pinned-ref ISS pass over Treasure Valley collects multiple sub-sat
points from AOS through LOS. Track must be a non-empty list of (lon, lat)."""
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
assert passes
track = passes[0]["ground_track"]
assert isinstance(track, list)
assert len(track) >= 2 # at least AOS + LOS samples
for lon, lat in track:
assert -180.0 <= lon <= 180.0
assert -90.0 <= lat <= 90.0
def test_peak_subsat_captured_at_peak_time():
"""peak_subsat is (lon, lat, alt) of the satellite at peak elevation."""
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
p = passes[0]
assert p["peak_subsat"] is not None
lon, lat, alt = p["peak_subsat"]
assert -180.0 <= lon <= 180.0
# ISS inclination 51.6° → sub-sat lat in [-52, 52] always.
assert -52.0 < lat < 52.0
# ISS altitude ~400-450km.
assert 350.0 < alt < 500.0
def test_build_pass_geometry_returns_geometrycollection_with_both_shapes():
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
geom = _build_pass_geometry(passes[0])
assert geom is not None
assert geom["type"] == "GeometryCollection"
types = [g["type"] for g in geom["geometries"]]
assert "LineString" in types
assert "Polygon" in types
# LineString must have at least 2 vertices.
ls = next(g for g in geom["geometries"] if g["type"] == "LineString")
assert len(ls["coordinates"]) >= 2
# Polygon must be closed.
poly = next(g for g in geom["geometries"] if g["type"] == "Polygon")
ring = poly["coordinates"][0]
assert ring[0] == ring[-1]
def test_build_pass_geometry_uses_split_antimeridian_for_polar_track():
"""v0.13.0: a synthesized polar-orbit ground_track that crosses +/-180
must produce a MultiLineString inside the GeometryCollection, NOT the
"wrong-way wrap" LineString the v0.11.2 inline builder produced.
Wired into _build_pass_geometry via sat_common.split_antimeridian."""
polar_track = [
(170.0, 60.0), (175.0, 65.0), (179.0, 70.0),
(-179.0, 75.0), (-175.0, 80.0), (-170.0, 85.0),
]
geom = _build_pass_geometry({
"ground_track": polar_track,
"peak_subsat": (-179.0, 75.0, 400.0),
})
assert geom is not None
assert geom["type"] == "GeometryCollection"
types = [g["type"] for g in geom["geometries"]]
assert "MultiLineString" in types, (
f"polar track must split at antimeridian; got types={types}"
)
mls = next(g for g in geom["geometries"] if g["type"] == "MultiLineString")
assert len(mls["coordinates"]) == 2 # one crossing -> two segments
# First segment closes at +180; second starts at -180.
assert mls["coordinates"][0][-1][0] == 180.0
assert mls["coordinates"][1][0][0] == -180.0
def test_build_pass_geometry_returns_none_when_inputs_missing():
"""Defensive: pass dict with no track + no peak_subsat -> None (don't
write an empty GeometryCollection to the wire)."""
assert _build_pass_geometry({}) is None
assert _build_pass_geometry({"ground_track": [], "peak_subsat": None}) is None
def test_build_pass_geometry_polygon_only_when_track_too_short():
"""A single-sample track (only 1 vertex) is below LineString minimum;
we omit the LineString but keep the footprint Polygon."""
geom = _build_pass_geometry({
"ground_track": [(-116.2, 43.6)],
"peak_subsat": (-116.2, 43.6, 400.0),
})
assert geom is not None
types = [g["type"] for g in geom["geometries"]]
assert types == ["Polygon"]
def test_pass_event_includes_geometry_collection(adapter):
"""End-to-end: built Event has the GeometryCollection attached."""
passes = _next_passes(_ISS_L1, _ISS_L2, _OBS, _REF, 24, 10.0)
ev = adapter._pass_to_event(passes[0], _row_for_iss(), _OBS)
assert ev.geo.geometry is not None
assert ev.geo.geometry["type"] == "GeometryCollection"
# centroid stays at observer (unchanged contract).
assert ev.geo.centroid == (-116.2, 43.6)