feat: Tick-based staggered polling for all sources

Both MeshviewSource and MeshMonitorDataSource now use tick-based
staggered polling instead of batch-every-5-minutes:

MeshviewSource (30s ticks):
- Packets: every tick (30s)
- Nodes: every 4 ticks (2 min)
- Stats/Edges: every 6 ticks (3 min)
- Traceroutes: every 10 ticks (5 min)

MeshMonitorDataSource (30s ticks):
- Packets: every 2 ticks (60s)
- Nodes/Telemetry: every 4 ticks (2 min)
- Traceroutes/Channels/Network/Topology: every 10 ticks (5 min)
- Solar: every 20 ticks (10 min)

Features:
- Source health status (avg_response_ms, tick_count, backed_off)
- Source coverage analysis (unique vs shared nodes)
- Tier 1 DATA SOURCES section shows all source health
- Node detail shows source visibility
- Incremental packets and telemetry with dedup
- Rate limit detection (429) with backoff
- Consecutive error exponential backoff
- polite_mode config option for shared instances

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-05 19:16:00 +00:00
commit b3c79f12da
5 changed files with 1137 additions and 473 deletions

View file

@ -145,7 +145,8 @@ class MeshMonitorConfig:
enabled: bool = False
url: str = "" # e.g., http://100.64.0.11:3333
inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands
refresh_interval: int = 300 # Seconds between refreshes
refresh_interval: int = 30 # Tick interval in seconds (default 30)
polite_mode: bool = False # Reduces polling frequency for shared instances # Seconds between refreshes
@dataclass
@ -165,7 +166,8 @@ class MeshSourceConfig:
type: str = "" # "meshview" or "meshmonitor"
url: str = ""
api_token: str = "" # MeshMonitor only, supports ${ENV_VAR}
refresh_interval: int = 300
refresh_interval: int = 30 # Tick interval in seconds (default 30)
polite_mode: bool = False # Reduces polling frequency for shared instances
enabled: bool = True

View file

