diff --git a/sql/migrations/020_add_gdacs_adapter.sql b/sql/migrations/020_add_gdacs_adapter.sql
deleted file mode 100644
index 20ac01f..0000000
--- a/sql/migrations/020_add_gdacs_adapter.sql
+++ /dev/null
@@ -1,14 +0,0 @@
--- Migration: 020_add_gdacs_adapter
--- Adds the GDACS adapter row to config.adapters.
--- Ships disabled; operator enables via GUI.
--- Default event_types excludes EQ (USGS is canonical for earthquakes on central.quake.>).
--- Idempotent: uses ON CONFLICT DO NOTHING.
-
-INSERT INTO config.adapters (name, enabled, cadence_s, settings)
-VALUES (
- 'gdacs',
- false,
- 600,
- jsonb_build_object('event_types', jsonb_build_array('WF', 'DR', 'FL', 'VO', 'TC'))
-)
-ON CONFLICT (name) DO NOTHING;
diff --git a/sql/migrations/021_add_central_disaster_stream.sql b/sql/migrations/021_add_central_disaster_stream.sql
deleted file mode 100644
index b9192f0..0000000
--- a/sql/migrations/021_add_central_disaster_stream.sql
+++ /dev/null
@@ -1,8 +0,0 @@
--- Migration: 021_add_central_disaster_stream
--- Seeds the CENTRAL_DISASTER JetStream stream row for central.disaster.> subjects.
--- 7-day retention, 1 GiB max_bytes -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE.
--- Idempotent: uses ON CONFLICT DO NOTHING.
-
-INSERT INTO config.streams (name, max_age_s, max_bytes)
-VALUES ('CENTRAL_DISASTER', 604800, 1073741824)
-ON CONFLICT (name) DO NOTHING;
diff --git a/src/central/adapters/gdacs.py b/src/central/adapters/gdacs.py
deleted file mode 100644
index 9ff6d6e..0000000
--- a/src/central/adapters/gdacs.py
+++ /dev/null
@@ -1,478 +0,0 @@
-"""GDACS (Global Disaster Alert and Coordination System) adapter."""
-
-import logging
-import sqlite3
-from collections.abc import AsyncIterator
-from datetime import datetime, timezone
-from email.utils import parsedate_to_datetime
-from pathlib import Path
-from typing import Any
-from xml.etree import ElementTree as ET
-
-import aiohttp
-from pydantic import BaseModel
-from tenacity import (
- retry,
- retry_if_exception_type,
- stop_after_attempt,
- wait_exponential_jitter,
-)
-
-from central.adapter import SourceAdapter
-from central.config_models import AdapterConfig
-from central.config_store import ConfigStore
-from central.models import Event, Geo
-
-logger = logging.getLogger(__name__)
-
-GDACS_RSS_URL = "https://www.gdacs.org/xml/rss.xml"
-
-NS = {
- "gdacs": "http://www.gdacs.org",
- "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#",
- "georss": "http://www.georss.org/georss",
- "dc": "http://purl.org/dc/elements/1.1/",
-}
-
-_ALERTLEVEL_TO_SEVERITY = {"Green": 1, "Orange": 2, "Red": 3}
-
-DEFAULT_EVENT_TYPES = ["WF", "DR", "FL", "VO", "TC"]
-
-
-def severity_from_alertlevel(level: str | None) -> int:
- """Green=1, Orange=2, Red=3, default 0."""
- if not level:
- return 0
- return _ALERTLEVEL_TO_SEVERITY.get(level.strip().capitalize(), 0)
-
-
-def subject_for_country(country: str | None) -> str:
- """Lowercase, hyphenate spaces. 'unknown' for missing/empty. Takes first if comma-separated."""
- if not country:
- return "unknown"
- first = country.split(",")[0].strip()
- if not first:
- return "unknown"
- return first.lower().replace(" ", "-")
-
-
-def parse_rfc822_utc(raw: str | None) -> datetime | None:
- """Parse an RFC 822 datetime string to UTC datetime."""
- if not raw:
- return None
- try:
- dt = parsedate_to_datetime(raw)
- except (ValueError, TypeError):
- return None
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc)
-
-
-def parse_gdacs_bbox(raw: str | None) -> tuple[float, float, float, float] | None:
- """Parse GDACS bbox 'lonmin lonmax latmin latmax' to Geo.bbox (minLon, minLat, maxLon, maxLat)."""
- if not raw:
- return None
- try:
- parts = [float(p) for p in raw.split()]
- except ValueError:
- return None
- if len(parts) != 4:
- return None
- lon_min, lon_max, lat_min, lat_max = parts
- return (lon_min, lat_min, lon_max, lat_max)
-
-
-def init_gdacs_observed_table(db: sqlite3.Connection) -> None:
- db.execute("""
- CREATE TABLE IF NOT EXISTS gdacs_observed (
- guid TEXT PRIMARY KEY,
- country TEXT,
- eventtype TEXT,
- last_observed_at TEXT NOT NULL
- )
- """)
- db.commit()
-
-
-def get_observed(db: sqlite3.Connection) -> dict[str, tuple[str | None, str | None]]:
- cur = db.execute("SELECT guid, country, eventtype FROM gdacs_observed")
- return {row[0]: (row[1], row[2]) for row in cur.fetchall()}
-
-
-def mark_observed(db: sqlite3.Connection, guid: str, country: str | None, eventtype: str | None) -> None:
- db.execute(
- """
- INSERT INTO gdacs_observed (guid, country, eventtype, last_observed_at)
- VALUES (?, ?, ?, CURRENT_TIMESTAMP)
- ON CONFLICT (guid) DO UPDATE SET last_observed_at = CURRENT_TIMESTAMP
- """,
- (guid, country, eventtype),
- )
- db.commit()
-
-
-def mark_retired(db: sqlite3.Connection, guids: set[str]) -> None:
- for guid in guids:
- db.execute("DELETE FROM gdacs_observed WHERE guid = ?", (guid,))
- db.commit()
-
-
-class GDACSSettings(BaseModel):
- """Settings schema for GDACS adapter.
-
- event_types is the explicit allowlist of GDACS eventtype codes to publish.
- EQ is intentionally absent from the default because USGS is the canonical
- earthquake source for Central and quakes are already published on
- central.quake.>. Operators can re-include "EQ" here if USGS is
- temporarily unavailable or if the GDACS alertlevel triage signal is
- operationally needed.
-
- Future-unknown eventtypes (anything GDACS may add later) are dropped by
- default — opt in by adding the code to this list.
- """
-
- event_types: list[str] = DEFAULT_EVENT_TYPES.copy()
-
-
-class GDACSAdapter(SourceAdapter):
- """Global Disaster Alert and Coordination System adapter."""
-
- name = "gdacs"
- display_name = "GDACS — Global Disaster Alerts"
- description = (
- "Global Disaster Alert and Coordination System events: wildfires, drought, "
- "flood, volcano, and tropical cyclones with humanitarian-coordination triage signals."
- )
- settings_schema = GDACSSettings
- requires_api_key = None
- api_key_field = None
- wizard_order = None
- default_cadence_s = 600
-
- def __init__(
- self,
- config: AdapterConfig,
- config_store: ConfigStore,
- cursor_db_path: Path,
- ) -> None:
- self._config_store = config_store
- self._cursor_db_path = cursor_db_path
- self._session: aiohttp.ClientSession | None = None
- self._db: sqlite3.Connection | None = None
- self.event_types: list[str] = list(
- config.settings.get("event_types", DEFAULT_EVENT_TYPES)
- )
-
- async def startup(self) -> None:
- self._session = aiohttp.ClientSession(
- timeout=aiohttp.ClientTimeout(total=60),
- )
- self._db = sqlite3.connect(self._cursor_db_path)
- self._db.execute("""
- CREATE TABLE IF NOT EXISTS published_ids (
- adapter TEXT NOT NULL,
- event_id TEXT NOT NULL,
- first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
- last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY (adapter, event_id)
- )
- """)
- self._db.execute("""
- CREATE INDEX IF NOT EXISTS published_ids_last_seen
- ON published_ids (last_seen)
- """)
- init_gdacs_observed_table(self._db)
- self._db.commit()
- logger.info("GDACS adapter started", extra={"event_types": self.event_types})
-
- async def shutdown(self) -> None:
- if self._session:
- await self._session.close()
- self._session = None
- if self._db:
- self._db.close()
- self._db = None
- logger.info("GDACS adapter shut down")
-
- async def apply_config(self, new_config: AdapterConfig) -> None:
- self.event_types = list(
- new_config.settings.get("event_types", DEFAULT_EVENT_TYPES)
- )
- logger.info("GDACS config updated", extra={"event_types": self.event_types})
-
- def is_published(self, event_id: str) -> bool:
- if not self._db:
- return False
- cur = self._db.execute(
- "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
- (self.name, event_id),
- )
- return cur.fetchone() is not None
-
- def mark_published(self, event_id: str) -> None:
- if not self._db:
- return
- self._db.execute(
- """
- INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
- VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
- ON CONFLICT (adapter, event_id) DO UPDATE SET
- last_seen = CURRENT_TIMESTAMP
- """,
- (self.name, event_id),
- )
- self._db.commit()
-
- def sweep_old_ids(self) -> int:
- if not self._db:
- return 0
- cur = self._db.execute(
- "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
- (self.name,),
- )
- self._db.commit()
- count = cur.rowcount
- if count > 0:
- logger.info("GDACS swept old dedup entries", extra={"count": count})
- return count
-
- def subject_for(self, event: Event) -> str:
- parts = event.category.split(".")
- country_subj = subject_for_country(event.data.get("country"))
- if len(parts) >= 3 and parts[-1] == "removed":
- eventtype = parts[1]
- return f"central.disaster.{eventtype}.removed.{country_subj}"
- eventtype = (event.data.get("eventtype") or "").lower() or "unknown"
- return f"central.disaster.{eventtype}.{country_subj}"
-
- @retry(
- stop=stop_after_attempt(3),
- wait=wait_exponential_jitter(initial=1, max=30),
- retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
- )
- async def _fetch(self) -> str:
- if not self._session:
- raise RuntimeError("Session not initialized")
- async with self._session.get(
- GDACS_RSS_URL, headers={"User-Agent": "Central/0.4"}
- ) as resp:
- resp.raise_for_status()
- text = await resp.text()
- return text
-
- async def poll(self) -> AsyncIterator[Event]:
- if not self._db:
- raise RuntimeError("Database not initialized")
-
- try:
- content = await self._fetch()
- except Exception as e:
- logger.error("GDACS fetch failed", extra={"error": str(e)})
- raise
-
- try:
- root = ET.fromstring(content)
- except ET.ParseError as e:
- logger.error("GDACS RSS parse error", extra={"error": str(e)})
- raise
-
- channel = root.find("channel")
- if channel is None:
- logger.info("GDACS fetch completed", extra={"item_count": 0})
- return
-
- items = channel.findall("item")
- logger.info("GDACS fetch completed", extra={"item_count": len(items)})
-
- observed_before = get_observed(self._db)
- current_guids: set[str] = set()
- events_yielded = 0
-
- for item_elem in items:
- guid_elem = item_elem.find("guid")
- if guid_elem is None or not guid_elem.text:
- continue
- guid = guid_elem.text.strip()
- if not guid:
- continue
-
- eventtype_elem = item_elem.find("gdacs:eventtype", NS)
- eventtype = (
- eventtype_elem.text.strip()
- if eventtype_elem is not None and eventtype_elem.text
- else ""
- )
-
- if eventtype not in self.event_types:
- continue
-
- iscurrent_elem = item_elem.find("gdacs:iscurrent", NS)
- iscurrent = True
- if iscurrent_elem is not None and iscurrent_elem.text:
- iscurrent = iscurrent_elem.text.strip().lower() == "true"
-
- country_elem = item_elem.find("gdacs:country", NS)
- country = (
- country_elem.text.strip()
- if country_elem is not None and country_elem.text
- else None
- )
-
- iso3_elem = item_elem.find("gdacs:iso3", NS)
- iso3 = (
- iso3_elem.text.strip()
- if iso3_elem is not None and iso3_elem.text
- else None
- )
-
- alertlevel_elem = item_elem.find("gdacs:alertlevel", NS)
- alertlevel = (
- alertlevel_elem.text.strip()
- if alertlevel_elem is not None and alertlevel_elem.text
- else None
- )
-
- fromdate_elem = item_elem.find("gdacs:fromdate", NS)
- event_time = parse_rfc822_utc(
- fromdate_elem.text if fromdate_elem is not None else None
- )
- if event_time is None:
- pub_date_elem = item_elem.find("pubDate")
- event_time = (
- parse_rfc822_utc(pub_date_elem.text if pub_date_elem is not None else None)
- or datetime.now(timezone.utc)
- )
-
- bbox_elem = item_elem.find("gdacs:bbox", NS)
- bbox = parse_gdacs_bbox(
- bbox_elem.text if bbox_elem is not None else None
- )
-
- lat_elem = item_elem.find("geo:Point/geo:lat", NS)
- lon_elem = item_elem.find("geo:Point/geo:long", NS)
- centroid: tuple[float, float] | None = None
- if lat_elem is not None and lon_elem is not None and lat_elem.text and lon_elem.text:
- try:
- centroid = (float(lon_elem.text), float(lat_elem.text))
- except ValueError:
- centroid = None
-
- region = iso3 or country
- regions = [region] if region else []
- primary_region = region
-
- title_elem = item_elem.find("title")
- description_elem = item_elem.find("description")
- link_elem = item_elem.find("link")
- eventid_elem = item_elem.find("gdacs:eventid", NS)
- alertscore_elem = item_elem.find("gdacs:alertscore", NS)
- datemodified_elem = item_elem.find("gdacs:datemodified", NS)
-
- geo = Geo(
- centroid=centroid,
- bbox=bbox,
- regions=regions,
- primary_region=primary_region,
- )
-
- data: dict[str, Any] = {
- "guid": guid,
- "eventtype": eventtype,
- "eventid": eventid_elem.text.strip() if eventid_elem is not None and eventid_elem.text else None,
- "country": country,
- "iso3": iso3,
- "alertlevel": alertlevel,
- "alertscore": alertscore_elem.text.strip() if alertscore_elem is not None and alertscore_elem.text else None,
- "title": title_elem.text if title_elem is not None and title_elem.text else "",
- "description": description_elem.text if description_elem is not None and description_elem.text else "",
- "url": link_elem.text if link_elem is not None and link_elem.text else "",
- "datemodified": datemodified_elem.text if datemodified_elem is not None and datemodified_elem.text else None,
- "iscurrent": iscurrent,
- }
-
- if not iscurrent:
- # Explicit tombstone from upstream. Only emit if we previously observed it.
- if guid in observed_before:
- tombstone = Event(
- id=f"{guid}:removed",
- adapter=self.name,
- category=f"disaster.{eventtype.lower()}.removed",
- time=datetime.now(timezone.utc),
- severity=0,
- geo=geo,
- data={**data, "reason": "iscurrent_false"},
- )
- if not self.is_published(tombstone.id):
- yield tombstone
- self.mark_published(tombstone.id)
- events_yielded += 1
- continue
-
- current_guids.add(guid)
-
- if self.is_published(guid):
- mark_observed(self._db, guid, country, eventtype)
- continue
-
- event = Event(
- id=guid,
- adapter=self.name,
- category=f"disaster.{eventtype.lower()}",
- time=event_time,
- severity=severity_from_alertlevel(alertlevel),
- geo=geo,
- data=data,
- )
-
- yield event
- self.mark_published(guid)
- mark_observed(self._db, guid, country, eventtype)
- events_yielded += 1
-
- # Fall-off: events present in observed_before but absent from this poll
- fallen_off = set(observed_before.keys()) - current_guids
- for guid in fallen_off:
- prior_country, prior_eventtype = observed_before[guid]
- if prior_eventtype and prior_eventtype not in self.event_types:
- # Was published before settings narrowed; clean up silently.
- mark_retired(self._db, {guid})
- continue
- tombstone_id = f"{guid}:removed"
- if self.is_published(tombstone_id):
- mark_retired(self._db, {guid})
- continue
- now = datetime.now(timezone.utc)
- region = prior_country
- geo = Geo(
- regions=[region] if region else [],
- primary_region=region,
- )
- tombstone = Event(
- id=tombstone_id,
- adapter=self.name,
- category=f"disaster.{(prior_eventtype or '').lower()}.removed",
- time=now,
- severity=0,
- geo=geo,
- data={
- "guid": guid,
- "country": prior_country,
- "eventtype": prior_eventtype,
- "reason": "missing_from_feed",
- },
- )
- yield tombstone
- self.mark_published(tombstone_id)
- mark_retired(self._db, {guid})
- events_yielded += 1
-
- self.sweep_old_ids()
- logger.info(
- "GDACS poll completed",
- extra={
- "events_yielded": events_yielded,
- "current_observed": len(current_guids),
- "fallen_off": len(fallen_off),
- },
- )
diff --git a/src/central/archive.py b/src/central/archive.py
index dfa5475..c173805 100644
--- a/src/central/archive.py
+++ b/src/central/archive.py
@@ -19,11 +19,14 @@ from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
from nats.js.errors import NotFoundError
from central.bootstrap_config import get_settings
-from central.streams import STREAMS as STREAM_REGISTRY
-# Event-bearing streams to consume -- derived from the registry's event_bearing flag.
-# CENTRAL_META is excluded because it carries status messages, not events.
-STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearing]
+# Event-bearing streams to consume (skip CENTRAL_META - status messages only)
+STREAMS = [
+ ("CENTRAL_WX", "central.wx.>"),
+ ("CENTRAL_FIRE", "central.fire.>"),
+ ("CENTRAL_QUAKE", "central.quake.>"),
+ ("CENTRAL_SPACE", "central.space.>"),
+]
BATCH_SIZE = 100
FETCH_TIMEOUT = 5.0
diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py
index c5baf44..b5c66f7 100644
--- a/src/central/gui/routes.py
+++ b/src/central/gui/routes.py
@@ -49,7 +49,6 @@ from functools import cache
from central.gui.db import get_pool
from central.gui.form_descriptors import describe_fields, FieldDescriptor
from central.adapter_discovery import discover_adapters
-from central.streams import STREAMS as STREAM_REGISTRY
from pydantic import ValidationError
@cache
@@ -64,8 +63,8 @@ def _adapter_classes() -> dict:
router = APIRouter()
-# Streams to display on dashboard -- derived from the registry's dashboard flag.
-DASHBOARD_STREAMS = [s.name for s in STREAM_REGISTRY if s.dashboard]
+# Streams to display on dashboard
+DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"]
# Email validation regex (simple but effective)
ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$")
diff --git a/src/central/streams.py b/src/central/streams.py
deleted file mode 100644
index e9e05f5..0000000
--- a/src/central/streams.py
+++ /dev/null
@@ -1,32 +0,0 @@
-"""Stream registry — single source of truth for NATS JetStream stream definitions.
-
-Subject-filter mappings live in code (structural; change only when code changes).
-Retention / max_bytes live in config.streams (operator-tunable; ops state).
-
-Adding a stream: one StreamEntry below + one migration row that seeds
-config.streams. supervisor STREAM_SUBJECTS / archive STREAMS / gui DASHBOARD_STREAMS
-all derive automatically.
-"""
-
-from dataclasses import dataclass
-
-
-@dataclass(frozen=True)
-class StreamEntry:
- name: str
- subject_filter: str
- event_bearing: bool = True
- """Whether central-archive consumes events from this stream into the events table.
- False for status-only streams (CENTRAL_META) that the archive intentionally skips."""
- dashboard: bool = True
- """Whether the GUI dashboard surfaces this stream's stats card."""
-
-
-STREAMS: list[StreamEntry] = [
- StreamEntry("CENTRAL_WX", "central.wx.>"),
- StreamEntry("CENTRAL_FIRE", "central.fire.>"),
- StreamEntry("CENTRAL_QUAKE", "central.quake.>"),
- StreamEntry("CENTRAL_SPACE", "central.space.>"),
- StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
- StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
-]
diff --git a/src/central/supervisor.py b/src/central/supervisor.py
index bdc68e6..81fa1bf 100644
--- a/src/central/supervisor.py
+++ b/src/central/supervisor.py
@@ -21,12 +21,16 @@ from central.config_source import ConfigSource, DbConfigSource
from central.config_store import ConfigStore
from central.bootstrap_config import get_settings
from central.stream_manager import StreamManager
-from central.streams import STREAMS as STREAM_REGISTRY
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
-# Stream subject mappings -- derived from the registry; every stream is included
-# (META too: supervisor must create it in JetStream even though archive skips it).
-STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY}
+# Stream subject mappings
+STREAM_SUBJECTS = {
+ "CENTRAL_WX": ["central.wx.>"],
+ "CENTRAL_META": ["central.meta.>"],
+ "CENTRAL_FIRE": ["central.fire.>"],
+ "CENTRAL_QUAKE": ["central.quake.>"],
+ "CENTRAL_SPACE": ["central.space.>"],
+}
# Recompute interval for stream max_bytes (1 hour)
STREAM_RECOMPUTE_INTERVAL_S = 3600
diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py
index 0727bb1..bba544d 100644
--- a/tests/test_archive_multi_stream.py
+++ b/tests/test_archive_multi_stream.py
@@ -26,6 +26,35 @@ class TestConsumerNaming:
assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake"
+class TestStreamsConfiguration:
+ """Test streams configuration."""
+
+ def test_streams_list_has_four_entries(self):
+ """STREAMS list has four event-bearing streams."""
+ assert len(STREAMS) == 4
+
+ def test_streams_contains_central_wx(self):
+ """STREAMS contains CENTRAL_WX with correct filter."""
+ assert ("CENTRAL_WX", "central.wx.>") in STREAMS
+
+ def test_streams_contains_central_fire(self):
+ """STREAMS contains CENTRAL_FIRE with correct filter."""
+ assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS
+
+ def test_streams_contains_central_quake(self):
+ """STREAMS contains CENTRAL_QUAKE with correct filter."""
+ assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS
+
+ def test_streams_contains_central_space(self):
+ """STREAMS contains CENTRAL_SPACE with correct filter."""
+ assert ("CENTRAL_SPACE", "central.space.>") in STREAMS
+
+ def test_streams_excludes_central_meta(self):
+ """STREAMS does not contain CENTRAL_META (status messages only)."""
+ stream_names = [s[0] for s in STREAMS]
+ assert "CENTRAL_META" not in stream_names
+
+
class TestOrphanedConsumerCleanup:
"""Test cleanup of orphaned 'archive' consumer."""
diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py
index 73a2ace..73df132 100644
--- a/tests/test_dashboard.py
+++ b/tests/test_dashboard.py
@@ -205,6 +205,7 @@ class TestDashboardStreamsIsolation:
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
streams = context["streams"]
+ assert len(streams) == 5
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
assert fire_stream.get("error") == "unavailable"
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
diff --git a/tests/test_gdacs.py b/tests/test_gdacs.py
deleted file mode 100644
index e87b493..0000000
--- a/tests/test_gdacs.py
+++ /dev/null
@@ -1,367 +0,0 @@
-"""Tests for GDACS adapter."""
-
-from datetime import datetime, timezone
-from pathlib import Path
-from unittest.mock import AsyncMock, MagicMock
-
-import pytest
-
-from central.config_models import AdapterConfig
-from central.models import Event
-
-
-# Frozen RSS fixture mirroring real GDACS shape (namespaces + element layout).
-SAMPLE_RSS = """
-
-
- GDACS RSS information
- https://www.gdacs.org/
- Near real-time notification
- Tue, 19 May 2026 06:35:01 GMT
- -
- Green wildfire in Greece 18/05/2026 11:00 UTC
- Wildfire in Attica region of Greece.
- https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001
- Mon, 18 May 2026 11:10:00 GMT
- true
- Mon, 18 May 2026 11:00:00 GMT
- Tue, 19 May 2026 04:00:00 GMT
- WF2002001
-
- 38.0
- 23.7
-
- 21.7 25.7 36.0 40.0
- WF
- Green
- 0
- 2002001
- GRC
- Greece
-
- -
- Orange drought in United States of America 15/04/2026
- Multi-state drought.
- https://www.gdacs.org/report.aspx?eventtype=DR&eventid=3003001
- Wed, 15 Apr 2026 00:00:00 GMT
- true
- Wed, 15 Apr 2026 00:00:00 GMT
- DR3003001
-
- 39.5
- -98.5
-
- -110.0 -90.0 32.0 45.0
- DR
- Orange
- 1.5
- 3003001
- USA
- United States of America
-
- -
- Green earthquake in Vanuatu
- EQ Vanuatu
- https://www.gdacs.org/report.aspx?eventtype=EQ&eventid=1541360
- Tue, 19 May 2026 02:41:13 GMT
- true
- Tue, 19 May 2026 02:29:24 GMT
- EQ1541360
-
- -18.15
- 168.09
-
- EQ
- Green
- 1541360
- VUT
- Vanuatu
-
- -
- Synthetic unknown eventtype
- XX synthetic test
- https://www.gdacs.org/report.aspx?eventtype=XX&eventid=999999
- Tue, 19 May 2026 00:00:00 GMT
- true
- Tue, 19 May 2026 00:00:00 GMT
- XX999999
- XX
- Green
- 999999
- Nowhere
-
-
-"""
-
-
-# Same items but WF turned to iscurrent=false (tombstone scenario)
-SAMPLE_RSS_WF_RETIRED = SAMPLE_RSS.replace(
- "true\n Mon, 18 May 2026 11:00:00 GMT",
- "false\n Mon, 18 May 2026 11:00:00 GMT",
- 1,
-)
-
-
-# Just the DR + EQ + XX items, with WF removed entirely (missing-from-feed scenario)
-SAMPLE_RSS_WF_MISSING = SAMPLE_RSS.replace(
- """-
- Green wildfire in Greece 18/05/2026 11:00 UTC
- Wildfire in Attica region of Greece.
- https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001
- Mon, 18 May 2026 11:10:00 GMT
- true
- Mon, 18 May 2026 11:00:00 GMT
- Tue, 19 May 2026 04:00:00 GMT
- WF2002001
-
- 38.0
- 23.7
-
- 21.7 25.7 36.0 40.0
- WF
- Green
- 0
- 2002001
- GRC
- Greece
-
- """,
- "",
- 1,
-)
-
-
-def _config(settings: dict | None = None) -> AdapterConfig:
- return AdapterConfig(
- name="gdacs",
- enabled=True,
- cadence_s=600,
- settings=settings or {"event_types": ["WF", "DR", "FL", "VO", "TC"]},
- updated_at=datetime.now(timezone.utc),
- )
-
-
-class TestGDACSHelpers:
- def test_severity_from_alertlevel_green_orange_red(self):
- from central.adapters.gdacs import severity_from_alertlevel
-
- assert severity_from_alertlevel("Green") == 1
- assert severity_from_alertlevel("Orange") == 2
- assert severity_from_alertlevel("Red") == 3
- assert severity_from_alertlevel(None) == 0
- assert severity_from_alertlevel("") == 0
- assert severity_from_alertlevel("Unknown") == 0
- # case-insensitive
- assert severity_from_alertlevel("green") == 1
- assert severity_from_alertlevel("RED") == 3
-
- def test_subject_for_lowercase_country(self):
- from central.adapters.gdacs import subject_for_country
-
- assert subject_for_country("United States") == "united-states"
- assert subject_for_country("Greece") == "greece"
- assert subject_for_country("Solomon Islands") == "solomon-islands"
-
- def test_subject_for_unknown_country(self):
- from central.adapters.gdacs import subject_for_country
-
- assert subject_for_country(None) == "unknown"
- assert subject_for_country("") == "unknown"
- assert subject_for_country(" ") == "unknown"
-
- def test_subject_for_multi_country_takes_first(self):
- from central.adapters.gdacs import subject_for_country
-
- assert subject_for_country("Mozambique, Madagascar") == "mozambique"
-
- def test_parse_gdacs_bbox(self):
- from central.adapters.gdacs import parse_gdacs_bbox
-
- # GDACS format: lonmin lonmax latmin latmax
- # Geo.bbox: (minLon, minLat, maxLon, maxLat)
- result = parse_gdacs_bbox("21.7 25.7 36.0 40.0")
- assert result == (21.7, 36.0, 25.7, 40.0)
- assert parse_gdacs_bbox(None) is None
- assert parse_gdacs_bbox("") is None
- assert parse_gdacs_bbox("not numbers") is None
-
-
-class TestGDACSAdapter:
- def test_class_attrs_complete(self):
- from central.adapters.gdacs import GDACSAdapter, GDACSSettings
-
- assert GDACSAdapter.name == "gdacs"
- assert isinstance(GDACSAdapter.display_name, str) and GDACSAdapter.display_name
- assert isinstance(GDACSAdapter.description, str) and GDACSAdapter.description
- assert GDACSAdapter.settings_schema is GDACSSettings
- assert GDACSAdapter.requires_api_key is None
- assert GDACSAdapter.api_key_field is None
- assert GDACSAdapter.wizard_order is None
- assert GDACSAdapter.default_cadence_s == 600
-
- @pytest.mark.asyncio
- async def test_normalization_basic_wf(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
-
- await adapter.startup()
- events: list[Event] = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- # WF + DR should yield; EQ + XX filtered.
- assert len(events) == 2
- wf = next(e for e in events if e.data["eventtype"] == "WF")
- assert wf.adapter == "gdacs"
- assert wf.category == "disaster.wf"
- assert wf.id == "WF2002001"
- assert wf.severity == 1 # Green
- assert wf.data["country"] == "Greece"
- assert wf.data["iso3"] == "GRC"
- assert wf.geo.centroid == (23.7, 38.0)
- assert wf.geo.bbox == (21.7, 36.0, 25.7, 40.0)
- assert wf.geo.primary_region == "GRC"
- assert wf.geo.regions == ["GRC"]
-
- dr = next(e for e in events if e.data["eventtype"] == "DR")
- assert dr.severity == 2 # Orange
- assert dr.category == "disaster.dr"
- assert dr.data["iso3"] == "USA"
-
- @pytest.mark.asyncio
- async def test_eq_filtered_by_default(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
-
- await adapter.startup()
- events = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- # No EQ in default allowlist; EQ1541360 must not appear.
- assert all(e.id != "EQ1541360" for e in events)
- assert all(e.data["eventtype"] != "EQ" for e in events)
-
- @pytest.mark.asyncio
- async def test_unknown_eventtype_filtered(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
-
- await adapter.startup()
- events = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- assert all(e.data["eventtype"] != "XX" for e in events)
-
- @pytest.mark.asyncio
- async def test_settings_event_types_override(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(
- _config({"event_types": ["EQ"]}),
- MagicMock(),
- tmp_path / "cursors.db",
- )
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
-
- await adapter.startup()
- events = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- # Only EQ should yield now.
- assert len(events) == 1
- assert events[0].id == "EQ1541360"
- assert events[0].category == "disaster.eq"
-
- @pytest.mark.asyncio
- async def test_dedup_by_guid(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
-
- await adapter.startup()
- first_pass = [e async for e in adapter.poll()]
- second_pass = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- assert len(first_pass) == 2
- assert len(second_pass) == 0
-
- @pytest.mark.asyncio
- async def test_fall_off_iscurrent_false(self, tmp_path: Path):
- """Item seen iscurrent=true then iscurrent=false -> tombstone."""
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
-
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
- await adapter.startup()
- first_pass = [e async for e in adapter.poll()]
- assert any(e.id == "WF2002001" for e in first_pass)
-
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_RETIRED)
- second_pass = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- tombstones = [e for e in second_pass if e.category.endswith(".removed")]
- assert len(tombstones) == 1
- ts = tombstones[0]
- assert ts.id == "WF2002001:removed"
- assert ts.category == "disaster.wf.removed"
- assert ts.data["reason"] == "iscurrent_false"
- # Subject form: central.disaster..removed.
- assert adapter.subject_for(ts) == "central.disaster.wf.removed.greece"
-
- @pytest.mark.asyncio
- async def test_fall_off_missing_from_feed(self, tmp_path: Path):
- """Item seen, then completely missing from feed -> tombstone."""
- from central.adapters.gdacs import GDACSAdapter
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
-
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS)
- await adapter.startup()
- _ = [e async for e in adapter.poll()]
-
- adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_MISSING)
- second_pass = [e async for e in adapter.poll()]
- await adapter.shutdown()
-
- tombstones = [e for e in second_pass if e.category.endswith(".removed")]
- assert len(tombstones) == 1
- assert tombstones[0].id == "WF2002001:removed"
- assert tombstones[0].category == "disaster.wf.removed"
- assert tombstones[0].data["reason"] == "missing_from_feed"
-
- @pytest.mark.asyncio
- async def test_subject_for_returns_country_path(self, tmp_path: Path):
- from central.adapters.gdacs import GDACSAdapter
- from central.models import Geo
-
- adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db")
- event = Event(
- id="WF2002001",
- adapter="gdacs",
- category="disaster.wf",
- time=datetime(2026, 5, 18, 11, tzinfo=timezone.utc),
- severity=1,
- geo=Geo(),
- data={"eventtype": "WF", "country": "Greece"},
- )
- assert adapter.subject_for(event) == "central.disaster.wf.greece"
-
- event_unknown = Event(
- id="DR1",
- adapter="gdacs",
- category="disaster.dr",
- time=datetime(2026, 5, 18, tzinfo=timezone.utc),
- severity=0,
- geo=Geo(),
- data={"eventtype": "DR", "country": None},
- )
- assert adapter.subject_for(event_unknown) == "central.disaster.dr.unknown"
diff --git a/tests/test_stream_registry.py b/tests/test_stream_registry.py
deleted file mode 100644
index b8b2678..0000000
--- a/tests/test_stream_registry.py
+++ /dev/null
@@ -1,80 +0,0 @@
-"""Registry-consistency tests for src/central/streams.py.
-
-These are property tests, not literal restatements. Adding a new stream to the
-registry requires no new test code -- every invariant here automatically
-covers it.
-"""
-
-import re
-
-from central.streams import STREAMS
-
-
-def test_stream_names_unique():
- names = [s.name for s in STREAMS]
- assert len(names) == len(set(names)), "duplicate stream names in registry"
-
-
-def test_subject_filters_unique():
- filters = [s.subject_filter for s in STREAMS]
- assert len(filters) == len(set(filters)), "duplicate subject filters in registry"
-
-
-def test_subject_filter_central_prefix_wildcard():
- pattern = re.compile(r"^central\.[a-z][a-z_]*\.>$")
- for s in STREAMS:
- assert pattern.match(s.subject_filter), (
- f"{s.name}: subject_filter {s.subject_filter!r} does not match /^central\\.[a-z][a-z_]*\\.>$/"
- )
-
-
-def test_meta_is_only_non_event_bearing():
- """CENTRAL_META is the only non-event-bearing stream today.
-
- If you're adding a second one, update this test deliberately -- the
- archive will silently skip the new stream, which is rarely what you want.
- """
- non_event = [s for s in STREAMS if not s.event_bearing]
- assert len(non_event) == 1, (
- f"expected exactly one non-event-bearing stream, got {[s.name for s in non_event]}"
- )
- assert non_event[0].name == "CENTRAL_META"
-
-
-def test_supervisor_stream_subjects_includes_meta():
- """Supervisor creates every stream in JetStream, including META."""
- from central.supervisor import STREAM_SUBJECTS
-
- assert "CENTRAL_META" in STREAM_SUBJECTS
- assert STREAM_SUBJECTS["CENTRAL_META"] == ["central.meta.>"]
-
-
-def test_supervisor_stream_subjects_includes_all():
- """Every registry stream appears in supervisor's derived dict with the right filter."""
- from central.supervisor import STREAM_SUBJECTS
-
- assert set(STREAM_SUBJECTS.keys()) == {s.name for s in STREAMS}
- for s in STREAMS:
- assert STREAM_SUBJECTS[s.name] == [s.subject_filter]
-
-
-def test_archive_streams_excludes_non_event_bearing():
- """Archive's STREAMS list contains exactly the event_bearing=True entries."""
- from central.archive import STREAMS as ARCHIVE_STREAMS
-
- expected = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing]
- assert ARCHIVE_STREAMS == expected
- archive_names = {name for name, _ in ARCHIVE_STREAMS}
- for s in STREAMS:
- if s.event_bearing:
- assert s.name in archive_names
- else:
- assert s.name not in archive_names
-
-
-def test_dashboard_streams_matches_dashboard_flag():
- """GUI's DASHBOARD_STREAMS matches [s.name for s in STREAMS if s.dashboard]."""
- from central.gui.routes import DASHBOARD_STREAMS
-
- expected = [s.name for s in STREAMS if s.dashboard]
- assert DASHBOARD_STREAMS == expected