From 3959444a09a4c9bc78b33b0c04ca6f24d575c772 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Mon, 4 May 2026 21:22:30 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Complete=20data=20pipeline=20=E2=80=94?= =?UTF-8?q?=20utilization,=20behavior,=20power,=20solar,=20traceroutes=20a?= =?UTF-8?q?ll=20wired?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- meshai/mesh_health.py | 107 ++++++++++++++++++++++++++++-- meshai/mesh_reporter.py | 136 ++++++++++++++++++++++++++++++++++---- meshai/mesh_sources.py | 142 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 365 insertions(+), 20 deletions(-) diff --git a/meshai/mesh_health.py b/meshai/mesh_health.py index 4cdac09..c178a5e 100644 --- a/meshai/mesh_health.py +++ b/meshai/mesh_health.py @@ -109,9 +109,17 @@ class NodeHealth: # Metrics packet_count_24h: int = 0 text_packet_count_24h: int = 0 + position_packet_count_24h: int = 0 + telemetry_packet_count_24h: int = 0 battery_percent: Optional[float] = None voltage: Optional[float] = None + channel_utilization: Optional[float] = None # From device telemetry + air_util_tx: Optional[float] = None # From device telemetry has_solar: bool = False + uplink_enabled: bool = False + + # Packet breakdown by portnum + packets_by_portnum: dict[str, int] = field(default_factory=dict) # Scores score: HealthScore = field(default_factory=HealthScore) @@ -121,6 +129,13 @@ class NodeHealth: """Non-text packets in 24h.""" return self.packet_count_24h - self.text_packet_count_24h + @property + def estimated_position_interval(self) -> Optional[float]: + """Estimate position broadcast interval in seconds.""" + if self.position_packet_count_24h > 0: + return 86400 / self.position_packet_count_24h + return None + @dataclass class LocalityHealth: @@ -155,6 +170,20 @@ class MeshHealth: score: HealthScore = field(default_factory=HealthScore) last_computed: float = 0.0 + # Data availability flags for reporting + has_packet_data: bool = False + has_telemetry_data: bool = False + has_traceroute_data: bool = False + has_channel_data: bool = False + + # Traceroute statistics + traceroute_count: int = 0 + avg_hop_count: float = 0.0 + max_hop_count: int = 0 + + # MQTT/uplink statistics + uplink_node_count: int = 0 + @property def total_nodes(self) -> int: return len(self.nodes) @@ -348,7 +377,21 @@ class MeshHealthEngine: if voltage is not None: node.voltage = float(voltage) - # Count packets per node (last 24h) + # Extract channel utilization and air_util_tx from device metrics + ch_util = telem.get("channelUtilization") or telem.get("channel_utilization") + if ch_util is not None: + node.channel_utilization = float(ch_util) + + air_tx = telem.get("airUtilTx") or telem.get("air_util_tx") + if air_tx is not None: + node.air_util_tx = float(air_tx) + + # Check for uplink (MQTT) enabled + uplink = telem.get("uplinkEnabled") or telem.get("uplink_enabled") + if uplink: + node.uplink_enabled = True + + # Count packets per node (last 24h) with portnum breakdown twenty_four_hours_ago = now - 86400 for pkt in all_packets: pkt_time = pkt.get("timestamp") or pkt.get("rxTime") or 0 @@ -361,10 +404,24 @@ class MeshHealthEngine: nodes[from_id].packet_count_24h += 1 + # Get portnum for breakdown + port_num = pkt.get("portnum") or pkt.get("port_num") or pkt.get("portnum_name") or "" + port_name = str(port_num).upper() + + # Track by portnum + if port_name: + nodes[from_id].packets_by_portnum[port_name] = \ + nodes[from_id].packets_by_portnum.get(port_name, 0) + 1 + # Check if text message - port_num = pkt.get("portnum") or pkt.get("port_num") or "" - if "TEXT" in str(port_num).upper(): + if "TEXT" in port_name: nodes[from_id].text_packet_count_24h += 1 + # Count position packets + elif "POSITION" in port_name: + nodes[from_id].position_packet_count_24h += 1 + # Count telemetry packets + elif "TELEMETRY" in port_name: + nodes[from_id].telemetry_packet_count_24h += 1 # Initialize regions from anchors region_map: dict[str, RegionHealth] = {} @@ -497,19 +554,58 @@ class MeshHealthEngine: self._compute_region_scores(regions, nodes, has_packet_data) mesh_score = self._compute_mesh_score(regions, nodes, has_packet_data) - # Build result + # Get traceroute data for statistics + all_traceroutes = source_manager.get_all_traceroutes() + traceroute_count = len(all_traceroutes) + hop_counts = [] + for tr in all_traceroutes: + # Extract hop count from traceroute data + route = tr.get("route") or tr.get("hops") or [] + if isinstance(route, list): + hop_counts.append(len(route)) + + avg_hop_count = sum(hop_counts) / len(hop_counts) if hop_counts else 0.0 + max_hop_count = max(hop_counts) if hop_counts else 0 + + # Get channel data and count MQTT/uplink nodes + all_channels = source_manager.get_all_channels() + uplink_count = sum(1 for node in nodes.values() if node.uplink_enabled) + + # Build result with data availability flags mesh_health = MeshHealth( regions=regions, unlocated_nodes=unlocated, nodes=nodes, score=mesh_score, last_computed=now, + has_packet_data=has_packet_data, + has_telemetry_data=len(all_telemetry) > 0, + has_traceroute_data=traceroute_count > 0, + has_channel_data=len(all_channels) > 0, + traceroute_count=traceroute_count, + avg_hop_count=avg_hop_count, + max_hop_count=max_hop_count, + uplink_node_count=uplink_count, ) self._mesh_health = mesh_health + + # Log computation summary with data availability + data_sources = [] + if has_packet_data: + data_sources.append(f"{len(all_packets)} pkts") + if len(all_telemetry) > 0: + data_sources.append(f"{len(all_telemetry)} telem") + if traceroute_count > 0: + data_sources.append(f"{traceroute_count} traces") + if len(all_channels) > 0: + data_sources.append(f"{len(all_channels)} ch") + data_str = ", ".join(data_sources) if data_sources else "nodes only" + logger.info( f"Mesh health computed: {mesh_health.total_nodes} nodes, " - f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100" + f"{mesh_health.total_regions} regions, score {mesh_score.composite:.0f}/100 " + f"[{data_str}]" ) return mesh_health @@ -696,4 +792,3 @@ class MeshHealthEngine: n for n in self._mesh_health.nodes.values() if n.battery_percent is not None and n.battery_percent < self.battery_warning_percent ] - diff --git a/meshai/mesh_reporter.py b/meshai/mesh_reporter.py index 3203f39..69df3dd 100644 --- a/meshai/mesh_reporter.py +++ b/meshai/mesh_reporter.py @@ -70,7 +70,7 @@ class MeshReporter: # Utilization util = score.util_percent - util_data_available = getattr(score, 'util_data_available', False) + util_data_available = getattr(health, 'has_packet_data', False) or getattr(score, 'util_data_available', False) if not util_data_available: util_label = "N/A - no packet data" elif util < 15: @@ -95,13 +95,29 @@ class MeshReporter: "", f"Overall: {score.composite:.0f}/100 ({score.tier})", f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)", - f"Channel Utilization: {util:.1f}% avg ({util_label})", - f"Node Behavior: {score.flagged_nodes} nodes flagged", - f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)", - "", - "Regions:", ] + # Channel Utilization with data availability + if util_data_available: + lines.append(f"Channel Utilization: {util:.1f}% avg ({util_label})") + else: + lines.append("Channel Utilization: No data available") + + lines.append(f"Node Behavior: {score.flagged_nodes} nodes flagged") + lines.append(f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)") + + # Network topology stats (if available) + if health.has_traceroute_data: + lines.append(f"Routing: {health.traceroute_count} traceroutes, avg {health.avg_hop_count:.1f} hops, max {health.max_hop_count}") + else: + lines.append("Routing: No traceroute data available") + + # MQTT uplink stats + lines.append(f"MQTT Uplinks: {health.uplink_node_count} nodes") + + lines.append("") + lines.append("Regions:") + # Region summaries for region in health.regions: rs = region.score @@ -221,12 +237,22 @@ class MeshReporter: # Channel utilization by locality lines.append("") - lines.append(f"Channel Utilization: {rs.util_percent:.0f}%") - if region.localities: - lines.append(" Localities:") - for loc in region.localities: - node_count = len(loc.node_ids) - lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes") + mesh_health = self.health_engine.mesh_health + if mesh_health and mesh_health.has_packet_data: + lines.append(f"Channel Utilization: {rs.util_percent:.0f}%") + if region.localities: + lines.append(" Localities:") + for loc in region.localities: + node_count = len(loc.node_ids) + lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes") + else: + lines.append("Channel Utilization: No data available") + + # MQTT uplink stats for region + uplink_nodes = [health.nodes.get(nid) for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].uplink_enabled] + lines.append("") + lines.append(f"MQTT Uplinks: {len(uplink_nodes)} nodes") # Flagged nodes in this region flagged_in_region = [] @@ -305,7 +331,24 @@ class MeshReporter: lines.append("Traffic (24h):") lines.append(f" Total packets: {node.packet_count_24h}") lines.append(f" Text messages: {node.text_packet_count_24h}") - lines.append(f" Non-text: {node.non_text_packets}") + lines.append(f" Position: {node.position_packet_count_24h}") + lines.append(f" Telemetry: {node.telemetry_packet_count_24h}") + lines.append(f" Other non-text: {node.non_text_packets - node.position_packet_count_24h - node.telemetry_packet_count_24h}") + + # Estimated intervals + est_pos = node.estimated_position_interval + if est_pos is not None: + if est_pos < 60: + interval_str = f"{int(est_pos)}s" + else: + interval_str = f"{int(est_pos / 60)}m" + lines.append(f" Est. position interval: {interval_str}") + + # Channel utilization from device telemetry + if node.channel_utilization is not None: + lines.append(f" Channel util (device): {node.channel_utilization:.1f}%") + if node.air_util_tx is not None: + lines.append(f" TX airtime: {node.air_util_tx:.1f}%") # Power lines.append("") @@ -319,6 +362,11 @@ class MeshReporter: lines.append(f" Voltage: {node.voltage:.2f}V") lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}") + # Connectivity + lines.append("") + lines.append("Connectivity:") + lines.append(f" MQTT Uplink: {'Enabled' if node.uplink_enabled else 'Disabled'}") + # Recommendations for this node recs = self._node_recommendations(node) if recs: @@ -338,6 +386,19 @@ class MeshReporter: ratio = node.non_text_packets / self.health_engine.packet_threshold recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.") + # Position interval too frequent (< 300s = 5 min) + est_interval = node.estimated_position_interval + if est_interval is not None and est_interval < 300: + recs.append(f"Position interval ~{int(est_interval)}s is aggressive. Recommend 900s (15 min) for battery life.") + + # High channel utilization on this node + if node.channel_utilization is not None and node.channel_utilization > 25: + recs.append(f"Channel utilization {node.channel_utilization:.0f}% - consider moving to less congested frequency.") + + # High air_util_tx (this node transmitting a lot) + if node.air_util_tx is not None and node.air_util_tx > 10: + recs.append(f"TX airtime {node.air_util_tx:.1f}% - reduce telemetry frequency to be a better mesh citizen.") + # Low battery if node.battery_percent is not None and node.battery_percent < 20: recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.") @@ -347,6 +408,10 @@ class MeshReporter: age = _format_age(node.last_seen) recs.append(f"Node offline since {age}. Check power and connectivity.") + # Infrastructure node without MQTT uplink + if node.is_infrastructure and not node.uplink_enabled: + recs.append("Infrastructure node without MQTT uplink. Consider enabling for better mesh visibility.") + return recs def build_recommendations(self, scope: str, scope_value: str = None) -> str: @@ -410,6 +475,27 @@ class MeshReporter: names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3]) recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.") + # Check for nodes with aggressive position intervals + aggressive_interval_nodes = [] + for nid in region.node_ids: + node = health.nodes.get(nid) + if node: + est = node.estimated_position_interval + if est is not None and est < 300: + aggressive_interval_nodes.append(node) + if aggressive_interval_nodes: + names = ", ".join(n.short_name or n.node_id[:4] for n in aggressive_interval_nodes[:3]) + recs.append(f"Nodes with frequent position broadcasts ({names}). Recommend 900s interval.") + + # Check MQTT/uplink coverage in region + infra_nodes = [health.nodes.get(nid) for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].is_infrastructure] + uplink_count = sum(1 for n in infra_nodes if n and n.uplink_enabled) + if infra_nodes and uplink_count == 0: + recs.append("No MQTT uplinks in region. Consider enabling on at least one infrastructure node.") + elif len(infra_nodes) >= 3 and uplink_count == 1: + recs.append(f"Only 1/{len(infra_nodes)} infrastructure nodes with MQTT uplink. Consider adding redundancy.") + return recs def _mesh_recommendations(self, health) -> list[str]: @@ -437,6 +523,29 @@ class MeshReporter: if len(battery_warnings) > 2: recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.") + # Hop count recommendation from traceroutes + if health.has_traceroute_data: + if health.avg_hop_count > 4: + recs.append(f"Average hop count {health.avg_hop_count:.1f} is high. Consider adding infrastructure to reduce latency.") + elif health.max_hop_count > 6: + recs.append(f"Max hop count {health.max_hop_count} indicates long routes. Strategic node placement could improve reach.") + + # MQTT uplink coverage + if health.uplink_node_count == 0: + total_infra = sum(1 for n in health.nodes.values() if n.is_infrastructure) + if total_infra > 0: + recs.append("No MQTT uplinks detected. Enable on infrastructure nodes for better mesh visibility.") + elif health.total_regions > 0: + uplinks_per_region = health.uplink_node_count / health.total_regions + if uplinks_per_region < 1: + recs.append(f"Only {health.uplink_node_count} MQTT uplinks across {health.total_regions} regions. Consider adding redundancy.") + + # Aggressive position intervals mesh-wide + aggressive_nodes = [n for n in health.nodes.values() + if n.estimated_position_interval is not None and n.estimated_position_interval < 300] + if len(aggressive_nodes) > 5: + recs.append(f"{len(aggressive_nodes)} nodes with position interval <5min. Recommend 15min (900s) as default.") + return recs def build_lora_compact(self, scope: str, scope_value: str = None) -> str: @@ -566,4 +675,3 @@ class MeshReporter: lines.append(f" {region.name}: {s.composite:.0f}/100{flag}") return "\n".join(lines) - diff --git a/meshai/mesh_sources.py b/meshai/mesh_sources.py index 1ee8b6a..56098ea 100644 --- a/meshai/mesh_sources.py +++ b/meshai/mesh_sources.py @@ -555,6 +555,148 @@ class MeshSourceManager: "edge_duplicates": raw_edges - dedup_edges, } + def get_all_packets(self) -> list[dict]: + """Get deduplicated packets from all MeshMonitor sources. + + Packets are deduplicated by packet_id to avoid double-counting + when multiple sources report the same MQTT feed. + + Returns: + List of packet dicts with '_sources' field + """ + packets_by_id: dict[int, dict] = {} + + for name, source in self._sources.items(): + if not isinstance(source, MeshMonitorDataSource): + continue + + if not hasattr(source, "packets"): + continue + + for pkt in source.packets: + packet_id = pkt.get("packet_id") or pkt.get("id") + if packet_id is None: + # Fallback key: (from_node, timestamp, portnum) + from_node = pkt.get("from_node") or pkt.get("from") + ts = pkt.get("timestamp") or pkt.get("rxTime") + portnum = pkt.get("portnum") + if from_node and ts: + packet_id = hash((from_node, ts, portnum)) + else: + # Can't deduplicate, use negative counter + packet_id = -len(packets_by_id) - 1 + + if packet_id in packets_by_id: + existing = packets_by_id[packet_id] + if name not in existing["_sources"]: + existing["_sources"].append(name) + else: + tagged = dict(pkt) + tagged["_sources"] = [name] + packets_by_id[packet_id] = tagged + + return list(packets_by_id.values()) + + def get_traffic_stats(self) -> dict[str, dict]: + """Get traffic statistics from all sources. + + Returns: + Dict mapping source name to traffic stats: + - hourly_counts: list of {period, count} for last 24h + - total_packets: total packet count + - packets_per_hour: average packets per hour + """ + stats = {} + + for name, source in self._sources.items(): + source_stats = {} + + if isinstance(source, MeshviewSource): + # Meshview has stats with hourly breakdown + if hasattr(source, "stats") and source.stats: + data = source.stats.get("data", []) + source_stats["hourly_counts"] = data + total = sum(item.get("count", 0) for item in data) + source_stats["total_packets"] = total + source_stats["packets_per_hour"] = total / len(data) if data else 0 + + if hasattr(source, "counts") and source.counts: + source_stats["total_seen"] = source.counts.get("total_seen", 0) + source_stats["total_packets_all_time"] = source.counts.get("total_packets", 0) + + elif isinstance(source, MeshMonitorDataSource): + # MeshMonitor has network_stats + if hasattr(source, "network_stats") and source.network_stats: + ns = source.network_stats + source_stats["total_nodes"] = ns.get("totalNodes", 0) + source_stats["active_nodes"] = ns.get("activeNodes", 0) + source_stats["traceroute_count"] = ns.get("tracerouteCount", 0) + source_stats["last_updated"] = ns.get("lastUpdated", 0) + + # Count packets by portnum for breakdown + if hasattr(source, "packets") and source.packets: + portnum_counts: dict[str, int] = {} + for pkt in source.packets: + portnum = pkt.get("portnum_name") or str(pkt.get("portnum", "UNKNOWN")) + portnum_counts[portnum] = portnum_counts.get(portnum, 0) + 1 + source_stats["packets_by_portnum"] = portnum_counts + source_stats["packet_count"] = len(source.packets) + + if source_stats: + stats[name] = source_stats + + return stats + + def get_solar_data(self) -> list[dict]: + """Get solar/power data from all MeshMonitor sources. + + Returns: + List of solar data dicts with '_sources' field + """ + all_solar = [] + for name, source in self._sources.items(): + if isinstance(source, MeshMonitorDataSource): + if hasattr(source, "solar") and source.solar: + for item in source.solar: + tagged = dict(item) + tagged["_sources"] = [name] + all_solar.append(tagged) + return all_solar + + def get_network_stats(self) -> dict[str, dict]: + """Get network statistics from all sources. + + Returns: + Dict mapping source name to network stats dict + """ + stats = {} + + for name, source in self._sources.items(): + source_stats = {} + + if isinstance(source, MeshviewSource): + if hasattr(source, "counts") and source.counts: + source_stats.update(source.counts) + source_stats["node_count"] = len(source.nodes) + source_stats["edge_count"] = len(source.edges) + + elif isinstance(source, MeshMonitorDataSource): + if hasattr(source, "network_stats") and source.network_stats: + source_stats.update(source.network_stats) + if hasattr(source, "topology") and source.topology: + source_stats["topology"] = source.topology + source_stats["node_count"] = len(source.nodes) + source_stats["telemetry_count"] = len(source.telemetry) + source_stats["traceroute_count"] = len(source.traceroutes) + source_stats["channel_count"] = len(source.channels) + if hasattr(source, "packets"): + source_stats["packet_count"] = len(source.packets) + + if source_stats: + stats[name] = source_stats + + return stats + @property def source_count(self) -> int: """Get number of active sources."""