@ -298,19 +298,23 @@ class MeshDataStore:
try:
if src_type == "meshview":
polite = getattr(cfg, 'polite_mode', False) if not isinstance(cfg, dict) else cfg.get('polite_mode', False)
self._sources[name] = MeshviewSource(
url=url,
refresh_interval=refresh_interval,
polite_mode=polite,
)
logger.info(f"Registered Meshview source '{name}' -> {url}")
logger.info(f"Registered Meshview source '{name}' -> {url} (polite={polite})")
elif src_type == "meshmonitor":
polite = getattr(cfg, 'polite_mode', False) if not isinstance(cfg, dict) else cfg.get('polite_mode', False)
self._sources[name] = MeshMonitorDataSource(
url=url,
api_token=api_token,
refresh_interval=refresh_interval,
polite_mode=polite,
)
logger.info(f"Registered MeshMonitor source '{name}' -> {url}")
logger.info(f"Registered MeshMonitor source '{name}' -> {url} (polite={polite})")
else:
logger.warning(f"Unknown source type '{src_type}' for '{name}'")
@ -380,16 +384,53 @@ class MeshDataStore:
logger.info(f"Purged {len(stale_nums)} stale nodes (not heard in {STALE_NODE_THRESHOLD_DAYS} days)")
def refresh(self) -> bool:
"""Refresh data from all sources if interval has elapsed.
"""Tick-based refresh. Called every second from the main loop.
Delegates to source tick() for sources that support it.
Only does a full rebuild when nodes/edges/topology change.
Only does a lightweight update when only packets change.
Returns:
True if any source was refreshed
True if any data changed
"""
now = time.time()
if now - self._last_refresh < self._refresh_interval:
return False
any_changed = False
needs_rebuild = False
needs_packet_update = False
return self._do_refresh()
for name, source in self._sources.items():
# Check if this source supports tick-based polling
if hasattr(source, 'tick') and hasattr(source, '_tick_interval'):
if now - source._last_tick >= source._tick_interval:
endpoint = source.tick()
if endpoint:
any_changed = True
# Major changes require full rebuild
if endpoint in ("nodes", "edges", "traceroutes", "topology", "telemetry"):
needs_rebuild = True
# Packet-only changes are lightweight
elif endpoint in ("packets",):
needs_packet_update = True
# stats, counts, channels, solar, network just update cached data
else:
# Legacy fallback for sources without tick support
if source.maybe_refresh():
any_changed = True
needs_rebuild = True
if needs_rebuild:
self._rebuild()
self._purge_stale_nodes()
self._store_snapshot()
self._purge_old_data()
self._last_refresh = now
self._is_loaded = True
elif needs_packet_update:
# Lightweight: just update packet-derived metrics without full rebuild
self._update_packet_metrics()
self._last_refresh = now
return any_changed
def force_refresh(self) -> bool:
"""Force an immediate refresh, bypassing the interval timer.
@ -407,6 +448,50 @@ class MeshDataStore:
self._last_force_refresh = now
return self._do_refresh(force=True)
def _update_packet_metrics(self):
"""Lightweight update when only new packets arrived.
Updates:
- Per-node packet counts
- Top senders
- Deliverability (source overlap)
Does NOT rebuild the full node model.
"""
# Recount packets per node from all sources
packet_counts: dict[int, int] = {}
packets_by_type: dict[int, dict[str, int]] = {}
for name, source in self._sources.items():
for pkt in (source.packets if hasattr(source, 'packets') else []):
from_node = pkt.get("from_node") or pkt.get("from_node_id") or pkt.get("from")
if from_node is None:
continue
# Normalize to int
if isinstance(from_node, str):
stripped = from_node.lstrip("!")
try:
from_node = int(stripped, 16) if not stripped.isdigit() else int(stripped)
except ValueError:
continue
packet_counts[from_node] = packet_counts.get(from_node, 0) + 1
portnum = pkt.get("portnum") or pkt.get("portnum_name") or pkt.get("type") or "UNKNOWN"
if from_node not in packets_by_type:
packets_by_type[from_node] = {}
packets_by_type[from_node][portnum] = packets_by_type[from_node].get(portnum, 0) + 1
# Update nodes
for node_num, count in packet_counts.items():
if node_num in self._nodes:
self._nodes[node_num].packets_sent_24h = count
if node_num in packets_by_type:
self._nodes[node_num].packets_by_type = packets_by_type[node_num]
logger.debug(f"Packet metrics updated: {sum(packet_counts.values())} packets across {len(packet_counts)} nodes")
def _do_refresh(self, force: bool = False) -> bool:
"""Perform the actual refresh.
@ -2167,6 +2252,53 @@ class MeshDataStore:
result.append(ch_dict)
return result
def get_source_coverage(self) -> dict:
"""Get per-source node coverage.
Returns:
{
"meshview-local": {"node_count": 200, "unique_nodes": 50, "shared_nodes": 150},
"meshview-freq51": {"node_count": 800, "unique_nodes": 400, "shared_nodes": 400},
...
}
"""
coverage = {}
for name in self._sources:
nodes_in_source = [n for n in self._nodes.values() if name in n.sources]
unique = [n for n in nodes_in_source if len(n.sources) == 1]
coverage[name] = {
"node_count": len(nodes_in_source),
"unique_nodes": len(unique),
"shared_nodes": len(nodes_in_source) - len(unique),
}
return coverage
def get_nodes_by_source(self, source_name: str) -> list:
"""Get all nodes visible to a specific source."""
return [n for n in self._nodes.values() if source_name in n.sources]
def get_exclusive_nodes(self, source_name: str) -> list:
"""Get nodes ONLY visible to this source (not seen by any other)."""
return [n for n in self._nodes.values() if n.sources == [source_name]]
def get_source_health(self) -> list[dict]:
"""Get health status of all sources."""
health = []
for name, source in self._sources.items():
if hasattr(source, 'health_status'):
status = source.health_status
status["name"] = name
health.append(status)
else:
health.append({
"name": name,
"is_loaded": source.is_loaded,
"last_error": source.last_error,
"cached_nodes": len(source.nodes),
})
return health
def close(self) -> None:
"""Close database connection."""
if self._db:

View file

@ -36,8 +36,9 @@ PORTNUM_DISPLAY = {
"ATAK_FORWARDER": "ATAK",
}
def _clean_portnum(portnum: str) -> str:
def _clean_portnum(portnum) -> str:
"""Convert raw portnum to display name."""
if isinstance(portnum, int): portnum = str(portnum)
return PORTNUM_DISPLAY.get(portnum, portnum.replace("_APP", "").replace("_", " ").title())
@ -175,6 +176,34 @@ class MeshReporter:
return f"{local} ({desc})"
return local or desc
def _build_source_health_section(self) -> list[str]:
"""Build source health section for Tier 1."""
lines = []
lines.append("")
lines.append("DATA SOURCES:")
for name, source in self.data_store._sources.items():
if hasattr(source, 'health_status'):
status = source.health_status
err_str = f" - {status['last_error']}" if status.get('last_error') else ""
backed = " [BACKED OFF]" if status.get('backed_off') else ""
polite = " [POLITE]" if status.get('polite_mode') else ""
lines.append(
f" {name}: {status.get('cached_nodes', 0)} nodes, "
f"{status.get('cached_packets', 0)} pkts, "
f"avg {status.get('avg_response_ms', 0)}ms"
f"{polite}{backed}{err_str}"
)
else:
# Legacy source without health_status
node_count = len(source.nodes) if hasattr(source, 'nodes') else 0
loaded = "OK" if source.is_loaded else "ERR"
err = f" - {source.last_error}" if source.last_error else ""
lines.append(f" {name}: [{loaded}] {node_count} nodes{err}")
return lines
def build_tier1_summary(self) -> str:
"""Build comprehensive mesh health summary with full data for LLM context."""
health = self.health_engine.mesh_health
@ -428,6 +457,9 @@ class MeshReporter:
if pb["critical"]: parts.append(f"{pb['critical']} battery critical")
lines.append(f"POWER (infra): {', '.join(parts)}")
# Source health section
lines.extend(self._build_source_health_section())
lines.append("")
lines.append(f"TOTAL: {health.total_nodes} nodes across {health.total_regions} regions.")
@ -899,6 +931,10 @@ class MeshReporter:
status = "Single gateway - node goes dark if that gateway fails"
lines.append(f" Coverage: {node.avg_gateways:.0f}/{total_gw} gateways ({pct:.0f}%) - {status}")
# Source visibility
if node.sources:
lines.append(f" Seen by: {', '.join(node.sources)} ({len(node.sources)} sources)")
# Neighbors section
if node.neighbors:
lines.append("")

View file

@ -1,270 +1,515 @@
"""MeshMonitor API data source."""
import json
import logging
import os
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from .. import __version__
logger = logging.getLogger(__name__)
USER_AGENT = f"MeshAI/{__version__}"
class MeshMonitorDataSource:
"""Fetches mesh data from a MeshMonitor instance."""
def __init__(self, url: str, api_token: str, refresh_interval: int = 300):
"""Initialize MeshMonitor data source.
Args:
url: Base URL of MeshMonitor instance (e.g., http://192.168.1.100:3333)
api_token: API token for authentication. Supports ${ENV_VAR} format.
refresh_interval: Seconds between refresh checks (default 5 minutes)
"""
self._url = url.rstrip("/")
self._api_token = self._resolve_token(api_token)
self._refresh_interval = refresh_interval
# Cached data
self._nodes: list[dict] = []
self._channels: list[dict] = []
self._telemetry: list[dict] = []
self._traceroutes: list[dict] = []
self._network_stats: Optional[dict] = None
self._topology: Optional[dict] = None
self._packets: list[dict] = []
self._solar: list[dict] = []
self._last_refresh: float = 0.0
self._last_error: Optional[str] = None
self._is_loaded: bool = False
def _resolve_token(self, token: str) -> str:
"""Resolve token, supporting ${ENV_VAR} format.
Args:
token: API token or env var reference
Returns:
Resolved token value
"""
if token.startswith("${") and token.endswith("}"):
env_var = token[2:-1]
return os.environ.get(env_var, "")
return token
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def channels(self) -> list[dict]:
"""Get cached channels list."""
return self._channels
@property
def telemetry(self) -> list[dict]:
"""Get cached telemetry list."""
return self._telemetry
@property
def traceroutes(self) -> list[dict]:
"""Get cached traceroutes list."""
return self._traceroutes
@property
def network_stats(self) -> Optional[dict]:
"""Get cached network stats."""
return self._network_stats
@property
def topology(self) -> Optional[dict]:
"""Get cached topology."""
return self._topology
@property
def packets(self) -> list[dict]:
"""Get cached packets list."""
return self._packets
@property
def solar(self) -> list[dict]:
"""Get cached solar estimates list."""
return self._solar
@property
def last_refresh(self) -> float:
"""Get last refresh timestamp (epoch)."""
return self._last_refresh
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
def _fetch_json(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON from an endpoint with Bearer auth.
Args:
endpoint: API endpoint path (e.g., /api/v1/nodes)
Returns:
Parsed JSON data or None on error
"""
url = f"{self._url}{endpoint}"
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {self._api_token}",
"User-Agent": USER_AGENT,
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
# MeshMonitor wraps responses in {"success": true, "data": [...]}
# Extract the actual data if wrapped
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
except HTTPError as e:
logger.warning(f"MeshMonitor {endpoint}: HTTP {e.code} {e.reason}")
return None
except URLError as e:
logger.warning(f"MeshMonitor {endpoint}: Connection error - {e.reason}")
return None
except json.JSONDecodeError as e:
logger.warning(f"MeshMonitor {endpoint}: Invalid JSON - {e}")
return None
except Exception as e:
logger.warning(f"MeshMonitor {endpoint}: {e}")
return None
def fetch_all(self) -> bool:
"""Fetch all data from MeshMonitor API.
Fetches all endpoints independently. One failure doesn't block others.
Returns:
True if at least one endpoint succeeded
"""
success_count = 0
errors = []
# Fetch nodes
data = self._fetch_json("/api/v1/nodes")
if data is not None:
self._nodes = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._nodes)} nodes")
else:
errors.append("nodes")
# Fetch channels
data = self._fetch_json("/api/v1/channels")
if data is not None:
self._channels = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._channels)} channels")
else:
errors.append("channels")
# Fetch telemetry - BUG 6 FIX: Request more records for 24h coverage
data = self._fetch_json("/api/v1/telemetry?limit=5000")
if data is None:
# Fallback without limit param
data = self._fetch_json("/api/v1/telemetry")
if data is not None:
self._telemetry = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._telemetry)} telemetry records")
else:
errors.append("telemetry")
# Fetch traceroutes - BUG 6 FIX: Request more records
data = self._fetch_json("/api/v1/traceroutes?limit=1000")
if data is None:
data = self._fetch_json("/api/v1/traceroutes")
if data is not None:
self._traceroutes = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._traceroutes)} traceroutes")
else:
errors.append("traceroutes")
# Fetch network stats
data = self._fetch_json("/api/v1/network")
if data is not None:
self._network_stats = data if isinstance(data, dict) else None
success_count += 1
logger.debug("MeshMonitor: fetched network stats")
else:
errors.append("network")
# Fetch topology
data = self._fetch_json("/api/v1/network/topology")
if data is not None:
self._topology = data if isinstance(data, dict) else None
success_count += 1
logger.debug("MeshMonitor: fetched topology")
else:
errors.append("topology")
# Fetch packets - BUG 6 FIX: Request more packets for 24h coverage
data = self._fetch_json("/api/v1/packets?limit=5000")
if data is None:
# Fallback without limit param
data = self._fetch_json("/api/v1/packets")
if data is not None:
self._packets = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._packets)} packets")
else:
errors.append("packets")
# Fetch solar estimates
data = self._fetch_json("/api/v1/solar")
if data is not None:
self._solar = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._solar)} solar estimates")
else:
errors.append("solar")
# Update state
self._last_refresh = time.time()
if success_count > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"MeshMonitor refresh: {len(self._nodes)} nodes, "
f"{len(self._telemetry)} telemetry, {len(self._traceroutes)} traceroutes"
)
return True
else:
self._last_error = f"All endpoints failed: {', '.join(errors)}"
logger.error(f"MeshMonitor: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Refresh data if interval has elapsed.
Returns:
True if refresh was performed
"""
if time.time() - self._last_refresh >= self._refresh_interval:
return self.fetch_all()
return False
"""MeshMonitor API data source with tick-based staggered polling."""
import json
import logging
import os
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from .. import __version__
logger = logging.getLogger(__name__)
USER_AGENT = f"MeshAI/{__version__}"
class MeshMonitorDataSource:
"""Fetches mesh data from a MeshMonitor instance with staggered polling."""
# Endpoint schedule: (endpoint, interval_ticks)
# At 30s per tick: 2 ticks = 60s, 4 ticks = 2min, 10 ticks = 5min
ENDPOINT_SCHEDULE = [
("packets", 2), # Every 2 ticks (60s)
("nodes", 4), # Every 4 ticks (2 min)
("telemetry", 4), # Every 4 ticks (2 min)
("traceroutes", 10), # Every 10 ticks (5 min)
("channels", 10), # Every 10 ticks (5 min)
("network", 10), # Every 10 ticks (5 min)
("topology", 10), # Every 10 ticks (5 min)
("solar", 20), # Every 20 ticks (10 min)
]
def __init__(self, url: str, api_token: str, refresh_interval: int = 30, polite_mode: bool = False):
"""Initialize MeshMonitor data source.
Args:
url: Base URL of MeshMonitor instance (e.g., http://192.168.1.100:3333)
api_token: API token for authentication. Supports ${ENV_VAR} format.
refresh_interval: Seconds between ticks (default 30)
polite_mode: If True, use longer intervals
"""
self._url = url.rstrip("/")
self._api_token = self._resolve_token(api_token)
self._tick_interval = refresh_interval
self._polite_mode = polite_mode
# Cached data
self._nodes: list[dict] = []
self._channels: list[dict] = []
self._telemetry: list[dict] = []
self._traceroutes: list[dict] = []
self._network_stats: Optional[dict] = None
self._topology: Optional[dict] = None
self._packets: list[dict] = []
self._solar: list[dict] = []
# Tick state
self._tick_count: int = 0
self._last_tick: float = 0.0
self._last_packet_timestamp: float = 0.0
self._last_telemetry_timestamp: float = 0.0
# Rate limit / health tracking
self._backoff_until: float = 0.0
self._avg_response_ms: float = 0.0
self._consecutive_errors: int = 0
self._max_consecutive_errors: int = 5
# Status
self._last_error: Optional[str] = None
self._is_loaded: bool = False
self._data_changed: bool = False
def _resolve_token(self, token: str) -> str:
"""Resolve token, supporting ${ENV_VAR} format."""
if token.startswith("${") and token.endswith("}"):
env_var = token[2:-1]
return os.environ.get(env_var, "")
return token
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def channels(self) -> list[dict]:
"""Get cached channels list."""
return self._channels
@property
def telemetry(self) -> list[dict]:
"""Get cached telemetry list."""
return self._telemetry
@property
def traceroutes(self) -> list[dict]:
"""Get cached traceroutes list."""
return self._traceroutes
@property
def network_stats(self) -> Optional[dict]:
"""Get cached network stats."""
return self._network_stats
@property
def topology(self) -> Optional[dict]:
"""Get cached topology."""
return self._topology
@property
def packets(self) -> list[dict]:
"""Get cached packets list."""
return self._packets
@property
def solar(self) -> list[dict]:
"""Get cached solar estimates list."""
return self._solar
@property
def last_refresh(self) -> float:
"""Get last tick timestamp (epoch)."""
return self._last_tick
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
@property
def data_changed(self) -> bool:
"""Check if data has changed since last check, then reset flag."""
changed = self._data_changed
self._data_changed = False
return changed
@property
def health_status(self) -> dict:
"""Get source health status for monitoring."""
return {
"url": self._url,
"is_loaded": self._is_loaded,
"last_error": self._last_error,
"avg_response_ms": round(self._avg_response_ms),
"consecutive_errors": self._consecutive_errors,
"backed_off": time.time() < self._backoff_until,
"tick_count": self._tick_count,
"cached_packets": len(self._packets),
"cached_nodes": len(self._nodes),
"cached_telemetry": len(self._telemetry),
"polite_mode": self._polite_mode,
}
def tick(self) -> Optional[str]:
"""Execute one polling tick. Called every 30 seconds.
Returns:
Name of endpoint that was fetched, or None if skipped/rate-limited
"""
now = time.time()
# Rate limit backoff
if now < self._backoff_until:
return None
# Consecutive error backoff (exponential)
if self._consecutive_errors >= self._max_consecutive_errors:
backoff = min(300, 30 * (2 ** (self._consecutive_errors - self._max_consecutive_errors)))
if now - self._last_tick < backoff:
return None
self._tick_count += 1
self._last_tick = now
# Determine which endpoint to call this tick
endpoint = self._select_endpoint()
if not endpoint:
return None
# Execute the call
success = False
if endpoint == "packets":
success = self._fetch_packets_incremental()
elif endpoint == "nodes":
success = self._fetch_nodes()
elif endpoint == "telemetry":
success = self._fetch_telemetry_incremental()
elif endpoint == "traceroutes":
success = self._fetch_traceroutes()
elif endpoint == "channels":
success = self._fetch_channels()
elif endpoint == "network":
success = self._fetch_network()
elif endpoint == "topology":
success = self._fetch_topology()
elif endpoint == "solar":
success = self._fetch_solar()
if success:
self._consecutive_errors = 0
self._is_loaded = True
self._last_error = None
else:
self._consecutive_errors += 1
return endpoint if success else None
def _select_endpoint(self) -> Optional[str]:
"""Select which endpoint to call on this tick based on schedule."""
schedule = self.ENDPOINT_SCHEDULE
# In polite mode, double the intervals
if self._polite_mode:
schedule = [(ep, interval * 2) for ep, interval in schedule]
# Find the highest-priority endpoint that's due
for endpoint, interval_ticks in schedule:
if self._tick_count % interval_ticks == 0:
return endpoint
return None
def _fetch_with_tracking(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON with Bearer auth, response tracking, and rate limit detection."""
url = f"{self._url}{endpoint}"
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {self._api_token}",
"User-Agent": USER_AGENT,
}
start = time.time()
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
elapsed_ms = (time.time() - start) * 1000
# Track response time (rolling average)
self._avg_response_ms = (self._avg_response_ms * 0.8) + (elapsed_ms * 0.2)
# Warn if slowing down
if elapsed_ms > 5000:
logger.warning(
f"MeshMonitor {self._url} slow: {elapsed_ms:.0f}ms on {endpoint}"
)
data = json.loads(resp.read().decode("utf-8"))
# MeshMonitor wraps responses in {"success": true, "data": [...]}
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
except HTTPError as e:
if e.code == 429:
retry_after = int(e.headers.get('Retry-After', 60))
self._backoff_until = time.time() + retry_after
logger.warning(
f"Rate limited by MeshMonitor. Backing off {retry_after}s"
)
self._last_error = f"Rate limited (429), backing off {retry_after}s"
elif e.code == 401:
# Token may be expired
self._backoff_until = time.time() + 300 # Back off 5 min
logger.error("MeshMonitor: API token may be expired or invalid (401)")
self._last_error = "API token expired or invalid (401)"
elif e.code == 503:
self._backoff_until = time.time() + 60
logger.warning(f"MeshMonitor {endpoint}: Service unavailable (503)")
self._last_error = "Service unavailable (503)"
else:
logger.warning(f"MeshMonitor {endpoint}: HTTP {e.code} {e.reason}")
self._last_error = f"HTTP {e.code} on {endpoint}"
return None
except URLError as e:
logger.warning(f"MeshMonitor {endpoint}: Connection error - {e.reason}")
self._last_error = f"Connection error: {e.reason}"
return None
except json.JSONDecodeError as e:
logger.warning(f"MeshMonitor {endpoint}: Invalid JSON - {e}")
self._last_error = f"Invalid JSON from {endpoint}"
return None
except Exception as e:
logger.warning(f"MeshMonitor {endpoint}: {e}")
self._last_error = str(e)
return None
def _fetch_nodes(self) -> bool:
"""Full node refresh."""
data = self._fetch_with_tracking("/api/v1/nodes")
if data is not None:
self._nodes = data if isinstance(data, list) else []
self._data_changed = True
logger.debug(f"MeshMonitor nodes: {len(self._nodes)}")
return True
return False
def _fetch_channels(self) -> bool:
"""Channels refresh."""
data = self._fetch_with_tracking("/api/v1/channels")
if data is not None:
self._channels = data if isinstance(data, list) else []
self._data_changed = True
logger.debug(f"MeshMonitor channels: {len(self._channels)}")
return True
return False
def _fetch_telemetry_incremental(self) -> bool:
"""Incremental telemetry fetch."""
if self._last_telemetry_timestamp > 0:
data = self._fetch_with_tracking(
f"/api/v1/telemetry?since={int(self._last_telemetry_timestamp)}&limit=500"
)
if data is None:
data = self._fetch_with_tracking("/api/v1/telemetry?limit=500")
else:
data = self._fetch_with_tracking("/api/v1/telemetry?limit=5000")
if data is None:
return False
new_telemetry = data if isinstance(data, list) else []
if new_telemetry:
existing_keys = set()
for t in self._telemetry:
key = (t.get("nodeNum"), t.get("telemetryType"), t.get("timestamp"))
existing_keys.add(key)
added = 0
for t in new_telemetry:
key = (t.get("nodeNum"), t.get("telemetryType"), t.get("timestamp"))
if key not in existing_keys:
self._telemetry.append(t)
existing_keys.add(key)
added += 1
ts = t.get("timestamp", 0)
if isinstance(ts, (int, float)) and ts > self._last_telemetry_timestamp:
self._last_telemetry_timestamp = ts
if added > 0:
self._data_changed = True
logger.debug(f"MeshMonitor telemetry: +{added} new ({len(self._telemetry)} total)")
# Cap to prevent unbounded growth
if len(self._telemetry) > 10000:
self._telemetry = self._telemetry[-10000:]
return True
def _fetch_traceroutes(self) -> bool:
"""Traceroutes refresh."""
data = self._fetch_with_tracking("/api/v1/traceroutes?limit=1000")
if data is None:
data = self._fetch_with_tracking("/api/v1/traceroutes")
if data is not None:
self._traceroutes = data if isinstance(data, list) else []
self._data_changed = True
logger.debug(f"MeshMonitor traceroutes: {len(self._traceroutes)}")
return True
return False
def _fetch_network(self) -> bool:
"""Network stats refresh."""
data = self._fetch_with_tracking("/api/v1/network")
if data is not None:
self._network_stats = data if isinstance(data, dict) else None
self._data_changed = True
logger.debug("MeshMonitor: fetched network stats")
return True
return False
def _fetch_topology(self) -> bool:
"""Topology refresh."""
data = self._fetch_with_tracking("/api/v1/network/topology")
if data is not None:
self._topology = data if isinstance(data, dict) else None
self._data_changed = True
logger.debug("MeshMonitor: fetched topology")
return True
return False
def _fetch_packets_incremental(self) -> bool:
"""Incremental packet fetch."""
if self._last_packet_timestamp > 0:
data = self._fetch_with_tracking(
f"/api/v1/packets?since={int(self._last_packet_timestamp)}&limit=500"
)
if data is None:
data = self._fetch_with_tracking("/api/v1/packets?limit=500")
else:
data = self._fetch_with_tracking("/api/v1/packets?limit=5000")
if data is None:
return False
new_packets = data if isinstance(data, list) else []
if new_packets:
existing_ids = {p.get("packet_id") or p.get("id") for p in self._packets}
added = 0
for pkt in new_packets:
pkt_id = pkt.get("packet_id") or pkt.get("id")
if pkt_id and pkt_id not in existing_ids:
self._packets.append(pkt)
existing_ids.add(pkt_id)
added += 1
pkt_time = pkt.get("timestamp") or pkt.get("createdAt") or 0
if isinstance(pkt_time, (int, float)):
if pkt_time > 1e12:
pkt_time = pkt_time / 1000
if pkt_time > self._last_packet_timestamp:
self._last_packet_timestamp = pkt_time
if added > 0:
self._data_changed = True
logger.debug(f"MeshMonitor packets: +{added} new ({len(self._packets)} total)")
# Cap to prevent unbounded growth
if len(self._packets) > 5000:
self._packets = self._packets[-5000:]
return True
def _fetch_solar(self) -> bool:
"""Solar estimates refresh."""
data = self._fetch_with_tracking("/api/v1/solar")
if data is not None:
self._solar = data if isinstance(data, list) else []
self._data_changed = True
logger.debug(f"MeshMonitor solar: {len(self._solar)}")
return True
return False
def fetch_all(self) -> bool:
"""Fetch all data at once. Used for initial load and force refresh."""
success_count = 0
errors = []
# Fetch nodes
if self._fetch_nodes():
success_count += 1
else:
errors.append("nodes")
# Fetch channels
if self._fetch_channels():
success_count += 1
else:
errors.append("channels")
# Fetch telemetry
if self._fetch_telemetry_incremental():
success_count += 1
else:
errors.append("telemetry")
# Fetch traceroutes
if self._fetch_traceroutes():
success_count += 1
else:
errors.append("traceroutes")
# Fetch network stats
if self._fetch_network():
success_count += 1
else:
errors.append("network")
# Fetch topology
if self._fetch_topology():
success_count += 1
else:
errors.append("topology")
# Fetch packets
if self._fetch_packets_incremental():
success_count += 1
else:
errors.append("packets")
# Fetch solar
if self._fetch_solar():
success_count += 1
else:
errors.append("solar")
self._last_tick = time.time()
if success_count > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"MeshMonitor refresh: {len(self._nodes)} nodes, "
f"{len(self._telemetry)} telemetry, {len(self._traceroutes)} traceroutes"
)
return True
else:
self._last_error = f"All endpoints failed: {', '.join(errors)}"
logger.error(f"MeshMonitor: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Backward compatible refresh check. Now delegates to tick()."""
now = time.time()
if now - self._last_tick >= self._tick_interval:
result = self.tick()
return result is not None
return False

View file

@ -1,193 +1,442 @@
"""Meshview API data source."""
import json
import logging
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from .. import __version__
logger = logging.getLogger(__name__)
USER_AGENT = f"MeshAI/{__version__}"
class MeshviewSource:
"""Fetches mesh data from a Meshview instance."""
def __init__(self, url: str, refresh_interval: int = 300):
"""Initialize Meshview source.
Args:
url: Base URL of Meshview instance (e.g., https://meshview.example.com)
refresh_interval: Seconds between refresh checks (default 5 minutes)
"""
self._url = url.rstrip("/")
self._refresh_interval = refresh_interval
self._nodes: list[dict] = []
self._edges: list[dict] = []
self._stats: Optional[dict | list] = None
self._counts: Optional[dict] = None
self._last_refresh: float = 0.0
self._last_error: Optional[str] = None
self._is_loaded: bool = False
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def edges(self) -> list[dict]:
"""Get cached edges list."""
return self._edges
@property
def stats(self) -> Optional[dict | list]:
"""Get cached stats."""
return self._stats
@property
def counts(self) -> Optional[dict]:
"""Get cached counts."""
return self._counts
@property
def last_refresh(self) -> float:
"""Get last refresh timestamp (epoch)."""
return self._last_refresh
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
def _fetch_json(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON from an endpoint.
Args:
endpoint: API endpoint path (e.g., /api/nodes)
Returns:
Parsed JSON data or None on error
"""
url = f"{self._url}{endpoint}"
headers = {
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"Meshview {endpoint}: HTTP {e.code} {e.reason}")
return None
except URLError as e:
logger.warning(f"Meshview {endpoint}: Connection error - {e.reason}")
return None
except json.JSONDecodeError as e:
logger.warning(f"Meshview {endpoint}: Invalid JSON - {e}")
return None
except Exception as e:
logger.warning(f"Meshview {endpoint}: {e}")
return None
def _extract_list(self, data: dict | list | None, key: str) -> list[dict]:
"""Extract a list from API response, handling wrapper dicts.
Args:
data: Raw API response (may be list or {"key": [...]})
key: Expected key if response is wrapped
Returns:
Extracted list or empty list
"""
if data is None:
return []
if isinstance(data, list):
return data
if isinstance(data, dict) and key in data:
inner = data[key]
return inner if isinstance(inner, list) else []
return []
def fetch_all(self) -> bool:
"""Fetch all data from Meshview API.
Fetches nodes, edges, stats, and counts independently.
One failure doesn't block others.
Returns:
True if at least one endpoint succeeded
"""
success_count = 0
errors = []
# Fetch nodes - response is {"nodes": [...]}
data = self._fetch_json("/api/nodes")
if data is not None:
self._nodes = self._extract_list(data, "nodes")
success_count += 1
logger.debug(f"Meshview: fetched {len(self._nodes)} nodes")
else:
errors.append("nodes")
# Fetch edges - response is {"edges": [...]}
data = self._fetch_json("/api/edges")
if data is not None:
self._edges = self._extract_list(data, "edges")
success_count += 1
logger.debug(f"Meshview: fetched {len(self._edges)} edges")
else:
errors.append("edges")
# Fetch stats (24h hourly)
data = self._fetch_json("/api/stats?period_type=hour&length=24")
if data is not None:
self._stats = data
success_count += 1
logger.debug("Meshview: fetched stats")
else:
errors.append("stats")
# Fetch counts
data = self._fetch_json("/api/stats/count")
if data is not None:
self._counts = data if isinstance(data, dict) else None
success_count += 1
logger.debug("Meshview: fetched counts")
else:
errors.append("counts")
# Update state
self._last_refresh = time.time()
if success_count > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"Meshview refresh: {len(self._nodes)} nodes, {len(self._edges)} edges"
)
return True
else:
self._last_error = f"All endpoints failed: {', '.join(errors)}"
logger.error(f"Meshview: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Refresh data if interval has elapsed.
Returns:
True if refresh was performed
"""
if time.time() - self._last_refresh >= self._refresh_interval:
return self.fetch_all()
return False
"""Meshview API data source with tick-based staggered polling."""
import json
import logging
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from .. import __version__
logger = logging.getLogger(__name__)
USER_AGENT = f"MeshAI/{__version__}"
class MeshviewSource:
"""Fetches mesh data from a Meshview instance with staggered polling."""
# Endpoint schedule: (endpoint, interval_ticks)
# At 30s per tick: 1 tick = 30s, 4 ticks = 2min, 6 ticks = 3min, 10 ticks = 5min
ENDPOINT_SCHEDULE = [
("packets", 1), # Every tick (30s) — near real-time
("nodes", 4), # Every 4 ticks (2 min)
("stats", 6), # Every 6 ticks (3 min)
("edges", 6), # Every 6 ticks (3 min)
("counts", 8), # Every 8 ticks (4 min)
("traceroutes", 10), # Every 10 ticks (5 min)
]
def __init__(self, url: str, refresh_interval: int = 30, polite_mode: bool = False):
"""Initialize Meshview source.
Args:
url: Base URL of Meshview instance (e.g., https://meshview.example.com)
refresh_interval: Seconds between ticks (default 30)
polite_mode: If True, skip frequent packet polling for shared instances
"""
self._url = url.rstrip("/")
self._tick_interval = refresh_interval
self._polite_mode = polite_mode
# Cached data
self._nodes: list[dict] = []
self._edges: list[dict] = []
self._stats: Optional[dict | list] = None
self._counts: Optional[dict] = None
self._packets: list[dict] = []
self._traceroutes: list[dict] = []
# Tick state
self._tick_count: int = 0
self._last_tick: float = 0.0
self._last_packet_timestamp: float = 0.0 # For incremental packet fetch
# Rate limit / health tracking
self._backoff_until: float = 0.0
self._avg_response_ms: float = 0.0
self._consecutive_errors: int = 0
self._max_consecutive_errors: int = 5
# Status
self._last_error: Optional[str] = None
self._is_loaded: bool = False
self._data_changed: bool = False
# Capabilities (discovered on first fetch)
self._capabilities: dict = {
"packets": True,
"packets_since": False,
"traceroutes": True,
}
self._capabilities_probed: bool = False
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def edges(self) -> list[dict]:
"""Get cached edges list."""
return self._edges
@property
def stats(self) -> Optional[dict | list]:
"""Get cached stats."""
return self._stats
@property
def counts(self) -> Optional[dict]:
"""Get cached counts."""
return self._counts
@property
def packets(self) -> list[dict]:
"""Get cached packets."""
return self._packets
@property
def traceroutes(self) -> list[dict]:
"""Get cached traceroutes."""
return self._traceroutes
@property
def last_refresh(self) -> float:
"""Get last tick timestamp (epoch)."""
return self._last_tick
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
@property
def data_changed(self) -> bool:
"""Check if data has changed since last check, then reset flag."""
changed = self._data_changed
self._data_changed = False
return changed
@property
def health_status(self) -> dict:
"""Get source health status for monitoring."""
return {
"url": self._url,
"is_loaded": self._is_loaded,
"last_error": self._last_error,
"avg_response_ms": round(self._avg_response_ms),
"consecutive_errors": self._consecutive_errors,
"backed_off": time.time() < self._backoff_until,
"tick_count": self._tick_count,
"cached_packets": len(self._packets),
"cached_nodes": len(self._nodes),
"polite_mode": self._polite_mode,
}
def tick(self) -> Optional[str]:
"""Execute one polling tick. Called every 30 seconds.
Returns:
Name of endpoint that was fetched, or None if skipped/rate-limited
"""
now = time.time()
# Rate limit backoff
if now < self._backoff_until:
return None
# Consecutive error backoff (exponential)
if self._consecutive_errors >= self._max_consecutive_errors:
backoff = min(300, 30 * (2 ** (self._consecutive_errors - self._max_consecutive_errors)))
if now - self._last_tick < backoff:
return None
self._tick_count += 1
self._last_tick = now
# Determine which endpoint to call this tick
endpoint = self._select_endpoint()
if not endpoint:
return None
# Execute the call
success = False
if endpoint == "packets":
success = self._fetch_packets_incremental()
elif endpoint == "nodes":
success = self._fetch_nodes()
elif endpoint == "edges":
success = self._fetch_edges()
elif endpoint == "stats":
success = self._fetch_stats()
elif endpoint == "counts":
success = self._fetch_counts()
elif endpoint == "traceroutes":
success = self._fetch_traceroutes()
if success:
self._consecutive_errors = 0
self._is_loaded = True
self._last_error = None
else:
self._consecutive_errors += 1
return endpoint if success else None
def _select_endpoint(self) -> Optional[str]:
"""Select which endpoint to call on this tick based on schedule."""
# In polite mode, skip frequent packet polling
schedule = self.ENDPOINT_SCHEDULE
if self._polite_mode:
schedule = [(ep, interval) for ep, interval in schedule if ep != "packets"]
# Find the highest-priority endpoint that's due
for endpoint, interval_ticks in schedule:
if self._tick_count % interval_ticks == 0:
# Skip if capability check failed
if endpoint in ("packets", "traceroutes") and not self._capabilities.get(endpoint, True):
continue
return endpoint
# If nothing is scheduled this tick, check packets (always welcome)
if not self._polite_mode and self._capabilities.get("packets", True):
return "packets"
return None
def _fetch_with_tracking(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON with response time tracking and rate limit detection."""
url = f"{self._url}{endpoint}"
headers = {
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
start = time.time()
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
elapsed_ms = (time.time() - start) * 1000
# Track response time (rolling average)
self._avg_response_ms = (self._avg_response_ms * 0.8) + (elapsed_ms * 0.2)
# Warn if slowing down
if elapsed_ms > 5000:
logger.warning(
f"Meshview {self._url} slow: {elapsed_ms:.0f}ms on {endpoint}"
)
data = json.loads(resp.read().decode("utf-8"))
return data
except HTTPError as e:
if e.code == 429:
retry_after = int(e.headers.get('Retry-After', 60))
self._backoff_until = time.time() + retry_after
logger.warning(
f"Rate limited by {self._url}. Backing off {retry_after}s"
)
self._last_error = f"Rate limited (429), backing off {retry_after}s"
elif e.code == 503:
self._backoff_until = time.time() + 60
logger.warning(f"Meshview {endpoint}: Service unavailable (503)")
self._last_error = "Service unavailable (503)"
elif e.code == 404:
# Endpoint doesn't exist — disable it
if "packets" in endpoint:
self._capabilities["packets"] = False
elif "traceroutes" in endpoint:
self._capabilities["traceroutes"] = False
logger.info(f"Meshview {endpoint}: Not available (404)")
self._last_error = f"Endpoint not available: {endpoint}"
else:
logger.warning(f"Meshview {endpoint}: HTTP {e.code}")
self._last_error = f"HTTP {e.code} on {endpoint}"
return None
except URLError as e:
logger.warning(f"Meshview {endpoint}: {e.reason}")
self._last_error = f"Connection error: {e.reason}"
return None
except Exception as e:
logger.warning(f"Meshview {endpoint}: {e}")
self._last_error = str(e)
return None
def _extract_list(self, data: dict | list | None, key: str) -> list[dict]:
"""Extract a list from API response, handling wrapper dicts."""
if data is None:
return []
if isinstance(data, list):
return data
if isinstance(data, dict) and key in data:
inner = data[key]
return inner if isinstance(inner, list) else []
return []
def _fetch_packets_incremental(self) -> bool:
"""Fetch only NEW packets since last call.
Uses the ?since= parameter if the Meshview API supports it.
Falls back to fetching recent packets and deduping locally.
"""
# Try incremental first
if self._last_packet_timestamp > 0 and self._capabilities.get("packets_since", False):
since_us = int(self._last_packet_timestamp * 1_000_000)
data = self._fetch_with_tracking(f"/api/packets?since={since_us}")
if data is None:
# Fall back to getting recent packets without since parameter
data = self._fetch_with_tracking("/api/packets?limit=100")
elif self._last_packet_timestamp > 0:
# No since support, get recent packets
data = self._fetch_with_tracking("/api/packets?limit=100")
else:
# First fetch — get recent packets and probe since capability
data = self._fetch_with_tracking("/api/packets?limit=200")
if data is not None:
# Try probing since capability
since_probe = self._fetch_with_tracking(f"/api/packets?since={int(time.time() * 1_000_000 - 60_000_000)}&limit=1")
if since_probe is not None:
self._capabilities["packets_since"] = True
if data is None:
return False
new_packets = self._extract_list(data, "packets")
if new_packets:
# Dedup against existing packets
existing_ids = {p.get("packet_id") or p.get("id") for p in self._packets}
added = 0
for pkt in new_packets:
pkt_id = pkt.get("packet_id") or pkt.get("id")
if pkt_id and pkt_id not in existing_ids:
self._packets.append(pkt)
existing_ids.add(pkt_id)
added += 1
# Track latest timestamp for next incremental
pkt_time = pkt.get("timestamp") or pkt.get("import_time") or 0
if isinstance(pkt_time, (int, float)):
# Handle microsecond timestamps
if pkt_time > 1e15:
pkt_time = pkt_time / 1_000_000
if pkt_time > self._last_packet_timestamp:
self._last_packet_timestamp = pkt_time
if added > 0:
self._data_changed = True
logger.debug(f"Meshview packets: +{added} new ({len(self._packets)} total)")
# Cap cached packets to prevent unbounded growth (keep last 2000)
if len(self._packets) > 2000:
self._packets = self._packets[-2000:]
return True
def _fetch_nodes(self) -> bool:
"""Full node refresh (replaces cached nodes)."""
data = self._fetch_with_tracking("/api/nodes")
if data is not None:
self._nodes = self._extract_list(data, "nodes")
self._data_changed = True
logger.debug(f"Meshview nodes: {len(self._nodes)}")
return True
return False
def _fetch_edges(self) -> bool:
"""Full edge refresh."""
data = self._fetch_with_tracking("/api/edges")
if data is not None:
self._edges = self._extract_list(data, "edges")
self._data_changed = True
logger.debug(f"Meshview edges: {len(self._edges)}")
return True
return False
def _fetch_stats(self) -> bool:
"""Stats refresh."""
data = self._fetch_with_tracking("/api/stats?period_type=hour&length=24")
if data is not None:
self._stats = data
self._data_changed = True
return True
return False
def _fetch_counts(self) -> bool:
"""Counts refresh."""
data = self._fetch_with_tracking("/api/stats/count")
if data is not None:
self._counts = data if isinstance(data, dict) else None
self._data_changed = True
return True
return False
def _fetch_traceroutes(self) -> bool:
"""Traceroute refresh."""
data = self._fetch_with_tracking("/api/traceroutes")
if data is not None:
self._traceroutes = self._extract_list(data, "traceroutes")
self._data_changed = True
return True
return False
def fetch_all(self) -> bool:
"""Fetch all data at once. Used for initial load and force refresh."""
success = 0
# Nodes first
if self._fetch_nodes():
success += 1
# Edges
if self._fetch_edges():
success += 1
# Stats
if self._fetch_stats():
success += 1
# Counts
if self._fetch_counts():
success += 1
# Packets (if supported)
if self._capabilities.get("packets", True):
if self._fetch_packets_incremental():
success += 1
# Traceroutes (if supported)
if self._capabilities.get("traceroutes", True):
if self._fetch_traceroutes():
success += 1
self._last_tick = time.time()
if success > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"Meshview refresh: {len(self._nodes)} nodes, {len(self._edges)} edges, {len(self._packets)} packets"
)
return True
else:
self._last_error = "All endpoints failed"
logger.error(f"Meshview: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Backward compatible refresh check. Now delegates to tick()."""
now = time.time()
if now - self._last_tick >= self._tick_interval:
result = self.tick()
return result is not None
return False