refactor: simplify severity to 3 levels (routine/priority/immediate)

- Replace 6-level system (info/advisory/watch/warning/critical/emergency)
  with 3-level military precedence (routine/priority/immediate)
- Every adapter remapped: NWS, NIFC, FIRMS, USGS, SWPC, avalanche,
  traffic, 511, mesh alerts
- is_critical flag removed — severity covers it
- Quiet hours: suppress routine only, priority+immediate always deliver
- Dashboard: blue/amber/red for routine/priority/immediate
- Fix hex node ID parsing in Mesh DM channel (!23261b70 format)
This commit is contained in:
zvx-echo6 2026-05-13 19:05:50 -06:00
commit 49f2838048
17 changed files with 3285 additions and 3265 deletions

View file

@ -129,13 +129,13 @@ class AvalancheAdapter:
level_key, level_name = self.DANGER_LEVELS.get(danger_level, ("no_rating", "No Rating"))
if danger_level >= 4:
severity = "warning"
severity = "priority"
elif danger_level >= 3:
severity = "watch"
severity = "routine"
elif danger_level >= 2:
severity = "advisory"
severity = "routine"
else:
severity = "info"
severity = "routine"
# Compute centroid
geom = feature.get("geometry")

8
meshai/env/fires.py vendored
View file

@ -109,13 +109,13 @@ class NICFFiresAdapter:
# Severity based on distance
if distance_km is not None:
if distance_km < 25:
severity = "warning"
severity = "priority"
elif distance_km < 50:
severity = "watch"
severity = "routine"
else:
severity = "advisory"
severity = "routine"
else:
severity = "advisory"
severity = "routine"
# Format headline
headline = f"{name} -- {int(acres):,} ac, {int(pct_contained)}% contained"

730
meshai/env/firms.py vendored
View file

@ -1,365 +1,365 @@
"""NASA FIRMS satellite fire hotspot adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import FIRMSConfig
logger = logging.getLogger(__name__)
class FIRMSAdapter:
"""NASA FIRMS satellite fire hotspot polling.
Detects fire hotspots from satellite data (MODIS, VIIRS) typically
hours before NIFC publishes official perimeters. Early warning.
API: https://firms.modaps.eosdis.nasa.gov/api/area/csv/{MAP_KEY}/{SOURCE}/{BBOX}/{DAY_RANGE}
"""
BASE_URL = "https://firms.modaps.eosdis.nasa.gov/api/area/csv"
def __init__(self, config: "FIRMSConfig", region_anchors: list = None, fires_adapter=None):
self._map_key = config.map_key
self._source = config.source or "VIIRS_SNPP_NRT"
self._bbox = config.bbox # [west, south, east, north]
self._day_range = config.day_range or 1
self._tick_interval = config.tick_seconds or 1800
self._confidence_min = config.confidence_min or "nominal"
self._proximity_km = config.proximity_km or 10.0 # km to match known fire
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# For cross-referencing
self._region_anchors = region_anchors or []
self._fires_adapter = fires_adapter # NICFFiresAdapter for cross-ref
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
if not self._map_key:
if not self._last_error:
logger.warning("FIRMS: No MAP_KEY configured, skipping")
self._last_error = "No MAP_KEY configured"
return False
if not self._bbox or len(self._bbox) != 4:
if not self._last_error:
logger.warning("FIRMS: No valid bbox configured, skipping")
self._last_error = "No valid bbox configured"
return False
return self._fetch()
def _fetch(self) -> bool:
"""Fetch fire hotspots from NASA FIRMS.
Returns:
True if data changed
"""
# Format bbox as west,south,east,north
bbox_str = ",".join(str(c) for c in self._bbox)
url = f"{self.BASE_URL}/{self._map_key}/{self._source}/{bbox_str}/{self._day_range}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "text/csv",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
csv_data = resp.read().decode("utf-8")
except HTTPError as e:
if e.code == 401:
logger.error("FIRMS: Invalid MAP_KEY, disabling adapter")
self._last_error = "Invalid MAP_KEY"
self._consecutive_errors = 999 # Disable
return False
logger.warning(f"FIRMS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"FIRMS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"FIRMS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse CSV response
new_events = self._parse_csv(csv_data)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
new_ignitions = sum(1 for e in new_events if e.get("properties", {}).get("new_ignition"))
logger.info(f"FIRMS hotspots updated: {len(new_events)} total, {new_ignitions} potential new ignitions")
return changed
def _parse_csv(self, csv_data: str) -> list:
"""Parse FIRMS CSV response into events."""
lines = csv_data.strip().split("\n")
if len(lines) < 2:
return []
# Parse header
header = lines[0].split(",")
header_map = {col.strip().lower(): i for i, col in enumerate(header)}
# Required columns
lat_idx = header_map.get("latitude")
lon_idx = header_map.get("longitude")
conf_idx = header_map.get("confidence")
frp_idx = header_map.get("frp") # Fire Radiative Power
acq_date_idx = header_map.get("acq_date")
acq_time_idx = header_map.get("acq_time")
bright_idx = header_map.get("bright_ti4") or header_map.get("brightness")
if lat_idx is None or lon_idx is None:
logger.warning("FIRMS CSV missing required columns")
return []
events = []
now = time.time()
# Confidence mapping
conf_values = {"low": 1, "l": 1, "nominal": 2, "n": 2, "high": 3, "h": 3}
min_conf = conf_values.get(self._confidence_min.lower(), 2)
# Get known fire locations for cross-referencing
known_fires = self._get_known_fires()
for line in lines[1:]:
cols = line.split(",")
if len(cols) < max(filter(None, [lat_idx, lon_idx, conf_idx])) + 1:
continue
try:
lat = float(cols[lat_idx])
lon = float(cols[lon_idx])
except (ValueError, IndexError):
continue
# Parse confidence
conf_raw = cols[conf_idx].strip() if conf_idx is not None and conf_idx < len(cols) else "n"
conf_value = conf_values.get(conf_raw.lower(), 2)
# Filter by confidence
if conf_value < min_conf:
continue
# Parse FRP (fire radiative power in MW)
frp = None
if frp_idx is not None and frp_idx < len(cols):
try:
frp = float(cols[frp_idx])
except ValueError:
pass
# Parse brightness temperature
brightness = None
if bright_idx is not None and bright_idx < len(cols):
try:
brightness = float(cols[bright_idx])
except ValueError:
pass
# Parse acquisition datetime
acq_date = cols[acq_date_idx].strip() if acq_date_idx is not None and acq_date_idx < len(cols) else ""
acq_time = cols[acq_time_idx].strip() if acq_time_idx is not None and acq_time_idx < len(cols) else ""
# Create unique ID from position and time
event_id = f"firms_{lat:.4f}_{lon:.4f}_{acq_date}_{acq_time}"
# Check if near known fire
near_fire, fire_name, distance_to_fire = self._check_near_known_fire(lat, lon, known_fires)
# Determine severity
if not near_fire:
# Potential new ignition
severity = "watch"
new_ignition = True
headline = f"NEW HOTSPOT detected"
else:
# Near known fire
severity = "advisory"
new_ignition = False
headline = f"Hotspot near {fire_name}"
# Bump severity for high FRP
if frp is not None and frp > 100:
if severity == "advisory":
severity = "watch"
elif severity == "watch":
severity = "warning"
headline += f" ({int(frp)} MW)"
# Compute proximity to region anchors
distance_km, nearest_anchor = self._nearest_anchor_distance(lat, lon)
if distance_km is not None and nearest_anchor:
headline += f" ({int(distance_km)} km from {nearest_anchor})"
event = {
"source": "firms",
"event_id": event_id,
"event_type": "Fire Hotspot",
"severity": severity,
"headline": headline,
"lat": lat,
"lon": lon,
"expires": now + 21600, # 6 hour TTL
"fetched_at": now,
"properties": {
"new_ignition": new_ignition,
"confidence": conf_raw,
"frp": frp,
"brightness": brightness,
"acq_date": acq_date,
"acq_time": acq_time,
"near_fire": fire_name if near_fire else None,
"distance_to_fire_km": distance_to_fire,
"distance_km": distance_km,
"nearest_anchor": nearest_anchor,
},
}
events.append(event)
return events
def _get_known_fires(self) -> list:
"""Get known fire locations from NIFC adapter."""
if not self._fires_adapter:
return []
fires = self._fires_adapter.get_events()
return [
{
"name": f.get("name", "Unknown"),
"lat": f.get("lat"),
"lon": f.get("lon"),
}
for f in fires
if f.get("lat") is not None and f.get("lon") is not None
]
def _check_near_known_fire(self, lat: float, lon: float, known_fires: list) -> tuple:
"""Check if hotspot is near a known fire.
Returns:
(is_near, fire_name, distance_km)
"""
if not known_fires:
return (False, None, None)
from ..geo import haversine_distance
for fire in known_fires:
fire_lat = fire.get("lat")
fire_lon = fire.get("lon")
if fire_lat is None or fire_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, fire_lat, fire_lon)
dist_km = dist_miles * 1.60934
if dist_km <= self._proximity_km:
return (True, fire.get("name"), dist_km)
return (False, None, None)
def _nearest_anchor_distance(self, lat: float, lon: float) -> tuple:
"""Find distance to nearest region anchor.
Returns:
(distance_km, anchor_name) or (None, None)
"""
if not self._region_anchors:
return (None, None)
from ..geo import haversine_distance
min_dist = float("inf")
nearest_name = None
for anchor in self._region_anchors:
anchor_lat = anchor.get("lat") if isinstance(anchor, dict) else getattr(anchor, "lat", None)
anchor_lon = anchor.get("lon") if isinstance(anchor, dict) else getattr(anchor, "lon", None)
anchor_name = anchor.get("name") if isinstance(anchor, dict) else getattr(anchor, "name", "Unknown")
if anchor_lat is None or anchor_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, anchor_lat, anchor_lon)
dist_km = dist_miles * 1.60934
if dist_km < min_dist:
min_dist = dist_km
nearest_name = anchor_name
if min_dist < float("inf"):
return (min_dist, nearest_name)
return (None, None)
def get_events(self) -> list:
"""Get current hotspot events."""
return self._events
def get_new_ignitions(self) -> list:
"""Get only potential new ignitions (not near known fires)."""
return [e for e in self._events if e.get("properties", {}).get("new_ignition")]
@property
def health_status(self) -> dict:
"""Get adapter health status."""
new_ignitions = len(self.get_new_ignitions())
return {
"source": "firms",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"new_ignitions": new_ignitions,
"last_fetch": self._last_tick,
}
"""NASA FIRMS satellite fire hotspot adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import FIRMSConfig
logger = logging.getLogger(__name__)
class FIRMSAdapter:
"""NASA FIRMS satellite fire hotspot polling.
Detects fire hotspots from satellite data (MODIS, VIIRS) typically
hours before NIFC publishes official perimeters. Early warning.
API: https://firms.modaps.eosdis.nasa.gov/api/area/csv/{MAP_KEY}/{SOURCE}/{BBOX}/{DAY_RANGE}
"""
BASE_URL = "https://firms.modaps.eosdis.nasa.gov/api/area/csv"
def __init__(self, config: "FIRMSConfig", region_anchors: list = None, fires_adapter=None):
self._map_key = config.map_key
self._source = config.source or "VIIRS_SNPP_NRT"
self._bbox = config.bbox # [west, south, east, north]
self._day_range = config.day_range or 1
self._tick_interval = config.tick_seconds or 1800
self._confidence_min = config.confidence_min or "nominal"
self._proximity_km = config.proximity_km or 10.0 # km to match known fire
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# For cross-referencing
self._region_anchors = region_anchors or []
self._fires_adapter = fires_adapter # NICFFiresAdapter for cross-ref
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
if not self._map_key:
if not self._last_error:
logger.warning("FIRMS: No MAP_KEY configured, skipping")
self._last_error = "No MAP_KEY configured"
return False
if not self._bbox or len(self._bbox) != 4:
if not self._last_error:
logger.warning("FIRMS: No valid bbox configured, skipping")
self._last_error = "No valid bbox configured"
return False
return self._fetch()
def _fetch(self) -> bool:
"""Fetch fire hotspots from NASA FIRMS.
Returns:
True if data changed
"""
# Format bbox as west,south,east,north
bbox_str = ",".join(str(c) for c in self._bbox)
url = f"{self.BASE_URL}/{self._map_key}/{self._source}/{bbox_str}/{self._day_range}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "text/csv",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
csv_data = resp.read().decode("utf-8")
except HTTPError as e:
if e.code == 401:
logger.error("FIRMS: Invalid MAP_KEY, disabling adapter")
self._last_error = "Invalid MAP_KEY"
self._consecutive_errors = 999 # Disable
return False
logger.warning(f"FIRMS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"FIRMS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"FIRMS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse CSV response
new_events = self._parse_csv(csv_data)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
new_ignitions = sum(1 for e in new_events if e.get("properties", {}).get("new_ignition"))
logger.info(f"FIRMS hotspots updated: {len(new_events)} total, {new_ignitions} potential new ignitions")
return changed
def _parse_csv(self, csv_data: str) -> list:
"""Parse FIRMS CSV response into events."""
lines = csv_data.strip().split("\n")
if len(lines) < 2:
return []
# Parse header
header = lines[0].split(",")
header_map = {col.strip().lower(): i for i, col in enumerate(header)}
# Required columns
lat_idx = header_map.get("latitude")
lon_idx = header_map.get("longitude")
conf_idx = header_map.get("confidence")
frp_idx = header_map.get("frp") # Fire Radiative Power
acq_date_idx = header_map.get("acq_date")
acq_time_idx = header_map.get("acq_time")
bright_idx = header_map.get("bright_ti4") or header_map.get("brightness")
if lat_idx is None or lon_idx is None:
logger.warning("FIRMS CSV missing required columns")
return []
events = []
now = time.time()
# Confidence mapping
conf_values = {"low": 1, "l": 1, "nominal": 2, "n": 2, "high": 3, "h": 3}
min_conf = conf_values.get(self._confidence_min.lower(), 2)
# Get known fire locations for cross-referencing
known_fires = self._get_known_fires()
for line in lines[1:]:
cols = line.split(",")
if len(cols) < max(filter(None, [lat_idx, lon_idx, conf_idx])) + 1:
continue
try:
lat = float(cols[lat_idx])
lon = float(cols[lon_idx])
except (ValueError, IndexError):
continue
# Parse confidence
conf_raw = cols[conf_idx].strip() if conf_idx is not None and conf_idx < len(cols) else "n"
conf_value = conf_values.get(conf_raw.lower(), 2)
# Filter by confidence
if conf_value < min_conf:
continue
# Parse FRP (fire radiative power in MW)
frp = None
if frp_idx is not None and frp_idx < len(cols):
try:
frp = float(cols[frp_idx])
except ValueError:
pass
# Parse brightness temperature
brightness = None
if bright_idx is not None and bright_idx < len(cols):
try:
brightness = float(cols[bright_idx])
except ValueError:
pass
# Parse acquisition datetime
acq_date = cols[acq_date_idx].strip() if acq_date_idx is not None and acq_date_idx < len(cols) else ""
acq_time = cols[acq_time_idx].strip() if acq_time_idx is not None and acq_time_idx < len(cols) else ""
# Create unique ID from position and time
event_id = f"firms_{lat:.4f}_{lon:.4f}_{acq_date}_{acq_time}"
# Check if near known fire
near_fire, fire_name, distance_to_fire = self._check_near_known_fire(lat, lon, known_fires)
# Determine severity
if not near_fire:
# Potential new ignition
severity = "routine"
new_ignition = True
headline = f"NEW HOTSPOT detected"
else:
# Near known fire
severity = "routine"
new_ignition = False
headline = f"Hotspot near {fire_name}"
# Bump severity for high FRP
if frp is not None and frp > 100:
if severity == "routine":
severity = "routine"
elif severity == "routine":
severity = "priority"
headline += f" ({int(frp)} MW)"
# Compute proximity to region anchors
distance_km, nearest_anchor = self._nearest_anchor_distance(lat, lon)
if distance_km is not None and nearest_anchor:
headline += f" ({int(distance_km)} km from {nearest_anchor})"
event = {
"source": "firms",
"event_id": event_id,
"event_type": "Fire Hotspot",
"severity": severity,
"headline": headline,
"lat": lat,
"lon": lon,
"expires": now + 21600, # 6 hour TTL
"fetched_at": now,
"properties": {
"new_ignition": new_ignition,
"confidence": conf_raw,
"frp": frp,
"brightness": brightness,
"acq_date": acq_date,
"acq_time": acq_time,
"near_fire": fire_name if near_fire else None,
"distance_to_fire_km": distance_to_fire,
"distance_km": distance_km,
"nearest_anchor": nearest_anchor,
},
}
events.append(event)
return events
def _get_known_fires(self) -> list:
"""Get known fire locations from NIFC adapter."""
if not self._fires_adapter:
return []
fires = self._fires_adapter.get_events()
return [
{
"name": f.get("name", "Unknown"),
"lat": f.get("lat"),
"lon": f.get("lon"),
}
for f in fires
if f.get("lat") is not None and f.get("lon") is not None
]
def _check_near_known_fire(self, lat: float, lon: float, known_fires: list) -> tuple:
"""Check if hotspot is near a known fire.
Returns:
(is_near, fire_name, distance_km)
"""
if not known_fires:
return (False, None, None)
from ..geo import haversine_distance
for fire in known_fires:
fire_lat = fire.get("lat")
fire_lon = fire.get("lon")
if fire_lat is None or fire_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, fire_lat, fire_lon)
dist_km = dist_miles * 1.60934
if dist_km <= self._proximity_km:
return (True, fire.get("name"), dist_km)
return (False, None, None)
def _nearest_anchor_distance(self, lat: float, lon: float) -> tuple:
"""Find distance to nearest region anchor.
Returns:
(distance_km, anchor_name) or (None, None)
"""
if not self._region_anchors:
return (None, None)
from ..geo import haversine_distance
min_dist = float("inf")
nearest_name = None
for anchor in self._region_anchors:
anchor_lat = anchor.get("lat") if isinstance(anchor, dict) else getattr(anchor, "lat", None)
anchor_lon = anchor.get("lon") if isinstance(anchor, dict) else getattr(anchor, "lon", None)
anchor_name = anchor.get("name") if isinstance(anchor, dict) else getattr(anchor, "name", "Unknown")
if anchor_lat is None or anchor_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, anchor_lat, anchor_lon)
dist_km = dist_miles * 1.60934
if dist_km < min_dist:
min_dist = dist_km
nearest_name = anchor_name
if min_dist < float("inf"):
return (min_dist, nearest_name)
return (None, None)
def get_events(self) -> list:
"""Get current hotspot events."""
return self._events
def get_new_ignitions(self) -> list:
"""Get only potential new ignitions (not near known fires)."""
return [e for e in self._events if e.get("properties", {}).get("new_ignition")]
@property
def health_status(self) -> dict:
"""Get adapter health status."""
new_ignitions = len(self.get_new_ignitions())
return {
"source": "firms",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"new_ignitions": new_ignitions,
"last_fetch": self._last_tick,
}

396
meshai/env/nws.py vendored
View file

@ -1,193 +1,203 @@
"""NWS Active Alerts adapter."""
import json
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import NWSConfig
logger = logging.getLogger(__name__)
class NWSAlertsAdapter:
"""NWS Active Alerts -- polls api.weather.gov"""
def __init__(self, config: "NWSConfig"):
self._areas = config.areas or ["ID"]
self._user_agent = config.user_agent or "(meshai, ops@example.com)"
self._severity_min = config.severity_min or "moderate"
self._tick_interval = config.tick_seconds or 60
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._backoff_until = 0.0
self._is_loaded = False
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Rate limit backoff
if now < self._backoff_until:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _fetch(self) -> bool:
"""Fetch alerts from NWS API.
Returns:
True if data changed
"""
areas = ",".join(self._areas)
url = f"https://api.weather.gov/alerts/active?area={areas}"
headers = {
"User-Agent": self._user_agent,
"Accept": "application/geo+json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 429:
self._backoff_until = time.time() + 5
logger.warning("NWS rate limited, backing off 5s")
else:
logger.warning(f"NWS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"NWS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"NWS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
features = data.get("features", [])
new_events = []
# Severity levels for filtering
severity_levels = ["unknown", "minor", "moderate", "severe", "extreme"]
try:
min_idx = severity_levels.index(self._severity_min.lower())
except ValueError:
min_idx = 2 # default to moderate
for feature in features:
props = feature.get("properties", {})
# Severity filtering
severity = (props.get("severity") or "Unknown").lower()
try:
sev_idx = severity_levels.index(severity)
except ValueError:
sev_idx = 0
if sev_idx < min_idx:
continue
# Parse timestamps
onset = self._parse_iso(props.get("onset"))
expires = self._parse_iso(props.get("expires"))
event = {
"source": "nws",
"event_id": props.get("id", ""),
"event_type": props.get("event", "Unknown"),
"severity": severity,
"headline": props.get("headline", ""),
"description": (props.get("description") or "")[:500],
"onset": onset,
"expires": expires,
"areas": props.get("geocode", {}).get("UGC", []),
"area_desc": props.get("areaDesc", ""),
"fetched_at": time.time(),
}
# Try to get centroid from geometry
geom = feature.get("geometry")
if geom and geom.get("coordinates"):
try:
coords = geom["coordinates"]
if geom.get("type") == "Polygon" and coords:
# Compute centroid of first ring
ring = coords[0]
lat_sum = sum(c[1] for c in ring)
lon_sum = sum(c[0] for c in ring)
event["lat"] = lat_sum / len(ring)
event["lon"] = lon_sum / len(ring)
except Exception:
pass
new_events.append(event)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"NWS alerts updated: {len(new_events)} active")
return changed
def _parse_iso(self, iso_str: str) -> float:
"""Parse ISO timestamp to epoch float."""
if not iso_str:
return 0.0
try:
# Handle various ISO formats
if iso_str.endswith("Z"):
iso_str = iso_str[:-1] + "+00:00"
dt = datetime.fromisoformat(iso_str)
return dt.timestamp()
except Exception:
return 0.0
def get_events(self) -> list:
"""Get current events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "nws",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
}
"""NWS Active Alerts adapter."""
import json
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import NWSConfig
logger = logging.getLogger(__name__)
class NWSAlertsAdapter:
"""NWS Active Alerts -- polls api.weather.gov"""
def __init__(self, config: "NWSConfig"):
self._areas = config.areas or ["ID"]
self._user_agent = config.user_agent or "(meshai, ops@example.com)"
self._severity_min = config.severity_min or "moderate"
self._tick_interval = config.tick_seconds or 60
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._backoff_until = 0.0
self._is_loaded = False
def _map_nws_severity(self, nws_severity: str) -> str:
"""Map NWS severity to 3-level system."""
if nws_severity == "extreme":
return "immediate"
elif nws_severity in ("severe", "warning"):
return "priority"
else: # moderate, minor, unknown
return "routine"
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Rate limit backoff
if now < self._backoff_until:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _fetch(self) -> bool:
"""Fetch alerts from NWS API.
Returns:
True if data changed
"""
areas = ",".join(self._areas)
url = f"https://api.weather.gov/alerts/active?area={areas}"
headers = {
"User-Agent": self._user_agent,
"Accept": "application/geo+json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 429:
self._backoff_until = time.time() + 5
logger.warning("NWS rate limited, backing off 5s")
else:
logger.warning(f"NWS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"NWS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"NWS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
features = data.get("features", [])
new_events = []
# Severity levels for filtering
severity_levels = ["unknown", "minor", "moderate", "severe", "extreme"]
try:
min_idx = severity_levels.index(self._severity_min.lower())
except ValueError:
min_idx = 2 # default to moderate
for feature in features:
props = feature.get("properties", {})
# Severity filtering
severity = (props.get("severity") or "Unknown").lower()
try:
sev_idx = severity_levels.index(severity)
except ValueError:
sev_idx = 0
if sev_idx < min_idx:
continue
# Parse timestamps
onset = self._parse_iso(props.get("onset"))
expires = self._parse_iso(props.get("expires"))
event = {
"source": "nws",
"event_id": props.get("id", ""),
"event_type": props.get("event", "Unknown"),
"severity": severity,
"headline": props.get("headline", ""),
"description": (props.get("description") or "")[:500],
"onset": onset,
"expires": expires,
"areas": props.get("geocode", {}).get("UGC", []),
"area_desc": props.get("areaDesc", ""),
"fetched_at": time.time(),
}
# Try to get centroid from geometry
geom = feature.get("geometry")
if geom and geom.get("coordinates"):
try:
coords = geom["coordinates"]
if geom.get("type") == "Polygon" and coords:
# Compute centroid of first ring
ring = coords[0]
lat_sum = sum(c[1] for c in ring)
lon_sum = sum(c[0] for c in ring)
event["lat"] = lat_sum / len(ring)
event["lon"] = lon_sum / len(ring)
except Exception:
pass
new_events.append(event)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"NWS alerts updated: {len(new_events)} active")
return changed
def _parse_iso(self, iso_str: str) -> float:
"""Parse ISO timestamp to epoch float."""
if not iso_str:
return 0.0
try:
# Handle various ISO formats
if iso_str.endswith("Z"):
iso_str = iso_str[:-1] + "+00:00"
dt = datetime.fromisoformat(iso_str)
return dt.timestamp()
except Exception:
return 0.0
def get_events(self) -> list:
"""Get current events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "nws",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
}

732
meshai/env/roads511.py vendored
View file

