From 0b26bf902a294ccafe6f59b2120c70647fed887d Mon Sep 17 00:00:00 2001 From: zvx Date: Tue, 19 May 2026 15:35:25 +0000 Subject: [PATCH] feat(2-F): NASA EONET disaster adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the NASA Earth Observatory Natural Event Tracker (EONET v3) adapter, publishing on the existing CENTRAL_DISASTER stream under central.disaster.eonet.<category>.global subjects. - One Central event per EONET event id; geo = most-recent geometry point. - Composite dedup key (eonet:<event_id>:<latest_geometry_date>) — timeline advance re-publishes, idle re-poll suppresses. - category_allowlist defaults to all 13 upstream categories; operator opts OUT per-category if GDACS overlap (wildfires/floods/severeStorms/volcanoes) produces unwanted dupes on gdacs.* subjects. - camelCase upstream IDs (seaLakeIce, dustHaze, etc.) mapped to lower_snake_case subject components by a single _subject_category helper. - Country resolves to literal 'global' (no reverse-geocode in v1). - Fall-off: missing-from-feed event emits central.disaster.eonet.<category>.removed.global, subtype before 'removed' per §8 canonical pattern. Adapter ships disabled; operator enables via GUI. --- sql/migrations/022_add_eonet_adapter.sql | 21 ++ src/central/adapters/eonet.py | 434 +++++++++++++++++++++++ tests/fixtures/eonet_sample.json | 108 ++++++ tests/test_eonet.py | 252 +++++++++++++ 4 files changed, 815 insertions(+) create mode 100644 sql/migrations/022_add_eonet_adapter.sql create mode 100644 src/central/adapters/eonet.py create mode 100644 tests/fixtures/eonet_sample.json create mode 100644 tests/test_eonet.py diff --git a/sql/migrations/022_add_eonet_adapter.sql b/sql/migrations/022_add_eonet_adapter.sql new file mode 100644 index 0000000..b88552f --- /dev/null +++ b/sql/migrations/022_add_eonet_adapter.sql @@ -0,0 +1,21 @@ +-- Migration: 022_add_eonet_adapter +-- Adds the NASA EONET adapter row to config.adapters. +-- Ships disabled; operator enables via GUI. 
+-- +-- The settings JSON below is the literal output of EONETSettings().model_dump_json() +-- at migration-author time. Regenerate via: +-- sudo -u central .venv/bin/python -c \ +-- "from central.adapters.eonet import EONETSettings; print(EONETSettings().model_dump_json())" +-- Do NOT hand-edit the category_allowlist here — _DEFAULT_CATEGORIES in +-- src/central/adapters/eonet.py is the single source of truth. +-- +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'eonet', + false, + 1800, + '{"category_allowlist":["drought","dustHaze","earthquakes","floods","landslides","manmade","seaLakeIce","severeStorms","snow","tempExtremes","volcanoes","waterColor","wildfires"],"region":null}'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/eonet.py b/src/central/adapters/eonet.py new file mode 100644 index 0000000..ac4d46e --- /dev/null +++ b/src/central/adapters/eonet.py @@ -0,0 +1,434 @@ +"""NASA EONET (Earth Observatory Natural Event Tracker) adapter.""" + +import json +import logging +import re +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel, Field +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +EONET_EVENTS_URL = "https://eonet.gsfc.nasa.gov/api/v3/events" + +# Single source of truth for default category list. Mirrors the upstream +# /api/v3/categories registry at the time of integration. Do NOT duplicate +# this list in tests, fixtures, or migrations — derive from EONETSettings' +# default instead. 
Refresh by curling /api/v3/categories if upstream adds +# new IDs. +_DEFAULT_CATEGORIES: list[str] = [ + "drought", + "dustHaze", + "earthquakes", + "floods", + "landslides", + "manmade", + "seaLakeIce", + "severeStorms", + "snow", + "tempExtremes", + "volcanoes", + "waterColor", + "wildfires", +] + + +def _subject_category(category_id: str | None) -> str: + """Convert upstream EONET camelCase category id to lower_snake_case subject component. + + Examples: seaLakeIce -> sea_lake_ice, dustHaze -> dust_haze, + severeStorms -> severe_storms. Single-word ids pass through lowercased. + Empty/None -> 'unknown'. This is the ONLY place this mapping lives. + """ + if not category_id: + return "unknown" + return re.sub(r"(?<!^)(?=[A-Z])", "_", category_id).lower() + + +def _parse_iso_utc(raw: str | None) -> datetime | None: + """Parse an ISO 8601 timestamp ('...Z' or with offset) to UTC datetime.""" + if not raw: + return None + try: + dt = datetime.fromisoformat(raw.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _dedup_key(event_id: str, latest_geometry_date_iso: str) -> str: + """Composite dedup key: same id + same timeline -> suppress; timeline advance -> re-publish.""" + return f"eonet:{event_id}:{latest_geometry_date_iso}" + + +def init_eonet_observed_table(db: sqlite3.Connection) -> None: + db.execute(""" + CREATE TABLE IF NOT EXISTS eonet_observed ( + event_id TEXT PRIMARY KEY, + category_id TEXT, + last_observed_at TEXT NOT NULL + ) + """) + db.commit() + + +def get_observed(db: sqlite3.Connection) -> dict[str, str | None]: + cur = db.execute("SELECT event_id, category_id FROM eonet_observed") + return {row[0]: row[1] for row in cur.fetchall()} + + +def mark_observed(db: sqlite3.Connection, event_id: str, category_id: str | None) -> None: + db.execute( + """ + INSERT INTO eonet_observed (event_id, category_id, last_observed_at) + VALUES (?, ?, CURRENT_TIMESTAMP) + ON CONFLICT (event_id) DO UPDATE SET + category_id = 
excluded.category_id, + last_observed_at = CURRENT_TIMESTAMP + """, + (event_id, category_id), + ) + db.commit() + + +def mark_retired(db: sqlite3.Connection, event_ids: set[str]) -> None: + for event_id in event_ids: + db.execute("DELETE FROM eonet_observed WHERE event_id = ?", (event_id,)) + db.commit() + + +class EONETSettings(BaseModel): + """Settings schema for NASA EONET adapter. + + category_allowlist defaults to ALL upstream categories. PM call: keep the + knob symmetric with GDACS event_types — operator opts OUT per-category if + duplicate events on gdacs.* and eonet.* subjects become a problem in + practice. Empirical note: in a 200-event upstream sample, ~77.5% of events + for wildfires/floods/severeStorms/volcanoes were GDACS-sourced. Disable + those categories here if downstream subscribers already consume the + GDACS adapter. + """ + + category_allowlist: list[str] = Field(default=list(_DEFAULT_CATEGORIES)) + region: RegionConfig | None = None + + +class EONETAdapter(SourceAdapter): + """NASA EONET v3 natural-event tracker adapter.""" + + name = "eonet" + display_name = "NASA EONET — Earth Observatory" + description = ( + "Natural event tracker from NASA EONET v3. Note: heavy GDACS overlap " + "for wildfires/floods/severeStorms/volcanoes — disable per-category " + "via the allowlist below if duplicate events on gdacs.* and eonet.* " + "subjects are not wanted." 
+ ) + settings_schema = EONETSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 1800 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self.category_allowlist: list[str] = list( + config.settings.get("category_allowlist", _DEFAULT_CATEGORIES) + ) + region_dict = config.settings.get("region") + self.region: RegionConfig | None = ( + RegionConfig(**region_dict) if region_dict else None + ) + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + init_eonet_observed_table(self._db) + self._db.commit() + logger.info( + "EONET adapter started", + extra={ + "categories": self.category_allowlist, + "region": self.region.model_dump() if self.region else None, + }, + ) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("EONET adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + self.category_allowlist = list( + new_config.settings.get("category_allowlist", _DEFAULT_CATEGORIES) + ) + region_dict = new_config.settings.get("region") + self.region = RegionConfig(**region_dict) if region_dict else None + logger.info( + 
"EONET config updated", + extra={ + "categories": self.category_allowlist, + "region": self.region.model_dump() if self.region else None, + }, + ) + + def is_published(self, dedup_key: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, dedup_key), + ) + return cur.fetchone() is not None + + def mark_published(self, dedup_key: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, dedup_key), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("EONET swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + # event.category is "disaster.eonet.<subject_cat>[.removed]" + parts = event.category.split(".") + subj_cat = parts[2] if len(parts) >= 3 else "unknown" + if len(parts) >= 4 and parts[-1] == "removed": + return f"central.disaster.eonet.{subj_cat}.removed.global" + return f"central.disaster.eonet.{subj_cat}.global" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> str: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + EONET_EVENTS_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + text = await resp.text() + return text + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise 
RuntimeError("Database not initialized") + + try: + content = await self._fetch() + except Exception as e: + logger.error("EONET fetch failed", extra={"error": str(e)}) + raise + + try: + payload = json.loads(content) + except json.JSONDecodeError as e: + logger.error("EONET JSON parse error", extra={"error": str(e)}) + raise + + items = payload.get("events", []) + logger.info("EONET fetch completed", extra={"item_count": len(items)}) + + observed_before = get_observed(self._db) + current_ids: set[str] = set() + events_yielded = 0 + + for item in items: + event_id = item.get("id") + if not event_id: + continue + + categories = item.get("categories") or [] + category_id: str | None = None + if categories and isinstance(categories[0], dict): + category_id = categories[0].get("id") + + if not category_id or category_id not in self.category_allowlist: + continue + + subject_cat = _subject_category(category_id) + + geometry = item.get("geometry") or [] + if geometry: + latest = max(geometry, key=lambda g: g.get("date") or "") + else: + latest = None + + latest_date_iso = (latest or {}).get("date") or "" + event_time = _parse_iso_utc(latest_date_iso) or datetime.now(timezone.utc) + + coords = (latest or {}).get("coordinates") + centroid: tuple[float, float] | None = None + if ( + isinstance(coords, list) + and len(coords) == 2 + and all(isinstance(c, (int, float)) for c in coords) + ): + centroid = (float(coords[0]), float(coords[1])) # GeoJSON (lon, lat) + + if self.region is not None: + if centroid is None: + continue + lon, lat = centroid + if not ( + self.region.west <= lon <= self.region.east + and self.region.south <= lat <= self.region.north + ): + continue + + current_ids.add(event_id) + + geo = Geo( + centroid=centroid, + bbox=None, + regions=[], + primary_region=None, + ) + + magnitude_value = (latest or {}).get("magnitudeValue") + magnitude_unit = (latest or {}).get("magnitudeUnit") + + sources = item.get("sources") or [] + data: dict[str, Any] = { + 
"event_id": event_id, + "category_id": category_id, + "title": item.get("title") or "", + "description": item.get("description") or "", + "url": item.get("link") or "", + "closed": item.get("closed"), + "sources": [ + {"id": s.get("id"), "url": s.get("url")} for s in sources + ], + "magnitudeValue": magnitude_value, + "magnitudeUnit": magnitude_unit, + "latest_geometry_date": latest_date_iso or None, + } + + dedup_key = _dedup_key(event_id, latest_date_iso) + + if self.is_published(dedup_key): + mark_observed(self._db, event_id, category_id) + continue + + event = Event( + id=event_id, + adapter=self.name, + category=f"disaster.eonet.{subject_cat}", + time=event_time, + severity=0, + geo=geo, + data=data, + ) + + yield event + self.mark_published(dedup_key) + mark_observed(self._db, event_id, category_id) + events_yielded += 1 + + # Fall-off: events present in observed_before but absent from this poll + fallen_off = set(observed_before.keys()) - current_ids + for event_id in fallen_off: + prior_category_id = observed_before[event_id] + if prior_category_id and prior_category_id not in self.category_allowlist: + # Was published before settings narrowed; clean up silently. 
+ mark_retired(self._db, {event_id}) + continue + subject_cat = _subject_category(prior_category_id) + tombstone_id = f"{event_id}:removed" + tombstone_dedup_key = _dedup_key(tombstone_id, "") + if self.is_published(tombstone_dedup_key): + mark_retired(self._db, {event_id}) + continue + tombstone = Event( + id=tombstone_id, + adapter=self.name, + category=f"disaster.eonet.{subject_cat}.removed", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(), + data={ + "event_id": event_id, + "category_id": prior_category_id, + "reason": "missing_from_feed", + }, + ) + yield tombstone + self.mark_published(tombstone_dedup_key) + mark_retired(self._db, {event_id}) + events_yielded += 1 + + self.sweep_old_ids() + logger.info( + "EONET poll completed", + extra={ + "events_yielded": events_yielded, + "current_observed": len(current_ids), + "fallen_off": len(fallen_off), + }, + ) diff --git a/tests/fixtures/eonet_sample.json b/tests/fixtures/eonet_sample.json new file mode 100644 index 0000000..42c3f55 --- /dev/null +++ b/tests/fixtures/eonet_sample.json @@ -0,0 +1,108 @@ +{ + "title": "EONET Events (frozen test fixture)", + "description": "Trimmed from upstream /api/v3/events for tests. Real-shape entries from live data plus synthetic items to exercise categories absent from the live sample (seaLakeIce, dustHaze, tempExtremes) and multi-geometry. 
Do not refresh casually.", + "link": "https://eonet.gsfc.nasa.gov/api/v3/events", + "events": [ + { + "id": "EONET_20098", + "title": "Eagle Lake Fire Wildfire, Hancock, Iowa", + "description": null, + "link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_20098", + "closed": null, + "categories": [ + {"id": "wildfires", "title": "Wildfires"} + ], + "sources": [ + {"id": "IRWIN", "url": "https://irwin.doi.gov/observer/incidents/c283aec9-aab3-4720-addc-47bb6a433a32"} + ], + "geometry": [ + { + "magnitudeValue": 550.00, + "magnitudeUnit": "acres", + "date": "2026-05-14T11:04:00Z", + "type": "Point", + "coordinates": [-93.72, 43.11] + } + ] + }, + { + "id": "EONET_31000", + "title": "Synthetic Sea Ice off Greenland", + "description": "Synthetic test event for seaLakeIce category.", + "link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_31000", + "closed": null, + "categories": [ + {"id": "seaLakeIce", "title": "Sea and Lake Ice"} + ], + "sources": [], + "geometry": [ + { + "date": "2026-05-10T00:00:00Z", + "type": "Point", + "coordinates": [-60.0, 70.5] + } + ] + }, + { + "id": "EONET_42000", + "title": "Synthetic Sahara Dust", + "description": "Synthetic test event for dustHaze category.", + "link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_42000", + "closed": null, + "categories": [ + {"id": "dustHaze", "title": "Dust and Haze"} + ], + "sources": [], + "geometry": [ + { + "date": "2026-05-11T12:00:00Z", + "type": "Point", + "coordinates": [30.0, 25.0] + } + ] + }, + { + "id": "EONET_55000", + "title": "Synthetic Tropical Storm Path", + "description": "Synthetic multi-geometry event for severeStorms category.", + "link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_55000", + "closed": null, + "categories": [ + {"id": "severeStorms", "title": "Severe Storms"} + ], + "sources": [ + {"id": "JTWC", "url": "https://www.metoc.navy.mil/jtwc/jtwc.html"} + ], + "geometry": [ + { + "date": "2026-05-12T00:00:00Z", + "type": "Point", + "coordinates": [140.0, 
18.0] + }, + { + "date": "2026-05-13T06:00:00Z", + "type": "Point", + "coordinates": [142.5, 19.2] + } + ] + }, + { + "id": "EONET_66000", + "title": "Synthetic Heat Wave", + "description": "Synthetic test event for tempExtremes category.", + "link": "https://eonet.gsfc.nasa.gov/api/v3/events/EONET_66000", + "closed": null, + "categories": [ + {"id": "tempExtremes", "title": "Temperature Extremes"} + ], + "sources": [], + "geometry": [ + { + "date": "2026-05-09T18:00:00Z", + "type": "Point", + "coordinates": [12.5, 41.9] + } + ] + } + ] +} diff --git a/tests/test_eonet.py b/tests/test_eonet.py new file mode 100644 index 0000000..324a147 --- /dev/null +++ b/tests/test_eonet.py @@ -0,0 +1,252 @@ +"""Tests for NASA EONET adapter.""" + +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.config_models import AdapterConfig +from central.models import Event, Geo + +FIXTURE_PATH = Path(__file__).parent / "fixtures" / "eonet_sample.json" + + +def _fixture_text() -> str: + return FIXTURE_PATH.read_text() + + +def _fixture_json() -> dict: + return json.loads(_fixture_text()) + + +def _config(settings: dict | None = None) -> AdapterConfig: + return AdapterConfig( + name="eonet", + enabled=True, + cadence_s=1800, + settings=settings or {}, + updated_at=datetime.now(timezone.utc), + ) + + +class TestEONETHelpers: + def test_camelcase_subject_conversion(self): + """Verify the camelCase -> lower_snake_case conversion for every default category id. + + Inputs are read from _DEFAULT_CATEGORIES, the single source of truth — no + per-test hardcoded list of category strings. 
+ """ + from central.adapters.eonet import _DEFAULT_CATEGORIES, _subject_category + + for cat_id in _DEFAULT_CATEGORIES: + subj = _subject_category(cat_id) + assert re.match(r"^[a-z_]+$", subj), f"{cat_id} -> {subj}: must be lower_snake_case" + # Round-trip: removing underscores from the converted form must yield + # the lowercased upstream id. Catches both missed and spurious boundaries. + assert subj.replace("_", "") == cat_id.lower(), f"{cat_id} -> {subj}: round-trip failed" + + def test_empty_category_subject(self): + from central.adapters.eonet import EONETAdapter, _subject_category + + assert _subject_category(None) == "unknown" + assert _subject_category("") == "unknown" + + # Through subject_for: a category with no upstream component yields .unknown.global + adapter = EONETAdapter(_config(), MagicMock(), Path("/tmp/never_used.db")) + event = Event( + id="X", + adapter="eonet", + category="disaster.eonet.unknown", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(), + data={}, + ) + assert adapter.subject_for(event).endswith(".unknown.global") + + def test_dedup_key_includes_latest_geometry_date(self): + from central.adapters.eonet import _dedup_key + + date_a = "2026-05-14T11:04:00Z" + date_b = "2026-05-15T00:00:00Z" + event_id = "EONET_TEST_1" + + key_a = _dedup_key(event_id, date_a) + assert date_a in key_a + assert event_id in key_a + + # Different timeline date -> different dedup key + assert _dedup_key(event_id, date_b) != key_a + + +class TestEONETSettings: + def test_category_allowlist_default_is_full_set(self): + """The default allowlist equals _DEFAULT_CATEGORIES — no parallel literal anywhere.""" + from central.adapters.eonet import EONETSettings, _DEFAULT_CATEGORIES + + assert EONETSettings().category_allowlist == _DEFAULT_CATEGORIES + + +class TestEONETAdapter: + def test_class_attrs_complete(self): + from central.adapters.eonet import EONETAdapter, EONETSettings + + assert EONETAdapter.name == "eonet" + assert 
isinstance(EONETAdapter.display_name, str) and EONETAdapter.display_name + assert isinstance(EONETAdapter.description, str) and EONETAdapter.description + assert EONETAdapter.settings_schema is EONETSettings + assert EONETAdapter.requires_api_key is None + assert EONETAdapter.api_key_field is None + assert EONETAdapter.wizard_order is None + assert EONETAdapter.default_cadence_s == 1800 + + @pytest.mark.asyncio + async def test_geometry_singular_key(self, tmp_path: Path): + """Adapter reads 'geometry' (singular) per upstream divergence from the spec brief.""" + from central.adapters.eonet import EONETAdapter + + fix = _fixture_json() + # Sanity-check the fixture itself is shaped per upstream: + assert all("geometry" in e for e in fix["events"]), "fixture must use 'geometry' (singular)" + assert all("geometries" not in e for e in fix["events"]), "fixture must not use 'geometries'" + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + # If the adapter were reading 'geometries' instead, centroids would be absent. + assert any(e.geo.centroid is not None for e in events) + + @pytest.mark.asyncio + async def test_lonlat_coordinate_order(self, tmp_path: Path): + """Upstream coordinates [lon, lat] (GeoJSON) map directly to Geo.centroid=(lon, lat).""" + from central.adapters.eonet import EONETAdapter + + fix = _fixture_json() + src = next(e for e in fix["events"] if e.get("geometry")) + lon_in, lat_in = src["geometry"][0]["coordinates"] + # Sanity-check orientation of fixture datum so the assertion below isn't trivially passing. + # The first fixture event is in the western/northern hemisphere (Iowa). 
+ assert lon_in < 0, "fixture event 0 should have western-hemisphere lon" + assert 0 < lat_in < 90, "fixture event 0 should have northern lat in (0,90)" + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + emitted = next(e for e in events if e.id == src["id"]) + assert emitted.geo.centroid is not None + out_lon, out_lat = emitted.geo.centroid + assert out_lon == lon_in, "first centroid element must equal upstream lon (no swap)" + assert out_lat == lat_in, "second centroid element must equal upstream lat (no swap)" + + @pytest.mark.asyncio + async def test_country_always_global(self, tmp_path: Path): + """Every emitted event has subject suffix '.global' (no country resolution in v1).""" + from central.adapters.eonet import EONETAdapter + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert events, "fixture should produce at least one emitted event" + for e in events: + assert adapter.subject_for(e).endswith(".global"), e.category + + @pytest.mark.asyncio + async def test_magnitude_value_surfaced(self, tmp_path: Path): + """magnitudeValue from the most-recent geometry point is surfaced on Event.data.""" + from central.adapters.eonet import EONETAdapter + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + with_mag = [e for e in events if e.data.get("magnitudeValue") is not None] + assert with_mag, "fixture should contain at least one event with magnitudeValue" + for e in with_mag: + assert "magnitudeUnit" in e.data + + 
@pytest.mark.asyncio + async def test_category_allowlist_filters(self, tmp_path: Path): + """Narrowing category_allowlist drops events outside the allowlist.""" + from central.adapters.eonet import EONETAdapter + + fix = _fixture_json() + # Pick the first fixture event's category as the sole allowed category. + target = fix["events"][0]["categories"][0]["id"] + + adapter = EONETAdapter( + _config({"category_allowlist": [target]}), + MagicMock(), + tmp_path / "cursors.db", + ) + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert events, "fixture should include at least one event matching the target category" + for e in events: + assert e.data["category_id"] == target + + @pytest.mark.asyncio + async def test_dedup_suppresses_repeat_poll(self, tmp_path: Path): + """Second poll with identical upstream yields no new events (composite dedup hits).""" + from central.adapters.eonet import EONETAdapter + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + assert first_pass + assert second_pass == [] + + @pytest.mark.asyncio + async def test_fall_off_emits_removed_subject(self, tmp_path: Path): + """Event in observed_before but absent from this poll -> removal emitted.""" + from central.adapters.eonet import EONETAdapter, _subject_category + + fix = _fixture_json() + first_event = fix["events"][0] + second_fix = {**fix, "events": fix["events"][1:]} + + adapter = EONETAdapter(_config(), MagicMock(), tmp_path / "cursors.db") + + adapter._fetch = AsyncMock(return_value=_fixture_text()) + await adapter.startup() + first_pass = [e async for e in adapter.poll()] + assert any(e.id == first_event["id"] for e in first_pass) + + 
adapter._fetch = AsyncMock(return_value=json.dumps(second_fix)) + second_pass = [e async for e in adapter.poll()] + await adapter.shutdown() + + tombstones = [e for e in second_pass if e.category.endswith(".removed")] + assert len(tombstones) == 1 + ts = tombstones[0] + assert ts.id == f"{first_event['id']}:removed" + assert ts.data["reason"] == "missing_from_feed" + + # Subject pattern: subtype BEFORE 'removed' per §8 canonical pattern. + # Subscriber filtering on central.disaster.eonet.<category>.> must match the + # removal subject central.disaster.eonet.<category>.removed.global. + expected_cat = _subject_category(first_event["categories"][0]["id"]) + subj = adapter.subject_for(ts) + assert subj.startswith(f"central.disaster.eonet.{expected_cat}.") + assert ".removed." in subj + assert subj.endswith(".global")