feat(env): USGS stream gauges, TomTom traffic, 511 road conditions

This commit is contained in:
K7ZVX 2026-05-12 22:22:57 +00:00
commit f8bf7e5057
16 changed files with 2542 additions and 1183 deletions

366
meshai/env/roads511.py vendored Normal file
View file

@ -0,0 +1,366 @@
"""511 Road Conditions adapter.
Polls a configurable 511 API for road events. The base URL is fully
configurable as each state has a different 511 system.
"""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urljoin
if TYPE_CHECKING:
from ..config import Roads511Config
logger = logging.getLogger(__name__)
class Roads511Adapter:
"""511 road conditions polling adapter."""
def __init__(self, config: "Roads511Config"):
self._api_key = self._resolve_env(config.api_key or "")
self._base_url = (config.base_url or "").rstrip("/")
self._endpoints = config.endpoints or ["/get/event"]
self._bbox = config.bbox or [] # [west, south, east, north]
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._auth_failed = False # Stop retrying on auth failures
if not self._base_url:
logger.info("511: No base URL configured, adapter disabled")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No base URL configured
if not self._base_url:
return False
# Auth failed - don't keep retrying
if self._auth_failed:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch events from all configured endpoints.
Returns:
True if data changed
"""
new_events = []
now = time.time()
for endpoint in self._endpoints:
events = self._fetch_endpoint(endpoint, now)
if events:
new_events.extend(events)
# Apply bbox filter if configured
if self._bbox and len(self._bbox) == 4:
west, south, east, north = self._bbox
new_events = [
e for e in new_events
if e.get("lat") is not None and e.get("lon") is not None
and west <= e["lon"] <= east and south <= e["lat"] <= north
]
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._is_loaded = True
if changed:
logger.info(f"511 road events updated: {len(new_events)} active")
return changed
def _fetch_endpoint(self, endpoint: str, now: float) -> list:
"""Fetch events from a single endpoint.
Args:
endpoint: API endpoint path
now: Current timestamp
Returns:
List of event dicts
"""
url = urljoin(self._base_url + "/", endpoint.lstrip("/"))
# Add API key if configured
if self._api_key:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}key={self._api_key}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(
f"511 auth error: {e.code} - check API key configuration for {self._base_url}"
)
self._last_error = f"Auth error {e.code} - check API key"
self._auth_failed = True
return []
else:
logger.warning(f"511 HTTP error for {endpoint}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return []
except URLError as e:
logger.warning(f"511 connection error for {endpoint}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return []
except Exception as e:
logger.warning(f"511 fetch error for {endpoint}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return []
# Parse response - handle various 511 API formats
return self._parse_response(data, now)
def _parse_response(self, data, now: float) -> list:
"""Parse 511 API response.
Different states use different formats. Try common patterns.
Args:
data: JSON response data
now: Current timestamp
Returns:
List of event dicts
"""
events = []
# Handle array response
if isinstance(data, list):
items = data
# Handle wrapped response
elif isinstance(data, dict):
# Try common wrapper keys
items = (
data.get("events") or
data.get("items") or
data.get("data") or
data.get("results") or
[]
)
if not isinstance(items, list):
items = [data] if self._looks_like_event(data) else []
else:
return []
for item in items:
event = self._parse_event(item, now)
if event:
events.append(event)
self._consecutive_errors = 0
self._last_error = None
return events
def _looks_like_event(self, item: dict) -> bool:
"""Check if dict looks like a 511 event."""
return bool(
item.get("id") or item.get("EventId") or item.get("event_id")
)
def _parse_event(self, item: dict, now: float) -> dict:
"""Parse a single 511 event.
Args:
item: Event dict from API
now: Current timestamp
Returns:
Normalized event dict or None
"""
try:
# Try various ID field names
event_id = (
item.get("id") or
item.get("EventId") or
item.get("event_id") or
item.get("ID") or
str(hash(str(item)))[:12]
)
# Try various type field names
event_type = (
item.get("EventType") or
item.get("event_type") or
item.get("type") or
item.get("Type") or
item.get("category") or
"Road Event"
)
# Try various road name fields
roadway = (
item.get("RoadwayName") or
item.get("roadway_name") or
item.get("roadway") or
item.get("Roadway") or
item.get("road") or
item.get("route") or
""
)
# Try various description fields
description = (
item.get("Description") or
item.get("description") or
item.get("message") or
item.get("Message") or
item.get("details") or
""
)
# Try various location fields
lat = (
item.get("Latitude") or
item.get("latitude") or
item.get("lat") or
item.get("StartLatitude") or
None
)
lon = (
item.get("Longitude") or
item.get("longitude") or
item.get("lon") or
item.get("lng") or
item.get("StartLongitude") or
None
)
# Try to get coordinates from nested location object
if lat is None and "location" in item:
loc = item["location"]
if isinstance(loc, dict):
lat = loc.get("latitude") or loc.get("lat")
lon = loc.get("longitude") or loc.get("lon") or loc.get("lng")
# Check closure status
is_closure = (
item.get("IsFullClosure") or
item.get("is_full_closure") or
item.get("fullClosure") or
item.get("closed") or
"closure" in str(event_type).lower() or
"closed" in str(description).lower()
)
# Determine severity
if is_closure:
severity = "warning"
elif "construction" in str(event_type).lower():
severity = "advisory"
elif "incident" in str(event_type).lower():
severity = "advisory"
else:
severity = "info"
# Format headline
if roadway and description:
headline = f"{roadway}: {description[:100]}"
elif roadway:
headline = f"{roadway}: {event_type}"
elif description:
headline = description[:120]
else:
headline = f"{event_type}"
# Try to get timestamp for expiry
last_updated = (
item.get("LastUpdated") or
item.get("last_updated") or
item.get("updated") or
item.get("timestamp") or
None
)
# Default 6 hour TTL, refreshed every tick
expires = now + 21600
event = {
"source": "511",
"event_id": f"511_{event_id}",
"event_type": event_type,
"headline": headline,
"description": description[:500] if description else "",
"severity": severity,
"lat": float(lat) if lat is not None else None,
"lon": float(lon) if lon is not None else None,
"expires": expires,
"fetched_at": now,
"properties": {
"roadway": roadway,
"is_closure": bool(is_closure),
"last_updated": last_updated,
},
}
return event
except Exception as e:
logger.debug(f"511 event parse error: {e} - item: {item}")
return None
def get_events(self) -> list:
"""Get current road events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "511",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"auth_failed": self._auth_failed,
}

429
meshai/env/store.py vendored
View file

@ -1,198 +1,231 @@
"""Environmental data store with tick-based adapter polling."""
import logging
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
self._region_anchors = region_anchors or []
# Create adapter instances based on config
if config.nws.enabled:
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled:
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled:
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
if config.fires.enabled:
from .fires import NICFFiresAdapter
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
if config.avalanche.enabled:
from .avalanche import AvalancheAdapter
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
Returns:
True if any data changed
"""
changed = False
for name, adapter in self._adapters.items():
try:
if adapter.tick():
changed = True
self._ingest(name, adapter)
except Exception as e:
logger.warning("Env adapter %s error: %s", name, e)
self._purge_expired()
return changed
def _ingest(self, name: str, adapter):
"""Ingest data from an adapter after it ticks."""
if name == "swpc":
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""
now = time.time()
expired = [
k for k, v in self._events.items()
if v.get("expires") and v["expires"] < now
]
for k in expired:
del self._events[k]
def get_active(self, source: str = None) -> list:
"""Get active events, optionally filtered by source.
Args:
source: Filter to specific source (nws, swpc, etc.)
Returns:
List of event dicts sorted by fetched_at (newest first)
"""
events = list(self._events.values())
if source:
events = [e for e in events if e["source"] == source]
return sorted(events, key=lambda e: e.get("fetched_at", 0), reverse=True)
def get_for_zones(self, zones: list) -> list:
"""Get events affecting specific NWS zones.
Args:
zones: List of UGC zone codes (e.g., ["IDZ016", "IDZ030"])
Returns:
List of events with overlapping zone coverage
"""
zone_set = set(zones)
return [
e for e in self._events.values()
if set(e.get("areas", [])) & zone_set
]
def get_swpc_status(self) -> dict:
"""Get current SWPC space weather status."""
return self._swpc_status
def get_ducting_status(self) -> dict:
"""Get current tropospheric ducting status."""
return self._ducting_status
def get_rf_propagation(self) -> dict:
"""Combined HF + UHF propagation summary for dashboard/LLM."""
return {
"hf": self._swpc_status,
"uhf_ducting": self._ducting_status,
}
def get_summary(self) -> str:
"""Compact text block for LLM context injection."""
lines = []
lines.append(f"### Current Conditions (as of {time.strftime('%H:%M:%S MT')}):")
# NWS alerts
nws = self.get_active(source="nws")
if nws:
lines.append(f"NWS: {len(nws)} active alert(s):")
for a in nws[:3]:
lines.append(f" - {a['event_type']}: {a['headline'][:120]}")
else:
lines.append("NWS: No active alerts for mesh area.")
# Space weather indices (raw - LLM interprets)
s = self._swpc_status
if s:
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
r = s.get("r_scale", 0)
g = s.get("g_scale", 0)
lines.append(f"Space Weather: SFI {sfi}, Kp {kp}, R{r}/G{g}")
warnings = s.get("active_warnings", [])
if warnings:
for w in warnings[:2]:
lines.append(f" Warning: {w}")
else:
lines.append("Space Weather: Data not available.")
# Tropospheric ducting (raw - LLM interprets)
d = self._ducting_status
if d:
condition = d.get("condition", "unknown")
gradient = d.get("min_gradient", "?")
if condition == "normal":
lines.append(f"Tropospheric: Normal (dM/dz {gradient} M-units/km)")
else:
thickness = d.get("duct_thickness_m", "?")
lines.append(f"Tropospheric: {condition.replace('_', ' ').title()}")
lines.append(f" dM/dz: {gradient} M-units/km, duct ~{thickness}m thick")
# Active fires
fires = self.get_active(source="nifc")
if fires:
lines.append(f"Wildfires: {len(fires)} active")
for f in fires[:2]:
name = f.get("name", "Unknown")
acres = f.get("acres", 0)
pct = f.get("pct_contained", 0)
dist = f.get("distance_km")
lines.append(f" - {name}: {int(acres):,} ac, {int(pct)}% contained" +
(f" ({int(dist)} km)" if dist else ""))
# Avalanche advisories
avy = self.get_active(source="avalanche")
if avy:
lines.append(f"Avalanche: {len(avy)} zone(s) with advisories")
for a in avy[:2]:
zone = a.get("zone_name", "Unknown")
danger = a.get("danger_name", "Unknown")
lines.append(f" - {zone}: {danger}")
return "\n".join(lines)
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]
"""Environmental data store with tick-based adapter polling."""
import logging
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
self._region_anchors = region_anchors or []
# Create adapter instances based on config
if config.nws.enabled:
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled:
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled:
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
if config.fires.enabled:
from .fires import NICFFiresAdapter
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
if config.avalanche.enabled:
from .avalanche import AvalancheAdapter
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
if config.usgs.enabled:
from .usgs import USGSStreamsAdapter
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
if config.traffic.enabled:
from .traffic import TomTomTrafficAdapter
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)
if config.roads511.enabled:
from .roads511 import Roads511Adapter
self._adapters["roads511"] = Roads511Adapter(config.roads511)
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
Returns:
True if any data changed
"""
changed = False
for name, adapter in self._adapters.items():
try:
if adapter.tick():
changed = True
self._ingest(name, adapter)
except Exception as e:
logger.warning("Env adapter %s error: %s", name, e)
self._purge_expired()
return changed
def _ingest(self, name: str, adapter):
"""Ingest data from an adapter after it ticks."""
if name == "swpc":
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""
now = time.time()
expired = [
k for k, v in self._events.items()
if v.get("expires") and v["expires"] < now
]
for k in expired:
del self._events[k]
def get_active(self, source: str = None) -> list:
"""Get active events, optionally filtered by source.
Args:
source: Filter to specific source (nws, swpc, etc.)
Returns:
List of event dicts sorted by fetched_at (newest first)
"""
events = list(self._events.values())
if source:
events = [e for e in events if e["source"] == source]
return sorted(events, key=lambda e: e.get("fetched_at", 0), reverse=True)
def get_for_zones(self, zones: list) -> list:
"""Get events affecting specific NWS zones.
Args:
zones: List of UGC zone codes (e.g., ["IDZ016", "IDZ030"])
Returns:
List of events with overlapping zone coverage
"""
zone_set = set(zones)
return [
e for e in self._events.values()
if set(e.get("areas", [])) & zone_set
]
def get_swpc_status(self) -> dict:
"""Get current SWPC space weather status."""
return self._swpc_status
def get_ducting_status(self) -> dict:
"""Get current tropospheric ducting status."""
return self._ducting_status
def get_rf_propagation(self) -> dict:
"""Combined HF + UHF propagation summary for dashboard/LLM."""
return {
"hf": self._swpc_status,
"uhf_ducting": self._ducting_status,
}
def get_summary(self) -> str:
"""Compact text block for LLM context injection."""
lines = []
lines.append(f"### Current Conditions (as of {time.strftime('%H:%M:%S MT')}):")
# NWS alerts
nws = self.get_active(source="nws")
if nws:
lines.append(f"NWS: {len(nws)} active alert(s):")
for a in nws[:3]:
lines.append(f" - {a['event_type']}: {a['headline'][:120]}")
else:
lines.append("NWS: No active alerts for mesh area.")
# Space weather indices (raw - LLM interprets)
s = self._swpc_status
if s:
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
r = s.get("r_scale", 0)
g = s.get("g_scale", 0)
lines.append(f"Space Weather: SFI {sfi}, Kp {kp}, R{r}/G{g}")
warnings = s.get("active_warnings", [])
if warnings:
for w in warnings[:2]:
lines.append(f" Warning: {w}")
else:
lines.append("Space Weather: Data not available.")
# Tropospheric ducting (raw - LLM interprets)
d = self._ducting_status
if d:
condition = d.get("condition", "unknown")
gradient = d.get("min_gradient", "?")
if condition == "normal":
lines.append(f"Tropospheric: Normal (dM/dz {gradient} M-units/km)")
else:
thickness = d.get("duct_thickness_m", "?")
lines.append(f"Tropospheric: {condition.replace('_', ' ').title()}")
lines.append(f" dM/dz: {gradient} M-units/km, duct ~{thickness}m thick")
# Active fires
fires = self.get_active(source="nifc")
if fires:
lines.append(f"Wildfires: {len(fires)} active")
for f in fires[:2]:
name = f.get("name", "Unknown")
acres = f.get("acres", 0)
pct = f.get("pct_contained", 0)
dist = f.get("distance_km")
lines.append(f" - {name}: {int(acres):,} ac, {int(pct)}% contained" +
(f" ({int(dist)} km)" if dist else ""))
# Avalanche advisories
avy = self.get_active(source="avalanche")
if avy:
lines.append(f"Avalanche: {len(avy)} zone(s) with advisories")
for a in avy[:2]:
zone = a.get("zone_name", "Unknown")
danger = a.get("danger_name", "Unknown")
lines.append(f" - {zone}: {danger}")
# Stream gauges
streams = self.get_active(source="usgs")
if streams:
lines.append(f"Stream Gauges: {len(streams)} readings")
for s in streams[:2]:
lines.append(f" - {s['headline']}")
# Traffic flow
traffic = self.get_active(source="traffic")
if traffic:
lines.append(f"Traffic: {len(traffic)} corridors")
for t in traffic[:2]:
lines.append(f" - {t['headline']}")
# 511 road events
roads = self.get_active(source="511")
if roads:
lines.append(f"Road Events: {len(roads)} active")
for r in roads[:2]:
lines.append(f" - {r['headline'][:60]}")
return "\n".join(lines)
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]

254
meshai/env/traffic.py vendored Normal file
View file

@ -0,0 +1,254 @@
"""TomTom Traffic Flow adapter."""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import TomTomConfig
logger = logging.getLogger(__name__)
class TomTomTrafficAdapter:
"""TomTom Traffic Flow Segment Data polling."""
BASE_URL = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"
def __init__(self, config: "TomTomConfig"):
self._api_key = self._resolve_env(config.api_key or "")
self._corridors = config.corridors or []
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._daily_requests = 0
self._daily_reset = 0.0
if not self._api_key:
logger.warning("TomTom API key not configured, adapter disabled")
if not self._corridors:
logger.info("TomTom: No corridors configured, adapter idle")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Reset daily counter at midnight
if now - self._daily_reset > 86400:
self._daily_requests = 0
self._daily_reset = now
# No API key or corridors
if not self._api_key or not self._corridors:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch traffic flow for all configured corridors.
Returns:
True if data changed
"""
new_events = []
now = time.time()
any_error = False
for corridor in self._corridors:
# Support both dict and object formats
if isinstance(corridor, dict):
name = corridor.get("name", "Unknown")
lat = corridor.get("lat")
lon = corridor.get("lon")
else:
name = getattr(corridor, "name", "Unknown")
lat = getattr(corridor, "lat", None)
lon = getattr(corridor, "lon", None)
if lat is None or lon is None:
continue
event = self._fetch_point(name, lat, lon, now)
if event:
new_events.append(event)
else:
any_error = True
if any_error and not new_events:
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
if not any_error:
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"TomTom traffic updated: {len(new_events)} corridors")
return changed
def _fetch_point(self, name: str, lat: float, lon: float, now: float) -> dict:
"""Fetch traffic flow for a single point.
Args:
name: Corridor name
lat: Latitude
lon: Longitude
now: Current timestamp
Returns:
Event dict or None on error
"""
params = {
"point": f"{lat},{lon}",
"key": self._api_key,
"unit": "MPH",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
self._daily_requests += 1
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(f"TomTom auth error: {e.code} - check API key")
self._last_error = f"Auth error {e.code}"
else:
logger.warning(f"TomTom HTTP error for {name}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return None
except URLError as e:
logger.warning(f"TomTom connection error for {name}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return None
except Exception as e:
logger.warning(f"TomTom fetch error for {name}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return None
# Parse response
try:
flow = data.get("flowSegmentData", {})
current_speed = flow.get("currentSpeed", 0)
free_flow_speed = flow.get("freeFlowSpeed", 0)
current_time = flow.get("currentTravelTime", 0)
free_flow_time = flow.get("freeFlowTravelTime", 0)
confidence = flow.get("confidence", 0)
road_closure = flow.get("roadClosure", False)
# Calculate speed ratio for severity
if free_flow_speed > 0:
ratio = current_speed / free_flow_speed
else:
ratio = 1.0
# Determine severity
if road_closure:
severity = "warning"
elif ratio >= 0.8:
severity = "info"
elif ratio >= 0.5:
severity = "advisory"
else:
severity = "warning"
# Format headline
if road_closure:
headline = f"{name}: CLOSED"
else:
pct = int(ratio * 100)
headline = f"{name}: {int(current_speed)}mph ({pct}% of free flow)"
event = {
"source": "traffic",
"event_id": f"traffic_{name.replace(' ', '_').lower()}",
"event_type": "Traffic Flow",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 600, # 10 min TTL
"fetched_at": now,
"properties": {
"corridor": name,
"currentSpeed": current_speed,
"freeFlowSpeed": free_flow_speed,
"speedRatio": ratio,
"currentTravelTime": current_time,
"freeFlowTravelTime": free_flow_time,
"confidence": confidence,
"roadClosure": road_closure,
},
}
return event
except Exception as e:
logger.warning(f"TomTom parse error for {name}: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return None
def get_events(self) -> list:
"""Get current traffic events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "traffic",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"corridor_count": len(self._corridors),
"daily_requests": self._daily_requests,
}