@ -1,366 +1,366 @@
"""511 Road Conditions adapter.
Polls a configurable 511 API for road events. The base URL is fully
configurable as each state has a different 511 system.
"""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urljoin
if TYPE_CHECKING:
from ..config import Roads511Config
logger = logging.getLogger(__name__)
class Roads511Adapter:
"""511 road conditions polling adapter."""
def __init__(self, config: "Roads511Config"):
self._api_key = self._resolve_env(config.api_key or "")
self._base_url = (config.base_url or "").rstrip("/")
self._endpoints = config.endpoints or ["/get/event"]
self._bbox = config.bbox or [] # [west, south, east, north]
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._auth_failed = False # Stop retrying on auth failures
if not self._base_url:
logger.info("511: No base URL configured, adapter disabled")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No base URL configured
if not self._base_url:
return False
# Auth failed - don't keep retrying
if self._auth_failed:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch events from all configured endpoints.
Returns:
True if data changed
"""
new_events = []
now = time.time()
for endpoint in self._endpoints:
events = self._fetch_endpoint(endpoint, now)
if events:
new_events.extend(events)
# Apply bbox filter if configured
if self._bbox and len(self._bbox) == 4:
west, south, east, north = self._bbox
new_events = [
e for e in new_events
if e.get("lat") is not None and e.get("lon") is not None
and west <= e["lon"] <= east and south <= e["lat"] <= north
]
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._is_loaded = True
if changed:
logger.info(f"511 road events updated: {len(new_events)} active")
return changed
def _fetch_endpoint(self, endpoint: str, now: float) -> list:
"""Fetch events from a single endpoint.
Args:
endpoint: API endpoint path
now: Current timestamp
Returns:
List of event dicts
"""
url = urljoin(self._base_url + "/", endpoint.lstrip("/"))
# Add API key if configured
if self._api_key:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}key={self._api_key}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(
f"511 auth error: {e.code} - check API key configuration for {self._base_url}"
)
self._last_error = f"Auth error {e.code} - check API key"
self._auth_failed = True
return []
else:
logger.warning(f"511 HTTP error for {endpoint}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return []
except URLError as e:
logger.warning(f"511 connection error for {endpoint}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return []
except Exception as e:
logger.warning(f"511 fetch error for {endpoint}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return []
# Parse response - handle various 511 API formats
return self._parse_response(data, now)
def _parse_response(self, data, now: float) -> list:
"""Parse 511 API response.
Different states use different formats. Try common patterns.
Args:
data: JSON response data
now: Current timestamp
Returns:
List of event dicts
"""
events = []
# Handle array response
if isinstance(data, list):
items = data
# Handle wrapped response
elif isinstance(data, dict):
# Try common wrapper keys
items = (
data.get("events") or
data.get("items") or
data.get("data") or
data.get("results") or
[]
)
if not isinstance(items, list):
items = [data] if self._looks_like_event(data) else []
else:
return []
for item in items:
event = self._parse_event(item, now)
if event:
events.append(event)
self._consecutive_errors = 0
self._last_error = None
return events
def _looks_like_event(self, item: dict) -> bool:
"""Check if dict looks like a 511 event."""
return bool(
item.get("id") or item.get("EventId") or item.get("event_id")
)
def _parse_event(self, item: dict, now: float) -> dict:
"""Parse a single 511 event.
Args:
item: Event dict from API
now: Current timestamp
Returns:
Normalized event dict or None
"""
try:
# Try various ID field names
event_id = (
item.get("id") or
item.get("EventId") or
item.get("event_id") or
item.get("ID") or
str(hash(str(item)))[:12]
)
# Try various type field names
event_type = (
item.get("EventType") or
item.get("event_type") or
item.get("type") or
item.get("Type") or
item.get("category") or
"Road Event"
)
# Try various road name fields
roadway = (
item.get("RoadwayName") or
item.get("roadway_name") or
item.get("roadway") or
item.get("Roadway") or
item.get("road") or
item.get("route") or
""
)
# Try various description fields
description = (
item.get("Description") or
item.get("description") or
item.get("message") or
item.get("Message") or
item.get("details") or
""
)
# Try various location fields
lat = (
item.get("Latitude") or
item.get("latitude") or
item.get("lat") or
item.get("StartLatitude") or
None
)
lon = (
item.get("Longitude") or
item.get("longitude") or
item.get("lon") or
item.get("lng") or
item.get("StartLongitude") or
None
)
# Try to get coordinates from nested location object
if lat is None and "location" in item:
loc = item["location"]
if isinstance(loc, dict):
lat = loc.get("latitude") or loc.get("lat")
lon = loc.get("longitude") or loc.get("lon") or loc.get("lng")
# Check closure status
is_closure = (
item.get("IsFullClosure") or
item.get("is_full_closure") or
item.get("fullClosure") or
item.get("closed") or
"closure" in str(event_type).lower() or
"closed" in str(description).lower()
)
# Determine severity
if is_closure:
severity = "warning"
elif "construction" in str(event_type).lower():
severity = "advisory"
elif "incident" in str(event_type).lower():
severity = "advisory"
else:
severity = "info"
# Format headline
if roadway and description:
headline = f"{roadway}: {description[:100]}"
elif roadway:
headline = f"{roadway}: {event_type}"
elif description:
headline = description[:120]
else:
headline = f"{event_type}"
# Try to get timestamp for expiry
last_updated = (
item.get("LastUpdated") or
item.get("last_updated") or
item.get("updated") or
item.get("timestamp") or
None
)
# Default 6 hour TTL, refreshed every tick
expires = now + 21600
event = {
"source": "511",
"event_id": f"511_{event_id}",
"event_type": event_type,
"headline": headline,
"description": description[:500] if description else "",
"severity": severity,
"lat": float(lat) if lat is not None else None,
"lon": float(lon) if lon is not None else None,
"expires": expires,
"fetched_at": now,
"properties": {
"roadway": roadway,
"is_closure": bool(is_closure),
"last_updated": last_updated,
},
}
return event
except Exception as e:
logger.debug(f"511 event parse error: {e} - item: {item}")
return None
def get_events(self) -> list:
"""Get current road events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "511",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"auth_failed": self._auth_failed,
}
"""511 Road Conditions adapter.
Polls a configurable 511 API for road events. The base URL is fully
configurable as each state has a different 511 system.
"""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urljoin
if TYPE_CHECKING:
from ..config import Roads511Config
logger = logging.getLogger(__name__)
class Roads511Adapter:
"""511 road conditions polling adapter."""
def __init__(self, config: "Roads511Config"):
self._api_key = self._resolve_env(config.api_key or "")
self._base_url = (config.base_url or "").rstrip("/")
self._endpoints = config.endpoints or ["/get/event"]
self._bbox = config.bbox or [] # [west, south, east, north]
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._auth_failed = False # Stop retrying on auth failures
if not self._base_url:
logger.info("511: No base URL configured, adapter disabled")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No base URL configured
if not self._base_url:
return False
# Auth failed - don't keep retrying
if self._auth_failed:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch events from all configured endpoints.
Returns:
True if data changed
"""
new_events = []
now = time.time()
for endpoint in self._endpoints:
events = self._fetch_endpoint(endpoint, now)
if events:
new_events.extend(events)
# Apply bbox filter if configured
if self._bbox and len(self._bbox) == 4:
west, south, east, north = self._bbox
new_events = [
e for e in new_events
if e.get("lat") is not None and e.get("lon") is not None
and west <= e["lon"] <= east and south <= e["lat"] <= north
]
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._is_loaded = True
if changed:
logger.info(f"511 road events updated: {len(new_events)} active")
return changed
def _fetch_endpoint(self, endpoint: str, now: float) -> list:
"""Fetch events from a single endpoint.
Args:
endpoint: API endpoint path
now: Current timestamp
Returns:
List of event dicts
"""
url = urljoin(self._base_url + "/", endpoint.lstrip("/"))
# Add API key if configured
if self._api_key:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}key={self._api_key}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(
f"511 auth error: {e.code} - check API key configuration for {self._base_url}"
)
self._last_error = f"Auth error {e.code} - check API key"
self._auth_failed = True
return []
else:
logger.warning(f"511 HTTP error for {endpoint}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return []
except URLError as e:
logger.warning(f"511 connection error for {endpoint}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return []
except Exception as e:
logger.warning(f"511 fetch error for {endpoint}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return []
# Parse response - handle various 511 API formats
return self._parse_response(data, now)
def _parse_response(self, data, now: float) -> list:
"""Parse 511 API response.
Different states use different formats. Try common patterns.
Args:
data: JSON response data
now: Current timestamp
Returns:
List of event dicts
"""
events = []
# Handle array response
if isinstance(data, list):
items = data
# Handle wrapped response
elif isinstance(data, dict):
# Try common wrapper keys
items = (
data.get("events") or
data.get("items") or
data.get("data") or
data.get("results") or
[]
)
if not isinstance(items, list):
items = [data] if self._looks_like_event(data) else []
else:
return []
for item in items:
event = self._parse_event(item, now)
if event:
events.append(event)
self._consecutive_errors = 0
self._last_error = None
return events
def _looks_like_event(self, item: dict) -> bool:
"""Check if dict looks like a 511 event."""
return bool(
item.get("id") or item.get("EventId") or item.get("event_id")
)
def _parse_event(self, item: dict, now: float) -> dict:
"""Parse a single 511 event.
Args:
item: Event dict from API
now: Current timestamp
Returns:
Normalized event dict or None
"""
try:
# Try various ID field names
event_id = (
item.get("id") or
item.get("EventId") or
item.get("event_id") or
item.get("ID") or
str(hash(str(item)))[:12]
)
# Try various type field names
event_type = (
item.get("EventType") or
item.get("event_type") or
item.get("type") or
item.get("Type") or
item.get("category") or
"Road Event"
)
# Try various road name fields
roadway = (
item.get("RoadwayName") or
item.get("roadway_name") or
item.get("roadway") or
item.get("Roadway") or
item.get("road") or
item.get("route") or
""
)
# Try various description fields
description = (
item.get("Description") or
item.get("description") or
item.get("message") or
item.get("Message") or
item.get("details") or
""
)
# Try various location fields
lat = (
item.get("Latitude") or
item.get("latitude") or
item.get("lat") or
item.get("StartLatitude") or
None
)
lon = (
item.get("Longitude") or
item.get("longitude") or
item.get("lon") or
item.get("lng") or
item.get("StartLongitude") or
None
)
# Try to get coordinates from nested location object
if lat is None and "location" in item:
loc = item["location"]
if isinstance(loc, dict):
lat = loc.get("latitude") or loc.get("lat")
lon = loc.get("longitude") or loc.get("lon") or loc.get("lng")
# Check closure status
is_closure = (
item.get("IsFullClosure") or
item.get("is_full_closure") or
item.get("fullClosure") or
item.get("closed") or
"closure" in str(event_type).lower() or
"closed" in str(description).lower()
)
# Determine severity
if is_closure:
severity = "priority"
elif "construction" in str(event_type).lower():
severity = "routine"
elif "incident" in str(event_type).lower():
severity = "routine"
else:
severity = "routine"
# Format headline
if roadway and description:
headline = f"{roadway}: {description[:100]}"
elif roadway:
headline = f"{roadway}: {event_type}"
elif description:
headline = description[:120]
else:
headline = f"{event_type}"
# Try to get timestamp for expiry
last_updated = (
item.get("LastUpdated") or
item.get("last_updated") or
item.get("updated") or
item.get("timestamp") or
None
)
# Default 6 hour TTL, refreshed every tick
expires = now + 21600
event = {
"source": "511",
"event_id": f"511_{event_id}",
"event_type": event_type,
"headline": headline,
"description": description[:500] if description else "",
"severity": severity,
"lat": float(lat) if lat is not None else None,
"lon": float(lon) if lon is not None else None,
"expires": expires,
"fetched_at": now,
"properties": {
"roadway": roadway,
"is_closure": bool(is_closure),
"last_updated": last_updated,
},
}
return event
except Exception as e:
logger.debug(f"511 event parse error: {e} - item: {item}")
return None
def get_events(self) -> list:
"""Get current road events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "511",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"auth_failed": self._auth_failed,
}

544
meshai/env/swpc.py vendored
View file

