diff --git a/sql/migrations/020_add_gdacs_adapter.sql b/sql/migrations/020_add_gdacs_adapter.sql deleted file mode 100644 index 20ac01f..0000000 --- a/sql/migrations/020_add_gdacs_adapter.sql +++ /dev/null @@ -1,14 +0,0 @@ --- Migration: 020_add_gdacs_adapter --- Adds the GDACS adapter row to config.adapters. --- Ships disabled; operator enables via GUI. --- Default event_types excludes EQ (USGS is canonical for earthquakes on central.quake.>). --- Idempotent: uses ON CONFLICT DO NOTHING. - -INSERT INTO config.adapters (name, enabled, cadence_s, settings) -VALUES ( - 'gdacs', - false, - 600, - jsonb_build_object('event_types', jsonb_build_array('WF', 'DR', 'FL', 'VO', 'TC')) -) -ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/021_add_central_disaster_stream.sql b/sql/migrations/021_add_central_disaster_stream.sql deleted file mode 100644 index b9192f0..0000000 --- a/sql/migrations/021_add_central_disaster_stream.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Migration: 021_add_central_disaster_stream --- Seeds the CENTRAL_DISASTER JetStream stream row for central.disaster.> subjects. --- 7-day retention, 1 GiB max_bytes -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE. --- Idempotent: uses ON CONFLICT DO NOTHING. - -INSERT INTO config.streams (name, max_age_s, max_bytes) -VALUES ('CENTRAL_DISASTER', 604800, 1073741824) -ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/gdacs.py b/src/central/adapters/gdacs.py deleted file mode 100644 index 9ff6d6e..0000000 --- a/src/central/adapters/gdacs.py +++ /dev/null @@ -1,478 +0,0 @@ -"""GDACS (Global Disaster Alert and Coordination System) adapter.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from email.utils import parsedate_to_datetime -from pathlib import Path -from typing import Any -from xml.etree import ElementTree as ET - -import aiohttp -from pydantic import BaseModel -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.config_models import AdapterConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - -GDACS_RSS_URL = "https://www.gdacs.org/xml/rss.xml" - -NS = { - "gdacs": "http://www.gdacs.org", - "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", - "georss": "http://www.georss.org/georss", - "dc": "http://purl.org/dc/elements/1.1/", -} - -_ALERTLEVEL_TO_SEVERITY = {"Green": 1, "Orange": 2, "Red": 3} - -DEFAULT_EVENT_TYPES = ["WF", "DR", "FL", "VO", "TC"] - - -def severity_from_alertlevel(level: str | None) -> int: - """Green=1, Orange=2, Red=3, default 0.""" - if not level: - return 0 - return _ALERTLEVEL_TO_SEVERITY.get(level.strip().capitalize(), 0) - - -def subject_for_country(country: str | None) -> str: - """Lowercase, hyphenate spaces. 'unknown' for missing/empty. Takes first if comma-separated.""" - if not country: - return "unknown" - first = country.split(",")[0].strip() - if not first: - return "unknown" - return first.lower().replace(" ", "-") - - -def parse_rfc822_utc(raw: str | None) -> datetime | None: - """Parse an RFC 822 datetime string to UTC datetime.""" - if not raw: - return None - try: - dt = parsedate_to_datetime(raw) - except (ValueError, TypeError): - return None - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt.astimezone(timezone.utc) - - -def parse_gdacs_bbox(raw: str | None) -> tuple[float, float, float, float] | None: - """Parse GDACS bbox 'lonmin lonmax latmin latmax' to Geo.bbox (minLon, minLat, maxLon, maxLat).""" - if not raw: - return None - try: - parts = [float(p) for p in raw.split()] - except ValueError: - return None - if len(parts) != 4: - return None - lon_min, lon_max, lat_min, lat_max = parts - return (lon_min, lat_min, lon_max, lat_max) - - -def init_gdacs_observed_table(db: sqlite3.Connection) -> None: - db.execute(""" - CREATE TABLE IF NOT EXISTS gdacs_observed ( - guid TEXT PRIMARY KEY, - country TEXT, - eventtype TEXT, - last_observed_at TEXT NOT NULL - ) - """) - db.commit() - - -def get_observed(db: sqlite3.Connection) -> dict[str, tuple[str | None, str | None]]: - cur = db.execute("SELECT guid, country, eventtype FROM gdacs_observed") - return {row[0]: (row[1], row[2]) for row in cur.fetchall()} - - -def mark_observed(db: sqlite3.Connection, guid: str, country: str | None, eventtype: str | None) -> None: - db.execute( - """ - INSERT INTO gdacs_observed (guid, country, eventtype, last_observed_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP) - ON CONFLICT (guid) DO UPDATE SET last_observed_at = CURRENT_TIMESTAMP - """, - (guid, country, eventtype), - ) - db.commit() - - -def mark_retired(db: sqlite3.Connection, guids: set[str]) -> None: - for guid in guids: - db.execute("DELETE FROM gdacs_observed WHERE guid = ?", (guid,)) - db.commit() - - -class GDACSSettings(BaseModel): - """Settings schema for GDACS adapter. - - event_types is the explicit allowlist of GDACS eventtype codes to publish. - EQ is intentionally absent from the default because USGS is the canonical - earthquake source for Central and quakes are already published on - central.quake.>. Operators can re-include "EQ" here if USGS is - temporarily unavailable or if the GDACS alertlevel triage signal is - operationally needed. - - Future-unknown eventtypes (anything GDACS may add later) are dropped by - default — opt in by adding the code to this list. - """ - - event_types: list[str] = DEFAULT_EVENT_TYPES.copy() - - -class GDACSAdapter(SourceAdapter): - """Global Disaster Alert and Coordination System adapter.""" - - name = "gdacs" - display_name = "GDACS — Global Disaster Alerts" - description = ( - "Global Disaster Alert and Coordination System events: wildfires, drought, " - "flood, volcano, and tropical cyclones with humanitarian-coordination triage signals." - ) - settings_schema = GDACSSettings - requires_api_key = None - api_key_field = None - wizard_order = None - default_cadence_s = 600 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - self.event_types: list[str] = list( - config.settings.get("event_types", DEFAULT_EVENT_TYPES) - ) - - async def startup(self) -> None: - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - init_gdacs_observed_table(self._db) - self._db.commit() - logger.info("GDACS adapter started", extra={"event_types": self.event_types}) - - async def shutdown(self) -> None: - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("GDACS adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - self.event_types = list( - new_config.settings.get("event_types", DEFAULT_EVENT_TYPES) - ) - logger.info("GDACS config updated", extra={"event_types": self.event_types}) - - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("GDACS swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - parts = event.category.split(".") - country_subj = subject_for_country(event.data.get("country")) - if len(parts) >= 3 and parts[-1] == "removed": - eventtype = parts[1] - return f"central.disaster.{eventtype}.removed.{country_subj}" - eventtype = (event.data.get("eventtype") or "").lower() or "unknown" - return f"central.disaster.{eventtype}.{country_subj}" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch(self) -> str: - if not self._session: - raise RuntimeError("Session not initialized") - async with self._session.get( - GDACS_RSS_URL, headers={"User-Agent": "Central/0.4"} - ) as resp: - resp.raise_for_status() - text = await resp.text() - return text - - async def poll(self) -> AsyncIterator[Event]: - if not self._db: - raise RuntimeError("Database not initialized") - - try: - content = await self._fetch() - except Exception as e: - logger.error("GDACS fetch failed", extra={"error": str(e)}) - raise - - try: - root = ET.fromstring(content) - except ET.ParseError as e: - logger.error("GDACS RSS parse error", extra={"error": str(e)}) - raise - - channel = root.find("channel") - if channel is None: - logger.info("GDACS fetch completed", extra={"item_count": 0}) - return - - items = channel.findall("item") - logger.info("GDACS fetch completed", extra={"item_count": len(items)}) - - observed_before = get_observed(self._db) - current_guids: set[str] = set() - events_yielded = 0 - - for item_elem in items: - guid_elem = item_elem.find("guid") - if guid_elem is None or not guid_elem.text: - continue - guid = guid_elem.text.strip() - if not guid: - continue - - eventtype_elem = item_elem.find("gdacs:eventtype", NS) - eventtype = ( - eventtype_elem.text.strip() - if eventtype_elem is not None and eventtype_elem.text - else "" - ) - - if eventtype not in self.event_types: - continue - - iscurrent_elem = item_elem.find("gdacs:iscurrent", NS) - iscurrent = True - if iscurrent_elem is not None and iscurrent_elem.text: - iscurrent = iscurrent_elem.text.strip().lower() == "true" - - country_elem = item_elem.find("gdacs:country", NS) - country = ( - country_elem.text.strip() - if country_elem is not None and country_elem.text - else None - ) - - iso3_elem = item_elem.find("gdacs:iso3", NS) - iso3 = ( - iso3_elem.text.strip() - if iso3_elem is not None and iso3_elem.text - else None - ) - - alertlevel_elem = item_elem.find("gdacs:alertlevel", NS) - alertlevel = ( - alertlevel_elem.text.strip() - if alertlevel_elem is not None and alertlevel_elem.text - else None - ) - - fromdate_elem = item_elem.find("gdacs:fromdate", NS) - event_time = parse_rfc822_utc( - fromdate_elem.text if fromdate_elem is not None else None - ) - if event_time is None: - pub_date_elem = item_elem.find("pubDate") - event_time = ( - parse_rfc822_utc(pub_date_elem.text if pub_date_elem is not None else None) - or datetime.now(timezone.utc) - ) - - bbox_elem = item_elem.find("gdacs:bbox", NS) - bbox = parse_gdacs_bbox( - bbox_elem.text if bbox_elem is not None else None - ) - - lat_elem = item_elem.find("geo:Point/geo:lat", NS) - lon_elem = item_elem.find("geo:Point/geo:long", NS) - centroid: tuple[float, float] | None = None - if lat_elem is not None and lon_elem is not None and lat_elem.text and lon_elem.text: - try: - centroid = (float(lon_elem.text), float(lat_elem.text)) - except ValueError: - centroid = None - - region = iso3 or country - regions = [region] if region else [] - primary_region = region - - title_elem = item_elem.find("title") - description_elem = item_elem.find("description") - link_elem = item_elem.find("link") - eventid_elem = item_elem.find("gdacs:eventid", NS) - alertscore_elem = item_elem.find("gdacs:alertscore", NS) - datemodified_elem = item_elem.find("gdacs:datemodified", NS) - - geo = Geo( - centroid=centroid, - bbox=bbox, - regions=regions, - primary_region=primary_region, - ) - - data: dict[str, Any] = { - "guid": guid, - "eventtype": eventtype, - "eventid": eventid_elem.text.strip() if eventid_elem is not None and eventid_elem.text else None, - "country": country, - "iso3": iso3, - "alertlevel": alertlevel, - "alertscore": alertscore_elem.text.strip() if alertscore_elem is not None and alertscore_elem.text else None, - "title": title_elem.text if title_elem is not None and title_elem.text else "", - "description": description_elem.text if description_elem is not None and description_elem.text else "", - "url": link_elem.text if link_elem is not None and link_elem.text else "", - "datemodified": datemodified_elem.text if datemodified_elem is not None and datemodified_elem.text else None, - "iscurrent": iscurrent, - } - - if not iscurrent: - # Explicit tombstone from upstream. Only emit if we previously observed it. - if guid in observed_before: - tombstone = Event( - id=f"{guid}:removed", - adapter=self.name, - category=f"disaster.{eventtype.lower()}.removed", - time=datetime.now(timezone.utc), - severity=0, - geo=geo, - data={**data, "reason": "iscurrent_false"}, - ) - if not self.is_published(tombstone.id): - yield tombstone - self.mark_published(tombstone.id) - events_yielded += 1 - continue - - current_guids.add(guid) - - if self.is_published(guid): - mark_observed(self._db, guid, country, eventtype) - continue - - event = Event( - id=guid, - adapter=self.name, - category=f"disaster.{eventtype.lower()}", - time=event_time, - severity=severity_from_alertlevel(alertlevel), - geo=geo, - data=data, - ) - - yield event - self.mark_published(guid) - mark_observed(self._db, guid, country, eventtype) - events_yielded += 1 - - # Fall-off: events present in observed_before but absent from this poll - fallen_off = set(observed_before.keys()) - current_guids - for guid in fallen_off: - prior_country, prior_eventtype = observed_before[guid] - if prior_eventtype and prior_eventtype not in self.event_types: - # Was published before settings narrowed; clean up silently. - mark_retired(self._db, {guid}) - continue - tombstone_id = f"{guid}:removed" - if self.is_published(tombstone_id): - mark_retired(self._db, {guid}) - continue - now = datetime.now(timezone.utc) - region = prior_country - geo = Geo( - regions=[region] if region else [], - primary_region=region, - ) - tombstone = Event( - id=tombstone_id, - adapter=self.name, - category=f"disaster.{(prior_eventtype or '').lower()}.removed", - time=now, - severity=0, - geo=geo, - data={ - "guid": guid, - "country": prior_country, - "eventtype": prior_eventtype, - "reason": "missing_from_feed", - }, - ) - yield tombstone - self.mark_published(tombstone_id) - mark_retired(self._db, {guid}) - events_yielded += 1 - - self.sweep_old_ids() - logger.info( - "GDACS poll completed", - extra={ - "events_yielded": events_yielded, - "current_observed": len(current_guids), - "fallen_off": len(fallen_off), - }, - ) diff --git a/src/central/archive.py b/src/central/archive.py index dfa5475..c173805 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -19,11 +19,14 @@ from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy from nats.js.errors import NotFoundError from central.bootstrap_config import get_settings -from central.streams import STREAMS as STREAM_REGISTRY -# Event-bearing streams to consume -- derived from the registry's event_bearing flag. -# CENTRAL_META is excluded because it carries status messages, not events. -STREAMS = [(s.name, s.subject_filter) for s in STREAM_REGISTRY if s.event_bearing] +# Event-bearing streams to consume (skip CENTRAL_META - status messages only) +STREAMS = [ + ("CENTRAL_WX", "central.wx.>"), + ("CENTRAL_FIRE", "central.fire.>"), + ("CENTRAL_QUAKE", "central.quake.>"), + ("CENTRAL_SPACE", "central.space.>"), +] BATCH_SIZE = 100 FETCH_TIMEOUT = 5.0 diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index c5baf44..b5c66f7 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -49,7 +49,6 @@ from functools import cache from central.gui.db import get_pool from central.gui.form_descriptors import describe_fields, FieldDescriptor from central.adapter_discovery import discover_adapters -from central.streams import STREAMS as STREAM_REGISTRY from pydantic import ValidationError @cache @@ -64,8 +63,8 @@ def _adapter_classes() -> dict: router = APIRouter() -# Streams to display on dashboard -- derived from the registry's dashboard flag. -DASHBOARD_STREAMS = [s.name for s in STREAM_REGISTRY if s.dashboard] +# Streams to display on dashboard +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"] # Email validation regex (simple but effective) ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") diff --git a/src/central/streams.py b/src/central/streams.py deleted file mode 100644 index e9e05f5..0000000 --- a/src/central/streams.py +++ /dev/null @@ -1,32 +0,0 @@ -"""Stream registry — single source of truth for NATS JetStream stream definitions. - -Subject-filter mappings live in code (structural; change only when code changes). -Retention / max_bytes live in config.streams (operator-tunable; ops state). - -Adding a stream: one StreamEntry below + one migration row that seeds -config.streams. supervisor STREAM_SUBJECTS / archive STREAMS / gui DASHBOARD_STREAMS -all derive automatically. -""" - -from dataclasses import dataclass - - -@dataclass(frozen=True) -class StreamEntry: - name: str - subject_filter: str - event_bearing: bool = True - """Whether central-archive consumes events from this stream into the events table. - False for status-only streams (CENTRAL_META) that the archive intentionally skips.""" - dashboard: bool = True - """Whether the GUI dashboard surfaces this stream's stats card.""" - - -STREAMS: list[StreamEntry] = [ - StreamEntry("CENTRAL_WX", "central.wx.>"), - StreamEntry("CENTRAL_FIRE", "central.fire.>"), - StreamEntry("CENTRAL_QUAKE", "central.quake.>"), - StreamEntry("CENTRAL_SPACE", "central.space.>"), - StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), - StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), -] diff --git a/src/central/supervisor.py b/src/central/supervisor.py index bdc68e6..81fa1bf 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -21,12 +21,16 @@ from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings from central.stream_manager import StreamManager -from central.streams import STREAMS as STREAM_REGISTRY CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") -# Stream subject mappings -- derived from the registry; every stream is included -# (META too: supervisor must create it in JetStream even though archive skips it). -STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY} +# Stream subject mappings +STREAM_SUBJECTS = { + "CENTRAL_WX": ["central.wx.>"], + "CENTRAL_META": ["central.meta.>"], + "CENTRAL_FIRE": ["central.fire.>"], + "CENTRAL_QUAKE": ["central.quake.>"], + "CENTRAL_SPACE": ["central.space.>"], +} # Recompute interval for stream max_bytes (1 hour) STREAM_RECOMPUTE_INTERVAL_S = 3600 diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py index 0727bb1..bba544d 100644 --- a/tests/test_archive_multi_stream.py +++ b/tests/test_archive_multi_stream.py @@ -26,6 +26,35 @@ class TestConsumerNaming: assert consumer_name_for("CENTRAL_QUAKE") == "archive-central_quake" +class TestStreamsConfiguration: + """Test streams configuration.""" + + def test_streams_list_has_four_entries(self): + """STREAMS list has four event-bearing streams.""" + assert len(STREAMS) == 4 + + def test_streams_contains_central_wx(self): + """STREAMS contains CENTRAL_WX with correct filter.""" + assert ("CENTRAL_WX", "central.wx.>") in STREAMS + + def test_streams_contains_central_fire(self): + """STREAMS contains CENTRAL_FIRE with correct filter.""" + assert ("CENTRAL_FIRE", "central.fire.>") in STREAMS + + def test_streams_contains_central_quake(self): + """STREAMS contains CENTRAL_QUAKE with correct filter.""" + assert ("CENTRAL_QUAKE", "central.quake.>") in STREAMS + + def test_streams_contains_central_space(self): + """STREAMS contains CENTRAL_SPACE with correct filter.""" + assert ("CENTRAL_SPACE", "central.space.>") in STREAMS + + def test_streams_excludes_central_meta(self): + """STREAMS does not contain CENTRAL_META (status messages only).""" + stream_names = [s[0] for s in STREAMS] + assert "CENTRAL_META" not in stream_names + + class TestOrphanedConsumerCleanup: """Test cleanup of orphaned 'archive' consumer.""" diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py index 73a2ace..73df132 100644 --- a/tests/test_dashboard.py +++ b/tests/test_dashboard.py @@ -205,6 +205,7 @@ class TestDashboardStreamsIsolation: call_args = mock_templates.TemplateResponse.call_args context = call_args.kwargs.get("context", call_args[1].get("context")) streams = context["streams"] + assert len(streams) == 5 fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE") assert fire_stream.get("error") == "unavailable" wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX") diff --git a/tests/test_gdacs.py b/tests/test_gdacs.py deleted file mode 100644 index e87b493..0000000 --- a/tests/test_gdacs.py +++ /dev/null @@ -1,367 +0,0 @@ -"""Tests for GDACS adapter.""" - -from datetime import datetime, timezone -from pathlib import Path -from unittest.mock import AsyncMock, MagicMock - -import pytest - -from central.config_models import AdapterConfig -from central.models import Event - - -# Frozen RSS fixture mirroring real GDACS shape (namespaces + element layout). -SAMPLE_RSS = """ - - - GDACS RSS information - https://www.gdacs.org/ - Near real-time notification - Tue, 19 May 2026 06:35:01 GMT - - Green wildfire in Greece 18/05/2026 11:00 UTC - Wildfire in Attica region of Greece. - https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001 - Mon, 18 May 2026 11:10:00 GMT - true - Mon, 18 May 2026 11:00:00 GMT - Tue, 19 May 2026 04:00:00 GMT - WF2002001 - - 38.0 - 23.7 - - 21.7 25.7 36.0 40.0 - WF - Green - 0 - 2002001 - GRC - Greece - - - Orange drought in United States of America 15/04/2026 - Multi-state drought. - https://www.gdacs.org/report.aspx?eventtype=DR&eventid=3003001 - Wed, 15 Apr 2026 00:00:00 GMT - true - Wed, 15 Apr 2026 00:00:00 GMT - DR3003001 - - 39.5 - -98.5 - - -110.0 -90.0 32.0 45.0 - DR - Orange - 1.5 - 3003001 - USA - United States of America - - - Green earthquake in Vanuatu - EQ Vanuatu - https://www.gdacs.org/report.aspx?eventtype=EQ&eventid=1541360 - Tue, 19 May 2026 02:41:13 GMT - true - Tue, 19 May 2026 02:29:24 GMT - EQ1541360 - - -18.15 - 168.09 - - EQ - Green - 1541360 - VUT - Vanuatu - - - Synthetic unknown eventtype - XX synthetic test - https://www.gdacs.org/report.aspx?eventtype=XX&eventid=999999 - Tue, 19 May 2026 00:00:00 GMT - true - Tue, 19 May 2026 00:00:00 GMT - XX999999 - XX - Green - 999999 - Nowhere - - -""" - - -# Same items but WF turned to iscurrent=false (tombstone scenario) -SAMPLE_RSS_WF_RETIRED = SAMPLE_RSS.replace( - "true\n Mon, 18 May 2026 11:00:00 GMT", - "false\n Mon, 18 May 2026 11:00:00 GMT", - 1, -) - - -# Just the DR + EQ + XX items, with WF removed entirely (missing-from-feed scenario) -SAMPLE_RSS_WF_MISSING = SAMPLE_RSS.replace( - """ - Green wildfire in Greece 18/05/2026 11:00 UTC - Wildfire in Attica region of Greece. - https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001 - Mon, 18 May 2026 11:10:00 GMT - true - Mon, 18 May 2026 11:00:00 GMT - Tue, 19 May 2026 04:00:00 GMT - WF2002001 - - 38.0 - 23.7 - - 21.7 25.7 36.0 40.0 - WF - Green - 0 - 2002001 - GRC - Greece - - """, - "", - 1, -) - - -def _config(settings: dict | None = None) -> AdapterConfig: - return AdapterConfig( - name="gdacs", - enabled=True, - cadence_s=600, - settings=settings or {"event_types": ["WF", "DR", "FL", "VO", "TC"]}, - updated_at=datetime.now(timezone.utc), - ) - - -class TestGDACSHelpers: - def test_severity_from_alertlevel_green_orange_red(self): - from central.adapters.gdacs import severity_from_alertlevel - - assert severity_from_alertlevel("Green") == 1 - assert severity_from_alertlevel("Orange") == 2 - assert severity_from_alertlevel("Red") == 3 - assert severity_from_alertlevel(None) == 0 - assert severity_from_alertlevel("") == 0 - assert severity_from_alertlevel("Unknown") == 0 - # case-insensitive - assert severity_from_alertlevel("green") == 1 - assert severity_from_alertlevel("RED") == 3 - - def test_subject_for_lowercase_country(self): - from central.adapters.gdacs import subject_for_country - - assert subject_for_country("United States") == "united-states" - assert subject_for_country("Greece") == "greece" - assert subject_for_country("Solomon Islands") == "solomon-islands" - - def test_subject_for_unknown_country(self): - from central.adapters.gdacs import subject_for_country - - assert subject_for_country(None) == "unknown" - assert subject_for_country("") == "unknown" - assert subject_for_country(" ") == "unknown" - - def test_subject_for_multi_country_takes_first(self): - from central.adapters.gdacs import subject_for_country - - assert subject_for_country("Mozambique, Madagascar") == "mozambique" - - def test_parse_gdacs_bbox(self): - from central.adapters.gdacs import parse_gdacs_bbox - - # GDACS format: lonmin lonmax latmin latmax - # Geo.bbox: (minLon, minLat, maxLon, maxLat) - result = parse_gdacs_bbox("21.7 25.7 36.0 40.0") - assert result == (21.7, 36.0, 25.7, 40.0) - assert parse_gdacs_bbox(None) is None - assert parse_gdacs_bbox("") is None - assert parse_gdacs_bbox("not numbers") is None - - -class TestGDACSAdapter: - def test_class_attrs_complete(self): - from central.adapters.gdacs import GDACSAdapter, GDACSSettings - - assert GDACSAdapter.name == "gdacs" - assert isinstance(GDACSAdapter.display_name, str) and GDACSAdapter.display_name - assert isinstance(GDACSAdapter.description, str) and GDACSAdapter.description - assert GDACSAdapter.settings_schema is GDACSSettings - assert GDACSAdapter.requires_api_key is None - assert GDACSAdapter.api_key_field is None - assert GDACSAdapter.wizard_order is None - assert GDACSAdapter.default_cadence_s == 600 - - @pytest.mark.asyncio - async def test_normalization_basic_wf(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - - await adapter.startup() - events: list[Event] = [e async for e in adapter.poll()] - await adapter.shutdown() - - # WF + DR should yield; EQ + XX filtered. - assert len(events) == 2 - wf = next(e for e in events if e.data["eventtype"] == "WF") - assert wf.adapter == "gdacs" - assert wf.category == "disaster.wf" - assert wf.id == "WF2002001" - assert wf.severity == 1 # Green - assert wf.data["country"] == "Greece" - assert wf.data["iso3"] == "GRC" - assert wf.geo.centroid == (23.7, 38.0) - assert wf.geo.bbox == (21.7, 36.0, 25.7, 40.0) - assert wf.geo.primary_region == "GRC" - assert wf.geo.regions == ["GRC"] - - dr = next(e for e in events if e.data["eventtype"] == "DR") - assert dr.severity == 2 # Orange - assert dr.category == "disaster.dr" - assert dr.data["iso3"] == "USA" - - @pytest.mark.asyncio - async def test_eq_filtered_by_default(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - - await adapter.startup() - events = [e async for e in adapter.poll()] - await adapter.shutdown() - - # No EQ in default allowlist; EQ1541360 must not appear. - assert all(e.id != "EQ1541360" for e in events) - assert all(e.data["eventtype"] != "EQ" for e in events) - - @pytest.mark.asyncio - async def test_unknown_eventtype_filtered(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - - await adapter.startup() - events = [e async for e in adapter.poll()] - await adapter.shutdown() - - assert all(e.data["eventtype"] != "XX" for e in events) - - @pytest.mark.asyncio - async def test_settings_event_types_override(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter( - _config({"event_types": ["EQ"]}), - MagicMock(), - tmp_path / "cursors.db", - ) - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - - await adapter.startup() - events = [e async for e in adapter.poll()] - await adapter.shutdown() - - # Only EQ should yield now. - assert len(events) == 1 - assert events[0].id == "EQ1541360" - assert events[0].category == "disaster.eq" - - @pytest.mark.asyncio - async def test_dedup_by_guid(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - - await adapter.startup() - first_pass = [e async for e in adapter.poll()] - second_pass = [e async for e in adapter.poll()] - await adapter.shutdown() - - assert len(first_pass) == 2 - assert len(second_pass) == 0 - - @pytest.mark.asyncio - async def test_fall_off_iscurrent_false(self, tmp_path: Path): - """Item seen iscurrent=true then iscurrent=false -> tombstone.""" - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - await adapter.startup() - first_pass = [e async for e in adapter.poll()] - assert any(e.id == "WF2002001" for e in first_pass) - - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_RETIRED) - second_pass = [e async for e in adapter.poll()] - await adapter.shutdown() - - tombstones = [e for e in second_pass if e.category.endswith(".removed")] - assert len(tombstones) == 1 - ts = tombstones[0] - assert ts.id == "WF2002001:removed" - assert ts.category == "disaster.wf.removed" - assert ts.data["reason"] == "iscurrent_false" - # Subject form: central.disaster..removed. - assert adapter.subject_for(ts) == "central.disaster.wf.removed.greece" - - @pytest.mark.asyncio - async def test_fall_off_missing_from_feed(self, tmp_path: Path): - """Item seen, then completely missing from feed -> tombstone.""" - from central.adapters.gdacs import GDACSAdapter - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) - await adapter.startup() - _ = [e async for e in adapter.poll()] - - adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_MISSING) - second_pass = [e async for e in adapter.poll()] - await adapter.shutdown() - - tombstones = [e for e in second_pass if e.category.endswith(".removed")] - assert len(tombstones) == 1 - assert tombstones[0].id == "WF2002001:removed" - assert tombstones[0].category == "disaster.wf.removed" - assert tombstones[0].data["reason"] == "missing_from_feed" - - @pytest.mark.asyncio - async def test_subject_for_returns_country_path(self, tmp_path: Path): - from central.adapters.gdacs import GDACSAdapter - from central.models import Geo - - adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") - event = Event( - id="WF2002001", - adapter="gdacs", - category="disaster.wf", - time=datetime(2026, 5, 18, 11, tzinfo=timezone.utc), - severity=1, - geo=Geo(), - data={"eventtype": "WF", "country": "Greece"}, - ) - assert adapter.subject_for(event) == "central.disaster.wf.greece" - - event_unknown = Event( - id="DR1", - adapter="gdacs", - category="disaster.dr", - time=datetime(2026, 5, 18, tzinfo=timezone.utc), - severity=0, - geo=Geo(), - data={"eventtype": "DR", "country": None}, - ) - assert adapter.subject_for(event_unknown) == "central.disaster.dr.unknown" diff --git a/tests/test_stream_registry.py b/tests/test_stream_registry.py deleted file mode 100644 index b8b2678..0000000 --- a/tests/test_stream_registry.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Registry-consistency tests for src/central/streams.py. - -These are property tests, not literal restatements. Adding a new stream to the -registry requires no new test code -- every invariant here automatically -covers it. -""" - -import re - -from central.streams import STREAMS - - -def test_stream_names_unique(): - names = [s.name for s in STREAMS] - assert len(names) == len(set(names)), "duplicate stream names in registry" - - -def test_subject_filters_unique(): - filters = [s.subject_filter for s in STREAMS] - assert len(filters) == len(set(filters)), "duplicate subject filters in registry" - - -def test_subject_filter_central_prefix_wildcard(): - pattern = re.compile(r"^central\.[a-z][a-z_]*\.>$") - for s in STREAMS: - assert pattern.match(s.subject_filter), ( - f"{s.name}: subject_filter {s.subject_filter!r} does not match /^central\\.[a-z][a-z_]*\\.>$/" - ) - - -def test_meta_is_only_non_event_bearing(): - """CENTRAL_META is the only non-event-bearing stream today. - - If you're adding a second one, update this test deliberately -- the - archive will silently skip the new stream, which is rarely what you want. - """ - non_event = [s for s in STREAMS if not s.event_bearing] - assert len(non_event) == 1, ( - f"expected exactly one non-event-bearing stream, got {[s.name for s in non_event]}" - ) - assert non_event[0].name == "CENTRAL_META" - - -def test_supervisor_stream_subjects_includes_meta(): - """Supervisor creates every stream in JetStream, including META.""" - from central.supervisor import STREAM_SUBJECTS - - assert "CENTRAL_META" in STREAM_SUBJECTS - assert STREAM_SUBJECTS["CENTRAL_META"] == ["central.meta.>"] - - -def test_supervisor_stream_subjects_includes_all(): - """Every registry stream appears in supervisor's derived dict with the right filter.""" - from central.supervisor import STREAM_SUBJECTS - - assert set(STREAM_SUBJECTS.keys()) == {s.name for s in STREAMS} - for s in STREAMS: - assert STREAM_SUBJECTS[s.name] == [s.subject_filter] - - -def test_archive_streams_excludes_non_event_bearing(): - """Archive's STREAMS list contains exactly the event_bearing=True entries.""" - from central.archive import STREAMS as ARCHIVE_STREAMS - - expected = [(s.name, s.subject_filter) for s in STREAMS if s.event_bearing] - assert ARCHIVE_STREAMS == expected - archive_names = {name for name, _ in ARCHIVE_STREAMS} - for s in STREAMS: - if s.event_bearing: - assert s.name in archive_names - else: - assert s.name not in archive_names - - -def test_dashboard_streams_matches_dashboard_flag(): - """GUI's DASHBOARD_STREAMS matches [s.name for s in STREAMS if s.dashboard].""" - from central.gui.routes import DASHBOARD_STREAMS - - expected = [s.name for s in STREAMS if s.dashboard] - assert DASHBOARD_STREAMS == expected