232
meshai/env/usgs.py vendored Normal file
View file

@ -0,0 +1,232 @@
"""USGS Water Services stream gauge adapter.
# TODO: Migrate to api.waterdata.usgs.gov OGC API before Q1 2027
# Legacy waterservices.usgs.gov will be decommissioned.
# See: https://www.usgs.gov/tools/usgs-water-data-apis
"""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import USGSConfig
logger = logging.getLogger(__name__)
# Minimum tick interval per USGS guidelines (do not fetch same data more than hourly)
MIN_TICK_SECONDS = 900 # 15 minutes
class USGSStreamsAdapter:
"""USGS instantaneous values for stream gauge readings."""
BASE_URL = "https://waterservices.usgs.gov/nwis/iv/"
def __init__(self, config: "USGSConfig"):
self._sites = config.sites or []
self._tick_interval = max(config.tick_seconds or 900, MIN_TICK_SECONDS)
self._flood_thresholds = getattr(config, "flood_thresholds", {}) or {}
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
if self._tick_interval < MIN_TICK_SECONDS:
logger.warning(
f"USGS tick_seconds {config.tick_seconds} below minimum, using {MIN_TICK_SECONDS}"
)
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No sites configured
if not self._sites:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _fetch(self) -> bool:
"""Fetch instantaneous values from USGS Water Services.
Returns:
True if data changed
"""
params = {
"format": "json",
"sites": ",".join(self._sites),
"parameterCd": "00060,00065", # Streamflow (cfs) and Gage height (ft)
"siteStatus": "active",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"USGS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"USGS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"USGS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
new_events = []
now = time.time()
try:
time_series = data.get("value", {}).get("timeSeries", [])
for ts in time_series:
source_info = ts.get("sourceInfo", {})
variable = ts.get("variable", {})
values_list = ts.get("values", [])
# Extract site info
site_name = source_info.get("siteName", "Unknown Site")
site_codes = source_info.get("siteCode", [])
site_id = site_codes[0].get("value", "") if site_codes else ""
# Extract location
geo_loc = source_info.get("geoLocation", {}).get("geogLocation", {})
lat = geo_loc.get("latitude")
lon = geo_loc.get("longitude")
# Extract variable info
var_name = variable.get("variableName", "Unknown")
unit_info = variable.get("unit", {})
unit_code = unit_info.get("unitCode", "")
# Determine parameter type
if "Streamflow" in var_name or "00060" in str(variable.get("variableCode", [])):
param_type = "flow"
param_name = "Streamflow"
elif "Gage height" in var_name or "00065" in str(variable.get("variableCode", [])):
param_type = "height"
param_name = "Gage height"
else:
param_type = "other"
param_name = var_name
# Get current value (most recent)
if not values_list or not values_list[0].get("value"):
continue
value_entries = values_list[0].get("value", [])
if not value_entries:
continue
latest = value_entries[-1]
value_str = latest.get("value", "")
timestamp_str = latest.get("dateTime", "")
try:
value = float(value_str)
except (ValueError, TypeError):
continue
# Check flood threshold
severity = "info"
threshold = self._flood_thresholds.get(site_id, {}).get(param_type)
if threshold and value > threshold:
severity = "warning"
# Format headline
if param_type == "flow":
headline = f"{site_name}: {value:,.0f} {unit_code}"
else:
headline = f"{site_name}: {value:.1f} {unit_code}"
event = {
"source": "usgs",
"event_id": f"{site_id}_{param_type}",
"event_type": "Stream Gauge",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 1800, # 30 min TTL
"fetched_at": now,
"properties": {
"site_id": site_id,
"site_name": site_name,
"parameter": param_name,
"value": value,
"unit": unit_code,
"timestamp": timestamp_str,
},
}
new_events.append(event)
except Exception as e:
logger.warning(f"USGS parse error: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids or len(self._events) != len(new_events)
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"USGS streams updated: {len(new_events)} readings from {len(self._sites)} sites")
return changed
def get_events(self) -> list:
"""Get current stream gauge events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "usgs",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"site_count": len(self._sites),
}