@ -1,272 +1,272 @@
"""NOAA Space Weather Prediction Center adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import SWPCConfig
logger = logging.getLogger(__name__)
class SWPCAdapter:
"""NOAA Space Weather -- multi-endpoint with staggered ticks."""
# Endpoint definitions: (url, interval_seconds)
ENDPOINTS = {
"scales": ("https://services.swpc.noaa.gov/products/noaa-scales.json", 300),
"kp": ("https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json", 600),
"alerts": ("https://services.swpc.noaa.gov/products/alerts.json", 120),
"f107": ("https://services.swpc.noaa.gov/json/f107_cm_flux.json", 86400),
}
def __init__(self, config: "SWPCConfig"):
self._last_tick = {} # endpoint -> last_tick timestamp
self._status = {}
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Initialize tick times to 0
for endpoint in self.ENDPOINTS:
self._last_tick[endpoint] = 0.0
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
changed = False
now = time.time()
for endpoint, (url, interval) in self.ENDPOINTS.items():
if now - self._last_tick[endpoint] >= interval:
self._last_tick[endpoint] = now
if self._fetch_endpoint(endpoint, url):
changed = True
if changed:
self._update_events()
return changed
def _fetch_endpoint(self, endpoint: str, url: str) -> bool:
"""Fetch a single endpoint.
Returns:
True on success
"""
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"SWPC {endpoint} HTTP error: {e.code}")
self._last_error = f"{endpoint}: HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"SWPC {endpoint} connection error: {e.reason}")
self._last_error = f"{endpoint}: {e.reason}"
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"SWPC {endpoint} error: {e}")
self._last_error = f"{endpoint}: {e}"
self._consecutive_errors += 1
return False
# Parse based on endpoint
try:
if endpoint == "scales":
self._parse_scales(data)
elif endpoint == "kp":
self._parse_kp(data)
elif endpoint == "alerts":
self._parse_alerts(data)
elif endpoint == "f107":
self._parse_f107(data)
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
return True
except Exception as e:
logger.warning(f"SWPC {endpoint} parse error: {e}")
self._last_error = f"{endpoint}: parse error"
return False
def _parse_scales(self, data):
"""Parse noaa-scales.json.
Data format: {""-1": {...}, "0": {...}, "1": {...}, ...}
"0" is current.
"""
current = data.get("0", {})
r_data = current.get("R", {})
s_data = current.get("S", {})
g_data = current.get("G", {})
# Handle empty string or None Scale values
def parse_scale(val):
if val is None or val == "":
return 0
try:
return int(val)
except (ValueError, TypeError):
return 0
self._status["r_scale"] = parse_scale(r_data.get("Scale"))
self._status["s_scale"] = parse_scale(s_data.get("Scale"))
self._status["g_scale"] = parse_scale(g_data.get("Scale"))
def _parse_kp(self, data):
"""Parse noaa-planetary-k-index.json.
Data format: array of objects with time_tag, Kp, a_running, station_count
Last entry is most recent. Store full history for charting.
"""
if not data:
return
# Store full history (last 24-48 hours of readings)
kp_history = []
for entry in data:
if isinstance(entry, dict):
try:
kp_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("Kp", 0)),
})
except (ValueError, TypeError):
continue
elif isinstance(entry, list) and len(entry) > 1:
# Legacy array format fallback
try:
kp_history.append({
"time": entry[0] if len(entry) > 0 else "",
"value": float(entry[1]),
})
except (ValueError, TypeError):
continue
self._status["kp_history"] = kp_history
# Get last entry (most recent) for current value
last_entry = data[-1]
if isinstance(last_entry, dict):
try:
self._status["kp_current"] = float(last_entry.get("Kp", 0))
except (ValueError, TypeError):
pass
self._status["kp_timestamp"] = last_entry.get("time_tag", "")
elif isinstance(last_entry, list) and len(last_entry) > 1:
# Legacy array format fallback
try:
self._status["kp_current"] = float(last_entry[1])
except (ValueError, TypeError):
pass
if len(last_entry) > 0:
self._status["kp_timestamp"] = last_entry[0]
def _parse_alerts(self, data):
"""Parse alerts.json.
Data format: array of objects with product_id, issue_datetime, message
"""
warnings = []
if isinstance(data, list):
for alert in data[:5]: # Keep most recent 5
message = alert.get("message", "")
# Extract first line as headline
headline = message.split("\n")[0].strip()
if headline:
warnings.append(headline)
self._status["active_warnings"] = warnings
def _parse_f107(self, data):
"""Parse f107_cm_flux.json.
Data format: array of objects with time_tag, flux
Store history for potential charting.
"""
if not data:
return
# Store SFI history (last 30 days of readings)
sfi_history = []
if isinstance(data, list):
for entry in data[-30:]: # Last 30 entries
if isinstance(entry, dict):
try:
sfi_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("flux", 0)),
})
except (ValueError, TypeError):
continue
self._status["sfi_history"] = sfi_history
# Get most recent entry (last in list)
if isinstance(data, list) and data:
last = data[-1]
if isinstance(last, dict):
try:
self._status["sfi"] = float(last.get("flux", 0))
except (ValueError, TypeError):
pass
def _update_events(self):
"""Generate events for significant space weather conditions."""
# Generate events for R-scale >= 3 (radio blackout)
self._events = []
r_scale = self._status.get("r_scale", 0)
if r_scale >= 3:
self._events.append({
"source": "swpc",
"event_id": f"swpc_r{r_scale}_{int(time.time())}",
"event_type": f"R{r_scale} Radio Blackout",
"severity": "warning" if r_scale >= 3 else "advisory",
"headline": f"R{r_scale} Radio Blackout in progress",
"expires": time.time() + 3600, # 1hr TTL
"areas": [],
"fetched_at": time.time(),
})
def get_status(self) -> dict:
"""Get current SWPC status."""
return self._status
def get_events(self) -> list:
"""Get current alert events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "swpc",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": max(self._last_tick.values()) if self._last_tick else 0,
}
"""NOAA Space Weather Prediction Center adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import SWPCConfig
logger = logging.getLogger(__name__)
class SWPCAdapter:
"""NOAA Space Weather -- multi-endpoint with staggered ticks."""
# Endpoint definitions: (url, interval_seconds)
ENDPOINTS = {
"scales": ("https://services.swpc.noaa.gov/products/noaa-scales.json", 300),
"kp": ("https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json", 600),
"alerts": ("https://services.swpc.noaa.gov/products/alerts.json", 120),
"f107": ("https://services.swpc.noaa.gov/json/f107_cm_flux.json", 86400),
}
def __init__(self, config: "SWPCConfig"):
self._last_tick = {} # endpoint -> last_tick timestamp
self._status = {}
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Initialize tick times to 0
for endpoint in self.ENDPOINTS:
self._last_tick[endpoint] = 0.0
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
changed = False
now = time.time()
for endpoint, (url, interval) in self.ENDPOINTS.items():
if now - self._last_tick[endpoint] >= interval:
self._last_tick[endpoint] = now
if self._fetch_endpoint(endpoint, url):
changed = True
if changed:
self._update_events()
return changed
def _fetch_endpoint(self, endpoint: str, url: str) -> bool:
"""Fetch a single endpoint.
Returns:
True on success
"""
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"SWPC {endpoint} HTTP error: {e.code}")
self._last_error = f"{endpoint}: HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"SWPC {endpoint} connection error: {e.reason}")
self._last_error = f"{endpoint}: {e.reason}"
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"SWPC {endpoint} error: {e}")
self._last_error = f"{endpoint}: {e}"
self._consecutive_errors += 1
return False
# Parse based on endpoint
try:
if endpoint == "scales":
self._parse_scales(data)
elif endpoint == "kp":
self._parse_kp(data)
elif endpoint == "alerts":
self._parse_alerts(data)
elif endpoint == "f107":
self._parse_f107(data)
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
return True
except Exception as e:
logger.warning(f"SWPC {endpoint} parse error: {e}")
self._last_error = f"{endpoint}: parse error"
return False
def _parse_scales(self, data):
"""Parse noaa-scales.json.
Data format: {""-1": {...}, "0": {...}, "1": {...}, ...}
"0" is current.
"""
current = data.get("0", {})
r_data = current.get("R", {})
s_data = current.get("S", {})
g_data = current.get("G", {})
# Handle empty string or None Scale values
def parse_scale(val):
if val is None or val == "":
return 0
try:
return int(val)
except (ValueError, TypeError):
return 0
self._status["r_scale"] = parse_scale(r_data.get("Scale"))
self._status["s_scale"] = parse_scale(s_data.get("Scale"))
self._status["g_scale"] = parse_scale(g_data.get("Scale"))
def _parse_kp(self, data):
"""Parse noaa-planetary-k-index.json.
Data format: array of objects with time_tag, Kp, a_running, station_count
Last entry is most recent. Store full history for charting.
"""
if not data:
return
# Store full history (last 24-48 hours of readings)
kp_history = []
for entry in data:
if isinstance(entry, dict):
try:
kp_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("Kp", 0)),
})
except (ValueError, TypeError):
continue
elif isinstance(entry, list) and len(entry) > 1:
# Legacy array format fallback
try:
kp_history.append({
"time": entry[0] if len(entry) > 0 else "",
"value": float(entry[1]),
})
except (ValueError, TypeError):
continue
self._status["kp_history"] = kp_history
# Get last entry (most recent) for current value
last_entry = data[-1]
if isinstance(last_entry, dict):
try:
self._status["kp_current"] = float(last_entry.get("Kp", 0))
except (ValueError, TypeError):
pass
self._status["kp_timestamp"] = last_entry.get("time_tag", "")
elif isinstance(last_entry, list) and len(last_entry) > 1:
# Legacy array format fallback
try:
self._status["kp_current"] = float(last_entry[1])
except (ValueError, TypeError):
pass
if len(last_entry) > 0:
self._status["kp_timestamp"] = last_entry[0]
def _parse_alerts(self, data):
"""Parse alerts.json.
Data format: array of objects with product_id, issue_datetime, message
"""
warnings = []
if isinstance(data, list):
for alert in data[:5]: # Keep most recent 5
message = alert.get("message", "")
# Extract first line as headline
headline = message.split("\n")[0].strip()
if headline:
warnings.append(headline)
self._status["active_warnings"] = warnings
def _parse_f107(self, data):
"""Parse f107_cm_flux.json.
Data format: array of objects with time_tag, flux
Store history for potential charting.
"""
if not data:
return
# Store SFI history (last 30 days of readings)
sfi_history = []
if isinstance(data, list):
for entry in data[-30:]: # Last 30 entries
if isinstance(entry, dict):
try:
sfi_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("flux", 0)),
})
except (ValueError, TypeError):
continue
self._status["sfi_history"] = sfi_history
# Get most recent entry (last in list)
if isinstance(data, list) and data:
last = data[-1]
if isinstance(last, dict):
try:
self._status["sfi"] = float(last.get("flux", 0))
except (ValueError, TypeError):
pass
def _update_events(self):
"""Generate events for significant space weather conditions."""
# Generate events for R-scale >= 3 (radio blackout)
self._events = []
r_scale = self._status.get("r_scale", 0)
if r_scale >= 3:
self._events.append({
"source": "swpc",
"event_id": f"swpc_r{r_scale}_{int(time.time())}",
"event_type": f"R{r_scale} Radio Blackout",
"severity": "priority" if r_scale >= 3 else "routine",
"headline": f"R{r_scale} Radio Blackout in progress",
"expires": time.time() + 3600, # 1hr TTL
"areas": [],
"fetched_at": time.time(),
})
def get_status(self) -> dict:
"""Get current SWPC status."""
return self._status
def get_events(self) -> list:
"""Get current alert events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "swpc",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": max(self._last_tick.values()) if self._last_tick else 0,
}

