Add node/edge deduplication to MeshSourceManager

- Add _extract_node_num() helper to normalize node IDs across formats
- Add _normalize_edge_key() helper for undirected edge deduplication
- Update get_all_nodes() to deduplicate by nodeNum with _sources field
- Update get_all_edges() to deduplicate by sorted (from, to) tuple
- Update get_all_telemetry() to deduplicate by (nodeNum, timestamp)
- Add get_dedup_stats() method for raw vs dedup counts
- Add get_stats_by_source() method for per-source statistics
- Change _source field to _sources (list) for multi-source tracking

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-04 18:02:47 +00:00
commit 959c128c6f

View file

@ -1,4 +1,4 @@
"""Mesh data source manager."""
"""Mesh data source manager with deduplication."""
import logging
import time
@ -11,8 +11,104 @@ from .sources.meshmonitor_data import MeshMonitorDataSource
logger = logging.getLogger(__name__)
def _extract_node_num(node: dict) -> int | None:
"""Extract numeric node ID from various formats.
Handles:
- nodeNum: 662178887 (numeric)
- node_id: "!27780c47" (hex with prefix)
- node_id: "27780c47" (hex without prefix)
- num: 662178887 (numeric field)
Args:
node: Node dict from any source
Returns:
Numeric node ID or None if not extractable
"""
# Try numeric fields first
for field in ("nodeNum", "num", "node_num"):
if field in node:
val = node[field]
if isinstance(val, int):
return val
if isinstance(val, str) and val.isdigit():
return int(val)
# Try hex node_id field
if "node_id" in node:
nid = node["node_id"]
if isinstance(nid, str):
# Strip leading ! if present
hex_str = nid.lstrip("!")
try:
return int(hex_str, 16)
except ValueError:
pass
elif isinstance(nid, int):
return nid
# Try generic id field
if "id" in node:
val = node["id"]
if isinstance(val, int):
return val
if isinstance(val, str):
if val.isdigit():
return int(val)
# Might be hex
hex_str = val.lstrip("!")
try:
return int(hex_str, 16)
except ValueError:
pass
return None
def _normalize_edge_key(edge: dict) -> tuple[int, int] | None:
"""Normalize edge to a canonical (from_num, to_num) tuple.
Edges are undirected for deduplication purposes, so
we return a sorted tuple (smaller_id, larger_id).
Args:
edge: Edge dict from Meshview
Returns:
Sorted tuple of (from_num, to_num) or None if invalid
"""
from_num = edge.get("from_node") or edge.get("from") or edge.get("from_num")
to_num = edge.get("to_node") or edge.get("to") or edge.get("to_num")
if from_num is None or to_num is None:
return None
# Convert to int if string
if isinstance(from_num, str):
if from_num.isdigit():
from_num = int(from_num)
else:
try:
from_num = int(from_num.lstrip("!"), 16)
except ValueError:
return None
if isinstance(to_num, str):
if to_num.isdigit():
to_num = int(to_num)
else:
try:
to_num = int(to_num.lstrip("!"), 16)
except ValueError:
return None
# Return sorted tuple for consistent deduplication
return (min(from_num, to_num), max(from_num, to_num))
class MeshSourceManager:
"""Manages multiple mesh data sources."""
"""Manages multiple mesh data sources with deduplication."""
def __init__(self, source_configs: list[MeshSourceConfig]):
"""Initialize source manager.
@ -84,61 +180,146 @@ class MeshSourceManager:
return self._sources.get(name)
def get_all_nodes(self) -> list[dict]:
"""Get nodes from all sources, tagged with source name.
"""Get deduplicated nodes from all sources.
Nodes are deduplicated by their numeric node ID. When a node appears
in multiple sources, data is merged with the following rules:
- Most fields: last source wins
- _sources: accumulates all source names
Returns:
List of node dicts with '_source' field added
List of deduplicated node dicts with '_sources' field (list)
"""
all_nodes = []
nodes_by_num: dict[int, dict] = {}
for name, source in self._sources.items():
for node in source.nodes:
node_num = _extract_node_num(node)
if node_num is None:
# Can't deduplicate, include as-is with source tag
tagged = dict(node)
tagged["_source"] = name
all_nodes.append(tagged)
return all_nodes
tagged["_sources"] = [name]
# Use a negative counter as pseudo-key to avoid collisions
pseudo_key = -len(nodes_by_num) - 1
nodes_by_num[pseudo_key] = tagged
continue
if node_num in nodes_by_num:
# Merge: update existing with new data
existing = nodes_by_num[node_num]
# Add new source to sources list
if name not in existing["_sources"]:
existing["_sources"].append(name)
# Update all fields except _sources
for key, value in node.items():
if key != "_sources" and value is not None:
existing[key] = value
else:
# New node
tagged = dict(node)
tagged["_sources"] = [name]
nodes_by_num[node_num] = tagged
return list(nodes_by_num.values())
def get_all_edges(self) -> list[dict]:
"""Get edges from all Meshview sources, tagged with source name.
"""Get deduplicated edges from all Meshview sources.
Edges are deduplicated by (from_num, to_num) sorted tuple.
When an edge appears in multiple sources, data is merged.
Returns:
List of edge dicts with '_source' field added
List of deduplicated edge dicts with '_sources' field
"""
all_edges = []
edges_by_key: dict[tuple[int, int], dict] = {}
for name, source in self._sources.items():
if isinstance(source, MeshviewSource):
if not isinstance(source, MeshviewSource):
continue
for edge in source.edges:
edge_key = _normalize_edge_key(edge)
if edge_key is None:
# Can't deduplicate, include as-is
tagged = dict(edge)
tagged["_source"] = name
all_edges.append(tagged)
return all_edges
tagged["_sources"] = [name]
# Use a tuple with negative to avoid collision
pseudo_key = (-len(edges_by_key) - 1, 0)
edges_by_key[pseudo_key] = tagged
continue
if edge_key in edges_by_key:
# Merge: update existing
existing = edges_by_key[edge_key]
if name not in existing["_sources"]:
existing["_sources"].append(name)
# Update fields
for key, value in edge.items():
if key != "_sources" and value is not None:
existing[key] = value
else:
# New edge
tagged = dict(edge)
tagged["_sources"] = [name]
edges_by_key[edge_key] = tagged
return list(edges_by_key.values())
def get_all_telemetry(self) -> list[dict]:
"""Get telemetry from all MeshMonitor sources, tagged with source name.
"""Get deduplicated telemetry from all MeshMonitor sources.
Telemetry is deduplicated by (node_num, timestamp) tuple.
Returns:
List of telemetry dicts with '_source' field added
List of deduplicated telemetry dicts with '_sources' field
"""
all_telemetry = []
# Key: (node_num, timestamp)
telemetry_by_key: dict[tuple[int, float], dict] = {}
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
if not isinstance(source, MeshMonitorDataSource):
continue
for item in source.telemetry:
node_num = _extract_node_num(item)
timestamp = item.get("timestamp") or item.get("time") or item.get("ts")
if node_num is None or timestamp is None:
# Can't deduplicate
tagged = dict(item)
tagged["_source"] = name
all_telemetry.append(tagged)
return all_telemetry
tagged["_sources"] = [name]
pseudo_key = (-len(telemetry_by_key) - 1, 0.0)
telemetry_by_key[pseudo_key] = tagged
continue
key = (node_num, float(timestamp))
if key in telemetry_by_key:
existing = telemetry_by_key[key]
if name not in existing["_sources"]:
existing["_sources"].append(name)
for k, v in item.items():
if k != "_sources" and v is not None:
existing[k] = v
else:
tagged = dict(item)
tagged["_sources"] = [name]
telemetry_by_key[key] = tagged
return list(telemetry_by_key.values())
def get_all_traceroutes(self) -> list[dict]:
"""Get traceroutes from all MeshMonitor sources, tagged with source name.
Returns:
List of traceroute dicts with '_source' field added
List of traceroute dicts with '_sources' field
"""
all_traceroutes = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
for item in source.traceroutes:
tagged = dict(item)
tagged["_source"] = name
tagged["_sources"] = [name]
all_traceroutes.append(tagged)
return all_traceroutes
@ -146,14 +327,14 @@ class MeshSourceManager:
"""Get channels from all MeshMonitor sources, tagged with source name.
Returns:
List of channel dicts with '_source' field added
List of channel dicts with '_sources' field
"""
all_channels = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
for item in source.channels:
tagged = dict(item)
tagged["_source"] = name
tagged["_sources"] = [name]
all_channels.append(tagged)
return all_channels
@ -186,6 +367,61 @@ class MeshSourceManager:
return status_list
def get_stats_by_source(self) -> dict[str, dict]:
"""Get per-source statistics without summing across sources.
Returns:
Dict mapping source name to stats dict containing:
- node_count: Number of nodes from this source
- edge_count: Number of edges (Meshview only)
- telemetry_count: Number of telemetry records (MeshMonitor only)
- is_loaded: Whether source has data
"""
stats = {}
for name, source in self._sources.items():
source_stats = {
"node_count": len(source.nodes),
"is_loaded": source.is_loaded,
"last_refresh": source.last_refresh,
}
if isinstance(source, MeshviewSource):
source_stats["edge_count"] = len(source.edges)
source_stats["type"] = "meshview"
elif isinstance(source, MeshMonitorDataSource):
source_stats["telemetry_count"] = len(source.telemetry)
source_stats["traceroute_count"] = len(source.traceroutes)
source_stats["channel_count"] = len(source.channels)
source_stats["type"] = "meshmonitor"
stats[name] = source_stats
return stats
def get_dedup_stats(self) -> dict:
"""Get deduplication statistics.
Returns:
Dict with raw and deduplicated counts
"""
raw_nodes = sum(len(s.nodes) for s in self._sources.values())
raw_edges = sum(
len(s.edges) for s in self._sources.values()
if isinstance(s, MeshviewSource)
)
dedup_nodes = len(self.get_all_nodes())
dedup_edges = len(self.get_all_edges())
return {
"raw_node_count": raw_nodes,
"dedup_node_count": dedup_nodes,
"node_duplicates": raw_nodes - dedup_nodes,
"raw_edge_count": raw_edges,
"dedup_edge_count": dedup_edges,
"edge_duplicates": raw_edges - dedup_edges,
}
@property
def source_count(self) -> int:
"""Get number of active sources."""