v0.11.0: new celestrak_tle adapter + CENTRAL_SAT satellite-tracking stream (#100)

This commit is contained in:
malice 2026-06-09 00:54:19 -06:00 committed by GitHub
commit 621148ac46
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1067 additions and 4 deletions

View file

@ -135,6 +135,7 @@ Central's archive.
| `CENTRAL_TRAFFIC_FLOW` | `central.traffic_flow.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_TRAFFIC_CAMERAS` | `central.traffic_cameras.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_AVY` | `central.avy.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_SAT` | `central.sat.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_META` | `central.meta.>` | 1 | 1 GiB | — | ✓ |
Retention and storage caps are migration-seeded defaults visible in `config.streams`;
@ -1831,6 +1832,44 @@ at parameter `00060`, gage height (ft) at `00065`, water temperature (°C) at
\
---
### celestrak_tle — CelesTrak satellite TLEs (v0.11.0)
- **Source:** `https://celestrak.org/NORAD/elements/gp.php?GROUP=<group>&FORMAT=TLE`
per configured group (defaults: `stations`, `weather`, `amateur`), plus
per-CATNR endpoint for operator-pinned `extra_norad_ids`. 4h cadence
(CelesTrak refreshes ~8h).
- **Stream:** `CENTRAL_SAT` (`central.sat.>`)
- **Subject:** `central.sat.tle.<norad_id>` — one subject per satellite,
globally. No state coord (orbital state is universal). Consumers
compute passes locally with their own observer geolocation
(e.g. satellite.js + `navigator.geolocation`).
- **Dedup key shape:** `<norad_id>:<epoch_iso>` — re-fetching the same TLE
is swallowed; CelesTrak issues a new epoch every ~8h and that produces
a fresh dedup key, naturally triggering a republish.
- **Severity:** `1` (informational; no alerting).
- **Geo:** intentionally empty (`Geo()`). TLEs are global orbital state,
not a surface point — consumers propagate the orbit at observe time.
- **Event.data fields:**
| Field | Type | Notes |
|---|---|---|
| `norad_id` | int | Satellite catalog number (e.g. 25544 for ISS) |
| `satellite_name` | string | Upstream display name |
| `tle_line1`, `tle_line2` | string | Raw 69-char TLE strings; pass to satellite-js verbatim |
| `epoch` | string | ISO datetime decoded from Line 1 cols 19-32 (YYDDD.DDDDDDDD; Y2K rule 00-56 = 2000s, 57-99 = 1957-1999) |
| `classification` | string | `U` / `C` / `S` (almost always U) |
| `intl_designator` | string | International designator e.g. `1998-067A` (ISS) — but in the packed TLE form (`98067A`) |
| `source_url` | string | The exact URL the TLE was fetched from |
- **`_enriched.orbit`:** parsed straight from Line 2 columns when valid:
`inclination_deg`, `mean_motion_rev_per_day`, `eccentricity` (the
implicit-leading-0. is reconstructed). Absent if Line 2 fails to parse.
- **Group/extras dedup:** if a satellite appears in two configured groups
or in both a group and `extra_norad_ids`, it's fetched **once** (first
occurrence wins).
\
---
## 7. Fall-off / removal semantics
Central adapters fall into three buckets for handling upstream events that

View file

