diff --git a/sql/migrations/020_add_gdacs_adapter.sql b/sql/migrations/020_add_gdacs_adapter.sql new file mode 100644 index 0000000..20ac01f --- /dev/null +++ b/sql/migrations/020_add_gdacs_adapter.sql @@ -0,0 +1,14 @@ +-- Migration: 020_add_gdacs_adapter +-- Adds the GDACS adapter row to config.adapters. +-- Ships disabled; operator enables via GUI. +-- Default event_types excludes EQ (USGS is canonical for earthquakes on central.quake.>). +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'gdacs', + false, + 600, + jsonb_build_object('event_types', jsonb_build_array('WF', 'DR', 'FL', 'VO', 'TC')) +) +ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/021_add_central_disaster_stream.sql b/sql/migrations/021_add_central_disaster_stream.sql new file mode 100644 index 0000000..b9192f0 --- /dev/null +++ b/sql/migrations/021_add_central_disaster_stream.sql @@ -0,0 +1,8 @@ +-- Migration: 021_add_central_disaster_stream +-- Seeds the CENTRAL_DISASTER JetStream stream row for central.disaster.> subjects. +-- 7-day retention, 1 GiB max_bytes -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE / CENTRAL_SPACE. +-- Idempotent: uses ON CONFLICT DO NOTHING. 
+ +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_DISASTER', 604800, 1073741824) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/gdacs.py b/src/central/adapters/gdacs.py new file mode 100644 index 0000000..f0af386 --- /dev/null +++ b/src/central/adapters/gdacs.py @@ -0,0 +1,476 @@ +"""GDACS (Global Disaster Alert and Coordination System) adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from pathlib import Path +from typing import Any +from xml.etree import ElementTree as ET + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +GDACS_RSS_URL = "https://www.gdacs.org/xml/rss.xml" + +NS = { + "gdacs": "http://www.gdacs.org", + "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", + "georss": "http://www.georss.org/georss", + "dc": "http://purl.org/dc/elements/1.1/", +} + +_ALERTLEVEL_TO_SEVERITY = {"Green": 1, "Orange": 2, "Red": 3} + +DEFAULT_EVENT_TYPES = ["WF", "DR", "FL", "VO", "TC"] + + +def severity_from_alertlevel(level: str | None) -> int: + """Green=1, Orange=2, Red=3, default 0.""" + if not level: + return 0 + return _ALERTLEVEL_TO_SEVERITY.get(level.strip().capitalize(), 0) + + +def subject_for_country(country: str | None) -> str: + """Lowercase, hyphenate spaces. 'unknown' for missing/empty. 
Takes first if comma-separated.""" + if not country: + return "unknown" + first = country.split(",")[0].strip() + if not first: + return "unknown" + return first.lower().replace(" ", "-") + + +def parse_rfc822_utc(raw: str | None) -> datetime | None: + """Parse an RFC 822 datetime string to UTC datetime.""" + if not raw: + return None + try: + dt = parsedate_to_datetime(raw) + except (ValueError, TypeError): + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def parse_gdacs_bbox(raw: str | None) -> tuple[float, float, float, float] | None: + """Parse GDACS bbox 'lonmin lonmax latmin latmax' to Geo.bbox (minLon, minLat, maxLon, maxLat).""" + if not raw: + return None + try: + parts = [float(p) for p in raw.split()] + except ValueError: + return None + if len(parts) != 4: + return None + lon_min, lon_max, lat_min, lat_max = parts + return (lon_min, lat_min, lon_max, lat_max) + + +def init_gdacs_observed_table(db: sqlite3.Connection) -> None: + db.execute(""" + CREATE TABLE IF NOT EXISTS gdacs_observed ( + guid TEXT PRIMARY KEY, + country TEXT, + eventtype TEXT, + last_observed_at TEXT NOT NULL + ) + """) + db.commit() + + +def get_observed(db: sqlite3.Connection) -> dict[str, tuple[str | None, str | None]]: + cur = db.execute("SELECT guid, country, eventtype FROM gdacs_observed") + return {row[0]: (row[1], row[2]) for row in cur.fetchall()} + + +def mark_observed(db: sqlite3.Connection, guid: str, country: str | None, eventtype: str | None) -> None: + db.execute( + """ + INSERT INTO gdacs_observed (guid, country, eventtype, last_observed_at) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT (guid) DO UPDATE SET last_observed_at = CURRENT_TIMESTAMP + """, + (guid, country, eventtype), + ) + db.commit() + + +def mark_retired(db: sqlite3.Connection, guids: set[str]) -> None: + for guid in guids: + db.execute("DELETE FROM gdacs_observed WHERE guid = ?", (guid,)) + db.commit() + + +class 
class GDACSSettings(BaseModel):
    """Settings schema for GDACS adapter.

    event_types is the explicit allowlist of GDACS eventtype codes to publish.
    EQ is intentionally absent from the default because USGS is the canonical
    earthquake source for Central and quakes are already published on
    central.quake.>. Operators can re-include "EQ" here if USGS is
    temporarily unavailable or if the GDACS alertlevel triage signal is
    operationally needed.

    Future-unknown eventtypes (anything GDACS may add later) are dropped by
    default — opt in by adding the code to this list.
    """

    event_types: list[str] = DEFAULT_EVENT_TYPES.copy()


class GDACSAdapter(SourceAdapter):
    """Global Disaster Alert and Coordination System adapter.

    Each poll fetches the GDACS RSS feed and yields:

    * one event per allow-listed item whose guid has not been published yet;
    * one ``<guid>:removed`` tombstone when a previously observed item is
      flagged ``iscurrent=false`` or is no longer present in the feed.

    Dedup state (``published_ids``) and the set of currently observed guids
    (``gdacs_observed``) live in the SQLite database at ``cursor_db_path``.
    """

    name = "gdacs"
    display_name = "GDACS — Global Disaster Alerts"
    description = (
        "Global Disaster Alert and Coordination System events: wildfires, drought, "
        "flood, volcano, and tropical cyclones with humanitarian-coordination triage signals."
    )
    settings_schema = GDACSSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None
    default_cadence_s = 600

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        """Store collaborators; network and database are opened in startup()."""
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None
        self._db: sqlite3.Connection | None = None
        self.event_types: list[str] = self._event_types_from(config)

    @staticmethod
    def _event_types_from(config: AdapterConfig) -> list[str]:
        """Return a fresh allowlist from ``config``, defaulting when unset or null."""
        configured = config.settings.get("event_types")
        return list(DEFAULT_EVENT_TYPES if configured is None else configured)

    async def startup(self) -> None:
        """Open the HTTP session and the SQLite database, creating tables."""
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=60),
        )
        self._db = sqlite3.connect(self._cursor_db_path)
        self._db.execute("""
            CREATE TABLE IF NOT EXISTS published_ids (
                adapter TEXT NOT NULL,
                event_id TEXT NOT NULL,
                first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (adapter, event_id)
            )
        """)
        self._db.execute("""
            CREATE INDEX IF NOT EXISTS published_ids_last_seen
            ON published_ids (last_seen)
        """)
        init_gdacs_observed_table(self._db)
        self._db.commit()
        logger.info("GDACS adapter started", extra={"event_types": self.event_types})

    async def shutdown(self) -> None:
        """Close the HTTP session and the database if they are open."""
        if self._session:
            await self._session.close()
            self._session = None
        if self._db:
            self._db.close()
            self._db = None
        logger.info("GDACS adapter shut down")

    async def apply_config(self, new_config: AdapterConfig) -> None:
        """Replace the event-type allowlist from ``new_config``."""
        self.event_types = self._event_types_from(new_config)
        logger.info("GDACS config updated", extra={"event_types": self.event_types})

    def is_published(self, event_id: str) -> bool:
        """Return True if ``event_id`` has a dedup row for this adapter.

        Returns False when the database is not open.
        """
        if not self._db:
            return False
        cur = self._db.execute(
            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
            (self.name, event_id),
        )
        return cur.fetchone() is not None

    def mark_published(self, event_id: str) -> None:
        """Insert the dedup row for ``event_id`` or refresh its last_seen.

        Does nothing when the database is not open.
        """
        if not self._db:
            return
        self._db.execute(
            """
            INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
            VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            ON CONFLICT (adapter, event_id) DO UPDATE SET
                last_seen = CURRENT_TIMESTAMP
            """,
            (self.name, event_id),
        )
        self._db.commit()

    def sweep_old_ids(self) -> int:
        """Delete this adapter's dedup rows not seen for 30 days.

        Returns the number of rows removed (0 when the database is not open).
        """
        if not self._db:
            return 0
        cur = self._db.execute(
            "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
            (self.name,),
        )
        self._db.commit()
        count = cur.rowcount
        if count > 0:
            logger.info("GDACS swept old dedup entries", extra={"count": count})
        return count

    def subject_for(self, event: Event) -> str:
        """Build the publish subject for ``event``.

        Tombstones go to ``central.disaster.removed.<country>``; everything
        else goes to ``central.disaster.<eventtype>.<country>``.
        """
        country_subj = subject_for_country(event.data.get("country"))
        if event.category.startswith("disaster.removed"):
            return f"central.disaster.removed.{country_subj}"
        eventtype = (event.data.get("eventtype") or "").lower() or "unknown"
        return f"central.disaster.{eventtype}.{country_subj}"

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential_jitter(initial=1, max=30),
        retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
    )
    async def _fetch(self) -> str:
        """Download the RSS feed body, retrying client errors and timeouts.

        Raises:
            RuntimeError: if startup() has not created the HTTP session.
        """
        if not self._session:
            raise RuntimeError("Session not initialized")
        async with self._session.get(
            GDACS_RSS_URL, headers={"User-Agent": "Central/0.4"}
        ) as resp:
            resp.raise_for_status()
            text = await resp.text()
            return text

    @staticmethod
    def _raw_text(item: ET.Element, path: str) -> str | None:
        """Return the text of the child at ``path`` as-is, or None if absent/empty."""
        elem = item.find(path, NS)
        if elem is None or not elem.text:
            return None
        return elem.text

    @staticmethod
    def _stripped_text(item: ET.Element, path: str) -> str | None:
        """Return the whitespace-stripped child text, or None if absent/empty."""
        elem = item.find(path, NS)
        if elem is None or not elem.text:
            return None
        return elem.text.strip()

    @classmethod
    def _item_time(cls, item: ET.Element) -> datetime:
        """Pick the event time: gdacs:fromdate, else pubDate, else now (UTC)."""
        event_time = parse_rfc822_utc(cls._raw_text(item, "gdacs:fromdate"))
        if event_time is None:
            event_time = (
                parse_rfc822_utc(cls._raw_text(item, "pubDate"))
                or datetime.now(timezone.utc)
            )
        return event_time

    @classmethod
    def _item_centroid(cls, item: ET.Element) -> tuple[float, float] | None:
        """Return (lon, lat) from geo:Point, or None if missing or non-numeric."""
        lat_text = cls._raw_text(item, "geo:Point/geo:lat")
        lon_text = cls._raw_text(item, "geo:Point/geo:long")
        if lat_text is None or lon_text is None:
            return None
        try:
            return (float(lon_text), float(lat_text))
        except ValueError:
            return None

    async def poll(self) -> AsyncIterator[Event]:
        """Fetch the feed once and yield new events and tombstones.

        Items without a guid, or whose eventtype is not in ``event_types``,
        are skipped. Each yielded event is recorded as published only after
        the consumer resumes the generator.

        Raises:
            RuntimeError: if startup() has not opened the database.
            Exception: fetch errors and ``ET.ParseError`` are logged and
                re-raised.
        """
        if not self._db:
            raise RuntimeError("Database not initialized")

        try:
            content = await self._fetch()
        except Exception as e:
            logger.error("GDACS fetch failed", extra={"error": str(e)})
            raise

        try:
            # NOTE(review): xml.etree is not hardened against maliciously
            # constructed XML. The feed comes from a fixed HTTPS URL, but
            # consider a hardened parser if that trust assumption changes.
            root = ET.fromstring(content)
        except ET.ParseError as e:
            logger.error("GDACS RSS parse error", extra={"error": str(e)})
            raise

        channel = root.find("channel")
        if channel is None:
            logger.info("GDACS fetch completed", extra={"item_count": 0})
            return

        items = channel.findall("item")
        logger.info("GDACS fetch completed", extra={"item_count": len(items)})

        observed_before = get_observed(self._db)
        current_guids: set[str] = set()
        events_yielded = 0

        for item_elem in items:
            guid = self._stripped_text(item_elem, "guid")
            if not guid:
                continue

            eventtype = self._stripped_text(item_elem, "gdacs:eventtype") or ""
            if eventtype not in self.event_types:
                continue

            # A missing or empty <gdacs:iscurrent> counts as current.
            iscurrent_text = self._stripped_text(item_elem, "gdacs:iscurrent")
            iscurrent = True if iscurrent_text is None else iscurrent_text.lower() == "true"

            country = self._stripped_text(item_elem, "gdacs:country")
            iso3 = self._stripped_text(item_elem, "gdacs:iso3")
            alertlevel = self._stripped_text(item_elem, "gdacs:alertlevel")
            event_time = self._item_time(item_elem)

            # Prefer the ISO3 code as region, falling back to the country name.
            region = iso3 or country
            geo = Geo(
                centroid=self._item_centroid(item_elem),
                bbox=parse_gdacs_bbox(self._raw_text(item_elem, "gdacs:bbox")),
                regions=[region] if region else [],
                primary_region=region,
            )

            data: dict[str, Any] = {
                "guid": guid,
                "eventtype": eventtype,
                "eventid": self._stripped_text(item_elem, "gdacs:eventid"),
                "country": country,
                "iso3": iso3,
                "alertlevel": alertlevel,
                "alertscore": self._stripped_text(item_elem, "gdacs:alertscore"),
                "title": self._raw_text(item_elem, "title") or "",
                "description": self._raw_text(item_elem, "description") or "",
                "url": self._raw_text(item_elem, "link") or "",
                "datemodified": self._raw_text(item_elem, "gdacs:datemodified"),
                "iscurrent": iscurrent,
            }

            if not iscurrent:
                # Explicit tombstone from upstream. Only emit if we previously
                # observed it. The guid is deliberately not added to
                # current_guids, so the fall-off pass below retires its
                # observation row.
                if guid in observed_before:
                    tombstone = Event(
                        id=f"{guid}:removed",
                        adapter=self.name,
                        category=f"disaster.removed.{eventtype.lower()}",
                        time=datetime.now(timezone.utc),
                        severity=0,
                        geo=geo,
                        data={**data, "reason": "iscurrent_false"},
                    )
                    if not self.is_published(tombstone.id):
                        yield tombstone
                        self.mark_published(tombstone.id)
                        events_yielded += 1
                continue

            current_guids.add(guid)

            if self.is_published(guid):
                # Refresh last_seen so sweep_old_ids() does not expire the
                # dedup row of an event that is still in the feed; otherwise
                # any event live for more than 30 days would be re-published
                # as new.
                self.mark_published(guid)
                mark_observed(self._db, guid, country, eventtype)
                continue

            event = Event(
                id=guid,
                adapter=self.name,
                category=f"disaster.{eventtype.lower()}",
                time=event_time,
                severity=severity_from_alertlevel(alertlevel),
                geo=geo,
                data=data,
            )

            yield event
            self.mark_published(guid)
            mark_observed(self._db, guid, country, eventtype)
            events_yielded += 1

        # Fall-off: events present in observed_before but absent from this poll
        fallen_off = set(observed_before.keys()) - current_guids
        for guid in fallen_off:
            prior_country, prior_eventtype = observed_before[guid]
            if prior_eventtype and prior_eventtype not in self.event_types:
                # Was published before settings narrowed; clean up silently.
                mark_retired(self._db, {guid})
                continue
            tombstone_id = f"{guid}:removed"
            if self.is_published(tombstone_id):
                # Tombstone already went out (e.g. via iscurrent=false above).
                mark_retired(self._db, {guid})
                continue
            now = datetime.now(timezone.utc)
            region = prior_country
            geo = Geo(
                regions=[region] if region else [],
                primary_region=region,
            )
            tombstone = Event(
                id=tombstone_id,
                adapter=self.name,
                category=f"disaster.removed.{(prior_eventtype or '').lower()}",
                time=now,
                severity=0,
                geo=geo,
                data={
                    "guid": guid,
                    "country": prior_country,
                    "eventtype": prior_eventtype,
                    "reason": "missing_from_feed",
                },
            )
            yield tombstone
            self.mark_published(tombstone_id)
            mark_retired(self._db, {guid})
            events_yielded += 1

        self.sweep_old_ids()
        logger.info(
            "GDACS poll completed",
            extra={
                "events_yielded": events_yielded,
                "current_observed": len(current_guids),
                "fallen_off": len(fallen_off),
            },
        )
-30,6 +30,7 @@ STREAM_SUBJECTS = { "CENTRAL_FIRE": ["central.fire.>"], "CENTRAL_QUAKE": ["central.quake.>"], "CENTRAL_SPACE": ["central.space.>"], + "CENTRAL_DISASTER": ["central.disaster.>"], } # Recompute interval for stream max_bytes (1 hour) diff --git a/tests/test_archive_multi_stream.py b/tests/test_archive_multi_stream.py index bba544d..21d810a 100644 --- a/tests/test_archive_multi_stream.py +++ b/tests/test_archive_multi_stream.py @@ -29,9 +29,9 @@ class TestConsumerNaming: class TestStreamsConfiguration: """Test streams configuration.""" - def test_streams_list_has_four_entries(self): - """STREAMS list has four event-bearing streams.""" - assert len(STREAMS) == 4 + def test_streams_list_has_five_entries(self): + """STREAMS list has five event-bearing streams.""" + assert len(STREAMS) == 5 def test_streams_contains_central_wx(self): """STREAMS contains CENTRAL_WX with correct filter.""" @@ -49,6 +49,10 @@ class TestStreamsConfiguration: """STREAMS contains CENTRAL_SPACE with correct filter.""" assert ("CENTRAL_SPACE", "central.space.>") in STREAMS + def test_streams_contains_central_disaster(self): + """STREAMS contains CENTRAL_DISASTER with correct filter.""" + assert ("CENTRAL_DISASTER", "central.disaster.>") in STREAMS + def test_streams_excludes_central_meta(self): """STREAMS does not contain CENTRAL_META (status messages only).""" stream_names = [s[0] for s in STREAMS] diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py index 73df132..9ea53a6 100644 --- a/tests/test_dashboard.py +++ b/tests/test_dashboard.py @@ -205,7 +205,7 @@ class TestDashboardStreamsIsolation: call_args = mock_templates.TemplateResponse.call_args context = call_args.kwargs.get("context", call_args[1].get("context")) streams = context["streams"] - assert len(streams) == 5 + assert len(streams) == 6 fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE") assert fire_stream.get("error") == "unavailable" wx_stream = next(s for s in streams if s["name"] == 
"CENTRAL_WX") diff --git a/tests/test_gdacs.py b/tests/test_gdacs.py new file mode 100644 index 0000000..b9c29d0 --- /dev/null +++ b/tests/test_gdacs.py @@ -0,0 +1,366 @@ +"""Tests for GDACS adapter.""" + +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.config_models import AdapterConfig +from central.models import Event + + +# Frozen RSS fixture mirroring real GDACS shape (namespaces + element layout). +SAMPLE_RSS = """ + + + GDACS RSS information + https://www.gdacs.org/ + Near real-time notification + Tue, 19 May 2026 06:35:01 GMT + + Green wildfire in Greece 18/05/2026 11:00 UTC + Wildfire in Attica region of Greece. + https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001 + Mon, 18 May 2026 11:10:00 GMT + true + Mon, 18 May 2026 11:00:00 GMT + Tue, 19 May 2026 04:00:00 GMT + WF2002001 + + 38.0 + 23.7 + + 21.7 25.7 36.0 40.0 + WF + Green + 0 + 2002001 + GRC + Greece + + + Orange drought in United States of America 15/04/2026 + Multi-state drought. 
+ https://www.gdacs.org/report.aspx?eventtype=DR&eventid=3003001 + Wed, 15 Apr 2026 00:00:00 GMT + true + Wed, 15 Apr 2026 00:00:00 GMT + DR3003001 + + 39.5 + -98.5 + + -110.0 -90.0 32.0 45.0 + DR + Orange + 1.5 + 3003001 + USA + United States of America + + + Green earthquake in Vanuatu + EQ Vanuatu + https://www.gdacs.org/report.aspx?eventtype=EQ&eventid=1541360 + Tue, 19 May 2026 02:41:13 GMT + true + Tue, 19 May 2026 02:29:24 GMT + EQ1541360 + + -18.15 + 168.09 + + EQ + Green + 1541360 + VUT + Vanuatu + + + Synthetic unknown eventtype + XX synthetic test + https://www.gdacs.org/report.aspx?eventtype=XX&eventid=999999 + Tue, 19 May 2026 00:00:00 GMT + true + Tue, 19 May 2026 00:00:00 GMT + XX999999 + XX + Green + 999999 + Nowhere + + +""" + + +# Same items but WF turned to iscurrent=false (tombstone scenario) +SAMPLE_RSS_WF_RETIRED = SAMPLE_RSS.replace( + "true\n Mon, 18 May 2026 11:00:00 GMT", + "false\n Mon, 18 May 2026 11:00:00 GMT", + 1, +) + + +# Just the DR + EQ + XX items, with WF removed entirely (missing-from-feed scenario) +SAMPLE_RSS_WF_MISSING = SAMPLE_RSS.replace( + """ + Green wildfire in Greece 18/05/2026 11:00 UTC + Wildfire in Attica region of Greece. 
+ https://www.gdacs.org/report.aspx?eventtype=WF&eventid=2002001 + Mon, 18 May 2026 11:10:00 GMT + true + Mon, 18 May 2026 11:00:00 GMT + Tue, 19 May 2026 04:00:00 GMT + WF2002001 + + 38.0 + 23.7 + + 21.7 25.7 36.0 40.0 + WF + Green + 0 + 2002001 + GRC + Greece + + """, + "", + 1, +) + + +def _config(settings: dict | None = None) -> AdapterConfig: + return AdapterConfig( + name="gdacs", + enabled=True, + cadence_s=600, + settings=settings or {"event_types": ["WF", "DR", "FL", "VO", "TC"]}, + updated_at=datetime.now(timezone.utc), + ) + + +class TestGDACSHelpers: + def test_severity_from_alertlevel_green_orange_red(self): + from central.adapters.gdacs import severity_from_alertlevel + + assert severity_from_alertlevel("Green") == 1 + assert severity_from_alertlevel("Orange") == 2 + assert severity_from_alertlevel("Red") == 3 + assert severity_from_alertlevel(None) == 0 + assert severity_from_alertlevel("") == 0 + assert severity_from_alertlevel("Unknown") == 0 + # case-insensitive + assert severity_from_alertlevel("green") == 1 + assert severity_from_alertlevel("RED") == 3 + + def test_subject_for_lowercase_country(self): + from central.adapters.gdacs import subject_for_country + + assert subject_for_country("United States") == "united-states" + assert subject_for_country("Greece") == "greece" + assert subject_for_country("Solomon Islands") == "solomon-islands" + + def test_subject_for_unknown_country(self): + from central.adapters.gdacs import subject_for_country + + assert subject_for_country(None) == "unknown" + assert subject_for_country("") == "unknown" + assert subject_for_country(" ") == "unknown" + + def test_subject_for_multi_country_takes_first(self): + from central.adapters.gdacs import subject_for_country + + assert subject_for_country("Mozambique, Madagascar") == "mozambique" + + def test_parse_gdacs_bbox(self): + from central.adapters.gdacs import parse_gdacs_bbox + + # GDACS format: lonmin lonmax latmin latmax + # Geo.bbox: (minLon, minLat, maxLon, 
maxLat) + result = parse_gdacs_bbox("21.7 25.7 36.0 40.0") + assert result == (21.7, 36.0, 25.7, 40.0) + assert parse_gdacs_bbox(None) is None + assert parse_gdacs_bbox("") is None + assert parse_gdacs_bbox("not numbers") is None + + +class TestGDACSAdapter: + def test_class_attrs_complete(self): + from central.adapters.gdacs import GDACSAdapter, GDACSSettings + + assert GDACSAdapter.name == "gdacs" + assert isinstance(GDACSAdapter.display_name, str) and GDACSAdapter.display_name + assert isinstance(GDACSAdapter.description, str) and GDACSAdapter.description + assert GDACSAdapter.settings_schema is GDACSSettings + assert GDACSAdapter.requires_api_key is None + assert GDACSAdapter.api_key_field is None + assert GDACSAdapter.wizard_order is None + assert GDACSAdapter.default_cadence_s == 600 + + @pytest.mark.asyncio + async def test_normalization_basic_wf(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + + await adapter.startup() + events: list[Event] = [e async for e in adapter.poll()] + await adapter.shutdown() + + # WF + DR should yield; EQ + XX filtered. 
+ assert len(events) == 2 + wf = next(e for e in events if e.data["eventtype"] == "WF") + assert wf.adapter == "gdacs" + assert wf.category == "disaster.wf" + assert wf.id == "WF2002001" + assert wf.severity == 1 # Green + assert wf.data["country"] == "Greece" + assert wf.data["iso3"] == "GRC" + assert wf.geo.centroid == (23.7, 38.0) + assert wf.geo.bbox == (21.7, 36.0, 25.7, 40.0) + assert wf.geo.primary_region == "GRC" + assert wf.geo.regions == ["GRC"] + + dr = next(e for e in events if e.data["eventtype"] == "DR") + assert dr.severity == 2 # Orange + assert dr.category == "disaster.dr" + assert dr.data["iso3"] == "USA" + + @pytest.mark.asyncio + async def test_eq_filtered_by_default(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + # No EQ in default allowlist; EQ1541360 must not appear. 
+ assert all(e.id != "EQ1541360" for e in events) + assert all(e.data["eventtype"] != "EQ" for e in events) + + @pytest.mark.asyncio + async def test_unknown_eventtype_filtered(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert all(e.data["eventtype"] != "XX" for e in events) + + @pytest.mark.asyncio + async def test_settings_event_types_override(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter( + _config({"event_types": ["EQ"]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + # Only EQ should yield now. + assert len(events) == 1 + assert events[0].id == "EQ1541360" + assert events[0].category == "disaster.eq" + + @pytest.mark.asyncio + async def test_dedup_by_guid(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert len(first_pass) == 2 + assert len(second_pass) == 0 + + @pytest.mark.asyncio + async def test_fall_off_iscurrent_false(self, tmp_path: Path): + """Item seen iscurrent=true then iscurrent=false -> tombstone.""" + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + assert any(e.id 
== "WF2002001" for e in first_pass) + + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_RETIRED) + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + tombstones = [e for e in second_pass if e.category.startswith("disaster.removed")] + assert len(tombstones) == 1 + ts = tombstones[0] + assert ts.id == "WF2002001:removed" + assert ts.category == "disaster.removed.wf" + assert ts.data["reason"] == "iscurrent_false" + # Subject form: central.disaster.removed. + assert adapter.subject_for(ts) == "central.disaster.removed.greece" + + @pytest.mark.asyncio + async def test_fall_off_missing_from_feed(self, tmp_path: Path): + """Item seen, then completely missing from feed -> tombstone.""" + from central.adapters.gdacs import GDACSAdapter + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS) + await adapter.startup() + _ = [e async for e in adapter.poll()] + + adapter._fetch = AsyncMock(return_value=SAMPLE_RSS_WF_MISSING) + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + tombstones = [e for e in second_pass if e.category.startswith("disaster.removed")] + assert len(tombstones) == 1 + assert tombstones[0].id == "WF2002001:removed" + assert tombstones[0].data["reason"] == "missing_from_feed" + + @pytest.mark.asyncio + async def test_subject_for_returns_country_path(self, tmp_path: Path): + from central.adapters.gdacs import GDACSAdapter + from central.models import Geo + + adapter = GDACSAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + event = Event( + id="WF2002001", + adapter="gdacs", + category="disaster.wf", + time=datetime(2026, 5, 18, 11, tzinfo=timezone.utc), + severity=1, + geo=Geo(), + data={"eventtype": "WF", "country": "Greece"}, + ) + assert adapter.subject_for(event) == "central.disaster.wf.greece" + + event_unknown = Event( + id="DR1", + adapter="gdacs", + category="disaster.dr", + time=datetime(2026, 5, 18, 
tzinfo=timezone.utc), + severity=0, + geo=Geo(), + data={"eventtype": "DR", "country": None}, + ) + assert adapter.subject_for(event_unknown) == "central.disaster.dr.unknown"