feat: Complete data pipeline — utilization, behavior, power, solar, traceroutes all wired

This commit is contained in:
K7ZVX 2026-05-04 21:22:30 +00:00
commit 3959444a09
3 changed files with 365 additions and 20 deletions

View file

@ -109,9 +109,17 @@ class NodeHealth:
# Metrics # Metrics
packet_count_24h: int = 0 packet_count_24h: int = 0
text_packet_count_24h: int = 0 text_packet_count_24h: int = 0
position_packet_count_24h: int = 0
telemetry_packet_count_24h: int = 0
battery_percent: Optional[float] = None battery_percent: Optional[float] = None
voltage: Optional[float] = None voltage: Optional[float] = None
channel_utilization: Optional[float] = None # From device telemetry
air_util_tx: Optional[float] = None # From device telemetry
has_solar: bool = False has_solar: bool = False
uplink_enabled: bool = False
# Packet breakdown by portnum
packets_by_portnum: dict[str, int] = field(default_factory=dict)
# Scores # Scores
score: HealthScore = field(default_factory=HealthScore) score: HealthScore = field(default_factory=HealthScore)
@ -121,6 +129,13 @@ class NodeHealth:
"""Non-text packets in 24h.""" """Non-text packets in 24h."""
return self.packet_count_24h - self.text_packet_count_24h return self.packet_count_24h - self.text_packet_count_24h
@property
def estimated_position_interval(self) -> Optional[float]:
"""Estimate position broadcast interval in seconds."""
if self.position_packet_count_24h > 0:
return 86400 / self.position_packet_count_24h
return None
@dataclass @dataclass
class LocalityHealth: class LocalityHealth:
@ -155,6 +170,20 @@ class MeshHealth:
score: HealthScore = field(default_factory=HealthScore) score: HealthScore = field(default_factory=HealthScore)
last_computed: float = 0.0 last_computed: float = 0.0
# Data availability flags for reporting
has_packet_data: bool = False
has_telemetry_data: bool = False
has_traceroute_data: bool = False
has_channel_data: bool = False
# Traceroute statistics
traceroute_count: int = 0
avg_hop_count: float = 0.0
max_hop_count: int = 0
# MQTT/uplink statistics
uplink_node_count: int = 0
@property @property
def total_nodes(self) -> int: def total_nodes(self) -> int:
return len(self.nodes) return len(self.nodes)
@ -348,7 +377,21 @@ class MeshHealthEngine:
if voltage is not None: if voltage is not None:
node.voltage = float(voltage) node.voltage = float(voltage)
# Count packets per node (last 24h) # Extract channel utilization and air_util_tx from device metrics
ch_util = telem.get("channelUtilization") or telem.get("channel_utilization")
if ch_util is not None:
node.channel_utilization = float(ch_util)
air_tx = telem.get("airUtilTx") or telem.get("air_util_tx")
if air_tx is not None:
node.air_util_tx = float(air_tx)
# Check for uplink (MQTT) enabled
uplink = telem.get("uplinkEnabled") or telem.get("uplink_enabled")
if uplink:
node.uplink_enabled = True
# Count packets per node (last 24h) with portnum breakdown
twenty_four_hours_ago = now - 86400 twenty_four_hours_ago = now - 86400
for pkt in all_packets: for pkt in all_packets:
pkt_time = pkt.get("timestamp") or pkt.get("rxTime") or 0 pkt_time = pkt.get("timestamp") or pkt.get("rxTime") or 0
@ -361,10 +404,24 @@ class MeshHealthEngine:
nodes[from_id].packet_count_24h += 1 nodes[from_id].packet_count_24h += 1
# Get portnum for breakdown
port_num = pkt.get("portnum") or pkt.get("port_num") or pkt.get("portnum_name") or ""
port_name = str(port_num).upper()
# Track by portnum
if port_name:
nodes[from_id].packets_by_portnum[port_name] = \
nodes[from_id].packets_by_portnum.get(port_name, 0) + 1
# Check if text message # Check if text message
port_num = pkt.get("portnum") or pkt.get("port_num") or "" if "TEXT" in port_name:
if "TEXT" in str(port_num).upper():
nodes[from_id].text_packet_count_24h += 1 nodes[from_id].text_packet_count_24h += 1
# Count position packets
elif "POSITION" in port_name:
nodes[from_id].position_packet_count_24h += 1
# Count telemetry packets
elif "TELEMETRY" in port_name:
nodes[from_id].telemetry_packet_count_24h += 1
# Initialize regions from anchors # Initialize regions from anchors
region_map: dict[str, RegionHealth] = {} region_map: dict[str, RegionHealth] = {}
@ -497,19 +554,58 @@ class MeshHealthEngine:
self._compute_region_scores(regions, nodes, has_packet_data) self._compute_region_scores(regions, nodes, has_packet_data)
mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data) mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data)
# Build result # Get traceroute data for statistics
all_traceroutes = source_manager.get_all_traceroutes()
traceroute_count = len(all_traceroutes)
hop_counts = []
for tr in all_traceroutes:
# Extract hop count from traceroute data
route = tr.get("route") or tr.get("hops") or []
if isinstance(route, list):
hop_counts.append(len(route))
avg_hop_count = sum(hop_counts) / len(hop_counts) if hop_counts else 0.0
max_hop_count = max(hop_counts) if hop_counts else 0
# Get channel data and count MQTT/uplink nodes
all_channels = source_manager.get_all_channels()
uplink_count = sum(1 for node in nodes.values() if node.uplink_enabled)
# Build result with data availability flags
mesh_health = MeshHealth( mesh_health = MeshHealth(
regions=regions, regions=regions,
unlocated_nodes=unlocated, unlocated_nodes=unlocated,
nodes=nodes, nodes=nodes,
score=mesh_score, score=mesh_score,
last_computed=now, last_computed=now,
has_packet_data=has_packet_data,
has_telemetry_data=len(all_telemetry) > 0,
has_traceroute_data=traceroute_count > 0,
has_channel_data=len(all_channels) > 0,
traceroute_count=traceroute_count,
avg_hop_count=avg_hop_count,
max_hop_count=max_hop_count,
uplink_node_count=uplink_count,
) )
self._mesh_health = mesh_health self._mesh_health = mesh_health
# Log computation summary with data availability
data_sources = []
if has_packet_data:
data_sources.append(f"{len(all_packets)} pkts")
if len(all_telemetry) > 0:
data_sources.append(f"{len(all_telemetry)} telem")
if traceroute_count > 0:
data_sources.append(f"{traceroute_count} traces")
if len(all_channels) > 0:
data_sources.append(f"{len(all_channels)} ch")
data_str = ", ".join(data_sources) if data_sources else "nodes only"
logger.info( logger.info(
f"Mesh health computed: {mesh_health.total_nodes} nodes, " f"Mesh health computed: {mesh_health.total_nodes} nodes, "
f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100" f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100 "
f"[{data_str}]"
) )
return mesh_health return mesh_health
@ -696,4 +792,3 @@ class MeshHealthEngine:
n for n in self._mesh_health.nodes.values() n for n in self._mesh_health.nodes.values()
if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent
] ]

