diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 51cf4b4..f0b43e0 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -1939,6 +1939,46 @@ at parameter `00060`, gage height (ft) at `00065`, water temperature (°C) at - **Empty-TLE behaviour:** logs INFO and yields zero events, same as `satpass_predict`. Enable `celestrak_tle` first. +### n2yo_visualpasses — server-side visible-pass alerts (v0.12.1) + +- **Source:** the `visualpasses` endpoint at `api.n2yo.com`. Requires a + free n2yo API key (configured via the GUI `/api-keys` page with alias + `n2yo`). n2yo's servers add sun-illumination and visual-magnitude data + that local SGP4 propagation alone cannot compute, which is why this + adapter exists alongside (not replacing) `satpass_predict`. +- **Stream:** `CENTRAL_SAT` (existing; no new stream). +- **Subject:** `central.sat.pass.us..` — + **intentionally identical to `satpass_predict`'s subject**. A consumer + subscribing to e.g. `central.sat.pass.us.id.boise` will receive events + from BOTH adapters. **Disambiguate via `data.category`**: filter on + `pass.n2yo_visualpasses` for this adapter, `pass.satpass_predict` for + the local SGP4 flow. JetStream's category-discriminated `Nats-Msg-Id` + (v0.10.8) keeps both adapters' dedup windows separate even when they + emit for the same (observer, satellite, AOS) tuple. +- **Dedup key shape:** `::` — same shape + as `satpass_predict` by design; the category-discriminated `Nats-Msg-Id` + is what keeps them distinct in JetStream. +- **Severity bucket** from visual magnitude (**lower mag = brighter**): + `<= -3` = 4 (very bright); `-3 .. -1` = 3 (bright, naked-eye easy); + `-1 .. 2` = 2 (faint, binoculars help); `> 2` = 1 (telescope-grade; + rarely fires since n2yo's `visualpasses` only returns sunlit passes). +- **Geo:** `centroid = (observer.lon, observer.lat)` so the GUI map plots + the alert at the observer point. +- **Event.data fields:** `observer_name`, `observer_slug`, `observer_state`, + `norad_id`, `satellite_name`, `aos_time`, `peak_time`, `los_time`, + `max_elevation_deg`, `magnitude`, `azimuth_at_aos` /`_compass`, + `azimuth_at_peak` /`_compass`, `azimuth_at_los` /`_compass`, + `duration_s`. +- **Cadence:** 1h. The adapter recomputes the 2-day visible-pass horizon + every hour. Default 6 observers × 6 sats × 24 polls/day = 864 + transactions/day, under n2yo's free-tier 1000/day quota cap. +- **Settings:** `observers`, `norad_ids`, `days_ahead = 2`, + `min_visibility_seconds = 300`, `api_key_alias = "n2yo"`. +- **Missing-key behaviour:** if no key is configured for the alias, the + adapter logs INFO and yields zero events — no exception. Operator adds + the key via GUI `/api-keys` then the adapter picks it up on the next + config-change notification. + \ --- diff --git a/sql/migrations/040_add_n2yo_visualpasses_adapter.sql b/sql/migrations/040_add_n2yo_visualpasses_adapter.sql new file mode 100644 index 0000000..5cde956 --- /dev/null +++ b/sql/migrations/040_add_n2yo_visualpasses_adapter.sql @@ -0,0 +1,45 @@ +-- Migration 040: register n2yo_visualpasses adapter (v0.12.1) +-- +-- Server-side complement to v0.11.1 satpass_predict. n2yo's visualpasses +-- endpoint adds sun illumination + visual magnitude that SGP4-from-TLE +-- alone cannot compute. Subject collision with satpass_predict on +-- central.sat.pass.us.. is intentional; consumers +-- disambiguate via data.category (pass.n2yo_visualpasses vs +-- pass.satpass_predict). v0.10.8 category-discriminated Nats-Msg-Id keeps +-- the JetStream dedup windows distinct. +-- +-- No stream changes: CENTRAL_SAT already routes via the "pass" token in +-- STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"] = ("tle", "pass", "position"). +-- +-- No api_keys seeding: Matt adds the "n2yo" alias via the GUI /api-keys +-- page (Add -> alias "n2yo" -> paste key) before enabling the adapter. +-- Missing-key behavior is graceful (log INFO + zero-yield, no exception), +-- so the row can land in config.adapters before the key does without +-- breaking anything. +-- +-- Ships disabled. Default 6 observers x 6 sats x 24 polls/day = 864 +-- transactions/day, under n2yo's free 1000/day quota cap. +-- +-- Idempotent: ON CONFLICT (name) DO NOTHING preserves operator-tuned state. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'n2yo_visualpasses', + false, + 3600, + '{ + "observers": [ + {"name": "Filer", "slug": "filer", "state": "ID", "lat": 42.57, "lon": -114.60, "elev_m": 1200}, + {"name": "Boise", "slug": "boise", "state": "ID", "lat": 43.62, "lon": -116.20, "elev_m": 825}, + {"name": "Idaho Falls", "slug": "idaho-falls", "state": "ID", "lat": 43.49, "lon": -112.04, "elev_m": 1438}, + {"name": "Ogden", "slug": "ogden", "state": "UT", "lat": 41.22, "lon": -111.97, "elev_m": 1330}, + {"name": "Salt Lake City", "slug": "salt-lake-city", "state": "UT", "lat": 40.76, "lon": -111.89, "elev_m": 1290}, + {"name": "Provo", "slug": "provo", "state": "UT", "lat": 40.23, "lon": -111.66, "elev_m": 1387} + ], + "norad_ids": [25544, 25338, 28654, 33591, 27607, 43017], + "days_ahead": 2, + "min_visibility_seconds": 300, + "api_key_alias": "n2yo" + }'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/n2yo_visualpasses.py b/src/central/adapters/n2yo_visualpasses.py new file mode 100644 index 0000000..5057df4 --- /dev/null +++ b/src/central/adapters/n2yo_visualpasses.py @@ -0,0 +1,330 @@ +"""n2yo_visualpasses adapter -- server-side visible-pass alerts (v0.12.1). + +Complements satpass_predict (v0.11.1, SGP4-from-TLE): n2yo's API adds sun +illumination + visual magnitude, which local SGP4 propagation alone cannot +compute. Subject collision with satpass_predict on +``central.sat.pass.us..`` is intentional; consumers +disambiguate via ``data.category`` (``pass.n2yo_visualpasses`` vs +``pass.satpass_predict``). Category-discriminated Nats-Msg-Id (v0.10.8) +keeps the JetStream dedup windows distinct. + +The trailing ``/&apiKey=`` in the URL is n2yo's quirky convention, not a +typo. UTC fields in the response are Unix timestamps; ``mag`` is visual +magnitude (LOWER = BRIGHTER). +""" + +from __future__ import annotations + +import asyncio +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +_FETCH_TIMEOUT_S = 30 +_FETCH_CONCURRENCY = 4 + +_VISUALPASSES_URL = ( + "https://api.n2yo.com/rest/v1/satellite/visualpasses/" + "{norad_id}/{lat}/{lng}/{alt}/{days}/{min_vis_s}/&apiKey={key}" +) + +_DEDUP_DDL = ( + "CREATE TABLE IF NOT EXISTS published_ids (" + "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " + "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" +) + + +def _severity_from_magnitude(mag: float) -> int: + """Visual-magnitude buckets. Lower = brighter. + <=-3 -> 4 (very bright); -3..-1 -> 3 (naked-eye); -1..2 -> 2 (binoculars); + >2 -> 1 (telescope-grade; rarely fires for sunlit passes).""" + if mag <= -3.0: + return 4 + if mag <= -1.0: + return 3 + if mag <= 2.0: + return 2 + return 1 + + +class Observer(BaseModel): + """Fixed observer location for n2yo's pre-computed pass queries.""" + + name: str + slug: str + state: str + lat: float + lon: float + elev_m: float = 0.0 + + +class N2yoVisualpassesSettings(BaseModel): + """Default 6 observers x 6 sats x 24 polls/day = 864 transactions/day, + under n2yo's free 1000/day cap. Operator can extend either list if + they upgrade quota. api_key_alias defaults to "n2yo".""" + + observers: list[Observer] = [ + Observer(name="Filer", slug="filer", state="ID", + lat=42.57, lon=-114.60, elev_m=1200.0), + Observer(name="Boise", slug="boise", state="ID", + lat=43.62, lon=-116.20, elev_m=825.0), + Observer(name="Idaho Falls", slug="idaho-falls", state="ID", + lat=43.49, lon=-112.04, elev_m=1438.0), + Observer(name="Ogden", slug="ogden", state="UT", + lat=41.22, lon=-111.97, elev_m=1330.0), + Observer(name="Salt Lake City", slug="salt-lake-city", state="UT", + lat=40.76, lon=-111.89, elev_m=1290.0), + Observer(name="Provo", slug="provo", state="UT", + lat=40.23, lon=-111.66, elev_m=1387.0), + ] + norad_ids: list[int] = [25544, 25338, 28654, 33591, 27607, 43017] + days_ahead: int = 2 + min_visibility_seconds: int = 300 + api_key_alias: str = "n2yo" + + +class N2yoVisualpassesAdapter(SourceAdapter): + """Server-side visible-pass alerts via n2yo's visualpasses endpoint.""" + + name = "n2yo_visualpasses" + display_name = "n2yo Visible Passes" + description = ( + "Pre-computed visible-pass alerts from n2yo.com -- sun illumination " + "and visual magnitude are server-side data that complement " + "satpass_predict's local SGP4 propagation. Requires a free n2yo API " + "key (configured via /api-keys). One Event per (observer, satellite, " + "AOS) tuple within a 2-day horizon, severity bucketed by visual " + "magnitude." + ) + settings_schema = N2yoVisualpassesSettings + requires_api_key = "n2yo" + api_key_field = "api_key_alias" + wizard_order = None # Ships disabled; operator enables after adding key + default_cadence_s = 3600 # 1h + data_class = "event" + enrichment_locations = [] + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._db: sqlite3.Connection | None = None + self._session: aiohttp.ClientSession | None = None + self._api_key: str | None = None + self._apply_settings(config.settings or {}) + + def _apply_settings(self, settings: dict[str, Any]) -> None: + raw_obs = settings.get("observers") or [] + self._observers: list[Observer] = [ + o if isinstance(o, Observer) else Observer(**o) for o in raw_obs + ] + self._norad_ids: list[int] = [int(n) for n in (settings.get("norad_ids") or [])] + self._days_ahead: int = int(settings.get("days_ahead") or 2) + self._min_vis_s: int = int(settings.get("min_visibility_seconds") or 300) + self._api_key_alias: str = settings.get("api_key_alias") or "n2yo" + + def _redact(self, text: str) -> str: + """Strip the live key from log strings before they hit journald.""" + return text.replace(self._api_key, "") if self._api_key else text + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S), + headers={"User-Agent": "Central/0.12 (+n2yo_visualpasses)"}, + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(_DEDUP_DDL) + self._db.execute( + "CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)" + ) + self._db.commit() + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info( + "n2yo_visualpasses adapter started", + extra={ + "observers": [o.slug for o in self._observers], + "norad_ids": self._norad_ids, + "days_ahead": self._days_ahead, + "min_visibility_seconds": self._min_vis_s, + "api_key_present": bool(self._api_key), + }, + ) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + self._apply_settings(new_config.settings or {}) + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info( + "n2yo_visualpasses config updated", + extra={ + "observers": [o.slug for o in self._observers], + "norad_ids": self._norad_ids, + "api_key_present": bool(self._api_key), + }, + ) + + async def _fetch_passes(self, observer: Observer, norad_id: int) -> dict[str, Any] | None: + """One n2yo API call. Returns parsed JSON or None on failure (live key + scrubbed from log; caller skips this pair, one failure must not kill the poll).""" + assert self._session is not None + url = _VISUALPASSES_URL.format( + norad_id=norad_id, + lat=observer.lat, lng=observer.lon, alt=observer.elev_m, + days=self._days_ahead, min_vis_s=self._min_vis_s, + key=self._api_key, + ) + try: + async with self._session.get(url) as resp: + if resp.status != 200: + logger.warning( + "n2yo_visualpasses HTTP non-200", + extra={"observer": observer.slug, "norad_id": norad_id, + "status": resp.status}, + ) + return None + return await resp.json() + except (aiohttp.ClientError, TimeoutError) as exc: + logger.warning( + "n2yo_visualpasses fetch failed", + extra={"observer": observer.slug, "norad_id": norad_id, + "error": self._redact(str(exc))}, + ) + return None + + def _pass_to_event( + self, p: dict[str, Any], info: dict[str, Any], observer: Observer, + ) -> Event: + # All UTC fields from n2yo are Unix timestamps. + aos = datetime.fromtimestamp(p["startUTC"], tz=timezone.utc) + peak = datetime.fromtimestamp(p["maxUTC"], tz=timezone.utc) + los = datetime.fromtimestamp(p["endUTC"], tz=timezone.utc) + mag = float(p["mag"]) + return Event( + id=f"{observer.slug}:{info['satid']}:{aos.isoformat()}", + adapter=self.name, + category="pass.n2yo_visualpasses", + time=peak, + severity=_severity_from_magnitude(mag), + geo=Geo( + centroid=(observer.lon, observer.lat), + regions=[f"US-{observer.state}"], + primary_region=f"US-{observer.state}", + ), + data={ + "observer_name": observer.name, + "observer_slug": observer.slug, + "observer_state": observer.state, + "norad_id": int(info["satid"]), + "satellite_name": info["satname"], + "aos_time": aos.isoformat(), + "peak_time": peak.isoformat(), + "los_time": los.isoformat(), + "max_elevation_deg": round(float(p["maxEl"]), 2), + "magnitude": round(mag, 2), + "azimuth_at_aos": round(float(p["startAz"]), 1), + "azimuth_at_aos_compass": p.get("startAzCompass"), + "azimuth_at_peak": round(float(p["maxAz"]), 1), + "azimuth_at_peak_compass": p.get("maxAzCompass"), + "azimuth_at_los": round(float(p["endAz"]), 1), + "azimuth_at_los_compass": p.get("endAzCompass"), + "duration_s": int(p.get("duration") or 0), + }, + ) + + async def poll(self) -> AsyncIterator[Event]: + if not self._session: + raise RuntimeError("Session not initialized") + if not self._api_key: + logger.info( + "n2yo_visualpasses: no API key for alias; skipping poll", + extra={"alias": self._api_key_alias}, + ) + return + if not self._observers or not self._norad_ids: + logger.info( + "n2yo_visualpasses: empty observers or norad_ids; nothing to poll", + extra={"observers": len(self._observers), + "norad_ids": len(self._norad_ids)}, + ) + return + + sem = asyncio.Semaphore(_FETCH_CONCURRENCY) + + async def _one(obs: Observer, nid: int) -> tuple[ + Observer, dict[str, Any] | None, + ]: + async with sem: + return obs, await self._fetch_passes(obs, nid) + + tasks = [ + _one(obs, nid) for obs in self._observers for nid in self._norad_ids + ] + results = await asyncio.gather(*tasks) + + yielded = 0 + transactions_total = 0 + passes_total = 0 + failures = 0 + for obs, payload in results: + if payload is None: + failures += 1 + continue + info = payload.get("info") or {} + passes = payload.get("passes") or [] + transactions_total += int(info.get("transactionscount") or 0) + passes_total += len(passes) + for p in passes: + try: + yield self._pass_to_event(p, info, obs) + yielded += 1 + except Exception: + logger.exception( + "n2yo_visualpasses event-build failed", + extra={"observer": obs.slug, + "norad_id": info.get("satid")}, + ) + + self.sweep_old_ids() + logger.info( + "n2yo_visualpasses poll completed", + extra={ + "observers": [o.slug for o in self._observers], + "norad_ids": self._norad_ids, + "transactions_used_this_call": transactions_total, + "passes_returned": passes_total, + "events_yielded": yielded, + "fetch_failures": failures, + }, + ) + + def subject_for(self, event: Event) -> str: + state = (event.data.get("observer_state") or "").lower() or "unknown" + slug = event.data.get("observer_slug") or "unknown" + return f"central.sat.pass.us.{state}.{slug}" diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 02789ba..dca1901 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2975,7 +2975,7 @@ DEFAULT_TIME = "last_24h" ADAPTER_GROUPS = { "Disasters": ["gdacs", "firms", "inciweb", "wfigs_incidents", "wfigs_perimeters"], "Weather": ["nws"], - "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons", "celestrak_tle", "satpass_predict", "sat_positions"], + "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons", "celestrak_tle", "satpass_predict", "sat_positions", "n2yo_visualpasses"], "Geophysical": ["usgs_quake", "nwis"], "Earth Observation": ["eonet"], "Transportation": ["wzdx", "tomtom_flow", "tomtom_incidents", "itd_511", "itd_511_cameras"], diff --git a/src/central/gui/templates/_event_rows/n2yo_visualpasses.html b/src/central/gui/templates/_event_rows/n2yo_visualpasses.html new file mode 100644 index 0000000..12fa21b --- /dev/null +++ b/src/central/gui/templates/_event_rows/n2yo_visualpasses.html @@ -0,0 +1,9 @@ +{# n2yo_visualpasses pre-computed visible-pass alert. Fields from payload->data->data. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{% if d.get('satellite_name') is not none %}
Satellite
{{ d.satellite_name }} (NORAD {{ d.norad_id }})
{% endif %} +{% if d.get('observer_name') is not none %}
Observer
{{ d.observer_name }}{% if d.get('observer_state') %} ({{ d.observer_state }}){% endif %}
{% endif %} +{% if d.get('aos_time') is not none %}
AOS (rise)
{{ d.aos_time }}{% if d.get('azimuth_at_aos') is not none %} — azimuth {{ "%.0f"|format(d.azimuth_at_aos) }}°{% if d.get('azimuth_at_aos_compass') %} ({{ d.azimuth_at_aos_compass }}){% endif %}{% endif %}
{% endif %} +{% if d.get('peak_time') is not none %}
Peak
{{ d.peak_time }}{% if d.get('max_elevation_deg') is not none %} — max elevation {{ "%.0f"|format(d.max_elevation_deg) }}°{% endif %}{% if d.get('azimuth_at_peak') is not none %} at {{ "%.0f"|format(d.azimuth_at_peak) }}°{% if d.get('azimuth_at_peak_compass') %} ({{ d.azimuth_at_peak_compass }}){% endif %}{% endif %}
{% endif %} +{% if d.get('los_time') is not none %}
LOS (set)
{{ d.los_time }}{% if d.get('azimuth_at_los') is not none %} — azimuth {{ "%.0f"|format(d.azimuth_at_los) }}°{% if d.get('azimuth_at_los_compass') %} ({{ d.azimuth_at_los_compass }}){% endif %}{% endif %}
{% endif %} +{% if d.get('magnitude') is not none %}
Brightness
magnitude {{ "%.1f"|format(d.magnitude) }} (lower = brighter)
{% endif %} +{% if d.get('duration_s') is not none %}
Duration
{{ "%.0f"|format(d.duration_s) }} sec
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/n2yo_visualpasses.html b/src/central/gui/templates/_event_summaries/n2yo_visualpasses.html new file mode 100644 index 0000000..cb44b69 --- /dev/null +++ b/src/central/gui/templates/_event_summaries/n2yo_visualpasses.html @@ -0,0 +1,4 @@ +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +{%- if d.get('satellite_name') and d.get('peak_time') and d.get('magnitude') is not none and d.get('max_elevation_deg') is not none -%} +{{ d.satellite_name }} visible pass at {{ d.peak_time[11:16] }} UTC — mag {{ "%.1f"|format(d.magnitude) }}, peak {{ "%.0f"|format(d.max_elevation_deg) }}° +{%- endif -%} diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py index 951e27e..b535bbb 100644 --- a/tests/test_events_feed_frontend.py +++ b/tests/test_events_feed_frontend.py @@ -1171,6 +1171,13 @@ _SAMPLE_INNER = { "velocity_kmps": 7.66, "heading_deg": 87.3, }, + "n2yo_visualpasses": { + "satellite_name": "ISS (ZARYA)", + "norad_id": 25544, + "peak_time": "2026-06-09T21:14:00+00:00", + "magnitude": -3.4, + "max_elevation_deg": 47.0, + }, } # Exact expected subjects for the deterministic adapters. swpc_alerts is omitted @@ -1198,6 +1205,7 @@ _EXPECTED_SUBJECT = { "celestrak_tle": "TLE update: ISS (ZARYA) (NORAD 25544) — 92.9min orbit at 51.6°", "satpass_predict": "ISS (ZARYA) passes overhead at 15:39 UTC — max elevation 40°", "sat_positions": "ISS (ZARYA) at 43.6°N 116.2°W, alt 408km, 7.7km/s", + "n2yo_visualpasses": "ISS (ZARYA) visible pass at 21:14 UTC — mag -3.4, peak 47°", } diff --git a/tests/test_n2yo_visualpasses.py b/tests/test_n2yo_visualpasses.py new file mode 100644 index 0000000..9dd9437 --- /dev/null +++ b/tests/test_n2yo_visualpasses.py @@ -0,0 +1,411 @@ +"""Tests for the n2yo_visualpasses adapter (v0.12.1). + +Strictly offline: every HTTP path is mocked. The synthetic ISS-over-Filer +fixture mirrors the shape of n2yo's documented ``visualpasses`` response +(see https://www.n2yo.com/api/ -> "Visual Passes" section). Values are +plausible for an ISS pass at low magnitude and a peak around 47° elev. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.adapters.n2yo_visualpasses import ( + N2yoVisualpassesAdapter, + N2yoVisualpassesSettings, + Observer, + _severity_from_magnitude, +) +from central.config_models import AdapterConfig +from central.models import Event, Geo + + +# n2yo response fixture: one ISS pass over Filer, mag -3.4 (naked-eye easy), +# peak 47° elevation toward ESE, ~9.3 min above horizon. UTC fields are +# Unix timestamps per n2yo's convention. +_AOS_UNIX = 1781382000 # 2026-06-08T21:00:00Z (illustrative) +_PEAK_UNIX = _AOS_UNIX + 300 +_LOS_UNIX = _AOS_UNIX + 560 + + +def _iss_pass_fixture() -> dict[str, Any]: + return { + "info": { + "satid": 25544, + "satname": "ISS (ZARYA)", + "transactionscount": 1, + "passescount": 1, + }, + "passes": [{ + "startAz": 285.4, "startAzCompass": "WNW", "startEl": 0.0, + "startUTC": _AOS_UNIX, + "maxAz": 196.7, "maxAzCompass": "SSW", "maxEl": 47.0, + "maxUTC": _PEAK_UNIX, + "endAz": 113.2, "endAzCompass": "ESE", "endEl": 0.0, + "endUTC": _LOS_UNIX, + "mag": -3.4, "duration": 560, + }], + } + + +def _empty_fixture(norad_id: int = 25544) -> dict[str, Any]: + """n2yo returns passes=[] when no visible passes in the horizon.""" + return { + "info": {"satid": norad_id, "satname": "SAT", + "transactionscount": 1, "passescount": 0}, + "passes": [], + } + + +def _filer() -> Observer: + return Observer(name="Filer", slug="filer", state="ID", + lat=42.57, lon=-114.60, elev_m=1200.0) + + +def _make_adapter( + tmp_path: Path, + settings: dict[str, Any] | None = None, + api_key: str | None = "fake-test-key", +) -> N2yoVisualpassesAdapter: + """Build adapter with a mocked ConfigStore returning the supplied api_key.""" + cfg = AdapterConfig( + name="n2yo_visualpasses", + enabled=True, + cadence_s=3600, + settings=settings if settings is not None else { + "observers": [_filer().model_dump()], + "norad_ids": [25544], + "days_ahead": 2, + "min_visibility_seconds": 300, + "api_key_alias": "n2yo", + }, + updated_at=datetime.now(timezone.utc), + ) + config_store = MagicMock() + config_store.get_api_key = AsyncMock(return_value=api_key) + return N2yoVisualpassesAdapter(cfg, config_store, tmp_path / "cursors.db") + + +# --- Pure severity bucketing ---------------------------------------------- + + +class TestSeverityBucketing: + """Boundary cases per spec: mag <= -3 -> 4; <= -1 -> 3; <= 2 -> 2; else 1.""" + + def test_very_bright_iridium_flare(self): + assert _severity_from_magnitude(-4.5) == 4 + + def test_exactly_minus_three_is_bucket_4(self): + """Boundary: -3 is INCLUDED in bucket 4 (lower = brighter, more severe).""" + assert _severity_from_magnitude(-3.0) == 4 + + def test_just_above_minus_three_is_bucket_3(self): + assert _severity_from_magnitude(-2.9) == 3 + + def test_just_below_minus_three_is_bucket_4(self): + assert _severity_from_magnitude(-3.1) == 4 + + def test_naked_eye_easy_is_bucket_3(self): + assert _severity_from_magnitude(-1.5) == 3 + + def test_exactly_minus_one_is_bucket_3(self): + assert _severity_from_magnitude(-1.0) == 3 + + def test_just_above_minus_one_is_bucket_2(self): + assert _severity_from_magnitude(-0.5) == 2 + + def test_exactly_two_is_bucket_2(self): + assert _severity_from_magnitude(2.0) == 2 + + def test_above_two_is_bucket_1(self): + assert _severity_from_magnitude(2.5) == 1 + + +# --- Settings defaults pin the curated 6x6 set ------------------------------ + + +class TestSettingsDefaults: + def test_default_six_observers_in_id_and_ut(self): + s = N2yoVisualpassesSettings() + slugs = [o.slug for o in s.observers] + assert slugs == ["filer", "boise", "idaho-falls", "ogden", + "salt-lake-city", "provo"] + states = {o.state for o in s.observers} + assert states == {"ID", "UT"} + + def test_default_six_curated_norad_ids(self): + s = N2yoVisualpassesSettings() + # ISS, NOAA-15/18/19, SO-50, AO-91 + assert s.norad_ids == [25544, 25338, 28654, 33591, 27607, 43017] + + def test_quota_math_under_free_tier(self): + s = N2yoVisualpassesSettings() + polls_per_day_at_1h_cadence = 24 + daily = len(s.observers) * len(s.norad_ids) * polls_per_day_at_1h_cadence + assert daily == 864 + assert daily < 1000 # n2yo free-tier daily cap + + def test_default_api_key_alias(self): + assert N2yoVisualpassesSettings().api_key_alias == "n2yo" + + +# --- Adapter class attrs pin GUI wiring ------------------------------------- + + +class TestAdapterClassAttrs: + def test_requires_api_key_n2yo(self): + assert N2yoVisualpassesAdapter.requires_api_key == "n2yo" + + def test_api_key_field_pins_settings_field_name(self): + """Lets the GUI render an api_key_select dropdown bound to settings.""" + assert N2yoVisualpassesAdapter.api_key_field == "api_key_alias" + + def test_data_class_is_event(self): + assert N2yoVisualpassesAdapter.data_class == "event" + + def test_default_cadence_is_one_hour(self): + assert N2yoVisualpassesAdapter.default_cadence_s == 3600 + + +# --- subject_for + event-record shape --------------------------------------- + + +class TestSubjectFor: + def test_subject_matches_satpass_predict_shape(self, tmp_path): + """Subject collision with satpass_predict is intentional per v0.12.1 + design. Vendor disambiguation lives in data.category.""" + adapter = _make_adapter(tmp_path) + ev = Event( + id="x", adapter="n2yo_visualpasses", category="pass.n2yo_visualpasses", + time=datetime.now(timezone.utc), severity=2, + geo=Geo(centroid=(0.0, 0.0)), + data={"observer_state": "ID", "observer_slug": "filer"}, + ) + assert adapter.subject_for(ev) == "central.sat.pass.us.id.filer" + + def test_state_is_lowercased(self, tmp_path): + adapter = _make_adapter(tmp_path) + ev = Event( + id="x", adapter="n2yo_visualpasses", category="pass.n2yo_visualpasses", + time=datetime.now(timezone.utc), severity=2, + geo=Geo(centroid=(0.0, 0.0)), + data={"observer_state": "UT", "observer_slug": "ogden"}, + ) + assert adapter.subject_for(ev) == "central.sat.pass.us.ut.ogden" + + def test_unknown_state_falls_back(self, tmp_path): + adapter = _make_adapter(tmp_path) + ev = Event( + id="x", adapter="n2yo_visualpasses", category="pass.n2yo_visualpasses", + time=datetime.now(timezone.utc), severity=2, + geo=Geo(centroid=(0.0, 0.0)), + data={"observer_state": "", "observer_slug": ""}, + ) + assert adapter.subject_for(ev) == "central.sat.pass.us.unknown.unknown" + + +class TestPassToEvent: + def test_record_shape(self, tmp_path): + adapter = _make_adapter(tmp_path) + fix = _iss_pass_fixture() + ev = adapter._pass_to_event(fix["passes"][0], fix["info"], _filer()) + # Identity / category / severity + assert ev.adapter == "n2yo_visualpasses" + assert ev.category == "pass.n2yo_visualpasses" + assert ev.severity == 4 # mag=-3.4 -> bucket 4 + # Dedup id: :: + aos_iso = datetime.fromtimestamp(_AOS_UNIX, tz=timezone.utc).isoformat() + assert ev.id == f"filer:25544:{aos_iso}" + # event.time == peak_time + assert ev.time == datetime.fromtimestamp(_PEAK_UNIX, tz=timezone.utc) + # geo plots at observer + assert ev.geo.centroid == (-114.60, 42.57) + assert ev.geo.primary_region == "US-ID" + # Data fields populated + for k in ("observer_name", "observer_slug", "observer_state", + "norad_id", "satellite_name", "aos_time", "peak_time", + "los_time", "max_elevation_deg", "magnitude", + "azimuth_at_aos", "azimuth_at_aos_compass", + "azimuth_at_peak", "azimuth_at_peak_compass", + "azimuth_at_los", "azimuth_at_los_compass", "duration_s"): + assert k in ev.data, f"missing data key {k!r}" + # Sanity on the cherry-picked values + assert ev.data["norad_id"] == 25544 + assert ev.data["satellite_name"] == "ISS (ZARYA)" + assert ev.data["magnitude"] == -3.4 + assert ev.data["max_elevation_deg"] == 47.0 + assert ev.data["duration_s"] == 560 + assert ev.data["azimuth_at_peak_compass"] == "SSW" + + +# --- Poll loop with mocked _fetch_passes ------------------------------------ + + +class TestPollMissingKey: + @pytest.mark.asyncio + async def test_no_key_yields_zero_events_no_exception(self, tmp_path): + adapter = _make_adapter(tmp_path, api_key=None) + await adapter.startup() + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert events == [] + + +class TestPollEmptyConfig: + @pytest.mark.asyncio + async def test_no_observers_yields_zero(self, tmp_path): + adapter = _make_adapter(tmp_path, settings={ + "observers": [], "norad_ids": [25544], + "days_ahead": 2, "min_visibility_seconds": 300, + "api_key_alias": "n2yo", + }) + await adapter.startup() + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert events == [] + + @pytest.mark.asyncio + async def test_no_norad_ids_yields_zero(self, tmp_path): + adapter = _make_adapter(tmp_path, settings={ + "observers": [_filer().model_dump()], "norad_ids": [], + "days_ahead": 2, "min_visibility_seconds": 300, + "api_key_alias": "n2yo", + }) + await adapter.startup() + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert events == [] + + +class TestPollHappyPath: + @pytest.mark.asyncio + async def test_one_observer_one_sat_one_pass(self, tmp_path): + adapter = _make_adapter(tmp_path) + await adapter.startup() + with patch.object(adapter, "_fetch_passes", + new=AsyncMock(return_value=_iss_pass_fixture())): + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert len(events) == 1 + assert events[0].severity == 4 + assert events[0].data["norad_id"] == 25544 + + +class TestPollEmptyPassesArray: + @pytest.mark.asyncio + async def test_passes_empty_yields_zero_no_exception(self, tmp_path): + adapter = _make_adapter(tmp_path) + await adapter.startup() + with patch.object(adapter, "_fetch_passes", + new=AsyncMock(return_value=_empty_fixture())): + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert events == [] + + +class TestPollFetchFailureDoesNotKillPoll: + @pytest.mark.asyncio + async def test_one_fetch_returns_none_others_succeed(self, tmp_path): + """Three sats: first fetch fails (returns None), other two succeed. + Aggregate count = 2 events from the two successes.""" + adapter = _make_adapter(tmp_path, settings={ + "observers": [_filer().model_dump()], + "norad_ids": [25544, 25338, 28654], + "days_ahead": 2, "min_visibility_seconds": 300, + "api_key_alias": "n2yo", + }) + await adapter.startup() + responses = [None, _iss_pass_fixture(), _iss_pass_fixture()] + + async def _stub(_obs, _nid): + return responses.pop(0) + + with patch.object(adapter, "_fetch_passes", side_effect=_stub): + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert len(events) == 2 + + +class TestPollMultiObserverMultiSatAggregate: + @pytest.mark.asyncio + async def test_six_by_six_aggregate(self, tmp_path): + """Default 6 observers x 6 sats x 1 pass each = 36 events. + Explicit settings pass-through (the _make_adapter helper's + settings=None branch defaults to a Filer-only fixture; we want + the production schema defaults here).""" + prod_defaults = N2yoVisualpassesSettings().model_dump() + adapter = _make_adapter(tmp_path, settings=prod_defaults) + await adapter.startup() + + async def _stub(_obs, _nid): + return _iss_pass_fixture() + + with patch.object(adapter, "_fetch_passes", side_effect=_stub): + events = [ev async for ev in adapter.poll()] + await adapter.shutdown() + assert len(events) == 36 + # Sanity: all 6 observers represented + assert {e.data["observer_slug"] for e in events} == { + "filer", "boise", "idaho-falls", "ogden", + "salt-lake-city", "provo", + } + + +# --- HTTP layer (single end-to-end test through session.get) ---------------- + + +class TestHttpErrorPath: + """One HTTP-level test verifies the session.get path: HTTP 401 (invalid key) + yields None from _fetch_passes, which the poll loop handles as a failure + (skipped, doesn't kill the poll).""" + + @pytest.mark.asyncio + async def test_http_401_returns_none(self, tmp_path): + adapter = _make_adapter(tmp_path) + await adapter.startup() + + # Build a mock async context manager whose .__aenter__ returns a + # response with status=401. + resp = MagicMock() + resp.status = 401 + resp.json = AsyncMock(return_value={"error": "Invalid API key"}) + cm = MagicMock() + cm.__aenter__ = AsyncMock(return_value=resp) + cm.__aexit__ = AsyncMock(return_value=False) + assert adapter._session is not None + with patch.object(adapter._session, "get", + MagicMock(return_value=cm)): + result = await adapter._fetch_passes(_filer(), 25544) + await adapter.shutdown() + assert result is None + + +# --- Static isolation guard -------------------------------------------------- + + +class TestStaticIsolation: + def test_no_absolute_paths_in_adapter_source(self): + """Acceptance bar #4: no /home/, /tmp/, /opt/ in adapter or this test.""" + adapter_src = Path(__file__).parent.parent / "src" / "central" / \ + "adapters" / "n2yo_visualpasses.py" + text = adapter_src.read_text() + # Note: matching the *path prefixes*, not substrings inside text. + for needle in ("/home/", "/opt/", "/tmp/"): + assert needle not in text, f"hardcoded path {needle!r} in adapter" + + def test_no_hardcoded_api_key_in_adapter_source(self): + """Acceptance bar #2: no API-key constants embedded in source.""" + adapter_src = Path(__file__).parent.parent / "src" / "central" / \ + "adapters" / "n2yo_visualpasses.py" + text = adapter_src.read_text() + # Surface-pattern check: no raw 40-char alphanumeric strings that look + # like keys. n2yo keys are short and tokenized but the principle holds. + # Also: explicit guard against the literal placeholder forms we might + # have left behind. + for needle in ("apiKey=AKL", "apiKey=DEMO", "apiKey=YOUR"): + assert needle not in text, f"likely hardcoded key remnant {needle!r}"