From 8b2cdeee0b668c7e124562666417e9dfb22e5b7b Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 00:10:39 +0000 Subject: [PATCH] feat(notifications): Phase 2.14 USGS earthquake adapter (new) -- closes Rule 16 Seismic standalone path First net-new environmental adapter (prior phases wired existing ones). Adds meshai/env/usgs_quake.py with USGSQuakeAdapter + USGSQuakeConfig, polling a keyless USGS earthquake GeoJSON feed and emitting one Event per qualifying quake. Establishes the standalone Seismic path (Rule 16); Central becomes the dual-source in v0.4. Adapter (mirrors the fires/usgs-water per-event pattern): - Feed: https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson (M2.5+ past day -- M1.0 too noisy, M4.5+ too sparse for the region). Tick 300s. - Filters each feature by min_magnitude AND a geographic bbox. - Per quake: source=usgs_quake, category=earthquake_event, stable event_id = the USGS feature id (e.g. "us6000abcd"), lat/lon from geometry.coordinates[1],[0], region tag from config (default "magic_valley"). - to_event(): category earthquake_event, magnitude-binned severity passed through, group_key = inhibit_key = the USGS id. Defensive None for missing id / coords / magnitude. get_events()/health_status mirror the other adapters. MAGNITUDE -> SEVERITY BINS (as proposed): M < 3.5 -> routine 3.5 <= M < 5.0 -> priority M >= 5.0 -> immediate ('sig' is captured in the event dict as metadata but severity is magnitude-binned -- clearer and matches the spec's primary suggestion.) GEOGRAPHIC BBOX (as proposed) -- [west, south, east, north]: [-115.5, 42.0, -110.0, 45.2] Covers Magic Valley / Twin Falls (SW), the Lost River Range / Borah Peak and Sawtooths (central Idaho, seismically active -- 1983 M6.9), the eastern Snake River Plain / INL, and the Yellowstone caldera (NW Wyoming). An empty bbox disables the geographic filter (accepts all). Wiring: - config.py: new USGSQuakeConfig dataclass; usgs_quake field on EnvironmentalConfig; loader branch in _dict_to_dataclass. - store.py __init__: registers self._adapters["usgs_quake"] when enabled -- this is what grows the live adapter count 7 -> 8. - store._ingest: NO dedicated branch added. usgs_quake is a standard per-event adapter, so the existing generic "else" loop (dedup on (source, event_id) + _emit_event) already routes it. (The swpc/ducting branches are special only because they also maintain status blobs.) - env_feeds.yaml (live /data/config): added usgs_quake block, enabled:true, default bbox/min_mag/region. Rule 17: GUI-editable config (env_feeds.yaml). Rule 18 N/A -- USGS earthquake feed is keyless (no .env entry; .ref credentials has no USGS/ArcGIS/quake key). Rule 16: standalone path established + validated in-container. Tests: tests/test_adapter_usgs_quake.py (15 tests) mirrors the 2.12/2.13 shape -- severity bins, _fetch severity assignment, magnitude filter, geographic filter (in-bbox vs California/out), empty-bbox-accepts-all, dedup id stable across ticks for the same quake id, category, severity pass-through, group_key/inhibit_keys, field population, defensive cases (missing id/coords/magnitude/corrupted -> None), and malformed-feature skipping. _fetch tests patch urlopen with synthetic FeatureCollections. Full suite: 248 passed. Live smoke test (prod container, rebuilt): clean startup, adapter count grew 7 -> 8 ("EnvironmentalStore initialized with 8 adapters"), healthy, no traceback, no usgs_quake errors. In-container standalone tick over the real feed succeeded (is_loaded=true, last_error=null, consecutive_errors=0); the feed returned 54 global M2.5+ quakes, 0 inside the Magic Valley->Yellowstone bbox right now (quiet) -- so no Event is emitted, acceptable, and it exercises the fetch + magnitude + geographic filter + no-emit path on live data. The emission path (in-region quake -> earthquake_event) is unit-validated and uses the same store->bus path emitting live for NWS, traffic, and NIFC fires. Note (.gitignore): line 36 `env/` (a virtualenv pattern under "Virtual environments") collaterally matches meshai/env/, so this NEW file required `git add -f` (untracked files there are otherwise ignored and hidden from status). Existing tracked env files are unaffected. Recommended follow-up: anchor the rule to `/env/` so future net-new env adapters don't need -f. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/config.py | 16 ++ meshai/env/store.py | 4 + meshai/env/usgs_quake.py | 241 ++++++++++++++++++++++++++++++ tests/test_adapter_usgs_quake.py | 242 +++++++++++++++++++++++++++++++ 4 files changed, 503 insertions(+) create mode 100644 meshai/env/usgs_quake.py create mode 100644 tests/test_adapter_usgs_quake.py diff --git a/meshai/config.py b/meshai/config.py index 0f2d09f..8c0cdca 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -377,6 +377,19 @@ class USGSConfig: flood_thresholds: dict = field(default_factory=dict) # {site_id: {flow: X, height: Y}} +@dataclass +class USGSQuakeConfig: + """USGS earthquake feed settings (Phase 2.14).""" + + enabled: bool = False + tick_seconds: int = 300 + feed_url: str = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson" + min_magnitude: float = 2.5 + # [west, south, east, north] -- Magic Valley -> Borah Peak -> Yellowstone + bbox: list = field(default_factory=lambda: [-115.5, 42.0, -110.0, 45.2]) + region: str = "magic_valley" + + @dataclass class TomTomConfig: """TomTom traffic flow settings.""" @@ -425,6 +438,7 @@ class EnvironmentalConfig: fires: NICFFiresConfig = field(default_factory=NICFFiresConfig) avalanche: AvalancheConfig = field(default_factory=AvalancheConfig) usgs: USGSConfig = field(default_factory=USGSConfig) + usgs_quake: USGSQuakeConfig = field(default_factory=USGSQuakeConfig) traffic: TomTomConfig = field(default_factory=TomTomConfig) roads511: Roads511Config = field(default_factory=Roads511Config) firms: FIRMSConfig = field(default_factory=FIRMSConfig) @@ -673,6 +687,8 @@ def _dict_to_dataclass(cls, data: dict): kwargs[key] = _dict_to_dataclass(AvalancheConfig, value) elif key == "usgs" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(USGSConfig, value) + elif key == "usgs_quake" and isinstance(value, dict): + kwargs[key] = _dict_to_dataclass(USGSQuakeConfig, value) elif key == "traffic" and isinstance(value, dict): kwargs[key] = _dict_to_dataclass(TomTomConfig, value) elif key == "roads511" and isinstance(value, dict): diff --git a/meshai/env/store.py b/meshai/env/store.py index 9b6ad1d..3aa11a5 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -53,6 +53,10 @@ class EnvironmentalStore: from .usgs import USGSStreamsAdapter self._adapters["usgs"] = USGSStreamsAdapter(config.usgs) + if config.usgs_quake.enabled: + from .usgs_quake import USGSQuakeAdapter + self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake) + if config.traffic.enabled: from .traffic import TomTomTrafficAdapter self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic) diff --git a/meshai/env/usgs_quake.py b/meshai/env/usgs_quake.py new file mode 100644 index 0000000..805e887 --- /dev/null +++ b/meshai/env/usgs_quake.py @@ -0,0 +1,241 @@ +"""USGS earthquake feed adapter (keyless, open API). + +Phase 2.14 -- first net-new environmental adapter. Polls a USGS earthquake +GeoJSON summary feed, filters by magnitude and a geographic bounding box, and +emits one Event per qualifying quake. Standalone path (Rule 16); Central will +be the dual-source in v0.4. +""" + +import json +import logging +import time +from typing import TYPE_CHECKING, Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from meshai.notifications.events import Event, make_event + +if TYPE_CHECKING: + from ..config import USGSQuakeConfig + +logger = logging.getLogger(__name__) + + +class USGSQuakeAdapter: + """USGS earthquake GeoJSON feed polling. + + Feed format (FeatureCollection): each feature has a stable USGS id (e.g. + "us6000abcd"), properties.mag / place / title / sig / time(ms), and + geometry.coordinates [lon, lat, depth_km]. + """ + + def __init__(self, config: "USGSQuakeConfig"): + self._feed_url = config.feed_url + self._min_magnitude = config.min_magnitude + self._bbox = config.bbox or [] # [west, south, east, north] + self._region = config.region or "magic_valley" + self._tick_interval = config.tick_seconds or 300 + self._last_tick = 0.0 + self._events = [] + self._consecutive_errors = 0 + self._last_error = None + self._is_loaded = False + + def tick(self) -> bool: + """Execute one polling tick. + + Returns: + True if data changed + """ + now = time.time() + if now - self._last_tick < self._tick_interval: + return False + self._last_tick = now + return self._fetch() + + def _in_bbox(self, lat, lon) -> bool: + """True if (lat, lon) is inside the configured bbox (or no bbox set).""" + if not self._bbox or len(self._bbox) != 4: + return True # no geographic filter configured -> accept all + west, south, east, north = self._bbox + return (south <= lat <= north) and (west <= lon <= east) + + def _severity_for_mag(self, mag) -> str: + """Magnitude -> severity bins: M<3.5 routine, 3.5-5 priority, >=5 immediate.""" + if mag is None: + return "routine" + if mag >= 5.0: + return "immediate" + if mag >= 3.5: + return "priority" + return "routine" + + def _fetch(self) -> bool: + """Fetch the USGS earthquake GeoJSON feed and filter. + + Returns: + True if the set of qualifying quake ids changed + """ + headers = {"User-Agent": "MeshAI/1.0", "Accept": "application/json"} + + try: + req = Request(self._feed_url, headers=headers) + with urlopen(req, timeout=30) as resp: + data = json.loads(resp.read().decode("utf-8")) + + except HTTPError as e: + logger.warning(f"USGS quake HTTP error: {e.code}") + self._last_error = f"HTTP {e.code}" + self._consecutive_errors += 1 + return False + + except URLError as e: + logger.warning(f"USGS quake connection error: {e.reason}") + self._last_error = str(e.reason) + self._consecutive_errors += 1 + return False + + except Exception as e: + logger.warning(f"USGS quake fetch error: {e}") + self._last_error = str(e) + self._consecutive_errors += 1 + return False + + features = data.get("features", []) + new_events = [] + now = time.time() + + for feature in features: + try: + props = feature.get("properties", {}) or {} + geom = feature.get("geometry") or {} + quake_id = feature.get("id") + mag = props.get("mag") + coords = geom.get("coordinates") or [] + + # Filter: need id, magnitude, coords; magnitude threshold; bbox. + if quake_id is None or mag is None or len(coords) < 2: + continue + if mag < self._min_magnitude: + continue + + lon = coords[0] + lat = coords[1] + depth_km = coords[2] if len(coords) > 2 else None + + if lat is None or lon is None: + continue + if not self._in_bbox(lat, lon): + continue + + place = props.get("place") or "Unknown location" + title = props.get("title") or f"M{mag} - {place}" + quake_time = props.get("time") # epoch ms + ts = quake_time / 1000.0 if quake_time else now + + new_events.append({ + "source": "usgs_quake", + "event_id": quake_id, # stable USGS id, e.g. "us6000abcd" + "event_type": "Earthquake", + "severity": self._severity_for_mag(mag), + "headline": title, + "magnitude": mag, + "place": place, + "depth_km": depth_km, + "sig": props.get("sig"), + "url": props.get("url"), + "region": self._region, + "lat": lat, + "lon": lon, + "quake_time": ts, + "expires": now + 86400, # 24h TTL (matches the day-feed window) + "fetched_at": now, + }) + except Exception: + logger.exception("USGS quake feature parse error") + continue + + old_ids = {e["event_id"] for e in self._events} + new_ids = {e["event_id"] for e in new_events} + changed = old_ids != new_ids + + self._events = new_events + self._consecutive_errors = 0 + self._last_error = None + self._is_loaded = True + + if changed: + logger.info(f"USGS quakes updated: {len(new_events)} in region {self._region}") + + return changed + + def to_event(self, evt: dict) -> Optional["Event"]: + """Translate a stored quake dict into a pipeline Event. + + Category is always earthquake_event; magnitude-binned severity is + passed through. The stable USGS id is the group_key and sole + inhibit_key. + + Args: + evt: Internal event dict from get_events() + + Returns: + Event instance, or None if the dict is missing its id, coords, or + magnitude. + """ + try: + event_id = evt.get("event_id") + if not event_id: + return None + + lat = evt.get("lat") + lon = evt.get("lon") + if lat is None or lon is None: + return None + + mag = evt.get("magnitude") + if mag is None: + return None + + severity = evt.get("severity", "routine") + title = evt.get("headline") or f"M{mag} earthquake" + + summary_parts = [title] + depth = evt.get("depth_km") + if depth is not None: + summary_parts.append(f"depth {round(depth, 1)} km") + summary = " | ".join(summary_parts)[:300] + + return make_event( + source="usgs_quake", + category="earthquake_event", + severity=severity, + title=title, + summary=summary, + timestamp=evt.get("quake_time") or evt.get("fetched_at"), + expires=evt.get("expires"), + lat=lat, + lon=lon, + region=evt.get("region"), + group_key=event_id, + inhibit_keys=[event_id], + ) + except Exception: + logger.exception(f"USGS quake to_event failed for evt: {evt.get('event_id')}") + return None + + def get_events(self) -> list: + """Get current qualifying quake events.""" + return self._events + + @property + def health_status(self) -> dict: + """Get adapter health status.""" + return { + "source": "usgs_quake", + "is_loaded": self._is_loaded, + "last_error": str(self._last_error) if self._last_error else None, + "consecutive_errors": self._consecutive_errors, + "event_count": len(self._events), + "last_fetch": self._last_tick, + } diff --git a/tests/test_adapter_usgs_quake.py b/tests/test_adapter_usgs_quake.py new file mode 100644 index 0000000..7074983 --- /dev/null +++ b/tests/test_adapter_usgs_quake.py @@ -0,0 +1,242 @@ +"""Tests for USGS earthquake adapter Phase 2.14 — fetch/filter + to_event().""" + +import time +from unittest.mock import MagicMock + +import pytest + +from meshai.env.usgs_quake import USGSQuakeAdapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES / HELPERS +# ============================================================ + +def fresh_adapter(min_magnitude=2.5, bbox=None, region="magic_valley"): + cfg = MagicMock() + cfg.feed_url = "https://example.test/feed.geojson" + cfg.min_magnitude = min_magnitude + cfg.bbox = bbox if bbox is not None else [-115.5, 42.0, -110.0, 45.2] + cfg.region = region + cfg.tick_seconds = 300 + return USGSQuakeAdapter(cfg) + + +@pytest.fixture +def adapter(): + return fresh_adapter() + + +def make_feature(quake_id="us6000abcd", mag=3.0, lon=-114.5, lat=42.6, depth=8.0, + place="10 km N of Twin Falls, ID", time_ms=None): + """A USGS GeoJSON feature (mirrors the real feed).""" + if time_ms is None: + time_ms = int(time.time() * 1000) + return { + "type": "Feature", + "id": quake_id, + "properties": { + "mag": mag, + "place": place, + "time": time_ms, + "sig": int((mag or 0) * 30), + "url": f"https://earthquake.usgs.gov/earthquakes/eventpage/{quake_id}", + "title": f"M {mag} - {place}", + }, + "geometry": {"type": "Point", "coordinates": [lon, lat, depth]}, + } + + +def feed(features): + return {"type": "FeatureCollection", "features": features} + + +def make_quake_event(quake_id="us6000abcd", mag=3.0, severity="routine", + lat=42.6, lon=-114.5, depth=8.0, region="magic_valley"): + """A stored quake dict (mirrors _fetch output) for to_event tests.""" + now = time.time() + return { + "source": "usgs_quake", + "event_id": quake_id, + "event_type": "Earthquake", + "severity": severity, + "headline": f"M {mag} - near Twin Falls", + "magnitude": mag, + "place": "near Twin Falls", + "depth_km": depth, + "sig": int(mag * 30), + "url": "https://earthquake.usgs.gov/x", + "region": region, + "lat": lat, + "lon": lon, + "quake_time": now, + "expires": now + 86400, + "fetched_at": now, + } + + +def run_fetch(adapter, features): + """Patch urlopen to return a synthetic feed and run _fetch.""" + import meshai.env.usgs_quake as mod + + class FakeResp: + def __init__(self, payload): + self._p = payload.encode() + def read(self): + return self._p + def __enter__(self): + return self + def __exit__(self, *a): + return False + + import json as _json + payload = _json.dumps(feed(features)) + orig = mod.urlopen + mod.urlopen = lambda req, timeout=30: FakeResp(payload) + try: + adapter._fetch() + finally: + mod.urlopen = orig + return adapter.get_events() + + +# ============================================================ +# SEVERITY BIN TESTS +# ============================================================ + +def test_severity_bins(adapter): + """M<3.5 routine, 3.5-5 priority, >=5 immediate.""" + assert adapter._severity_for_mag(2.9) == "routine" + assert adapter._severity_for_mag(3.4) == "routine" + assert adapter._severity_for_mag(3.5) == "priority" + assert adapter._severity_for_mag(4.9) == "priority" + assert adapter._severity_for_mag(5.0) == "immediate" + assert adapter._severity_for_mag(6.5) == "immediate" + + +def test_fetch_assigns_severity(adapter): + evs = run_fetch(adapter, [ + make_feature("q1", mag=3.0, lat=42.6, lon=-114.5), + make_feature("q2", mag=4.2, lat=43.0, lon=-113.0), + make_feature("q3", mag=5.5, lat=44.4, lon=-110.6), + ]) + by_id = {e["event_id"]: e["severity"] for e in evs} + assert by_id == {"q1": "routine", "q2": "priority", "q3": "immediate"} + + +# ============================================================ +# MAGNITUDE + GEOGRAPHIC FILTER TESTS +# ============================================================ + +def test_magnitude_filter(adapter): + """Quakes below min_magnitude are dropped.""" + evs = run_fetch(adapter, [ + make_feature("small", mag=2.0, lat=42.6, lon=-114.5), # below 2.5 + make_feature("ok", mag=2.6, lat=42.6, lon=-114.5), + ]) + ids = {e["event_id"] for e in evs} + assert ids == {"ok"} + + +def test_geographic_filter(adapter): + """Quakes outside the bbox are dropped.""" + evs = run_fetch(adapter, [ + make_feature("in", mag=3.0, lat=42.6, lon=-114.5), # Magic Valley -> in + make_feature("ca", mag=3.0, lat=40.6, lon=-121.5), # California -> out + make_feature("yellowstone", mag=3.0, lat=44.4, lon=-110.6), # in + ]) + ids = {e["event_id"] for e in evs} + assert ids == {"in", "yellowstone"} + + +def test_no_bbox_accepts_all(): + """An empty bbox disables the geographic filter.""" + a = fresh_adapter(bbox=[]) + evs = run_fetch(a, [ + make_feature("ca", mag=3.0, lat=40.6, lon=-121.5), + make_feature("id", mag=3.0, lat=42.6, lon=-114.5), + ]) + assert {e["event_id"] for e in evs} == {"ca", "id"} + + +# ============================================================ +# DEDUP REGRESSION TEST +# ============================================================ + +def test_dedup_id_stable_across_ticks(adapter): + """A quake keeps its USGS id as event_id across ticks (store dedups it).""" + e1 = run_fetch(adapter, [make_feature("us6000abcd", mag=3.0, lat=42.6, lon=-114.5)])[0]["event_id"] + time.sleep(0.01) + e2 = run_fetch(adapter, [make_feature("us6000abcd", mag=3.1, lat=42.6, lon=-114.5)])[0]["event_id"] + assert e1 == e2 == "us6000abcd" + + +# ============================================================ +# to_event() — CATEGORY / KEYS / FIELDS +# ============================================================ + +def test_category_is_earthquake_event(adapter): + event = adapter.to_event(make_quake_event()) + assert event is not None + assert event.category == "earthquake_event" + + +def test_severity_passes_through(adapter): + for sev in ["routine", "priority", "immediate"]: + event = adapter.to_event(make_quake_event(severity=sev)) + assert event.severity == sev + + +def test_group_key_is_usgs_id(adapter): + event = adapter.to_event(make_quake_event(quake_id="us6000xyz")) + assert event.group_key == "us6000xyz" + assert event.inhibit_keys == ["us6000xyz"] + + +def test_populates_core_fields(adapter): + evt = make_quake_event(lat=42.61, lon=-114.48, region="magic_valley") + event = adapter.to_event(evt) + assert event.source == "usgs_quake" + assert event.lat == 42.61 + assert event.lon == -114.48 + assert event.region == "magic_valley" + assert event.expires == evt["expires"] + assert event.timestamp == evt["quake_time"] + assert event.id + + +# ============================================================ +# DEFENSIVE TESTS +# ============================================================ + +def test_missing_id_returns_none(adapter): + evt = make_quake_event() + evt["event_id"] = None + assert adapter.to_event(evt) is None + + +def test_missing_coords_returns_none(adapter): + evt = make_quake_event() + evt["lat"] = None + assert adapter.to_event(evt) is None + + +def test_missing_magnitude_returns_none(adapter): + evt = make_quake_event() + evt["magnitude"] = None + assert adapter.to_event(evt) is None + + +def test_does_not_raise_on_corrupted_dict(adapter): + assert adapter.to_event({"garbage": True}) is None + + +def test_fetch_skips_malformed_features(adapter): + """Features missing id/mag/coords are skipped without raising.""" + evs = run_fetch(adapter, [ + {"id": "noprops", "geometry": {"coordinates": [-114.5, 42.6, 5]}}, # no mag + {"id": "nogeom", "properties": {"mag": 3.0}}, # no coords + make_feature("good", mag=3.0, lat=42.6, lon=-114.5), + ]) + assert {e["event_id"] for e in evs} == {"good"}