Compare commits

5 commits

Author SHA1 Message Date
40b6342bb7
Merge feature/2-e5-stream-registry: single-source-of-truth stream registry
feat(2-E.5): single-source-of-truth stream registry
2026-05-19 01:42:36 -06:00
zvx-echo6
456a744bb4 feat(2-E.5): single-source-of-truth stream registry
Eliminates the duplication that has been hand-bumped through PRs B, C, D, E.
Adding a stream is now one StreamEntry in src/central/streams.py + one
migration row in config.streams. supervisor STREAM_SUBJECTS / archive
STREAMS / gui DASHBOARD_STREAMS all derive at import time. No drift
possible because there is one source.

Pure refactor; no behavior change. Runtime verified: derived structures
are byte-equivalent to the previous literal definitions.

src/central/streams.py (new):
  @dataclass(frozen=True)
  class StreamEntry:
      name: str
      subject_filter: str
      event_bearing: bool = True   # archive consumes from this stream
      dashboard: bool = True       # GUI dashboard surfaces this stream

  STREAMS: list[StreamEntry] = [
      StreamEntry("CENTRAL_WX",       "central.wx.>"),
      StreamEntry("CENTRAL_FIRE",     "central.fire.>"),
      StreamEntry("CENTRAL_QUAKE",    "central.quake.>"),
      StreamEntry("CENTRAL_SPACE",    "central.space.>"),
      StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
      StreamEntry("CENTRAL_META",     "central.meta.>", event_bearing=False),
  ]

Consumers derive:
  supervisor.STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}
    (includes META: supervisor must create every stream in JetStream)
  archive.STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
    (excludes META: status messages, not events)
  gui.DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]

To resolve the name collision between the registry STREAMS and the
existing archive.STREAMS public symbol, archive.py imports the registry
under an alias: from central.streams import STREAMS as STREAM_REGISTRY.
The archive's STREAMS surface (the tuple-list) is unchanged for callers.
Same alias used in supervisor.py and gui/routes.py for symmetry.
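
As a rough illustration of the derivation described above, the sketch below
rebuilds the three consumer views from a trimmed stand-in registry. The
derive() helper and the CENTRAL_DEMO entry are hypothetical and exist only
for this example; the real modules import central.streams.STREAMS under the
STREAM_REGISTRY alias and derive at import time.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


# Local stand-in for central.streams.STREAMS, trimmed to two entries.
STREAM_REGISTRY = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]


def derive(registry):
    """Build the three consumer views from one registry list (hypothetical helper)."""
    stream_subjects = {s.name: [s.subject_filter] for s in registry}
    archive_streams = [(s.name, s.subject_filter) for s in registry if s.event_bearing]
    dashboard_streams = [s.name for s in registry if s.dashboard]
    return stream_subjects, archive_streams, dashboard_streams


# CENTRAL_DEMO is a made-up stream, used only to show the effect of one new entry.
extended = STREAM_REGISTRY + [StreamEntry("CENTRAL_DEMO", "central.demo.>")]
subjects, archive, dashboard = derive(extended)
```

With one extra entry, all three views pick up CENTRAL_DEMO, while
CENTRAL_META stays out of the archive view because event_bearing is False.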

Migration files unchanged. config.streams keeps seeding retention/bytes --
operator-tunable ops state, separate SoT from the structural mapping.

Tests:
  Dropped from test_archive_multi_stream.py (7, all tautological vs. registry):
    test_streams_list_has_five_entries (magic-number count)
    test_streams_contains_central_wx / fire / quake / space / disaster
    test_streams_excludes_central_meta
  Dropped from test_dashboard.py:
    `assert len(streams) == 6` line inside test_single_stream_failure_doesnt_crash_card
    (the test itself stays; only the magic-number assertion is removed)
  Added in test_stream_registry.py (8 invariant tests):
    test_stream_names_unique
    test_subject_filters_unique
    test_subject_filter_central_prefix_wildcard
    test_meta_is_only_non_event_bearing
    test_supervisor_stream_subjects_includes_meta
    test_supervisor_stream_subjects_includes_all
    test_archive_streams_excludes_non_event_bearing
    test_dashboard_streams_matches_dashboard_flag

The new tests assert properties (uniqueness, format, derivation correctness),
not literals. Future stream additions need zero new test code -- every
invariant automatically covers them.
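
A minimal sketch of what property-style checks of this kind might look like.
The registry here is a local stand-in and the check_* function names are
hypothetical; the actual tests in tests/test_stream_registry.py are not
shown in this diff, and the exact filter format they enforce is assumed to
be central.<domain>.> based on the entries above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    dashboard: bool = True


# Stand-in registry; real tests would import central.streams.STREAMS instead.
STREAMS = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]


def check_names_unique(streams):
    """No two entries share a stream name."""
    names = [s.name for s in streams]
    return len(names) == len(set(names))


def check_filter_format(streams):
    """Assumed convention: every filter is central.<domain>.> (three tokens)."""
    for s in streams:
        parts = s.subject_filter.split(".")
        if len(parts) != 3 or parts[0] != "central" or parts[2] != ">":
            return False
    return True


def check_meta_is_only_non_event_bearing(streams):
    """CENTRAL_META is the single entry with event_bearing=False."""
    return [s.name for s in streams if not s.event_bearing] == ["CENTRAL_META"]
```

None of these checks mention a count or a specific stream other than META,
so appending an entry to the registry leaves them valid without edits.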

Note: test file named tests/test_stream_registry.py (not test_streams.py)
to avoid colliding with the pre-existing tests/test_streams.py, which
covers the GUI streams-management page.

Full suite: 427 passed (was 426 on main: -7 dropped + 8 added).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 07:37:01 +00:00
f1779e3233
Merge feature/2-e-gdacs: GDACS disaster adapter
feat(2-E): GDACS disaster adapter
2026-05-19 01:10:22 -06:00
zvx-echo6
7b6f684b66 fix(2-E): use canonical removed-event subject pattern
Per handoff §9 the removed-event convention is
central.<domain>.<subtype>.removed.<geo> -- WFIGS uses
central.fire.incident.removed.<state>. GDACS tombstones were emitting
central.disaster.removed.<country> with the eventtype only in the
category (disaster.removed.wf), which would silently miss type-filtered
subscribers (e.g. central.disaster.wf.> would not see WF removals).

Fix:
  - poll() iscurrent=false branch and missing-from-feed loop both set
    category=f"disaster.{eventtype.lower()}.removed" (eventtype before
    the .removed token, matching the live-event subject hierarchy).
  - subject_for() detects parts[-1] == "removed" and emits
    central.disaster.<eventtype>.removed.<country>.
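
To make the subscriber impact concrete, here is a small sketch with a
simplified NATS-style subject matcher ('*' matches one token, '>' matches
one or more trailing tokens). Both subject_matches() and removed_subject()
are hypothetical helpers written for this note, not code from the adapter
or from the NATS client.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Simplified NATS-style match: '*' is one token, '>' is one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one subject token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def removed_subject(eventtype: str, country: str) -> str:
    """Canonical shape from this fix: central.disaster.<eventtype>.removed.<country>."""
    return f"central.disaster.{eventtype.lower()}.removed.{country}"


old_subject = "central.disaster.removed.greece"   # shape emitted before the fix
new_subject = removed_subject("WF", "greece")     # shape emitted after the fix
```

A subscriber on central.disaster.wf.> matches new_subject but not
old_subject, which is the silent miss described above; a subscriber on
central.disaster.> sees both.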

Tests updated:
  test_fall_off_iscurrent_false now asserts category disaster.wf.removed
  and subject central.disaster.wf.removed.greece.
  test_fall_off_missing_from_feed adds the category assertion.
  Both tombstone-collection filters flip from startswith("disaster.removed")
  to endswith(".removed") for general-shape coverage.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 07:08:15 +00:00
zvx-echo6
52cb3c2be9 feat(2-E): GDACS disaster adapter
Adds the GDACS (Global Disaster Alert and Coordination System) adapter
against the self-describing framework. Polls https://www.gdacs.org/xml/rss.xml
every 600s, parses the RSS items, and publishes to a new CENTRAL_DISASTER
JetStream stream on central.disaster.<eventtype_lower>.<country_lower>.

Locked decisions:
- Keep: WF, DR, FL, VO, TC. Drop: EQ (USGS canonical on central.quake.>),
  plus any future-unknown eventtype.
- Filter via settings_schema event_types: list[str] so operators can
  re-allow without a code change.
- Dedup by RSS guid (format <eventtype><eventid>, stable across reissue).
- Severity from gdacs:alertlevel (Green=1, Orange=2, Red=3, default 0).
- Fall-off uses GDACS gdacs:iscurrent=false as explicit tombstone signal,
  with a fallback for items that vanish entirely from the feed. Tombstones
  publish on disaster.removed.<eventtype>.<country>.
- Geo: centroid from geo:Point, bbox from gdacs:bbox (reordered to Geo
  (minLon, minLat, maxLon, maxLat)), primary_region from gdacs:iso3.
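
The fall-off decision can be sketched as two set computations. This is a
simplified illustration under stated assumptions, not the adapter's code:
detect_fall_off() is a hypothetical helper, feed items are reduced to
(guid, iscurrent) pairs, and FL4004001 is a made-up guid. The real poll()
also deduplicates tombstones by id and respects the event_types filter.

```python
def detect_fall_off(observed_before: dict, feed_items: list) -> tuple:
    """Return (explicit, vanished) guid sets for one poll.

    observed_before maps guid -> eventtype as recorded by earlier polls.
    feed_items is a list of (guid, iscurrent) pairs from the current feed.
    """
    # Explicit tombstones: upstream marks iscurrent=false for a guid seen before.
    explicit = {
        guid for guid, iscurrent in feed_items
        if not iscurrent and guid in observed_before
    }
    # Fallback: previously observed guids that are absent from the feed entirely.
    in_feed = {guid for guid, _ in feed_items}
    vanished = set(observed_before) - in_feed
    return explicit, vanished


observed = {"WF2002001": "WF", "DR3003001": "DR", "FL4004001": "FL"}
feed = [("WF2002001", False), ("DR3003001", True)]
explicit, vanished = detect_fall_off(observed, feed)
```

Here WF2002001 is retired explicitly by upstream, FL4004001 has vanished
from the feed, and DR3003001 is still current, so it appears in neither set.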

CENTRAL_DISASTER stream: 7d retention, 1 GiB max_bytes, mirroring
CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE. Migrations 020 (adapter row,
enabled=false, default event_types in settings) and 021 (stream seed).
STREAM_SUBJECTS, archive STREAMS, GUI DASHBOARD_STREAMS each pick up
the new stream.

Tests: 14 new in tests/test_gdacs.py using frozen RSS fixtures with WF/DR/EQ/XX
items (covering normalization, EQ drop, unknown drop, settings override,
guid dedup, iscurrent=false tombstone, missing-from-feed tombstone,
helper boundaries). Stream-count assertions bumped 4->5 and 5->6 for
the new stream (anti-pattern noted; queued as a follow-up PR E.5).
+1 membership test test_streams_contains_central_disaster.
Full suite: 426 passed.

End-to-end on CT104: 48 events published on first poll (44 disaster.wf +
4 disaster.fl), zero EQ events, all subjects under central.disaster.>
with lowercase-hyphenated country suffixes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 06:58:52 +00:00
11 changed files with 990 additions and 47 deletions


@ -0,0 +1,14 @@
-- Migration: 020_add_gdacs_adapter
-- Adds the GDACS adapter row to config.adapters.
-- Ships disabled; operator enables via GUI.
-- Default event_types excludes EQ (USGS is canonical for earthquakes on central.quake.>).
-- Idempotent: uses ON CONFLICT DO NOTHING.
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
    'gdacs',
    false,
    600,
    jsonb_build_object('event_types', jsonb_build_array('WF', 'DR', 'FL', 'VO', 'TC'))
)
ON CONFLICT (name) DO NOTHING;


@ -0,0 +1,8 @@
-- Migration: 021_add_central_disaster_stream
-- Seeds the CENTRAL_DISASTER JetStream stream row for central.disaster.> subjects.
-- 7-day retention, 1 GiB max_bytes -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE.
-- Idempotent: uses ON CONFLICT DO NOTHING.
INSERT INTO config.streams (name, max_age_s, max_bytes)
VALUES ('CENTRAL_DISASTER', 604800, 1073741824)
ON CONFLICT (name) DO NOTHING;
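
For reference, the two literals in this seed row follow from the stated
7-day retention and 1 GiB cap. A quick arithmetic check (the variable names
are illustrative only):

```python
SECONDS_PER_DAY = 24 * 60 * 60

max_age_s = 7 * SECONDS_PER_DAY  # 7-day retention, in seconds
max_bytes = 1024 ** 3            # 1 GiB, in bytes
```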


@ -0,0 +1,478 @@
"""GDACS (Global Disaster Alert and Coordination System) adapter."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET

import aiohttp
from pydantic import BaseModel
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

from central.adapter import SourceAdapter
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo

logger = logging.getLogger(__name__)

GDACS_RSS_URL = "https://www.gdacs.org/xml/rss.xml"

NS = {
    "gdacs": "http://www.gdacs.org",
    "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#",
    "georss": "http://www.georss.org/georss",
    "dc": "http://purl.org/dc/elements/1.1/",
}

_ALERTLEVEL_TO_SEVERITY = {"Green": 1, "Orange": 2, "Red": 3}

DEFAULT_EVENT_TYPES = ["WF", "DR", "FL", "VO", "TC"]
def severity_from_alertlevel(level: str | None) -> int:
    """Green=1, Orange=2, Red=3, default 0."""
    if not level:
        return 0
    return _ALERTLEVEL_TO_SEVERITY.get(level.strip().capitalize(), 0)


def subject_for_country(country: str | None) -> str:
    """Lowercase, hyphenate spaces. 'unknown' for missing/empty. Takes first if comma-separated."""
    if not country:
        return "unknown"
    first = country.split(",")[0].strip()
    if not first:
        return "unknown"
    return first.lower().replace(" ", "-")


def parse_rfc822_utc(raw: str | None) -> datetime | None:
    """Parse an RFC 822 datetime string to UTC datetime."""
    if not raw:
        return None
    try:
        dt = parsedate_to_datetime(raw)
    except (ValueError, TypeError):
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def parse_gdacs_bbox(raw: str | None) -> tuple[float, float, float, float] | None:
    """Parse GDACS bbox 'lonmin lonmax latmin latmax' to Geo.bbox (minLon, minLat, maxLon, maxLat)."""
    if not raw:
        return None
    try:
        parts = [float(p) for p in raw.split()]
    except ValueError:
        return None
    if len(parts) != 4:
        return None
    lon_min, lon_max, lat_min, lat_max = parts
    return (lon_min, lat_min, lon_max, lat_max)


def init_gdacs_observed_table(db: sqlite3.Connection) -> None:
    db.execute("""
        CREATE TABLE IF NOT EXISTS gdacs_observed (
            guid TEXT PRIMARY KEY,
            country TEXT,
            eventtype TEXT,
            last_observed_at TEXT NOT NULL
        )
    """)
    db.commit()


def get_observed(db: sqlite3.Connection) -> dict[str, tuple[str | None, str | None]]:
    cur = db.execute("SELECT guid, country, eventtype FROM gdacs_observed")
    return {row[0]: (row[1], row[2]) for row in cur.fetchall()}


def mark_observed(db: sqlite3.Connection, guid: str, country: str | None, eventtype: str | None) -> None:
    db.execute(
        """
        INSERT INTO gdacs_observed (guid, country, eventtype, last_observed_at)
        VALUES (?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT (guid) DO UPDATE SET last_observed_at = CURRENT_TIMESTAMP
        """,
        (guid, country, eventtype),
    )
    db.commit()


def mark_retired(db: sqlite3.Connection, guids: set[str]) -> None:
    for guid in guids:
        db.execute("DELETE FROM gdacs_observed WHERE guid = ?", (guid,))
    db.commit()
class GDACSSettings(BaseModel):
    """Settings schema for GDACS adapter.

    event_types is the explicit allowlist of GDACS eventtype codes to publish.
    EQ is intentionally absent from the default because USGS is the canonical
    earthquake source for Central and quakes are already published on
    central.quake.>. Operators can re-include "EQ" here if USGS is
    temporarily unavailable or if the GDACS alertlevel triage signal is
    operationally needed.

    Future-unknown eventtypes (anything GDACS may add later) are dropped by
    default; opt in by adding the code to this list.
    """

    event_types: list[str] = DEFAULT_EVENT_TYPES.copy()
class GDACSAdapter(SourceAdapter):
    """Global Disaster Alert and Coordination System adapter."""

    name = "gdacs"
    display_name = "GDACS — Global Disaster Alerts"
    description = (
        "Global Disaster Alert and Coordination System events: wildfires, drought, "
        "flood, volcano, and tropical cyclones with humanitarian-coordination triage signals."
    )
    settings_schema = GDACSSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None
    default_cadence_s = 600

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None
        self._db: sqlite3.Connection | None = None
        self.event_types: list[str] = list(
            config.settings.get("event_types", DEFAULT_EVENT_TYPES)
        )

    async def startup(self) -> None:
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
        )
        self._db = sqlite3.connect(self._cursor_db_path)
        self._db.execute("""
            CREATE TABLE IF NOT EXISTS published_ids (
                adapter TEXT NOT NULL,
                event_id TEXT NOT NULL,
                first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (adapter, event_id)
            )
        """)
        self._db.execute("""
            CREATE INDEX IF NOT EXISTS published_ids_last_seen
            ON published_ids (last_seen)
        """)
        init_gdacs_observed_table(self._db)
        self._db.commit()
        logger.info("GDACS adapter started", extra={"event_types": self.event_types})

    async def shutdown(self) -> None:
        if self._session:
            await self._session.close()
            self._session = None
        if self._db:
            self._db.close()
            self._db = None
        logger.info("GDACS adapter shut down")

    async def apply_config(self, new_config: AdapterConfig) -> None:
        self.event_types = list(
            new_config.settings.get("event_types", DEFAULT_EVENT_TYPES)
        )
        logger.info("GDACS config updated", extra={"event_types": self.event_types})

    def is_published(self, event_id: str) -> bool:
        if not self._db:
            return False
        cur = self._db.execute(
            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
            (self.name, event_id),
        )
        return cur.fetchone() is not None

    def mark_published(self, event_id: str) -> None:
        if not self._db:
            return
        self._db.execute(
            """
            INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
            VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            ON CONFLICT (adapter, event_id) DO UPDATE SET
                last_seen = CURRENT_TIMESTAMP
            """,
            (self.name, event_id),
        )
        self._db.commit()

    def sweep_old_ids(self) -> int:
        if not self._db:
            return 0
        cur = self._db.execute(
            "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
            (self.name,),
        )
        self._db.commit()
        count = cur.rowcount
        if count > 0:
            logger.info("GDACS swept old dedup entries", extra={"count": count})
        return count

    def subject_for(self, event: Event) -> str:
        parts = event.category.split(".")
        country_subj = subject_for_country(event.data.get("country"))
        if len(parts) >= 3 and parts[-1] == "removed":
            eventtype = parts[1]
            return f"central.disaster.{eventtype}.removed.{country_subj}"
        eventtype = (event.data.get("eventtype") or "").lower() or "unknown"
        return f"central.disaster.{eventtype}.{country_subj}"

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential_jitter(initial=1, max=30),
        retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
    )
    async def _fetch(self) -> str:
        if not self._session:
            raise RuntimeError("Session not initialized")
        async with self._session.get(
            GDACS_RSS_URL, headers={"User-Agent": "Central/0.4"}
        ) as resp:
            resp.raise_for_status()
            text = await resp.text()
        return text
    async def poll(self) -> AsyncIterator[Event]:
        if not self._db:
            raise RuntimeError("Database not initialized")

        try:
            content = await self._fetch()
        except Exception as e:
            logger.error("GDACS fetch failed", extra={"error": str(e)})
            raise

        try:
            root = ET.fromstring(content)
        except ET.ParseError as e:
            logger.error("GDACS RSS parse error", extra={"error": str(e)})
            raise

        channel = root.find("channel")
        if channel is None:
            logger.info("GDACS fetch completed", extra={"item_count": 0})
            return

        items = channel.findall("item")
        logger.info("GDACS fetch completed", extra={"item_count": len(items)})

        observed_before = get_observed(self._db)
        current_guids: set[str] = set()
        events_yielded = 0

        for item_elem in items:
            guid_elem = item_elem.find("guid")
            if guid_elem is None or not guid_elem.text:
                continue
            guid = guid_elem.text.strip()
            if not guid:
                continue

            eventtype_elem = item_elem.find("gdacs:eventtype", NS)
            eventtype = (
                eventtype_elem.text.strip()
                if eventtype_elem is not None and eventtype_elem.text
                else ""
            )
            if eventtype not in self.event_types:
                continue

            iscurrent_elem = item_elem.find("gdacs:iscurrent", NS)
            iscurrent = True
            if iscurrent_elem is not None and iscurrent_elem.text:
                iscurrent = iscurrent_elem.text.strip().lower() == "true"

            country_elem = item_elem.find("gdacs:country", NS)
            country = (
                country_elem.text.strip()
                if country_elem is not None and country_elem.text
                else None
            )
            iso3_elem = item_elem.find("gdacs:iso3", NS)
            iso3 = (
                iso3_elem.text.strip()
                if iso3_elem is not None and iso3_elem.text
                else None
            )
            alertlevel_elem = item_elem.find("gdacs:alertlevel", NS)
            alertlevel = (
                alertlevel_elem.text.strip()
                if alertlevel_elem is not None and alertlevel_elem.text
                else None
            )

            fromdate_elem = item_elem.find("gdacs:fromdate", NS)
            event_time = parse_rfc822_utc(
                fromdate_elem.text if fromdate_elem is not None else None
            )
            if event_time is None:
                pub_date_elem = item_elem.find("pubDate")
                event_time = (
                    parse_rfc822_utc(pub_date_elem.text if pub_date_elem is not None else None)
                    or datetime.now(timezone.utc)
                )

            bbox_elem = item_elem.find("gdacs:bbox", NS)
            bbox = parse_gdacs_bbox(
                bbox_elem.text if bbox_elem is not None else None
            )

            lat_elem = item_elem.find("geo:Point/geo:lat", NS)
            lon_elem = item_elem.find("geo:Point/geo:long", NS)
            centroid: tuple[float, float] | None = None
            if lat_elem is not None and lon_elem is not None and lat_elem.text and lon_elem.text:
                try:
                    centroid = (float(lon_elem.text), float(lat_elem.text))
                except ValueError:
                    centroid = None

            region = iso3 or country
            regions = [region] if region else []
            primary_region = region

            title_elem = item_elem.find("title")
            description_elem = item_elem.find("description")
            link_elem = item_elem.find("link")
            eventid_elem = item_elem.find("gdacs:eventid", NS)
            alertscore_elem = item_elem.find("gdacs:alertscore", NS)
            datemodified_elem = item_elem.find("gdacs:datemodified", NS)

            geo = Geo(
                centroid=centroid,
                bbox=bbox,
                regions=regions,
                primary_region=primary_region,
            )
            data: dict[str, Any] = {
                "guid": guid,
                "eventtype": eventtype,
                "eventid": eventid_elem.text.strip() if eventid_elem is not None and eventid_elem.text else None,
                "country": country,
                "iso3": iso3,
                "alertlevel": alertlevel,
                "alertscore": alertscore_elem.text.strip() if alertscore_elem is not None and alertscore_elem.text else None,
                "title": title_elem.text if title_elem is not None and title_elem.text else "",
                "description": description_elem.text if description_elem is not None and description_elem.text else "",
                "url": link_elem.text if link_elem is not None and link_elem.text else "",
                "datemodified": datemodified_elem.text if datemodified_elem is not None and datemodified_elem.text else None,
                "iscurrent": iscurrent,
            }

            if not iscurrent:
                # Explicit tombstone from upstream. Only emit if we previously observed it.
                if guid in observed_before:
                    tombstone = Event(
                        id=f"{guid}:removed",
                        adapter=self.name,
                        category=f"disaster.{eventtype.lower()}.removed",
                        time=datetime.now(timezone.utc),
                        severity=0,
                        geo=geo,
                        data={**data, "reason": "iscurrent_false"},
                    )
                    if not self.is_published(tombstone.id):
                        yield tombstone
                        self.mark_published(tombstone.id)
                        events_yielded += 1
                continue

            current_guids.add(guid)
            if self.is_published(guid):
                mark_observed(self._db, guid, country, eventtype)
                continue

            event = Event(
                id=guid,
                adapter=self.name,
                category=f"disaster.{eventtype.lower()}",
                time=event_time,
                severity=severity_from_alertlevel(alertlevel),
                geo=geo,
                data=data,
            )
            yield event
            self.mark_published(guid)
            mark_observed(self._db, guid, country, eventtype)
            events_yielded += 1

        # Fall-off: events present in observed_before but absent from this poll
        fallen_off = set(observed_before.keys()) - current_guids
        for guid in fallen_off:
            prior_country, prior_eventtype = observed_before[guid]
            if prior_eventtype and prior_eventtype not in self.event_types:
                # Was published before settings narrowed; clean up silently.
                mark_retired(self._db, {guid})
                continue
            tombstone_id = f"{guid}:removed"
            if self.is_published(tombstone_id):
                mark_retired(self._db, {guid})
                continue
            now = datetime.now(timezone.utc)
            region = prior_country
            geo = Geo(
                regions=[region] if region else [],
                primary_region=region,
            )
            tombstone = Event(
                id=tombstone_id,
                adapter=self.name,
                category=f"disaster.{(prior_eventtype or '').lower()}.removed",
                time=now,
                severity=0,
                geo=geo,
                data={
                    "guid": guid,
                    "country": prior_country,
                    "eventtype": prior_eventtype,
                    "reason": "missing_from_feed",
                },
            )
            yield tombstone
            self.mark_published(tombstone_id)
            mark_retired(self._db, {guid})
            events_yielded += 1

        self.sweep_old_ids()
        logger.info(
            "GDACS poll completed",
            extra={
                "events_yielded": events_yielded,
                "current_observed": len(current_guids),
                "fallen_off": len(fallen_off),
            },
        )


@ -19,14 +19,11 @@ from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
 from nats.js.errors import NotFoundError
 from central.bootstrap_config import get_settings
+from central.streams import STREAMS as STREAM_REGISTRY
-# Event-bearing streams to consume (skip CENTRAL_META - status messages only)
-STREAMS = [
-    ("CENTRAL_WX", "central.wx.>"),
-    ("CENTRAL_FIRE", "central.fire.>"),
-    ("CENTRAL_QUAKE", "central.quake.>"),
-    ("CENTRAL_SPACE", "central.space.>"),
-]
+# Event-bearing streams to consume -- derived from the registry's event_bearing flag.
+# CENTRAL_META is excluded because it carries status messages, not events.
+STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearing]
 BATCH_SIZE = 100
 FETCH_TIMEOUT = 5.0


@ -49,6 +49,7 @@ from functools import cache
 from central.gui.db import get_pool
 from central.gui.form_descriptors import describe_fields, FieldDescriptor
 from central.adapter_discovery import discover_adapters
+from central.streams import STREAMS as STREAM_REGISTRY
 from pydantic import ValidationError
 @cache
@ -63,8 +64,8 @@ def _adapter_classes() -> dict:
 router = APIRouter()
-# Streams to display on dashboard
-DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"]
+# Streams to display on dashboard -- derived from the registry's dashboard flag.
+DASHBOARD_STREAMS = [s.name for s in STREAM_REGISTRY if s.dashboard]
 # Email validation regex (simple but effective)
 ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$")

src/central/streams.py

@ -0,0 +1,32 @@
"""Stream registry — single source of truth for NATS JetStream stream definitions.
Subject-filter mappings live in code (structural; change only when code changes).
Retention / max_bytes live in config.streams (operator-tunable; ops state).
Adding a stream: one StreamEntry below + one migration row that seeds
config.streams. supervisor STREAM_SUBJECTS / archive STREAMS / gui DASHBOARD_STREAMS
all derive automatically.
"""
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True
    """Whether central-archive consumes events from this stream into the events table.
    False for status-only streams (CENTRAL_META) that the archive intentionally skips."""
    dashboard: bool = True
    """Whether the GUI dashboard surfaces this stream's stats card."""


STREAMS: list[StreamEntry] = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_FIRE", "central.fire.>"),
    StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
    StreamEntry("CENTRAL_SPACE", "central.space.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]


@ -21,16 +21,12 @@ from central.config_source import ConfigSource, DbConfigSource
 from central.config_store import ConfigStore
 from central.bootstrap_config import get_settings
 from central.stream_manager import StreamManager
+from central.streams import STREAMS as STREAM_REGISTRY
 CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
-# Stream subject mappings
-STREAM_SUBJECTS = {
-    "CENTRAL_WX": ["central.wx.>"],
-    "CENTRAL_META": ["central.meta.>"],
-    "CENTRAL_FIRE": ["central.fire.>"],
-    "CENTRAL_QUAKE": ["central.quake.>"],
-    "CENTRAL_SPACE": ["central.space.>"],
-}
+# Stream subject mappings -- derived from the registry; every stream is included
+# (META too: supervisor must create it in JetStream even though archive skips it).
+STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY}
 # Recompute interval for stream max_bytes (1 hour)
 STREAM_RECOMPUTE_INTERVAL_S = 3600


@ -26,35 +26,6 @@ class TestConsumerNaming:
         assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
-class TestStreamsConfiguration:
-    """Test streams configuration."""
-    def test_streams_list_has_four_entries(self):
-        """STREAMS list has four event-bearing streams."""
-        assert len(STREAMS) == 4
-    def test_streams_contains_central_wx(self):
-        """STREAMS contains CENTRAL_WX with correct filter."""
-        assert ("CENTRAL_WX", "central.wx.>") in STREAMS
-    def test_streams_contains_central_fire(self):
-        """STREAMS contains CENTRAL_FIRE with correct filter."""
-        assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
-    def test_streams_contains_central_quake(self):
-        """STREAMS contains CENTRAL_QUAKE with correct filter."""
-        assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
-    def test_streams_contains_central_space(self):
-        """STREAMS contains CENTRAL_SPACE with correct filter."""
-        assert ("CENTRAL_SPACE", "central.space.>") in STREAMS
-    def test_streams_excludes_central_meta(self):
-        """STREAMS does not contain CENTRAL_META (status messages only)."""
-        stream_names = [s[0] for s in STREAMS]
-        assert "CENTRAL_META" not in stream_names
 class TestOrphanedConsumerCleanup:
     """Test cleanup of orphaned 'archive' consumer."""


@ -205,7 +205,6 @@ class TestDashboardStreamsIsolation:
         call_args = mock_templates.TemplateResponse.call_args
         context = call_args.kwargs.get("context", call_args[1].get("context"))
         streams = context["streams"]
-        assert len(streams) == 5
         fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
         assert fire_stream.get("error") == "unavailable"
         wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")

tests/test_gdacs.py

@ -0,0 +1,367 @@
"""Tests for GDACS adapter."""
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.config_models import AdapterConfig
from central.models import Event
# Frozen RSS fixture mirroring real GDACS shape (namespaces + element layout).
SAMPLE_RSS = """<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:geo="http://www.w3.org/2003/01/geo/wgs84_pos#" xmlns:gdacs="http://www.gdacs.org" xmlns:georss="http://www.georss.org/georss">
<channel>
<title>GDACS RSS information</title>
<link>https://www.gdacs.org/</link>
<description>Near real-time notification</description>
<pubDate>Tue, 19 May 2026 06:35:01 GMT</pubDate>
<item>
<title>Green wildfire in Greece 18/05/2026 11:00 UTC</title>
<description>Wildfire in Attica region of Greece.</description>
<link>https://www.gdacs.org/report.aspx?eventtype=WF&amp;eventid=2002001</link>
<pubDate>Mon, 18 May 2026 11:10:00 GMT</pubDate>
<gdacs:iscurrent>true</gdacs:iscurrent>
<gdacs:fromdate>Mon, 18 May 2026 11:00:00 GMT</gdacs:fromdate>
<gdacs:datemodified>Tue, 19 May 2026 04:00:00 GMT</gdacs:datemodified>
<guid isPermaLink="false">WF2002001</guid>
<geo:Point>
<geo:lat>38.0</geo:lat>
<geo:long>23.7</geo:long>
</geo:Point>
<gdacs:bbox>21.7 25.7 36.0 40.0</gdacs:bbox>
<gdacs:eventtype>WF</gdacs:eventtype>
<gdacs:alertlevel>Green</gdacs:alertlevel>
<gdacs:alertscore>0</gdacs:alertscore>
<gdacs:eventid>2002001</gdacs:eventid>
<gdacs:iso3>GRC</gdacs:iso3>
<gdacs:country>Greece</gdacs:country>
</item>
<item>
<title>Orange drought in United States of America 15/04/2026</title>
<description>Multi-state drought.</description>
<link>https://www.gdacs.org/report.aspx?eventtype=DR&amp;eventid=3003001</link>
<pubDate>Wed, 15 Apr 2026 00:00:00 GMT</pubDate>
<gdacs:iscurrent>true</gdacs:iscurrent>
<gdacs:fromdate>Wed, 15 Apr 2026 00:00:00 GMT</gdacs:fromdate>
<guid isPermaLink="false">DR3003001</guid>
<geo:Point>
<geo:lat>39.5</geo:lat>
<geo:long>-98.5</geo:long>
</geo:Point>
<gdacs:bbox>-110.0 -90.0 32.0 45.0</gdacs:bbox>
<gdacs:eventtype>DR</gdacs:eventtype>
<gdacs:alertlevel>Orange</gdacs:alertlevel>
<gdacs:alertscore>1.5</gdacs:alertscore>
<gdacs:eventid>3003001</gdacs:eventid>
<gdacs:iso3>USA</gdacs:iso3>
<gdacs:country>United States of America</gdacs:country>
</item>
<item>
<title>Green earthquake in Vanuatu</title>
<description>EQ Vanuatu</description>
<link>https://www.gdacs.org/report.aspx?eventtype=EQ&amp;eventid=1541360</link>
<pubDate>Tue, 19 May 2026 02:41:13 GMT</pubDate>
<gdacs:iscurrent>true</gdacs:iscurrent>
<gdacs:fromdate>Tue, 19 May 2026 02:29:24 GMT</gdacs:fromdate>
<guid isPermaLink="false">EQ1541360</guid>
<geo:Point>
<geo:lat>-18.15</geo:lat>
<geo:long>168.09</geo:long>
</geo:Point>
<gdacs:eventtype>EQ</gdacs:eventtype>
<gdacs:alertlevel>Green</gdacs:alertlevel>
<gdacs:eventid>1541360</gdacs:eventid>
<gdacs:iso3>VUT</gdacs:iso3>
<gdacs:country>Vanuatu</gdacs:country>
</item>
<item>
<title>Synthetic unknown eventtype</title>
<description>XX synthetic test</description>
<link>https://www.gdacs.org/report.aspx?eventtype=XX&amp;eventid=999999</link>
<pubDate>Tue, 19 May 2026 00:00:00 GMT</pubDate>
<gdacs:iscurrent>true</gdacs:iscurrent>
<gdacs:fromdate>Tue, 19 May 2026 00:00:00 GMT</gdacs:fromdate>
<guid isPermaLink="false">XX999999</guid>
<gdacs:eventtype>XX</gdacs:eventtype>
<gdacs:alertlevel>Green</gdacs:alertlevel>
<gdacs:eventid>999999</gdacs:eventid>
<gdacs:country>Nowhere</gdacs:country>
</item>
</channel>
</rss>"""
# Same items but WF turned to iscurrent=false (tombstone scenario)
SAMPLE_RSS_WF_RETIRED = SAMPLE_RSS.replace(
"<gdacs:iscurrent>true</gdacs:iscurrent>\n <gdacs:fromdate>Mon, 18 May 2026 11:00:00 GMT",
"<gdacs:iscurrent>false</gdacs:iscurrent>\n <gdacs:fromdate>Mon, 18 May 2026 11:00:00 GMT",
1,
)
# Just the DR + EQ + XX items, with WF removed entirely (missing-from-feed scenario)
SAMPLE_RSS_WF_MISSING = SAMPLE_RSS.replace(
"""<item>
<title>Green wildfire in Greece 18/05/2026 11:00 UTC</title>
<description>Wildfire in Attica region of Greece.</description>
<link>https://www.gdacs.org/report.aspx?eventtype=WF&amp;eventid=2002001</link>
<pubDate>Mon, 18 May 2026 11:10:00 GMT</pubDate>
<gdacs:iscurrent>true</gdacs:iscurrent>
<gdacs:fromdate>Mon, 18 May 2026 11:00:00 GMT</gdacs:fromdate>
<gdacs:datemodified>Tue, 19 May 2026 04:00:00 GMT</gdacs:datemodified>
<guid isPermaLink="false">WF2002001</guid>
<geo:Point>
<geo:lat>38.0</geo:lat>
<geo:long>23.7</geo:long>
</geo:Point>
<gdacs:bbox>21.7 25.7 36.0 40.0</gdacs:bbox>
<gdacs:eventtype>WF</gdacs:eventtype>
<gdacs:alertlevel>Green</gdacs:alertlevel>
<gdacs:alertscore>0</gdacs:alertscore>
<gdacs:eventid>2002001</gdacs:eventid>
<gdacs:iso3>GRC</gdacs:iso3>
<gdacs:country>Greece</gdacs:country>
</item>
""",
"",
1,
)


def _config(settings: dict | None = None) -> AdapterConfig:
    return AdapterConfig(
        name="gdacs",
        enabled=True,
        cadence_s=600,
        settings=settings or {"event_types": ["WF", "DR", "FL", "VO", "TC"]},
        updated_at=datetime.now(timezone.utc),
    )


class TestGDACSHelpers:
    def test_severity_from_alertlevel_green_orange_red(self):
        from central.adapters.gdacs import severity_from_alertlevel

        assert severity_from_alertlevel("Green") == 1
        assert severity_from_alertlevel("Orange") == 2
        assert severity_from_alertlevel("Red") == 3
        assert severity_from_alertlevel(None) == 0
        assert severity_from_alertlevel("") == 0
        assert severity_from_alertlevel("Unknown") == 0
        # case-insensitive
        assert severity_from_alertlevel("green") == 1
        assert severity_from_alertlevel("RED") == 3

    def test_subject_for_lowercase_country(self):
        from central.adapters.gdacs import subject_for_country

        assert subject_for_country("United States") == "united-states"
        assert subject_for_country("Greece") == "greece"
        assert subject_for_country("Solomon Islands") == "solomon-islands"

    def test_subject_for_unknown_country(self):
        from central.adapters.gdacs import subject_for_country

        assert subject_for_country(None) == "unknown"
        assert subject_for_country("") == "unknown"
        assert subject_for_country(" ") == "unknown"

    def test_subject_for_multi_country_takes_first(self):
        from central.adapters.gdacs import subject_for_country

        assert subject_for_country("Mozambique, Madagascar") == "mozambique"

    def test_parse_gdacs_bbox(self):
        from central.adapters.gdacs import parse_gdacs_bbox

        # GDACS format: lonmin lonmax latmin latmax
        # Geo.bbox: (minLon, minLat, maxLon, maxLat)
        result = parse_gdacs_bbox("21.7 25.7 36.0 40.0")
        assert result == (21.7, 36.0, 25.7, 40.0)
        assert parse_gdacs_bbox(None) is None
        assert parse_gdacs_bbox("") is None
        assert parse_gdacs_bbox("not numbers") is None


class TestGDACSAdapter:
    def test_class_attrs_complete(self):
        from central.adapters.gdacs import GDACSAdapter, GDACSSettings

        assert GDACSAdapter.name == "gdacs"
        assert isinstance(GDACSAdapter.display_name, str) and GDACSAdapter.display_name
        assert isinstance(GDACSAdapter.description, str) and GDACSAdapter.description
        assert GDACSAdapter.settings_schema is GDACSSettings
        assert GDACSAdapter.requires_api_key is None
        assert GDACSAdapter.api_key_field is None
        assert GDACSAdapter.wizard_order is None
        assert GDACSAdapter.default_cadence_s == 600

    @pytest.mark.asyncio
    async def test_normalization_basic_wf(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        events: list[Event] = [e async for e in adapter.poll()]
        await adapter.shutdown()
        # WF + DR should yield; EQ + XX filtered.
        assert len(events) == 2
        wf = next(e for e in events if e.data["eventtype"] == "WF")
        assert wf.adapter == "gdacs"
        assert wf.category == "disaster.wf"
        assert wf.id == "WF2002001"
        assert wf.severity == 1  # Green
        assert wf.data["country"] == "Greece"
        assert wf.data["iso3"] == "GRC"
        assert wf.geo.centroid == (23.7, 38.0)
        assert wf.geo.bbox == (21.7, 36.0, 25.7, 40.0)
        assert wf.geo.primary_region == "GRC"
        assert wf.geo.regions == ["GRC"]
        dr = next(e for e in events if e.data["eventtype"] == "DR")
        assert dr.severity == 2  # Orange
        assert dr.category == "disaster.dr"
        assert dr.data["iso3"] == "USA"

    @pytest.mark.asyncio
    async def test_eq_filtered_by_default(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        events = [e async for e in adapter.poll()]
        await adapter.shutdown()
        # No EQ in default allowlist; EQ1541360 must not appear.
        assert all(e.id != "EQ1541360" for e in events)
        assert all(e.data["eventtype"] != "EQ" for e in events)

    @pytest.mark.asyncio
    async def test_unknown_eventtype_filtered(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        events = [e async for e in adapter.poll()]
        await adapter.shutdown()
        assert all(e.data["eventtype"] != "XX" for e in events)

    @pytest.mark.asyncio
    async def test_settings_event_types_override(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(
            _config({"event_types": ["EQ"]}),
            MagicMock(),
            tmp_path / "cursors.db",
        )
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        events = [e async for e in adapter.poll()]
        await adapter.shutdown()
        # Only EQ should yield now.
        assert len(events) == 1
        assert events[0].id == "EQ1541360"
        assert events[0].category == "disaster.eq"

    @pytest.mark.asyncio
    async def test_dedup_by_guid(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        first_pass = [e async for e in adapter.poll()]
        second_pass = [e async for e in adapter.poll()]
        await adapter.shutdown()
        assert len(first_pass) == 2
        assert len(second_pass) == 0

    @pytest.mark.asyncio
    async def test_fall_off_iscurrent_false(self, tmp_path: Path):
        """Item seen iscurrent=true then iscurrent=false -> tombstone."""
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        first_pass = [e async for e in adapter.poll()]
        assert any(e.id == "WF2002001" for e in first_pass)
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_RETIRED)
        second_pass = [e async for e in adapter.poll()]
        await adapter.shutdown()
        tombstones = [e for e in second_pass if e.category.endswith(".removed")]
        assert len(tombstones) == 1
        ts = tombstones[0]
        assert ts.id == "WF2002001:removed"
        assert ts.category == "disaster.wf.removed"
        assert ts.data["reason"] == "iscurrent_false"
        # Subject form: central.disaster.<eventtype>.removed.<country>
        assert adapter.subject_for(ts) == "central.disaster.wf.removed.greece"

    @pytest.mark.asyncio
    async def test_fall_off_missing_from_feed(self, tmp_path: Path):
        """Item seen, then completely missing from feed -> tombstone."""
        from central.adapters.gdacs import GDACSAdapter

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
        await adapter.startup()
        _ = [e async for e in adapter.poll()]
        adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_MISSING)
        second_pass = [e async for e in adapter.poll()]
        await adapter.shutdown()
        tombstones = [e for e in second_pass if e.category.endswith(".removed")]
        assert len(tombstones) == 1
        assert tombstones[0].id == "WF2002001:removed"
        assert tombstones[0].category == "disaster.wf.removed"
        assert tombstones[0].data["reason"] == "missing_from_feed"

    @pytest.mark.asyncio
    async def test_subject_for_returns_country_path(self, tmp_path: Path):
        from central.adapters.gdacs import GDACSAdapter
        from central.models import Geo

        adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
        event = Event(
            id="WF2002001",
            adapter="gdacs",
            category="disaster.wf",
            time=datetime(2026, 5, 18, 11, tzinfo=timezone.utc),
            severity=1,
            geo=Geo(),
            data={"eventtype": "WF", "country": "Greece"},
        )
        assert adapter.subject_for(event) == "central.disaster.wf.greece"
        event_unknown = Event(
            id="DR1",
            adapter="gdacs",
            category="disaster.dr",
            time=datetime(2026, 5, 18, tzinfo=timezone.utc),
            severity=0,
            geo=Geo(),
            data={"eventtype": "DR", "country": None},
        )
        assert adapter.subject_for(event_unknown) == "central.disaster.dr.unknown"
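
Reviewer note: the assertions in TestGDACSHelpers fully pin down the three pure helpers, so it may help to see what a conforming implementation could look like. The sketch below is inferred from the test expectations only; it is not the code in central.adapters.gdacs, and the real helpers may differ in structure or edge-case handling.

```python
from typing import Optional, Tuple

# Hypothetical re-implementations derived from the test assertions above.
# They are NOT the adapter's actual code.

_SEVERITY = {"green": 1, "orange": 2, "red": 3}


def severity_from_alertlevel(level: Optional[str]) -> int:
    """Map a GDACS alert level to 1-3, case-insensitively; anything else is 0."""
    if not level:
        return 0
    return _SEVERITY.get(level.strip().lower(), 0)


def subject_for_country(country: Optional[str]) -> str:
    """Subject token: first listed country, lowercased, whitespace -> hyphens."""
    if not country or not country.strip():
        return "unknown"
    first = country.split(",")[0].strip().lower()
    return "-".join(first.split()) or "unknown"


def parse_gdacs_bbox(raw: Optional[str]) -> Optional[Tuple[float, float, float, float]]:
    """Reorder 'lonmin lonmax latmin latmax' into (minLon, minLat, maxLon, maxLat)."""
    if not raw:
        return None
    parts = raw.split()
    if len(parts) != 4:
        return None
    try:
        lon_min, lon_max, lat_min, lat_max = (float(p) for p in parts)
    except ValueError:
        return None
    return (lon_min, lat_min, lon_max, lat_max)
```

The bbox reordering is the step most likely to be gotten wrong: GDACS groups the two longitudes first, while the Geo.bbox convention in the tests interleaves them as corner pairs.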

@ -0,0 +1,80 @@
"""Registry-consistency tests for src/central/streams.py.
These are property tests, not literal restatements. Adding a new stream to the
registry requires no new test code -- every invariant here automatically
covers it.
"""
import re

from central.streams import STREAMS


def test_stream_names_unique():
    names = [s.name for s in STREAMS]
    assert len(names) == len(set(names)), "duplicate stream names in registry"


def test_subject_filters_unique():
    filters = [s.subject_filter for s in STREAMS]
    assert len(filters) == len(set(filters)), "duplicate subject filters in registry"


def test_subject_filter_central_prefix_wildcard():
    pattern = re.compile(r"^central\.[a-z][a-z_]*\.>$")
    for s in STREAMS:
        assert pattern.match(s.subject_filter), (
            f"{s.name}: subject_filter {s.subject_filter!r} does not match /^central\\.[a-z][a-z_]*\\.>$/"
        )


def test_meta_is_only_non_event_bearing():
    """CENTRAL_META is the only non-event-bearing stream today.

    If you're adding a second one, update this test deliberately -- the
    archive will silently skip the new stream, which is rarely what you want.
    """
    non_event = [s for s in STREAMS if not s.event_bearing]
    assert len(non_event) == 1, (
        f"expected exactly one non-event-bearing stream, got {[s.name for s in non_event]}"
    )
    assert non_event[0].name == "CENTRAL_META"


def test_supervisor_stream_subjects_includes_meta():
    """Supervisor creates every stream in JetStream, including META."""
    from central.supervisor import STREAM_SUBJECTS

    assert "CENTRAL_META" in STREAM_SUBJECTS
    assert STREAM_SUBJECTS["CENTRAL_META"] == ["central.meta.>"]


def test_supervisor_stream_subjects_includes_all():
    """Every registry stream appears in supervisor's derived dict with the right filter."""
    from central.supervisor import STREAM_SUBJECTS

    assert set(STREAM_SUBJECTS.keys()) == {s.name for s in STREAMS}
    for s in STREAMS:
        assert STREAM_SUBJECTS[s.name] == [s.subject_filter]


def test_archive_streams_excludes_non_event_bearing():
    """Archive's STREAMS list contains exactly the event_bearing=True entries."""
    from central.archive import STREAMS as ARCHIVE_STREAMS

    expected = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
    assert ARCHIVE_STREAMS == expected
    archive_names = {name for name, _ in ARCHIVE_STREAMS}
    for s in STREAMS:
        if s.event_bearing:
            assert s.name in archive_names
        else:
            assert s.name not in archive_names


def test_dashboard_streams_matches_dashboard_flag():
    """GUI's DASHBOARD_STREAMS matches [s.name for s in STREAMS if s.dashboard]."""
    from central.gui.routes import DASHBOARD_STREAMS

    expected = [s.name for s in STREAMS if s.dashboard]
    assert DASHBOARD_STREAMS == expected
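
Reviewer note: the invariants tested above are easier to follow next to the derivations they check. The sketch below collapses the registry and its three consumers into one module, following the StreamEntry definition and derivation expressions given in the commit description. In the actual change these live in separate modules (central.streams, central.supervisor, central.archive, central.gui.routes); the single-file layout and the ARCHIVE_STREAMS name here are for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamEntry:
    name: str
    subject_filter: str
    event_bearing: bool = True  # archive consumes from this stream
    dashboard: bool = True      # GUI dashboard surfaces this stream


# Registry contents as listed in the commit description.
STREAMS = [
    StreamEntry("CENTRAL_WX", "central.wx.>"),
    StreamEntry("CENTRAL_FIRE", "central.fire.>"),
    StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
    StreamEntry("CENTRAL_SPACE", "central.space.>"),
    StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
    StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
]

# Supervisor view: every stream, META included, one filter per stream.
STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAMS}

# Archive view: only event-bearing streams, as (name, filter) tuples.
ARCHIVE_STREAMS = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]

# GUI view: names of streams flagged for the dashboard.
DASHBOARD_STREAMS = [s.name for s in STREAMS if s.dashboard]
```

Because each view is a comprehension over the same list, adding a seventh StreamEntry updates all three without touching the consumers, which is why the tests can be written as properties rather than literal expected values.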