diff --git a/dashboard-frontend/src/pages/Reference.tsx b/dashboard-frontend/src/pages/Reference.tsx index 6b459bc..43c825a 100644 --- a/dashboard-frontend/src/pages/Reference.tsx +++ b/dashboard-frontend/src/pages/Reference.tsx @@ -746,27 +746,32 @@ export default function Reference() { Utilization (25%)

- Estimates how much of the radio channel's airtime is being used. MeshAI can't measure airtime directly, so it estimates based on packet counts over the last 24 hours. -

-

- packets_per_hour = non_text_packets ÷ 24
- airtime_estimate = (packets_per_hour × 200ms) ÷ 3,600,000ms × 100% + MeshAI reads the channel utilization that each router reports in its telemetry — this is the firmware's own measurement of how busy the radio channel is. MeshAI uses the highest value from any infrastructure node because the busiest router is the bottleneck for the whole mesh.

- The 200ms is an approximation for the MediumFast radio preset — each LoRa packet takes roughly 200ms of airtime. Text messages don't count toward utilization (chatting is the point of a mesh). + How it works: +

+
    +
  1. Collect channel_utilization from all infrastructure nodes that report it
  2. +
  3. If no infra nodes have telemetry, try all nodes
  4. +
  5. Use the maximum value for scoring (busiest node = bottleneck)
  6. +
  7. If no nodes report utilization (older firmware), fall back to packet count estimate
  8. +
+

+ Fallback method (when telemetry unavailable): estimates from packet counts using 200ms/packet airtime. This is less accurate — it assumes MediumFast preset and sums packets across all nodes.

- Special case: If MeshAI doesn't have packet data (no sources reporting packet counts), this pillar scores 100. You're not penalized for missing data. + Special case: If no utilization data is available (no telemetry and no packet data), this pillar scores 100. You're not penalized for missing data.

Coverage (20%) diff --git a/meshai/dashboard/api/mesh_routes.py b/meshai/dashboard/api/mesh_routes.py index c5eed0b..e5c9e2d 100644 --- a/meshai/dashboard/api/mesh_routes.py +++ b/meshai/dashboard/api/mesh_routes.py @@ -1,403 +1,409 @@ -"""Mesh health and node API routes.""" - -from datetime import datetime -from typing import Optional - -from fastapi import APIRouter, HTTPException, Request - -router = APIRouter(tags=["mesh"]) - - -def _serialize_health_score(score) -> dict: - """Serialize a HealthScore object.""" - return { - "composite": round(score.composite, 1), - "tier": score.tier, - "infrastructure": round(score.infrastructure, 1), - "utilization": round(score.utilization, 1), - "behavior": round(score.behavior, 1), - "power": round(score.power, 1), - "infra_online": score.infra_online, - "infra_total": score.infra_total, - "util_percent": round(score.util_percent, 1), - "flagged_nodes": score.flagged_nodes, - "battery_warnings": score.battery_warnings, - "solar_index": round(score.solar_index, 1), - } - - -def _serialize_region(region) -> dict: - """Serialize a RegionHealth object.""" - return { - "name": region.name, - "center_lat": region.center_lat, - "center_lon": region.center_lon, - "node_count": len(region.node_ids), - "locality_count": len(region.localities), - "score": _serialize_health_score(region.score), - "node_ids": region.node_ids, - } - - -def _format_timestamp(ts: Optional[float]) -> Optional[str]: - """Format a Unix timestamp as ISO string.""" - if not ts or ts <= 0: - return None - try: - return datetime.fromtimestamp(ts).isoformat() - except (ValueError, OSError): - return None - - -@router.get("/health") -async def get_health(request: Request): - """Get mesh health data.""" - health_engine = request.app.state.health_engine - - if not health_engine or not health_engine.mesh_health: - return { - "score": 0, - "tier": "Unknown", - "message": "Health engine not ready", - } - - health = health_engine.mesh_health - score = health.score - - return { - "score": round(score.composite, 1), - "tier": score.tier, - "pillars": { - "infrastructure": round(score.infrastructure, 1), - "utilization": round(score.utilization, 1), - "behavior": round(score.behavior, 1), - "power": round(score.power, 1), - }, - "infra_online": score.infra_online, - "infra_total": score.infra_total, - "util_percent": round(score.util_percent, 1), - "flagged_nodes": score.flagged_nodes, - "battery_warnings": score.battery_warnings, - "total_nodes": health.total_nodes, - "total_regions": health.total_regions, - "unlocated_count": len(health.unlocated_nodes), - "last_computed": _format_timestamp(health.last_computed), - "recommendations": [], # TODO: Add recommendations - } - - -@router.get("/nodes") -async def get_nodes(request: Request): - """Get all nodes.""" - data_store = request.app.state.data_store - health_engine = request.app.state.health_engine - - if not data_store: - return [] - - try: - raw_nodes = data_store.get_all_nodes() - except Exception: - return [] - - nodes = [] - for node in raw_nodes: - # Extract node_num from various formats - node_num = node.get("nodeNum") or node.get("num") or node.get("node_num") - if node_num is None: - node_id = node.get("node_id") or node.get("id") - if node_id and isinstance(node_id, str): - try: - node_num = int(node_id.lstrip("!"), 16) - except ValueError: - continue - - if node_num is None: - continue - - # Get health data if available - health_data = {} - if health_engine and health_engine.mesh_health: - node_health = health_engine.mesh_health.nodes.get(str(node_num)) - if node_health: - health_data = { - "region": node_health.region, - "locality": node_health.locality, - "is_infrastructure": node_health.is_infrastructure, - "is_online": node_health.is_online, - "packet_count_24h": node_health.packet_count_24h, - } - - # Build node dict - node_dict = { - "node_num": node_num, - "node_id_hex": f"!{node_num:08x}", - "short_name": node.get("shortName") or node.get("short_name") or "", - "long_name": node.get("longName") or node.get("long_name") or "", - "role": node.get("role") or "", - "latitude": node.get("latitude"), - "longitude": node.get("longitude"), - "last_heard": _format_timestamp(node.get("last_heard")), - "battery_level": node.get("battery_level") or node.get("batteryLevel"), - "voltage": node.get("voltage"), - "snr": node.get("snr"), - "firmware": node.get("firmware_version") or node.get("firmwareVersion") or "", - "hardware": node.get("hw_model") or node.get("hwModel") or "", - "uptime": node.get("uptime_seconds") or node.get("uptimeSeconds"), - "sources": node.get("_sources", []), - **health_data, - } - nodes.append(node_dict) - - return nodes - - -@router.get("/nodes/{node_num}") -async def get_node_detail(node_num: int, request: Request): - """Get detailed info for a specific node.""" - data_store = request.app.state.data_store - health_engine = request.app.state.health_engine - - if not data_store: - raise HTTPException(status_code=404, detail="Data store not available") - - # Find the node - try: - raw_nodes = data_store.get_all_nodes() - except Exception: - raise HTTPException(status_code=500, detail="Failed to fetch nodes") - - target_node = None - for node in raw_nodes: - n_num = node.get("nodeNum") or node.get("num") or node.get("node_num") - if n_num is None: - node_id = node.get("node_id") or node.get("id") - if node_id and isinstance(node_id, str): - try: - n_num = int(node_id.lstrip("!"), 16) - except ValueError: - continue - - if n_num == node_num: - target_node = node - break - - if not target_node: - raise HTTPException(status_code=404, detail=f"Node {node_num} not found") - - # Get health data - health_data = {} - if health_engine and health_engine.mesh_health: - node_health = health_engine.mesh_health.nodes.get(str(node_num)) - if node_health: - health_data = { - "region": node_health.region, - "locality": node_health.locality, - "is_infrastructure": node_health.is_infrastructure, - "is_online": node_health.is_online, - "packet_count_24h": node_health.packet_count_24h, - "text_packet_count_24h": node_health.text_packet_count_24h, - "non_text_packets": node_health.non_text_packets, - "has_solar": node_health.has_solar, - } - - # Get neighbors from edges - neighbors = [] - try: - edges = data_store.get_all_edges() - for edge in edges: - from_num = edge.get("from_node") or edge.get("from") - to_num = edge.get("to_node") or edge.get("to") - - if from_num == node_num: - neighbors.append({ - "node_num": to_num, - "snr": edge.get("snr"), - }) - elif to_num == node_num: - neighbors.append({ - "node_num": from_num, - "snr": edge.get("snr"), - }) - except Exception: - pass - - return { - "node_num": node_num, - "node_id_hex": f"!{node_num:08x}", - "short_name": target_node.get("shortName") or target_node.get("short_name") or "", - "long_name": target_node.get("longName") or target_node.get("long_name") or "", - "role": target_node.get("role") or "", - "latitude": target_node.get("latitude"), - "longitude": target_node.get("longitude"), - "last_heard": _format_timestamp(target_node.get("last_heard")), - "battery_level": target_node.get("battery_level") or target_node.get("batteryLevel"), - "voltage": target_node.get("voltage"), - "snr": target_node.get("snr"), - "firmware": target_node.get("firmware_version") or target_node.get("firmwareVersion") or "", - "hardware": target_node.get("hw_model") or target_node.get("hwModel") or "", - "uptime": target_node.get("uptime_seconds") or target_node.get("uptimeSeconds"), - "sources": target_node.get("_sources", []), - "neighbors": neighbors, - **health_data, - } - - -@router.get("/regions") -async def get_regions(request: Request): - """Get region summaries.""" - health_engine = request.app.state.health_engine - - if not health_engine or not health_engine.mesh_health: - return [] - - regions = [] - for region in health_engine.mesh_health.regions: - # Count online infrastructure - infra_online = 0 - infra_total = 0 - online_count = 0 - - for nid in region.node_ids: - node = health_engine.mesh_health.nodes.get(nid) - if node: - if node.is_online: - online_count += 1 - if node.is_infrastructure: - infra_total += 1 - if node.is_online: - infra_online += 1 - - regions.append({ - "name": region.name, - "local_name": region.name, # Could be overridden by region_labels - "node_count": len(region.node_ids), - "infra_count": infra_total, - "infra_online": infra_online, - "online_count": online_count, - "score": round(region.score.composite, 1), - "tier": region.score.tier, - "center_lat": region.center_lat, - "center_lon": region.center_lon, - }) - - return regions - - -@router.get("/sources") -async def get_sources(request: Request): - """Get per-source health information.""" - data_store = request.app.state.data_store - - if not data_store: - return [] - - sources = [] - try: - for name, source in data_store._sources.items(): - source_info = { - "name": name, - "type": "meshview" if hasattr(source, "edges") else "meshmonitor", - "url": getattr(source, "url", ""), - "is_loaded": source.is_loaded, - "last_error": source.last_error, - "consecutive_errors": getattr(source, "consecutive_errors", 0), - "response_time_ms": getattr(source, "last_response_time_ms", None), - "tick_count": getattr(source, "tick_count", 0), - "node_count": len(source.nodes) if hasattr(source, "nodes") else 0, - } - sources.append(source_info) - except Exception: - pass - - return sources - - -@router.get("/edges") -async def get_edges(request: Request): - """Get neighbor/edge relationships.""" - data_store = request.app.state.data_store - - if not data_store: - return [] - - try: - raw_edges = data_store.get_all_edges() - except Exception: - return [] - - edges = [] - for edge in raw_edges: - from_num = edge.get("from_node") or edge.get("from") - to_num = edge.get("to_node") or edge.get("to") - snr = edge.get("snr") - - # Derive quality from SNR - if snr is None: - quality = "unknown" - elif snr > 12: - quality = "excellent" - elif snr > 8: - quality = "good" - elif snr > 5: - quality = "fair" - elif snr > 3: - quality = "marginal" - else: - quality = "poor" - - edges.append({ - "from_node": from_num, - "to_node": to_num, - "snr": snr, - "quality": quality, - }) - - return edges - - - -@router.get("/channels") -async def get_channels(request: Request): - """Get radio channels from the connected Meshtastic interface.""" - connector = getattr(request.app.state, "connector", None) - - if not connector or not connector.connected: - return [] - - try: - interface = connector._interface - if not interface or not hasattr(interface, "localNode"): - return [] - - local_node = interface.localNode - if not local_node or not hasattr(local_node, "channels"): - return [] - - channels = [] - for ch in local_node.channels: - if ch is None: - continue - - # Get channel settings - settings = getattr(ch, "settings", None) - name = getattr(settings, "name", "") if settings else "" - role_val = getattr(ch, "role", 0) - - # Map role enum to string - role_map = {0: "DISABLED", 1: "PRIMARY", 2: "SECONDARY"} - role = role_map.get(role_val, "UNKNOWN") - - channels.append({ - "index": ch.index, - "name": name or f"Channel {ch.index}", - "role": role, - "enabled": role_val != 0, - }) - - return channels - - except Exception as e: - import logging - logging.getLogger(__name__).warning(f"Failed to get channels: {e}") - return [] +"""Mesh health and node API routes.""" + +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, HTTPException, Request + +router = APIRouter(tags=["mesh"]) + + +def _serialize_health_score(score) -> dict: + """Serialize a HealthScore object.""" + return { + "composite": round(score.composite, 1), + "tier": score.tier, + "infrastructure": round(score.infrastructure, 1), + "utilization": round(score.utilization, 1), + "behavior": round(score.behavior, 1), + "power": round(score.power, 1), + "infra_online": score.infra_online, + "infra_total": score.infra_total, + "util_percent": round(score.util_percent, 1), + "util_max_percent": round(getattr(score, 'util_max_percent', score.util_percent), 1), + "util_method": getattr(score, 'util_method', 'unknown'), + "util_node_count": getattr(score, 'util_node_count', 0), + "flagged_nodes": score.flagged_nodes, + "battery_warnings": score.battery_warnings, + "solar_index": round(score.solar_index, 1), + } + + +def _serialize_region(region) -> dict: + """Serialize a RegionHealth object.""" + return { + "name": region.name, + "center_lat": region.center_lat, + "center_lon": region.center_lon, + "node_count": len(region.node_ids), + "locality_count": len(region.localities), + "score": _serialize_health_score(region.score), + "node_ids": region.node_ids, + } + + +def _format_timestamp(ts: Optional[float]) -> Optional[str]: + """Format a Unix timestamp as ISO string.""" + if not ts or ts <= 0: + return None + try: + return datetime.fromtimestamp(ts).isoformat() + except (ValueError, OSError): + return None + + +@router.get("/health") +async def get_health(request: Request): + """Get mesh health data.""" + health_engine = request.app.state.health_engine + + if not health_engine or not health_engine.mesh_health: + return { + "score": 0, + "tier": "Unknown", + "message": "Health engine not ready", + } + + health = health_engine.mesh_health + score = health.score + + return { + "score": round(score.composite, 1), + "tier": score.tier, + "pillars": { + "infrastructure": round(score.infrastructure, 1), + "utilization": round(score.utilization, 1), + "behavior": round(score.behavior, 1), + "power": round(score.power, 1), + }, + "infra_online": score.infra_online, + "infra_total": score.infra_total, + "util_percent": round(score.util_percent, 1), + "util_max_percent": round(getattr(score, 'util_max_percent', score.util_percent), 1), + "util_method": getattr(score, 'util_method', 'unknown'), + "util_node_count": getattr(score, 'util_node_count', 0), + "flagged_nodes": score.flagged_nodes, + "battery_warnings": score.battery_warnings, + "total_nodes": health.total_nodes, + "total_regions": health.total_regions, + "unlocated_count": len(health.unlocated_nodes), + "last_computed": _format_timestamp(health.last_computed), + "recommendations": [], # TODO: Add recommendations + } + + +@router.get("/nodes") +async def get_nodes(request: Request): + """Get all nodes.""" + data_store = request.app.state.data_store + health_engine = request.app.state.health_engine + + if not data_store: + return [] + + try: + raw_nodes = data_store.get_all_nodes() + except Exception: + return [] + + nodes = [] + for node in raw_nodes: + # Extract node_num from various formats + node_num = node.get("nodeNum") or node.get("num") or node.get("node_num") + if node_num is None: + node_id = node.get("node_id") or node.get("id") + if node_id and isinstance(node_id, str): + try: + node_num = int(node_id.lstrip("!"), 16) + except ValueError: + continue + + if node_num is None: + continue + + # Get health data if available + health_data = {} + if health_engine and health_engine.mesh_health: + node_health = health_engine.mesh_health.nodes.get(str(node_num)) + if node_health: + health_data = { + "region": node_health.region, + "locality": node_health.locality, + "is_infrastructure": node_health.is_infrastructure, + "is_online": node_health.is_online, + "packet_count_24h": node_health.packet_count_24h, + } + + # Build node dict + node_dict = { + "node_num": node_num, + "node_id_hex": f"!{node_num:08x}", + "short_name": node.get("shortName") or node.get("short_name") or "", + "long_name": node.get("longName") or node.get("long_name") or "", + "role": node.get("role") or "", + "latitude": node.get("latitude"), + "longitude": node.get("longitude"), + "last_heard": _format_timestamp(node.get("last_heard")), + "battery_level": node.get("battery_level") or node.get("batteryLevel"), + "voltage": node.get("voltage"), + "snr": node.get("snr"), + "firmware": node.get("firmware_version") or node.get("firmwareVersion") or "", + "hardware": node.get("hw_model") or node.get("hwModel") or "", + "uptime": node.get("uptime_seconds") or node.get("uptimeSeconds"), + "sources": node.get("_sources", []), + **health_data, + } + nodes.append(node_dict) + + return nodes + + +@router.get("/nodes/{node_num}") +async def get_node_detail(node_num: int, request: Request): + """Get detailed info for a specific node.""" + data_store = request.app.state.data_store + health_engine = request.app.state.health_engine + + if not data_store: + raise HTTPException(status_code=404, detail="Data store not available") + + # Find the node + try: + raw_nodes = data_store.get_all_nodes() + except Exception: + raise HTTPException(status_code=500, detail="Failed to fetch nodes") + + target_node = None + for node in raw_nodes: + n_num = node.get("nodeNum") or node.get("num") or node.get("node_num") + if n_num is None: + node_id = node.get("node_id") or node.get("id") + if node_id and isinstance(node_id, str): + try: + n_num = int(node_id.lstrip("!"), 16) + except ValueError: + continue + + if n_num == node_num: + target_node = node + break + + if not target_node: + raise HTTPException(status_code=404, detail=f"Node {node_num} not found") + + # Get health data + health_data = {} + if health_engine and health_engine.mesh_health: + node_health = health_engine.mesh_health.nodes.get(str(node_num)) + if node_health: + health_data = { + "region": node_health.region, + "locality": node_health.locality, + "is_infrastructure": node_health.is_infrastructure, + "is_online": node_health.is_online, + "packet_count_24h": node_health.packet_count_24h, + "text_packet_count_24h": node_health.text_packet_count_24h, + "non_text_packets": node_health.non_text_packets, + "has_solar": node_health.has_solar, + } + + # Get neighbors from edges + neighbors = [] + try: + edges = data_store.get_all_edges() + for edge in edges: + from_num = edge.get("from_node") or edge.get("from") + to_num = edge.get("to_node") or edge.get("to") + + if from_num == node_num: + neighbors.append({ + "node_num": to_num, + "snr": edge.get("snr"), + }) + elif to_num == node_num: + neighbors.append({ + "node_num": from_num, + "snr": edge.get("snr"), + }) + except Exception: + pass + + return { + "node_num": node_num, + "node_id_hex": f"!{node_num:08x}", + "short_name": target_node.get("shortName") or target_node.get("short_name") or "", + "long_name": target_node.get("longName") or target_node.get("long_name") or "", + "role": target_node.get("role") or "", + "latitude": target_node.get("latitude"), + "longitude": target_node.get("longitude"), + "last_heard": _format_timestamp(target_node.get("last_heard")), + "battery_level": target_node.get("battery_level") or target_node.get("batteryLevel"), + "voltage": target_node.get("voltage"), + "snr": target_node.get("snr"), + "firmware": target_node.get("firmware_version") or target_node.get("firmwareVersion") or "", + "hardware": target_node.get("hw_model") or target_node.get("hwModel") or "", + "uptime": target_node.get("uptime_seconds") or target_node.get("uptimeSeconds"), + "sources": target_node.get("_sources", []), + "neighbors": neighbors, + **health_data, + } + + +@router.get("/regions") +async def get_regions(request: Request): + """Get region summaries.""" + health_engine = request.app.state.health_engine + + if not health_engine or not health_engine.mesh_health: + return [] + + regions = [] + for region in health_engine.mesh_health.regions: + # Count online infrastructure + infra_online = 0 + infra_total = 0 + online_count = 0 + + for nid in region.node_ids: + node = health_engine.mesh_health.nodes.get(nid) + if node: + if node.is_online: + online_count += 1 + if node.is_infrastructure: + infra_total += 1 + if node.is_online: + infra_online += 1 + + regions.append({ + "name": region.name, + "local_name": region.name, # Could be overridden by region_labels + "node_count": len(region.node_ids), + "infra_count": infra_total, + "infra_online": infra_online, + "online_count": online_count, + "score": round(region.score.composite, 1), + "tier": region.score.tier, + "center_lat": region.center_lat, + "center_lon": region.center_lon, + }) + + return regions + + +@router.get("/sources") +async def get_sources(request: Request): + """Get per-source health information.""" + data_store = request.app.state.data_store + + if not data_store: + return [] + + sources = [] + try: + for name, source in data_store._sources.items(): + source_info = { + "name": name, + "type": "meshview" if hasattr(source, "edges") else "meshmonitor", + "url": getattr(source, "url", ""), + "is_loaded": source.is_loaded, + "last_error": source.last_error, + "consecutive_errors": getattr(source, "consecutive_errors", 0), + "response_time_ms": getattr(source, "last_response_time_ms", None), + "tick_count": getattr(source, "tick_count", 0), + "node_count": len(source.nodes) if hasattr(source, "nodes") else 0, + } + sources.append(source_info) + except Exception: + pass + + return sources + + +@router.get("/edges") +async def get_edges(request: Request): + """Get neighbor/edge relationships.""" + data_store = request.app.state.data_store + + if not data_store: + return [] + + try: + raw_edges = data_store.get_all_edges() + except Exception: + return [] + + edges = [] + for edge in raw_edges: + from_num = edge.get("from_node") or edge.get("from") + to_num = edge.get("to_node") or edge.get("to") + snr = edge.get("snr") + + # Derive quality from SNR + if snr is None: + quality = "unknown" + elif snr > 12: + quality = "excellent" + elif snr > 8: + quality = "good" + elif snr > 5: + quality = "fair" + elif snr > 3: + quality = "marginal" + else: + quality = "poor" + + edges.append({ + "from_node": from_num, + "to_node": to_num, + "snr": snr, + "quality": quality, + }) + + return edges + + + +@router.get("/channels") +async def get_channels(request: Request): + """Get radio channels from the connected Meshtastic interface.""" + connector = getattr(request.app.state, "connector", None) + + if not connector or not connector.connected: + return [] + + try: + interface = connector._interface + if not interface or not hasattr(interface, "localNode"): + return [] + + local_node = interface.localNode + if not local_node or not hasattr(local_node, "channels"): + return [] + + channels = [] + for ch in local_node.channels: + if ch is None: + continue + + # Get channel settings + settings = getattr(ch, "settings", None) + name = getattr(settings, "name", "") if settings else "" + role_val = getattr(ch, "role", 0) + + # Map role enum to string + role_map = {0: "DISABLED", 1: "PRIMARY", 2: "SECONDARY"} + role = role_map.get(role_val, "UNKNOWN") + + channels.append({ + "index": ch.index, + "name": name or f"Channel {ch.index}", + "role": role, + "enabled": role_val != 0, + }) + + return channels + + except Exception as e: + import logging + logging.getLogger(__name__).warning(f"Failed to get channels: {e}") + return [] diff --git a/meshai/mesh_health.py b/meshai/mesh_health.py index cc95d44..00b5832 100644 --- a/meshai/mesh_health.py +++ b/meshai/mesh_health.py @@ -1,752 +1,844 @@ -"""Mesh health scoring engine. - -Computes four-pillar health scores at every hierarchy level: -- Infrastructure Uptime (40%) -- Channel Utilization (25%) -- Node Behavior (20%) -- Power Health (15%) -""" - -import logging -import time -from dataclasses import dataclass, field -from typing import Optional - -from .geo import ( - cluster_by_distance, - get_cluster_center, - haversine_distance, -) -from .mesh_models import UnifiedNode - -logger = logging.getLogger(__name__) - -# Infrastructure roles (auto-detected) -INFRASTRUCTURE_ROLES = {"ROUTER", "ROUTER_LATE", "ROUTER_CLIENT"} - -# Default thresholds -DEFAULT_LOCALITY_RADIUS_MILES = 8.0 -DEFAULT_OFFLINE_THRESHOLD_HOURS = 24 -DEFAULT_PACKET_THRESHOLD = 500 # Non-text packets per 24h -DEFAULT_BATTERY_WARNING_PERCENT = 20 - -# Utilization thresholds (percentage) -UTIL_HEALTHY = 20 -UTIL_CAUTION = 25 -UTIL_WARNING = 35 -UTIL_UNHEALTHY = 45 - -# Pillar weights (5-pillar system) -WEIGHT_INFRASTRUCTURE = 0.30 -WEIGHT_UTILIZATION = 0.25 -WEIGHT_COVERAGE = 0.20 -WEIGHT_BEHAVIOR = 0.15 -WEIGHT_POWER = 0.10 - - -@dataclass -class HealthScore: - """Health score for a single entity (mesh, region, locality, node).""" - - infrastructure: float = 100.0 # 0-100 - utilization: float = 100.0 # 0-100 - coverage: float = 100.0 # 0-100 (NEW: 5th pillar) - behavior: float = 100.0 # 0-100 - power: float = 100.0 # 0-100 - - # Underlying metrics - infra_online: int = 0 - infra_total: int = 0 - util_percent: float = 0.0 - coverage_avg_gateways: float = 0.0 - coverage_single_gw_count: int = 0 - coverage_full_count: int = 0 - flagged_nodes: int = 0 - battery_warnings: int = 0 - solar_index: float = 100.0 - - # Flag to indicate if utilization data is available - util_data_available: bool = False - coverage_data_available: bool = False - - @property - def composite(self) -> float: - """Calculate weighted composite score.""" - return ( - self.infrastructure * WEIGHT_INFRASTRUCTURE + - self.utilization * WEIGHT_UTILIZATION + - self.coverage * WEIGHT_COVERAGE + - self.behavior * WEIGHT_BEHAVIOR + - self.power * WEIGHT_POWER - ) - - @property - def tier(self) -> str: - """Get health tier label.""" - score = self.composite - if score >= 90: - return "Healthy" - elif score >= 75: - return "Slight degradation" - elif score >= 50: - return "Unhealthy" - elif score >= 25: - return "Warning" - else: - return "Critical" - - -@dataclass -class LocalityHealth: - """Health data for a locality (sub-region cluster).""" - - name: str - center_lat: float = 0.0 - center_lon: float = 0.0 - node_ids: list[str] = field(default_factory=list) - score: HealthScore = field(default_factory=HealthScore) - - -@dataclass -class RegionHealth: - """Health data for a region.""" - - name: str - center_lat: float = 0.0 - center_lon: float = 0.0 - localities: list[LocalityHealth] = field(default_factory=list) - node_ids: list[str] = field(default_factory=list) - score: HealthScore = field(default_factory=HealthScore) - - -@dataclass -class MeshHealth: - """Health data for the entire mesh.""" - - regions: list[RegionHealth] = field(default_factory=list) - unlocated_nodes: list[str] = field(default_factory=list) - nodes: dict[int, UnifiedNode] = field(default_factory=dict) - score: HealthScore = field(default_factory=HealthScore) - last_computed: float = 0.0 - - # Data availability flags for reporting - has_packet_data: bool = False - has_telemetry_data: bool = False - has_traceroute_data: bool = False - has_channel_data: bool = False - - # Traceroute statistics - traceroute_count: int = 0 - avg_hop_count: float = 0.0 - max_hop_count: int = 0 - - # MQTT/uplink statistics - uplink_node_count: int = 0 - - @property - def total_nodes(self) -> int: - return len(self.nodes) - - @property - def total_regions(self) -> int: - return len(self.regions) - - -@dataclass -class RegionAnchor: - """A fixed region anchor point for assignment.""" - name: str - lat: float - lon: float - - -class MeshHealthEngine: - """Computes mesh health scores from aggregated source data.""" - - def __init__( - self, - regions: Optional[list] = None, - locality_radius: float = DEFAULT_LOCALITY_RADIUS_MILES, - offline_threshold_hours: int = DEFAULT_OFFLINE_THRESHOLD_HOURS, - packet_threshold: int = DEFAULT_PACKET_THRESHOLD, - battery_warning_percent: int = DEFAULT_BATTERY_WARNING_PERCENT, - ): - """Initialize health engine. - - Args: - regions: List of region anchors (dicts or RegionAnchor with name, lat, lon) - locality_radius: Miles radius for locality clustering within regions - offline_threshold_hours: Hours before a node is considered offline - packet_threshold: Non-text packets per 24h to flag a node - battery_warning_percent: Battery level for warnings - """ - # Convert region configs to RegionAnchor objects - self.regions: list[RegionAnchor] = [] - if regions: - for r in regions: - if hasattr(r, 'name'): - self.regions.append(RegionAnchor(r.name, r.lat, r.lon)) - elif isinstance(r, dict): - self.regions.append(RegionAnchor(r['name'], r['lat'], r['lon'])) - - self.locality_radius = locality_radius - self.offline_threshold_hours = offline_threshold_hours - self.packet_threshold = packet_threshold - self.battery_warning_percent = battery_warning_percent - - self._mesh_health: Optional[MeshHealth] = None - - @property - def mesh_health(self) -> Optional[MeshHealth]: - """Get last computed mesh health.""" - return self._mesh_health - - def _find_nearest_region(self, lat: float, lon: float) -> Optional[str]: - """Find the nearest region anchor to a GPS point. - - Args: - lat: Latitude - lon: Longitude - - Returns: - Region name or None if no regions defined - """ - if not self.regions: - return None - - nearest = None - min_dist = float("inf") - - for region in self.regions: - dist = haversine_distance(lat, lon, region.lat, region.lon) - if dist < min_dist: - min_dist = dist - nearest = region.name - - return nearest - - def compute(self, data_store) -> MeshHealth: - """Compute mesh health from data store. - - Args: - data_store: MeshDataStore with aggregated mesh data - - Returns: - MeshHealth with computed scores - """ - # Store data_store reference for coverage calculations - self.data_store = data_store - source_manager = data_store # Alias for backwards compat with method body - now = time.time() - offline_threshold = now - (self.offline_threshold_hours * 3600) - - # Aggregate all nodes from all sources - all_nodes = source_manager.get_all_nodes() - all_telemetry = source_manager.get_all_telemetry() - - # FIX: Use aggregator method for deduped packets - all_packets = source_manager.get_all_packets() - - # Track if we have packet data for utilization calculation - has_packet_data = len(all_packets) > 0 - - # Use UnifiedNode objects directly from data_store - NO NodeHealth - nodes: dict[int, UnifiedNode] = {} - for node_num, unified in data_store.nodes.items(): - # Set is_infrastructure based on role - unified.is_infrastructure = str(unified.role).upper() in INFRASTRUCTURE_ROLES - # Set is_online based on last_heard - unified.is_online = unified.last_heard > offline_threshold if unified.last_heard else False - nodes[node_num] = unified - - # Skip all the old NodeHealth creation, telemetry, and packet parsing - # That data is already on UnifiedNode from MeshDataStore - - # REMOVED: All the telemetry parsing loop - # REMOVED: All the packet counting loop - # Data is already available on UnifiedNode: - # - unified.battery_percent, voltage, channel_utilization, air_util_tx - # - unified.packets_sent_24h, text_messages_24h, packets_by_type - # - unified.uplink_enabled, neighbor_count, neighbors - # - unified.avg_gateways, deliverability_score - - # Initialize regions from anchors - region_map: dict[str, RegionHealth] = {} - for anchor in self.regions: - region_map[anchor.name] = RegionHealth( - name=anchor.name, - center_lat=anchor.lat, - center_lon=anchor.lon, - ) - - # Assign nodes to nearest region (first pass: GPS-based) - unlocated = [] - for node_num, node in nodes.items(): - if node.latitude and node.longitude: - region_name = self._find_nearest_region(node.latitude, node.longitude) - if region_name and region_name in region_map: - node.region = region_name - region_map[region_name].node_ids.append(str(node_num)) - else: - unlocated.append(str(node_num)) - else: - unlocated.append(str(node_num)) - - # Build BIDIRECTIONAL neighbor map from ALL sources: - # 1. Each node's own neighbor list (from NeighborInfo packets) - # 2. REVERSE: if A lists B as neighbor, B also sees A - # 3. Edges from traceroutes and other connections - all_neighbor_map: dict[int, set[int]] = {} - - # First: add each node's own neighbor list AND reverse relationships - for node_num, node in nodes.items(): - if node.neighbors: - if node_num not in all_neighbor_map: - all_neighbor_map[node_num] = set() - for nb_num in node.neighbors: - all_neighbor_map[node_num].add(nb_num) - # REVERSE: if this node sees nb_num, nb_num also "sees" this node - if nb_num not in all_neighbor_map: - all_neighbor_map[nb_num] = set() - all_neighbor_map[nb_num].add(node_num) - - # Second: add from edges (connections from traceroutes, etc.) - if hasattr(data_store, 'edges'): - for edge in data_store.edges: - from_num = edge.from_node - to_num = edge.to_node - if from_num not in all_neighbor_map: - all_neighbor_map[from_num] = set() - if to_num not in all_neighbor_map: - all_neighbor_map[to_num] = set() - all_neighbor_map[from_num].add(to_num) - all_neighbor_map[to_num].add(from_num) - - # Also add from raw edges API - all_edges = source_manager.get_all_edges() - for edge in all_edges: - from_raw = edge.get("from") or edge.get("from_node") or edge.get("source") - to_raw = edge.get("to") or edge.get("to_node") or edge.get("target") - if not from_raw or not to_raw: - continue - try: - from_num = int(from_raw) if not str(from_raw).startswith("!") else int(str(from_raw)[1:], 16) - to_num = int(to_raw) if not str(to_raw).startswith("!") else int(str(to_raw)[1:], 16) - except (ValueError, TypeError): - continue - if from_num not in all_neighbor_map: - all_neighbor_map[from_num] = set() - if to_num not in all_neighbor_map: - all_neighbor_map[to_num] = set() - all_neighbor_map[from_num].add(to_num) - all_neighbor_map[to_num].add(from_num) - - # Second pass: Assign unlocated nodes based on BIDIRECTIONAL neighbor map - # This catches nodes that OTHER nodes list as neighbors - max_iterations = 10 - for _ in range(max_iterations): - newly_assigned = [] - for node_id_str in unlocated: - try: - node_num = int(node_id_str) - except ValueError: - continue - if node_num not in nodes: - continue - node = nodes[node_num] - if node.region: - continue # Already assigned - - # Use the BIDIRECTIONAL neighbor map - neighbor_nums = all_neighbor_map.get(node_num, set()) - region_counts: dict[str, int] = {} - for neighbor_num in neighbor_nums: - neighbor_node = nodes.get(neighbor_num) - if neighbor_node and neighbor_node.region: - r = neighbor_node.region - region_counts[r] = region_counts.get(r, 0) + 1 - - if region_counts: - # Assign to most common neighbor region - best_region = max(region_counts, key=region_counts.get) - node.region = best_region - region_map[best_region].node_ids.append(node_id_str) - newly_assigned.append(node_id_str) - - # Remove newly assigned from unlocated - for nid in newly_assigned: - if nid in unlocated: - unlocated.remove(nid) - - if not newly_assigned: - break # No more progress - - regions = list(region_map.values()) - - # Create localities within each region (cluster by proximity) - for region in regions: - if not region.node_ids: - continue - - region_nodes = [] - for nid_str in region.node_ids: - try: - nid = int(nid_str) - except ValueError: - continue - node = nodes.get(nid) - if node and node.latitude and node.longitude: - region_nodes.append({"id": nid_str, "latitude": node.latitude, "longitude": node.longitude}) - - if not region_nodes: - continue - - locality_clusters = cluster_by_distance( - region_nodes, - self.locality_radius, - lat_key="latitude", - lon_key="longitude", - id_key="id", - ) - - for i, cluster in enumerate(locality_clusters): - center_lat, center_lon = get_cluster_center(cluster) - - locality = LocalityHealth( - name=f"{region.name} L{i+1}", - center_lat=center_lat, - center_lon=center_lon, - node_ids=[n["id"] for n in cluster], - ) - region.localities.append(locality) - - # Mark nodes with their locality - for n in cluster: - if n["id"] in nodes: - try: - loc_nid = int(n["id"]) - if loc_nid in nodes: - nodes[loc_nid].locality = locality.name - except (ValueError, TypeError): - pass - - # Compute scores at each level (pass packet data availability flag) - self._compute_locality_scores(regions, nodes, has_packet_data) - self._compute_region_scores(regions, nodes, has_packet_data) - mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data) - - # Get traceroute data for statistics - all_traceroutes = source_manager.get_all_traceroutes() - traceroute_count = len(all_traceroutes) - hop_counts = [] - for tr in all_traceroutes: - # Extract hop count from traceroute data - route = tr.get("route") or tr.get("hops") or [] - if isinstance(route, list): - hop_counts.append(len(route)) - - avg_hop_count = sum(hop_counts) / len(hop_counts) if hop_counts else 0.0 - max_hop_count = max(hop_counts) if hop_counts else 0 - - # Get channel data and count MQTT/uplink nodes - all_channels = source_manager.get_all_channels() - uplink_count = sum(1 for node in nodes.values() if node.uplink_enabled) - - # Build result with data availability flags - mesh_health = MeshHealth( - regions=regions, - unlocated_nodes=unlocated, - nodes=nodes, - score=mesh_score, - last_computed=now, - has_packet_data=has_packet_data, - has_telemetry_data=len(all_telemetry) > 0, - has_traceroute_data=traceroute_count > 0, - has_channel_data=len(all_channels) > 0, - traceroute_count=traceroute_count, - avg_hop_count=avg_hop_count, - max_hop_count=max_hop_count, - uplink_node_count=uplink_count, - ) - - self._mesh_health = mesh_health - - # Health scores are computed for node groups/regions, not individual nodes - # UnifiedNode objects already have their individual scores set during compute - - # Log computation summary with data availability - data_sources = [] - if has_packet_data: - data_sources.append(f"{len(all_packets)} pkts") - if len(all_telemetry) > 0: - data_sources.append(f"{len(all_telemetry)} telem") - if traceroute_count > 0: - data_sources.append(f"{traceroute_count} traces") - if len(all_channels) > 0: - data_sources.append(f"{len(all_channels)} ch") - data_str = ", ".join(data_sources) if data_sources else "nodes only" - - logger.info( - f"Mesh health computed: {mesh_health.total_nodes} nodes, " - f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100 " - f"[{data_str}]" - ) - - return mesh_health - - def _compute_locality_scores( - self, - regions: list[RegionHealth], - nodes: dict[int, UnifiedNode], - has_packet_data: bool = False, - ) -> None: - """Compute health scores for each locality.""" - for region in regions: - for locality in region.localities: - locality_nodes = [] - for nid_str in locality.node_ids: - try: - nid = int(nid_str) - except ValueError: - continue - if nid in nodes: - locality_nodes.append(nodes[nid]) - locality.score = self._compute_node_group_score(locality_nodes, has_packet_data) - - def _compute_region_scores( - self, - regions: list[RegionHealth], - nodes: dict[int, UnifiedNode], - has_packet_data: bool = False, - ) -> None: - """Compute health scores for each region.""" - for region in regions: - region_nodes = [] - for nid_str in region.node_ids: - try: - nid = int(nid_str) - except ValueError: - continue - if nid in nodes: - region_nodes.append(nodes[nid]) - region.score = self._compute_node_group_score(region_nodes, has_packet_data) - - def _compute_mesh_score( - self, - regions: list[RegionHealth], - nodes: dict[int, UnifiedNode], - has_packet_data: bool = False, - ) -> HealthScore: - """Compute mesh-wide health score.""" - all_nodes = list(nodes.values()) - return self._compute_node_group_score(all_nodes, has_packet_data) - - def _compute_node_group_score( - self, - node_list: list[UnifiedNode], - has_packet_data: bool = False, - ) -> HealthScore: - """Compute health score for a group of nodes. - - Args: - node_list: List of UnifiedNode objects - has_packet_data: Whether packet data is available for utilization calc - - Returns: - HealthScore for the group - """ - if not node_list: - return HealthScore() - - # Infrastructure uptime - infra_nodes = [n for n in node_list if n.is_infrastructure] - infra_online = sum(1 for n in infra_nodes if n.is_online) - infra_total = len(infra_nodes) - - if infra_total > 0: - infra_score = (infra_online / infra_total) * 100 - else: - infra_score = 100.0 # No infrastructure = not penalized - - # Channel utilization (based on packet counts if available) - # BUG 7 FIX: Use actual Meshtastic airtime calculation - if has_packet_data: - total_non_text_packets = sum((n.packets_sent_24h - n.text_messages_24h) for n in node_list) - # Average airtime per packet on MediumFast: ~200ms - # Total available airtime per hour: 3,600,000ms - # Utilization = (packets_per_hour * airtime_ms) / total_airtime_ms * 100 - packets_per_hour = total_non_text_packets / 24.0 # 24h window - airtime_per_packet_ms = 200 # ~200ms on MediumFast preset - util_percent = (packets_per_hour * airtime_per_packet_ms) / 3_600_000 * 100 - - # Apply scoring thresholds with interpolation - if util_percent < UTIL_HEALTHY: # <15% - util_score = 100.0 - elif util_percent < UTIL_CAUTION: # 15-20% - util_score = 100.0 - ((util_percent - UTIL_HEALTHY) / (UTIL_CAUTION - UTIL_HEALTHY)) * 25 - elif util_percent < UTIL_WARNING: # 20-25% - util_score = 75.0 - ((util_percent - UTIL_CAUTION) / (UTIL_WARNING - UTIL_CAUTION)) * 25 - elif util_percent < UTIL_UNHEALTHY: # 25-35% - util_score = 50.0 - ((util_percent - UTIL_WARNING) / (UTIL_UNHEALTHY - UTIL_WARNING)) * 25 - else: # 35%+ - util_score = max(0.0, 25.0 - ((util_percent - UTIL_UNHEALTHY) / 10) * 25) - else: - # No packet data available - assume healthy utilization - # This prevents penalizing the score when we simply don't have data - util_percent = 0.0 - util_score = 100.0 - - # Node behavior (flagged nodes) - flagged = [n for n in node_list if (n.packets_sent_24h - n.text_messages_24h) > self.packet_threshold] - flagged_count = len(flagged) - - if flagged_count == 0: - behavior_score = 100.0 - elif flagged_count == 1: - behavior_score = 80.0 - elif flagged_count <= 3: - behavior_score = 60.0 - elif flagged_count <= 5: - behavior_score = 40.0 - else: - behavior_score = 20.0 - - # Power health - battery_warnings = 0 - nodes_with_battery = 0 - for n in node_list: - if n.battery_percent is not None: - nodes_with_battery += 1 - if n.battery_percent < self.battery_warning_percent: - battery_warnings += 1 - - if nodes_with_battery > 0: - battery_ratio = battery_warnings / nodes_with_battery - power_score = 100.0 * (1 - battery_ratio) - else: - power_score = 100.0 - - solar_index = 100.0 - - - # Coverage scoring (5th pillar) - gateway redundancy - coverage_score = 100.0 - coverage_avg_gw = 0.0 - coverage_single = 0 - coverage_full = 0 - coverage_available = False - - if hasattr(self, 'data_store') and self.data_store: - total_sources = len(self.data_store._sources) if hasattr(self.data_store, '_sources') else 0 - nodes_with_coverage = [] - - for n in node_list: - node_num = n.node_num - unified = self.data_store.nodes.get(node_num) - if unified and unified.avg_gateways is not None: - nodes_with_coverage.append(unified) - - if nodes_with_coverage and total_sources > 0: - coverage_available = True - coverage_avg_gw = sum(u.avg_gateways for u in nodes_with_coverage) / len(nodes_with_coverage) - coverage_single = sum(1 for u in nodes_with_coverage if u.avg_gateways <= 1.0) - coverage_full = sum(1 for u in nodes_with_coverage if u.avg_gateways >= total_sources) - - # Score: penalize single-gateway nodes heavily - coverage_ratio = coverage_avg_gw / total_sources - single_penalty = (coverage_single / len(nodes_with_coverage)) * 40 if nodes_with_coverage else 0 - - if coverage_ratio >= 1.0: - coverage_score = 100.0 - single_penalty - elif coverage_ratio >= 0.7: - coverage_score = max(0, 90.0 - single_penalty - ((1.0 - coverage_ratio) * 30)) - elif coverage_ratio >= 0.5: - coverage_score = max(0, 70.0 - single_penalty - ((0.7 - coverage_ratio) * 50)) - else: - coverage_score = max(0, 50.0 - single_penalty - ((0.5 - coverage_ratio) * 100)) - - return HealthScore( - infrastructure=infra_score, - utilization=util_score, - coverage=coverage_score, - behavior=behavior_score, - power=power_score, - infra_online=infra_online, - infra_total=infra_total, - util_percent=util_percent, - coverage_avg_gateways=coverage_avg_gw, - coverage_single_gw_count=coverage_single, - coverage_full_count=coverage_full, - flagged_nodes=flagged_count, - battery_warnings=battery_warnings, - solar_index=solar_index, - util_data_available=has_packet_data, - coverage_data_available=coverage_available, - ) - - def get_region(self, name: str) -> Optional[RegionHealth]: - """Get a region by name.""" - if not self._mesh_health: - return None - - name_lower = name.lower() - for region in self._mesh_health.regions: - if region.name.lower() == name_lower: - return region - return None - - def get_node(self, identifier: str) -> Optional[UnifiedNode]: - """Get a node by ID, name, or hex.""" - if not self._mesh_health: - return None - - # Try as int (node_num) - try: - num = int(identifier) - if num in self._mesh_health.nodes: - return self._mesh_health.nodes[num] - except ValueError: - pass - - # Try shortname/longname - id_lower = identifier.lower().strip() - for node in self._mesh_health.nodes.values(): - if node.short_name and node.short_name.lower() == id_lower: - return node - if node.long_name and id_lower in node.long_name.lower(): - return node - - # Try hex - if identifier.startswith("!"): - try: - num = int(identifier[1:], 16) - if num in self._mesh_health.nodes: - return self._mesh_health.nodes[num] - except ValueError: - pass - - return None - - def get_infrastructure_nodes(self) -> list[UnifiedNode]: - """Get all infrastructure nodes.""" - if not self._mesh_health: - return [] - return [n for n in self._mesh_health.nodes.values() if n.is_infrastructure] - - def get_flagged_nodes(self) -> list[UnifiedNode]: - """Get nodes flagged for excessive packets.""" - if not self._mesh_health: - return [] - return [ - n for n in self._mesh_health.nodes.values() - if (n.packets_sent_24h - n.text_messages_24h) > self.packet_threshold - ] - - def get_battery_warnings(self) -> list[UnifiedNode]: - """Get nodes with low battery.""" - if not self._mesh_health: - return [] - return [ - n for n in self._mesh_health.nodes.values() - if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent - ] +"""Mesh health scoring engine. + +Computes four-pillar health scores at every hierarchy level: +- Infrastructure Uptime (40%) +- Channel Utilization (25%) +- Node Behavior (20%) +- Power Health (15%) +""" + +import logging +import time +from dataclasses import dataclass, field +from typing import Optional + +from .geo import ( + cluster_by_distance, + get_cluster_center, + haversine_distance, +) +from .mesh_models import UnifiedNode + +logger = logging.getLogger(__name__) + +# Infrastructure roles (auto-detected) +INFRASTRUCTURE_ROLES = {"ROUTER", "ROUTER_LATE", "ROUTER_CLIENT"} + +# Default thresholds +DEFAULT_LOCALITY_RADIUS_MILES = 8.0 +DEFAULT_OFFLINE_THRESHOLD_HOURS = 24 +DEFAULT_PACKET_THRESHOLD = 500 # Non-text packets per 24h +DEFAULT_BATTERY_WARNING_PERCENT = 20 + +# Utilization thresholds (percentage) - based on real Meshtastic behavior +# Firmware starts throttling GPS at 25%, severe degradation above 35% +UTIL_HEALTHY = 20 # Under 20% = channel is clear +UTIL_CAUTION = 25 # 20-25% = slight degradation, occasional collisions +UTIL_WARNING = 35 # 25-35% = severe degradation, firmware throttling +UTIL_UNHEALTHY = 45 # 35-45% = mesh struggling badly, reliability dropping + +# Pillar weights (5-pillar system) +WEIGHT_INFRASTRUCTURE = 0.30 +WEIGHT_UTILIZATION = 0.25 +WEIGHT_COVERAGE = 0.20 +WEIGHT_BEHAVIOR = 0.15 +WEIGHT_POWER = 0.10 + + +@dataclass +class HealthScore: + """Health score for a single entity (mesh, region, locality, node).""" + + infrastructure: float = 100.0 # 0-100 + utilization: float = 100.0 # 0-100 + coverage: float = 100.0 # 0-100 (NEW: 5th pillar) + behavior: float = 100.0 # 0-100 + power: float = 100.0 # 0-100 + + # Underlying metrics + infra_online: int = 0 + infra_total: int = 0 + util_percent: float = 0.0 + util_max_percent: float = 0.0 # Highest node utilization (hotspot indicator) + util_method: str = "none" # "telemetry", "packet_estimate", or "none" + util_node_count: int = 0 # Nodes reporting utilization + coverage_avg_gateways: float = 0.0 + coverage_single_gw_count: int = 0 + coverage_full_count: int = 0 + flagged_nodes: int = 0 + battery_warnings: int = 0 + solar_index: float = 100.0 + + # Flag to indicate if utilization data is available + util_data_available: bool = False + coverage_data_available: bool = False + + @property + def composite(self) -> float: + """Calculate weighted composite score.""" + return ( + self.infrastructure * WEIGHT_INFRASTRUCTURE + + self.utilization * WEIGHT_UTILIZATION + + self.coverage * WEIGHT_COVERAGE + + self.behavior * WEIGHT_BEHAVIOR + + self.power * WEIGHT_POWER + ) + + @property + def tier(self) -> str: + """Get health tier label.""" + score = self.composite + if score >= 90: + return "Healthy" + elif score >= 75: + return "Slight degradation" + elif score >= 50: + return "Unhealthy" + elif score >= 25: + return "Warning" + else: + return "Critical" + + +@dataclass +class LocalityHealth: + """Health data for a locality (sub-region cluster).""" + + name: str + center_lat: float = 0.0 + center_lon: float = 0.0 + node_ids: list[str] = field(default_factory=list) + score: HealthScore = field(default_factory=HealthScore) + + +@dataclass +class RegionHealth: + """Health data for a region.""" + + name: str + center_lat: float = 0.0 + center_lon: float = 0.0 + localities: list[LocalityHealth] = field(default_factory=list) + node_ids: list[str] = field(default_factory=list) + score: HealthScore = field(default_factory=HealthScore) + + +@dataclass +class MeshHealth: + """Health data for the entire mesh.""" + + regions: list[RegionHealth] = field(default_factory=list) + unlocated_nodes: list[str] = field(default_factory=list) + nodes: dict[int, UnifiedNode] = field(default_factory=dict) + score: HealthScore = field(default_factory=HealthScore) + last_computed: float = 0.0 + + # Data availability flags for reporting + has_packet_data: bool = False + has_telemetry_data: bool = False + has_traceroute_data: bool = False + has_channel_data: bool = False + + # Traceroute statistics + traceroute_count: int = 0 + avg_hop_count: float = 0.0 + max_hop_count: int = 0 + + # MQTT/uplink statistics + uplink_node_count: int = 0 + + @property + def total_nodes(self) -> int: + return len(self.nodes) + + @property + def total_regions(self) -> int: + return len(self.regions) + + +@dataclass +class RegionAnchor: + """A fixed region anchor point for assignment.""" + name: str + lat: float + lon: float + + +class MeshHealthEngine: + """Computes mesh health scores from aggregated source data.""" + + def __init__( + self, + regions: Optional[list] = None, + locality_radius: float = DEFAULT_LOCALITY_RADIUS_MILES, + offline_threshold_hours: int = DEFAULT_OFFLINE_THRESHOLD_HOURS, + packet_threshold: int = DEFAULT_PACKET_THRESHOLD, + battery_warning_percent: int = DEFAULT_BATTERY_WARNING_PERCENT, + ): + """Initialize health engine. + + Args: + regions: List of region anchors (dicts or RegionAnchor with name, lat, lon) + locality_radius: Miles radius for locality clustering within regions + offline_threshold_hours: Hours before a node is considered offline + packet_threshold: Non-text packets per 24h to flag a node + battery_warning_percent: Battery level for warnings + """ + # Convert region configs to RegionAnchor objects + self.regions: list[RegionAnchor] = [] + if regions: + for r in regions: + if hasattr(r, 'name'): + self.regions.append(RegionAnchor(r.name, r.lat, r.lon)) + elif isinstance(r, dict): + self.regions.append(RegionAnchor(r['name'], r['lat'], r['lon'])) + + self.locality_radius = locality_radius + self.offline_threshold_hours = offline_threshold_hours + self.packet_threshold = packet_threshold + self.battery_warning_percent = battery_warning_percent + + self._mesh_health: Optional[MeshHealth] = None + + @property + def mesh_health(self) -> Optional[MeshHealth]: + """Get last computed mesh health.""" + return self._mesh_health + + def _find_nearest_region(self, lat: float, lon: float) -> Optional[str]: + """Find the nearest region anchor to a GPS point. + + Args: + lat: Latitude + lon: Longitude + + Returns: + Region name or None if no regions defined + """ + if not self.regions: + return None + + nearest = None + min_dist = float("inf") + + for region in self.regions: + dist = haversine_distance(lat, lon, region.lat, region.lon) + if dist < min_dist: + min_dist = dist + nearest = region.name + + return nearest + + def compute(self, data_store) -> MeshHealth: + """Compute mesh health from data store. + + Args: + data_store: MeshDataStore with aggregated mesh data + + Returns: + MeshHealth with computed scores + """ + # Store data_store reference for coverage calculations + self.data_store = data_store + source_manager = data_store # Alias for backwards compat with method body + now = time.time() + offline_threshold = now - (self.offline_threshold_hours * 3600) + + # Aggregate all nodes from all sources + all_nodes = source_manager.get_all_nodes() + all_telemetry = source_manager.get_all_telemetry() + + # FIX: Use aggregator method for deduped packets + all_packets = source_manager.get_all_packets() + + # Track if we have packet data for utilization calculation + has_packet_data = len(all_packets) > 0 + + # Use UnifiedNode objects directly from data_store - NO NodeHealth + nodes: dict[int, UnifiedNode] = {} + for node_num, unified in data_store.nodes.items(): + # Set is_infrastructure based on role + unified.is_infrastructure = str(unified.role).upper() in INFRASTRUCTURE_ROLES + # Set is_online based on last_heard + unified.is_online = unified.last_heard > offline_threshold if unified.last_heard else False + nodes[node_num] = unified + + # Skip all the old NodeHealth creation, telemetry, and packet parsing + # That data is already on UnifiedNode from MeshDataStore + + # REMOVED: All the telemetry parsing loop + # REMOVED: All the packet counting loop + # Data is already available on UnifiedNode: + # - unified.battery_percent, voltage, channel_utilization, air_util_tx + # - unified.packets_sent_24h, text_messages_24h, packets_by_type + # - unified.uplink_enabled, neighbor_count, neighbors + # - unified.avg_gateways, deliverability_score + + # Initialize regions from anchors + region_map: dict[str, RegionHealth] = {} + for anchor in self.regions: + region_map[anchor.name] = RegionHealth( + name=anchor.name, + center_lat=anchor.lat, + center_lon=anchor.lon, + ) + + # Assign nodes to nearest region (first pass: GPS-based) + unlocated = [] + for node_num, node in nodes.items(): + if node.latitude and node.longitude: + region_name = self._find_nearest_region(node.latitude, node.longitude) + if region_name and region_name in region_map: + node.region = region_name + region_map[region_name].node_ids.append(str(node_num)) + else: + unlocated.append(str(node_num)) + else: + unlocated.append(str(node_num)) + + # Build BIDIRECTIONAL neighbor map from ALL sources: + # 1. Each node's own neighbor list (from NeighborInfo packets) + # 2. REVERSE: if A lists B as neighbor, B also sees A + # 3. Edges from traceroutes and other connections + all_neighbor_map: dict[int, set[int]] = {} + + # First: add each node's own neighbor list AND reverse relationships + for node_num, node in nodes.items(): + if node.neighbors: + if node_num not in all_neighbor_map: + all_neighbor_map[node_num] = set() + for nb_num in node.neighbors: + all_neighbor_map[node_num].add(nb_num) + # REVERSE: if this node sees nb_num, nb_num also "sees" this node + if nb_num not in all_neighbor_map: + all_neighbor_map[nb_num] = set() + all_neighbor_map[nb_num].add(node_num) + + # Second: add from edges (connections from traceroutes, etc.) + if hasattr(data_store, 'edges'): + for edge in data_store.edges: + from_num = edge.from_node + to_num = edge.to_node + if from_num not in all_neighbor_map: + all_neighbor_map[from_num] = set() + if to_num not in all_neighbor_map: + all_neighbor_map[to_num] = set() + all_neighbor_map[from_num].add(to_num) + all_neighbor_map[to_num].add(from_num) + + # Also add from raw edges API + all_edges = source_manager.get_all_edges() + for edge in all_edges: + from_raw = edge.get("from") or edge.get("from_node") or edge.get("source") + to_raw = edge.get("to") or edge.get("to_node") or edge.get("target") + if not from_raw or not to_raw: + continue + try: + from_num = int(from_raw) if not str(from_raw).startswith("!") else int(str(from_raw)[1:], 16) + to_num = int(to_raw) if not str(to_raw).startswith("!") else int(str(to_raw)[1:], 16) + except (ValueError, TypeError): + continue + if from_num not in all_neighbor_map: + all_neighbor_map[from_num] = set() + if to_num not in all_neighbor_map: + all_neighbor_map[to_num] = set() + all_neighbor_map[from_num].add(to_num) + all_neighbor_map[to_num].add(from_num) + + # Second pass: Assign unlocated nodes based on BIDIRECTIONAL neighbor map + # This catches nodes that OTHER nodes list as neighbors + max_iterations = 10 + for _ in range(max_iterations): + newly_assigned = [] + for node_id_str in unlocated: + try: + node_num = int(node_id_str) + except ValueError: + continue + if node_num not in nodes: + continue + node = nodes[node_num] + if node.region: + continue # Already assigned + + # Use the BIDIRECTIONAL neighbor map + neighbor_nums = all_neighbor_map.get(node_num, set()) + region_counts: dict[str, int] = {} + for neighbor_num in neighbor_nums: + neighbor_node = nodes.get(neighbor_num) + if neighbor_node and neighbor_node.region: + r = neighbor_node.region + region_counts[r] = region_counts.get(r, 0) + 1 + + if region_counts: + # Assign to most common neighbor region + best_region = max(region_counts, key=region_counts.get) + node.region = best_region + region_map[best_region].node_ids.append(node_id_str) + newly_assigned.append(node_id_str) + + # Remove newly assigned from unlocated + for nid in newly_assigned: + if nid in unlocated: + unlocated.remove(nid) + + if not newly_assigned: + break # No more progress + + regions = list(region_map.values()) + + # Create localities within each region (cluster by proximity) + for region in regions: + if not region.node_ids: + continue + + region_nodes = [] + for nid_str in region.node_ids: + try: + nid = int(nid_str) + except ValueError: + continue + node = nodes.get(nid) + if node and node.latitude and node.longitude: + region_nodes.append({"id": nid_str, "latitude": node.latitude, "longitude": node.longitude}) + + if not region_nodes: + continue + + locality_clusters = cluster_by_distance( + region_nodes, + self.locality_radius, + lat_key="latitude", + lon_key="longitude", + id_key="id", + ) + + for i, cluster in enumerate(locality_clusters): + center_lat, center_lon = get_cluster_center(cluster) + + locality = LocalityHealth( + name=f"{region.name} L{i+1}", + center_lat=center_lat, + center_lon=center_lon, + node_ids=[n["id"] for n in cluster], + ) + region.localities.append(locality) + + # Mark nodes with their locality + for n in cluster: + if n["id"] in nodes: + try: + loc_nid = int(n["id"]) + if loc_nid in nodes: + nodes[loc_nid].locality = locality.name + except (ValueError, TypeError): + pass + + # Compute scores at each level (pass packet data availability flag) + self._compute_locality_scores(regions, nodes, has_packet_data) + self._compute_region_scores(regions, nodes, has_packet_data) + mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data) + + # Get traceroute data for statistics + all_traceroutes = source_manager.get_all_traceroutes() + traceroute_count = len(all_traceroutes) + hop_counts = [] + for tr in all_traceroutes: + # Extract hop count from traceroute data + route = tr.get("route") or tr.get("hops") or [] + if isinstance(route, list): + hop_counts.append(len(route)) + + avg_hop_count = sum(hop_counts) / len(hop_counts) if hop_counts else 0.0 + max_hop_count = max(hop_counts) if hop_counts else 0 + + # Get channel data and count MQTT/uplink nodes + all_channels = source_manager.get_all_channels() + uplink_count = sum(1 for node in nodes.values() if node.uplink_enabled) + + # Build result with data availability flags + mesh_health = MeshHealth( + regions=regions, + unlocated_nodes=unlocated, + nodes=nodes, + score=mesh_score, + last_computed=now, + has_packet_data=has_packet_data, + has_telemetry_data=len(all_telemetry) > 0, + has_traceroute_data=traceroute_count > 0, + has_channel_data=len(all_channels) > 0, + traceroute_count=traceroute_count, + avg_hop_count=avg_hop_count, + max_hop_count=max_hop_count, + uplink_node_count=uplink_count, + ) + + self._mesh_health = mesh_health + + # Health scores are computed for node groups/regions, not individual nodes + # UnifiedNode objects already have their individual scores set during compute + + # Log computation summary with data availability + data_sources = [] + if has_packet_data: + data_sources.append(f"{len(all_packets)} pkts") + if len(all_telemetry) > 0: + data_sources.append(f"{len(all_telemetry)} telem") + if traceroute_count > 0: + data_sources.append(f"{traceroute_count} traces") + if len(all_channels) > 0: + data_sources.append(f"{len(all_channels)} ch") + data_str = ", ".join(data_sources) if data_sources else "nodes only" + + # Log utilization method used + util_method = mesh_score.util_method + if util_method == "telemetry": + util_info = f"util={mesh_score.util_percent:.1f}% (max={mesh_score.util_max_percent:.1f}%, {mesh_score.util_node_count} nodes reporting)" + elif util_method == "packet_estimate": + util_info = f"util={mesh_score.util_percent:.1f}% (packet estimate fallback)" + else: + util_info = "util=N/A (no data)" + + logger.info( + f"Mesh health computed: {mesh_health.total_nodes} nodes, " + f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100 " + f"[{data_str}] [{util_info}]" + ) + + return mesh_health + + def _compute_locality_scores( + self, + regions: list[RegionHealth], + nodes: dict[int, UnifiedNode], + has_packet_data: bool = False, + ) -> None: + """Compute health scores for each locality.""" + for region in regions: + for locality in region.localities: + locality_nodes = [] + for nid_str in locality.node_ids: + try: + nid = int(nid_str) + except ValueError: + continue + if nid in nodes: + locality_nodes.append(nodes[nid]) + locality.score = self._compute_node_group_score(locality_nodes, has_packet_data) + + def _compute_region_scores( + self, + regions: list[RegionHealth], + nodes: dict[int, UnifiedNode], + has_packet_data: bool = False, + ) -> None: + """Compute health scores for each region.""" + for region in regions: + region_nodes = [] + for nid_str in region.node_ids: + try: + nid = int(nid_str) + except ValueError: + continue + if nid in nodes: + region_nodes.append(nodes[nid]) + region.score = self._compute_node_group_score(region_nodes, has_packet_data) + + def _compute_mesh_score( + self, + regions: list[RegionHealth], + nodes: dict[int, UnifiedNode], + has_packet_data: bool = False, + ) -> HealthScore: + """Compute mesh-wide health score.""" + all_nodes = list(nodes.values()) + return self._compute_node_group_score(all_nodes, has_packet_data) + + def _compute_utilization_score(self, util_percent: float) -> float: + """Convert utilization percentage to health score using thresholds. + + Thresholds based on real Meshtastic behavior: + - Under 20%: Clear channel (score 100) + - 20-25%: Slight degradation (score 75-100) + - 25-35%: Severe degradation, firmware throttling (score 50-75) + - 35-45%: Mesh struggling badly (score 25-50) + - Over 45%: Mesh effectively dead (score 0-25) + """ + if util_percent < UTIL_HEALTHY: # <20% + return 100.0 + elif util_percent < UTIL_CAUTION: # 20-25% + # Interpolate from 100 to 75 + return 100.0 - ((util_percent - UTIL_HEALTHY) / (UTIL_CAUTION - UTIL_HEALTHY)) * 25 + elif util_percent < UTIL_WARNING: # 25-35% + # Interpolate from 75 to 50 + return 75.0 - ((util_percent - UTIL_CAUTION) / (UTIL_WARNING - UTIL_CAUTION)) * 25 + elif util_percent < UTIL_UNHEALTHY: # 35-45% + # Interpolate from 50 to 25 + return 50.0 - ((util_percent - UTIL_WARNING) / (UTIL_UNHEALTHY - UTIL_WARNING)) * 25 + else: # 45%+ + # Interpolate from 25 to 0 over next 10% + return max(0.0, 25.0 - ((util_percent - UTIL_UNHEALTHY) / 10) * 25) + + def _compute_node_group_score( + self, + node_list: list[UnifiedNode], + has_packet_data: bool = False, + ) -> HealthScore: + """Compute health score for a group of nodes. + + Args: + node_list: List of UnifiedNode objects + has_packet_data: Whether packet data is available for utilization calc + + Returns: + HealthScore for the group + """ + if not node_list: + return HealthScore() + + # Infrastructure uptime + infra_nodes = [n for n in node_list if n.is_infrastructure] + infra_online = sum(1 for n in infra_nodes if n.is_online) + infra_total = len(infra_nodes) + + if infra_total > 0: + infra_score = (infra_online / infra_total) * 100 + else: + infra_score = 100.0 # No infrastructure = not penalized + + # Channel utilization - prefer real telemetry over packet estimate + # + # Priority 1: Use firmware-reported channel_utilization from nodes + # This is the most accurate measure - the firmware calculates this + # from actual radio activity over the last minute. + # + # Priority 2: Fall back to packet count estimate if no telemetry + # This is a rough approximation using 200ms/packet (MediumFast preset). + # It's less accurate because different presets have different airtime, + # and it sums packets across all nodes regardless of channel. + + util_percent = 0.0 + util_max_percent = 0.0 + util_score = 100.0 + util_method = "none" + util_node_count = 0 + util_data_available = False + + # Try to get real channel_utilization from infrastructure nodes + # Use infrastructure nodes because they're the routers - they see the most traffic + util_readings = [] + for n in infra_nodes: + if n.channel_utilization is not None and n.channel_utilization >= 0: + util_readings.append(n.channel_utilization) + + # If no infra nodes have it, try all nodes + if not util_readings: + for n in node_list: + if n.channel_utilization is not None and n.channel_utilization >= 0: + util_readings.append(n.channel_utilization) + + if util_readings: + # Use the HIGHEST value - the busiest node is the bottleneck + # If one router is at 45% utilization, the mesh has a problem + # even if other nodes are at 10% + util_max_percent = max(util_readings) + util_percent = util_max_percent # Use max for scoring + util_score = self._compute_utilization_score(util_percent) + util_method = "telemetry" + util_node_count = len(util_readings) + util_data_available = True + + # Also compute average for informational purposes + # (stored in util_percent, max in util_max_percent) + # Actually, use max for the score since that's the bottleneck + + elif has_packet_data: + # Fallback: Estimate from packet counts + # This is a rough approximation - only use when telemetry unavailable + # + # WARNING: This method has known issues: + # - Assumes 200ms airtime per packet (only correct for MediumFast) + # - Sums packets across all nodes even on different channels + # - Can't distinguish retries from new packets + # Use real channel_utilization from telemetry when available. + + total_non_text_packets = sum((n.packets_sent_24h - n.text_messages_24h) for n in node_list) + packets_per_hour = total_non_text_packets / 24.0 # 24h window + airtime_per_packet_ms = 200 # ~200ms on MediumFast preset + util_percent = (packets_per_hour * airtime_per_packet_ms) / 3_600_000 * 100 + util_max_percent = util_percent # No per-node data available + util_score = self._compute_utilization_score(util_percent) + util_method = "packet_estimate" + util_node_count = 0 + util_data_available = True + + logger.debug( + f"Utilization using packet estimate fallback: {util_percent:.1f}% " + f"({total_non_text_packets} non-text packets/24h)" + ) + else: + # No utilization data available - don't penalize + util_percent = 0.0 + util_max_percent = 0.0 + util_score = 100.0 + util_method = "none" + util_node_count = 0 + util_data_available = False + + # Node behavior (flagged nodes) + flagged = [n for n in node_list if (n.packets_sent_24h - n.text_messages_24h) > self.packet_threshold] + flagged_count = len(flagged) + + if flagged_count == 0: + behavior_score = 100.0 + elif flagged_count == 1: + behavior_score = 80.0 + elif flagged_count <= 3: + behavior_score = 60.0 + elif flagged_count <= 5: + behavior_score = 40.0 + else: + behavior_score = 20.0 + + # Power health + battery_warnings = 0 + nodes_with_battery = 0 + for n in node_list: + if n.battery_percent is not None: + nodes_with_battery += 1 + if n.battery_percent < self.battery_warning_percent: + battery_warnings += 1 + + if nodes_with_battery > 0: + battery_ratio = battery_warnings / nodes_with_battery + power_score = 100.0 * (1 - battery_ratio) + else: + power_score = 100.0 + + solar_index = 100.0 + + + # Coverage scoring (5th pillar) - gateway redundancy + coverage_score = 100.0 + coverage_avg_gw = 0.0 + coverage_single = 0 + coverage_full = 0 + coverage_available = False + + if hasattr(self, 'data_store') and self.data_store: + total_sources = len(self.data_store._sources) if hasattr(self.data_store, '_sources') else 0 + nodes_with_coverage = [] + + for n in node_list: + node_num = n.node_num + unified = self.data_store.nodes.get(node_num) + if unified and unified.avg_gateways is not None: + nodes_with_coverage.append(unified) + + if nodes_with_coverage and total_sources > 0: + coverage_available = True + coverage_avg_gw = sum(u.avg_gateways for u in nodes_with_coverage) / len(nodes_with_coverage) + coverage_single = sum(1 for u in nodes_with_coverage if u.avg_gateways <= 1.0) + coverage_full = sum(1 for u in nodes_with_coverage if u.avg_gateways >= total_sources) + + # Score: penalize single-gateway nodes heavily + coverage_ratio = coverage_avg_gw / total_sources + single_penalty = (coverage_single / len(nodes_with_coverage)) * 40 if nodes_with_coverage else 0 + + if coverage_ratio >= 1.0: + coverage_score = 100.0 - single_penalty + elif coverage_ratio >= 0.7: + coverage_score = max(0, 90.0 - single_penalty - ((1.0 - coverage_ratio) * 30)) + elif coverage_ratio >= 0.5: + coverage_score = max(0, 70.0 - single_penalty - ((0.7 - coverage_ratio) * 50)) + else: + coverage_score = max(0, 50.0 - single_penalty - ((0.5 - coverage_ratio) * 100)) + + return HealthScore( + infrastructure=infra_score, + utilization=util_score, + coverage=coverage_score, + behavior=behavior_score, + power=power_score, + infra_online=infra_online, + infra_total=infra_total, + util_percent=util_percent, + util_max_percent=util_max_percent, + util_method=util_method, + util_node_count=util_node_count, + coverage_avg_gateways=coverage_avg_gw, + coverage_single_gw_count=coverage_single, + coverage_full_count=coverage_full, + flagged_nodes=flagged_count, + battery_warnings=battery_warnings, + solar_index=solar_index, + util_data_available=util_data_available, + coverage_data_available=coverage_available, + ) + + def get_region(self, name: str) -> Optional[RegionHealth]: + """Get a region by name.""" + if not self._mesh_health: + return None + + name_lower = name.lower() + for region in self._mesh_health.regions: + if region.name.lower() == name_lower: + return region + return None + + def get_node(self, identifier: str) -> Optional[UnifiedNode]: + """Get a node by ID, name, or hex.""" + if not self._mesh_health: + return None + + # Try as int (node_num) + try: + num = int(identifier) + if num in self._mesh_health.nodes: + return self._mesh_health.nodes[num] + except ValueError: + pass + + # Try shortname/longname + id_lower = identifier.lower().strip() + for node in self._mesh_health.nodes.values(): + if node.short_name and node.short_name.lower() == id_lower: + return node + if node.long_name and id_lower in node.long_name.lower(): + return node + + # Try hex + if identifier.startswith("!"): + try: + num = int(identifier[1:], 16) + if num in self._mesh_health.nodes: + return self._mesh_health.nodes[num] + except ValueError: + pass + + return None + + def get_infrastructure_nodes(self) -> list[UnifiedNode]: + """Get all infrastructure nodes.""" + if not self._mesh_health: + return [] + return [n for n in self._mesh_health.nodes.values() if n.is_infrastructure] + + def get_flagged_nodes(self) -> list[UnifiedNode]: + """Get nodes flagged for excessive packets.""" + if not self._mesh_health: + return [] + return [ + n for n in self._mesh_health.nodes.values() + if (n.packets_sent_24h - n.text_messages_24h) > self.packet_threshold + ] + + def get_battery_warnings(self) -> list[UnifiedNode]: + """Get nodes with low battery.""" + if not self._mesh_health: + return [] + return [ + n for n in self._mesh_health.nodes.values() + if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent + ]