mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 23:24:44 +02:00
feat: Feeder-level gateway awareness from /api/packets_seen
Samples recent packets and calls /api/packets_seen to discover which physical MQTT gateways hear each node. Per-gateway RSSI and SNR. UnifiedNode: - feeder_gateways list with gateway_id, gateway_name, avg_rssi, avg_snr - feeder_count, feeder_best (strongest signal), feeder_worst MeshviewSource: - Added feeders to ENDPOINT_SCHEDULE (every 20 ticks / 10 min) - _fetch_feeders() samples 20 packets and queries packets_seen - Auto-disables if endpoint returns 404 MeshDataStore: - _enrich_feeder_data() aggregates gateway data across all sources - _normalize_node_id() helper for hex/decimal conversion - get_feeder_map() shows per-gateway coverage statistics - get_node_feeders() returns sorted gateway list for a node MeshReporter: - Node detail shows feeder gateways with signal strength - Tier 1 shows total unique gateways and avg per node Discovered gateways: AIDA, BKBS, STLR, N7MH, stor, JTS Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
b3c79f12da
commit
1d9c90911b
4 changed files with 568 additions and 239 deletions
|
|
@ -529,6 +529,22 @@ class MeshDataStore:
|
|||
# Normalization
|
||||
# =========================================================================
|
||||
|
||||
|
||||
def _normalize_node_id(self, raw) -> Optional[int]:
|
||||
"""Convert raw node ID (hex string, decimal string, int) to int node_num."""
|
||||
if isinstance(raw, int):
|
||||
return raw
|
||||
if isinstance(raw, str):
|
||||
stripped = raw.lstrip("!")
|
||||
try:
|
||||
# Check if hex
|
||||
if any(c in "abcdefABCDEF" for c in stripped):
|
||||
return int(stripped, 16)
|
||||
return int(stripped)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
def _extract_node_num(self, raw: dict) -> Optional[int]:
|
||||
"""Extract canonical node number from various formats."""
|
||||
# MeshMonitor: nodeNum is the canonical field
|
||||
|
|
@ -865,6 +881,9 @@ class MeshDataStore:
|
|||
# Enrich with deliverability metrics
|
||||
self._enrich_deliverability()
|
||||
|
||||
# Enrich with feeder gateway data
|
||||
self._enrich_feeder_data()
|
||||
|
||||
# Enrich edges with SNR from traceroutes and node data
|
||||
self._enrich_edges_from_traceroutes()
|
||||
self._enrich_edges_from_node_snr()
|
||||
|
|
@ -1317,6 +1336,100 @@ class MeshDataStore:
|
|||
except Exception as e:
|
||||
logger.warning(f"Failed to enrich environmental data: {e}")
|
||||
|
||||
|
||||
def _enrich_feeder_data(self):
|
||||
"""Populate feeder gateway info on UnifiedNode from source sampling.
|
||||
|
||||
Each Meshview source has ONE gateway feeding it. By aggregating
|
||||
packets_seen data from ALL sources for the same sender node,
|
||||
we discover which gateways hear that node.
|
||||
"""
|
||||
# First, build a map of gateway node_id -> node info for name lookup
|
||||
gateway_names = {}
|
||||
for node in self._nodes.values():
|
||||
gateway_names[node.node_num] = node.short_name or node.long_name or ""
|
||||
|
||||
# Aggregate feeder data from all sources
|
||||
# Key insight: each source's feeder_data tells us that source's gateway heard certain nodes
|
||||
for source_name, source in self._sources.items():
|
||||
if not hasattr(source, 'feeder_data'):
|
||||
continue
|
||||
|
||||
feeder_data = source.feeder_data
|
||||
if not feeder_data:
|
||||
continue
|
||||
|
||||
for from_node_raw, gateways in feeder_data.items():
|
||||
# Normalize from_node to int
|
||||
node_num = self._normalize_node_id(from_node_raw)
|
||||
if node_num is None or node_num not in self._nodes:
|
||||
continue
|
||||
|
||||
node = self._nodes[node_num]
|
||||
|
||||
# Merge gateways from this source
|
||||
existing_gw_ids = {g["gateway_id"] for g in node.feeder_gateways}
|
||||
|
||||
for gw in gateways:
|
||||
gw_id = gw["gateway_id"]
|
||||
if gw_id in existing_gw_ids:
|
||||
# Merge signal data for existing gateway
|
||||
for existing_gw in node.feeder_gateways:
|
||||
if existing_gw["gateway_id"] == gw_id:
|
||||
# Average the signal values
|
||||
if gw.get("avg_rssi") is not None:
|
||||
if existing_gw.get("avg_rssi") is not None:
|
||||
existing_gw["avg_rssi"] = (existing_gw["avg_rssi"] + gw["avg_rssi"]) / 2
|
||||
else:
|
||||
existing_gw["avg_rssi"] = gw["avg_rssi"]
|
||||
if gw.get("avg_snr") is not None:
|
||||
if existing_gw.get("avg_snr") is not None:
|
||||
existing_gw["avg_snr"] = (existing_gw["avg_snr"] + gw["avg_snr"]) / 2
|
||||
else:
|
||||
existing_gw["avg_snr"] = gw["avg_snr"]
|
||||
existing_gw["packet_count"] = existing_gw.get("packet_count", 0) + gw.get("packet_count", 1)
|
||||
break
|
||||
else:
|
||||
# Add new gateway entry
|
||||
gw_node_id = self._normalize_node_id(gw_id)
|
||||
gw_name = gw.get("gateway_name") or ""
|
||||
if not gw_name and gw_node_id:
|
||||
gw_name = gateway_names.get(gw_node_id, gw.get("gateway_hex", gw_id))
|
||||
|
||||
node.feeder_gateways.append({
|
||||
"gateway_id": gw_id,
|
||||
"gateway_name": gw_name,
|
||||
"avg_rssi": gw.get("avg_rssi"),
|
||||
"avg_snr": gw.get("avg_snr"),
|
||||
"packet_count": gw.get("packet_count", 1),
|
||||
"source": source_name,
|
||||
})
|
||||
existing_gw_ids.add(gw_id)
|
||||
|
||||
# Update summary fields for all nodes with feeder data
|
||||
for node in self._nodes.values():
|
||||
if not node.feeder_gateways:
|
||||
continue
|
||||
|
||||
node.feeder_count = len(node.feeder_gateways)
|
||||
|
||||
# Best = strongest average RSSI (closest to 0)
|
||||
with_rssi = [g for g in node.feeder_gateways if g.get("avg_rssi") is not None]
|
||||
if with_rssi:
|
||||
best = max(with_rssi, key=lambda g: g["avg_rssi"])
|
||||
worst = min(with_rssi, key=lambda g: g["avg_rssi"])
|
||||
node.feeder_best = best.get("gateway_name") or best["gateway_id"]
|
||||
node.feeder_worst = worst.get("gateway_name") or worst["gateway_id"]
|
||||
|
||||
# Log summary
|
||||
nodes_with_feeders = sum(1 for n in self._nodes.values() if n.feeder_count > 0)
|
||||
if nodes_with_feeders > 0:
|
||||
total_gw = set()
|
||||
for n in self._nodes.values():
|
||||
for gw in n.feeder_gateways:
|
||||
total_gw.add(gw["gateway_id"])
|
||||
logger.info(f"Feeder enrichment: {nodes_with_feeders} nodes, {len(total_gw)} unique gateways")
|
||||
|
||||
def _enrich_deliverability(self) -> None:
|
||||
"""Compute deliverability metrics from node source overlap.
|
||||
|
||||
|
|
@ -2299,6 +2412,56 @@ class MeshDataStore:
|
|||
})
|
||||
return health
|
||||
|
||||
|
||||
def get_feeder_map(self) -> dict:
|
||||
"""Get which gateways hear which nodes.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"gateway_name": {
|
||||
"gateway_id": str,
|
||||
"nodes_heard": int,
|
||||
"avg_rssi": float,
|
||||
"regions_covered": list[str],
|
||||
},
|
||||
...
|
||||
}
|
||||
"""
|
||||
gw_map = {}
|
||||
for node in self._nodes.values():
|
||||
for gw in node.feeder_gateways:
|
||||
gw_name = gw.get("gateway_name") or gw["gateway_id"]
|
||||
if gw_name not in gw_map:
|
||||
gw_map[gw_name] = {
|
||||
"gateway_id": gw["gateway_id"],
|
||||
"nodes_heard": 0,
|
||||
"rssi_values": [],
|
||||
"regions": set(),
|
||||
}
|
||||
gw_map[gw_name]["nodes_heard"] += 1
|
||||
if gw.get("avg_rssi") is not None:
|
||||
gw_map[gw_name]["rssi_values"].append(gw["avg_rssi"])
|
||||
if node.region:
|
||||
gw_map[gw_name]["regions"].add(node.region)
|
||||
|
||||
# Compute averages
|
||||
result = {}
|
||||
for name, data in gw_map.items():
|
||||
result[name] = {
|
||||
"gateway_id": data["gateway_id"],
|
||||
"nodes_heard": data["nodes_heard"],
|
||||
"avg_rssi": sum(data["rssi_values"]) / len(data["rssi_values"]) if data["rssi_values"] else None,
|
||||
"regions_covered": sorted(data["regions"]),
|
||||
}
|
||||
return result
|
||||
|
||||
def get_node_feeders(self, node_num: int) -> list:
|
||||
"""Get feeder gateways for a specific node, sorted by signal strength."""
|
||||
node = self._nodes.get(node_num)
|
||||
if node and node.feeder_gateways:
|
||||
return sorted(node.feeder_gateways, key=lambda g: g.get("avg_rssi") or -999, reverse=True)
|
||||
return []
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close database connection."""
|
||||
if self._db:
|
||||
|
|
|
|||
|
|
@ -118,6 +118,13 @@ class UnifiedNode:
|
|||
|
||||
# Deliverability / Coverage (from Meshview gateway counts)
|
||||
avg_gateways: Optional[float] = None # Avg unique gateways that hear this node's packets
|
||||
|
||||
# Feeder-level gateway awareness (from /api/packets_seen across sources)
|
||||
feeder_gateways: list = field(default_factory=list)
|
||||
# Each entry: {"gateway_id": str, "gateway_name": str, "avg_rssi": float, "avg_snr": float, "packet_count": int}
|
||||
feeder_count: int = 0 # Number of unique physical gateways that hear this node
|
||||
feeder_best: Optional[str] = None # Gateway with strongest signal to this node
|
||||
feeder_worst: Optional[str] = None # Gateway with weakest signal
|
||||
deliverability_score: Optional[float] = None # % of packets reaching 2+ gateways (0-100)
|
||||
max_gateways: Optional[int] = None # Max gateways any single packet reached
|
||||
source_reach: Optional[float] = None # Avg number of Meshview sources that see this node's packets
|
||||
|
|
|
|||
|
|
@ -457,6 +457,19 @@ class MeshReporter:
|
|||
if pb["critical"]: parts.append(f"{pb['critical']} battery critical")
|
||||
lines.append(f"POWER (infra): {', '.join(parts)}")
|
||||
|
||||
# Feeder gateway summary
|
||||
all_nodes = list(health.nodes.values())
|
||||
nodes_with_feeders = [n for n in all_nodes if getattr(n, 'feeder_count', 0) > 0]
|
||||
if nodes_with_feeders:
|
||||
total_unique_gw = set()
|
||||
for n in nodes_with_feeders:
|
||||
for gw in n.feeder_gateways:
|
||||
total_unique_gw.add(gw["gateway_id"])
|
||||
|
||||
avg_feeders = sum(n.feeder_count for n in nodes_with_feeders) / len(nodes_with_feeders)
|
||||
lines.append("")
|
||||
lines.append(f"FEEDERS: {len(total_unique_gw)} physical gateways, avg {avg_feeders:.1f} per node ({len(nodes_with_feeders)} nodes sampled)")
|
||||
|
||||
# Source health section
|
||||
lines.extend(self._build_source_health_section())
|
||||
|
||||
|
|
@ -935,6 +948,27 @@ class MeshReporter:
|
|||
if node.sources:
|
||||
lines.append(f" Seen by: {', '.join(node.sources)} ({len(node.sources)} sources)")
|
||||
|
||||
# Feeder gateways
|
||||
if node.feeder_gateways:
|
||||
lines.append(f" Feeders ({node.feeder_count} gateways):")
|
||||
# Sort by signal strength (best RSSI first, closest to 0)
|
||||
sorted_gw = sorted(
|
||||
node.feeder_gateways,
|
||||
key=lambda g: g.get("avg_rssi") or -999,
|
||||
reverse=True
|
||||
)
|
||||
for gw in sorted_gw[:8]: # Top 8
|
||||
name_str = gw.get("gateway_name") or gw["gateway_id"]
|
||||
parts = []
|
||||
if gw.get("avg_rssi") is not None:
|
||||
parts.append(f"RSSI {gw['avg_rssi']:.0f}")
|
||||
if gw.get("avg_snr") is not None:
|
||||
parts.append(f"SNR {gw['avg_snr']:.1f}")
|
||||
sig_str = f" [{', '.join(parts)}]" if parts else ""
|
||||
lines.append(f" {name_str}{sig_str}")
|
||||
if len(sorted_gw) > 8:
|
||||
lines.append(f" ...and {len(sorted_gw) - 8} more")
|
||||
|
||||
# Neighbors section
|
||||
if node.neighbors:
|
||||
lines.append("")
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ class MeshviewSource:
|
|||
("edges", 6), # Every 6 ticks (3 min)
|
||||
("counts", 8), # Every 8 ticks (4 min)
|
||||
("traceroutes", 10), # Every 10 ticks (5 min)
|
||||
("feeders", 20), # Every 20 ticks (10 min) — sample packets_seen for gateway data
|
||||
]
|
||||
|
||||
def __init__(self, url: str, refresh_interval: int = 30, polite_mode: bool = False):
|
||||
|
|
@ -64,11 +65,15 @@ class MeshviewSource:
|
|||
self._is_loaded: bool = False
|
||||
self._data_changed: bool = False
|
||||
|
||||
# Feeder gateway data from packets_seen
|
||||
self._feeder_data: dict = {} # {from_node: [{gateway_id, gateway_name, avg_rssi, avg_snr, packet_count}]}
|
||||
|
||||
# Capabilities (discovered on first fetch)
|
||||
self._capabilities: dict = {
|
||||
"packets": True,
|
||||
"packets_since": False,
|
||||
"traceroutes": True,
|
||||
"packets_seen": True,
|
||||
}
|
||||
self._capabilities_probed: bool = False
|
||||
|
||||
|
|
@ -180,6 +185,8 @@ class MeshviewSource:
|
|||
success = self._fetch_counts()
|
||||
elif endpoint == "traceroutes":
|
||||
success = self._fetch_traceroutes()
|
||||
elif endpoint == "feeders":
|
||||
success = self._fetch_feeders()
|
||||
|
||||
if success:
|
||||
self._consecutive_errors = 0
|
||||
|
|
@ -389,6 +396,124 @@ class MeshviewSource:
|
|||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _fetch_feeders(self) -> bool:
|
||||
"""Sample recent packets to discover which gateway hears them.
|
||||
|
||||
Calls /api/packets_seen/{packet_id} for a sample of recent packets.
|
||||
Builds a per-node gateway map with signal quality.
|
||||
|
||||
Note: Each Meshview source has ONE gateway feeding it. To get multiple
|
||||
gateways, the data store aggregates feeder data across all sources.
|
||||
"""
|
||||
if not self._packets:
|
||||
return False
|
||||
|
||||
if not self._capabilities.get("packets_seen", True):
|
||||
return False
|
||||
|
||||
# Sample 20 recent packets spread across different senders
|
||||
by_sender = {}
|
||||
for pkt in reversed(self._packets): # Most recent first
|
||||
sender = pkt.get("from_node") or pkt.get("from") or pkt.get("from_node_id")
|
||||
if sender and sender not in by_sender:
|
||||
by_sender[sender] = pkt
|
||||
if len(by_sender) >= 20:
|
||||
break
|
||||
|
||||
sample_packets = list(by_sender.values())
|
||||
if not sample_packets:
|
||||
return False
|
||||
|
||||
# Build: node_gateways[from_node] = {gateway_id: {"rssi_values": [], "snr_values": []}}
|
||||
node_gateways: dict = {}
|
||||
errors = 0
|
||||
|
||||
for pkt in sample_packets:
|
||||
pkt_id = pkt.get("packet_id") or pkt.get("id")
|
||||
from_node = pkt.get("from_node") or pkt.get("from") or pkt.get("from_node_id")
|
||||
if not pkt_id or not from_node:
|
||||
continue
|
||||
|
||||
seen_data = self._fetch_with_tracking(f"/api/packets_seen/{pkt_id}")
|
||||
if seen_data is None:
|
||||
errors += 1
|
||||
if errors >= 3:
|
||||
# Endpoint probably doesn't exist
|
||||
self._capabilities["packets_seen"] = False
|
||||
logger.info(f"Meshview {self._url}: packets_seen not available, disabling")
|
||||
return False
|
||||
continue
|
||||
|
||||
# Extract gateway list from {"seen": [...]}
|
||||
gateways = []
|
||||
if isinstance(seen_data, dict):
|
||||
gateways = seen_data.get("seen", [])
|
||||
elif isinstance(seen_data, list):
|
||||
gateways = seen_data
|
||||
|
||||
if not gateways:
|
||||
continue
|
||||
|
||||
if from_node not in node_gateways:
|
||||
node_gateways[from_node] = {}
|
||||
|
||||
for gw in gateways:
|
||||
# Fields from API: node_id, rx_snr, rx_rssi, topic
|
||||
gw_node_id = gw.get("node_id")
|
||||
if not gw_node_id:
|
||||
continue
|
||||
|
||||
gw_id = str(gw_node_id)
|
||||
rssi = gw.get("rx_rssi")
|
||||
snr = gw.get("rx_snr")
|
||||
|
||||
# Extract hex ID from topic for gateway name lookup
|
||||
# topic format: "msh/US/2/e/Freq51/!27780c47"
|
||||
topic = gw.get("topic", "")
|
||||
gw_hex = ""
|
||||
if topic and "!" in topic:
|
||||
gw_hex = topic.split("!")[-1] if "!" in topic else ""
|
||||
gw_hex = "!" + gw_hex if gw_hex else ""
|
||||
|
||||
if gw_id not in node_gateways[from_node]:
|
||||
node_gateways[from_node][gw_id] = {
|
||||
"gateway_id": gw_id,
|
||||
"gateway_hex": gw_hex,
|
||||
"rssi_values": [],
|
||||
"snr_values": [],
|
||||
}
|
||||
|
||||
if rssi is not None:
|
||||
node_gateways[from_node][gw_id]["rssi_values"].append(rssi)
|
||||
if snr is not None:
|
||||
node_gateways[from_node][gw_id]["snr_values"].append(snr)
|
||||
|
||||
# Aggregate into feeder_data
|
||||
self._feeder_data = {}
|
||||
for from_node, gateways in node_gateways.items():
|
||||
self._feeder_data[from_node] = []
|
||||
for gw_id, gw_info in gateways.items():
|
||||
avg_rssi = sum(gw_info["rssi_values"]) / len(gw_info["rssi_values"]) if gw_info["rssi_values"] else None
|
||||
avg_snr = sum(gw_info["snr_values"]) / len(gw_info["snr_values"]) if gw_info["snr_values"] else None
|
||||
self._feeder_data[from_node].append({
|
||||
"gateway_id": gw_id,
|
||||
"gateway_hex": gw_info["gateway_hex"],
|
||||
"gateway_name": "", # Will be filled in by data store from node lookup
|
||||
"avg_rssi": avg_rssi,
|
||||
"avg_snr": avg_snr,
|
||||
"packet_count": len(gw_info["rssi_values"]) or len(gw_info["snr_values"]) or 1,
|
||||
})
|
||||
|
||||
self._data_changed = True
|
||||
logger.info(f"Feeder sampling: {len(sample_packets)} packets, {len(self._feeder_data)} nodes with gateway data")
|
||||
return True
|
||||
|
||||
@property
|
||||
def feeder_data(self) -> dict:
|
||||
"""Get per-node feeder gateway data."""
|
||||
return self._feeder_data
|
||||
|
||||
def fetch_all(self) -> bool:
|
||||
"""Fetch all data at once. Used for initial load and force refresh."""
|
||||
success = 0
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue