feat(notifications): Phase 2.14 USGS earthquake adapter (new) -- closes Rule 16 Seismic standalone path

First net-new environmental adapter (prior phases wired existing ones).
Adds meshai/env/usgs_quake.py with USGSQuakeAdapter + USGSQuakeConfig,
polling a keyless USGS earthquake GeoJSON feed and emitting one Event per
qualifying quake. Establishes the standalone Seismic path (Rule 16);
Central becomes the dual-source in v0.4.

Adapter (mirrors the fires/usgs-water per-event pattern):
- Feed: https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson
  (M2.5+ past day -- M1.0 too noisy, M4.5+ too sparse for the region).
  Tick 300s.
- Filters each feature by min_magnitude AND a geographic bbox.
- Per quake: source=usgs_quake, category=earthquake_event, stable
  event_id = the USGS feature id (e.g. "us6000abcd"), lat/lon from
  geometry.coordinates[1],[0], region tag from config (default
  "magic_valley").
- to_event(): category earthquake_event, magnitude-binned severity passed
  through, group_key = inhibit_key = the USGS id. Defensive None for
  missing id / coords / magnitude. get_events()/health_status mirror the
  other adapters.

MAGNITUDE -> SEVERITY BINS (as proposed):
  M < 3.5        -> routine
  3.5 <= M < 5.0 -> priority
  M >= 5.0       -> immediate
('sig' is captured in the event dict as metadata but severity is
magnitude-binned -- clearer and matches the spec's primary suggestion.)

GEOGRAPHIC BBOX (as proposed) -- [west, south, east, north]:
  [-115.5, 42.0, -110.0, 45.2]
Covers Magic Valley / Twin Falls (SW), the Lost River Range / Borah Peak
and Sawtooths (central Idaho, seismically active -- 1983 M6.9), the eastern
Snake River Plain / INL, and the Yellowstone caldera (NW Wyoming). An empty
bbox disables the geographic filter (accepts all).

Wiring:
- config.py: new USGSQuakeConfig dataclass; usgs_quake field on
  EnvironmentalConfig; loader branch in _dict_to_dataclass.
- store.py __init__: registers self._adapters["usgs_quake"] when enabled --
  this is what grows the live adapter count 7 -> 8.
- store._ingest: NO dedicated branch added. usgs_quake is a standard
  per-event adapter, so the existing generic "else" loop (dedup on
  (source, event_id) + _emit_event) already routes it. (The swpc/ducting
  branches are special only because they also maintain status blobs.)
- env_feeds.yaml (live /data/config): added usgs_quake block, enabled:true,
  default bbox/min_mag/region.

Rule 17: GUI-editable config (env_feeds.yaml). Rule 18 N/A -- USGS
earthquake feed is keyless (no .env entry; .ref credentials has no
USGS/ArcGIS/quake key). Rule 16: standalone path established + validated
in-container.

Tests: tests/test_adapter_usgs_quake.py (15 tests) mirrors the 2.12/2.13
shape -- severity bins, _fetch severity assignment, magnitude filter,
geographic filter (in-bbox vs California/out), empty-bbox-accepts-all,
dedup id stable across ticks for the same quake id, category, severity
pass-through, group_key/inhibit_keys, field population, defensive cases
(missing id/coords/magnitude/corrupted -> None), and malformed-feature
skipping. _fetch tests patch urlopen with synthetic FeatureCollections.
Full suite: 248 passed.

Live smoke test (prod container, rebuilt): clean startup, adapter count
grew 7 -> 8 ("EnvironmentalStore initialized with 8 adapters"), healthy,
no traceback, no usgs_quake errors. In-container standalone tick over the
real feed succeeded (is_loaded=true, last_error=null,
consecutive_errors=0); the feed returned 54 global M2.5+ quakes, 0 inside
the Magic Valley->Yellowstone bbox right now (quiet) -- so no Event is
emitted, acceptable, and it exercises the fetch + magnitude + geographic
filter + no-emit path on live data. The emission path (in-region quake ->
earthquake_event) is unit-validated and uses the same store->bus path
emitting live for NWS, traffic, and NIFC fires.

Note (.gitignore): line 36 `env/` (a virtualenv pattern under "Virtual
environments") collaterally matches meshai/env/, so this NEW file required
`git add -f` (untracked files there are otherwise ignored and hidden from
status). Existing tracked env files are unaffected. Recommended follow-up:
anchor the rule to `/env/` so future net-new env adapters don't need -f.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-28 00:10:39 +00:00
commit 8b2cdeee0b
4 changed files with 503 additions and 0 deletions

View file

@ -377,6 +377,19 @@ class USGSConfig:
flood_thresholds: dict = field(default_factory=dict) # {site_id: {flow: X, height: Y}}
@dataclass
class USGSQuakeConfig:
"""USGS earthquake feed settings (Phase 2.14)."""
enabled: bool = False
tick_seconds: int = 300
feed_url: str = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson"
min_magnitude: float = 2.5
# [west, south, east, north] -- Magic Valley -> Borah Peak -> Yellowstone
bbox: list = field(default_factory=lambda: [-115.5, 42.0, -110.0, 45.2])
region: str = "magic_valley"
@dataclass
class TomTomConfig:
"""TomTom traffic flow settings."""
@ -425,6 +438,7 @@ class EnvironmentalConfig:
fires: NICFFiresConfig = field(default_factory=NICFFiresConfig)
avalanche: AvalancheConfig = field(default_factory=AvalancheConfig)
usgs: USGSConfig = field(default_factory=USGSConfig)
usgs_quake: USGSQuakeConfig = field(default_factory=USGSQuakeConfig)
traffic: TomTomConfig = field(default_factory=TomTomConfig)
roads511: Roads511Config = field(default_factory=Roads511Config)
firms: FIRMSConfig = field(default_factory=FIRMSConfig)
@ -673,6 +687,8 @@ def _dict_to_dataclass(cls, data: dict):
kwargs[key] = _dict_to_dataclass(AvalancheConfig, value)
elif key == "usgs" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(USGSConfig, value)
elif key == "usgs_quake" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(USGSQuakeConfig, value)
elif key == "traffic" and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(TomTomConfig, value)
elif key == "roads511" and isinstance(value, dict):

4
meshai/env/store.py vendored
View file

@ -53,6 +53,10 @@ class EnvironmentalStore:
from .usgs import USGSStreamsAdapter
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
if config.usgs_quake.enabled:
from .usgs_quake import USGSQuakeAdapter
self._adapters["usgs_quake"] = USGSQuakeAdapter(config.usgs_quake)
if config.traffic.enabled:
from .traffic import TomTomTrafficAdapter
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)

241
meshai/env/usgs_quake.py vendored Normal file
View file

@ -0,0 +1,241 @@
"""USGS earthquake feed adapter (keyless, open API).
Phase 2.14 -- first net-new environmental adapter. Polls a USGS earthquake
GeoJSON summary feed, filters by magnitude and a geographic bounding box, and
emits one Event per qualifying quake. Standalone path (Rule 16); Central will
be the dual-source in v0.4.
"""
import json
import logging
import time
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import USGSQuakeConfig
logger = logging.getLogger(__name__)
class USGSQuakeAdapter:
"""USGS earthquake GeoJSON feed polling.
Feed format (FeatureCollection): each feature has a stable USGS id (e.g.
"us6000abcd"), properties.mag / place / title / sig / time(ms), and
geometry.coordinates [lon, lat, depth_km].
"""
def __init__(self, config: "USGSQuakeConfig"):
self._feed_url = config.feed_url
self._min_magnitude = config.min_magnitude
self._bbox = config.bbox or [] # [west, south, east, north]
self._region = config.region or "magic_valley"
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _in_bbox(self, lat, lon) -> bool:
"""True if (lat, lon) is inside the configured bbox (or no bbox set)."""
if not self._bbox or len(self._bbox) != 4:
return True # no geographic filter configured -> accept all
west, south, east, north = self._bbox
return (south <= lat <= north) and (west <= lon <= east)
def _severity_for_mag(self, mag) -> str:
"""Magnitude -> severity bins: M<3.5 routine, 3.5-5 priority, >=5 immediate."""
if mag is None:
return "routine"
if mag >= 5.0:
return "immediate"
if mag >= 3.5:
return "priority"
return "routine"
def _fetch(self) -> bool:
"""Fetch the USGS earthquake GeoJSON feed and filter.
Returns:
True if the set of qualifying quake ids changed
"""
headers = {"User-Agent": "MeshAI/1.0", "Accept": "application/json"}
try:
req = Request(self._feed_url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"USGS quake HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"USGS quake connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"USGS quake fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
features = data.get("features", [])
new_events = []
now = time.time()
for feature in features:
try:
props = feature.get("properties", {}) or {}
geom = feature.get("geometry") or {}
quake_id = feature.get("id")
mag = props.get("mag")
coords = geom.get("coordinates") or []
# Filter: need id, magnitude, coords; magnitude threshold; bbox.
if quake_id is None or mag is None or len(coords) < 2:
continue
if mag < self._min_magnitude:
continue
lon = coords[0]
lat = coords[1]
depth_km = coords[2] if len(coords) > 2 else None
if lat is None or lon is None:
continue
if not self._in_bbox(lat, lon):
continue
place = props.get("place") or "Unknown location"
title = props.get("title") or f"M{mag} - {place}"
quake_time = props.get("time") # epoch ms
ts = quake_time / 1000.0 if quake_time else now
new_events.append({
"source": "usgs_quake",
"event_id": quake_id, # stable USGS id, e.g. "us6000abcd"
"event_type": "Earthquake",
"severity": self._severity_for_mag(mag),
"headline": title,
"magnitude": mag,
"place": place,
"depth_km": depth_km,
"sig": props.get("sig"),
"url": props.get("url"),
"region": self._region,
"lat": lat,
"lon": lon,
"quake_time": ts,
"expires": now + 86400, # 24h TTL (matches the day-feed window)
"fetched_at": now,
})
except Exception:
logger.exception("USGS quake feature parse error")
continue
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"USGS quakes updated: {len(new_events)} in region {self._region}")
return changed
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored quake dict into a pipeline Event.
Category is always earthquake_event; magnitude-binned severity is
passed through. The stable USGS id is the group_key and sole
inhibit_key.
Args:
evt: Internal event dict from get_events()
Returns:
Event instance, or None if the dict is missing its id, coords, or
magnitude.
"""
try:
event_id = evt.get("event_id")
if not event_id:
return None
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None
mag = evt.get("magnitude")
if mag is None:
return None
severity = evt.get("severity", "routine")
title = evt.get("headline") or f"M{mag} earthquake"
summary_parts = [title]
depth = evt.get("depth_km")
if depth is not None:
summary_parts.append(f"depth {round(depth, 1)} km")
summary = " | ".join(summary_parts)[:300]
return make_event(
source="usgs_quake",
category="earthquake_event",
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("quake_time") or evt.get("fetched_at"),
expires=evt.get("expires"),
lat=lat,
lon=lon,
region=evt.get("region"),
group_key=event_id,
inhibit_keys=[event_id],
)
except Exception:
logger.exception(f"USGS quake to_event failed for evt: {evt.get('event_id')}")
return None
def get_events(self) -> list:
"""Get current qualifying quake events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "usgs_quake",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
}