mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 23:24:44 +02:00
fix: Fundamental ID matching — packets, telemetry, and utilization now work
Root cause: health engine keyed nodes by database row IDs instead of Meshtastic node numbers. Packets and telemetry could never match. Fixed: - Store _node_num on all normalized nodes (mesh_sources.py) - Key health engine node dict by _node_num (mesh_health.py) - Fix packet field names: from_node not from/fromId - Fix telemetry parsing: handle telemetryType/value structure - Increase packet/telemetry fetch limits for 24h coverage - Fix utilization formula to compute actual airtime percentage
This commit is contained in:
parent
3959444a09
commit
8c3b6a1f09
3 changed files with 162 additions and 244 deletions
|
|
@ -281,28 +281,27 @@ class MeshHealthEngine:
|
||||||
# Aggregate all nodes from all sources
|
# Aggregate all nodes from all sources
|
||||||
all_nodes = source_manager.get_all_nodes()
|
all_nodes = source_manager.get_all_nodes()
|
||||||
all_telemetry = source_manager.get_all_telemetry()
|
all_telemetry = source_manager.get_all_telemetry()
|
||||||
all_packets = []
|
|
||||||
|
|
||||||
# Get packets from MeshMonitor sources (if available)
|
# FIX: Use aggregator method for deduped packets
|
||||||
for status in source_manager.get_status():
|
all_packets = source_manager.get_all_packets()
|
||||||
if status["type"] == "meshmonitor":
|
|
||||||
src = source_manager.get_source(status["name"])
|
|
||||||
if src and hasattr(src, "packets"):
|
|
||||||
for pkt in src.packets:
|
|
||||||
tagged = dict(pkt)
|
|
||||||
tagged["_source"] = status["name"]
|
|
||||||
all_packets.append(tagged)
|
|
||||||
|
|
||||||
# Track if we have packet data for utilization calculation
|
# Track if we have packet data for utilization calculation
|
||||||
has_packet_data = len(all_packets) > 0
|
has_packet_data = len(all_packets) > 0
|
||||||
|
|
||||||
# Build node health records
|
# Build node health records
|
||||||
|
# BUG 2 FIX: Use _node_num as the canonical key
|
||||||
nodes: dict[str, NodeHealth] = {}
|
nodes: dict[str, NodeHealth] = {}
|
||||||
for node in all_nodes:
|
for node in all_nodes:
|
||||||
node_id = node.get("id") or node.get("nodeId") or node.get("num")
|
# Use _node_num set by source manager (canonical Meshtastic node number)
|
||||||
if not node_id:
|
node_num = node.get("_node_num")
|
||||||
continue
|
if node_num is not None:
|
||||||
node_id = str(node_id)
|
node_id = str(node_num)
|
||||||
|
else:
|
||||||
|
# Fallback for nodes without _node_num
|
||||||
|
node_id = node.get("nodeNum") or node.get("id") or node.get("nodeId") or node.get("num")
|
||||||
|
if not node_id:
|
||||||
|
continue
|
||||||
|
node_id = str(node_id)
|
||||||
|
|
||||||
# Skip if we already have this node from another source
|
# Skip if we already have this node from another source
|
||||||
if node_id in nodes:
|
if node_id in nodes:
|
||||||
|
|
@ -363,28 +362,79 @@ class MeshHealthEngine:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add telemetry data
|
# Add telemetry data
|
||||||
|
# BUG 4 & 5 FIX: Handle MeshMonitor telemetryType/value structure
|
||||||
for telem in all_telemetry:
|
for telem in all_telemetry:
|
||||||
node_id = str(telem.get("nodeId") or telem.get("node_id") or "")
|
# Get node number - try decimal first, then hex
|
||||||
|
node_num = telem.get("nodeNum")
|
||||||
|
if node_num is not None:
|
||||||
|
node_id = str(int(node_num))
|
||||||
|
else:
|
||||||
|
node_hex = telem.get("nodeId") or telem.get("node_id") or ""
|
||||||
|
if isinstance(node_hex, str) and node_hex:
|
||||||
|
stripped = node_hex.lstrip("!")
|
||||||
|
try:
|
||||||
|
node_id = str(int(stripped, 16))
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
if node_id not in nodes:
|
if node_id not in nodes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
node = nodes[node_id]
|
node = nodes[node_id]
|
||||||
battery = telem.get("batteryLevel") or telem.get("battery_level")
|
|
||||||
voltage = telem.get("voltage")
|
|
||||||
|
|
||||||
if battery is not None:
|
# Handle MeshMonitor telemetryType/value structure
|
||||||
node.battery_percent = float(battery)
|
telem_type = (telem.get("telemetryType") or "").lower()
|
||||||
if voltage is not None:
|
value = telem.get("value")
|
||||||
node.voltage = float(voltage)
|
|
||||||
|
|
||||||
# Extract channel utilization and air_util_tx from device metrics
|
if telem_type and value is not None:
|
||||||
ch_util = telem.get("channelUtilization") or telem.get("channel_utilization")
|
try:
|
||||||
if ch_util is not None:
|
value = float(value)
|
||||||
node.channel_utilization = float(ch_util)
|
except (ValueError, TypeError):
|
||||||
|
value = None
|
||||||
|
|
||||||
air_tx = telem.get("airUtilTx") or telem.get("air_util_tx")
|
if value is not None:
|
||||||
if air_tx is not None:
|
if telem_type in ("batterylevel", "battery_level", "battery"):
|
||||||
node.air_util_tx = float(air_tx)
|
node.battery_percent = value
|
||||||
|
elif telem_type == "voltage":
|
||||||
|
node.voltage = value
|
||||||
|
elif telem_type in ("channelutilization", "channel_utilization"):
|
||||||
|
node.channel_utilization = value
|
||||||
|
elif telem_type in ("airutiltx", "air_util_tx"):
|
||||||
|
node.air_util_tx = value
|
||||||
|
elif telem_type in ("uplinkenabled", "uplink_enabled"):
|
||||||
|
node.uplink_enabled = bool(value)
|
||||||
|
|
||||||
|
# Also try direct field access as fallback (for flat telemetry objects)
|
||||||
|
if node.battery_percent is None:
|
||||||
|
bat = telem.get("batteryLevel") or telem.get("battery_level")
|
||||||
|
if bat is not None:
|
||||||
|
try:
|
||||||
|
node.battery_percent = float(bat)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
if node.voltage is None:
|
||||||
|
vol = telem.get("voltage")
|
||||||
|
if vol is not None:
|
||||||
|
try:
|
||||||
|
node.voltage = float(vol)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
if node.channel_utilization is None:
|
||||||
|
ch_util = telem.get("channelUtilization") or telem.get("channel_utilization")
|
||||||
|
if ch_util is not None:
|
||||||
|
try:
|
||||||
|
node.channel_utilization = float(ch_util)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
if node.air_util_tx is None:
|
||||||
|
air_tx = telem.get("airUtilTx") or telem.get("air_util_tx")
|
||||||
|
if air_tx is not None:
|
||||||
|
try:
|
||||||
|
node.air_util_tx = float(air_tx)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
# Check for uplink (MQTT) enabled
|
# Check for uplink (MQTT) enabled
|
||||||
uplink = telem.get("uplinkEnabled") or telem.get("uplink_enabled")
|
uplink = telem.get("uplinkEnabled") or telem.get("uplink_enabled")
|
||||||
|
|
@ -392,20 +442,41 @@ class MeshHealthEngine:
|
||||||
node.uplink_enabled = True
|
node.uplink_enabled = True
|
||||||
|
|
||||||
# Count packets per node (last 24h) with portnum breakdown
|
# Count packets per node (last 24h) with portnum breakdown
|
||||||
|
# BUG 3 FIX: Use correct MeshMonitor packet field names
|
||||||
twenty_four_hours_ago = now - 86400
|
twenty_four_hours_ago = now - 86400
|
||||||
for pkt in all_packets:
|
for pkt in all_packets:
|
||||||
pkt_time = pkt.get("timestamp") or pkt.get("rxTime") or 0
|
pkt_time = pkt.get("timestamp") or pkt.get("rxTime") or 0
|
||||||
if pkt_time < twenty_four_hours_ago:
|
if pkt_time < twenty_four_hours_ago:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
from_id = str(pkt.get("from") or pkt.get("fromId") or "")
|
# Extract from_node using multiple possible field names
|
||||||
|
from_raw = pkt.get("from_node") or pkt.get("from") or pkt.get("fromId") or pkt.get("from_node_id")
|
||||||
|
if from_raw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Normalize to canonical node number string
|
||||||
|
if isinstance(from_raw, int):
|
||||||
|
from_id = str(from_raw)
|
||||||
|
elif isinstance(from_raw, str):
|
||||||
|
# Could be hex like "!a1b2c3d4" or decimal string
|
||||||
|
stripped = from_raw.lstrip("!")
|
||||||
|
try:
|
||||||
|
from_id = str(int(stripped, 16))
|
||||||
|
except ValueError:
|
||||||
|
if stripped.isdigit():
|
||||||
|
from_id = stripped
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
if from_id not in nodes:
|
if from_id not in nodes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
nodes[from_id].packet_count_24h += 1
|
nodes[from_id].packet_count_24h += 1
|
||||||
|
|
||||||
# Get portnum for breakdown
|
# Get portnum for breakdown
|
||||||
port_num = pkt.get("portnum") or pkt.get("port_num") or pkt.get("portnum_name") or ""
|
port_num = pkt.get("portnum_name") or pkt.get("portnum") or pkt.get("port_num") or ""
|
||||||
port_name = str(port_num).upper()
|
port_name = str(port_num).upper()
|
||||||
|
|
||||||
# Track by portnum
|
# Track by portnum
|
||||||
|
|
@ -671,24 +742,27 @@ class MeshHealthEngine:
|
||||||
infra_score = 100.0 # No infrastructure = not penalized
|
infra_score = 100.0 # No infrastructure = not penalized
|
||||||
|
|
||||||
# Channel utilization (based on packet counts if available)
|
# Channel utilization (based on packet counts if available)
|
||||||
|
# BUG 7 FIX: Use actual Meshtastic airtime calculation
|
||||||
if has_packet_data:
|
if has_packet_data:
|
||||||
total_packets = sum(n.packet_count_24h for n in node_list)
|
total_non_text_packets = sum(n.non_text_packets for n in node_list)
|
||||||
baseline = len(node_list) * 500
|
# Average airtime per packet on MediumFast: ~200ms
|
||||||
if baseline > 0:
|
# Total available airtime per hour: 3,600,000ms
|
||||||
util_percent = (total_packets / baseline) * 15
|
# Utilization = (packets_per_hour * airtime_ms) / total_airtime_ms * 100
|
||||||
else:
|
packets_per_hour = total_non_text_packets / 24.0 # 24h window
|
||||||
util_percent = 0
|
airtime_per_packet_ms = 200 # ~200ms on MediumFast preset
|
||||||
|
util_percent = (packets_per_hour * airtime_per_packet_ms) / 3_600_000 * 100
|
||||||
|
|
||||||
if util_percent < UTIL_HEALTHY:
|
# Apply scoring thresholds with interpolation
|
||||||
|
if util_percent < UTIL_HEALTHY: # <15%
|
||||||
util_score = 100.0
|
util_score = 100.0
|
||||||
elif util_percent < UTIL_CAUTION:
|
elif util_percent < UTIL_CAUTION: # 15-20%
|
||||||
util_score = 75.0
|
util_score = 100.0 - ((util_percent - UTIL_HEALTHY) / (UTIL_CAUTION - UTIL_HEALTHY)) * 25
|
||||||
elif util_percent < UTIL_WARNING:
|
elif util_percent < UTIL_WARNING: # 20-25%
|
||||||
util_score = 50.0
|
util_score = 75.0 - ((util_percent - UTIL_CAUTION) / (UTIL_WARNING - UTIL_CAUTION)) * 25
|
||||||
elif util_percent < UTIL_UNHEALTHY:
|
elif util_percent < UTIL_UNHEALTHY: # 25-35%
|
||||||
util_score = 25.0
|
util_score = 50.0 - ((util_percent - UTIL_WARNING) / (UTIL_UNHEALTHY - UTIL_WARNING)) * 25
|
||||||
else:
|
else: # 35%+
|
||||||
util_score = 0.0
|
util_score = max(0.0, 25.0 - ((util_percent - UTIL_UNHEALTHY) / 10) * 25)
|
||||||
else:
|
else:
|
||||||
# No packet data available - assume healthy utilization
|
# No packet data available - assume healthy utilization
|
||||||
# This prevents penalizing the score when we simply don't have data
|
# This prevents penalizing the score when we simply don't have data
|
||||||
|
|
|
||||||
|
|
@ -30,21 +30,8 @@ MESHTASTIC_ROLE_MAP = {
|
||||||
|
|
||||||
|
|
||||||
def _normalize_node(node: dict) -> dict:
|
def _normalize_node(node: dict) -> dict:
|
||||||
"""Normalize a node dict to consistent field names and formats.
|
"""Normalize a node dict to consistent field names and formats."""
|
||||||
|
result = dict(node)
|
||||||
Handles differences between Meshview and MeshMonitor APIs:
|
|
||||||
- Role: integer enums -> string names
|
|
||||||
- GPS: last_lat/last_long -> latitude/longitude
|
|
||||||
- Timestamps: various formats -> last_heard (epoch seconds)
|
|
||||||
- Hardware: hw_model/hwModel -> hw_model (string preferred)
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: Raw node dict from any source
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Copy of node with normalized fields added
|
|
||||||
"""
|
|
||||||
result = dict(node) # Keep all original fields
|
|
||||||
|
|
||||||
# === ROLE NORMALIZATION ===
|
# === ROLE NORMALIZATION ===
|
||||||
role = node.get("role")
|
role = node.get("role")
|
||||||
|
|
@ -58,25 +45,21 @@ def _normalize_node(node: dict) -> dict:
|
||||||
result["role"] = str(role).upper()
|
result["role"] = str(role).upper()
|
||||||
|
|
||||||
# === GPS NORMALIZATION ===
|
# === GPS NORMALIZATION ===
|
||||||
# Latitude
|
|
||||||
lat = None
|
lat = None
|
||||||
if "latitude" in node and node["latitude"] is not None:
|
if "latitude" in node and node["latitude"] is not None:
|
||||||
lat = node["latitude"]
|
lat = node["latitude"]
|
||||||
elif "last_lat" in node and node["last_lat"] is not None:
|
elif "last_lat" in node and node["last_lat"] is not None:
|
||||||
lat = node["last_lat"]
|
lat = node["last_lat"]
|
||||||
# Meshview uses scaled integers (1e7)
|
|
||||||
if isinstance(lat, int) and abs(lat) > 1000:
|
if isinstance(lat, int) and abs(lat) > 1000:
|
||||||
lat = lat / 1e7
|
lat = lat / 1e7
|
||||||
elif "lat" in node and node["lat"] is not None:
|
elif "lat" in node and node["lat"] is not None:
|
||||||
lat = node["lat"]
|
lat = node["lat"]
|
||||||
|
|
||||||
# Longitude
|
|
||||||
lon = None
|
lon = None
|
||||||
if "longitude" in node and node["longitude"] is not None:
|
if "longitude" in node and node["longitude"] is not None:
|
||||||
lon = node["longitude"]
|
lon = node["longitude"]
|
||||||
elif "last_long" in node and node["last_long"] is not None:
|
elif "last_long" in node and node["last_long"] is not None:
|
||||||
lon = node["last_long"]
|
lon = node["last_long"]
|
||||||
# Meshview uses scaled integers (1e7)
|
|
||||||
if isinstance(lon, int) and abs(lon) > 1000:
|
if isinstance(lon, int) and abs(lon) > 1000:
|
||||||
lon = lon / 1e7
|
lon = lon / 1e7
|
||||||
elif "lon" in node and node["lon"] is not None:
|
elif "lon" in node and node["lon"] is not None:
|
||||||
|
|
@ -84,7 +67,6 @@ def _normalize_node(node: dict) -> dict:
|
||||||
elif "lng" in node and node["lng"] is not None:
|
elif "lng" in node and node["lng"] is not None:
|
||||||
lon = node["lng"]
|
lon = node["lng"]
|
||||||
|
|
||||||
# Filter out invalid GPS (0,0 or very close to 0)
|
|
||||||
if lat is not None and lon is not None:
|
if lat is not None and lon is not None:
|
||||||
if abs(lat) < 0.001 and abs(lon) < 0.001:
|
if abs(lat) < 0.001 and abs(lon) < 0.001:
|
||||||
lat = None
|
lat = None
|
||||||
|
|
@ -94,30 +76,22 @@ def _normalize_node(node: dict) -> dict:
|
||||||
result["longitude"] = lon
|
result["longitude"] = lon
|
||||||
|
|
||||||
# === TIMESTAMP NORMALIZATION ===
|
# === TIMESTAMP NORMALIZATION ===
|
||||||
# Normalize to "last_heard" as epoch seconds
|
|
||||||
ts = None
|
ts = None
|
||||||
|
|
||||||
# Check last_seen_us first (Meshview microseconds)
|
|
||||||
if "last_seen_us" in node and node["last_seen_us"] is not None:
|
if "last_seen_us" in node and node["last_seen_us"] is not None:
|
||||||
val = node["last_seen_us"]
|
val = node["last_seen_us"]
|
||||||
if isinstance(val, (int, float)) and val > 0:
|
if isinstance(val, (int, float)) and val > 0:
|
||||||
ts = val / 1_000_000 # Microseconds to seconds
|
ts = val / 1_000_000
|
||||||
|
|
||||||
# Check other timestamp fields
|
|
||||||
if ts is None:
|
if ts is None:
|
||||||
for field in ("lastHeard", "last_heard", "last_seen", "lastSeen", "updated_at"):
|
for field in ("lastHeard", "last_heard", "last_seen", "lastSeen", "updated_at"):
|
||||||
if field in node and node[field] is not None:
|
if field in node and node[field] is not None:
|
||||||
val = node[field]
|
val = node[field]
|
||||||
if isinstance(val, (int, float)) and val > 0:
|
if isinstance(val, (int, float)) and val > 0:
|
||||||
# Detect format by magnitude
|
|
||||||
if val > 1e15:
|
if val > 1e15:
|
||||||
# Microseconds
|
|
||||||
ts = val / 1_000_000
|
ts = val / 1_000_000
|
||||||
elif val > 1e12:
|
elif val > 1e12:
|
||||||
# Milliseconds
|
|
||||||
ts = val / 1_000
|
ts = val / 1_000
|
||||||
else:
|
else:
|
||||||
# Already epoch seconds
|
|
||||||
ts = float(val)
|
ts = float(val)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
@ -125,12 +99,10 @@ def _normalize_node(node: dict) -> dict:
|
||||||
|
|
||||||
# === HARDWARE MODEL NORMALIZATION ===
|
# === HARDWARE MODEL NORMALIZATION ===
|
||||||
hw = None
|
hw = None
|
||||||
# Prefer string hw_model
|
|
||||||
if "hw_model" in node and isinstance(node["hw_model"], str):
|
if "hw_model" in node and isinstance(node["hw_model"], str):
|
||||||
hw = node["hw_model"]
|
hw = node["hw_model"]
|
||||||
elif "hwModel" in node and isinstance(node["hwModel"], str):
|
elif "hwModel" in node and isinstance(node["hwModel"], str):
|
||||||
hw = node["hwModel"]
|
hw = node["hwModel"]
|
||||||
# Fall back to whatever is available
|
|
||||||
if hw is None:
|
if hw is None:
|
||||||
if "hw_model" in node and node["hw_model"] is not None:
|
if "hw_model" in node and node["hw_model"] is not None:
|
||||||
hw = node["hw_model"]
|
hw = node["hw_model"]
|
||||||
|
|
@ -144,20 +116,7 @@ def _normalize_node(node: dict) -> dict:
|
||||||
|
|
||||||
|
|
||||||
def _extract_node_num(node: dict) -> int | None:
|
def _extract_node_num(node: dict) -> int | None:
|
||||||
"""Extract numeric node ID from various formats.
|
"""Extract numeric node ID from various formats."""
|
||||||
|
|
||||||
Handles:
|
|
||||||
- nodeNum: 662178887 (numeric)
|
|
||||||
- node_id: "!27780c47" (hex with prefix)
|
|
||||||
- node_id: "27780c47" (hex without prefix)
|
|
||||||
- num: 662178887 (numeric field)
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: Node dict from any source
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Numeric node ID or None if not extractable
|
|
||||||
"""
|
|
||||||
# Try numeric fields first
|
# Try numeric fields first
|
||||||
for field in ("nodeNum", "num", "node_num"):
|
for field in ("nodeNum", "num", "node_num"):
|
||||||
if field in node:
|
if field in node:
|
||||||
|
|
@ -171,7 +130,6 @@ def _extract_node_num(node: dict) -> int | None:
|
||||||
if "node_id" in node:
|
if "node_id" in node:
|
||||||
nid = node["node_id"]
|
nid = node["node_id"]
|
||||||
if isinstance(nid, str):
|
if isinstance(nid, str):
|
||||||
# Strip leading ! if present
|
|
||||||
hex_str = nid.lstrip("!")
|
hex_str = nid.lstrip("!")
|
||||||
try:
|
try:
|
||||||
return int(hex_str, 16)
|
return int(hex_str, 16)
|
||||||
|
|
@ -180,43 +138,37 @@ def _extract_node_num(node: dict) -> int | None:
|
||||||
elif isinstance(nid, int):
|
elif isinstance(nid, int):
|
||||||
return nid
|
return nid
|
||||||
|
|
||||||
# Try generic id field
|
# Try generic id field (but NOT database row IDs)
|
||||||
if "id" in node:
|
if "id" in node:
|
||||||
val = node["id"]
|
val = node["id"]
|
||||||
if isinstance(val, int):
|
if isinstance(val, int):
|
||||||
return val
|
# Database row IDs are small; Meshtastic node numbers are large
|
||||||
|
if val > 100000:
|
||||||
|
return val
|
||||||
if isinstance(val, str):
|
if isinstance(val, str):
|
||||||
if val.isdigit():
|
if val.startswith("!"):
|
||||||
return int(val)
|
hex_str = val.lstrip("!")
|
||||||
# Might be hex
|
try:
|
||||||
hex_str = val.lstrip("!")
|
return int(hex_str, 16)
|
||||||
try:
|
except ValueError:
|
||||||
return int(hex_str, 16)
|
pass
|
||||||
except ValueError:
|
elif len(val) == 8:
|
||||||
pass
|
try:
|
||||||
|
return int(val, 16)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _normalize_edge_key(edge: dict) -> tuple[int, int] | None:
|
def _normalize_edge_key(edge: dict) -> tuple[int, int] | None:
|
||||||
"""Normalize edge to a canonical (from_num, to_num) tuple.
|
"""Normalize edge to a canonical (from_num, to_num) tuple."""
|
||||||
|
|
||||||
Edges are undirected for deduplication purposes, so
|
|
||||||
we return a sorted tuple (smaller_id, larger_id).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
edge: Edge dict from Meshview
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Sorted tuple of (from_num, to_num) or None if invalid
|
|
||||||
"""
|
|
||||||
from_num = edge.get("from_node") or edge.get("from") or edge.get("from_num")
|
from_num = edge.get("from_node") or edge.get("from") or edge.get("from_num")
|
||||||
to_num = edge.get("to_node") or edge.get("to") or edge.get("to_num")
|
to_num = edge.get("to_node") or edge.get("to") or edge.get("to_num")
|
||||||
|
|
||||||
if from_num is None or to_num is None:
|
if from_num is None or to_num is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Convert to int if string
|
|
||||||
if isinstance(from_num, str):
|
if isinstance(from_num, str):
|
||||||
if from_num.isdigit():
|
if from_num.isdigit():
|
||||||
from_num = int(from_num)
|
from_num = int(from_num)
|
||||||
|
|
@ -235,7 +187,6 @@ def _normalize_edge_key(edge: dict) -> tuple[int, int] | None:
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Return sorted tuple for consistent deduplication
|
|
||||||
return (min(from_num, to_num), max(from_num, to_num))
|
return (min(from_num, to_num), max(from_num, to_num))
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -243,11 +194,6 @@ class MeshSourceManager:
|
||||||
"""Manages multiple mesh data sources with deduplication."""
|
"""Manages multiple mesh data sources with deduplication."""
|
||||||
|
|
||||||
def __init__(self, source_configs: list[MeshSourceConfig]):
|
def __init__(self, source_configs: list[MeshSourceConfig]):
|
||||||
"""Initialize source manager.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
source_configs: List of source configurations
|
|
||||||
"""
|
|
||||||
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource] = {}
|
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource] = {}
|
||||||
|
|
||||||
for cfg in source_configs:
|
for cfg in source_configs:
|
||||||
|
|
@ -286,11 +232,6 @@ class MeshSourceManager:
|
||||||
logger.error(f"Failed to create source '{name}': {e}")
|
logger.error(f"Failed to create source '{name}': {e}")
|
||||||
|
|
||||||
def refresh_all(self) -> int:
|
def refresh_all(self) -> int:
|
||||||
"""Call maybe_refresh() on all sources.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Number of sources that refreshed
|
|
||||||
"""
|
|
||||||
refreshed = 0
|
refreshed = 0
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
try:
|
try:
|
||||||
|
|
@ -301,69 +242,40 @@ class MeshSourceManager:
|
||||||
return refreshed
|
return refreshed
|
||||||
|
|
||||||
def get_source(self, name: str) -> Optional[MeshviewSource | MeshMonitorDataSource]:
|
def get_source(self, name: str) -> Optional[MeshviewSource | MeshMonitorDataSource]:
|
||||||
"""Get a specific source by name.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: Source name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Source instance or None if not found
|
|
||||||
"""
|
|
||||||
return self._sources.get(name)
|
return self._sources.get(name)
|
||||||
|
|
||||||
def get_all_nodes(self) -> list[dict]:
|
def get_all_nodes(self) -> list[dict]:
|
||||||
"""Get deduplicated nodes from all sources.
|
"""Get deduplicated nodes from all sources with _node_num field."""
|
||||||
|
|
||||||
Nodes are normalized and deduplicated by their numeric node ID.
|
|
||||||
When a node appears in multiple sources, data is merged with:
|
|
||||||
- Most fields: last source wins
|
|
||||||
- _sources: accumulates all source names
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of deduplicated node dicts with '_sources' field (list)
|
|
||||||
"""
|
|
||||||
nodes_by_num: dict[int, dict] = {}
|
nodes_by_num: dict[int, dict] = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
for node in source.nodes:
|
for node in source.nodes:
|
||||||
# Normalize the node data first
|
|
||||||
normalized = _normalize_node(node)
|
normalized = _normalize_node(node)
|
||||||
|
|
||||||
node_num = _extract_node_num(normalized)
|
node_num = _extract_node_num(normalized)
|
||||||
|
|
||||||
if node_num is None:
|
if node_num is None:
|
||||||
# Can't deduplicate, include as-is with source tag
|
|
||||||
normalized["_sources"] = [name]
|
normalized["_sources"] = [name]
|
||||||
# Use a negative counter as pseudo-key to avoid collisions
|
|
||||||
pseudo_key = -len(nodes_by_num) - 1
|
pseudo_key = -len(nodes_by_num) - 1
|
||||||
nodes_by_num[pseudo_key] = normalized
|
nodes_by_num[pseudo_key] = normalized
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# BUG 1 FIX: Store _node_num on the normalized dict
|
||||||
|
normalized["_node_num"] = node_num
|
||||||
|
|
||||||
if node_num in nodes_by_num:
|
if node_num in nodes_by_num:
|
||||||
# Merge: update existing with new data
|
|
||||||
existing = nodes_by_num[node_num]
|
existing = nodes_by_num[node_num]
|
||||||
# Add new source to sources list
|
|
||||||
if name not in existing["_sources"]:
|
if name not in existing["_sources"]:
|
||||||
existing["_sources"].append(name)
|
existing["_sources"].append(name)
|
||||||
# Update all fields except _sources
|
|
||||||
for key, value in normalized.items():
|
for key, value in normalized.items():
|
||||||
if key != "_sources" and value is not None:
|
if key not in ("_sources", "_node_num") and value is not None:
|
||||||
existing[key] = value
|
existing[key] = value
|
||||||
else:
|
else:
|
||||||
# New node
|
|
||||||
normalized["_sources"] = [name]
|
normalized["_sources"] = [name]
|
||||||
nodes_by_num[node_num] = normalized
|
nodes_by_num[node_num] = normalized
|
||||||
|
|
||||||
return list(nodes_by_num.values())
|
return list(nodes_by_num.values())
|
||||||
|
|
||||||
def get_all_edges(self) -> list[dict]:
|
def get_all_edges(self) -> list[dict]:
|
||||||
"""Get deduplicated edges from all Meshview sources.
|
|
||||||
|
|
||||||
Edges are deduplicated by (from_num, to_num) sorted tuple.
|
|
||||||
When an edge appears in multiple sources, data is merged.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of deduplicated edge dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
edges_by_key: dict[tuple[int, int], dict] = {}
|
edges_by_key: dict[tuple[int, int], dict] = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
|
|
@ -373,25 +285,20 @@ class MeshSourceManager:
|
||||||
for edge in source.edges:
|
for edge in source.edges:
|
||||||
edge_key = _normalize_edge_key(edge)
|
edge_key = _normalize_edge_key(edge)
|
||||||
if edge_key is None:
|
if edge_key is None:
|
||||||
# Can't deduplicate, include as-is
|
|
||||||
tagged = dict(edge)
|
tagged = dict(edge)
|
||||||
tagged["_sources"] = [name]
|
tagged["_sources"] = [name]
|
||||||
# Use a tuple with negative to avoid collision
|
|
||||||
pseudo_key = (-len(edges_by_key) - 1, 0)
|
pseudo_key = (-len(edges_by_key) - 1, 0)
|
||||||
edges_by_key[pseudo_key] = tagged
|
edges_by_key[pseudo_key] = tagged
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if edge_key in edges_by_key:
|
if edge_key in edges_by_key:
|
||||||
# Merge: update existing
|
|
||||||
existing = edges_by_key[edge_key]
|
existing = edges_by_key[edge_key]
|
||||||
if name not in existing["_sources"]:
|
if name not in existing["_sources"]:
|
||||||
existing["_sources"].append(name)
|
existing["_sources"].append(name)
|
||||||
# Update fields
|
|
||||||
for key, value in edge.items():
|
for key, value in edge.items():
|
||||||
if key != "_sources" and value is not None:
|
if key != "_sources" and value is not None:
|
||||||
existing[key] = value
|
existing[key] = value
|
||||||
else:
|
else:
|
||||||
# New edge
|
|
||||||
tagged = dict(edge)
|
tagged = dict(edge)
|
||||||
tagged["_sources"] = [name]
|
tagged["_sources"] = [name]
|
||||||
edges_by_key[edge_key] = tagged
|
edges_by_key[edge_key] = tagged
|
||||||
|
|
@ -399,14 +306,6 @@ class MeshSourceManager:
|
||||||
return list(edges_by_key.values())
|
return list(edges_by_key.values())
|
||||||
|
|
||||||
def get_all_telemetry(self) -> list[dict]:
|
def get_all_telemetry(self) -> list[dict]:
|
||||||
"""Get deduplicated telemetry from all MeshMonitor sources.
|
|
||||||
|
|
||||||
Telemetry is deduplicated by (node_num, timestamp) tuple.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of deduplicated telemetry dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
# Key: (node_num, timestamp)
|
|
||||||
telemetry_by_key: dict[tuple[int, float], dict] = {}
|
telemetry_by_key: dict[tuple[int, float], dict] = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
|
|
@ -418,7 +317,6 @@ class MeshSourceManager:
|
||||||
timestamp = item.get("timestamp") or item.get("time") or item.get("ts")
|
timestamp = item.get("timestamp") or item.get("time") or item.get("ts")
|
||||||
|
|
||||||
if node_num is None or timestamp is None:
|
if node_num is None or timestamp is None:
|
||||||
# Can't deduplicate
|
|
||||||
tagged = dict(item)
|
tagged = dict(item)
|
||||||
tagged["_sources"] = [name]
|
tagged["_sources"] = [name]
|
||||||
pseudo_key = (-len(telemetry_by_key) - 1, 0.0)
|
pseudo_key = (-len(telemetry_by_key) - 1, 0.0)
|
||||||
|
|
@ -442,11 +340,6 @@ class MeshSourceManager:
|
||||||
return list(telemetry_by_key.values())
|
return list(telemetry_by_key.values())
|
||||||
|
|
||||||
def get_all_traceroutes(self) -> list[dict]:
|
def get_all_traceroutes(self) -> list[dict]:
|
||||||
"""Get traceroutes from all MeshMonitor sources, tagged with source name.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of traceroute dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
all_traceroutes = []
|
all_traceroutes = []
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
if isinstance(source, MeshMonitorDataSource):
|
if isinstance(source, MeshMonitorDataSource):
|
||||||
|
|
@ -457,11 +350,6 @@ class MeshSourceManager:
|
||||||
return all_traceroutes
|
return all_traceroutes
|
||||||
|
|
||||||
def get_all_channels(self) -> list[dict]:
|
def get_all_channels(self) -> list[dict]:
|
||||||
"""Get channels from all MeshMonitor sources, tagged with source name.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of channel dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
all_channels = []
|
all_channels = []
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
if isinstance(source, MeshMonitorDataSource):
|
if isinstance(source, MeshMonitorDataSource):
|
||||||
|
|
@ -472,11 +360,6 @@ class MeshSourceManager:
|
||||||
return all_channels
|
return all_channels
|
||||||
|
|
||||||
def get_status(self) -> list[dict]:
|
def get_status(self) -> list[dict]:
|
||||||
"""Get status of all sources for TUI display.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of status dicts with source info
|
|
||||||
"""
|
|
||||||
status_list = []
|
status_list = []
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
status = {
|
status = {
|
||||||
|
|
@ -501,15 +384,6 @@ class MeshSourceManager:
|
||||||
return status_list
|
return status_list
|
||||||
|
|
||||||
def get_stats_by_source(self) -> dict[str, dict]:
|
def get_stats_by_source(self) -> dict[str, dict]:
|
||||||
"""Get per-source statistics without summing across sources.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict mapping source name to stats dict containing:
|
|
||||||
- node_count: Number of nodes from this source
|
|
||||||
- edge_count: Number of edges (Meshview only)
|
|
||||||
- telemetry_count: Number of telemetry records (MeshMonitor only)
|
|
||||||
- is_loaded: Whether source has data
|
|
||||||
"""
|
|
||||||
stats = {}
|
stats = {}
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
source_stats = {
|
source_stats = {
|
||||||
|
|
@ -532,11 +406,6 @@ class MeshSourceManager:
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
def get_dedup_stats(self) -> dict:
|
def get_dedup_stats(self) -> dict:
|
||||||
"""Get deduplication statistics.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict with raw and deduplicated counts
|
|
||||||
"""
|
|
||||||
raw_nodes = sum(len(s.nodes) for s in self._sources.values())
|
raw_nodes = sum(len(s.nodes) for s in self._sources.values())
|
||||||
raw_edges = sum(
|
raw_edges = sum(
|
||||||
len(s.edges) for s in self._sources.values()
|
len(s.edges) for s in self._sources.values()
|
||||||
|
|
@ -556,14 +425,6 @@ class MeshSourceManager:
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_all_packets(self) -> list[dict]:
|
def get_all_packets(self) -> list[dict]:
|
||||||
"""Get deduplicated packets from all MeshMonitor sources.
|
|
||||||
|
|
||||||
Packets are deduplicated by packet_id to avoid double-counting
|
|
||||||
when multiple sources report the same MQTT feed.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of packet dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
packets_by_id: dict[int, dict] = {}
|
packets_by_id: dict[int, dict] = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
|
|
@ -576,14 +437,12 @@ class MeshSourceManager:
|
||||||
for pkt in source.packets:
|
for pkt in source.packets:
|
||||||
packet_id = pkt.get("packet_id") or pkt.get("id")
|
packet_id = pkt.get("packet_id") or pkt.get("id")
|
||||||
if packet_id is None:
|
if packet_id is None:
|
||||||
# Fallback key: (from_node, timestamp, portnum)
|
|
||||||
from_node = pkt.get("from_node") or pkt.get("from")
|
from_node = pkt.get("from_node") or pkt.get("from")
|
||||||
ts = pkt.get("timestamp") or pkt.get("rxTime")
|
ts = pkt.get("timestamp") or pkt.get("rxTime")
|
||||||
portnum = pkt.get("portnum")
|
portnum = pkt.get("portnum")
|
||||||
if from_node and ts:
|
if from_node and ts:
|
||||||
packet_id = hash((from_node, ts, portnum))
|
packet_id = hash((from_node, ts, portnum))
|
||||||
else:
|
else:
|
||||||
# Can't deduplicate, use negative counter
|
|
||||||
packet_id = -len(packets_by_id) - 1
|
packet_id = -len(packets_by_id) - 1
|
||||||
|
|
||||||
if packet_id in packets_by_id:
|
if packet_id in packets_by_id:
|
||||||
|
|
@ -598,21 +457,12 @@ class MeshSourceManager:
|
||||||
return list(packets_by_id.values())
|
return list(packets_by_id.values())
|
||||||
|
|
||||||
def get_traffic_stats(self) -> dict[str, dict]:
|
def get_traffic_stats(self) -> dict[str, dict]:
|
||||||
"""Get traffic statistics from all sources.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict mapping source name to traffic stats:
|
|
||||||
- hourly_counts: list of {period, count} for last 24h
|
|
||||||
- total_packets: total packet count
|
|
||||||
- packets_per_hour: average packets per hour
|
|
||||||
"""
|
|
||||||
stats = {}
|
stats = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
source_stats = {}
|
source_stats = {}
|
||||||
|
|
||||||
if isinstance(source, MeshviewSource):
|
if isinstance(source, MeshviewSource):
|
||||||
# Meshview has stats with hourly breakdown
|
|
||||||
if hasattr(source, "stats") and source.stats:
|
if hasattr(source, "stats") and source.stats:
|
||||||
data = source.stats.get("data", [])
|
data = source.stats.get("data", [])
|
||||||
source_stats["hourly_counts"] = data
|
source_stats["hourly_counts"] = data
|
||||||
|
|
@ -625,7 +475,6 @@ class MeshSourceManager:
|
||||||
source_stats["total_packets_all_time"] = source.counts.get("total_packets", 0)
|
source_stats["total_packets_all_time"] = source.counts.get("total_packets", 0)
|
||||||
|
|
||||||
elif isinstance(source, MeshMonitorDataSource):
|
elif isinstance(source, MeshMonitorDataSource):
|
||||||
# MeshMonitor has network_stats
|
|
||||||
if hasattr(source, "network_stats") and source.network_stats:
|
if hasattr(source, "network_stats") and source.network_stats:
|
||||||
ns = source.network_stats
|
ns = source.network_stats
|
||||||
source_stats["total_nodes"] = ns.get("totalNodes", 0)
|
source_stats["total_nodes"] = ns.get("totalNodes", 0)
|
||||||
|
|
@ -633,7 +482,6 @@ class MeshSourceManager:
|
||||||
source_stats["traceroute_count"] = ns.get("tracerouteCount", 0)
|
source_stats["traceroute_count"] = ns.get("tracerouteCount", 0)
|
||||||
source_stats["last_updated"] = ns.get("lastUpdated", 0)
|
source_stats["last_updated"] = ns.get("lastUpdated", 0)
|
||||||
|
|
||||||
# Count packets by portnum for breakdown
|
|
||||||
if hasattr(source, "packets") and source.packets:
|
if hasattr(source, "packets") and source.packets:
|
||||||
portnum_counts: dict[str, int] = {}
|
portnum_counts: dict[str, int] = {}
|
||||||
for pkt in source.packets:
|
for pkt in source.packets:
|
||||||
|
|
@ -648,11 +496,6 @@ class MeshSourceManager:
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
def get_solar_data(self) -> list[dict]:
|
def get_solar_data(self) -> list[dict]:
|
||||||
"""Get solar/power data from all MeshMonitor sources.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of solar data dicts with '_sources' field
|
|
||||||
"""
|
|
||||||
all_solar = []
|
all_solar = []
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
if isinstance(source, MeshMonitorDataSource):
|
if isinstance(source, MeshMonitorDataSource):
|
||||||
|
|
@ -664,11 +507,6 @@ class MeshSourceManager:
|
||||||
return all_solar
|
return all_solar
|
||||||
|
|
||||||
def get_network_stats(self) -> dict[str, dict]:
|
def get_network_stats(self) -> dict[str, dict]:
|
||||||
"""Get network statistics from all sources.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict mapping source name to network stats dict
|
|
||||||
"""
|
|
||||||
stats = {}
|
stats = {}
|
||||||
|
|
||||||
for name, source in self._sources.items():
|
for name, source in self._sources.items():
|
||||||
|
|
@ -699,10 +537,8 @@ class MeshSourceManager:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def source_count(self) -> int:
|
def source_count(self) -> int:
|
||||||
"""Get number of active sources."""
|
|
||||||
return len(self._sources)
|
return len(self._sources)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def source_names(self) -> list[str]:
|
def source_names(self) -> list[str]:
|
||||||
"""Get list of source names."""
|
|
||||||
return list(self._sources.keys())
|
return list(self._sources.keys())
|
||||||
|
|
|
||||||
|
|
@ -181,8 +181,11 @@ class MeshMonitorDataSource:
|
||||||
else:
|
else:
|
||||||
errors.append("channels")
|
errors.append("channels")
|
||||||
|
|
||||||
# Fetch telemetry
|
# Fetch telemetry - BUG 6 FIX: Request more records for 24h coverage
|
||||||
data = self._fetch_json("/api/v1/telemetry")
|
data = self._fetch_json("/api/v1/telemetry?limit=5000")
|
||||||
|
if data is None:
|
||||||
|
# Fallback without limit param
|
||||||
|
data = self._fetch_json("/api/v1/telemetry")
|
||||||
if data is not None:
|
if data is not None:
|
||||||
self._telemetry = data if isinstance(data, list) else []
|
self._telemetry = data if isinstance(data, list) else []
|
||||||
success_count += 1
|
success_count += 1
|
||||||
|
|
@ -190,8 +193,10 @@ class MeshMonitorDataSource:
|
||||||
else:
|
else:
|
||||||
errors.append("telemetry")
|
errors.append("telemetry")
|
||||||
|
|
||||||
# Fetch traceroutes
|
# Fetch traceroutes - BUG 6 FIX: Request more records
|
||||||
data = self._fetch_json("/api/v1/traceroutes")
|
data = self._fetch_json("/api/v1/traceroutes?limit=1000")
|
||||||
|
if data is None:
|
||||||
|
data = self._fetch_json("/api/v1/traceroutes")
|
||||||
if data is not None:
|
if data is not None:
|
||||||
self._traceroutes = data if isinstance(data, list) else []
|
self._traceroutes = data if isinstance(data, list) else []
|
||||||
success_count += 1
|
success_count += 1
|
||||||
|
|
@ -217,8 +222,11 @@ class MeshMonitorDataSource:
|
||||||
else:
|
else:
|
||||||
errors.append("topology")
|
errors.append("topology")
|
||||||
|
|
||||||
# Fetch packets
|
# Fetch packets - BUG 6 FIX: Request more packets for 24h coverage
|
||||||
data = self._fetch_json("/api/v1/packets")
|
data = self._fetch_json("/api/v1/packets?limit=5000")
|
||||||
|
if data is None:
|
||||||
|
# Fallback without limit param
|
||||||
|
data = self._fetch_json("/api/v1/packets")
|
||||||
if data is not None:
|
if data is not None:
|
||||||
self._packets = data if isinstance(data, list) else []
|
self._packets = data if isinstance(data, list) else []
|
||||||
success_count += 1
|
success_count += 1
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue