Merge origin/feature/mesh-intelligence into feature/mesh-intelligence

Merged remote changes with local notification verification system:
- Kept local: channels.py, router.py, notification_routes.py, Notifications.tsx
  (contains the new end-to-end verification system)
- Accepted remote: Config, Environment, Reference pages, new commands,
  categories, summarizer, and other supporting files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zvx-echo6 2026-05-13 18:41:36 -06:00
commit 5b78e38d2e
43 changed files with 8966 additions and 4183 deletions

File diff suppressed because it is too large Load diff

View file

@ -162,6 +162,7 @@ def create_dispatcher(
health_engine=None,
subscription_manager=None,
env_store=None,
notification_router=None,
) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands.
@ -224,24 +225,24 @@ def create_dispatcher(
dispatcher.register(alias_handler)
# Register subscription commands
sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store)
sub_cmd = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router)
dispatcher.register(sub_cmd)
for alias in getattr(sub_cmd, 'aliases', []):
alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store)
alias_handler = SubCommand(subscription_manager, mesh_reporter, data_store, notification_router)
alias_handler.name = alias
dispatcher.register(alias_handler)
unsub_cmd = UnsubCommand(subscription_manager)
unsub_cmd = UnsubCommand(subscription_manager, notification_router)
dispatcher.register(unsub_cmd)
for alias in getattr(unsub_cmd, 'aliases', []):
alias_handler = UnsubCommand(subscription_manager)
alias_handler = UnsubCommand(subscription_manager, notification_router)
alias_handler.name = alias
dispatcher.register(alias_handler)
mysubs_cmd = MySubsCommand(subscription_manager)
mysubs_cmd = MySubsCommand(subscription_manager, notification_router)
dispatcher.register(mysubs_cmd)
for alias in getattr(mysubs_cmd, 'aliases', []):
alias_handler = MySubsCommand(subscription_manager)
alias_handler = MySubsCommand(subscription_manager, notification_router)
alias_handler.name = alias
dispatcher.register(alias_handler)
@ -281,6 +282,33 @@ def create_dispatcher(
avalanche_cmd.name = "avalanche"
dispatcher.register(avalanche_cmd)
# Register streams command
from .streams_cmd import StreamsCommand
streams_cmd = StreamsCommand(env_store)
dispatcher.register(streams_cmd)
for alias in getattr(streams_cmd, 'aliases', []):
alias_handler = StreamsCommand(env_store)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register roads command
from .roads_cmd import RoadsCommand
roads_cmd = RoadsCommand(env_store)
dispatcher.register(roads_cmd)
for alias in getattr(roads_cmd, 'aliases', []):
alias_handler = RoadsCommand(env_store)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register hotspots command (NASA FIRMS satellite fire detection)
from .hotspots_cmd import HotspotsCommand
hotspots_cmd = HotspotsCommand(env_store)
dispatcher.register(hotspots_cmd)
for alias in getattr(hotspots_cmd, 'aliases', []):
alias_handler = HotspotsCommand(env_store)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register custom commands
if custom_commands:
for name, response in custom_commands.items():

View file

@ -0,0 +1,100 @@
"""Satellite fire hotspot command."""
from .base import CommandContext, CommandHandler
class HotspotsCommand(CommandHandler):
"""Show NASA FIRMS satellite fire hotspot data."""
aliases = ["satellite", "ignitions"]
def __init__(self, env_store):
self._env_store = env_store
self._name = "hotspots"
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return "Show satellite fire hotspots"
@property
def usage(self) -> str:
return "!hotspots [--new]"
async def execute(self, args: str, context: CommandContext) -> str:
if not self._env_store:
return "Environmental feeds not configured."
# Check for --new flag
new_only = "--new" in args.lower() or "new" in args.lower().split()
# Get FIRMS adapter
firms_adapter = getattr(self._env_store, "_firms", None)
if not firms_adapter:
return "Satellite hotspot monitoring not configured."
if not firms_adapter._is_loaded:
return "Satellite data not yet loaded. Try again shortly."
if firms_adapter._consecutive_errors >= 999:
return "Satellite monitoring disabled (invalid API key)."
# Get events
if new_only:
events = firms_adapter.get_new_ignitions()
title = "NEW IGNITIONS"
else:
events = firms_adapter.get_events()
title = "FIRE HOTSPOTS"
if not events:
if new_only:
return "No new ignitions detected. All hotspots near known fires."
return "No satellite fire hotspots detected in monitored area."
# Build response
lines = [f"{title} ({len(events)}):"]
# Sort by severity (warning > watch > advisory) then by FRP
severity_order = {"warning": 0, "watch": 1, "advisory": 2}
sorted_events = sorted(
events,
key=lambda e: (
severity_order.get(e.get("severity", "advisory"), 3),
-(e.get("properties", {}).get("frp") or 0),
),
)
for event in sorted_events[:8]: # Limit for mesh
props = event.get("properties", {})
severity = event.get("severity", "advisory").upper()[:1] # W/A
# Format line
line = f"[{severity}] {event.get('headline', 'Unknown')}"
# Add confidence and FRP if available
details = []
if props.get("confidence"):
details.append(f"conf:{props['confidence']}")
if props.get("frp"):
details.append(f"{int(props['frp'])}MW")
if props.get("acq_time"):
details.append(f"@{props['acq_time']}Z")
if details:
line += f" ({', '.join(details)})"
lines.append(line)
if len(events) > 8:
lines.append(f"...and {len(events) - 8} more")
return "\n".join(lines)

File diff suppressed because it is too large Load diff

View file

@ -1,85 +1,99 @@
"""Alert API routes."""
from fastapi import APIRouter, Request
router = APIRouter(tags=["alerts"])
@router.get("/alerts/active")
async def get_active_alerts(request: Request):
"""Get currently active alerts."""
alert_engine = request.app.state.alert_engine
if not alert_engine:
return []
# Get recent alerts from alert engine if it has internal state
alerts = []
# Check for AlertState or similar if available
if hasattr(alert_engine, "get_active_alerts"):
try:
raw_alerts = alert_engine.get_active_alerts()
for alert in raw_alerts:
alerts.append({
"type": alert.get("type", "unknown"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": alert.get("timestamp"),
"scope_type": alert.get("scope_type"),
"scope_value": alert.get("scope_value"),
})
except Exception:
pass
elif hasattr(alert_engine, "_recent_alerts"):
try:
for alert in alert_engine._recent_alerts:
alerts.append({
"type": alert.get("type", "unknown"),
"severity": alert.get("severity", "info"),
"message": alert.get("message", ""),
"timestamp": alert.get("timestamp"),
})
except Exception:
pass
return alerts
@router.get("/alerts/history")
async def get_alert_history(
request: Request,
limit: int = 50,
offset: int = 0,
):
"""Get historical alerts with pagination."""
# Historical alert data would come from SQLite
# For now, return empty list
return []
@router.get("/subscriptions")
async def get_subscriptions(request: Request):
"""Get all alert subscriptions."""
subscription_manager = request.app.state.subscription_manager
if not subscription_manager:
return []
try:
subs = subscription_manager.get_all_subs()
return [
{
"id": sub["id"],
"user_id": sub["user_id"],
"sub_type": sub["sub_type"],
"schedule_time": sub.get("schedule_time"),
"schedule_day": sub.get("schedule_day"),
"scope_type": sub.get("scope_type", "mesh"),
"scope_value": sub.get("scope_value"),
"enabled": sub.get("enabled", 1) == 1,
}
for sub in subs
]
except Exception:
return []
"""Alert API routes."""
from fastapi import APIRouter, Request, Query
from typing import Optional
router = APIRouter(tags=["alerts"])
@router.get("/alerts/active")
async def get_active_alerts(request: Request):
"""Get currently active alerts."""
alert_engine = getattr(request.app.state, "alert_engine", None)
if not alert_engine:
return []
alerts = []
# Try get_pending_alerts first (our method)
if hasattr(alert_engine, "get_pending_alerts"):
try:
raw_alerts = alert_engine.get_pending_alerts()
for alert in raw_alerts:
alerts.append({
"type": alert.get("type", "unknown"),
"severity": _map_severity(alert),
"message": alert.get("message", ""),
"timestamp": alert.get("timestamp"),
"scope_type": alert.get("scope_type"),
"scope_value": alert.get("scope_value"),
})
except Exception:
pass
return alerts
@router.get("/alerts/history")
async def get_alert_history(
request: Request,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
type: Optional[str] = Query(None),
severity: Optional[str] = Query(None),
):
"""Get historical alerts with pagination and filtering.
Note: Alert history persistence is not yet implemented.
Returns empty array for now.
"""
# Future: Query SQLite for historical alerts
# For now, return empty with proper structure
return {
"items": [],
"total": 0,
}
@router.get("/subscriptions")
async def get_subscriptions(request: Request):
"""Get all alert subscriptions."""
subscription_manager = getattr(request.app.state, "subscription_manager", None)
if not subscription_manager:
return []
try:
subs = subscription_manager.get_all_subs()
return [
{
"id": sub["id"],
"user_id": sub["user_id"],
"sub_type": sub["sub_type"],
"schedule_time": sub.get("schedule_time"),
"schedule_day": sub.get("schedule_day"),
"scope_type": sub.get("scope_type", "mesh"),
"scope_value": sub.get("scope_value"),
"enabled": sub.get("enabled", 1) == 1,
}
for sub in subs
]
except Exception:
return []
def _map_severity(alert: dict) -> str:
"""Map alert properties to severity level."""
if alert.get("is_critical"):
return "critical"
alert_type = alert.get("type", "")
if "emergency" in alert_type:
return "emergency"
if "critical" in alert_type:
return "critical"
if "warning" in alert_type:
return "warning"
if "watch" in alert_type:
return "watch"
return "info"

View file

@ -26,7 +26,9 @@ RESTART_REQUIRED_SECTIONS = {
}
# Valid config section names
VALID_SECTIONS = {
VALID_SECTIONS = {
"notifications",
"environmental",
"bot",
"connection",
"response",

View file

@ -106,3 +106,87 @@ async def get_avalanche_data(request: Request):
"off_season": False,
"advisories": env_store.get_active(source="avalanche"),
}
@router.get("/env/streams")
async def get_streams_data(request: Request):
"""Get USGS stream gauge readings."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return []
return env_store.get_active(source="usgs")
@router.get("/env/usgs/lookup/{site_id}")
async def lookup_usgs_site(request: Request, site_id: str):
"""Lookup USGS site metadata and NWS flood stages.
Returns site name, location, and flood stage thresholds from NWS NWPS.
Used by the config UI to auto-populate fields when adding a new gauge.
"""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"error": "Environmental feeds not enabled"}
adapters = getattr(env_store, "_adapters", {})
usgs_adapter = adapters.get("usgs")
if not usgs_adapter:
# Create a temporary adapter for lookup
from meshai.env.usgs import USGSStreamsAdapter
from meshai.config import USGSConfig
usgs_adapter = USGSStreamsAdapter(USGSConfig())
try:
result = usgs_adapter.lookup_site(site_id)
return result
except Exception as e:
return {"error": str(e), "site_id": site_id}
@router.get("/env/traffic")
async def get_traffic_data(request: Request):
"""Get TomTom traffic flow data."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return []
return env_store.get_active(source="traffic")
@router.get("/env/roads")
async def get_roads_data(request: Request):
"""Get 511 road conditions."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return []
return env_store.get_active(source="511")
@router.get("/env/hotspots")
async def get_hotspots_data(request: Request):
"""Get NASA FIRMS satellite fire hotspots."""
env_store = getattr(request.app.state, "env_store", None)
if not env_store:
return {"hotspots": [], "new_ignitions": 0}
firms_adapter = getattr(env_store, "_firms", None)
if not firms_adapter:
return {"hotspots": [], "new_ignitions": 0, "enabled": False}
hotspots = env_store.get_active(source="firms")
new_ignitions = [h for h in hotspots if h.get("properties", {}).get("new_ignition")]
return {
"enabled": True,
"hotspots": hotspots,
"new_ignitions": len(new_ignitions),
}

View file

@ -1,356 +1,409 @@
"""Mesh health and node API routes."""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
router = APIRouter(tags=["mesh"])
def _serialize_health_score(score) -> dict:
"""Serialize a HealthScore object."""
return {
"composite": round(score.composite, 1),
"tier": score.tier,
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"solar_index": round(score.solar_index, 1),
}
def _serialize_region(region) -> dict:
"""Serialize a RegionHealth object."""
return {
"name": region.name,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
"node_count": len(region.node_ids),
"locality_count": len(region.localities),
"score": _serialize_health_score(region.score),
"node_ids": region.node_ids,
}
def _format_timestamp(ts: Optional[float]) -> Optional[str]:
"""Format a Unix timestamp as ISO string."""
if not ts or ts <= 0:
return None
try:
return datetime.fromtimestamp(ts).isoformat()
except (ValueError, OSError):
return None
@router.get("/health")
async def get_health(request: Request):
"""Get mesh health data."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return {
"score": 0,
"tier": "Unknown",
"message": "Health engine not ready",
}
health = health_engine.mesh_health
score = health.score
return {
"score": round(score.composite, 1),
"tier": score.tier,
"pillars": {
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
},
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"total_nodes": health.total_nodes,
"total_regions": health.total_regions,
"unlocated_count": len(health.unlocated_nodes),
"last_computed": _format_timestamp(health.last_computed),
"recommendations": [], # TODO: Add recommendations
}
@router.get("/nodes")
async def get_nodes(request: Request):
"""Get all nodes."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
return []
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
return []
nodes = []
for node in raw_nodes:
# Extract node_num from various formats
node_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if node_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
node_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if node_num is None:
continue
# Get health data if available
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
}
# Build node dict
node_dict = {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": node.get("shortName") or node.get("short_name") or "",
"long_name": node.get("longName") or node.get("long_name") or "",
"role": node.get("role") or "",
"latitude": node.get("latitude"),
"longitude": node.get("longitude"),
"last_heard": _format_timestamp(node.get("last_heard")),
"battery_level": node.get("battery_level") or node.get("batteryLevel"),
"voltage": node.get("voltage"),
"snr": node.get("snr"),
"firmware": node.get("firmware_version") or node.get("firmwareVersion") or "",
"hardware": node.get("hw_model") or node.get("hwModel") or "",
"uptime": node.get("uptime_seconds") or node.get("uptimeSeconds"),
"sources": node.get("_sources", []),
**health_data,
}
nodes.append(node_dict)
return nodes
@router.get("/nodes/{node_num}")
async def get_node_detail(node_num: int, request: Request):
"""Get detailed info for a specific node."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
raise HTTPException(status_code=404, detail="Data store not available")
# Find the node
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
raise HTTPException(status_code=500, detail="Failed to fetch nodes")
target_node = None
for node in raw_nodes:
n_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if n_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
n_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if n_num == node_num:
target_node = node
break
if not target_node:
raise HTTPException(status_code=404, detail=f"Node {node_num} not found")
# Get health data
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
"text_packet_count_24h": node_health.text_packet_count_24h,
"non_text_packets": node_health.non_text_packets,
"has_solar": node_health.has_solar,
}
# Get neighbors from edges
neighbors = []
try:
edges = data_store.get_all_edges()
for edge in edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
if from_num == node_num:
neighbors.append({
"node_num": to_num,
"snr": edge.get("snr"),
})
elif to_num == node_num:
neighbors.append({
"node_num": from_num,
"snr": edge.get("snr"),
})
except Exception:
pass
return {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": target_node.get("shortName") or target_node.get("short_name") or "",
"long_name": target_node.get("longName") or target_node.get("long_name") or "",
"role": target_node.get("role") or "",
"latitude": target_node.get("latitude"),
"longitude": target_node.get("longitude"),
"last_heard": _format_timestamp(target_node.get("last_heard")),
"battery_level": target_node.get("battery_level") or target_node.get("batteryLevel"),
"voltage": target_node.get("voltage"),
"snr": target_node.get("snr"),
"firmware": target_node.get("firmware_version") or target_node.get("firmwareVersion") or "",
"hardware": target_node.get("hw_model") or target_node.get("hwModel") or "",
"uptime": target_node.get("uptime_seconds") or target_node.get("uptimeSeconds"),
"sources": target_node.get("_sources", []),
"neighbors": neighbors,
**health_data,
}
@router.get("/regions")
async def get_regions(request: Request):
"""Get region summaries."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return []
regions = []
for region in health_engine.mesh_health.regions:
# Count online infrastructure
infra_online = 0
infra_total = 0
online_count = 0
for nid in region.node_ids:
node = health_engine.mesh_health.nodes.get(nid)
if node:
if node.is_online:
online_count += 1
if node.is_infrastructure:
infra_total += 1
if node.is_online:
infra_online += 1
regions.append({
"name": region.name,
"local_name": region.name, # Could be overridden by region_labels
"node_count": len(region.node_ids),
"infra_count": infra_total,
"infra_online": infra_online,
"online_count": online_count,
"score": round(region.score.composite, 1),
"tier": region.score.tier,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
})
return regions
@router.get("/sources")
async def get_sources(request: Request):
"""Get per-source health information."""
data_store = request.app.state.data_store
if not data_store:
return []
sources = []
try:
for name, source in data_store._sources.items():
source_info = {
"name": name,
"type": "meshview" if hasattr(source, "edges") else "meshmonitor",
"url": getattr(source, "url", ""),
"is_loaded": source.is_loaded,
"last_error": source.last_error,
"consecutive_errors": getattr(source, "consecutive_errors", 0),
"response_time_ms": getattr(source, "last_response_time_ms", None),
"tick_count": getattr(source, "tick_count", 0),
"node_count": len(source.nodes) if hasattr(source, "nodes") else 0,
}
sources.append(source_info)
except Exception:
pass
return sources
@router.get("/edges")
async def get_edges(request: Request):
"""Get neighbor/edge relationships."""
data_store = request.app.state.data_store
if not data_store:
return []
try:
raw_edges = data_store.get_all_edges()
except Exception:
return []
edges = []
for edge in raw_edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
snr = edge.get("snr")
# Derive quality from SNR
if snr is None:
quality = "unknown"
elif snr > 12:
quality = "excellent"
elif snr > 8:
quality = "good"
elif snr > 5:
quality = "fair"
elif snr > 3:
quality = "marginal"
else:
quality = "poor"
edges.append({
"from_node": from_num,
"to_node": to_num,
"snr": snr,
"quality": quality,
})
return edges
"""Mesh health and node API routes."""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
router = APIRouter(tags=["mesh"])
def _serialize_health_score(score) -> dict:
"""Serialize a HealthScore object."""
return {
"composite": round(score.composite, 1),
"tier": score.tier,
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"util_max_percent": round(getattr(score, 'util_max_percent', score.util_percent), 1),
"util_method": getattr(score, 'util_method', 'unknown'),
"util_node_count": getattr(score, 'util_node_count', 0),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"solar_index": round(score.solar_index, 1),
}
def _serialize_region(region) -> dict:
"""Serialize a RegionHealth object."""
return {
"name": region.name,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
"node_count": len(region.node_ids),
"locality_count": len(region.localities),
"score": _serialize_health_score(region.score),
"node_ids": region.node_ids,
}
def _format_timestamp(ts: Optional[float]) -> Optional[str]:
"""Format a Unix timestamp as ISO string."""
if not ts or ts <= 0:
return None
try:
return datetime.fromtimestamp(ts).isoformat()
except (ValueError, OSError):
return None
@router.get("/health")
async def get_health(request: Request):
"""Get mesh health data."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return {
"score": 0,
"tier": "Unknown",
"message": "Health engine not ready",
}
health = health_engine.mesh_health
score = health.score
return {
"score": round(score.composite, 1),
"tier": score.tier,
"pillars": {
"infrastructure": round(score.infrastructure, 1),
"utilization": round(score.utilization, 1),
"behavior": round(score.behavior, 1),
"power": round(score.power, 1),
},
"infra_online": score.infra_online,
"infra_total": score.infra_total,
"util_percent": round(score.util_percent, 1),
"util_max_percent": round(getattr(score, 'util_max_percent', score.util_percent), 1),
"util_method": getattr(score, 'util_method', 'unknown'),
"util_node_count": getattr(score, 'util_node_count', 0),
"flagged_nodes": score.flagged_nodes,
"battery_warnings": score.battery_warnings,
"total_nodes": health.total_nodes,
"total_regions": health.total_regions,
"unlocated_count": len(health.unlocated_nodes),
"last_computed": _format_timestamp(health.last_computed),
"recommendations": [], # TODO: Add recommendations
}
@router.get("/nodes")
async def get_nodes(request: Request):
"""Get all nodes."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
return []
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
return []
nodes = []
for node in raw_nodes:
# Extract node_num from various formats
node_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if node_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
node_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if node_num is None:
continue
# Get health data if available
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
}
# Build node dict
node_dict = {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": node.get("shortName") or node.get("short_name") or "",
"long_name": node.get("longName") or node.get("long_name") or "",
"role": node.get("role") or "",
"latitude": node.get("latitude"),
"longitude": node.get("longitude"),
"last_heard": _format_timestamp(node.get("last_heard")),
"battery_level": node.get("battery_level") or node.get("batteryLevel"),
"voltage": node.get("voltage"),
"snr": node.get("snr"),
"firmware": node.get("firmware_version") or node.get("firmwareVersion") or "",
"hardware": node.get("hw_model") or node.get("hwModel") or "",
"uptime": node.get("uptime_seconds") or node.get("uptimeSeconds"),
"sources": node.get("_sources", []),
**health_data,
}
nodes.append(node_dict)
return nodes
@router.get("/nodes/{node_num}")
async def get_node_detail(node_num: int, request: Request):
"""Get detailed info for a specific node."""
data_store = request.app.state.data_store
health_engine = request.app.state.health_engine
if not data_store:
raise HTTPException(status_code=404, detail="Data store not available")
# Find the node
try:
raw_nodes = data_store.get_all_nodes()
except Exception:
raise HTTPException(status_code=500, detail="Failed to fetch nodes")
target_node = None
for node in raw_nodes:
n_num = node.get("nodeNum") or node.get("num") or node.get("node_num")
if n_num is None:
node_id = node.get("node_id") or node.get("id")
if node_id and isinstance(node_id, str):
try:
n_num = int(node_id.lstrip("!"), 16)
except ValueError:
continue
if n_num == node_num:
target_node = node
break
if not target_node:
raise HTTPException(status_code=404, detail=f"Node {node_num} not found")
# Get health data
health_data = {}
if health_engine and health_engine.mesh_health:
node_health = health_engine.mesh_health.nodes.get(str(node_num))
if node_health:
health_data = {
"region": node_health.region,
"locality": node_health.locality,
"is_infrastructure": node_health.is_infrastructure,
"is_online": node_health.is_online,
"packet_count_24h": node_health.packet_count_24h,
"text_packet_count_24h": node_health.text_packet_count_24h,
"non_text_packets": node_health.non_text_packets,
"has_solar": node_health.has_solar,
}
# Get neighbors from edges
neighbors = []
try:
edges = data_store.get_all_edges()
for edge in edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
if from_num == node_num:
neighbors.append({
"node_num": to_num,
"snr": edge.get("snr"),
})
elif to_num == node_num:
neighbors.append({
"node_num": from_num,
"snr": edge.get("snr"),
})
except Exception:
pass
return {
"node_num": node_num,
"node_id_hex": f"!{node_num:08x}",
"short_name": target_node.get("shortName") or target_node.get("short_name") or "",
"long_name": target_node.get("longName") or target_node.get("long_name") or "",
"role": target_node.get("role") or "",
"latitude": target_node.get("latitude"),
"longitude": target_node.get("longitude"),
"last_heard": _format_timestamp(target_node.get("last_heard")),
"battery_level": target_node.get("battery_level") or target_node.get("batteryLevel"),
"voltage": target_node.get("voltage"),
"snr": target_node.get("snr"),
"firmware": target_node.get("firmware_version") or target_node.get("firmwareVersion") or "",
"hardware": target_node.get("hw_model") or target_node.get("hwModel") or "",
"uptime": target_node.get("uptime_seconds") or target_node.get("uptimeSeconds"),
"sources": target_node.get("_sources", []),
"neighbors": neighbors,
**health_data,
}
@router.get("/regions")
async def get_regions(request: Request):
"""Get region summaries."""
health_engine = request.app.state.health_engine
if not health_engine or not health_engine.mesh_health:
return []
regions = []
for region in health_engine.mesh_health.regions:
# Count online infrastructure
infra_online = 0
infra_total = 0
online_count = 0
for nid in region.node_ids:
node = health_engine.mesh_health.nodes.get(nid)
if node:
if node.is_online:
online_count += 1
if node.is_infrastructure:
infra_total += 1
if node.is_online:
infra_online += 1
regions.append({
"name": region.name,
"local_name": region.name, # Could be overridden by region_labels
"node_count": len(region.node_ids),
"infra_count": infra_total,
"infra_online": infra_online,
"online_count": online_count,
"score": round(region.score.composite, 1),
"tier": region.score.tier,
"center_lat": region.center_lat,
"center_lon": region.center_lon,
})
return regions
@router.get("/sources")
async def get_sources(request: Request):
"""Get per-source health information."""
data_store = request.app.state.data_store
if not data_store:
return []
sources = []
try:
for name, source in data_store._sources.items():
source_info = {
"name": name,
"type": "meshview" if hasattr(source, "edges") else "meshmonitor",
"url": getattr(source, "url", ""),
"is_loaded": source.is_loaded,
"last_error": source.last_error,
"consecutive_errors": getattr(source, "consecutive_errors", 0),
"response_time_ms": getattr(source, "last_response_time_ms", None),
"tick_count": getattr(source, "tick_count", 0),
"node_count": len(source.nodes) if hasattr(source, "nodes") else 0,
}
sources.append(source_info)
except Exception:
pass
return sources
@router.get("/edges")
async def get_edges(request: Request):
"""Get neighbor/edge relationships."""
data_store = request.app.state.data_store
if not data_store:
return []
try:
raw_edges = data_store.get_all_edges()
except Exception:
return []
edges = []
for edge in raw_edges:
from_num = edge.get("from_node") or edge.get("from")
to_num = edge.get("to_node") or edge.get("to")
snr = edge.get("snr")
# Derive quality from SNR
if snr is None:
quality = "unknown"
elif snr > 12:
quality = "excellent"
elif snr > 8:
quality = "good"
elif snr > 5:
quality = "fair"
elif snr > 3:
quality = "marginal"
else:
quality = "poor"
edges.append({
"from_node": from_num,
"to_node": to_num,
"snr": snr,
"quality": quality,
})
return edges
@router.get("/channels")
async def get_channels(request: Request):
"""Get radio channels from the connected Meshtastic interface."""
connector = getattr(request.app.state, "connector", None)
if not connector or not connector.connected:
return []
try:
interface = connector._interface
if not interface or not hasattr(interface, "localNode"):
return []
local_node = interface.localNode
if not local_node or not hasattr(local_node, "channels"):
return []
channels = []
for ch in local_node.channels:
if ch is None:
continue
# Get channel settings
settings = getattr(ch, "settings", None)
name = getattr(settings, "name", "") if settings else ""
role_val = getattr(ch, "role", 0)
# Map role enum to string
role_map = {0: "DISABLED", 1: "PRIMARY", 2: "SECONDARY"}
role = role_map.get(role_val, "UNKNOWN")
channels.append({
"index": ch.index,
"name": name or f"Channel {ch.index}",
"role": role,
"enabled": role_val != 0,
})
return channels
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"Failed to get channels: {e}")
return []

View file

@ -52,6 +52,7 @@ def create_app() -> FastAPI:
from .api.mesh_routes import router as mesh_router
from .api.env_routes import router as env_router
from .api.alert_routes import router as alert_router
from .api.notification_routes import router as notification_router
app.include_router(system_router, prefix="/api")
app.include_router(config_router, prefix="/api")
@ -59,6 +60,7 @@ def create_app() -> FastAPI:
app.include_router(env_router, prefix="/api")
app.include_router(alert_router, prefix="/api")
app.include_router(notification_router, prefix="/api")
# WebSocket router (no prefix, path is /ws/live)
app.include_router(ws_router)
@ -110,6 +112,8 @@ async def start_dashboard(meshai_instance: "MeshAI") -> DashboardBroadcaster:
app.state.alert_engine = getattr(meshai_instance, "alert_engine", None)
app.state.env_store = getattr(meshai_instance, "env_store", None)
app.state.subscription_manager = meshai_instance.subscription_manager
app.state.notification_router = getattr(meshai_instance, "notification_router", None)
app.state.connector = meshai_instance.connector
# Create broadcaster and attach to app state
broadcaster = DashboardBroadcaster()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,17 +1,17 @@
<!DOCTYPE html>
<html lang="en" class="dark">
<head>
<meta charset="UTF-8" />
<link rel="icon" type="image/svg+xml" href="/favicon.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>MeshAI Dashboard</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-BaC2Rd9C.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-0HCYKWnt.css">
</head>
<body>
<div id="root"></div>
</body>
</html>
<!DOCTYPE html>
<html lang="en" class="dark">
<head>
<meta charset="UTF-8" />
<link rel="icon" type="image/svg+xml" href="/favicon.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>MeshAI Dashboard</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-BXyt_EfK.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CtFYHJy4.css">
</head>
<body>
<div id="root"></div>
</body>
</html>

View file

@ -31,7 +31,7 @@ class AvalancheAdapter:
}
def __init__(self, config: "AvalancheConfig"):
self._center_ids = config.center_ids or ["SNFAC"]
self._center_ids = config.center_ids
self._tick_interval = config.tick_seconds or 1800
self._season_months = config.season_months or [12, 1, 2, 3, 4]
self._last_tick = 0.0

View file

@ -28,8 +28,8 @@ class DuctingAdapter:
"""Tropospheric ducting assessment from Open-Meteo GFS pressure levels."""
def __init__(self, config: "DuctingConfig"):
self._lat = config.latitude or 42.56
self._lon = config.longitude or -114.47
self._lat = config.latitude
self._lon = config.longitude
self._tick_interval = config.tick_seconds or 10800 # 3 hours
self._last_tick = 0.0
self._status = {}

2
meshai/env/fires.py vendored
View file

@ -20,7 +20,7 @@ class NICFFiresAdapter:
BASE_URL = "https://services3.arcgis.com/T4QMspbfLg3qTGWY/arcgis/rest/services/WFIGS_Interagency_Perimeters_Current/FeatureServer/0/query"
def __init__(self, config: "NICFFiresConfig", region_anchors: list = None):
self._state = config.state or "US-ID"
self._state = config.state
self._tick_interval = config.tick_seconds or 600
self._last_tick = 0.0
self._events = []

365
meshai/env/firms.py vendored Normal file
View file

@ -0,0 +1,365 @@
"""NASA FIRMS satellite fire hotspot adapter."""
import json
import logging
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
if TYPE_CHECKING:
from ..config import FIRMSConfig
logger = logging.getLogger(__name__)
class FIRMSAdapter:
"""NASA FIRMS satellite fire hotspot polling.
Detects fire hotspots from satellite data (MODIS, VIIRS) typically
hours before NIFC publishes official perimeters. Early warning.
API: https://firms.modaps.eosdis.nasa.gov/api/area/csv/{MAP_KEY}/{SOURCE}/{BBOX}/{DAY_RANGE}
"""
BASE_URL = "https://firms.modaps.eosdis.nasa.gov/api/area/csv"
def __init__(self, config: "FIRMSConfig", region_anchors: list = None, fires_adapter=None):
self._map_key = config.map_key
self._source = config.source or "VIIRS_SNPP_NRT"
self._bbox = config.bbox # [west, south, east, north]
self._day_range = config.day_range or 1
self._tick_interval = config.tick_seconds or 1800
self._confidence_min = config.confidence_min or "nominal"
self._proximity_km = config.proximity_km or 10.0 # km to match known fire
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# For cross-referencing
self._region_anchors = region_anchors or []
self._fires_adapter = fires_adapter # NICFFiresAdapter for cross-ref
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
if not self._map_key:
if not self._last_error:
logger.warning("FIRMS: No MAP_KEY configured, skipping")
self._last_error = "No MAP_KEY configured"
return False
if not self._bbox or len(self._bbox) != 4:
if not self._last_error:
logger.warning("FIRMS: No valid bbox configured, skipping")
self._last_error = "No valid bbox configured"
return False
return self._fetch()
def _fetch(self) -> bool:
"""Fetch fire hotspots from NASA FIRMS.
Returns:
True if data changed
"""
# Format bbox as west,south,east,north
bbox_str = ",".join(str(c) for c in self._bbox)
url = f"{self.BASE_URL}/{self._map_key}/{self._source}/{bbox_str}/{self._day_range}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "text/csv",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
csv_data = resp.read().decode("utf-8")
except HTTPError as e:
if e.code == 401:
logger.error("FIRMS: Invalid MAP_KEY, disabling adapter")
self._last_error = "Invalid MAP_KEY"
self._consecutive_errors = 999 # Disable
return False
logger.warning(f"FIRMS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"FIRMS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"FIRMS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse CSV response
new_events = self._parse_csv(csv_data)
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
new_ignitions = sum(1 for e in new_events if e.get("properties", {}).get("new_ignition"))
logger.info(f"FIRMS hotspots updated: {len(new_events)} total, {new_ignitions} potential new ignitions")
return changed
def _parse_csv(self, csv_data: str) -> list:
"""Parse FIRMS CSV response into events."""
lines = csv_data.strip().split("\n")
if len(lines) < 2:
return []
# Parse header
header = lines[0].split(",")
header_map = {col.strip().lower(): i for i, col in enumerate(header)}
# Required columns
lat_idx = header_map.get("latitude")
lon_idx = header_map.get("longitude")
conf_idx = header_map.get("confidence")
frp_idx = header_map.get("frp") # Fire Radiative Power
acq_date_idx = header_map.get("acq_date")
acq_time_idx = header_map.get("acq_time")
bright_idx = header_map.get("bright_ti4") or header_map.get("brightness")
if lat_idx is None or lon_idx is None:
logger.warning("FIRMS CSV missing required columns")
return []
events = []
now = time.time()
# Confidence mapping
conf_values = {"low": 1, "l": 1, "nominal": 2, "n": 2, "high": 3, "h": 3}
min_conf = conf_values.get(self._confidence_min.lower(), 2)
# Get known fire locations for cross-referencing
known_fires = self._get_known_fires()
for line in lines[1:]:
cols = line.split(",")
if len(cols) < max(filter(None, [lat_idx, lon_idx, conf_idx])) + 1:
continue
try:
lat = float(cols[lat_idx])
lon = float(cols[lon_idx])
except (ValueError, IndexError):
continue
# Parse confidence
conf_raw = cols[conf_idx].strip() if conf_idx is not None and conf_idx < len(cols) else "n"
conf_value = conf_values.get(conf_raw.lower(), 2)
# Filter by confidence
if conf_value < min_conf:
continue
# Parse FRP (fire radiative power in MW)
frp = None
if frp_idx is not None and frp_idx < len(cols):
try:
frp = float(cols[frp_idx])
except ValueError:
pass
# Parse brightness temperature
brightness = None
if bright_idx is not None and bright_idx < len(cols):
try:
brightness = float(cols[bright_idx])
except ValueError:
pass
# Parse acquisition datetime
acq_date = cols[acq_date_idx].strip() if acq_date_idx is not None and acq_date_idx < len(cols) else ""
acq_time = cols[acq_time_idx].strip() if acq_time_idx is not None and acq_time_idx < len(cols) else ""
# Create unique ID from position and time
event_id = f"firms_{lat:.4f}_{lon:.4f}_{acq_date}_{acq_time}"
# Check if near known fire
near_fire, fire_name, distance_to_fire = self._check_near_known_fire(lat, lon, known_fires)
# Determine severity
if not near_fire:
# Potential new ignition
severity = "watch"
new_ignition = True
headline = f"NEW HOTSPOT detected"
else:
# Near known fire
severity = "advisory"
new_ignition = False
headline = f"Hotspot near {fire_name}"
# Bump severity for high FRP
if frp is not None and frp > 100:
if severity == "advisory":
severity = "watch"
elif severity == "watch":
severity = "warning"
headline += f" ({int(frp)} MW)"
# Compute proximity to region anchors
distance_km, nearest_anchor = self._nearest_anchor_distance(lat, lon)
if distance_km is not None and nearest_anchor:
headline += f" ({int(distance_km)} km from {nearest_anchor})"
event = {
"source": "firms",
"event_id": event_id,
"event_type": "Fire Hotspot",
"severity": severity,
"headline": headline,
"lat": lat,
"lon": lon,
"expires": now + 21600, # 6 hour TTL
"fetched_at": now,
"properties": {
"new_ignition": new_ignition,
"confidence": conf_raw,
"frp": frp,
"brightness": brightness,
"acq_date": acq_date,
"acq_time": acq_time,
"near_fire": fire_name if near_fire else None,
"distance_to_fire_km": distance_to_fire,
"distance_km": distance_km,
"nearest_anchor": nearest_anchor,
},
}
events.append(event)
return events
def _get_known_fires(self) -> list:
"""Get known fire locations from NIFC adapter."""
if not self._fires_adapter:
return []
fires = self._fires_adapter.get_events()
return [
{
"name": f.get("name", "Unknown"),
"lat": f.get("lat"),
"lon": f.get("lon"),
}
for f in fires
if f.get("lat") is not None and f.get("lon") is not None
]
def _check_near_known_fire(self, lat: float, lon: float, known_fires: list) -> tuple:
"""Check if hotspot is near a known fire.
Returns:
(is_near, fire_name, distance_km)
"""
if not known_fires:
return (False, None, None)
from ..geo import haversine_distance
for fire in known_fires:
fire_lat = fire.get("lat")
fire_lon = fire.get("lon")
if fire_lat is None or fire_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, fire_lat, fire_lon)
dist_km = dist_miles * 1.60934
if dist_km <= self._proximity_km:
return (True, fire.get("name"), dist_km)
return (False, None, None)
def _nearest_anchor_distance(self, lat: float, lon: float) -> tuple:
"""Find distance to nearest region anchor.
Returns:
(distance_km, anchor_name) or (None, None)
"""
if not self._region_anchors:
return (None, None)
from ..geo import haversine_distance
min_dist = float("inf")
nearest_name = None
for anchor in self._region_anchors:
anchor_lat = anchor.get("lat") if isinstance(anchor, dict) else getattr(anchor, "lat", None)
anchor_lon = anchor.get("lon") if isinstance(anchor, dict) else getattr(anchor, "lon", None)
anchor_name = anchor.get("name") if isinstance(anchor, dict) else getattr(anchor, "name", "Unknown")
if anchor_lat is None or anchor_lon is None:
continue
# haversine_distance returns miles, convert to km
dist_miles = haversine_distance(lat, lon, anchor_lat, anchor_lon)
dist_km = dist_miles * 1.60934
if dist_km < min_dist:
min_dist = dist_km
nearest_name = anchor_name
if min_dist < float("inf"):
return (min_dist, nearest_name)
return (None, None)
def get_events(self) -> list:
"""Get current hotspot events."""
return self._events
def get_new_ignitions(self) -> list:
"""Get only potential new ignitions (not near known fires)."""
return [e for e in self._events if e.get("properties", {}).get("new_ignition")]
@property
def health_status(self) -> dict:
"""Get adapter health status."""
new_ignitions = len(self.get_new_ignitions())
return {
"source": "firms",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"new_ignitions": new_ignitions,
"last_fetch": self._last_tick,
}

366
meshai/env/roads511.py vendored Normal file
View file

@ -0,0 +1,366 @@
"""511 Road Conditions adapter.
Polls a configurable 511 API for road events. The base URL is fully
configurable as each state has a different 511 system.
"""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urljoin
if TYPE_CHECKING:
from ..config import Roads511Config
logger = logging.getLogger(__name__)
class Roads511Adapter:
"""511 road conditions polling adapter."""
def __init__(self, config: "Roads511Config"):
self._api_key = self._resolve_env(config.api_key or "")
self._base_url = (config.base_url or "").rstrip("/")
self._endpoints = config.endpoints or ["/get/event"]
self._bbox = config.bbox or [] # [west, south, east, north]
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._auth_failed = False # Stop retrying on auth failures
if not self._base_url:
logger.info("511: No base URL configured, adapter disabled")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No base URL configured
if not self._base_url:
return False
# Auth failed - don't keep retrying
if self._auth_failed:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch events from all configured endpoints.
Returns:
True if data changed
"""
new_events = []
now = time.time()
for endpoint in self._endpoints:
events = self._fetch_endpoint(endpoint, now)
if events:
new_events.extend(events)
# Apply bbox filter if configured
if self._bbox and len(self._bbox) == 4:
west, south, east, north = self._bbox
new_events = [
e for e in new_events
if e.get("lat") is not None and e.get("lon") is not None
and west <= e["lon"] <= east and south <= e["lat"] <= north
]
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
self._is_loaded = True
if changed:
logger.info(f"511 road events updated: {len(new_events)} active")
return changed
def _fetch_endpoint(self, endpoint: str, now: float) -> list:
"""Fetch events from a single endpoint.
Args:
endpoint: API endpoint path
now: Current timestamp
Returns:
List of event dicts
"""
url = urljoin(self._base_url + "/", endpoint.lstrip("/"))
# Add API key if configured
if self._api_key:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}key={self._api_key}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(
f"511 auth error: {e.code} - check API key configuration for {self._base_url}"
)
self._last_error = f"Auth error {e.code} - check API key"
self._auth_failed = True
return []
else:
logger.warning(f"511 HTTP error for {endpoint}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return []
except URLError as e:
logger.warning(f"511 connection error for {endpoint}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return []
except Exception as e:
logger.warning(f"511 fetch error for {endpoint}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return []
# Parse response - handle various 511 API formats
return self._parse_response(data, now)
def _parse_response(self, data, now: float) -> list:
"""Parse 511 API response.
Different states use different formats. Try common patterns.
Args:
data: JSON response data
now: Current timestamp
Returns:
List of event dicts
"""
events = []
# Handle array response
if isinstance(data, list):
items = data
# Handle wrapped response
elif isinstance(data, dict):
# Try common wrapper keys
items = (
data.get("events") or
data.get("items") or
data.get("data") or
data.get("results") or
[]
)
if not isinstance(items, list):
items = [data] if self._looks_like_event(data) else []
else:
return []
for item in items:
event = self._parse_event(item, now)
if event:
events.append(event)
self._consecutive_errors = 0
self._last_error = None
return events
def _looks_like_event(self, item: dict) -> bool:
"""Check if dict looks like a 511 event."""
return bool(
item.get("id") or item.get("EventId") or item.get("event_id")
)
def _parse_event(self, item: dict, now: float) -> dict:
"""Parse a single 511 event.
Args:
item: Event dict from API
now: Current timestamp
Returns:
Normalized event dict or None
"""
try:
# Try various ID field names
event_id = (
item.get("id") or
item.get("EventId") or
item.get("event_id") or
item.get("ID") or
str(hash(str(item)))[:12]
)
# Try various type field names
event_type = (
item.get("EventType") or
item.get("event_type") or
item.get("type") or
item.get("Type") or
item.get("category") or
"Road Event"
)
# Try various road name fields
roadway = (
item.get("RoadwayName") or
item.get("roadway_name") or
item.get("roadway") or
item.get("Roadway") or
item.get("road") or
item.get("route") or
""
)
# Try various description fields
description = (
item.get("Description") or
item.get("description") or
item.get("message") or
item.get("Message") or
item.get("details") or
""
)
# Try various location fields
lat = (
item.get("Latitude") or
item.get("latitude") or
item.get("lat") or
item.get("StartLatitude") or
None
)
lon = (
item.get("Longitude") or
item.get("longitude") or
item.get("lon") or
item.get("lng") or
item.get("StartLongitude") or
None
)
# Try to get coordinates from nested location object
if lat is None and "location" in item:
loc = item["location"]
if isinstance(loc, dict):
lat = loc.get("latitude") or loc.get("lat")
lon = loc.get("longitude") or loc.get("lon") or loc.get("lng")
# Check closure status
is_closure = (
item.get("IsFullClosure") or
item.get("is_full_closure") or
item.get("fullClosure") or
item.get("closed") or
"closure" in str(event_type).lower() or
"closed" in str(description).lower()
)
# Determine severity
if is_closure:
severity = "warning"
elif "construction" in str(event_type).lower():
severity = "advisory"
elif "incident" in str(event_type).lower():
severity = "advisory"
else:
severity = "info"
# Format headline
if roadway and description:
headline = f"{roadway}: {description[:100]}"
elif roadway:
headline = f"{roadway}: {event_type}"
elif description:
headline = description[:120]
else:
headline = f"{event_type}"
# Try to get timestamp for expiry
last_updated = (
item.get("LastUpdated") or
item.get("last_updated") or
item.get("updated") or
item.get("timestamp") or
None
)
# Default 6 hour TTL, refreshed every tick
expires = now + 21600
event = {
"source": "511",
"event_id": f"511_{event_id}",
"event_type": event_type,
"headline": headline,
"description": description[:500] if description else "",
"severity": severity,
"lat": float(lat) if lat is not None else None,
"lon": float(lon) if lon is not None else None,
"expires": expires,
"fetched_at": now,
"properties": {
"roadway": roadway,
"is_closure": bool(is_closure),
"last_updated": last_updated,
},
}
return event
except Exception as e:
logger.debug(f"511 event parse error: {e} - item: {item}")
return None
def get_events(self) -> list:
"""Get current road events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "511",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"auth_failed": self._auth_failed,
}

446
meshai/env/store.py vendored
View file

@ -1,198 +1,248 @@
"""Environmental data store with tick-based adapter polling."""
import logging
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
self._region_anchors = region_anchors or []
# Create adapter instances based on config
if config.nws.enabled:
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled:
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled:
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
if config.fires.enabled:
from .fires import NICFFiresAdapter
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
if config.avalanche.enabled:
from .avalanche import AvalancheAdapter
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
Returns:
True if any data changed
"""
changed = False
for name, adapter in self._adapters.items():
try:
if adapter.tick():
changed = True
self._ingest(name, adapter)
except Exception as e:
logger.warning("Env adapter %s error: %s", name, e)
self._purge_expired()
return changed
def _ingest(self, name: str, adapter):
"""Ingest data from an adapter after it ticks."""
if name == "swpc":
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""
now = time.time()
expired = [
k for k, v in self._events.items()
if v.get("expires") and v["expires"] < now
]
for k in expired:
del self._events[k]
def get_active(self, source: str = None) -> list:
"""Get active events, optionally filtered by source.
Args:
source: Filter to specific source (nws, swpc, etc.)
Returns:
List of event dicts sorted by fetched_at (newest first)
"""
events = list(self._events.values())
if source:
events = [e for e in events if e["source"] == source]
return sorted(events, key=lambda e: e.get("fetched_at", 0), reverse=True)
def get_for_zones(self, zones: list) -> list:
"""Get events affecting specific NWS zones.
Args:
zones: List of UGC zone codes (e.g., ["IDZ016", "IDZ030"])
Returns:
List of events with overlapping zone coverage
"""
zone_set = set(zones)
return [
e for e in self._events.values()
if set(e.get("areas", [])) & zone_set
]
def get_swpc_status(self) -> dict:
"""Get current SWPC space weather status."""
return self._swpc_status
def get_ducting_status(self) -> dict:
"""Get current tropospheric ducting status."""
return self._ducting_status
def get_rf_propagation(self) -> dict:
"""Combined HF + UHF propagation summary for dashboard/LLM."""
return {
"hf": self._swpc_status,
"uhf_ducting": self._ducting_status,
}
def get_summary(self) -> str:
"""Compact text block for LLM context injection."""
lines = []
lines.append(f"### Current Conditions (as of {time.strftime('%H:%M:%S MT')}):")
# NWS alerts
nws = self.get_active(source="nws")
if nws:
lines.append(f"NWS: {len(nws)} active alert(s):")
for a in nws[:3]:
lines.append(f" - {a['event_type']}: {a['headline'][:120]}")
else:
lines.append("NWS: No active alerts for mesh area.")
# Space weather indices (raw - LLM interprets)
s = self._swpc_status
if s:
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
r = s.get("r_scale", 0)
g = s.get("g_scale", 0)
lines.append(f"Space Weather: SFI {sfi}, Kp {kp}, R{r}/G{g}")
warnings = s.get("active_warnings", [])
if warnings:
for w in warnings[:2]:
lines.append(f" Warning: {w}")
else:
lines.append("Space Weather: Data not available.")
# Tropospheric ducting (raw - LLM interprets)
d = self._ducting_status
if d:
condition = d.get("condition", "unknown")
gradient = d.get("min_gradient", "?")
if condition == "normal":
lines.append(f"Tropospheric: Normal (dM/dz {gradient} M-units/km)")
else:
thickness = d.get("duct_thickness_m", "?")
lines.append(f"Tropospheric: {condition.replace('_', ' ').title()}")
lines.append(f" dM/dz: {gradient} M-units/km, duct ~{thickness}m thick")
# Active fires
fires = self.get_active(source="nifc")
if fires:
lines.append(f"Wildfires: {len(fires)} active")
for f in fires[:2]:
name = f.get("name", "Unknown")
acres = f.get("acres", 0)
pct = f.get("pct_contained", 0)
dist = f.get("distance_km")
lines.append(f" - {name}: {int(acres):,} ac, {int(pct)}% contained" +
(f" ({int(dist)} km)" if dist else ""))
# Avalanche advisories
avy = self.get_active(source="avalanche")
if avy:
lines.append(f"Avalanche: {len(avy)} zone(s) with advisories")
for a in avy[:2]:
zone = a.get("zone_name", "Unknown")
danger = a.get("danger_name", "Unknown")
lines.append(f" - {zone}: {danger}")
return "\n".join(lines)
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]
"""Environmental data store with tick-based adapter polling."""
import logging
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import EnvironmentalConfig
logger = logging.getLogger(__name__)
class EnvironmentalStore:
"""Cache and tick-driver for all environmental feed adapters."""
def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None):
self._adapters = {} # name -> adapter instance
self._events = {} # (source, event_id) -> event dict
self._swpc_status = {} # Kp/SFI/scales snapshot
self._ducting_status = {} # tropo ducting assessment
self._mesh_zones = config.nws_zones or []
self._region_anchors = region_anchors or []
# Create adapter instances based on config
if config.nws.enabled:
from .nws import NWSAlertsAdapter
self._adapters["nws"] = NWSAlertsAdapter(config.nws)
if config.swpc.enabled:
from .swpc import SWPCAdapter
self._adapters["swpc"] = SWPCAdapter(config.swpc)
if config.ducting.enabled:
from .ducting import DuctingAdapter
self._adapters["ducting"] = DuctingAdapter(config.ducting)
if config.fires.enabled:
from .fires import NICFFiresAdapter
self._adapters["nifc"] = NICFFiresAdapter(config.fires, self._region_anchors)
if config.avalanche.enabled:
from .avalanche import AvalancheAdapter
self._adapters["avalanche"] = AvalancheAdapter(config.avalanche)
if config.usgs.enabled:
from .usgs import USGSStreamsAdapter
self._adapters["usgs"] = USGSStreamsAdapter(config.usgs)
if config.traffic.enabled:
from .traffic import TomTomTrafficAdapter
self._adapters["traffic"] = TomTomTrafficAdapter(config.traffic)
if config.roads511.enabled:
from .roads511 import Roads511Adapter
self._adapters["roads511"] = Roads511Adapter(config.roads511)
# FIRMS needs reference to NIFC adapter for cross-referencing
if config.firms.enabled:
from .firms import FIRMSAdapter
fires_adapter = self._adapters.get("nifc")
self._firms = FIRMSAdapter(config.firms, self._region_anchors, fires_adapter)
self._adapters["firms"] = self._firms
logger.info(f"EnvironmentalStore initialized with {len(self._adapters)} adapters")
def refresh(self) -> bool:
"""Called every second from main loop. Ticks each adapter.
Returns:
True if any data changed
"""
changed = False
for name, adapter in self._adapters.items():
try:
if adapter.tick():
changed = True
self._ingest(name, adapter)
except Exception as e:
logger.warning("Env adapter %s error: %s", name, e)
self._purge_expired()
return changed
def _ingest(self, name: str, adapter):
"""Ingest data from an adapter after it ticks."""
if name == "swpc":
self._swpc_status = adapter.get_status()
# Also ingest any alert events (R-scale >= 3)
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
elif name == "ducting":
self._ducting_status = adapter.get_status()
else:
for evt in adapter.get_events():
self._events[(evt["source"], evt["event_id"])] = evt
def _purge_expired(self):
"""Remove expired events."""
now = time.time()
expired = [
k for k, v in self._events.items()
if v.get("expires") and v["expires"] < now
]
for k in expired:
del self._events[k]
def get_active(self, source: str = None) -> list:
"""Get active events, optionally filtered by source.
Args:
source: Filter to specific source (nws, swpc, etc.)
Returns:
List of event dicts sorted by fetched_at (newest first)
"""
events = list(self._events.values())
if source:
events = [e for e in events if e["source"] == source]
return sorted(events, key=lambda e: e.get("fetched_at", 0), reverse=True)
def get_for_zones(self, zones: list) -> list:
"""Get events affecting specific NWS zones.
Args:
zones: List of UGC zone codes (e.g., ["IDZ016", "IDZ030"])
Returns:
List of events with overlapping zone coverage
"""
zone_set = set(zones)
return [
e for e in self._events.values()
if set(e.get("areas", [])) & zone_set
]
def get_swpc_status(self) -> dict:
"""Get current SWPC space weather status."""
return self._swpc_status
def get_ducting_status(self) -> dict:
"""Get current tropospheric ducting status."""
return self._ducting_status
def get_rf_propagation(self) -> dict:
"""Combined HF + UHF propagation summary for dashboard/LLM."""
return {
"hf": self._swpc_status,
"uhf_ducting": self._ducting_status,
}
def get_summary(self) -> str:
"""Compact text block for LLM context injection."""
lines = []
lines.append(f"### Current Conditions (as of {time.strftime('%H:%M:%S MT')}):")
# NWS alerts
nws = self.get_active(source="nws")
if nws:
lines.append(f"NWS: {len(nws)} active alert(s):")
for a in nws[:3]:
lines.append(f" - {a['event_type']}: {a['headline'][:120]}")
else:
lines.append("NWS: No active alerts for mesh area.")
# Space weather indices (raw - LLM interprets)
s = self._swpc_status
if s:
kp = s.get("kp_current", "?")
sfi = s.get("sfi", "?")
r = s.get("r_scale", 0)
g = s.get("g_scale", 0)
lines.append(f"Space Weather: SFI {sfi}, Kp {kp}, R{r}/G{g}")
warnings = s.get("active_warnings", [])
if warnings:
for w in warnings[:2]:
lines.append(f" Warning: {w}")
else:
lines.append("Space Weather: Data not available.")
# Tropospheric ducting (raw - LLM interprets)
d = self._ducting_status
if d:
condition = d.get("condition", "unknown")
gradient = d.get("min_gradient", "?")
if condition == "normal":
lines.append(f"Tropospheric: Normal (dM/dz {gradient} M-units/km)")
else:
thickness = d.get("duct_thickness_m", "?")
lines.append(f"Tropospheric: {condition.replace('_', ' ').title()}")
lines.append(f" dM/dz: {gradient} M-units/km, duct ~{thickness}m thick")
# Active fires
fires = self.get_active(source="nifc")
if fires:
lines.append(f"Wildfires: {len(fires)} active")
for f in fires[:2]:
name = f.get("name", "Unknown")
acres = f.get("acres", 0)
pct = f.get("pct_contained", 0)
dist = f.get("distance_km")
lines.append(f" - {name}: {int(acres):,} ac, {int(pct)}% contained" +
(f" ({int(dist)} km)" if dist else ""))
# Avalanche advisories
avy = self.get_active(source="avalanche")
if avy:
lines.append(f"Avalanche: {len(avy)} zone(s) with advisories")
for a in avy[:2]:
zone = a.get("zone_name", "Unknown")
danger = a.get("danger_name", "Unknown")
lines.append(f" - {zone}: {danger}")
# Stream gauges
streams = self.get_active(source="usgs")
if streams:
lines.append(f"Stream Gauges: {len(streams)} readings")
for s in streams[:2]:
lines.append(f" - {s['headline']}")
# Traffic flow
traffic = self.get_active(source="traffic")
if traffic:
lines.append(f"Traffic: {len(traffic)} corridors")
for t in traffic[:2]:
lines.append(f" - {t['headline']}")
# 511 road events
roads = self.get_active(source="511")
if roads:
lines.append(f"Road Events: {len(roads)} active")
for r in roads[:2]:
lines.append(f" - {r['headline'][:60]}")
# Satellite hotspots
hotspots = self.get_active(source="firms")
if hotspots:
new_ignitions = [h for h in hotspots if h.get("properties", {}).get("new_ignition")]
lines.append(f"Satellite Hotspots: {len(hotspots)} detected")
if new_ignitions:
lines.append(f" *** {len(new_ignitions)} POTENTIAL NEW IGNITION(S) ***")
for h in hotspots[:2]:
lines.append(f" - {h['headline']}")
return "\n".join(lines)
def get_source_health(self) -> list:
"""Get health status for all adapters."""
return [a.health_status for a in self._adapters.values()]

45
meshai/env/swpc.py vendored
View file

@ -140,15 +140,36 @@ class SWPCAdapter:
"""Parse noaa-planetary-k-index.json.
Data format: array of objects with time_tag, Kp, a_running, station_count
Last entry is most recent.
Last entry is most recent. Store full history for charting.
"""
if not data:
return
# Get last entry (most recent)
last_entry = data[-1]
# Store full history (last 24-48 hours of readings)
kp_history = []
for entry in data:
if isinstance(entry, dict):
try:
kp_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("Kp", 0)),
})
except (ValueError, TypeError):
continue
elif isinstance(entry, list) and len(entry) > 1:
# Legacy array format fallback
try:
kp_history.append({
"time": entry[0] if len(entry) > 0 else "",
"value": float(entry[1]),
})
except (ValueError, TypeError):
continue
# Handle both dict format (new API) and list format (legacy)
self._status["kp_history"] = kp_history
# Get last entry (most recent) for current value
last_entry = data[-1]
if isinstance(last_entry, dict):
try:
self._status["kp_current"] = float(last_entry.get("Kp", 0))
@ -184,10 +205,26 @@ class SWPCAdapter:
"""Parse f107_cm_flux.json.
Data format: array of objects with time_tag, flux
Store history for potential charting.
"""
if not data:
return
# Store SFI history (last 30 days of readings)
sfi_history = []
if isinstance(data, list):
for entry in data[-30:]: # Last 30 entries
if isinstance(entry, dict):
try:
sfi_history.append({
"time": entry.get("time_tag", ""),
"value": float(entry.get("flux", 0)),
})
except (ValueError, TypeError):
continue
self._status["sfi_history"] = sfi_history
# Get most recent entry (last in list)
if isinstance(data, list) and data:
last = data[-1]

254
meshai/env/traffic.py vendored Normal file
View file

@ -0,0 +1,254 @@
"""TomTom Traffic Flow adapter."""
import json
import logging
import os
import time
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import TomTomConfig
logger = logging.getLogger(__name__)
class TomTomTrafficAdapter:
"""TomTom Traffic Flow Segment Data polling."""
BASE_URL = "https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/10/json"
def __init__(self, config: "TomTomConfig"):
self._api_key = self._resolve_env(config.api_key or "")
self._corridors = config.corridors or []
self._tick_interval = config.tick_seconds or 300
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
self._daily_requests = 0
self._daily_reset = 0.0
if not self._api_key:
logger.warning("TomTom API key not configured, adapter disabled")
if not self._corridors:
logger.info("TomTom: No corridors configured, adapter idle")
def _resolve_env(self, value: str) -> str:
"""Resolve ${ENV_VAR} references in value."""
if value and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
return os.environ.get(env_var, "")
return value
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# Reset daily counter at midnight
if now - self._daily_reset > 86400:
self._daily_requests = 0
self._daily_reset = now
# No API key or corridors
if not self._api_key or not self._corridors:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch_all()
def _fetch_all(self) -> bool:
"""Fetch traffic flow for all configured corridors.
Returns:
True if data changed
"""
new_events = []
now = time.time()
any_error = False
for corridor in self._corridors:
# Support both dict and object formats
if isinstance(corridor, dict):
name = corridor.get("name", "Unknown")
lat = corridor.get("lat")
lon = corridor.get("lon")
else:
name = getattr(corridor, "name", "Unknown")
lat = getattr(corridor, "lat", None)
lon = getattr(corridor, "lon", None)
if lat is None or lon is None:
continue
event = self._fetch_point(name, lat, lon, now)
if event:
new_events.append(event)
else:
any_error = True
if any_error and not new_events:
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids
self._events = new_events
if not any_error:
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"TomTom traffic updated: {len(new_events)} corridors")
return changed
def _fetch_point(self, name: str, lat: float, lon: float, now: float) -> dict:
"""Fetch traffic flow for a single point.
Args:
name: Corridor name
lat: Latitude
lon: Longitude
now: Current timestamp
Returns:
Event dict or None on error
"""
params = {
"point": f"{lat},{lon}",
"key": self._api_key,
"unit": "MPH",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
self._daily_requests += 1
except HTTPError as e:
if e.code == 401 or e.code == 403:
logger.error(f"TomTom auth error: {e.code} - check API key")
self._last_error = f"Auth error {e.code}"
else:
logger.warning(f"TomTom HTTP error for {name}: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return None
except URLError as e:
logger.warning(f"TomTom connection error for {name}: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return None
except Exception as e:
logger.warning(f"TomTom fetch error for {name}: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return None
# Parse response
try:
flow = data.get("flowSegmentData", {})
current_speed = flow.get("currentSpeed", 0)
free_flow_speed = flow.get("freeFlowSpeed", 0)
current_time = flow.get("currentTravelTime", 0)
free_flow_time = flow.get("freeFlowTravelTime", 0)
confidence = flow.get("confidence", 0)
road_closure = flow.get("roadClosure", False)
# Calculate speed ratio for severity
if free_flow_speed > 0:
ratio = current_speed / free_flow_speed
else:
ratio = 1.0
# Determine severity
if road_closure:
severity = "warning"
elif ratio >= 0.8:
severity = "info"
elif ratio >= 0.5:
severity = "advisory"
else:
severity = "warning"
# Format headline
if road_closure:
headline = f"{name}: CLOSED"
else:
pct = int(ratio * 100)
headline = f"{name}: {int(current_speed)}mph ({pct}% of free flow)"
event = {
"source": "traffic",
"event_id": f"traffic_{name.replace(' ', '_').lower()}",
"event_type": "Traffic Flow",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 600, # 10 min TTL
"fetched_at": now,
"properties": {
"corridor": name,
"currentSpeed": current_speed,
"freeFlowSpeed": free_flow_speed,
"speedRatio": ratio,
"currentTravelTime": current_time,
"freeFlowTravelTime": free_flow_time,
"confidence": confidence,
"roadClosure": road_closure,
},
}
return event
except Exception as e:
logger.warning(f"TomTom parse error for {name}: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return None
def get_events(self) -> list:
"""Get current traffic events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "traffic",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"corridor_count": len(self._corridors),
"daily_requests": self._daily_requests,
}

453
meshai/env/usgs.py vendored Normal file
View file

@ -0,0 +1,453 @@
"""USGS Water Services stream gauge adapter with NWS flood stage auto-lookup.
# TODO: Migrate to api.waterdata.usgs.gov OGC API before Q1 2027
# Legacy waterservices.usgs.gov will be decommissioned.
# See: https://www.usgs.gov/tools/usgs-water-data-apis
"""
import json
import logging
import time
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
if TYPE_CHECKING:
from ..config import USGSConfig
logger = logging.getLogger(__name__)
# Minimum tick interval per USGS guidelines (do not fetch same data more than hourly)
MIN_TICK_SECONDS = 900 # 15 minutes
# Cache for NWS flood stages (rarely change)
_nwps_cache: dict[str, dict] = {}
_nwps_cache_time: dict[str, float] = {}
NWPS_CACHE_TTL = 86400 * 7 # 7 days
class USGSStreamsAdapter:
"""USGS instantaneous values for stream gauge readings with NWS flood stages."""
BASE_URL = "https://waterservices.usgs.gov/nwis/iv/"
NWPS_BASE_URL = "https://api.water.noaa.gov/nwps/v1/gauges"
def __init__(self, config: "USGSConfig"):
self._sites = config.sites or []
self._tick_interval = max(config.tick_seconds or 900, MIN_TICK_SECONDS)
self._flood_thresholds = getattr(config, "flood_thresholds", {}) or {}
self._last_tick = 0.0
self._events = []
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = False
# Site metadata cache (name, flood stages from NWPS)
self._site_metadata: dict[str, dict] = {}
if self._tick_interval < MIN_TICK_SECONDS:
logger.warning(
f"USGS tick_seconds {config.tick_seconds} below minimum, using {MIN_TICK_SECONDS}"
)
def tick(self) -> bool:
"""Execute one polling tick.
Returns:
True if data changed
"""
now = time.time()
# No sites configured
if not self._sites:
return False
# Check tick interval
if now - self._last_tick < self._tick_interval:
return False
self._last_tick = now
return self._fetch()
def _get_site_ids(self) -> list[str]:
"""Extract site IDs from config (handles both string and dict formats)."""
site_ids = []
for site in self._sites:
if isinstance(site, str):
site_ids.append(site)
elif isinstance(site, dict):
site_ids.append(site.get("id", ""))
elif hasattr(site, "id"):
site_ids.append(site.id)
return [s for s in site_ids if s]
def _lookup_nwps_stages(self, usgs_site_id: str) -> Optional[dict]:
"""Lookup flood stages from NWS National Water Prediction Service.
The NWPS API uses NWS gauge IDs which may differ from USGS site IDs.
We try a mapping lookup first, then fall back to direct lookup.
Returns:
dict with action_stage, flood_stage, moderate_flood_stage, major_flood_stage
or None if not available
"""
global _nwps_cache, _nwps_cache_time
# Check cache
now = time.time()
if usgs_site_id in _nwps_cache:
if now - _nwps_cache_time.get(usgs_site_id, 0) < NWPS_CACHE_TTL:
return _nwps_cache[usgs_site_id]
# Try to find NWS gauge ID from USGS site ID
# First, query USGS site info to get the NWS ID crosswalk
nws_gauge_id = self._usgs_to_nws_crosswalk(usgs_site_id)
if not nws_gauge_id:
# Fall back to using USGS ID directly (sometimes they match)
nws_gauge_id = usgs_site_id
# Query NWPS for flood stages
url = f"{self.NWPS_BASE_URL}/{nws_gauge_id}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
# Extract flood stages
stages = {}
flood_info = data.get("flood", {})
if "action" in flood_info:
stages["action_stage"] = flood_info["action"].get("stage")
if "minor" in flood_info:
stages["flood_stage"] = flood_info["minor"].get("stage")
if "moderate" in flood_info:
stages["moderate_flood_stage"] = flood_info["moderate"].get("stage")
if "major" in flood_info:
stages["major_flood_stage"] = flood_info["major"].get("stage")
# Also grab the official name if available
stages["nws_name"] = data.get("name", "")
stages["nws_gauge_id"] = nws_gauge_id
# Cache result
_nwps_cache[usgs_site_id] = stages
_nwps_cache_time[usgs_site_id] = now
logger.info(f"NWPS flood stages for {usgs_site_id}: {stages}")
return stages
except HTTPError as e:
if e.code == 404:
# No NWPS data for this gauge - cache the miss
_nwps_cache[usgs_site_id] = {}
_nwps_cache_time[usgs_site_id] = now
logger.debug(f"No NWPS data for gauge {usgs_site_id}")
else:
logger.warning(f"NWPS lookup failed for {usgs_site_id}: HTTP {e.code}")
return None
except Exception as e:
logger.warning(f"NWPS lookup error for {usgs_site_id}: {e}")
return None
def _usgs_to_nws_crosswalk(self, usgs_site_id: str) -> Optional[str]:
"""Try to find NWS gauge ID from USGS site ID.
The USGS provides a crosswalk in their site metadata, but it's not
always populated. This is a best-effort lookup.
"""
# Try USGS site service for metadata including NWS ID
url = f"https://waterservices.usgs.gov/nwis/site/?format=rdb&sites={usgs_site_id}&siteOutput=expanded"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0"})
with urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
# Parse RDB format - look for NWS ID in the data
# This is a simplified parser; full implementation would be more robust
for line in content.split("\n"):
if line.startswith(usgs_site_id):
# NWS station ID is typically in column ~30ish
# This varies by USGS response format
pass
except Exception:
pass
return None
def lookup_site(self, site_id: str) -> dict:
"""Lookup site metadata for config UI auto-populate.
Returns:
{
"site_id": "13090500",
"name": "Snake River nr Twin Falls ID",
"lat": 42.xxx,
"lon": -114.xxx,
"flood_stages": {
"action_stage": 9.0,
"flood_stage": 10.5,
"moderate_flood_stage": 12.0,
"major_flood_stage": 14.0,
} or None
}
"""
result = {"site_id": site_id, "name": None, "lat": None, "lon": None, "flood_stages": None}
# Get USGS site info
params = {
"format": "json",
"sites": site_id,
"siteOutput": "expanded",
}
url = f"https://waterservices.usgs.gov/nwis/site/?{urlencode(params)}"
try:
req = Request(url, headers={"User-Agent": "MeshAI/1.0", "Accept": "application/json"})
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
sites = data.get("value", {}).get("timeSeries", [])
if not sites:
# Try alternate format
sites_list = data.get("value", {}).get("sites", [])
if sites_list:
site_info = sites_list[0]
result["name"] = site_info.get("siteName", "")
result["lat"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("latitude")
result["lon"] = site_info.get("geoLocation", {}).get("geogLocation", {}).get("longitude")
except Exception as e:
logger.warning(f"USGS site lookup failed for {site_id}: {e}")
# Get NWS flood stages
stages = self._lookup_nwps_stages(site_id)
if stages:
result["flood_stages"] = {
"action_stage": stages.get("action_stage"),
"flood_stage": stages.get("flood_stage"),
"moderate_flood_stage": stages.get("moderate_flood_stage"),
"major_flood_stage": stages.get("major_flood_stage"),
}
if stages.get("nws_name") and not result["name"]:
result["name"] = stages["nws_name"]
return result
def _fetch(self) -> bool:
"""Fetch instantaneous values from USGS Water Services.
Returns:
True if data changed
"""
site_ids = self._get_site_ids()
if not site_ids:
return False
params = {
"format": "json",
"sites": ",".join(site_ids),
"parameterCd": "00060,00065", # Streamflow (cfs) and Gage height (ft)
"siteStatus": "active",
}
url = f"{self.BASE_URL}?{urlencode(params)}"
headers = {
"User-Agent": "MeshAI/1.0 (stream gauge monitoring)",
"Accept": "application/json",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"USGS HTTP error: {e.code}")
self._last_error = f"HTTP {e.code}"
self._consecutive_errors += 1
return False
except URLError as e:
logger.warning(f"USGS connection error: {e.reason}")
self._last_error = str(e.reason)
self._consecutive_errors += 1
return False
except Exception as e:
logger.warning(f"USGS fetch error: {e}")
self._last_error = str(e)
self._consecutive_errors += 1
return False
# Parse response
new_events = []
now = time.time()
try:
time_series = data.get("value", {}).get("timeSeries", [])
for ts in time_series:
source_info = ts.get("sourceInfo", {})
variable = ts.get("variable", {})
values_list = ts.get("values", [])
# Extract site info
site_name = source_info.get("siteName", "Unknown Site")
site_codes = source_info.get("siteCode", [])
site_id = site_codes[0].get("value", "") if site_codes else ""
# Cache site name
if site_id and site_id not in self._site_metadata:
self._site_metadata[site_id] = {"name": site_name}
# Extract location
geo_loc = source_info.get("geoLocation", {}).get("geogLocation", {})
lat = geo_loc.get("latitude")
lon = geo_loc.get("longitude")
# Extract variable info
var_name = variable.get("variableName", "Unknown")
unit_info = variable.get("unit", {})
unit_code = unit_info.get("unitCode", "")
# Determine parameter type
if "Streamflow" in var_name or "00060" in str(variable.get("variableCode", [])):
param_type = "flow"
param_name = "Streamflow"
elif "Gage height" in var_name or "00065" in str(variable.get("variableCode", [])):
param_type = "height"
param_name = "Gage height"
else:
param_type = "other"
param_name = var_name
# Get current value (most recent)
if not values_list or not values_list[0].get("value"):
continue
value_entries = values_list[0].get("value", [])
if not value_entries:
continue
latest = value_entries[-1]
value_str = latest.get("value", "")
timestamp_str = latest.get("dateTime", "")
try:
value = float(value_str)
except (ValueError, TypeError):
continue
# Get flood stages for this site
nwps_stages = self._lookup_nwps_stages(site_id)
# Determine severity based on flood stages (for gage height)
severity = "info"
flood_status = None
if param_type == "height" and nwps_stages:
major = nwps_stages.get("major_flood_stage")
moderate = nwps_stages.get("moderate_flood_stage")
minor = nwps_stages.get("flood_stage")
action = nwps_stages.get("action_stage")
if major and value >= major:
severity = "critical"
flood_status = "Major Flood"
elif moderate and value >= moderate:
severity = "warning"
flood_status = "Moderate Flood"
elif minor and value >= minor:
severity = "warning"
flood_status = "Minor Flood"
elif action and value >= action:
severity = "advisory"
flood_status = "Action Stage"
# Fall back to legacy manual thresholds
if severity == "info":
threshold = self._flood_thresholds.get(site_id, {}).get(param_type)
if threshold and value > threshold:
severity = "warning"
# Format headline
if param_type == "flow":
headline = f"{site_name}: {value:,.0f} {unit_code}"
else:
headline = f"{site_name}: {value:.1f} {unit_code}"
if flood_status:
headline += f"{flood_status}"
event = {
"source": "usgs",
"event_id": f"{site_id}_{param_type}",
"event_type": "Stream Gauge",
"headline": headline,
"severity": severity,
"lat": lat,
"lon": lon,
"expires": now + 1800, # 30 min TTL
"fetched_at": now,
"properties": {
"site_id": site_id,
"site_name": site_name,
"parameter": param_name,
"value": value,
"unit": unit_code,
"timestamp": timestamp_str,
"flood_status": flood_status,
"flood_stages": nwps_stages if nwps_stages else None,
},
}
new_events.append(event)
except Exception as e:
logger.warning(f"USGS parse error: {e}")
self._last_error = f"Parse error: {e}"
self._consecutive_errors += 1
return False
# Check if data changed
old_ids = {e["event_id"] for e in self._events}
new_ids = {e["event_id"] for e in new_events}
changed = old_ids != new_ids or len(self._events) != len(new_events)
self._events = new_events
self._consecutive_errors = 0
self._last_error = None
self._is_loaded = True
if changed:
logger.info(f"USGS streams updated: {len(new_events)} readings from {len(site_ids)} sites")
return changed
def get_events(self) -> list:
"""Get current stream gauge events."""
return self._events
@property
def health_status(self) -> dict:
"""Get adapter health status."""
return {
"source": "usgs",
"is_loaded": self._is_loaded,
"last_error": str(self._last_error) if self._last_error else None,
"consecutive_errors": self._consecutive_errors,
"event_count": len(self._events),
"last_fetch": self._last_tick,
"site_count": len(self._get_site_ids()),
}

File diff suppressed because it is too large Load diff

View file

@ -27,6 +27,7 @@ from .mesh_models import (
)
from .sources.meshmonitor_data import MeshMonitorDataSource
from .sources.meshview import MeshviewSource
from .sources.mqtt_source import MQTTSource
logger = logging.getLogger(__name__)
@ -236,7 +237,7 @@ class MeshDataStore:
source_configs: List of source configurations
db_path: Path to SQLite database for historical data
"""
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource] = {}
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource | MQTTSource] = {}
self._db_path = db_path
self._db: Optional[sqlite3.Connection] = None
@ -316,6 +317,42 @@ class MeshDataStore:
)
logger.info(f"Registered MeshMonitor source '{name}' -> {url} (polite={polite})")
elif src_type == "mqtt":
# Extract MQTT-specific config
if isinstance(cfg, dict):
host = cfg.get('host', '')
port = cfg.get('port', 1883)
username = cfg.get('username', '')
password = cfg.get('password', '')
topic_root = cfg.get('topic_root', 'msh/US')
use_tls = cfg.get('use_tls', False)
else:
host = getattr(cfg, 'host', '')
port = getattr(cfg, 'port', 1883)
username = getattr(cfg, 'username', '')
password = getattr(cfg, 'password', '')
topic_root = getattr(cfg, 'topic_root', 'msh/US')
use_tls = getattr(cfg, 'use_tls', False)
if not host:
logger.warning(f"MQTT source '{name}' missing host, skipping")
return
self._sources[name] = MQTTSource(
host=host,
port=port,
username=username,
password=password,
topic_root=topic_root,
use_tls=use_tls,
name=name,
)
# Track MQTT sources separately for async start
if not hasattr(self, '_mqtt_sources'):
self._mqtt_sources = []
self._mqtt_sources.append(name)
logger.info(f"Registered MQTT source '{name}' -> {host}:{port} topic={topic_root}")
else:
logger.warning(f"Unknown source type '{src_type}' for '{name}'")
@ -359,6 +396,24 @@ class MeshDataStore:
# =========================================================================
async def start_mqtt_sources(self) -> None:
"""Start all MQTT source subscription loops."""
if not hasattr(self, '_mqtt_sources'):
return
for name in self._mqtt_sources:
source = self._sources.get(name)
if source and hasattr(source, 'start'):
await source.start()
async def stop_mqtt_sources(self) -> None:
"""Stop all MQTT source subscription loops."""
if not hasattr(self, '_mqtt_sources'):
return
for name in self._mqtt_sources:
source = self._sources.get(name)
if source and hasattr(source, 'stop'):
await source.stop()
def _purge_stale_nodes(self):
"""Remove nodes not heard from in more than 7 days.
@ -690,9 +745,11 @@ class MeshDataStore:
node.last_heard = ts or 0.0
# Is online (computed from last_heard)
now = time.time()
node.is_online = (now - node.last_heard) < 86400 if node.last_heard else False
# NOTE: is_online is set by MeshHealthEngine.compute() using the
# configured offline_threshold_hours. Don't set it here with a
# hardcoded value - let the health engine determine online status.
# The health engine runs on every refresh cycle and will set is_online
# based on: (now - last_heard) < (offline_threshold_hours * 3600)
# Hops, SNR, RSSI (MM)
node.hops_away = raw.get("hopsAway")
@ -2120,11 +2177,19 @@ class MeshDataStore:
"""Get status of all sources."""
status_list = []
for name, source in self._sources.items():
# Determine source type
if isinstance(source, MeshviewSource):
src_type = "meshview"
elif isinstance(source, MeshMonitorDataSource):
src_type = "meshmonitor"
elif isinstance(source, MQTTSource):
src_type = "mqtt"
else:
src_type = "unknown"
status = {
"name": name,
"type": "meshview"
if isinstance(source, MeshviewSource)
else "meshmonitor",
"type": src_type,
"enabled": True,
"is_loaded": source.is_loaded,
"last_refresh": source.last_refresh,
@ -2138,6 +2203,14 @@ class MeshDataStore:
status["telemetry_count"] = len(source.telemetry)
status["traceroute_count"] = len(source.traceroutes)
status["channel_count"] = len(source.channels)
elif isinstance(source, MQTTSource):
health = source.health_status
status["is_connected"] = health.get("is_connected", False)
status["message_count"] = health.get("message_count", 0)
status["last_message"] = health.get("last_message", 0)
status["host"] = health.get("host", "")
status["port"] = health.get("port", 0)
status["topic_root"] = health.get("topic_root", "")
status_list.append(status)

View file

@ -26,15 +26,19 @@ INFRASTRUCTURE_ROLES = {"ROUTER", "ROUTER_LATE", "ROUTER_CLIENT"}
# Default thresholds
DEFAULT_LOCALITY_RADIUS_MILES = 8.0
DEFAULT_OFFLINE_THRESHOLD_HOURS = 24
DEFAULT_PACKET_THRESHOLD = 500 # Non-text packets per 24h
DEFAULT_BATTERY_WARNING_PERCENT = 20
DEFAULT_OFFLINE_THRESHOLD_HOURS = 2 # Hours before node considered offline
DEFAULT_PACKET_THRESHOLD = 7200 # Non-text packets per 24h (5/min avg)
# NOTE: This is aligned with notification config's packet_flood threshold.
# 5 packets/min avg × 60 min × 24 hr = 7,200 packets/day.
# A node averaging 5+ non-text packets/min is misbehaving.
DEFAULT_BATTERY_WARNING_PERCENT = 30 # Battery level to warn (30% gives time to respond)
# Utilization thresholds (percentage)
UTIL_HEALTHY = 15
UTIL_CAUTION = 20
UTIL_WARNING = 25
UTIL_UNHEALTHY = 35
# Utilization thresholds (percentage) - based on real Meshtastic behavior
# Firmware starts throttling GPS at 25%, severe degradation above 35%
UTIL_HEALTHY = 20 # Under 20% = channel is clear
UTIL_CAUTION = 25 # 20-25% = slight degradation, occasional collisions
UTIL_WARNING = 35 # 25-35% = severe degradation, firmware throttling
UTIL_UNHEALTHY = 45 # 35-45% = mesh struggling badly, reliability dropping
# Pillar weights (5-pillar system)
WEIGHT_INFRASTRUCTURE = 0.30
@ -58,6 +62,9 @@ class HealthScore:
infra_online: int = 0
infra_total: int = 0
util_percent: float = 0.0
util_max_percent: float = 0.0 # Highest node utilization (hotspot indicator)
util_method: str = "none" # "telemetry", "packet_estimate", or "none"
util_node_count: int = 0 # Nodes reporting utilization
coverage_avg_gateways: float = 0.0
coverage_single_gw_count: int = 0
coverage_full_count: int = 0
@ -486,10 +493,19 @@ class MeshHealthEngine:
data_sources.append(f"{len(all_channels)} ch")
data_str = ", ".join(data_sources) if data_sources else "nodes only"
# Log utilization method used
util_method = mesh_score.util_method
if util_method == "telemetry":
util_info = f"util={mesh_score.util_percent:.1f}% (max={mesh_score.util_max_percent:.1f}%, {mesh_score.util_node_count} nodes reporting)"
elif util_method == "packet_estimate":
util_info = f"util={mesh_score.util_percent:.1f}% (packet estimate fallback)"
else:
util_info = "util=N/A (no data)"
logger.info(
f"Mesh health computed: {mesh_health.total_nodes} nodes, "
f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100 "
f"[{data_str}]"
f"[{data_str}] [{util_info}]"
)
return mesh_health
@ -541,6 +557,31 @@ class MeshHealthEngine:
all_nodes = list(nodes.values())
return self._compute_node_group_score(all_nodes, has_packet_data)
def _compute_utilization_score(self, util_percent: float) -> float:
"""Convert utilization percentage to health score using thresholds.
Thresholds based on real Meshtastic behavior:
- Under 20%: Clear channel (score 100)
- 20-25%: Slight degradation (score 75-100)
- 25-35%: Severe degradation, firmware throttling (score 50-75)
- 35-45%: Mesh struggling badly (score 25-50)
- Over 45%: Mesh effectively dead (score 0-25)
"""
if util_percent < UTIL_HEALTHY: # <20%
return 100.0
elif util_percent < UTIL_CAUTION: # 20-25%
# Interpolate from 100 to 75
return 100.0 - ((util_percent - UTIL_HEALTHY) / (UTIL_CAUTION - UTIL_HEALTHY)) * 25
elif util_percent < UTIL_WARNING: # 25-35%
# Interpolate from 75 to 50
return 75.0 - ((util_percent - UTIL_CAUTION) / (UTIL_WARNING - UTIL_CAUTION)) * 25
elif util_percent < UTIL_UNHEALTHY: # 35-45%
# Interpolate from 50 to 25
return 50.0 - ((util_percent - UTIL_WARNING) / (UTIL_UNHEALTHY - UTIL_WARNING)) * 25
else: # 45%+
# Interpolate from 25 to 0 over next 10%
return max(0.0, 25.0 - ((util_percent - UTIL_UNHEALTHY) / 10) * 25)
def _compute_node_group_score(
self,
node_list: list[UnifiedNode],
@ -568,33 +609,84 @@ class MeshHealthEngine:
else:
infra_score = 100.0 # No infrastructure = not penalized
# Channel utilization (based on packet counts if available)
# BUG 7 FIX: Use actual Meshtastic airtime calculation
if has_packet_data:
# Channel utilization - prefer real telemetry over packet estimate
#
# Priority 1: Use firmware-reported channel_utilization from nodes
# This is the most accurate measure - the firmware calculates this
# from actual radio activity over the last minute.
#
# Priority 2: Fall back to packet count estimate if no telemetry
# This is a rough approximation using 200ms/packet (MediumFast preset).
# It's less accurate because different presets have different airtime,
# and it sums packets across all nodes regardless of channel.
util_percent = 0.0
util_max_percent = 0.0
util_score = 100.0
util_method = "none"
util_node_count = 0
util_data_available = False
# Try to get real channel_utilization from infrastructure nodes
# Use infrastructure nodes because they're the routers - they see the most traffic
util_readings = []
for n in infra_nodes:
if n.channel_utilization is not None and n.channel_utilization >= 0:
util_readings.append(n.channel_utilization)
# If no infra nodes have it, try all nodes
if not util_readings:
for n in node_list:
if n.channel_utilization is not None and n.channel_utilization >= 0:
util_readings.append(n.channel_utilization)
if util_readings:
# Use the HIGHEST value - the busiest node is the bottleneck
# If one router is at 45% utilization, the mesh has a problem
# even if other nodes are at 10%
util_max_percent = max(util_readings)
util_percent = util_max_percent # Use max for scoring
util_score = self._compute_utilization_score(util_percent)
util_method = "telemetry"
util_node_count = len(util_readings)
util_data_available = True
# Also compute average for informational purposes
# (stored in util_percent, max in util_max_percent)
# Actually, use max for the score since that's the bottleneck
elif has_packet_data:
# Fallback: Estimate from packet counts
# This is a rough approximation - only use when telemetry unavailable
#
# WARNING: This method has known issues:
# - Assumes 200ms airtime per packet (only correct for MediumFast)
# - Sums packets across all nodes even on different channels
# - Can't distinguish retries from new packets
# Use real channel_utilization from telemetry when available.
total_non_text_packets = sum((n.packets_sent_24h - n.text_messages_24h) for n in node_list)
# Average airtime per packet on MediumFast: ~200ms
# Total available airtime per hour: 3,600,000ms
# Utilization = (packets_per_hour * airtime_ms) / total_airtime_ms * 100
packets_per_hour = total_non_text_packets / 24.0 # 24h window
airtime_per_packet_ms = 200 # ~200ms on MediumFast preset
util_percent = (packets_per_hour * airtime_per_packet_ms) / 3_600_000 * 100
util_max_percent = util_percent # No per-node data available
util_score = self._compute_utilization_score(util_percent)
util_method = "packet_estimate"
util_node_count = 0
util_data_available = True
# Apply scoring thresholds with interpolation
if util_percent < UTIL_HEALTHY: # <15%
util_score = 100.0
elif util_percent < UTIL_CAUTION: # 15-20%
util_score = 100.0 - ((util_percent - UTIL_HEALTHY) / (UTIL_CAUTION - UTIL_HEALTHY)) * 25
elif util_percent < UTIL_WARNING: # 20-25%
util_score = 75.0 - ((util_percent - UTIL_CAUTION) / (UTIL_WARNING - UTIL_CAUTION)) * 25
elif util_percent < UTIL_UNHEALTHY: # 25-35%
util_score = 50.0 - ((util_percent - UTIL_WARNING) / (UTIL_UNHEALTHY - UTIL_WARNING)) * 25
else: # 35%+
util_score = max(0.0, 25.0 - ((util_percent - UTIL_UNHEALTHY) / 10) * 25)
logger.debug(
f"Utilization using packet estimate fallback: {util_percent:.1f}% "
f"({total_non_text_packets} non-text packets/24h)"
)
else:
# No packet data available - assume healthy utilization
# This prevents penalizing the score when we simply don't have data
# No utilization data available - don't penalize
util_percent = 0.0
util_max_percent = 0.0
util_score = 100.0
util_method = "none"
util_node_count = 0
util_data_available = False
# Node behavior (flagged nodes)
flagged = [n for n in node_list if (n.packets_sent_24h - n.text_messages_24h) > self.packet_threshold]
@ -674,13 +766,16 @@ class MeshHealthEngine:
infra_online=infra_online,
infra_total=infra_total,
util_percent=util_percent,
util_max_percent=util_max_percent,
util_method=util_method,
util_node_count=util_node_count,
coverage_avg_gateways=coverage_avg_gw,
coverage_single_gw_count=coverage_single,
coverage_full_count=coverage_full,
flagged_nodes=flagged_count,
battery_warnings=battery_warnings,
solar_index=solar_index,
util_data_available=has_packet_data,
util_data_available=util_data_available,
coverage_data_available=coverage_available,
)

View file

@ -1,139 +1,218 @@
"""Alert category registry.
Defines all alertable conditions with human-readable names and descriptions.
Defines all alertable conditions with human-readable names, descriptions,
and example messages showing what users will receive.
"""
ALERT_CATEGORIES = {
# Infrastructure alerts
"infra_offline": {
"name": "Infrastructure Offline",
"description": "An infrastructure node stopped responding",
"name": "Infrastructure Node Offline",
"description": "An infrastructure node (router/repeater) stopped responding",
"default_severity": "warning",
"example_message": "⚠ Infrastructure Offline: MHR — Mountain Harrison Rptr has not been heard for 2 hours",
},
"critical_node_down": {
"name": "Critical Node Down",
"description": "A node marked as critical went offline",
"default_severity": "critical",
"description": "A node you marked as critical went offline",
"default_severity": "warning",
"example_message": "🚨 Critical Node Down: HPR — Hayden Peak Rptr offline for 1 hour",
},
"infra_recovery": {
"name": "Infrastructure Recovery",
"description": "An infrastructure node came back online",
"description": "An offline infrastructure node came back online",
"default_severity": "info",
"example_message": "✅ Recovery: MHR — Mountain Harrison Rptr back online after 2h outage",
},
"new_router": {
"name": "New Router",
"description": "A new router appeared on the mesh",
"default_severity": "info",
"example_message": "📡 New Router: Snake River Relay appeared in Wood River Valley",
},
# Power alerts
"battery_warning": {
"name": "Battery Warning",
"description": "Infrastructure node battery below warning threshold",
"default_severity": "warning",
"description": "Infrastructure node battery below 30% (3.60V)",
"default_severity": "advisory",
"example_message": "🔋 Battery Warning: BLD-MTN at 28% (3.58V), solar not charging",
},
"battery_critical": {
"name": "Battery Critical",
"description": "Infrastructure node battery below critical threshold",
"default_severity": "critical",
"description": "Infrastructure node battery below 15% (3.50V)",
"default_severity": "warning",
"example_message": "🔋 Battery Critical: BLD-MTN at 12% (3.48V) — shutdown in hours",
},
"battery_emergency": {
"name": "Battery Emergency",
"description": "Infrastructure node battery critically low",
"default_severity": "emergency",
"description": "Infrastructure node battery below 5% (3.40V) — shutdown imminent",
"default_severity": "critical",
"example_message": "🚨 Battery Emergency: BLD-MTN at 4% (3.38V) — shutdown imminent",
},
"battery_trend": {
"name": "Battery Declining",
"description": "Battery showing declining trend over 7 days",
"default_severity": "warning",
"description": "Battery showing declining trend over 7 days — possible solar or charging issue",
"default_severity": "advisory",
"example_message": "🔋 Battery Trend: HPR declining 85% → 62% over 7 days (-3.3%/day)",
},
"power_source_change": {
"name": "Power Source Change",
"description": "Node switched from USB to battery (possible outage)",
"description": "Node switched from USB to battery — possible power outage at site",
"default_severity": "warning",
"example_message": "⚡ Power Source: MHR switched from USB to battery — possible outage",
},
"solar_not_charging": {
"name": "Solar Not Charging",
"description": "Solar panel not charging during daylight hours",
"description": "Solar panel not charging during daylight hours — panel issue or obstruction",
"default_severity": "warning",
"example_message": "☀️ Solar Issue: BLD-MTN not charging during daylight (12:00 MDT)",
},
# Utilization alerts
"high_utilization": {
"name": "Channel Airtime High",
"description": "LoRa channel airtime exceeding threshold — mesh congestion",
"default_severity": "advisory",
"example_message": "📊 Channel Airtime: 47% utilization (threshold: 40%). Reliability may degrade.",
},
"sustained_high_util": {
"name": "High Utilization",
"description": "Channel utilization elevated for extended period",
"name": "Sustained High Utilization",
"description": "Channel airtime elevated for extended period — ongoing congestion",
"default_severity": "warning",
"example_message": "📊 Sustained Congestion: 45% channel utilization for 2+ hours. Consider reducing telemetry.",
},
"packet_flood": {
"name": "Packet Flood",
"description": "Node sending excessive packets",
"description": "A single node sending excessive radio packets (NOT water flooding) — possible firmware bug or stuck transmitter",
"default_severity": "warning",
"example_message": "📻 Packet Flood: Node 'BKBS' transmitting 42 packets/min (threshold: 10/min). Firmware bug?",
},
# Coverage alerts
"infra_single_gateway": {
"name": "Single Gateway",
"description": "Infrastructure node dropped to single gateway coverage",
"default_severity": "warning",
"description": "Infrastructure node dropped to single gateway coverage — reduced redundancy",
"default_severity": "advisory",
"example_message": "📶 Reduced Coverage: HPR dropped to single gateway. Previously had 3 paths.",
},
"feeder_offline": {
"name": "Feeder Offline",
"description": "A feeder gateway stopped responding",
"description": "A feeder gateway stopped responding — coverage gap possible",
"default_severity": "warning",
"example_message": "📡 Feeder Offline: AIDA-N2 gateway not responding. 5 nodes may lose uplink.",
},
"region_total_blackout": {
"name": "Region Blackout",
"description": "All infrastructure in a region is offline",
"default_severity": "emergency",
"description": "All infrastructure in a region is offline — complete coverage loss",
"default_severity": "critical",
"example_message": "🚨 REGION BLACKOUT: All infrastructure in Magic Valley offline!",
},
# Health score alerts
"mesh_score_low": {
"name": "Mesh Health Low",
"description": "Overall mesh health score below threshold",
"description": "Overall mesh health score dropped below threshold — multiple issues likely",
"default_severity": "warning",
"example_message": "📉 Mesh Health: Score 62/100 (threshold: 65). Infrastructure: 71, Connectivity: 58.",
},
"region_score_low": {
"name": "Region Health Low",
"description": "A region's health score below threshold",
"description": "A region's health score below threshold — localized issues",
"default_severity": "warning",
"example_message": "📉 Region Health: Magic Valley at 55/100 (threshold: 60). 2 nodes offline.",
},
# Environmental alerts
# Environmental - Weather
"weather_warning": {
"name": "Severe Weather",
"description": "NWS warning or advisory for mesh area",
"description": "NWS warning or advisory affecting your mesh area",
"default_severity": "warning",
"example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z",
},
# Environmental - Space Weather
"hf_blackout": {
"name": "HF Radio Blackout",
"description": "R3+ solar event degrading HF propagation",
"description": "R3+ solar flare degrading HF propagation on sunlit side",
"default_severity": "warning",
"example_message": "⚠ R3 Strong Radio Blackout — X1.2 flare. Wide-area HF blackout ~1 hour on sunlit side.",
},
"geomagnetic_storm": {
"name": "Geomagnetic Storm",
"description": "G2+ geomagnetic storm — HF degraded at higher latitudes, aurora possible",
"default_severity": "advisory",
"example_message": "🌐 G2 Moderate Geomagnetic Storm — Kp=6. HF fades at high latitudes, aurora to ~55°.",
},
# Environmental - Tropospheric
"tropospheric_ducting": {
"name": "Tropospheric Ducting",
"description": "Atmospheric conditions extending VHF/UHF range",
"description": "Atmospheric conditions trapping VHF/UHF signals — extended range",
"default_severity": "info",
"example_message": "📡 Tropospheric Ducting: Surface duct detected, dM/dz -45 M-units/km, ~120m thick. VHF/UHF extended range.",
},
# Environmental - Fire
"fire_proximity": {
"name": "Fire Near Mesh",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "warning",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR. Monitor closely.",
},
"wildfire_proximity": {
"name": "Fire Near Mesh",
"description": "Wildfire detected within configured distance",
"description": "Active wildfire within alert radius of mesh infrastructure",
"default_severity": "warning",
"example_message": "🔥 Fire Near Mesh: Rock Creek Fire — 1,240 ac, 15% contained, 12 km SSW of MHR.",
},
"new_ignition": {
"name": "New Fire Ignition",
"description": "Satellite hotspot not matching any known fire",
"default_severity": "warning",
"description": "Satellite hotspot detected NOT near any known fire — potential new wildfire",
"default_severity": "watch",
"example_message": "🛰 New Ignition: Satellite fire at 42.32°N, 114.30°W — high confidence, 47 MW FRP. Not near any known fire.",
},
"flood_warning": {
"name": "Flood Warning",
"description": "Stream gauge exceeds flood threshold",
# Environmental - Flood
"stream_flood_warning": {
"name": "Stream Flood Warning",
"description": "River gauge exceeds NWS flood stage threshold",
"default_severity": "warning",
"example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft — Minor Flood Stage is 10.5 ft.",
},
"stream_high_water": {
"name": "Stream High Water",
"description": "River gauge approaching flood stage — monitoring recommended",
"default_severity": "advisory",
"example_message": "🌊 High Water: Snake River at 9.8 ft — Action Stage is 9.0 ft. Monitor conditions.",
},
# Environmental - Roads
"road_closure": {
"name": "Road Closure",
"description": "Full road closure on monitored corridor",
"description": "Full road closure on a monitored corridor",
"default_severity": "warning",
"example_message": "🚧 Road Closure: I-84 EB at MP 173 — full closure, construction. Detour via US-30.",
},
"traffic_congestion": {
"name": "Traffic Congestion",
"description": "Traffic speed dropped below congestion threshold on a monitored corridor",
"default_severity": "advisory",
"example_message": "🚗 Traffic Congestion: I-84 Twin Falls — 35 mph (free-flow 70 mph), 50% speed ratio",
},
# Environmental - Avalanche
"avalanche_warning": {
"name": "Avalanche Danger High",
"description": "Avalanche danger level 4 (High) or 5 (Extreme) in your area",
"default_severity": "warning",
"example_message": "⛷ Avalanche Danger HIGH: Sawtooth Zone — avoid avalanche terrain. Natural avalanches likely.",
},
"avalanche_considerable": {
"name": "Avalanche Danger Considerable",
"description": "Avalanche danger level 3 (Considerable) — most fatalities occur at this level",
"default_severity": "watch",
"example_message": "⛷ Avalanche Danger CONSIDERABLE: Sawtooth Zone — dangerous conditions on steep slopes.",
},
}
@ -146,6 +225,7 @@ def get_category(category_id: str) -> dict:
"name": category_id.replace("_", " ").title(),
"description": f"Alert type: {category_id}",
"default_severity": "info",
"example_message": f"Alert: {category_id}",
}

View file

@ -94,7 +94,7 @@ _ENV_KEYWORDS = {
"solar", "hf", "propagation", "kp", "aurora", "blackout",
"flood", "stream", "river", "ducting", "tropo", "duct",
"uhf", "vhf", "band", "conditions", "forecast", "sfi",
"ionosphere", "geomagnetic", "storm",
"ionosphere", "geomagnetic", "storm", "traffic", "highway", "interstate", "gauge",
}
# City name to region mapping (hardcoded fallback)