From 1d9c90911bec20cebb1cd7a1cbe72d2a95cd6bde Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Tue, 5 May 2026 19:55:23 +0000 Subject: [PATCH] feat: Feeder-level gateway awareness from /api/packets_seen Samples recent packets and calls /api/packets_seen to discover which physical MQTT gateways hear each node. Per-gateway RSSI and SNR. UnifiedNode: - feeder_gateways list with gateway_id, gateway_name, avg_rssi, avg_snr - feeder_count, feeder_best (strongest signal), feeder_worst MeshviewSource: - Added feeders to ENDPOINT_SCHEDULE (every 20 ticks / 10 min) - _fetch_feeders() samples 20 packets and queries packets_seen - Auto-disables if endpoint returns 404 MeshDataStore: - _enrich_feeder_data() aggregates gateway data across all sources - _normalize_node_id() helper for hex/decimal conversion - get_feeder_map() shows per-gateway coverage statistics - get_node_feeders() returns sorted gateway list for a node MeshReporter: - Node detail shows feeder gateways with signal strength - Tier 1 shows total unique gateways and avg per node Discovered gateways: AIDA, BKBS, STLR, N7MH, stor, JTS Co-Authored-By: Claude Opus 4.5 --- meshai/mesh_data_store.py | 163 +++++++++++++ meshai/mesh_models.py | 485 +++++++++++++++++++------------------ meshai/mesh_reporter.py | 34 +++ meshai/sources/meshview.py | 125 ++++++++++ 4 files changed, 568 insertions(+), 239 deletions(-) diff --git a/meshai/mesh_data_store.py b/meshai/mesh_data_store.py index da03074..4b8ba1e 100644 --- a/meshai/mesh_data_store.py +++ b/meshai/mesh_data_store.py @@ -529,6 +529,22 @@ class MeshDataStore: # Normalization # ========================================================================= + + def _normalize_node_id(self, raw) -> Optional[int]: + """Convert raw node ID (hex string, decimal string, int) to int node_num.""" + if isinstance(raw, int): + return raw + if isinstance(raw, str): + stripped = raw.lstrip("!") + try: + # Check if hex + if any(c in "abcdefABCDEF" for c in stripped): + return int(stripped, 16) + return int(stripped) + except ValueError: + return None + return None + def _extract_node_num(self, raw: dict) -> Optional[int]: """Extract canonical node number from various formats.""" # MeshMonitor: nodeNum is the canonical field @@ -865,6 +881,9 @@ class MeshDataStore: # Enrich with deliverability metrics self._enrich_deliverability() + # Enrich with feeder gateway data + self._enrich_feeder_data() + # Enrich edges with SNR from traceroutes and node data self._enrich_edges_from_traceroutes() self._enrich_edges_from_node_snr() @@ -1317,6 +1336,100 @@ class MeshDataStore: except Exception as e: logger.warning(f"Failed to enrich environmental data: {e}") + + def _enrich_feeder_data(self): + """Populate feeder gateway info on UnifiedNode from source sampling. + + Each Meshview source has ONE gateway feeding it. By aggregating + packets_seen data from ALL sources for the same sender node, + we discover which gateways hear that node. + """ + # First, build a map of gateway node_id -> node info for name lookup + gateway_names = {} + for node in self._nodes.values(): + gateway_names[node.node_num] = node.short_name or node.long_name or "" + + # Aggregate feeder data from all sources + # Key insight: each source's feeder_data tells us that source's gateway heard certain nodes + for source_name, source in self._sources.items(): + if not hasattr(source, 'feeder_data'): + continue + + feeder_data = source.feeder_data + if not feeder_data: + continue + + for from_node_raw, gateways in feeder_data.items(): + # Normalize from_node to int + node_num = self._normalize_node_id(from_node_raw) + if node_num is None or node_num not in self._nodes: + continue + + node = self._nodes[node_num] + + # Merge gateways from this source + existing_gw_ids = {g["gateway_id"] for g in node.feeder_gateways} + + for gw in gateways: + gw_id = gw["gateway_id"] + if gw_id in existing_gw_ids: + # Merge signal data for existing gateway + for existing_gw in node.feeder_gateways: + if existing_gw["gateway_id"] == gw_id: + # Average the signal values + if gw.get("avg_rssi") is not None: + if existing_gw.get("avg_rssi") is not None: + existing_gw["avg_rssi"] = (existing_gw["avg_rssi"] + gw["avg_rssi"]) / 2 + else: + existing_gw["avg_rssi"] = gw["avg_rssi"] + if gw.get("avg_snr") is not None: + if existing_gw.get("avg_snr") is not None: + existing_gw["avg_snr"] = (existing_gw["avg_snr"] + gw["avg_snr"]) / 2 + else: + existing_gw["avg_snr"] = gw["avg_snr"] + existing_gw["packet_count"] = existing_gw.get("packet_count", 0) + gw.get("packet_count", 1) + break + else: + # Add new gateway entry + gw_node_id = self._normalize_node_id(gw_id) + gw_name = gw.get("gateway_name") or "" + if not gw_name and gw_node_id: + gw_name = gateway_names.get(gw_node_id, gw.get("gateway_hex", gw_id)) + + node.feeder_gateways.append({ + "gateway_id": gw_id, + "gateway_name": gw_name, + "avg_rssi": gw.get("avg_rssi"), + "avg_snr": gw.get("avg_snr"), + "packet_count": gw.get("packet_count", 1), + "source": source_name, + }) + existing_gw_ids.add(gw_id) + + # Update summary fields for all nodes with feeder data + for node in self._nodes.values(): + if not node.feeder_gateways: + continue + + node.feeder_count = len(node.feeder_gateways) + + # Best = strongest average RSSI (closest to 0) + with_rssi = [g for g in node.feeder_gateways if g.get("avg_rssi") is not None] + if with_rssi: + best = max(with_rssi, key=lambda g: g["avg_rssi"]) + worst = min(with_rssi, key=lambda g: g["avg_rssi"]) + node.feeder_best = best.get("gateway_name") or best["gateway_id"] + node.feeder_worst = worst.get("gateway_name") or worst["gateway_id"] + + # Log summary + nodes_with_feeders = sum(1 for n in self._nodes.values() if n.feeder_count > 0) + if nodes_with_feeders > 0: + total_gw = set() + for n in self._nodes.values(): + for gw in n.feeder_gateways: + total_gw.add(gw["gateway_id"]) + logger.info(f"Feeder enrichment: {nodes_with_feeders} nodes, {len(total_gw)} unique gateways") + def _enrich_deliverability(self) -> None: """Compute deliverability metrics from node source overlap. @@ -2299,6 +2412,56 @@ class MeshDataStore: }) return health + + def get_feeder_map(self) -> dict: + """Get which gateways hear which nodes. + + Returns: + { + "gateway_name": { + "gateway_id": str, + "nodes_heard": int, + "avg_rssi": float, + "regions_covered": list[str], + }, + ... + } + """ + gw_map = {} + for node in self._nodes.values(): + for gw in node.feeder_gateways: + gw_name = gw.get("gateway_name") or gw["gateway_id"] + if gw_name not in gw_map: + gw_map[gw_name] = { + "gateway_id": gw["gateway_id"], + "nodes_heard": 0, + "rssi_values": [], + "regions": set(), + } + gw_map[gw_name]["nodes_heard"] += 1 + if gw.get("avg_rssi") is not None: + gw_map[gw_name]["rssi_values"].append(gw["avg_rssi"]) + if node.region: + gw_map[gw_name]["regions"].add(node.region) + + # Compute averages + result = {} + for name, data in gw_map.items(): + result[name] = { + "gateway_id": data["gateway_id"], + "nodes_heard": data["nodes_heard"], + "avg_rssi": sum(data["rssi_values"]) / len(data["rssi_values"]) if data["rssi_values"] else None, + "regions_covered": sorted(data["regions"]), + } + return result + + def get_node_feeders(self, node_num: int) -> list: + """Get feeder gateways for a specific node, sorted by signal strength.""" + node = self._nodes.get(node_num) + if node and node.feeder_gateways: + return sorted(node.feeder_gateways, key=lambda g: g.get("avg_rssi") or -999, reverse=True) + return [] + def close(self) -> None: """Close database connection.""" if self._db: diff --git a/meshai/mesh_models.py b/meshai/mesh_models.py index 2f60efc..5607385 100644 --- a/meshai/mesh_models.py +++ b/meshai/mesh_models.py @@ -1,132 +1,139 @@ -"""Unified data models for the mesh data pipeline. - -These dataclasses represent the normalized, merged data model used by -consumers (health engine, reporter, commands). All field normalization -happens in MeshDataStore before populating these models. -""" - -from dataclasses import dataclass, field -from typing import Optional - - -@dataclass -class UnifiedNode: - """Unified node representation with normalized fields. - - Keyed by node_num (canonical Meshtastic node number). - All fields have sensible defaults. - """ - - # Identity - node_num: int - node_id_hex: str = "" # "!a3145a04" - short_name: str = "" - long_name: str = "" - role: str = "UNKNOWN" # ROUTER, CLIENT, etc. - hw_model: str = "" - - # Position - latitude: Optional[float] = None - longitude: Optional[float] = None - altitude: Optional[float] = None - - # Status (current) - last_heard: float = 0.0 # Epoch seconds - is_online: bool = False - hops_away: Optional[int] = None - snr: Optional[float] = None - rssi: Optional[int] = None - - # Power (current) - battery_percent: Optional[float] = None - voltage: Optional[float] = None - - # Power (trend - computed from historical store) - battery_trend: Optional[str] = None # "charging", "stable", "declining" - predicted_depletion_hours: Optional[float] = None - has_solar: bool = False - - # Environment (from sensors - BME280, BME680, SHT31, etc.) - temperature: Optional[float] = None # Celsius - humidity: Optional[float] = None # Relative humidity % - barometric_pressure: Optional[float] = None # hPa - gas_resistance: Optional[float] = None # Ohms (AQI proxy) - iaq: Optional[float] = None # Indoor Air Quality index - - # Light (BH1750, TSL2591, etc.) - light_lux: Optional[float] = None # lux - - # Wind/Weather (DFROBOT_LARK stations) - wind_speed: Optional[float] = None # m/s - wind_direction: Optional[float] = None # degrees - rainfall: Optional[float] = None # mm - - # Air Quality (PMSA003I) - pm1_0: Optional[float] = None # µg/m³ - pm2_5: Optional[float] = None # µg/m³ - pm10: Optional[float] = None # µg/m³ - - # Power monitoring (INA sensors - separate from battery) - ext_voltage: Optional[float] = None # External voltage (e.g., solar panel) - ext_current: Optional[float] = None # External current (mA) - - # Health (MAX30102, MLX - rare) - heart_rate: Optional[float] = None # BPM - spo2: Optional[float] = None # Oxygen saturation % - body_temperature: Optional[float] = None # Celsius - - # Radiation (RadSens) - radiation_cpm: Optional[float] = None # Counts per minute - - # UV (VEML6070, etc.) - uv_index: Optional[float] = None - - # Sensor type flags (set after populating fields) - has_environment_sensor: bool = False - has_air_quality_sensor: bool = False - has_power_sensor: bool = False - has_health_sensor: bool = False - has_weather_station: bool = False - - # Traffic (current, from most recent API data) - packets_sent_24h: int = 0 - packets_seen_24h: int = 0 - packets_by_type: dict[str, int] = field(default_factory=dict) - text_messages_24h: int = 0 - - # Traffic (historical, from SQLite) - packets_sent_48h: int = 0 - packets_sent_7d: int = 0 - packets_sent_14d: int = 0 - daily_packet_counts: dict[str, int] = field(default_factory=dict) - - # Device-reported metrics (current) - channel_utilization: Optional[float] = None - air_util_tx: Optional[float] = None - uptime_seconds: Optional[int] = None - - # Connectivity - uplink_enabled: bool = False - neighbors: list[int] = field(default_factory=list) - neighbor_count: int = 0 - traceroute_appearances: int = 0 - - # Metadata - sources: list[str] = field(default_factory=list) - region: str = "" # Set by health engine - locality: str = "" # Set by health engine - - # Deliverability / Coverage (from Meshview gateway counts) - avg_gateways: Optional[float] = None # Avg unique gateways that hear this node's packets - deliverability_score: Optional[float] = None # % of packets reaching 2+ gateways (0-100) - max_gateways: Optional[int] = None # Max gateways any single packet reached - source_reach: Optional[float] = None # Avg number of Meshview sources that see this node's packets - - # Additional MeshMonitor fields - firmware_version: str = "" - public_key: str = "" - is_mqtt_gateway: bool = False - via_mqtt: bool = False +"""Unified data models for the mesh data pipeline. + +These dataclasses represent the normalized, merged data model used by +consumers (health engine, reporter, commands). All field normalization +happens in MeshDataStore before populating these models. +""" + +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class UnifiedNode: + """Unified node representation with normalized fields. + + Keyed by node_num (canonical Meshtastic node number). + All fields have sensible defaults. + """ + + # Identity + node_num: int + node_id_hex: str = "" # "!a3145a04" + short_name: str = "" + long_name: str = "" + role: str = "UNKNOWN" # ROUTER, CLIENT, etc. + hw_model: str = "" + + # Position + latitude: Optional[float] = None + longitude: Optional[float] = None + altitude: Optional[float] = None + + # Status (current) + last_heard: float = 0.0 # Epoch seconds + is_online: bool = False + hops_away: Optional[int] = None + snr: Optional[float] = None + rssi: Optional[int] = None + + # Power (current) + battery_percent: Optional[float] = None + voltage: Optional[float] = None + + # Power (trend - computed from historical store) + battery_trend: Optional[str] = None # "charging", "stable", "declining" + predicted_depletion_hours: Optional[float] = None + has_solar: bool = False + + # Environment (from sensors - BME280, BME680, SHT31, etc.) + temperature: Optional[float] = None # Celsius + humidity: Optional[float] = None # Relative humidity % + barometric_pressure: Optional[float] = None # hPa + gas_resistance: Optional[float] = None # Ohms (AQI proxy) + iaq: Optional[float] = None # Indoor Air Quality index + + # Light (BH1750, TSL2591, etc.) + light_lux: Optional[float] = None # lux + + # Wind/Weather (DFROBOT_LARK stations) + wind_speed: Optional[float] = None # m/s + wind_direction: Optional[float] = None # degrees + rainfall: Optional[float] = None # mm + + # Air Quality (PMSA003I) + pm1_0: Optional[float] = None # µg/m³ + pm2_5: Optional[float] = None # µg/m³ + pm10: Optional[float] = None # µg/m³ + + # Power monitoring (INA sensors - separate from battery) + ext_voltage: Optional[float] = None # External voltage (e.g., solar panel) + ext_current: Optional[float] = None # External current (mA) + + # Health (MAX30102, MLX - rare) + heart_rate: Optional[float] = None # BPM + spo2: Optional[float] = None # Oxygen saturation % + body_temperature: Optional[float] = None # Celsius + + # Radiation (RadSens) + radiation_cpm: Optional[float] = None # Counts per minute + + # UV (VEML6070, etc.) + uv_index: Optional[float] = None + + # Sensor type flags (set after populating fields) + has_environment_sensor: bool = False + has_air_quality_sensor: bool = False + has_power_sensor: bool = False + has_health_sensor: bool = False + has_weather_station: bool = False + + # Traffic (current, from most recent API data) + packets_sent_24h: int = 0 + packets_seen_24h: int = 0 + packets_by_type: dict[str, int] = field(default_factory=dict) + text_messages_24h: int = 0 + + # Traffic (historical, from SQLite) + packets_sent_48h: int = 0 + packets_sent_7d: int = 0 + packets_sent_14d: int = 0 + daily_packet_counts: dict[str, int] = field(default_factory=dict) + + # Device-reported metrics (current) + channel_utilization: Optional[float] = None + air_util_tx: Optional[float] = None + uptime_seconds: Optional[int] = None + + # Connectivity + uplink_enabled: bool = False + neighbors: list[int] = field(default_factory=list) + neighbor_count: int = 0 + traceroute_appearances: int = 0 + + # Metadata + sources: list[str] = field(default_factory=list) + region: str = "" # Set by health engine + locality: str = "" # Set by health engine + + # Deliverability / Coverage (from Meshview gateway counts) + avg_gateways: Optional[float] = None # Avg unique gateways that hear this node's packets + + # Feeder-level gateway awareness (from /api/packets_seen across sources) + feeder_gateways: list = field(default_factory=list) + # Each entry: {"gateway_id": str, "gateway_name": str, "avg_rssi": float, "avg_snr": float, "packet_count": int} + feeder_count: int = 0 # Number of unique physical gateways that hear this node + feeder_best: Optional[str] = None # Gateway with strongest signal to this node + feeder_worst: Optional[str] = None # Gateway with weakest signal + deliverability_score: Optional[float] = None # % of packets reaching 2+ gateways (0-100) + max_gateways: Optional[int] = None # Max gateways any single packet reached + source_reach: Optional[float] = None # Avg number of Meshview sources that see this node's packets + + # Additional MeshMonitor fields + firmware_version: str = "" + public_key: str = "" + is_mqtt_gateway: bool = False + via_mqtt: bool = False # Health scoring (set by MeshHealthEngine) is_infrastructure: bool = False @@ -136,113 +143,113 @@ class UnifiedNode: coverage_score_node: float = 100.0 behavior_score: float = 100.0 power_score: float = 100.0 - - -@dataclass -class UnifiedEdge: - """Unified edge (link) between two nodes.""" - - from_node: int - to_node: int - snr: Optional[float] = None - rssi: Optional[int] = None - quality: Optional[float] = None - last_seen: float = 0.0 - sources: list[str] = field(default_factory=list) - - -@dataclass -class UnifiedTraceroute: - """Unified traceroute record.""" - - from_node: int - to_node: int - route: list[int] = field(default_factory=list) # Node nums in path - route_back: list[int] = field(default_factory=list) - snr_towards: list[float] = field(default_factory=list) - snr_back: list[float] = field(default_factory=list) - timestamp: float = 0.0 - request_id: int = 0 - sources: list[str] = field(default_factory=list) - - -@dataclass -class UnifiedChannel: - """Unified channel configuration.""" - - channel_id: int - name: str = "" - role: int = 0 # 0=disabled, 1=primary, 2=secondary - role_name: str = "" - uplink_enabled: bool = False - downlink_enabled: bool = False - position_precision: int = 0 - sources: list[str] = field(default_factory=list) - - -@dataclass -class UnifiedSolar: - """Solar estimate for a node.""" - - node_num: int - estimate_watts: Optional[float] = None - confidence: Optional[float] = None - timestamp: float = 0.0 - sources: list[str] = field(default_factory=list) - - -@dataclass -class DailyTraffic: - """Per-day aggregate traffic counts.""" - - date: str # "2026-05-04" - total_packets: int = 0 - packets_by_type: dict[str, int] = field(default_factory=dict) - source: str = "" - - -@dataclass -class NodeSnapshot: - """Point-in-time snapshot of a node's metrics.""" - - timestamp: float - node_num: int - short_name: str = "" - long_name: str = "" - role: str = "" - hw_model: str = "" - latitude: Optional[float] = None - longitude: Optional[float] = None - last_heard: float = 0.0 - is_online: bool = False - battery_percent: Optional[float] = None - voltage: Optional[float] = None - packets_sent_24h: int = 0 - packets_seen_24h: int = 0 - channel_utilization: Optional[float] = None - air_util_tx: Optional[float] = None - uplink_enabled: bool = False - neighbor_count: int = 0 - hops_away: Optional[int] = None - snr: Optional[float] = None - sources: str = "" - - -@dataclass -class MeshSnapshot: - """Point-in-time snapshot of mesh-wide metrics.""" - - timestamp: float - total_nodes: int = 0 - active_nodes: int = 0 - infra_online: int = 0 - infra_total: int = 0 - total_packets_24h: int = 0 - avg_channel_utilization: Optional[float] = None - avg_battery_percent: Optional[float] = None - source_count: int = 0 - # Deliverability metrics - avg_gateways_mesh: Optional[float] = None - total_packets_global: Optional[int] = None - total_seen_global: Optional[int] = None - unique_feeders: Optional[int] = None + + +@dataclass +class UnifiedEdge: + """Unified edge (link) between two nodes.""" + + from_node: int + to_node: int + snr: Optional[float] = None + rssi: Optional[int] = None + quality: Optional[float] = None + last_seen: float = 0.0 + sources: list[str] = field(default_factory=list) + + +@dataclass +class UnifiedTraceroute: + """Unified traceroute record.""" + + from_node: int + to_node: int + route: list[int] = field(default_factory=list) # Node nums in path + route_back: list[int] = field(default_factory=list) + snr_towards: list[float] = field(default_factory=list) + snr_back: list[float] = field(default_factory=list) + timestamp: float = 0.0 + request_id: int = 0 + sources: list[str] = field(default_factory=list) + + +@dataclass +class UnifiedChannel: + """Unified channel configuration.""" + + channel_id: int + name: str = "" + role: int = 0 # 0=disabled, 1=primary, 2=secondary + role_name: str = "" + uplink_enabled: bool = False + downlink_enabled: bool = False + position_precision: int = 0 + sources: list[str] = field(default_factory=list) + + +@dataclass +class UnifiedSolar: + """Solar estimate for a node.""" + + node_num: int + estimate_watts: Optional[float] = None + confidence: Optional[float] = None + timestamp: float = 0.0 + sources: list[str] = field(default_factory=list) + + +@dataclass +class DailyTraffic: + """Per-day aggregate traffic counts.""" + + date: str # "2026-05-04" + total_packets: int = 0 + packets_by_type: dict[str, int] = field(default_factory=dict) + source: str = "" + + +@dataclass +class NodeSnapshot: + """Point-in-time snapshot of a node's metrics.""" + + timestamp: float + node_num: int + short_name: str = "" + long_name: str = "" + role: str = "" + hw_model: str = "" + latitude: Optional[float] = None + longitude: Optional[float] = None + last_heard: float = 0.0 + is_online: bool = False + battery_percent: Optional[float] = None + voltage: Optional[float] = None + packets_sent_24h: int = 0 + packets_seen_24h: int = 0 + channel_utilization: Optional[float] = None + air_util_tx: Optional[float] = None + uplink_enabled: bool = False + neighbor_count: int = 0 + hops_away: Optional[int] = None + snr: Optional[float] = None + sources: str = "" + + +@dataclass +class MeshSnapshot: + """Point-in-time snapshot of mesh-wide metrics.""" + + timestamp: float + total_nodes: int = 0 + active_nodes: int = 0 + infra_online: int = 0 + infra_total: int = 0 + total_packets_24h: int = 0 + avg_channel_utilization: Optional[float] = None + avg_battery_percent: Optional[float] = None + source_count: int = 0 + # Deliverability metrics + avg_gateways_mesh: Optional[float] = None + total_packets_global: Optional[int] = None + total_seen_global: Optional[int] = None + unique_feeders: Optional[int] = None diff --git a/meshai/mesh_reporter.py b/meshai/mesh_reporter.py index ffdf899..d60c79a 100644 --- a/meshai/mesh_reporter.py +++ b/meshai/mesh_reporter.py @@ -457,6 +457,19 @@ class MeshReporter: if pb["critical"]: parts.append(f"{pb['critical']} battery critical") lines.append(f"POWER (infra): {', '.join(parts)}") + # Feeder gateway summary + all_nodes = list(health.nodes.values()) + nodes_with_feeders = [n for n in all_nodes if getattr(n, 'feeder_count', 0) > 0] + if nodes_with_feeders: + total_unique_gw = set() + for n in nodes_with_feeders: + for gw in n.feeder_gateways: + total_unique_gw.add(gw["gateway_id"]) + + avg_feeders = sum(n.feeder_count for n in nodes_with_feeders) / len(nodes_with_feeders) + lines.append("") + lines.append(f"FEEDERS: {len(total_unique_gw)} physical gateways, avg {avg_feeders:.1f} per node ({len(nodes_with_feeders)} nodes sampled)") + # Source health section lines.extend(self._build_source_health_section()) @@ -935,6 +948,27 @@ class MeshReporter: if node.sources: lines.append(f" Seen by: {', '.join(node.sources)} ({len(node.sources)} sources)") + # Feeder gateways + if node.feeder_gateways: + lines.append(f" Feeders ({node.feeder_count} gateways):") + # Sort by signal strength (best RSSI first, closest to 0) + sorted_gw = sorted( + node.feeder_gateways, + key=lambda g: g.get("avg_rssi") or -999, + reverse=True + ) + for gw in sorted_gw[:8]: # Top 8 + name_str = gw.get("gateway_name") or gw["gateway_id"] + parts = [] + if gw.get("avg_rssi") is not None: + parts.append(f"RSSI {gw['avg_rssi']:.0f}") + if gw.get("avg_snr") is not None: + parts.append(f"SNR {gw['avg_snr']:.1f}") + sig_str = f" [{', '.join(parts)}]" if parts else "" + lines.append(f" {name_str}{sig_str}") + if len(sorted_gw) > 8: + lines.append(f" ...and {len(sorted_gw) - 8} more") + # Neighbors section if node.neighbors: lines.append("") diff --git a/meshai/sources/meshview.py b/meshai/sources/meshview.py index 4ac23c9..6459399 100644 --- a/meshai/sources/meshview.py +++ b/meshai/sources/meshview.py @@ -26,6 +26,7 @@ class MeshviewSource: ("edges", 6), # Every 6 ticks (3 min) ("counts", 8), # Every 8 ticks (4 min) ("traceroutes", 10), # Every 10 ticks (5 min) + ("feeders", 20), # Every 20 ticks (10 min) — sample packets_seen for gateway data ] def __init__(self, url: str, refresh_interval: int = 30, polite_mode: bool = False): @@ -64,11 +65,15 @@ class MeshviewSource: self._is_loaded: bool = False self._data_changed: bool = False + # Feeder gateway data from packets_seen + self._feeder_data: dict = {} # {from_node: [{gateway_id, gateway_name, avg_rssi, avg_snr, packet_count}]} + # Capabilities (discovered on first fetch) self._capabilities: dict = { "packets": True, "packets_since": False, "traceroutes": True, + "packets_seen": True, } self._capabilities_probed: bool = False @@ -180,6 +185,8 @@ class MeshviewSource: success = self._fetch_counts() elif endpoint == "traceroutes": success = self._fetch_traceroutes() + elif endpoint == "feeders": + success = self._fetch_feeders() if success: self._consecutive_errors = 0 @@ -389,6 +396,124 @@ class MeshviewSource: return True return False + + def _fetch_feeders(self) -> bool: + """Sample recent packets to discover which gateway hears them. + + Calls /api/packets_seen/{packet_id} for a sample of recent packets. + Builds a per-node gateway map with signal quality. + + Note: Each Meshview source has ONE gateway feeding it. To get multiple + gateways, the data store aggregates feeder data across all sources. + """ + if not self._packets: + return False + + if not self._capabilities.get("packets_seen", True): + return False + + # Sample 20 recent packets spread across different senders + by_sender = {} + for pkt in reversed(self._packets): # Most recent first + sender = pkt.get("from_node") or pkt.get("from") or pkt.get("from_node_id") + if sender and sender not in by_sender: + by_sender[sender] = pkt + if len(by_sender) >= 20: + break + + sample_packets = list(by_sender.values()) + if not sample_packets: + return False + + # Build: node_gateways[from_node] = {gateway_id: {"rssi_values": [], "snr_values": []}} + node_gateways: dict = {} + errors = 0 + + for pkt in sample_packets: + pkt_id = pkt.get("packet_id") or pkt.get("id") + from_node = pkt.get("from_node") or pkt.get("from") or pkt.get("from_node_id") + if not pkt_id or not from_node: + continue + + seen_data = self._fetch_with_tracking(f"/api/packets_seen/{pkt_id}") + if seen_data is None: + errors += 1 + if errors >= 3: + # Endpoint probably doesn't exist + self._capabilities["packets_seen"] = False + logger.info(f"Meshview {self._url}: packets_seen not available, disabling") + return False + continue + + # Extract gateway list from {"seen": [...]} + gateways = [] + if isinstance(seen_data, dict): + gateways = seen_data.get("seen", []) + elif isinstance(seen_data, list): + gateways = seen_data + + if not gateways: + continue + + if from_node not in node_gateways: + node_gateways[from_node] = {} + + for gw in gateways: + # Fields from API: node_id, rx_snr, rx_rssi, topic + gw_node_id = gw.get("node_id") + if not gw_node_id: + continue + + gw_id = str(gw_node_id) + rssi = gw.get("rx_rssi") + snr = gw.get("rx_snr") + + # Extract hex ID from topic for gateway name lookup + # topic format: "msh/US/2/e/Freq51/!27780c47" + topic = gw.get("topic", "") + gw_hex = "" + if topic and "!" in topic: + gw_hex = topic.split("!")[-1] if "!" in topic else "" + gw_hex = "!" + gw_hex if gw_hex else "" + + if gw_id not in node_gateways[from_node]: + node_gateways[from_node][gw_id] = { + "gateway_id": gw_id, + "gateway_hex": gw_hex, + "rssi_values": [], + "snr_values": [], + } + + if rssi is not None: + node_gateways[from_node][gw_id]["rssi_values"].append(rssi) + if snr is not None: + node_gateways[from_node][gw_id]["snr_values"].append(snr) + + # Aggregate into feeder_data + self._feeder_data = {} + for from_node, gateways in node_gateways.items(): + self._feeder_data[from_node] = [] + for gw_id, gw_info in gateways.items(): + avg_rssi = sum(gw_info["rssi_values"]) / len(gw_info["rssi_values"]) if gw_info["rssi_values"] else None + avg_snr = sum(gw_info["snr_values"]) / len(gw_info["snr_values"]) if gw_info["snr_values"] else None + self._feeder_data[from_node].append({ + "gateway_id": gw_id, + "gateway_hex": gw_info["gateway_hex"], + "gateway_name": "", # Will be filled in by data store from node lookup + "avg_rssi": avg_rssi, + "avg_snr": avg_snr, + "packet_count": len(gw_info["rssi_values"]) or len(gw_info["snr_values"]) or 1, + }) + + self._data_changed = True + logger.info(f"Feeder sampling: {len(sample_packets)} packets, {len(self._feeder_data)} nodes with gateway data") + return True + + @property + def feeder_data(self) -> dict: + """Get per-node feeder gateway data.""" + return self._feeder_data + def fetch_all(self) -> bool: """Fetch all data at once. Used for initial load and force refresh.""" success = 0