508
meshai/env/traffic.py vendored
View file

@ -1,254 +1,254 @@
"""TomTom Traffic Flow adapter."""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import TomTomConfig
logger = logging.getLogger(__name__)
class TomTomTrafficAdapter:
"""TomTom Traffic Flow Segment Data polling."""
BASE_URL = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"
def __init__(self, config: "TomTomConfig"):
self._api_key = self._resolve_env(config.api_key or "")
self._corridors = config.corridors or []
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._daily_requests = 0
self._daily_reset = 0.0
if not self._api_key:
logger.warning("TomTom API key not configured, adapter disabled")
if not self._corridors:
logger.info("TomTom: No corridors configured, adapter idle")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Reset daily counter at midnight
if now - self._daily_reset > 86400:
self._daily_requests = 0
self._daily_reset = now
# No API key or corridors
if not self._api_key or not self._corridors:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch traffic flow for all configured corridors.
Returns:
True if data changed
"""
new_events = []
now = time.time()
any_error = False
for corridor in self._corridors:
# Support both dict and object formats
if isinstance(corridor, dict):
name = corridor.get("name", "Unknown")
lat = corridor.get("lat")
lon = corridor.get("lon")
else:
name = getattr(corridor, "name", "Unknown")
lat = getattr(corridor, "lat", None)
lon = getattr(corridor, "lon", None)
if lat is None or lon is None:
continue
event = self._fetch_point(name, lat, lon, now)
if event:
new_events.append(event)
else:
any_error = True
if any_error and not new_events:
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
if not any_error:
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"TomTom traffic updated: {len(new_events)} corridors")
return changed
def _fetch_point(self, name: str, lat: float, lon: float, now: float) -> dict:
"""Fetch traffic flow for a single point.
Args:
name: Corridor name
lat: Latitude
lon: Longitude
now: Current timestamp
Returns:
Event dict or None on error
"""
params = {
"point": f"{lat},{lon}",
"key": self._api_key,
"unit": "MPH",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
self._daily_requests += 1
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(f"TomTom auth error: {e.code} - check API key")
self._last_error = f"Auth error {e.code}"
else:
logger.warning(f"TomTom HTTP error for {name}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return None
except URLError as e:
logger.warning(f"TomTom connection error for {name}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return None
except Exception as e:
logger.warning(f"TomTom fetch error for {name}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return None
# Parse response
try:
flow = data.get("flowSegmentData", {})
current_speed = flow.get("currentSpeed", 0)
free_flow_speed = flow.get("freeFlowSpeed", 0)
current_time = flow.get("currentTravelTime", 0)
free_flow_time = flow.get("freeFlowTravelTime", 0)
confidence = flow.get("confidence", 0)
road_closure = flow.get("roadClosure", False)
# Calculate speed ratio for severity
if free_flow_speed > 0:
ratio = current_speed / free_flow_speed
else:
ratio = 1.0
# Determine severity
if road_closure:
severity = "warning"
elif ratio >= 0.8:
severity = "info"
elif ratio >= 0.5:
severity = "advisory"
else:
severity = "warning"
# Format headline
if road_closure:
headline = f"{name}: CLOSED"
else:
pct = int(ratio * 100)
headline = f"{name}: {int(current_speed)}mph ({pct}% of free flow)"
event = {
"source": "traffic",
"event_id": f"traffic_{name.replace(' ', '_').lower()}",
"event_type": "Traffic Flow",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 600, # 10 min TTL
"fetched_at": now,
"properties": {
"corridor": name,
"currentSpeed": current_speed,
"freeFlowSpeed": free_flow_speed,
"speedRatio": ratio,
"currentTravelTime": current_time,
"freeFlowTravelTime": free_flow_time,
"confidence": confidence,
"roadClosure": road_closure,
},
}
return event
except Exception as e:
logger.warning(f"TomTom parse error for {name}: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return None
def get_events(self) -> list:
"""Get current traffic events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "traffic",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"corridor_count": len(self._corridors),
"daily_requests": self._daily_requests,
}
"""TomTom Traffic Flow adapter."""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import TomTomConfig
logger = logging.getLogger(__name__)
class TomTomTrafficAdapter:
"""TomTom Traffic Flow Segment Data polling."""
BASE_URL = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"
def __init__(self, config: "TomTomConfig"):
self._api_key = self._resolve_env(config.api_key or "")
self._corridors = config.corridors or []
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._daily_requests = 0
self._daily_reset = 0.0
if not self._api_key:
logger.warning("TomTom API key not configured, adapter disabled")
if not self._corridors:
logger.info("TomTom: No corridors configured, adapter idle")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Reset daily counter at midnight
if now - self._daily_reset > 86400:
self._daily_requests = 0
self._daily_reset = now
# No API key or corridors
if not self._api_key or not self._corridors:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch traffic flow for all configured corridors.
Returns:
True if data changed
"""
new_events = []
now = time.time()
any_error = False
for corridor in self._corridors:
# Support both dict and object formats
if isinstance(corridor, dict):
name = corridor.get("name", "Unknown")
lat = corridor.get("lat")
lon = corridor.get("lon")
else:
name = getattr(corridor, "name", "Unknown")
lat = getattr(corridor, "lat", None)
lon = getattr(corridor, "lon", None)
if lat is None or lon is None:
continue
event = self._fetch_point(name, lat, lon, now)
if event:
new_events.append(event)
else:
any_error = True
if any_error and not new_events:
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
if not any_error:
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"TomTom traffic updated: {len(new_events)} corridors")
return changed
def _fetch_point(self, name: str, lat: float, lon: float, now: float) -> dict:
"""Fetch traffic flow for a single point.
Args:
name: Corridor name
lat: Latitude
lon: Longitude
now: Current timestamp
Returns:
Event dict or None on error
"""
params = {
"point": f"{lat},{lon}",
"key": self._api_key,
"unit": "MPH",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
self._daily_requests += 1
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(f"TomTom auth error: {e.code} - check API key")
self._last_error = f"Auth error {e.code}"
else:
logger.warning(f"TomTom HTTP error for {name}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return None
except URLError as e:
logger.warning(f"TomTom connection error for {name}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return None
except Exception as e:
logger.warning(f"TomTom fetch error for {name}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return None
# Parse response
try:
flow = data.get("flowSegmentData", {})
current_speed = flow.get("currentSpeed", 0)
free_flow_speed = flow.get("freeFlowSpeed", 0)
current_time = flow.get("currentTravelTime", 0)
free_flow_time = flow.get("freeFlowTravelTime", 0)
confidence = flow.get("confidence", 0)
road_closure = flow.get("roadClosure", False)
# Calculate speed ratio for severity
if free_flow_speed > 0:
ratio = current_speed / free_flow_speed
else:
ratio = 1.0
# Determine severity
if road_closure:
severity = "priority"
elif ratio >= 0.8:
severity = "routine"
elif ratio >= 0.5:
severity = "routine"
else:
severity = "priority"
# Format headline
if road_closure:
headline = f"{name}: CLOSED"
else:
pct = int(ratio * 100)
headline = f"{name}: {int(current_speed)}mph ({pct}% of free flow)"
event = {
"source": "traffic",
"event_id": f"traffic_{name.replace(' ', '_').lower()}",
"event_type": "Traffic Flow",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 600, # 10 min TTL
"fetched_at": now,
"properties": {
"corridor": name,
"currentSpeed": current_speed,
"freeFlowSpeed": free_flow_speed,
"speedRatio": ratio,
"currentTravelTime": current_time,
"freeFlowTravelTime": free_flow_time,
"confidence": confidence,
"roadClosure": road_closure,
},
}
return event
except Exception as e:
logger.warning(f"TomTom parse error for {name}: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return None
def get_events(self) -> list:
"""Get current traffic events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "traffic",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"corridor_count": len(self._corridors),
"daily_requests": self._daily_requests,
}

906
meshai/env/usgs.py vendored
View file

@ -1,453 +1,453 @@
"""USGS Water Services stream gauge adapter with NWS flood stage auto-lookup.
# TODO: Migrate to api.waterdata.usgs.gov OGC API before Q1 2027
# Legacy waterservices.usgs.gov will be decommissioned.
# See: https://www.usgs.gov/tools/usgs-water-data-apis
"""
import json
import logging
import time
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import USGSConfig
logger = logging.getLogger(__name__)
# Minimum tick interval per USGS guidelines (do not fetch same data more than hourly)
MIN_TICK_SECONDS = 900 # 15 minutes
# Cache for NWS flood stages (rarely change)
_nwps_cache: dict[str, dict] = {}
_nwps_cache_time: dict[str, float] = {}
NWPS_CACHE_TTL = 86400 * 7 # 7 days
class USGSStreamsAdapter:
"""USGS instantaneous values for stream gauge readings with NWS flood stages."""
BASE_URL = "https://waterservices.usgs.gov/nwis/iv/"
NWPS_BASE_URL = "https://api.water.noaa.gov/nwps/v1/gauges"
def __init__(self, config: "USGSConfig"):
self._sites = config.sites or []
self._tick_interval = max(config.tick_seconds or 900, MIN_TICK_SECONDS)
self._flood_thresholds = getattr(config, "flood_thresholds", {}) or {}
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Site metadata cache (name, flood stages from NWPS)
self._site_metadata: dict[str, dict] = {}
if self._tick_interval < MIN_TICK_SECONDS:
logger.warning(
f"USGS tick_seconds {config.tick_seconds} below minimum, using {MIN_TICK_SECONDS}"
)
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No sites configured
if not self._sites:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _get_site_ids(self) -> list[str]:
"""Extract site IDs from config (handles both string and dict formats)."""
site_ids = []
for site in self._sites:
if isinstance(site, str):
site_ids.append(site)
elif isinstance(site, dict):
site_ids.append(site.get("id", ""))
elif hasattr(site, "id"):
site_ids.append(site.id)
return [s for s in site_ids if s]
def _lookup_nwps_stages(self, usgs_site_id: str) -> Optional[dict]:
"""Lookup flood stages from NWS National Water Prediction Service.
The NWPS API uses NWS gauge IDs which may differ from USGS site IDs.
We try a mapping lookup first, then fall back to direct lookup.
Returns:
dict with action_stage, flood_stage, moderate_flood_stage, major_flood_stage
or None if not available
"""
global _nwps_cache, _nwps_cache_time
# Check cache
now = time.time()
if usgs_site_id in _nwps_cache:
if now - _nwps_cache_time.get(usgs_site_id, 0) < NWPS_CACHE_TTL:
return _nwps_cache[usgs_site_id]
# Try to find NWS gauge ID from USGS site ID
# First, query USGS site info to get the NWS ID crosswalk
nws_gauge_id = self._usgs_to_nws_crosswalk(usgs_site_id)
if not nws_gauge_id:
# Fall back to using USGS ID directly (sometimes they match)
nws_gauge_id = usgs_site_id
# Query NWPS for flood stages
url = f"{self.NWPS_BASE_URL}/{nws_gauge_id}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
# Extract flood stages
stages = {}
flood_info = data.get("flood", {})
if "action" in flood_info:
stages["action_stage"] = flood_info["action"].get("stage")
if "minor" in flood_info:
stages["flood_stage"] = flood_info["minor"].get("stage")
if "moderate" in flood_info:
stages["moderate_flood_stage"] = flood_info["moderate"].get("stage")
if "major" in flood_info:
stages["major_flood_stage"] = flood_info["major"].get("stage")
# Also grab the official name if available
stages["nws_name"] = data.get("name", "")
stages["nws_gauge_id"] = nws_gauge_id
# Cache result
_nwps_cache[usgs_site_id] = stages
_nwps_cache_time[usgs_site_id] = now
logger.info(f"NWPS flood stages for {usgs_site_id}: {stages}")
return stages
except HTTPError as e:
if e.code == 404:
# No NWPS data for this gauge - cache the miss
_nwps_cache[usgs_site_id] = {}
_nwps_cache_time[usgs_site_id] = now
logger.debug(f"No NWPS data for gauge {usgs_site_id}")
else:
logger.warning(f"NWPS lookup failed for {usgs_site_id}: HTTP {e.code}")
return None
except Exception as e:
logger.warning(f"NWPS lookup error for {usgs_site_id}: {e}")
return None
def _usgs_to_nws_crosswalk(self, usgs_site_id: str) -> Optional[str]:
"""Try to find NWS gauge ID from USGS site ID.
The USGS provides a crosswalk in their site metadata, but it's not
always populated. This is a best-effort lookup.
"""
# Try USGS site service for metadata including NWS ID
url = f"https://waterservices.usgs.gov/nwis/site/?format=rdb&sites={usgs_site_id}&siteOutput=expanded"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0"})
with urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
# Parse RDB format - look for NWS ID in the data
# This is a simplified parser; full implementation would be more robust
for line in content.split("\n"):
if line.startswith(usgs_site_id):
# NWS station ID is typically in column ~30ish
# This varies by USGS response format
pass
except Exception:
pass
return None
def lookup_site(self, site_id: str) -> dict:
"""Lookup site metadata for config UI auto-populate.
Returns:
{
"site_id": "13090500",
"name": "Snake River nr Twin Falls ID",
"lat": 42.xxx,
"lon": -114.xxx,
"flood_stages": {
"action_stage": 9.0,
"flood_stage": 10.5,
"moderate_flood_stage": 12.0,
"major_flood_stage": 14.0,
} or None
}
"""
result = {"site_id": site_id, "name": None, "lat": None, "lon": None, "flood_stages": None}
# Get USGS site info
params = {
"format": "json",
"sites": site_id,
"siteOutput": "expanded",
}
url = f"https://waterservices.usgs.gov/nwis/site/?{urlencode(params)}"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0", "Accept": "application/json"})
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
sites = data.get("value", {}).get("timeSeries", [])
if not sites:
# Try alternate format
sites_list = data.get("value", {}).get("sites", [])
if sites_list:
site_info = sites_list[0]
result["name"] = site_info.get("siteName", "")
result["lat"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("latitude")
result["lon"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("longitude")
except Exception as e:
logger.warning(f"USGS site lookup failed for {site_id}: {e}")
# Get NWS flood stages
stages = self._lookup_nwps_stages(site_id)
if stages:
result["flood_stages"] = {
"action_stage": stages.get("action_stage"),
"flood_stage": stages.get("flood_stage"),
"moderate_flood_stage": stages.get("moderate_flood_stage"),
"major_flood_stage": stages.get("major_flood_stage"),
}
if stages.get("nws_name") and not result["name"]:
result["name"] = stages["nws_name"]
return result
def _fetch(self) -> bool:
"""Fetch instantaneous values from USGS Water Services.
Returns:
True if data changed
"""
site_ids = self._get_site_ids()
if not site_ids:
return False
params = {
"format": "json",
"sites": ",".join(site_ids),
"parameterCd": "00060,00065", # Streamflow (cfs) and Gage height (ft)
"siteStatus": "active",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"USGS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"USGS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"USGS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
new_events = []
now = time.time()
try:
time_series = data.get("value", {}).get("timeSeries", [])
for ts in time_series:
source_info = ts.get("sourceInfo", {})
variable = ts.get("variable", {})
values_list = ts.get("values", [])
# Extract site info
site_name = source_info.get("siteName", "Unknown Site")
site_codes = source_info.get("siteCode", [])
site_id = site_codes[0].get("value", "") if site_codes else ""
# Cache site name
if site_id and site_id not in self._site_metadata:
self._site_metadata[site_id] = {"name": site_name}
# Extract location
geo_loc = source_info.get("geoLocation", {}).get("geogLocation", {})
lat = geo_loc.get("latitude")
lon = geo_loc.get("longitude")
# Extract variable info
var_name = variable.get("variableName", "Unknown")
unit_info = variable.get("unit", {})
unit_code = unit_info.get("unitCode", "")
# Determine parameter type
if "Streamflow" in var_name or "00060" in str(variable.get("variableCode", [])):
param_type = "flow"
param_name = "Streamflow"
elif "Gage height" in var_name or "00065" in str(variable.get("variableCode", [])):
param_type = "height"
param_name = "Gage height"
else:
param_type = "other"
param_name = var_name
# Get current value (most recent)
if not values_list or not values_list[0].get("value"):
continue
value_entries = values_list[0].get("value", [])
if not value_entries:
continue
latest = value_entries[-1]
value_str = latest.get("value", "")
timestamp_str = latest.get("dateTime", "")
try:
value = float(value_str)
except (ValueError, TypeError):
continue
# Get flood stages for this site
nwps_stages = self._lookup_nwps_stages(site_id)
# Determine severity based on flood stages (for gage height)
severity = "info"
flood_status = None
if param_type == "height" and nwps_stages:
major = nwps_stages.get("major_flood_stage")
moderate = nwps_stages.get("moderate_flood_stage")
minor = nwps_stages.get("flood_stage")
action = nwps_stages.get("action_stage")
if major and value >= major:
severity = "critical"
flood_status = "Major Flood"
elif moderate and value >= moderate:
severity = "warning"
flood_status = "Moderate Flood"
elif minor and value >= minor:
severity = "warning"
flood_status = "Minor Flood"
elif action and value >= action:
severity = "advisory"
flood_status = "Action Stage"
# Fall back to legacy manual thresholds
if severity == "info":
threshold = self._flood_thresholds.get(site_id, {}).get(param_type)
if threshold and value > threshold:
severity = "warning"
# Format headline
if param_type == "flow":
headline = f"{site_name}: {value:,.0f} {unit_code}"
else:
headline = f"{site_name}: {value:.1f} {unit_code}"
if flood_status:
headline += f"{flood_status}"
event = {
"source": "usgs",
"event_id": f"{site_id}_{param_type}",
"event_type": "Stream Gauge",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 1800, # 30 min TTL
"fetched_at": now,
"properties": {
"site_id": site_id,
"site_name": site_name,
"parameter": param_name,
"value": value,
"unit": unit_code,
"timestamp": timestamp_str,
"flood_status": flood_status,
"flood_stages": nwps_stages if nwps_stages else None,
},
}
new_events.append(event)
except Exception as e:
logger.warning(f"USGS parse error: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids or len(self._events) != len(new_events)
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"USGS streams updated: {len(new_events)} readings from {len(site_ids)} sites")
return changed
def get_events(self) -> list:
"""Get current stream gauge events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "usgs",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"site_count": len(self._get_site_ids()),
}
"""USGS Water Services stream gauge adapter with NWS flood stage auto-lookup.
# TODO: Migrate to api.waterdata.usgs.gov OGC API before Q1 2027
# Legacy waterservices.usgs.gov will be decommissioned.
# See: https://www.usgs.gov/tools/usgs-water-data-apis
"""
import json
import logging
import time
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import USGSConfig
logger = logging.getLogger(__name__)
# Minimum tick interval per USGS guidelines (do not fetch same data more than hourly)
MIN_TICK_SECONDS = 900 # 15 minutes
# Cache for NWS flood stages (rarely change)
_nwps_cache: dict[str, dict] = {}
_nwps_cache_time: dict[str, float] = {}
NWPS_CACHE_TTL = 86400 * 7 # 7 days
class USGSStreamsAdapter:
"""USGS instantaneous values for stream gauge readings with NWS flood stages."""
BASE_URL = "https://waterservices.usgs.gov/nwis/iv/"
NWPS_BASE_URL = "https://api.water.noaa.gov/nwps/v1/gauges"
def __init__(self, config: "USGSConfig"):
self._sites = config.sites or []
self._tick_interval = max(config.tick_seconds or 900, MIN_TICK_SECONDS)
self._flood_thresholds = getattr(config, "flood_thresholds", {}) or {}
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Site metadata cache (name, flood stages from NWPS)
self._site_metadata: dict[str, dict] = {}
if self._tick_interval < MIN_TICK_SECONDS:
logger.warning(
f"USGS tick_seconds {config.tick_seconds} below minimum, using {MIN_TICK_SECONDS}"
)
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No sites configured
if not self._sites:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _get_site_ids(self) -> list[str]:
"""Extract site IDs from config (handles both string and dict formats)."""
site_ids = []
for site in self._sites:
if isinstance(site, str):
site_ids.append(site)
elif isinstance(site, dict):
site_ids.append(site.get("id", ""))
elif hasattr(site, "id"):
site_ids.append(site.id)
return [s for s in site_ids if s]
def _lookup_nwps_stages(self, usgs_site_id: str) -> Optional[dict]:
"""Lookup flood stages from NWS National Water Prediction Service.
The NWPS API uses NWS gauge IDs which may differ from USGS site IDs.
We try a mapping lookup first, then fall back to direct lookup.
Returns:
dict with action_stage, flood_stage, moderate_flood_stage, major_flood_stage
or None if not available
"""
global _nwps_cache, _nwps_cache_time
# Check cache
now = time.time()
if usgs_site_id in _nwps_cache:
if now - _nwps_cache_time.get(usgs_site_id, 0) < NWPS_CACHE_TTL:
return _nwps_cache[usgs_site_id]
# Try to find NWS gauge ID from USGS site ID
# First, query USGS site info to get the NWS ID crosswalk
nws_gauge_id = self._usgs_to_nws_crosswalk(usgs_site_id)
if not nws_gauge_id:
# Fall back to using USGS ID directly (sometimes they match)
nws_gauge_id = usgs_site_id
# Query NWPS for flood stages
url = f"{self.NWPS_BASE_URL}/{nws_gauge_id}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
# Extract flood stages
stages = {}
flood_info = data.get("flood", {})
if "action" in flood_info:
stages["action_stage"] = flood_info["action"].get("stage")
if "minor" in flood_info:
stages["flood_stage"] = flood_info["minor"].get("stage")
if "moderate" in flood_info:
stages["moderate_flood_stage"] = flood_info["moderate"].get("stage")
if "major" in flood_info:
stages["major_flood_stage"] = flood_info["major"].get("stage")
# Also grab the official name if available
stages["nws_name"] = data.get("name", "")
stages["nws_gauge_id"] = nws_gauge_id
# Cache result
_nwps_cache[usgs_site_id] = stages
_nwps_cache_time[usgs_site_id] = now
logger.info(f"NWPS flood stages for {usgs_site_id}: {stages}")
return stages
except HTTPError as e:
if e.code == 404:
# No NWPS data for this gauge - cache the miss
_nwps_cache[usgs_site_id] = {}
_nwps_cache_time[usgs_site_id] = now
logger.debug(f"No NWPS data for gauge {usgs_site_id}")
else:
logger.warning(f"NWPS lookup failed for {usgs_site_id}: HTTP {e.code}")
return None
except Exception as e:
logger.warning(f"NWPS lookup error for {usgs_site_id}: {e}")
return None
def _usgs_to_nws_crosswalk(self, usgs_site_id: str) -> Optional[str]:
"""Try to find NWS gauge ID from USGS site ID.
The USGS provides a crosswalk in their site metadata, but it's not
always populated. This is a best-effort lookup.
"""
# Try USGS site service for metadata including NWS ID
url = f"https://waterservices.usgs.gov/nwis/site/?format=rdb&sites={usgs_site_id}&siteOutput=expanded"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0"})
with urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
# Parse RDB format - look for NWS ID in the data
# This is a simplified parser; full implementation would be more robust
for line in content.split("\n"):
if line.startswith(usgs_site_id):
# NWS station ID is typically in column ~30ish
# This varies by USGS response format
pass
except Exception:
pass
return None
def lookup_site(self, site_id: str) -> dict:
"""Lookup site metadata for config UI auto-populate.
Returns:
{
"site_id": "13090500",
"name": "Snake River nr Twin Falls ID",
"lat": 42.xxx,
"lon": -114.xxx,
"flood_stages": {
"action_stage": 9.0,
"flood_stage": 10.5,
"moderate_flood_stage": 12.0,
"major_flood_stage": 14.0,
} or None
}
"""
result = {"site_id": site_id, "name": None, "lat": None, "lon": None, "flood_stages": None}
# Get USGS site info
params = {
"format": "json",
"sites": site_id,
"siteOutput": "expanded",
}
url = f"https://waterservices.usgs.gov/nwis/site/?{urlencode(params)}"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0", "Accept": "application/json"})
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
sites = data.get("value", {}).get("timeSeries", [])
if not sites:
# Try alternate format
sites_list = data.get("value", {}).get("sites", [])
if sites_list:
site_info = sites_list[0]
result["name"] = site_info.get("siteName", "")
result["lat"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("latitude")
result["lon"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("longitude")
except Exception as e:
logger.warning(f"USGS site lookup failed for {site_id}: {e}")
# Get NWS flood stages
stages = self._lookup_nwps_stages(site_id)
if stages:
result["flood_stages"] = {
"action_stage": stages.get("action_stage"),
"flood_stage": stages.get("flood_stage"),
"moderate_flood_stage": stages.get("moderate_flood_stage"),
"major_flood_stage": stages.get("major_flood_stage"),
}
if stages.get("nws_name") and not result["name"]:
result["name"] = stages["nws_name"]
return result
def _fetch(self) -> bool:
"""Fetch instantaneous values from USGS Water Services.
Returns:
True if data changed
"""
site_ids = self._get_site_ids()
if not site_ids:
return False
params = {
"format": "json",
"sites": ",".join(site_ids),
"parameterCd": "00060,00065", # Streamflow (cfs) and Gage height (ft)
"siteStatus": "active",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"USGS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"USGS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"USGS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
new_events = []
now = time.time()
try:
time_series = data.get("value", {}).get("timeSeries", [])
for ts in time_series:
source_info = ts.get("sourceInfo", {})
variable = ts.get("variable", {})
values_list = ts.get("values", [])
# Extract site info
site_name = source_info.get("siteName", "Unknown Site")
site_codes = source_info.get("siteCode", [])
site_id = site_codes[0].get("value", "") if site_codes else ""
# Cache site name
if site_id and site_id not in self._site_metadata:
self._site_metadata[site_id] = {"name": site_name}
# Extract location
geo_loc = source_info.get("geoLocation", {}).get("geogLocation", {})
lat = geo_loc.get("latitude")
lon = geo_loc.get("longitude")
# Extract variable info
var_name = variable.get("variableName", "Unknown")
unit_info = variable.get("unit", {})
unit_code = unit_info.get("unitCode", "")
# Determine parameter type
if "Streamflow" in var_name or "00060" in str(variable.get("variableCode", [])):
param_type = "flow"
param_name = "Streamflow"
elif "Gage height" in var_name or "00065" in str(variable.get("variableCode", [])):
param_type = "height"
param_name = "Gage height"
else:
param_type = "other"
param_name = var_name
# Get current value (most recent)
if not values_list or not values_list[0].get("value"):
continue
value_entries = values_list[0].get("value", [])
if not value_entries:
continue
latest = value_entries[-1]
value_str = latest.get("value", "")
timestamp_str = latest.get("dateTime", "")
try:
value = float(value_str)
except (ValueError, TypeError):
continue
# Get flood stages for this site
nwps_stages = self._lookup_nwps_stages(site_id)
# Determine severity based on flood stages (for gage height)
severity = "routine"
flood_status = None
if param_type == "height" and nwps_stages:
major = nwps_stages.get("major_flood_stage")
moderate = nwps_stages.get("moderate_flood_stage")
minor = nwps_stages.get("flood_stage")
action = nwps_stages.get("action_stage")
if major and value >= major:
severity = "immediate"
flood_status = "Major Flood"
elif moderate and value >= moderate:
severity = "priority"
flood_status = "Moderate Flood"
elif minor and value >= minor:
severity = "priority"
flood_status = "Minor Flood"
elif action and value >= action:
severity = "routine"
flood_status = "Action Stage"
# Fall back to legacy manual thresholds
if severity == "info":
threshold = self._flood_thresholds.get(site_id, {}).get(param_type)
if threshold and value > threshold:
severity = "priority"
# Format headline
if param_type == "flow":
headline = f"{site_name}: {value:,.0f} {unit_code}"
else:
headline = f"{site_name}: {value:.1f} {unit_code}"
if flood_status:
headline += f"{flood_status}"
event = {
"source": "usgs",
"event_id": f"{site_id}_{param_type}",
"event_type": "Stream Gauge",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 1800, # 30 min TTL
"fetched_at": now,
"properties": {
"site_id": site_id,
"site_name": site_name,
"parameter": param_name,
"value": value,
"unit": unit_code,
"timestamp": timestamp_str,
"flood_status": flood_status,
"flood_stages": nwps_stages if nwps_stages else None,
},
}
new_events.append(event)
except Exception as e:
logger.warning(f"USGS parse error: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids or len(self._events) != len(new_events)
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"USGS streams updated: {len(new_events)} readings from {len(site_ids)} sites")
return changed
def get_events(self) -> list:
"""Get current stream gauge events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "usgs",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"site_count": len(self._get_site_ids()),
}