View file

@ -70,7 +70,7 @@ class MeshReporter:
# Utilization # Utilization
util = score.util_percent util = score.util_percent
util_data_available = getattr(score, 'util_data_available', False) util_data_available = getattr(health, 'has_packet_data', False) or getattr(score, 'util_data_available', False)
if not util_data_available: if not util_data_available:
util_label = "N/A - no packet data" util_label = "N/A - no packet data"
elif util < 15: elif util < 15:
@ -95,13 +95,29 @@ class MeshReporter:
"", "",
f"Overall: {score.composite:.0f}/100 ({score.tier})", f"Overall: {score.composite:.0f}/100 ({score.tier})",
f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)", f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)",
f"Channel Utilization: {util:.1f}% avg ({util_label})",
f"Node Behavior: {score.flagged_nodes} nodes flagged",
f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)",
"",
"Regions:",
] ]
# Channel Utilization with data availability
if util_data_available:
lines.append(f"Channel Utilization: {util:.1f}% avg ({util_label})")
else:
lines.append("Channel Utilization: No data available")
lines.append(f"Node Behavior: {score.flagged_nodes} nodes flagged")
lines.append(f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)")
# Network topology stats (if available)
if health.has_traceroute_data:
lines.append(f"Routing: {health.traceroute_count} traceroutes, avg {health.avg_hop_count:.1f} hops, max {health.max_hop_count}")
else:
lines.append("Routing: No traceroute data available")
# MQTT uplink stats
lines.append(f"MQTT Uplinks: {health.uplink_node_count} nodes")
lines.append("")
lines.append("Regions:")
# Region summaries # Region summaries
for region in health.regions: for region in health.regions:
rs = region.score rs = region.score
@ -221,12 +237,22 @@ class MeshReporter:
# Channel utilization by locality # Channel utilization by locality
lines.append("") lines.append("")
mesh_health = self.health_engine.mesh_health
if mesh_health and mesh_health.has_packet_data:
lines.append(f"Channel Utilization: {rs.util_percent:.0f}%") lines.append(f"Channel Utilization: {rs.util_percent:.0f}%")
if region.localities: if region.localities:
lines.append(" Localities:") lines.append(" Localities:")
for loc in region.localities: for loc in region.localities:
node_count = len(loc.node_ids) node_count = len(loc.node_ids)
lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes") lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes")
else:
lines.append("Channel Utilization: No data available")
# MQTT uplink stats for region
uplink_nodes = [health.nodes.get(nid) for nid in region.node_ids
if health.nodes.get(nid) and health.nodes[nid].uplink_enabled]
lines.append("")
lines.append(f"MQTT Uplinks: {len(uplink_nodes)} nodes")
# Flagged nodes in this region # Flagged nodes in this region
flagged_in_region = [] flagged_in_region = []
@ -305,7 +331,24 @@ class MeshReporter:
lines.append("Traffic (24h):") lines.append("Traffic (24h):")
lines.append(f" Total packets: {node.packet_count_24h}") lines.append(f" Total packets: {node.packet_count_24h}")
lines.append(f" Text messages: {node.text_packet_count_24h}") lines.append(f" Text messages: {node.text_packet_count_24h}")
lines.append(f" Non-text: {node.non_text_packets}") lines.append(f" Position: {node.position_packet_count_24h}")
lines.append(f" Telemetry: {node.telemetry_packet_count_24h}")
lines.append(f" Other non-text: {node.non_text_packets - node.position_packet_count_24h - node.telemetry_packet_count_24h}")
# Estimated intervals
est_pos = node.estimated_position_interval
if est_pos is not None:
if est_pos < 60:
interval_str = f"{int(est_pos)}s"
else:
interval_str = f"{int(est_pos / 60)}m"
lines.append(f" Est. position interval: {interval_str}")
# Channel utilization from device telemetry
if node.channel_utilization is not None:
lines.append(f" Channel util (device): {node.channel_utilization:.1f}%")
if node.air_util_tx is not None:
lines.append(f" TX airtime: {node.air_util_tx:.1f}%")
# Power # Power
lines.append("") lines.append("")
@ -319,6 +362,11 @@ class MeshReporter:
lines.append(f" Voltage: {node.voltage:.2f}V") lines.append(f" Voltage: {node.voltage:.2f}V")
lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}") lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}")
# Connectivity
lines.append("")
lines.append("Connectivity:")
lines.append(f" MQTT Uplink: {'Enabled' if node.uplink_enabled else 'Disabled'}")
# Recommendations for this node # Recommendations for this node
recs = self._node_recommendations(node) recs = self._node_recommendations(node)
if recs: if recs:
@ -338,6 +386,19 @@ class MeshReporter:
ratio = node.non_text_packets / self.health_engine.packet_threshold ratio = node.non_text_packets / self.health_engine.packet_threshold
recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.") recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.")
# Position interval too frequent (< 300s = 5 min)
est_interval = node.estimated_position_interval
if est_interval is not None and est_interval < 300:
recs.append(f"Position interval ~{int(est_interval)}s is aggressive. Recommend 900s (15 min) for battery life.")
# High channel utilization on this node
if node.channel_utilization is not None and node.channel_utilization > 25:
recs.append(f"Channel utilization {node.channel_utilization:.0f}% - consider moving to less congested frequency.")
# High air_util_tx (this node transmitting a lot)
if node.air_util_tx is not None and node.air_util_tx > 10:
recs.append(f"TX airtime {node.air_util_tx:.1f}% - reduce telemetry frequency to be a better mesh citizen.")
# Low battery # Low battery
if node.battery_percent is not None and node.battery_percent < 20: if node.battery_percent is not None and node.battery_percent < 20:
recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.") recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.")
@ -347,6 +408,10 @@ class MeshReporter:
age = _format_age(node.last_seen) age = _format_age(node.last_seen)
recs.append(f"Node offline since {age}. Check power and connectivity.") recs.append(f"Node offline since {age}. Check power and connectivity.")
# Infrastructure node without MQTT uplink
if node.is_infrastructure and not node.uplink_enabled:
recs.append("Infrastructure node without MQTT uplink. Consider enabling for better mesh visibility.")
return recs return recs
def build_recommendations(self, scope: str, scope_value: str = None) -> str: def build_recommendations(self, scope: str, scope_value: str = None) -> str:
@ -410,6 +475,27 @@ class MeshReporter:
names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3]) names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3])
recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.") recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.")
# Check for nodes with aggressive position intervals
aggressive_interval_nodes = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if node:
est = node.estimated_position_interval
if est is not None and est < 300:
aggressive_interval_nodes.append(node)
if aggressive_interval_nodes:
names = ", ".join(n.short_name or n.node_id[:4] for n in aggressive_interval_nodes[:3])
recs.append(f"Nodes with frequent position broadcasts ({names}). Recommend 900s interval.")
# Check MQTT/uplink coverage in region
infra_nodes = [health.nodes.get(nid) for nid in region.node_ids
if health.nodes.get(nid) and health.nodes[nid].is_infrastructure]
uplink_count = sum(1 for n in infra_nodes if n and n.uplink_enabled)
if infra_nodes and uplink_count == 0:
recs.append("No MQTT uplinks in region. Consider enabling on at least one infrastructure node.")
elif len(infra_nodes) >= 3 and uplink_count == 1:
recs.append(f"Only 1/{len(infra_nodes)} infrastructure nodes with MQTT uplink. Consider adding redundancy.")
return recs return recs
def _mesh_recommendations(self, health) -> list[str]: def _mesh_recommendations(self, health) -> list[str]:
@ -437,6 +523,29 @@ class MeshReporter:
if len(battery_warnings) > 2: if len(battery_warnings) > 2:
recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.") recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.")
# Hop count recommendation from traceroutes
if health.has_traceroute_data:
if health.avg_hop_count > 4:
recs.append(f"Average hop count {health.avg_hop_count:.1f} is high. Consider adding infrastructure to reduce latency.")
elif health.max_hop_count > 6:
recs.append(f"Max hop count {health.max_hop_count} indicates long routes. Strategic node placement could improve reach.")
# MQTT uplink coverage
if health.uplink_node_count == 0:
total_infra = sum(1 for n in health.nodes.values() if n.is_infrastructure)
if total_infra > 0:
recs.append("No MQTT uplinks detected. Enable on infrastructure nodes for better mesh visibility.")
elif health.total_regions > 0:
uplinks_per_region = health.uplink_node_count / health.total_regions
if uplinks_per_region < 1:
recs.append(f"Only {health.uplink_node_count} MQTT uplinks across {health.total_regions} regions. Consider adding redundancy.")
# Aggressive position intervals mesh-wide
aggressive_nodes = [n for n in health.nodes.values()
if n.estimated_position_interval is not None and n.estimated_position_interval < 300]
if len(aggressive_nodes) > 5:
recs.append(f"{len(aggressive_nodes)} nodes with position interval <5min. Recommend 15min (900s) as default.")
return recs return recs
def build_lora_compact(self, scope: str, scope_value: str = None) -> str: def build_lora_compact(self, scope: str, scope_value: str = None) -> str:
@ -566,4 +675,3 @@ class MeshReporter:
lines.append(f" {region.name}: {s.composite:.0f}/100{flag}") lines.append(f" {region.name}: {s.composite:.0f}/100{flag}")
return "\n".join(lines) return "\n".join(lines)

View file

@ -555,6 +555,148 @@ class MeshSourceManager:
"edge_duplicates": raw_edges - dedup_edges, "edge_duplicates": raw_edges - dedup_edges,
} }
def get_all_packets(self) -> list[dict]:
"""Get deduplicated packets from all MeshMonitor sources.
Packets are deduplicated by packet_id to avoid double-counting
when multiple sources report the same MQTT feed.
Returns:
List of packet dicts with '_sources' field
"""
packets_by_id: dict[int, dict] = {}
for name, source in self._sources.items():
if not isinstance(source, MeshMonitorDataSource):
continue
if not hasattr(source, "packets"):
continue
for pkt in source.packets:
packet_id = pkt.get("packet_id") or pkt.get("id")
if packet_id is None:
# Fallback key: (from_node, timestamp, portnum)
from_node = pkt.get("from_node") or pkt.get("from")
ts = pkt.get("timestamp") or pkt.get("rxTime")
portnum = pkt.get("portnum")
if from_node and ts:
packet_id = hash((from_node, ts, portnum))
else:
# Can't deduplicate, use negative counter
packet_id = -len(packets_by_id) - 1
if packet_id in packets_by_id:
existing = packets_by_id[packet_id]
if name not in existing["_sources"]:
existing["_sources"].append(name)
else:
tagged = dict(pkt)
tagged["_sources"] = [name]
packets_by_id[packet_id] = tagged
return list(packets_by_id.values())
def get_traffic_stats(self) -> dict[str, dict]:
"""Get traffic statistics from all sources.
Returns:
Dict mapping source name to traffic stats:
- hourly_counts: list of {period, count} for last 24h
- total_packets: total packet count
- packets_per_hour: average packets per hour
"""
stats = {}
for name, source in self._sources.items():
source_stats = {}
if isinstance(source, MeshviewSource):
# Meshview has stats with hourly breakdown
if hasattr(source, "stats") and source.stats:
data = source.stats.get("data", [])
source_stats["hourly_counts"] = data
total = sum(item.get("count", 0) for item in data)
source_stats["total_packets"] = total
source_stats["packets_per_hour"] = total / len(data) if data else 0
if hasattr(source, "counts") and source.counts:
source_stats["total_seen"] = source.counts.get("total_seen", 0)
source_stats["total_packets_all_time"] = source.counts.get("total_packets", 0)
elif isinstance(source, MeshMonitorDataSource):
# MeshMonitor has network_stats
if hasattr(source, "network_stats") and source.network_stats:
ns = source.network_stats
source_stats["total_nodes"] = ns.get("totalNodes", 0)
source_stats["active_nodes"] = ns.get("activeNodes", 0)
source_stats["traceroute_count"] = ns.get("tracerouteCount", 0)
source_stats["last_updated"] = ns.get("lastUpdated", 0)
# Count packets by portnum for breakdown
if hasattr(source, "packets") and source.packets:
portnum_counts: dict[str, int] = {}
for pkt in source.packets:
portnum = pkt.get("portnum_name") or str(pkt.get("portnum", "UNKNOWN"))
portnum_counts[portnum] = portnum_counts.get(portnum, 0) + 1
source_stats["packets_by_portnum"] = portnum_counts
source_stats["packet_count"] = len(source.packets)
if source_stats:
stats[name] = source_stats
return stats
def get_solar_data(self) -> list[dict]:
"""Get solar/power data from all MeshMonitor sources.
Returns:
List of solar data dicts with '_sources' field
"""
all_solar = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
if hasattr(source, "solar") and source.solar:
for item in source.solar:
tagged = dict(item)
tagged["_sources"] = [name]
all_solar.append(tagged)
return all_solar
def get_network_stats(self) -> dict[str, dict]:
"""Get network statistics from all sources.
Returns:
Dict mapping source name to network stats dict
"""
stats = {}
for name, source in self._sources.items():
source_stats = {}
if isinstance(source, MeshviewSource):
if hasattr(source, "counts") and source.counts:
source_stats.update(source.counts)
source_stats["node_count"] = len(source.nodes)
source_stats["edge_count"] = len(source.edges)
elif isinstance(source, MeshMonitorDataSource):
if hasattr(source, "network_stats") and source.network_stats:
source_stats.update(source.network_stats)
if hasattr(source, "topology") and source.topology:
source_stats["topology"] = source.topology
source_stats["node_count"] = len(source.nodes)
source_stats["telemetry_count"] = len(source.telemetry)
source_stats["traceroute_count"] = len(source.traceroutes)
source_stats["channel_count"] = len(source.channels)
if hasattr(source, "packets"):
source_stats["packet_count"] = len(source.packets)
if source_stats:
stats[name] = source_stats
return stats
@property @property
def source_count(self) -> int: def source_count(self) -> int:
"""Get number of active sources.""" """Get number of active sources."""