@ -362,7 +362,7 @@ central.<domain>.<subtype>[.<dimensions>...]
```
- `<domain>` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`,
`traffic`, `traffic_flow`, `traffic_cameras`, `avy`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding
`traffic`, `traffic_flow`, `traffic_cameras`, `avy`, `sat`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding
one). Operators MUST be able to subscribe to all of one domain with
`central.<domain>.>`.
- `<subtype>` is adapter-driven and identifies the event category within the
@ -554,6 +554,7 @@ STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_TRAFFIC_FLOW", "central.traffic_flow.>"),
StreamEntry("CENTRAL_TRAFFIC_CAMERAS", "central.traffic_cameras.>"),
StreamEntry("CENTRAL_AVY", "central.avy.>"),
StreamEntry("CENTRAL_SAT", "central.sat.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]
```

View file

@ -0,0 +1,32 @@
-- Migration 037: seed CENTRAL_SAT stream + register celestrak_tle adapter (v0.11.0)
--
-- New top-level satellite-tracking domain. The adapter publishes TLEs
-- (orbital state) for satellites in configured CelesTrak groups
-- (defaults: stations, weather, amateur) so mesh consumers can compute
-- passes locally with their own observer geolocation. v0.11.1 (followup)
-- will add the satpass_predict adapter for fixed-observer pass alerts.
--
-- Stream config: 7-day retention, 1 GiB max_bytes -- mirrors
-- CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_AVY defaults. TLE volume is
-- predictable (~75 sats × 4 polls/day = 300 events/day) so the cap is
-- generous; operator can tighten via /streams.
--
-- Adapter ships disabled (`enabled=false`) -- operator enables via GUI
-- after merge. Default settings groups = ["stations", "weather",
-- "amateur"]; extra_norad_ids empty.
--
-- Idempotent on both rows: ON CONFLICT DO NOTHING preserves any
-- operator-tuned state (e.g. settings or enabled flag changed by hand).
INSERT INTO config.streams (name, max_age_s, max_bytes)
VALUES ('CENTRAL_SAT', 604800, 1073741824)
ON CONFLICT (name) DO NOTHING;
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'celestrak_tle',
false,
14400,
'{"groups": ["stations", "weather", "amateur"], "extra_norad_ids": []}'::jsonb
)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,393 @@
"""CelesTrak TLE (Two-Line Element) adapter — orbital state for satellites.
Polls ``https://celestrak.org/NORAD/elements/gp.php`` for each configured
group (defaults: ``stations``, ``weather``, ``amateur``) plus any operator-
specified ``extra_norad_ids`` not already in a group. Each fetched group is
plain text in the 3-line TLE format (name, line1, line2). One Event per
satellite per refresh cycle.
Subjects are global (no per-state coord TLEs are universal orbital state):
central.sat.tle.<norad_id>
Consumers compute satellite passes locally with their own observer
geolocation (satellite.js / sgp4 client-side). The adapter publishes the raw
TLE strings on the wire so the pass-prediction math stays where the location
context lives -- not in Central.
Dedup key is ``{norad_id}:{epoch_iso}``: re-fetching the same TLE yields the
same key and is swallowed by the dedup mixin. CelesTrak refreshes TLEs ~8h;
the 4h default cadence gives us one update per refresh cycle without
thrashing them.
``_enriched.orbit`` is parsed directly from Line 2 columns -- no skyfield or
sgp4 dependency. The implicit leading ``0.`` on the eccentricity field
(NORAD's standard packed form) is reconstructed at parse time.
"""
from __future__ import annotations
import asyncio
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import aiohttp
from pydantic import BaseModel
from tenacity import (
after_nothing,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
_BASE_URL = "https://celestrak.org/NORAD/elements/gp.php"
_FETCH_TIMEOUT_S = 30
_DEDUP_DDL = (
"CREATE TABLE IF NOT EXISTS published_ids ("
"adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
"first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
"last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
"PRIMARY KEY (adapter, event_id))"
)
# TLE Line 1 / Line 2 are fixed 69-char records. Column slice offsets used
# below are 0-indexed inclusive-start, exclusive-end (Python convention).
# Per the v0.11.0 spec these are: epoch [18:32] (14 chars YYDDD.DDDDDDDD --
# the spec said cols 19-32 which was off-by-one against the actual layout),
# inclination [8:16], eccentricity [26:33] (with implicit leading "0."),
# mean motion [52:63]. Verified against live ISS TLE 25544 at probe time.
def _group_source_url(group: str) -> str:
return f"{_BASE_URL}?GROUP={group}&FORMAT=TLE"
def _catnr_source_url(norad_id: int) -> str:
return f"{_BASE_URL}?CATNR={norad_id}&FORMAT=TLE"
def _parse_tle_groups(text: str) -> list[tuple[str, str, str]]:
"""Split a CelesTrak TLE response into ``(name, line1, line2)`` triples.
Tolerant to trailing whitespace, blank lines between records, and CRLF
line endings (CelesTrak emits LF; defensive normalisation costs nothing).
Records whose lines don't start with the expected ``1 ``/``2 `` markers
are skipped silently -- malformed segments don't sink the batch.
"""
out: list[tuple[str, str, str]] = []
# Normalise line endings; filter out blank lines.
lines = [
ln.rstrip("\r").rstrip()
for ln in text.replace("\r\n", "\n").split("\n")
if ln.strip()
]
i = 0
while i + 2 < len(lines):
name = lines[i].rstrip()
l1 = lines[i + 1]
l2 = lines[i + 2]
if l1.startswith("1 ") and l2.startswith("2 "):
out.append((name, l1, l2))
i += 3
else:
# Mis-aligned: skip one line and try to re-sync.
i += 1
return out
def _decode_epoch(line1: str) -> datetime | None:
"""Decode Line 1 cols [18:32] (YYDDD.DDDDDDDD) into a UTC datetime.
NORAD's Y2K rule: ``YY`` 0056 → 20002056, ``YY`` 5799 → 19571999.
Day-of-year is 1-indexed (``001`` = Jan 1).
"""
if not line1 or len(line1) < 32:
return None
field = line1[18:32]
try:
yy = int(field[0:2])
doy_frac = float(field[2:])
except (ValueError, IndexError):
return None
year = 2000 + yy if yy < 57 else 1900 + yy
try:
return datetime(year, 1, 1, tzinfo=timezone.utc) + timedelta(days=doy_frac - 1.0)
except (OverflowError, ValueError):
return None
def _decode_orbit(line2: str) -> dict[str, float] | None:
"""Parse the three orbit elements out of Line 2 column slots.
Returns ``{inclination_deg, mean_motion_rev_per_day, eccentricity}`` or
``None`` on any parse failure (the ``_enriched.orbit`` bundle is then
omitted from the Event, per spec).
"""
if not line2 or len(line2) < 63:
return None
try:
inclination = float(line2[8:16])
# Eccentricity is stored as 7 digits with an implicit leading "0.".
ecc_raw = line2[26:33].strip()
if not ecc_raw.isdigit():
return None
eccentricity = float("0." + ecc_raw)
mean_motion = float(line2[52:63])
except (ValueError, IndexError):
return None
return {
"inclination_deg": inclination,
"mean_motion_rev_per_day": mean_motion,
"eccentricity": eccentricity,
}
def _norad_id_from_line1(line1: str) -> int | None:
"""Extract the catalog number from Line 1 cols [2:7]."""
try:
return int(line1[2:7].strip())
except (ValueError, IndexError):
return None
class CelestrakTleSettings(BaseModel):
"""``groups``: CelesTrak group IDs to fetch in full. ``extra_norad_ids``:
operator-specified satellites outside the group rosters."""
groups: list[str] = ["stations", "weather", "amateur"]
extra_norad_ids: list[int] = []
_EXP_WAIT = wait_exponential_jitter(initial=1, max=30)
class CelestrakTleAdapter(SourceAdapter):
"""CelesTrak TLE (orbital element) publisher."""
name = "celestrak_tle"
display_name = "CelesTrak satellite TLEs"
description = (
"Orbital state (TLE / two-line elements) for satellites listed by "
"CelesTrak. Polls the configured group rosters plus any extra NORAD "
"IDs the operator pins; consumers compute passes locally."
)
settings_schema = CelestrakTleSettings
requires_api_key = None
wizard_order = None
default_cadence_s = 14400 # 4h; CelesTrak refreshes ~8h
data_class = "telemetry"
enrichment_locations = []
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
self._groups: list[str] = list(
config.settings.get("groups") or ["stations", "weather", "amateur"]
)
self._extra_ids: set[int] = {
int(x) for x in (config.settings.get("extra_norad_ids") or [])
}
async def startup(self) -> None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S),
headers={"User-Agent": "Central/0.11 (+celestrak_tle)"},
)
self._db = sqlite3.connect(self._cursor_db_path)
self._db.execute(_DEDUP_DDL)
self._db.execute(
"CREATE INDEX IF NOT EXISTS published_ids_last_seen "
"ON published_ids (last_seen)"
)
self._db.commit()
logger.info(
"celestrak_tle adapter started",
extra={"groups": self._groups, "extra_norad_ids": sorted(self._extra_ids)},
)
async def shutdown(self) -> None:
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
async def apply_config(self, new_config: AdapterConfig) -> None:
self._groups = list(
new_config.settings.get("groups") or ["stations", "weather", "amateur"]
)
self._extra_ids = {
int(x) for x in (new_config.settings.get("extra_norad_ids") or [])
}
logger.info(
"celestrak_tle config updated",
extra={"groups": self._groups, "extra_norad_ids": sorted(self._extra_ids)},
)
@retry(
stop=stop_after_attempt(3),
wait=_EXP_WAIT,
retry=retry_if_exception_type(
(aiohttp.ClientConnectionError, asyncio.TimeoutError, TimeoutError)
),
reraise=True,
before_sleep=None,
after=after_nothing,
)
async def _fetch_url(self, url: str) -> str | None:
"""GET a CelesTrak URL; return body text on 200 else None."""
if self._session is None:
raise RuntimeError("celestrak_tle session not started")
async with self._session.get(url) as resp:
if resp.status == 200:
body = await resp.text()
if body.startswith("Invalid query"):
logger.warning(
"celestrak_tle upstream rejected request",
extra={"url": url, "body": body[:200]},
)
return None
return body
logger.warning(
"celestrak_tle upstream non-200; skipping this fetch",
extra={"url": url, "status": resp.status},
)
return None
def _build_event_record(
self,
name: str,
line1: str,
line2: str,
source_url: str,
) -> Event | None:
"""Build an Event from one (name, line1, line2) triple. None on parse fail."""
norad_id = _norad_id_from_line1(line1)
if norad_id is None:
return None
epoch = _decode_epoch(line1)
if epoch is None:
return None
epoch_iso = epoch.isoformat()
intl_designator = line1[9:17].strip()
classification = line1[7:8].strip() or "U"
orbit = _decode_orbit(line2)
data: dict[str, Any] = {
"norad_id": norad_id,
"satellite_name": name.strip(),
"tle_line1": line1,
"tle_line2": line2,
"epoch": epoch_iso,
"classification": classification,
"intl_designator": intl_designator,
"source_url": source_url,
}
if orbit is not None:
data["_enriched"] = {"orbit": orbit}
return Event(
id=f"{norad_id}:{epoch_iso}",
adapter=self.name,
category="tle.celestrak_tle",
time=epoch,
severity=1,
geo=Geo(),
data=data,
)
async def poll(self) -> AsyncIterator[Event]:
if not self._session:
raise RuntimeError("Session not initialized")
# Phase 1: fetch all groups concurrently.
group_results = await asyncio.gather(
*[self._fetch_url(_group_source_url(g)) for g in self._groups],
return_exceptions=True,
)
# records: norad_id -> (name, line1, line2, source_url)
records: dict[int, tuple[str, str, str, str]] = {}
for group, result in zip(self._groups, group_results):
if isinstance(result, BaseException) or not result:
if isinstance(result, BaseException):
logger.warning(
"celestrak_tle group fetch failed",
extra={"group": group, "error": str(result)},
)
continue
source_url = _group_source_url(group)
for name, l1, l2 in _parse_tle_groups(result):
nid = _norad_id_from_line1(l1)
if nid is None:
continue
# Duplicates across groups collapse: first group wins.
if nid not in records:
records[nid] = (name, l1, l2, source_url)
# Phase 2: fetch extras not already collected by any group.
needed_extras = [nid for nid in self._extra_ids if nid not in records]
if needed_extras:
extra_results = await asyncio.gather(
*[self._fetch_url(_catnr_source_url(nid)) for nid in needed_extras],
return_exceptions=True,
)
for nid, result in zip(needed_extras, extra_results):
if isinstance(result, BaseException) or not result:
continue
parsed = _parse_tle_groups(result)
if parsed:
name, l1, l2 = parsed[0]
records[nid] = (name, l1, l2, _catnr_source_url(nid))
# Phase 3: yield Events.
yielded = 0
for nid, (name, l1, l2, source_url) in records.items():
try:
ev = self._build_event_record(name, l1, l2, source_url)
except Exception:
logger.exception(
"celestrak_tle event build failed", extra={"norad_id": nid}
)
continue
if ev is not None:
yield ev
yielded += 1
self.sweep_old_ids()
logger.info(
"celestrak_tle poll completed",
extra={
"groups": self._groups,
"extras": sorted(self._extra_ids),
"records_collected": len(records),
"events_yielded": yielded,
},
)
def subject_for(self, event: Event) -> str:
nid = event.data.get("norad_id")
if nid is None:
return "central.sat.tle.unknown"
return f"central.sat.tle.{nid}"

View file

@ -2938,7 +2938,7 @@ DEFAULT_TIME = "last_24h"
ADAPTER_GROUPS = {
"Disasters": ["gdacs", "firms", "inciweb", "wfigs_incidents", "wfigs_perimeters"],
"Weather": ["nws"],
"Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"],
"Space": ["swpc_alerts", "swpc_kindex", "swpc_protons", "celestrak_tle"],
"Geophysical": ["usgs_quake", "nwis"],
"Earth Observation": ["eonet"],
"Transportation": ["wzdx", "tomtom_flow", "tomtom_incidents", "itd_511", "itd_511_cameras"],

View file

@ -0,0 +1,10 @@
{# CelesTrak satellite TLE. Fields from payload->data->data. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %}
{% set orb = (d.get('_enriched') or {}).get('orbit') or {} %}
{% if d.get('satellite_name') is not none %}<dt>Satellite</dt><dd>{{ d.satellite_name }}</dd>{% endif %}
{% if d.get('norad_id') is not none %}<dt>NORAD ID</dt><dd><code>{{ d.norad_id }}</code></dd>{% endif %}
{% if d.get('intl_designator') is not none %}<dt>Intl designator</dt><dd><code>{{ d.intl_designator }}</code></dd>{% endif %}
{% if d.get('epoch') is not none %}<dt>Epoch</dt><dd>{{ d.epoch }}</dd>{% endif %}
{% if orb.get('inclination_deg') is not none %}<dt>Inclination</dt><dd>{{ "%.2f"|format(orb.inclination_deg) }}°</dd>{% endif %}
{% if orb.get('mean_motion_rev_per_day') is not none %}<dt>Orbital period</dt><dd>{{ "%.1f"|format(1440.0 / orb.mean_motion_rev_per_day) }} min</dd>{% endif %}
{% if orb.get('eccentricity') is not none %}<dt>Eccentricity</dt><dd>{{ "%.6f"|format(orb.eccentricity) }}</dd>{% endif %}

View file

@ -0,0 +1,6 @@
{%- set d = (event.data.get('data') or {}).get('data') or {} -%}
{%- set orb = (d.get('_enriched') or {}).get('orbit') or {} -%}
{%- if d.get('satellite_name') -%}
TLE update: {{ d.satellite_name }} (NORAD {{ d.norad_id }})
{%- if orb.get('mean_motion_rev_per_day') and orb.get('inclination_deg') is not none %} — {{ "%.1f"|format(1440.0 / orb.mean_motion_rev_per_day) }}min orbit at {{ "%.1f"|format(orb.inclination_deg) }}°{% endif -%}
{%- endif -%}

View file

@ -33,5 +33,6 @@ STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_TRAFFIC_FLOW", "central.traffic_flow.>"),
StreamEntry("CENTRAL_TRAFFIC_CAMERAS", "central.traffic_cameras.>"),
StreamEntry("CENTRAL_AVY", "central.avy.>"),
StreamEntry("CENTRAL_SAT", "central.sat.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]

View file

@ -141,6 +141,7 @@ STREAM_CATEGORY_DOMAINS: dict[str, tuple[str, ...]] = {
"CENTRAL_TRAFFIC_FLOW": ("flow",),
"CENTRAL_TRAFFIC_CAMERAS": ("camera",),
"CENTRAL_AVY": ("avy",),
"CENTRAL_SAT": ("tle",),
}

75
tests/fixtures/celestrak_stations.tle vendored Normal file
View file

@ -0,0 +1,75 @@
ISS (ZARYA)
1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999
2 25544 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570453
POISK
1 36086U 09060A 26159.80410962 .00007129 00000+0 13425-3 0 9997
2 36086 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570490
CSS (TIANHE)
1 48274U 21035A 26159.78589514 .00023223 00000+0 27715-3 0 9992
2 48274 41.4695 13.1170 0008543 10.7662 349.3358 15.60455863291854
ISS (NAUKA)
1 49044U 21066A 26159.80410962 .00007129 00000+0 13425-3 0 9995
2 49044 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570498
FREGAT DEB
1 49271U 11037PF 26151.14915702 .00018955 00000+0 30994-1 0 9992
2 49271 51.6227 53.0895 0914379 245.3590 104.9405 12.41880577222933
CSS (WENTIAN)
1 53239U 22085A 26159.52989194 .00024398 00000+0 29104-3 0 9993
2 53239 41.4696 14.6734 0008548 8.8388 351.2600 15.60445632288235
CSS (MENGTIAN)
1 54216U 22143A 26159.78589514 .00023223 00000+0 27715-3 0 9996
2 54216 41.4695 13.1170 0008543 10.7662 349.3358 15.60455863292342
HRC MONOBLOCK CAMERA
1 66052U 98067XR 26158.73746098 .00076529 00000+0 55774-3 0 9994
2 66052 51.6266 327.6637 0001454 182.7303 177.3688 15.72605662 36514
SZ-21 MODULE
1 66515U 25246C 26159.85148265 .00040731 00000+0 25860-3 0 9992
2 66515 41.4714 359.2138 0006700 299.9993 60.0186 15.75674541 32427
SOYUZ-MS 28
1 66664U 25275A 26159.80410962 .00007129 00000+0 13425-3 0 9998
2 66664 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570495
DUPLEX
1 66906U 98067XS 26159.73675084 .00040385 00000+0 45764-3 0 9998
2 66906 51.6281 332.1277 0004113 167.7557 192.3537 15.61972382 29335
ISS OBJECT XY
1 66912U 98067XY 26158.70071378 .00325324 50593-4 10139-2 0 9990
2 66912 51.6203 324.3260 0007317 272.0274 87.9894 15.91000080 29331
KNACKSAT-2
1 67683U 98067XZ 26159.76435122 .00049104 00000+0 61524-3 0 9991
2 67683 51.6310 336.8215 0010220 178.4517 181.6510 15.59209268 18967
CORAL
1 67684U 98067YA 26159.75867441 .00182222 00000+0 12503-2 0 9995
2 67684 51.6250 331.4984 0009704 202.0943 157.9639 15.73558477 19039
GXIBA-1
1 67685U 98067YB 26158.75031954 .00052644 00000+0 62566-3 0 9996
2 67685 51.6294 341.2956 0011540 172.2654 187.8520 15.60548421 18814
UITMSAT-2
1 67686U 98067YC 26159.73943375 .00078865 00000+0 82941-3 0 9997
2 67686 51.6284 335.1508 0007219 166.8486 193.2698 15.63581061 18984
LEOPARD
1 67687U 98067YD 26159.73360970 .00053486 00000+0 64740-3 0 9991
2 67687 51.6313 336.6235 0007704 159.0092 201.1219 15.60107442 18888
HMU-SAT2
1 67688U 98067YE 26159.78505259 .00069363 00000+0 77123-3 0 9998
2 67688 51.6311 335.4642 0007416 161.8346 198.2914 15.62207389 18903
CREW DRAGON 12
1 67796U 26031A 26159.80410962 .00007129 00000+0 13425-3 0 9996
2 67796 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570492
PROGRESS-MS 33
1 68319U 26058A 26159.80410962 .00007129 00000+0 13425-3 0 9997
2 68319 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570494
CYGNUS NG-24
1 68689U 26079A 26159.28821174 .00006674 00000+0 12624-3 0 9997
2 68689 51.6333 344.1431 0006937 146.3340 213.8090 15.49664883570410
PROGRESS-MS 34
1 68837U 26093A 26159.28821174 .00006674 00000+0 12624-3 0 9998
2 68837 51.6333 344.1431 0006937 146.3340 213.8090 15.49664883570415
TIANZHOU-10
1 69049U 26102A 26159.52989194 .00024398 00000+0 29104-3 0 9993
2 69049 41.4696 14.6734 0008548 8.8388 351.2600 15.60445632 4495
DRAGON CRS-34
1 69103U 26107A 26159.80410962 .00007129 00000+0 13425-3 0 9994
2 69103 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570496
SHENZHOU-23 (SZ-23)
1 69180U 26113A 26159.52989194 .00024398 00000+0 29104-3 0 9991
2 69180 41.4696 14.6734 0008548 8.8388 351.2600 15.60445632291882

495
tests/test_celestrak_tle.py Normal file
View file

@ -0,0 +1,495 @@
"""Tests for the v0.11.0 celestrak_tle adapter."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from central.adapter import SourceAdapter
from central.adapters.celestrak_tle import (
CelestrakTleAdapter,
CelestrakTleSettings,
_catnr_source_url,
_decode_epoch,
_decode_orbit,
_group_source_url,
_norad_id_from_line1,
_parse_tle_groups,
)
from central.config_models import AdapterConfig
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "celestrak_stations.tle"
@pytest.fixture
def stations_tle_text() -> str:
"""Live CelesTrak stations TLE response captured at probe time."""
return FIXTURE_PATH.read_text()
@pytest.fixture
def adapter(tmp_path: Path) -> CelestrakTleAdapter:
cfg = AdapterConfig(
name="celestrak_tle",
enabled=True,
cadence_s=14400,
settings={"groups": ["stations"], "extra_norad_ids": []},
updated_at=datetime.now(timezone.utc),
)
return CelestrakTleAdapter(cfg, MagicMock(), tmp_path / "cursors.db")
# --- 3-line group splitter --------------------------------------------------
def test_parse_tle_groups_real_stations_fixture(stations_tle_text):
"""The live stations fixture must split cleanly into 25 (name, l1, l2)."""
groups = _parse_tle_groups(stations_tle_text)
assert len(groups) == 25
names = {n.strip() for n, _, _ in groups}
assert "ISS (ZARYA)" in names
# Every line1 starts '1 ', every line2 starts '2 '.
for _, l1, l2 in groups:
assert l1.startswith("1 ")
assert l2.startswith("2 ")
def test_parse_tle_groups_tolerates_crlf_line_endings():
text = "FOO\r\n1 00001U 00001A 26100.5 .0 0 0 0 1\r\n2 00001 0.0 0.0 0000000 0.0 0.0 1.00000000 0\r\n"
groups = _parse_tle_groups(text)
assert len(groups) == 1
assert groups[0][0] == "FOO"
def test_parse_tle_groups_tolerates_trailing_blank_lines():
text = "FOO\n1 00001U 00001A 26100.5 .0 0 0 0 1\n2 00001 0.0 0.0 0000000 0.0 0.0 1.00000000 0\n\n\n"
groups = _parse_tle_groups(text)
assert len(groups) == 1
def test_parse_tle_groups_skips_misaligned_segments():
"""A stray line that's not 1/2-prefixed shouldn't sink the rest."""
text = (
"FOO\n"
"1 00001U 00001A 26100.5 .0 0 0 0 1\n"
"2 00001 0.0 0.0 0000000 0.0 0.0 1.00000000 0\n"
"garbage line that doesn't belong\n"
"BAR\n"
"1 00002U 00001A 26100.5 .0 0 0 0 1\n"
"2 00002 0.0 0.0 0000000 0.0 0.0 1.00000000 0\n"
)
groups = _parse_tle_groups(text)
assert {n for n, _, _ in groups} == {"FOO", "BAR"}
def test_parse_tle_groups_empty_input():
assert _parse_tle_groups("") == []
assert _parse_tle_groups("\n\n\n") == []
# --- Epoch decoding ---------------------------------------------------------
def test_decode_epoch_real_iss_tle():
line1 = "1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999"
dt = _decode_epoch(line1)
assert dt is not None
assert dt.tzinfo == timezone.utc
assert dt.year == 2026
# day 159 of 2026 = June 8 (Jan 31 + Feb 28 + Mar 31 + Apr 30 + May 31 = 151, + 8 = 159)
assert dt.month == 6 and dt.day == 8
def test_decode_epoch_y2k_pivot_2056_vs_1957():
"""NORAD Y2K rule: YY 00-56 → 2000-2056; YY 57-99 → 1957-1999."""
# YY=56 → 2056
line1_2056 = "1 00001U 00001A 56001.50000000 .0 0 0 0 1"
dt = _decode_epoch(line1_2056)
assert dt is not None and dt.year == 2056
# YY=57 → 1957 (the dawn of orbital tracking)
line1_1957 = "1 00001U 57001A 57001.50000000 .0 0 0 0 1"
dt = _decode_epoch(line1_1957)
assert dt is not None and dt.year == 1957
# YY=99 → 1999
line1_1999 = "1 00001U 99001A 99365.00000000 .0 0 0 0 1"
dt = _decode_epoch(line1_1999)
assert dt is not None and dt.year == 1999
# YY=00 → 2000
line1_2000 = "1 00001U 00001A 00001.00000000 .0 0 0 0 1"
dt = _decode_epoch(line1_2000)
assert dt is not None and dt.year == 2000
def test_decode_epoch_doy_arithmetic():
"""Day 1.0 = Jan 1 00:00; day 1.5 = Jan 1 12:00."""
line1 = "1 00001U 00001A 26001.50000000 .0 0 0 0 1"
dt = _decode_epoch(line1)
assert dt is not None
assert dt.year == 2026 and dt.month == 1 and dt.day == 1 and dt.hour == 12
def test_decode_epoch_returns_none_on_malformed():
assert _decode_epoch("") is None
assert _decode_epoch("too short") is None
assert _decode_epoch("1 25544U 98067A xxxxx.xxxxxxxx .0 0 0 0 1") is None
# --- Orbit decoding ---------------------------------------------------------
def test_decode_orbit_real_iss_line2():
line2 = "2 25544 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570453"
orbit = _decode_orbit(line2)
assert orbit is not None
assert abs(orbit["inclination_deg"] - 51.6336) < 1e-6
# Eccentricity carries an implicit leading "0." → 0.0006923
assert abs(orbit["eccentricity"] - 0.0006923) < 1e-9
assert abs(orbit["mean_motion_rev_per_day"] - 15.49672912) < 1e-9
def test_decode_orbit_implicit_leading_zero_on_eccentricity():
"""'0006923' must parse to 0.0006923, not 6923. The leading-0. is implicit."""
line2 = "2 00001 45.0000 100.0000 1234567 0.0000 0.0000 1.00000000 0"
orbit = _decode_orbit(line2)
assert orbit is not None
assert abs(orbit["eccentricity"] - 0.1234567) < 1e-9
def test_decode_orbit_zero_eccentricity():
line2 = "2 00001 45.0000 100.0000 0000000 0.0000 0.0000 1.00000000 0"
orbit = _decode_orbit(line2)
assert orbit is not None
assert orbit["eccentricity"] == 0.0
def test_decode_orbit_returns_none_on_malformed():
assert _decode_orbit("") is None
assert _decode_orbit("too short") is None
# Garbage in inclination slot.
assert _decode_orbit(
"2 25544 XXXXXXX 341.5878 0006923 148.5365 211.6039 15.49672912570453"
) is None
# --- NORAD ID extraction ----------------------------------------------------
def test_norad_id_extraction_real_iss():
line1 = "1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999"
assert _norad_id_from_line1(line1) == 25544
def test_norad_id_extraction_short_id_padded():
line1 = "1 00007U 00007A 26100.5 .0 0 0 0 1"
assert _norad_id_from_line1(line1) == 7
def test_norad_id_extraction_returns_none_on_malformed():
assert _norad_id_from_line1("") is None
assert _norad_id_from_line1("1 XXXXX") is None
# --- _build_event_record ----------------------------------------------------
def _real_iss_triple():
return (
"ISS (ZARYA) ",
"1 25544U 98067A 26159.80410962 .00007129 00000+0 13425-3 0 9999",
"2 25544 51.6336 341.5878 0006923 148.5365 211.6039 15.49672912570453",
)
def test_build_event_record_real_iss_shape(adapter):
name, l1, l2 = _real_iss_triple()
src = "https://celestrak.org/NORAD/elements/gp.php?GROUP=stations&FORMAT=TLE"
ev = adapter._build_event_record(name, l1, l2, src)
assert ev is not None
# Identity
assert ev.adapter == "celestrak_tle"
assert ev.category == "tle.celestrak_tle"
assert ev.severity == 1
# Dedup id = {norad_id}:{epoch_iso}
assert ev.id.startswith("25544:")
assert ":2026-06-08T" in ev.id # day 159 of 2026 = June 8
# Geo intentionally empty (TLEs are global state, no surface point)
assert ev.geo.centroid is None
assert ev.geo.bbox is None
assert ev.geo.geometry is None
# Data fields per spec
assert ev.data["norad_id"] == 25544
assert ev.data["satellite_name"] == "ISS (ZARYA)"
assert ev.data["tle_line1"] == l1
assert ev.data["tle_line2"] == l2
assert ev.data["classification"] == "U"
assert ev.data["intl_designator"] == "98067A"
assert ev.data["source_url"] == src
# _enriched.orbit bundle present
orb = ev.data["_enriched"]["orbit"]
assert abs(orb["inclination_deg"] - 51.6336) < 1e-6
assert abs(orb["mean_motion_rev_per_day"] - 15.49672912) < 1e-9
def test_build_event_record_omits_orbit_bundle_on_parse_failure(adapter):
"""Malformed Line 2 → orbit bundle absent, event still publishes."""
name = "BROKEN"
l1 = "1 00001U 00001A 26100.50000000 .0 0 0 0 1"
l2 = "2 00001 XXXXXX 341.5878 0006923 148.5365 211.6039 15.49672912570453"
ev = adapter._build_event_record(name, l1, l2, "x")
assert ev is not None
assert "_enriched" not in ev.data # parse failure → bundle absent
def test_build_event_record_returns_none_on_missing_norad_id(adapter):
l1 = "1 XXXXXX 00001A 26100.50000000 .0 0 0 0 1"
l2 = "2 00001 45.0 100.0 0000000 0.0 0.0 1.00000000 0"
assert adapter._build_event_record("X", l1, l2, "x") is None
# --- subject_for ------------------------------------------------------------
def test_subject_for_uses_norad_id(adapter):
ev = adapter._build_event_record(*_real_iss_triple(), "x")
assert adapter.subject_for(ev) == "central.sat.tle.25544"
def test_subject_for_falls_back_when_norad_id_missing(adapter):
"""Defensive: if the data dict somehow lacks norad_id, route to .unknown."""
from central.models import Event, Geo
ev = Event(
id="x", adapter="celestrak_tle", category="tle.celestrak_tle",
time=datetime.now(timezone.utc), severity=1, geo=Geo(), data={},
)
assert adapter.subject_for(ev) == "central.sat.tle.unknown"
# --- Source-URL builders ----------------------------------------------------
def test_source_url_builders():
assert _group_source_url("stations") == (
"https://celestrak.org/NORAD/elements/gp.php?GROUP=stations&FORMAT=TLE"
)
assert _catnr_source_url(25544) == (
"https://celestrak.org/NORAD/elements/gp.php?CATNR=25544&FORMAT=TLE"
)
# --- Settings + adapter scaffolding ----------------------------------------
def test_default_settings_match_spec():
s = CelestrakTleSettings()
assert s.groups == ["stations", "weather", "amateur"]
assert s.extra_norad_ids == []
def test_inherits_dedup_mixin_from_source_adapter(tmp_path):
"""v0.9.1 regression guard: every adapter must inherit the dedup mixin
so the supervisor's is_published() / mark_published() calls dispatch."""
assert issubclass(CelestrakTleAdapter, SourceAdapter)
a = CelestrakTleAdapter(
AdapterConfig(
name="celestrak_tle", enabled=False, cadence_s=14400,
settings={}, updated_at=datetime.now(timezone.utc),
),
MagicMock(),
tmp_path / "cursors.db", # tmp_path: pytest's per-test isolation
)
# Inherited methods exposed on the instance.
assert callable(a.is_published)
assert callable(a.mark_published)
assert callable(a.sweep_old_ids)
@pytest.mark.asyncio
async def test_apply_config_updates_groups_and_extras(adapter):
new_cfg = AdapterConfig(
name="celestrak_tle", enabled=True, cadence_s=14400,
settings={"groups": ["amateur"], "extra_norad_ids": [25544, 33591]},
updated_at=datetime.now(timezone.utc),
)
await adapter.apply_config(new_cfg)
assert adapter._groups == ["amateur"]
assert adapter._extra_ids == {25544, 33591}
# --- Group / extras collapse + dedup logic ---------------------------------
@pytest.mark.asyncio
async def test_extra_norad_ids_collapse_with_group_membership(tmp_path, monkeypatch):
"""If an extra_norad_id is already in a configured group's roster, the
extra-fetch is SKIPPED (no second CATNR request)."""
cfg = AdapterConfig(
name="celestrak_tle", enabled=True, cadence_s=14400,
settings={"groups": ["stations"], "extra_norad_ids": [25544, 99999]},
updated_at=datetime.now(timezone.utc),
)
adapter = CelestrakTleAdapter(cfg, MagicMock(), tmp_path / "cursors.db")
await adapter.startup()
try:
# stations response contains ISS (NORAD 25544) -- so the 25544 extra
# should collapse; only NORAD 99999 needs a CATNR fetch.
stations_text = (
"ISS (ZARYA) \n"
"1 25544U 98067A 26100.50000000 .0 0 0 0 1\n"
"2 25544 51.6 0.0 0006923 0.0 0.0 15.5 0\n"
)
catnr_99999_text = (
"MYSAT \n"
"1 99999U 00001A 26100.50000000 .0 0 0 0 1\n"
"2 99999 45.0 0.0 0006923 0.0 0.0 14.0 0\n"
)
fetch_urls: list[str] = []
async def _fake_fetch(url):
fetch_urls.append(url)
if "GROUP=stations" in url:
return stations_text
if "CATNR=99999" in url:
return catnr_99999_text
return None
monkeypatch.setattr(adapter, "_fetch_url", _fake_fetch)
events = [e async for e in adapter.poll()]
# ONE group fetch + ONE catnr fetch (for 99999), NOT two catnr fetches.
assert any("GROUP=stations" in u for u in fetch_urls)
assert any("CATNR=99999" in u for u in fetch_urls)
assert not any("CATNR=25544" in u for u in fetch_urls), (
"25544 was in the stations group -- CATNR fetch should have been "
"skipped per the dedup-collapse rule"
)
norad_ids = {e.data["norad_id"] for e in events}
assert norad_ids == {25544, 99999}
finally:
await adapter.shutdown()
@pytest.mark.asyncio
async def test_duplicate_across_groups_yields_once(tmp_path, monkeypatch):
"""If a satellite appears in two configured groups, the adapter yields
only ONE Event for it (first-group-wins by configured order)."""
cfg = AdapterConfig(
name="celestrak_tle", enabled=True, cadence_s=14400,
settings={"groups": ["stations", "amateur"], "extra_norad_ids": []},
updated_at=datetime.now(timezone.utc),
)
adapter = CelestrakTleAdapter(cfg, MagicMock(), tmp_path / "cursors.db")
await adapter.startup()
try:
# Both groups return the same NORAD 25544 record.
same_sat = (
"ISS (ZARYA) \n"
"1 25544U 98067A 26100.50000000 .0 0 0 0 1\n"
"2 25544 51.6 0.0 0006923 0.0 0.0 15.5 0\n"
)
async def _fake_fetch(url):
return same_sat
monkeypatch.setattr(adapter, "_fetch_url", _fake_fetch)
events = [e async for e in adapter.poll()]
assert len(events) == 1
assert events[0].data["norad_id"] == 25544
finally:
await adapter.shutdown()
@pytest.mark.asyncio
async def test_poll_yields_all_stations_from_real_fixture(tmp_path, monkeypatch, stations_tle_text):
"""End-to-end with the live fixture: 25 stations sats → 25 events."""
cfg = AdapterConfig(
name="celestrak_tle", enabled=True, cadence_s=14400,
settings={"groups": ["stations"], "extra_norad_ids": []},
updated_at=datetime.now(timezone.utc),
)
adapter = CelestrakTleAdapter(cfg, MagicMock(), tmp_path / "cursors.db")
await adapter.startup()
try:
async def _fake_fetch(url):
return stations_tle_text
monkeypatch.setattr(adapter, "_fetch_url", _fake_fetch)
events = [e async for e in adapter.poll()]
assert len(events) == 25
# ISS (NORAD 25544) must be among them, with severity 1 and empty geo.
iss = [e for e in events if e.data["norad_id"] == 25544]
assert len(iss) == 1
assert iss[0].severity == 1
assert iss[0].geo.centroid is None
assert iss[0].category == "tle.celestrak_tle"
finally:
await adapter.shutdown()
# --- Stream registry + family mapping (the v0.11.0 wiring) -----------------
def test_central_sat_registered_in_streams():
from central.streams import STREAMS
sat = [s for s in STREAMS if s.name == "CENTRAL_SAT"]
assert len(sat) == 1
assert sat[0].subject_filter == "central.sat.>"
assert sat[0].event_bearing is True
def test_central_sat_in_supervisor_family_map():
from central.supervisor import STREAM_CATEGORY_DOMAINS
assert STREAM_CATEGORY_DOMAINS["CENTRAL_SAT"] == ("tle",)
def test_celestrak_tle_in_space_adapter_group():
"""ADAPTER_GROUPS['Space'] must include 'celestrak_tle' alongside SWPC."""
from central.gui.routes import ADAPTER_GROUPS
assert "celestrak_tle" in ADAPTER_GROUPS["Space"]
# --- Partials render cleanly with fixture (v0.10.0 pattern) -----------------
def test_summary_partial_renders_cleanly_with_real_data(adapter, tmp_path):
"""Render the _event_summaries/celestrak_tle.html partial against a real
ISS Event payload; verify the rendered string contains the satellite
name, NORAD id, orbit period (min), and inclination -- per the spec.
"""
from jinja2 import Environment, FileSystemLoader
templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates"
env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True)
tmpl = env.get_template("_event_summaries/celestrak_tle.html")
ev = adapter._build_event_record(*_real_iss_triple(), "x")
rendered = tmpl.render(event={
"data": {"data": {"data": ev.model_dump(mode="json")["data"]}}
}).strip()
assert "ISS (ZARYA)" in rendered, f"got: {rendered!r}"
assert "25544" in rendered
assert "min orbit at" in rendered # period rendering
assert "51.6°" in rendered # inclination
def test_row_partial_renders_cleanly_with_real_data(adapter):
"""Render the _event_rows/celestrak_tle.html partial; verify the four
labeled rows the spec calls for: Satellite, NORAD ID, Epoch, Inclination,
Orbital period."""
from jinja2 import Environment, FileSystemLoader
templates_dir = Path(__file__).parent.parent / "src" / "central" / "gui" / "templates"
env = Environment(loader=FileSystemLoader(str(templates_dir)), autoescape=True)
tmpl = env.get_template("_event_rows/celestrak_tle.html")
ev = adapter._build_event_record(*_real_iss_triple(), "x")
rendered = tmpl.render(event={
"data": {"data": {"data": ev.model_dump(mode="json")["data"]}}
})
assert "<dt>Satellite</dt>" in rendered and "ISS (ZARYA)" in rendered
assert "<dt>NORAD ID</dt>" in rendered and "25544" in rendered
assert "<dt>Epoch</dt>" in rendered
assert "<dt>Inclination</dt>" in rendered and "51.63°" in rendered
assert "<dt>Orbital period</dt>" in rendered and "min" in rendered

View file

@ -1148,6 +1148,14 @@ _SAMPLE_INNER = {
"itd_511": {"event_type_short": "work_zone", "roadway_name": "I-84"},
"itd_511_cameras": {"location": "I-84 Mountain Home", "camera_id": 42},
"avalanche_org": {"zone_name": "Banner Summit", "danger_name": "Considerable"},
"celestrak_tle": {
"norad_id": 25544,
"satellite_name": "ISS (ZARYA)",
"_enriched": {"orbit": {
"inclination_deg": 51.6336,
"mean_motion_rev_per_day": 15.49672912,
}},
},
}
# Exact expected subjects for the deterministic adapters. swpc_alerts is omitted
@ -1172,6 +1180,7 @@ _EXPECTED_SUBJECT = {
"itd_511": "Road work on I-84",
"itd_511_cameras": "Camera: I-84 Mountain Home",
"avalanche_org": "Avalanche advisory — Banner Summit (Considerable)",
"celestrak_tle": "TLE update: ISS (ZARYA) (NORAD 25544) — 92.9min orbit at 51.6°",
}

View file

@ -11,7 +11,8 @@ from central.adapter_discovery import discover_adapters
from central.gui import routes
# Adapters with data_class="telemetry" (the pinned split; grow as telemetry adapters land).
_TELEMETRY = ["itd_511_cameras", "nwis", "tomtom_flow"]
# v0.11.0 added celestrak_tle (orbital state -- continuous-ish refresh, telemetry-class).
_TELEMETRY = ["celestrak_tle", "itd_511_cameras", "nwis", "tomtom_flow"]
# --- data_class defaults / registry split -----------------------------------
@ -20,7 +21,7 @@ def test_base_default_is_event():
assert SourceAdapter.data_class == "event"
def test_registry_split_11_event_1_telemetry():
def test_registry_split_event_vs_telemetry():
reg = discover_adapters()
by_class = {}
for name, cls in reg.items():