diff --git a/meshai/mesh_data_store.py b/meshai/mesh_data_store.py index 6880cc5..847c23d 100644 --- a/meshai/mesh_data_store.py +++ b/meshai/mesh_data_store.py @@ -1204,107 +1204,89 @@ class MeshDataStore: logger.warning(f"Failed to enrich environmental data: {e}") def _enrich_deliverability(self) -> None: - """Enrich with deliverability metrics from Meshview counts. + """Compute deliverability metrics from node source overlap. - Computes mesh-wide average gateways per packet from Meshview's - total_packets and total_seen counts. + Each Meshview/MeshMonitor source represents a gateway's view of the mesh. + If a node is seen by N sources, its packets are reaching N gateways. + This uses node visibility as a proxy for packet deliverability. """ - # Get counts from Meshview sources - for name, source in self._sources.items(): - if isinstance(source, MeshviewSource): - counts = source.counts - if counts: - total_packets = counts.get("total_packets", 0) - total_seen = counts.get("total_seen", 0) - - if total_packets > 0: - avg_gateways = total_seen / total_packets - self._deliverability = { - "avg_gateways": avg_gateways, - "total_packets": total_packets, - "total_seen": total_seen, - "gateway_count": 1, # Count of unique gateways (from is_mqtt_gateway) - } - - # Count MQTT gateways - gw_count = sum( - 1 for n in self._nodes.values() - if n.is_mqtt_gateway - ) - if gw_count > 0: - self._deliverability["gateway_count"] = gw_count - - logger.debug( - f"Deliverability: avg {avg_gateways:.2f} gateways/packet " - f"({total_seen}/{total_packets})" - ) - - # Sample gateway coverage for infrastructure nodes - self._sample_gateway_coverage(source) - return - - def _sample_gateway_coverage(self, source: "MeshviewSource") -> None: - """Sample gateway coverage for infrastructure nodes. - - Samples 10-20 recent packets to measure per-node gateway reach. - Updates UnifiedNode.avg_gateways and deliverability_score. - """ - import random - - # Get infrastructure nodes to sample - infra_nodes = [ - n for n in self._nodes.values() - if n.role in {"ROUTER", "ROUTER_LATE", "ROUTER_CLIENT", "REPEATER"} - and n.packets_sent_24h > 0 - ] - - if not infra_nodes: + total_sources = len(self._sources) + if total_sources == 0: return - # Sample up to 5 infrastructure nodes - sample_nodes = random.sample(infra_nodes, min(5, len(infra_nodes))) - sampled_count = 0 + # Count Meshview sources specifically (these are the MQTT gateways) + meshview_count = sum( + 1 for src in self._sources.values() + if isinstance(src, MeshviewSource) + ) - # Check if source supports required methods - if not hasattr(source, "fetch_recent_packets"): - logger.debug("Gateway sampling skipped: source lacks fetch_recent_packets") - return + nodes_with_data = 0 + total_gateway_sum = 0 + max_gateways_seen = 0 - for node in sample_nodes: - # Get recent packets from this node - packets = source.fetch_recent_packets(node.node_num, limit=5) - if not packets: + for node in self._nodes.values(): + if not node.sources: continue - gateway_counts = [] - unique_gateways = set() + # Count how many sources see this node + source_count = len(node.sources) - for pkt in packets[:5]: # Limit to 5 packets per node - pkt_id = pkt.get("id") or pkt.get("packet_id") - if not pkt_id: - continue + # Set per-node metrics + node.avg_gateways = float(source_count) + node.max_gateways = source_count + node.source_reach = float(source_count) - seen_data = source.fetch_packets_seen(pkt_id) - if seen_data: - gateway_counts.append(len(seen_data)) - for gw in seen_data: - gw_id = gw.get("node_id") - if gw_id: - unique_gateways.add(gw_id) - sampled_count += 1 + # Deliverability score: % of max possible sources + node.deliverability_score = (source_count / total_sources) * 100 - if gateway_counts: - avg_gw = sum(gateway_counts) / len(gateway_counts) - # Deliverability: % of packets reaching 2+ gateways - multi_gw = sum(1 for c in gateway_counts if c >= 2) - deliver_pct = (multi_gw / len(gateway_counts)) * 100 + nodes_with_data += 1 + total_gateway_sum += source_count + max_gateways_seen = max(max_gateways_seen, source_count) - node.avg_gateways = avg_gw - node.deliverability_score = deliver_pct - node.max_gateways = max(gateway_counts) if gateway_counts else None + # Compute mesh-wide metrics + if nodes_with_data > 0: + mesh_avg = total_gateway_sum / nodes_with_data + self._deliverability = { + "avg_gateways": mesh_avg, + "max_gateways": max_gateways_seen, + "total_sources": total_sources, + "meshview_sources": meshview_count, + "nodes_with_data": nodes_with_data, + "source": "node_source_overlap", + } - if sampled_count > 0: - logger.debug(f"Gateway sampling: {sampled_count} packets from {len(sample_nodes)} nodes") + # Distribution: how many nodes reach N+ gateways + dist = {} + for threshold in range(1, total_sources + 1): + count = sum( + 1 for n in self._nodes.values() + if n.avg_gateways is not None and n.avg_gateways >= threshold + ) + dist[f"reaching_{threshold}_plus"] = count + + self._deliverability["distribution"] = dist + + logger.info( + f"Deliverability: {nodes_with_data} nodes, " + f"avg {mesh_avg:.2f} gateways/node, " + f"max {max_gateways_seen}/{total_sources} sources" + ) + else: + # Fallback to single-source ratio if no node overlap data + for name, source in self._sources.items(): + if isinstance(source, MeshviewSource): + counts = source.counts + if counts: + tp = counts.get("total_packets", 0) + ts = counts.get("total_seen", 0) + if tp > 0: + self._deliverability = { + "avg_gateways": ts / tp, + "total_sources": total_sources, + "nodes_with_data": 0, + "source": "single_source_fallback", + } + return def get_coverage_gaps(self) -> list[dict]: """Get nodes with poor coverage (low gateway reach). @@ -1873,9 +1855,18 @@ class MeshDataStore: """Get mesh-wide deliverability metrics. Returns: - Dict with avg_gateways, total_packets, total_seen, gateway_count + Dict with avg_gateways, max_gateways, total_sources, nodes_with_data, etc. """ - return self._deliverability.copy() + result = self._deliverability.copy() + + # Add computed summary if we have per-node data + nodes_with_gw = [n for n in self._nodes.values() if n.avg_gateways is not None] + if nodes_with_gw: + result["computed_avg"] = sum(n.avg_gateways for n in nodes_with_gw) / len(nodes_with_gw) + result["computed_max"] = max(n.max_gateways or 0 for n in nodes_with_gw) + result["computed_nodes"] = len(nodes_with_gw) + + return result def get_node_deliverability(self, node_num: int) -> Optional[dict]: """Get per-node deliverability if available. @@ -2102,7 +2093,7 @@ class MeshDataStore: "to_node": edge.to_node, "snr": edge.snr, "rssi": edge.rssi, - "timestamp": edge.timestamp, + "timestamp": edge.last_seen, } result.append(edge_dict) return result