diff --git a/meshai/config.py b/meshai/config.py index 83547d1..ce6c9af 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -145,7 +145,8 @@ class MeshMonitorConfig: enabled: bool = False url: str = "" # e.g., http://100.64.0.11:3333 inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands - refresh_interval: int = 300 # Seconds between refreshes + refresh_interval: int = 30 # Tick interval in seconds (default 30) + polite_mode: bool = False # Reduces polling frequency for shared instances # Seconds between refreshes @dataclass @@ -165,7 +166,8 @@ class MeshSourceConfig: type: str = "" # "meshview" or "meshmonitor" url: str = "" api_token: str = "" # MeshMonitor only, supports ${ENV_VAR} - refresh_interval: int = 300 + refresh_interval: int = 30 # Tick interval in seconds (default 30) + polite_mode: bool = False # Reduces polling frequency for shared instances enabled: bool = True diff --git a/meshai/mesh_data_store.py b/meshai/mesh_data_store.py index 30d48f8..da03074 100644 --- a/meshai/mesh_data_store.py +++ b/meshai/mesh_data_store.py @@ -298,19 +298,23 @@ class MeshDataStore: try: if src_type == "meshview": + polite = getattr(cfg, 'polite_mode', False) if not isinstance(cfg, dict) else cfg.get('polite_mode', False) self._sources[name] = MeshviewSource( url=url, refresh_interval=refresh_interval, + polite_mode=polite, ) - logger.info(f"Registered Meshview source '{name}' -> {url}") + logger.info(f"Registered Meshview source '{name}' -> {url} (polite={polite})") elif src_type == "meshmonitor": + polite = getattr(cfg, 'polite_mode', False) if not isinstance(cfg, dict) else cfg.get('polite_mode', False) self._sources[name] = MeshMonitorDataSource( url=url, api_token=api_token, refresh_interval=refresh_interval, + polite_mode=polite, ) - logger.info(f"Registered MeshMonitor source '{name}' -> {url}") + logger.info(f"Registered MeshMonitor source '{name}' -> {url} (polite={polite})") else: logger.warning(f"Unknown source type '{src_type}' for '{name}'") @@ -380,16 +384,53 @@ class MeshDataStore: logger.info(f"Purged {len(stale_nums)} stale nodes (not heard in {STALE_NODE_THRESHOLD_DAYS} days)") def refresh(self) -> bool: - """Refresh data from all sources if interval has elapsed. + """Tick-based refresh. Called every second from the main loop. + + Delegates to source tick() for sources that support it. + Only does a full rebuild when nodes/edges/topology change. + Only does a lightweight update when only packets change. Returns: - True if any source was refreshed + True if any data changed """ now = time.time() - if now - self._last_refresh < self._refresh_interval: - return False + any_changed = False + needs_rebuild = False + needs_packet_update = False - return self._do_refresh() + for name, source in self._sources.items(): + # Check if this source supports tick-based polling + if hasattr(source, 'tick') and hasattr(source, '_tick_interval'): + if now - source._last_tick >= source._tick_interval: + endpoint = source.tick() + if endpoint: + any_changed = True + # Major changes require full rebuild + if endpoint in ("nodes", "edges", "traceroutes", "topology", "telemetry"): + needs_rebuild = True + # Packet-only changes are lightweight + elif endpoint in ("packets",): + needs_packet_update = True + # stats, counts, channels, solar, network just update cached data + else: + # Legacy fallback for sources without tick support + if source.maybe_refresh(): + any_changed = True + needs_rebuild = True + + if needs_rebuild: + self._rebuild() + self._purge_stale_nodes() + self._store_snapshot() + self._purge_old_data() + self._last_refresh = now + self._is_loaded = True + elif needs_packet_update: + # Lightweight: just update packet-derived metrics without full rebuild + self._update_packet_metrics() + self._last_refresh = now + + return any_changed def force_refresh(self) -> bool: """Force an immediate refresh, bypassing the interval timer. @@ -407,6 +448,50 @@ class MeshDataStore: self._last_force_refresh = now return self._do_refresh(force=True) + + def _update_packet_metrics(self): + """Lightweight update when only new packets arrived. + + Updates: + - Per-node packet counts + - Top senders + - Deliverability (source overlap) + Does NOT rebuild the full node model. + """ + # Recount packets per node from all sources + packet_counts: dict[int, int] = {} + packets_by_type: dict[int, dict[str, int]] = {} + + for name, source in self._sources.items(): + for pkt in (source.packets if hasattr(source, 'packets') else []): + from_node = pkt.get("from_node") or pkt.get("from_node_id") or pkt.get("from") + if from_node is None: + continue + + # Normalize to int + if isinstance(from_node, str): + stripped = from_node.lstrip("!") + try: + from_node = int(stripped, 16) if not stripped.isdigit() else int(stripped) + except ValueError: + continue + + packet_counts[from_node] = packet_counts.get(from_node, 0) + 1 + + portnum = pkt.get("portnum") or pkt.get("portnum_name") or pkt.get("type") or "UNKNOWN" + if from_node not in packets_by_type: + packets_by_type[from_node] = {} + packets_by_type[from_node][portnum] = packets_by_type[from_node].get(portnum, 0) + 1 + + # Update nodes + for node_num, count in packet_counts.items(): + if node_num in self._nodes: + self._nodes[node_num].packets_sent_24h = count + if node_num in packets_by_type: + self._nodes[node_num].packets_by_type = packets_by_type[node_num] + + logger.debug(f"Packet metrics updated: {sum(packet_counts.values())} packets across {len(packet_counts)} nodes") + def _do_refresh(self, force: bool = False) -> bool: """Perform the actual refresh. @@ -2167,6 +2252,53 @@ class MeshDataStore: result.append(ch_dict) return result + + def get_source_coverage(self) -> dict: + """Get per-source node coverage. + + Returns: + { + "meshview-local": {"node_count": 200, "unique_nodes": 50, "shared_nodes": 150}, + "meshview-freq51": {"node_count": 800, "unique_nodes": 400, "shared_nodes": 400}, + ... + } + """ + coverage = {} + for name in self._sources: + nodes_in_source = [n for n in self._nodes.values() if name in n.sources] + unique = [n for n in nodes_in_source if len(n.sources) == 1] + coverage[name] = { + "node_count": len(nodes_in_source), + "unique_nodes": len(unique), + "shared_nodes": len(nodes_in_source) - len(unique), + } + return coverage + + def get_nodes_by_source(self, source_name: str) -> list: + """Get all nodes visible to a specific source.""" + return [n for n in self._nodes.values() if source_name in n.sources] + + def get_exclusive_nodes(self, source_name: str) -> list: + """Get nodes ONLY visible to this source (not seen by any other).""" + return [n for n in self._nodes.values() if n.sources == [source_name]] + + def get_source_health(self) -> list[dict]: + """Get health status of all sources.""" + health = [] + for name, source in self._sources.items(): + if hasattr(source, 'health_status'): + status = source.health_status + status["name"] = name + health.append(status) + else: + health.append({ + "name": name, + "is_loaded": source.is_loaded, + "last_error": source.last_error, + "cached_nodes": len(source.nodes), + }) + return health + def close(self) -> None: """Close database connection.""" if self._db: diff --git a/meshai/mesh_reporter.py b/meshai/mesh_reporter.py index a6911a9..ffdf899 100644 --- a/meshai/mesh_reporter.py +++ b/meshai/mesh_reporter.py @@ -36,8 +36,9 @@ PORTNUM_DISPLAY = { "ATAK_FORWARDER": "ATAK", } -def _clean_portnum(portnum: str) -> str: +def _clean_portnum(portnum) -> str: """Convert raw portnum to display name.""" + if isinstance(portnum, int): portnum = str(portnum) return PORTNUM_DISPLAY.get(portnum, portnum.replace("_APP", "").replace("_", " ").title()) @@ -175,6 +176,34 @@ class MeshReporter: return f"{local} ({desc})" return local or desc + + def _build_source_health_section(self) -> list[str]: + """Build source health section for Tier 1.""" + lines = [] + lines.append("") + lines.append("DATA SOURCES:") + + for name, source in self.data_store._sources.items(): + if hasattr(source, 'health_status'): + status = source.health_status + err_str = f" - {status['last_error']}" if status.get('last_error') else "" + backed = " [BACKED OFF]" if status.get('backed_off') else "" + polite = " [POLITE]" if status.get('polite_mode') else "" + lines.append( + f" {name}: {status.get('cached_nodes', 0)} nodes, " + f"{status.get('cached_packets', 0)} pkts, " + f"avg {status.get('avg_response_ms', 0)}ms" + f"{polite}{backed}{err_str}" + ) + else: + # Legacy source without health_status + node_count = len(source.nodes) if hasattr(source, 'nodes') else 0 + loaded = "OK" if source.is_loaded else "ERR" + err = f" - {source.last_error}" if source.last_error else "" + lines.append(f" {name}: [{loaded}] {node_count} nodes{err}") + + return lines + def build_tier1_summary(self) -> str: """Build comprehensive mesh health summary with full data for LLM context.""" health = self.health_engine.mesh_health @@ -428,6 +457,9 @@ class MeshReporter: if pb["critical"]: parts.append(f"{pb['critical']} battery critical") lines.append(f"POWER (infra): {', '.join(parts)}") + # Source health section + lines.extend(self._build_source_health_section()) + lines.append("") lines.append(f"TOTAL: {health.total_nodes} nodes across {health.total_regions} regions.") @@ -899,6 +931,10 @@ class MeshReporter: status = "Single gateway - node goes dark if that gateway fails" lines.append(f" Coverage: {node.avg_gateways:.0f}/{total_gw} gateways ({pct:.0f}%) - {status}") + # Source visibility + if node.sources: + lines.append(f" Seen by: {', '.join(node.sources)} ({len(node.sources)} sources)") + # Neighbors section if node.neighbors: lines.append("") diff --git a/meshai/sources/meshmonitor_data.py b/meshai/sources/meshmonitor_data.py index 83fe0fa..b838ca5 100644 --- a/meshai/sources/meshmonitor_data.py +++ b/meshai/sources/meshmonitor_data.py @@ -1,270 +1,515 @@ -"""MeshMonitor API data source.""" - -import json -import logging -import os -import time -from typing import Optional -from urllib.error import HTTPError, URLError -from urllib.request import Request, urlopen - -from .. import __version__ - -logger = logging.getLogger(__name__) - -USER_AGENT = f"MeshAI/{__version__}" - - -class MeshMonitorDataSource: - """Fetches mesh data from a MeshMonitor instance.""" - - def __init__(self, url: str, api_token: str, refresh_interval: int = 300): - """Initialize MeshMonitor data source. - - Args: - url: Base URL of MeshMonitor instance (e.g., http://192.168.1.100:3333) - api_token: API token for authentication. Supports ${ENV_VAR} format. - refresh_interval: Seconds between refresh checks (default 5 minutes) - """ - self._url = url.rstrip("/") - self._api_token = self._resolve_token(api_token) - self._refresh_interval = refresh_interval - - # Cached data - self._nodes: list[dict] = [] - self._channels: list[dict] = [] - self._telemetry: list[dict] = [] - self._traceroutes: list[dict] = [] - self._network_stats: Optional[dict] = None - self._topology: Optional[dict] = None - self._packets: list[dict] = [] - self._solar: list[dict] = [] - - self._last_refresh: float = 0.0 - self._last_error: Optional[str] = None - self._is_loaded: bool = False - - def _resolve_token(self, token: str) -> str: - """Resolve token, supporting ${ENV_VAR} format. - - Args: - token: API token or env var reference - - Returns: - Resolved token value - """ - if token.startswith("${") and token.endswith("}"): - env_var = token[2:-1] - return os.environ.get(env_var, "") - return token - - @property - def nodes(self) -> list[dict]: - """Get cached nodes list.""" - return self._nodes - - @property - def channels(self) -> list[dict]: - """Get cached channels list.""" - return self._channels - - @property - def telemetry(self) -> list[dict]: - """Get cached telemetry list.""" - return self._telemetry - - @property - def traceroutes(self) -> list[dict]: - """Get cached traceroutes list.""" - return self._traceroutes - - @property - def network_stats(self) -> Optional[dict]: - """Get cached network stats.""" - return self._network_stats - - @property - def topology(self) -> Optional[dict]: - """Get cached topology.""" - return self._topology - - @property - def packets(self) -> list[dict]: - """Get cached packets list.""" - return self._packets - - @property - def solar(self) -> list[dict]: - """Get cached solar estimates list.""" - return self._solar - - @property - def last_refresh(self) -> float: - """Get last refresh timestamp (epoch).""" - return self._last_refresh - - @property - def last_error(self) -> Optional[str]: - """Get last error message if any.""" - return self._last_error - - @property - def is_loaded(self) -> bool: - """Check if data has been successfully loaded.""" - return self._is_loaded - - def _fetch_json(self, endpoint: str) -> Optional[dict | list]: - """Fetch JSON from an endpoint with Bearer auth. - - Args: - endpoint: API endpoint path (e.g., /api/v1/nodes) - - Returns: - Parsed JSON data or None on error - """ - url = f"{self._url}{endpoint}" - headers = { - "Accept": "application/json", - "Authorization": f"Bearer {self._api_token}", - "User-Agent": USER_AGENT, - } - try: - req = Request(url, headers=headers) - with urlopen(req, timeout=15) as resp: - data = json.loads(resp.read().decode("utf-8")) - - # MeshMonitor wraps responses in {"success": true, "data": [...]} - # Extract the actual data if wrapped - if isinstance(data, dict) and "data" in data: - return data["data"] - return data - - except HTTPError as e: - logger.warning(f"MeshMonitor {endpoint}: HTTP {e.code} {e.reason}") - return None - except URLError as e: - logger.warning(f"MeshMonitor {endpoint}: Connection error - {e.reason}") - return None - except json.JSONDecodeError as e: - logger.warning(f"MeshMonitor {endpoint}: Invalid JSON - {e}") - return None - except Exception as e: - logger.warning(f"MeshMonitor {endpoint}: {e}") - return None - - def fetch_all(self) -> bool: - """Fetch all data from MeshMonitor API. - - Fetches all endpoints independently. One failure doesn't block others. - - Returns: - True if at least one endpoint succeeded - """ - success_count = 0 - errors = [] - - # Fetch nodes - data = self._fetch_json("/api/v1/nodes") - if data is not None: - self._nodes = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._nodes)} nodes") - else: - errors.append("nodes") - - # Fetch channels - data = self._fetch_json("/api/v1/channels") - if data is not None: - self._channels = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._channels)} channels") - else: - errors.append("channels") - - # Fetch telemetry - BUG 6 FIX: Request more records for 24h coverage - data = self._fetch_json("/api/v1/telemetry?limit=5000") - if data is None: - # Fallback without limit param - data = self._fetch_json("/api/v1/telemetry") - if data is not None: - self._telemetry = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._telemetry)} telemetry records") - else: - errors.append("telemetry") - - # Fetch traceroutes - BUG 6 FIX: Request more records - data = self._fetch_json("/api/v1/traceroutes?limit=1000") - if data is None: - data = self._fetch_json("/api/v1/traceroutes") - if data is not None: - self._traceroutes = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._traceroutes)} traceroutes") - else: - errors.append("traceroutes") - - # Fetch network stats - data = self._fetch_json("/api/v1/network") - if data is not None: - self._network_stats = data if isinstance(data, dict) else None - success_count += 1 - logger.debug("MeshMonitor: fetched network stats") - else: - errors.append("network") - - # Fetch topology - data = self._fetch_json("/api/v1/network/topology") - if data is not None: - self._topology = data if isinstance(data, dict) else None - success_count += 1 - logger.debug("MeshMonitor: fetched topology") - else: - errors.append("topology") - - # Fetch packets - BUG 6 FIX: Request more packets for 24h coverage - data = self._fetch_json("/api/v1/packets?limit=5000") - if data is None: - # Fallback without limit param - data = self._fetch_json("/api/v1/packets") - if data is not None: - self._packets = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._packets)} packets") - else: - errors.append("packets") - - # Fetch solar estimates - data = self._fetch_json("/api/v1/solar") - if data is not None: - self._solar = data if isinstance(data, list) else [] - success_count += 1 - logger.debug(f"MeshMonitor: fetched {len(self._solar)} solar estimates") - else: - errors.append("solar") - - # Update state - self._last_refresh = time.time() - - if success_count > 0: - self._is_loaded = True - self._last_error = None - logger.info( - f"MeshMonitor refresh: {len(self._nodes)} nodes, " - f"{len(self._telemetry)} telemetry, {len(self._traceroutes)} traceroutes" - ) - return True - else: - self._last_error = f"All endpoints failed: {', '.join(errors)}" - logger.error(f"MeshMonitor: {self._last_error}") - return False - - def maybe_refresh(self) -> bool: - """Refresh data if interval has elapsed. - - Returns: - True if refresh was performed - """ - if time.time() - self._last_refresh >= self._refresh_interval: - return self.fetch_all() - return False +"""MeshMonitor API data source with tick-based staggered polling.""" + +import json +import logging +import os +import time +from typing import Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from .. import __version__ + +logger = logging.getLogger(__name__) + +USER_AGENT = f"MeshAI/{__version__}" + + +class MeshMonitorDataSource: + """Fetches mesh data from a MeshMonitor instance with staggered polling.""" + + # Endpoint schedule: (endpoint, interval_ticks) + # At 30s per tick: 2 ticks = 60s, 4 ticks = 2min, 10 ticks = 5min + ENDPOINT_SCHEDULE = [ + ("packets", 2), # Every 2 ticks (60s) + ("nodes", 4), # Every 4 ticks (2 min) + ("telemetry", 4), # Every 4 ticks (2 min) + ("traceroutes", 10), # Every 10 ticks (5 min) + ("channels", 10), # Every 10 ticks (5 min) + ("network", 10), # Every 10 ticks (5 min) + ("topology", 10), # Every 10 ticks (5 min) + ("solar", 20), # Every 20 ticks (10 min) + ] + + def __init__(self, url: str, api_token: str, refresh_interval: int = 30, polite_mode: bool = False): + """Initialize MeshMonitor data source. + + Args: + url: Base URL of MeshMonitor instance (e.g., http://192.168.1.100:3333) + api_token: API token for authentication. Supports ${ENV_VAR} format. + refresh_interval: Seconds between ticks (default 30) + polite_mode: If True, use longer intervals + """ + self._url = url.rstrip("/") + self._api_token = self._resolve_token(api_token) + self._tick_interval = refresh_interval + self._polite_mode = polite_mode + + # Cached data + self._nodes: list[dict] = [] + self._channels: list[dict] = [] + self._telemetry: list[dict] = [] + self._traceroutes: list[dict] = [] + self._network_stats: Optional[dict] = None + self._topology: Optional[dict] = None + self._packets: list[dict] = [] + self._solar: list[dict] = [] + + # Tick state + self._tick_count: int = 0 + self._last_tick: float = 0.0 + self._last_packet_timestamp: float = 0.0 + self._last_telemetry_timestamp: float = 0.0 + + # Rate limit / health tracking + self._backoff_until: float = 0.0 + self._avg_response_ms: float = 0.0 + self._consecutive_errors: int = 0 + self._max_consecutive_errors: int = 5 + + # Status + self._last_error: Optional[str] = None + self._is_loaded: bool = False + self._data_changed: bool = False + + def _resolve_token(self, token: str) -> str: + """Resolve token, supporting ${ENV_VAR} format.""" + if token.startswith("${") and token.endswith("}"): + env_var = token[2:-1] + return os.environ.get(env_var, "") + return token + + @property + def nodes(self) -> list[dict]: + """Get cached nodes list.""" + return self._nodes + + @property + def channels(self) -> list[dict]: + """Get cached channels list.""" + return self._channels + + @property + def telemetry(self) -> list[dict]: + """Get cached telemetry list.""" + return self._telemetry + + @property + def traceroutes(self) -> list[dict]: + """Get cached traceroutes list.""" + return self._traceroutes + + @property + def network_stats(self) -> Optional[dict]: + """Get cached network stats.""" + return self._network_stats + + @property + def topology(self) -> Optional[dict]: + """Get cached topology.""" + return self._topology + + @property + def packets(self) -> list[dict]: + """Get cached packets list.""" + return self._packets + + @property + def solar(self) -> list[dict]: + """Get cached solar estimates list.""" + return self._solar + + @property + def last_refresh(self) -> float: + """Get last tick timestamp (epoch).""" + return self._last_tick + + @property + def last_error(self) -> Optional[str]: + """Get last error message if any.""" + return self._last_error + + @property + def is_loaded(self) -> bool: + """Check if data has been successfully loaded.""" + return self._is_loaded + + @property + def data_changed(self) -> bool: + """Check if data has changed since last check, then reset flag.""" + changed = self._data_changed + self._data_changed = False + return changed + + @property + def health_status(self) -> dict: + """Get source health status for monitoring.""" + return { + "url": self._url, + "is_loaded": self._is_loaded, + "last_error": self._last_error, + "avg_response_ms": round(self._avg_response_ms), + "consecutive_errors": self._consecutive_errors, + "backed_off": time.time() < self._backoff_until, + "tick_count": self._tick_count, + "cached_packets": len(self._packets), + "cached_nodes": len(self._nodes), + "cached_telemetry": len(self._telemetry), + "polite_mode": self._polite_mode, + } + + def tick(self) -> Optional[str]: + """Execute one polling tick. Called every 30 seconds. + + Returns: + Name of endpoint that was fetched, or None if skipped/rate-limited + """ + now = time.time() + + # Rate limit backoff + if now < self._backoff_until: + return None + + # Consecutive error backoff (exponential) + if self._consecutive_errors >= self._max_consecutive_errors: + backoff = min(300, 30 * (2 ** (self._consecutive_errors - self._max_consecutive_errors))) + if now - self._last_tick < backoff: + return None + + self._tick_count += 1 + self._last_tick = now + + # Determine which endpoint to call this tick + endpoint = self._select_endpoint() + if not endpoint: + return None + + # Execute the call + success = False + if endpoint == "packets": + success = self._fetch_packets_incremental() + elif endpoint == "nodes": + success = self._fetch_nodes() + elif endpoint == "telemetry": + success = self._fetch_telemetry_incremental() + elif endpoint == "traceroutes": + success = self._fetch_traceroutes() + elif endpoint == "channels": + success = self._fetch_channels() + elif endpoint == "network": + success = self._fetch_network() + elif endpoint == "topology": + success = self._fetch_topology() + elif endpoint == "solar": + success = self._fetch_solar() + + if success: + self._consecutive_errors = 0 + self._is_loaded = True + self._last_error = None + else: + self._consecutive_errors += 1 + + return endpoint if success else None + + def _select_endpoint(self) -> Optional[str]: + """Select which endpoint to call on this tick based on schedule.""" + schedule = self.ENDPOINT_SCHEDULE + + # In polite mode, double the intervals + if self._polite_mode: + schedule = [(ep, interval * 2) for ep, interval in schedule] + + # Find the highest-priority endpoint that's due + for endpoint, interval_ticks in schedule: + if self._tick_count % interval_ticks == 0: + return endpoint + + return None + + def _fetch_with_tracking(self, endpoint: str) -> Optional[dict | list]: + """Fetch JSON with Bearer auth, response tracking, and rate limit detection.""" + url = f"{self._url}{endpoint}" + headers = { + "Accept": "application/json", + "Authorization": f"Bearer {self._api_token}", + "User-Agent": USER_AGENT, + } + + start = time.time() + try: + req = Request(url, headers=headers) + with urlopen(req, timeout=15) as resp: + elapsed_ms = (time.time() - start) * 1000 + + # Track response time (rolling average) + self._avg_response_ms = (self._avg_response_ms * 0.8) + (elapsed_ms * 0.2) + + # Warn if slowing down + if elapsed_ms > 5000: + logger.warning( + f"MeshMonitor {self._url} slow: {elapsed_ms:.0f}ms on {endpoint}" + ) + + data = json.loads(resp.read().decode("utf-8")) + + # MeshMonitor wraps responses in {"success": true, "data": [...]} + if isinstance(data, dict) and "data" in data: + return data["data"] + return data + + except HTTPError as e: + if e.code == 429: + retry_after = int(e.headers.get('Retry-After', 60)) + self._backoff_until = time.time() + retry_after + logger.warning( + f"Rate limited by MeshMonitor. Backing off {retry_after}s" + ) + self._last_error = f"Rate limited (429), backing off {retry_after}s" + elif e.code == 401: + # Token may be expired + self._backoff_until = time.time() + 300 # Back off 5 min + logger.error("MeshMonitor: API token may be expired or invalid (401)") + self._last_error = "API token expired or invalid (401)" + elif e.code == 503: + self._backoff_until = time.time() + 60 + logger.warning(f"MeshMonitor {endpoint}: Service unavailable (503)") + self._last_error = "Service unavailable (503)" + else: + logger.warning(f"MeshMonitor {endpoint}: HTTP {e.code} {e.reason}") + self._last_error = f"HTTP {e.code} on {endpoint}" + return None + except URLError as e: + logger.warning(f"MeshMonitor {endpoint}: Connection error - {e.reason}") + self._last_error = f"Connection error: {e.reason}" + return None + except json.JSONDecodeError as e: + logger.warning(f"MeshMonitor {endpoint}: Invalid JSON - {e}") + self._last_error = f"Invalid JSON from {endpoint}" + return None + except Exception as e: + logger.warning(f"MeshMonitor {endpoint}: {e}") + self._last_error = str(e) + return None + + def _fetch_nodes(self) -> bool: + """Full node refresh.""" + data = self._fetch_with_tracking("/api/v1/nodes") + if data is not None: + self._nodes = data if isinstance(data, list) else [] + self._data_changed = True + logger.debug(f"MeshMonitor nodes: {len(self._nodes)}") + return True + return False + + def _fetch_channels(self) -> bool: + """Channels refresh.""" + data = self._fetch_with_tracking("/api/v1/channels") + if data is not None: + self._channels = data if isinstance(data, list) else [] + self._data_changed = True + logger.debug(f"MeshMonitor channels: {len(self._channels)}") + return True + return False + + def _fetch_telemetry_incremental(self) -> bool: + """Incremental telemetry fetch.""" + if self._last_telemetry_timestamp > 0: + data = self._fetch_with_tracking( + f"/api/v1/telemetry?since={int(self._last_telemetry_timestamp)}&limit=500" + ) + if data is None: + data = self._fetch_with_tracking("/api/v1/telemetry?limit=500") + else: + data = self._fetch_with_tracking("/api/v1/telemetry?limit=5000") + + if data is None: + return False + + new_telemetry = data if isinstance(data, list) else [] + if new_telemetry: + existing_keys = set() + for t in self._telemetry: + key = (t.get("nodeNum"), t.get("telemetryType"), t.get("timestamp")) + existing_keys.add(key) + + added = 0 + for t in new_telemetry: + key = (t.get("nodeNum"), t.get("telemetryType"), t.get("timestamp")) + if key not in existing_keys: + self._telemetry.append(t) + existing_keys.add(key) + added += 1 + ts = t.get("timestamp", 0) + if isinstance(ts, (int, float)) and ts > self._last_telemetry_timestamp: + self._last_telemetry_timestamp = ts + + if added > 0: + self._data_changed = True + logger.debug(f"MeshMonitor telemetry: +{added} new ({len(self._telemetry)} total)") + + # Cap to prevent unbounded growth + if len(self._telemetry) > 10000: + self._telemetry = self._telemetry[-10000:] + + return True + + def _fetch_traceroutes(self) -> bool: + """Traceroutes refresh.""" + data = self._fetch_with_tracking("/api/v1/traceroutes?limit=1000") + if data is None: + data = self._fetch_with_tracking("/api/v1/traceroutes") + if data is not None: + self._traceroutes = data if isinstance(data, list) else [] + self._data_changed = True + logger.debug(f"MeshMonitor traceroutes: {len(self._traceroutes)}") + return True + return False + + def _fetch_network(self) -> bool: + """Network stats refresh.""" + data = self._fetch_with_tracking("/api/v1/network") + if data is not None: + self._network_stats = data if isinstance(data, dict) else None + self._data_changed = True + logger.debug("MeshMonitor: fetched network stats") + return True + return False + + def _fetch_topology(self) -> bool: + """Topology refresh.""" + data = self._fetch_with_tracking("/api/v1/network/topology") + if data is not None: + self._topology = data if isinstance(data, dict) else None + self._data_changed = True + logger.debug("MeshMonitor: fetched topology") + return True + return False + + def _fetch_packets_incremental(self) -> bool: + """Incremental packet fetch.""" + if self._last_packet_timestamp > 0: + data = self._fetch_with_tracking( + f"/api/v1/packets?since={int(self._last_packet_timestamp)}&limit=500" + ) + if data is None: + data = self._fetch_with_tracking("/api/v1/packets?limit=500") + else: + data = self._fetch_with_tracking("/api/v1/packets?limit=5000") + + if data is None: + return False + + new_packets = data if isinstance(data, list) else [] + if new_packets: + existing_ids = {p.get("packet_id") or p.get("id") for p in self._packets} + added = 0 + for pkt in new_packets: + pkt_id = pkt.get("packet_id") or pkt.get("id") + if pkt_id and pkt_id not in existing_ids: + self._packets.append(pkt) + existing_ids.add(pkt_id) + added += 1 + + pkt_time = pkt.get("timestamp") or pkt.get("createdAt") or 0 + if isinstance(pkt_time, (int, float)): + if pkt_time > 1e12: + pkt_time = pkt_time / 1000 + if pkt_time > self._last_packet_timestamp: + self._last_packet_timestamp = pkt_time + + if added > 0: + self._data_changed = True + logger.debug(f"MeshMonitor packets: +{added} new ({len(self._packets)} total)") + + # Cap to prevent unbounded growth + if len(self._packets) > 5000: + self._packets = self._packets[-5000:] + + return True + + def _fetch_solar(self) -> bool: + """Solar estimates refresh.""" + data = self._fetch_with_tracking("/api/v1/solar") + if data is not None: + self._solar = data if isinstance(data, list) else [] + self._data_changed = True + logger.debug(f"MeshMonitor solar: {len(self._solar)}") + return True + return False + + def fetch_all(self) -> bool: + """Fetch all data at once. Used for initial load and force refresh.""" + success_count = 0 + errors = [] + + # Fetch nodes + if self._fetch_nodes(): + success_count += 1 + else: + errors.append("nodes") + + # Fetch channels + if self._fetch_channels(): + success_count += 1 + else: + errors.append("channels") + + # Fetch telemetry + if self._fetch_telemetry_incremental(): + success_count += 1 + else: + errors.append("telemetry") + + # Fetch traceroutes + if self._fetch_traceroutes(): + success_count += 1 + else: + errors.append("traceroutes") + + # Fetch network stats + if self._fetch_network(): + success_count += 1 + else: + errors.append("network") + + # Fetch topology + if self._fetch_topology(): + success_count += 1 + else: + errors.append("topology") + + # Fetch packets + if self._fetch_packets_incremental(): + success_count += 1 + else: + errors.append("packets") + + # Fetch solar + if self._fetch_solar(): + success_count += 1 + else: + errors.append("solar") + + self._last_tick = time.time() + + if success_count > 0: + self._is_loaded = True + self._last_error = None + logger.info( + f"MeshMonitor refresh: {len(self._nodes)} nodes, " + f"{len(self._telemetry)} telemetry, {len(self._traceroutes)} traceroutes" + ) + return True + else: + self._last_error = f"All endpoints failed: {', '.join(errors)}" + logger.error(f"MeshMonitor: {self._last_error}") + return False + + def maybe_refresh(self) -> bool: + """Backward compatible refresh check. Now delegates to tick().""" + now = time.time() + if now - self._last_tick >= self._tick_interval: + result = self.tick() + return result is not None + return False diff --git a/meshai/sources/meshview.py b/meshai/sources/meshview.py index 559b3c4..4ac23c9 100644 --- a/meshai/sources/meshview.py +++ b/meshai/sources/meshview.py @@ -1,193 +1,442 @@ -"""Meshview API data source.""" - -import json -import logging -import time -from typing import Optional -from urllib.error import HTTPError, URLError -from urllib.request import Request, urlopen - -from .. import __version__ - -logger = logging.getLogger(__name__) - -USER_AGENT = f"MeshAI/{__version__}" - - -class MeshviewSource: - """Fetches mesh data from a Meshview instance.""" - - def __init__(self, url: str, refresh_interval: int = 300): - """Initialize Meshview source. - - Args: - url: Base URL of Meshview instance (e.g., https://meshview.example.com) - refresh_interval: Seconds between refresh checks (default 5 minutes) - """ - self._url = url.rstrip("/") - self._refresh_interval = refresh_interval - self._nodes: list[dict] = [] - self._edges: list[dict] = [] - self._stats: Optional[dict | list] = None - self._counts: Optional[dict] = None - self._last_refresh: float = 0.0 - self._last_error: Optional[str] = None - self._is_loaded: bool = False - - @property - def nodes(self) -> list[dict]: - """Get cached nodes list.""" - return self._nodes - - @property - def edges(self) -> list[dict]: - """Get cached edges list.""" - return self._edges - - @property - def stats(self) -> Optional[dict | list]: - """Get cached stats.""" - return self._stats - - @property - def counts(self) -> Optional[dict]: - """Get cached counts.""" - return self._counts - - @property - def last_refresh(self) -> float: - """Get last refresh timestamp (epoch).""" - return self._last_refresh - - @property - def last_error(self) -> Optional[str]: - """Get last error message if any.""" - return self._last_error - - @property - def is_loaded(self) -> bool: - """Check if data has been successfully loaded.""" - return self._is_loaded - - def _fetch_json(self, endpoint: str) -> Optional[dict | list]: - """Fetch JSON from an endpoint. - - Args: - endpoint: API endpoint path (e.g., /api/nodes) - - Returns: - Parsed JSON data or None on error - """ - url = f"{self._url}{endpoint}" - headers = { - "Accept": "application/json", - "User-Agent": USER_AGENT, - } - try: - req = Request(url, headers=headers) - with urlopen(req, timeout=15) as resp: - return json.loads(resp.read().decode("utf-8")) - except HTTPError as e: - logger.warning(f"Meshview {endpoint}: HTTP {e.code} {e.reason}") - return None - except URLError as e: - logger.warning(f"Meshview {endpoint}: Connection error - {e.reason}") - return None - except json.JSONDecodeError as e: - logger.warning(f"Meshview {endpoint}: Invalid JSON - {e}") - return None - except Exception as e: - logger.warning(f"Meshview {endpoint}: {e}") - return None - - def _extract_list(self, data: dict | list | None, key: str) -> list[dict]: - """Extract a list from API response, handling wrapper dicts. - - Args: - data: Raw API response (may be list or {"key": [...]}) - key: Expected key if response is wrapped - - Returns: - Extracted list or empty list - """ - if data is None: - return [] - if isinstance(data, list): - return data - if isinstance(data, dict) and key in data: - inner = data[key] - return inner if isinstance(inner, list) else [] - return [] - - def fetch_all(self) -> bool: - """Fetch all data from Meshview API. - - Fetches nodes, edges, stats, and counts independently. - One failure doesn't block others. - - Returns: - True if at least one endpoint succeeded - """ - success_count = 0 - errors = [] - - # Fetch nodes - response is {"nodes": [...]} - data = self._fetch_json("/api/nodes") - if data is not None: - self._nodes = self._extract_list(data, "nodes") - success_count += 1 - logger.debug(f"Meshview: fetched {len(self._nodes)} nodes") - else: - errors.append("nodes") - - # Fetch edges - response is {"edges": [...]} - data = self._fetch_json("/api/edges") - if data is not None: - self._edges = self._extract_list(data, "edges") - success_count += 1 - logger.debug(f"Meshview: fetched {len(self._edges)} edges") - else: - errors.append("edges") - - # Fetch stats (24h hourly) - data = self._fetch_json("/api/stats?period_type=hour&length=24") - if data is not None: - self._stats = data - success_count += 1 - logger.debug("Meshview: fetched stats") - else: - errors.append("stats") - - # Fetch counts - data = self._fetch_json("/api/stats/count") - if data is not None: - self._counts = data if isinstance(data, dict) else None - success_count += 1 - logger.debug("Meshview: fetched counts") - else: - errors.append("counts") - - # Update state - self._last_refresh = time.time() - - if success_count > 0: - self._is_loaded = True - self._last_error = None - logger.info( - f"Meshview refresh: {len(self._nodes)} nodes, {len(self._edges)} edges" - ) - return True - else: - self._last_error = f"All endpoints failed: {', '.join(errors)}" - logger.error(f"Meshview: {self._last_error}") - return False - - def maybe_refresh(self) -> bool: - """Refresh data if interval has elapsed. - - Returns: - True if refresh was performed - """ - if time.time() - self._last_refresh >= self._refresh_interval: - return self.fetch_all() - return False +"""Meshview API data source with tick-based staggered polling.""" + +import json +import logging +import time +from typing import Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from .. import __version__ + +logger = logging.getLogger(__name__) + +USER_AGENT = f"MeshAI/{__version__}" + + +class MeshviewSource: + """Fetches mesh data from a Meshview instance with staggered polling.""" + + # Endpoint schedule: (endpoint, interval_ticks) + # At 30s per tick: 1 tick = 30s, 4 ticks = 2min, 6 ticks = 3min, 10 ticks = 5min + ENDPOINT_SCHEDULE = [ + ("packets", 1), # Every tick (30s) — near real-time + ("nodes", 4), # Every 4 ticks (2 min) + ("stats", 6), # Every 6 ticks (3 min) + ("edges", 6), # Every 6 ticks (3 min) + ("counts", 8), # Every 8 ticks (4 min) + ("traceroutes", 10), # Every 10 ticks (5 min) + ] + + def __init__(self, url: str, refresh_interval: int = 30, polite_mode: bool = False): + """Initialize Meshview source. + + Args: + url: Base URL of Meshview instance (e.g., https://meshview.example.com) + refresh_interval: Seconds between ticks (default 30) + polite_mode: If True, skip frequent packet polling for shared instances + """ + self._url = url.rstrip("/") + self._tick_interval = refresh_interval + self._polite_mode = polite_mode + + # Cached data + self._nodes: list[dict] = [] + self._edges: list[dict] = [] + self._stats: Optional[dict | list] = None + self._counts: Optional[dict] = None + self._packets: list[dict] = [] + self._traceroutes: list[dict] = [] + + # Tick state + self._tick_count: int = 0 + self._last_tick: float = 0.0 + self._last_packet_timestamp: float = 0.0 # For incremental packet fetch + + # Rate limit / health tracking + self._backoff_until: float = 0.0 + self._avg_response_ms: float = 0.0 + self._consecutive_errors: int = 0 + self._max_consecutive_errors: int = 5 + + # Status + self._last_error: Optional[str] = None + self._is_loaded: bool = False + self._data_changed: bool = False + + # Capabilities (discovered on first fetch) + self._capabilities: dict = { + "packets": True, + "packets_since": False, + "traceroutes": True, + } + self._capabilities_probed: bool = False + + @property + def nodes(self) -> list[dict]: + """Get cached nodes list.""" + return self._nodes + + @property + def edges(self) -> list[dict]: + """Get cached edges list.""" + return self._edges + + @property + def stats(self) -> Optional[dict | list]: + """Get cached stats.""" + return self._stats + + @property + def counts(self) -> Optional[dict]: + """Get cached counts.""" + return self._counts + + @property + def packets(self) -> list[dict]: + """Get cached packets.""" + return self._packets + + @property + def traceroutes(self) -> list[dict]: + """Get cached traceroutes.""" + return self._traceroutes + + @property + def last_refresh(self) -> float: + """Get last tick timestamp (epoch).""" + return self._last_tick + + @property + def last_error(self) -> Optional[str]: + """Get last error message if any.""" + return self._last_error + + @property + def is_loaded(self) -> bool: + """Check if data has been successfully loaded.""" + return self._is_loaded + + @property + def data_changed(self) -> bool: + """Check if data has changed since last check, then reset flag.""" + changed = self._data_changed + self._data_changed = False + return changed + + @property + def health_status(self) -> dict: + """Get source health status for monitoring.""" + return { + "url": self._url, + "is_loaded": self._is_loaded, + "last_error": self._last_error, + "avg_response_ms": round(self._avg_response_ms), + "consecutive_errors": self._consecutive_errors, + "backed_off": time.time() < self._backoff_until, + "tick_count": self._tick_count, + "cached_packets": len(self._packets), + "cached_nodes": len(self._nodes), + "polite_mode": self._polite_mode, + } + + def tick(self) -> Optional[str]: + """Execute one polling tick. Called every 30 seconds. + + Returns: + Name of endpoint that was fetched, or None if skipped/rate-limited + """ + now = time.time() + + # Rate limit backoff + if now < self._backoff_until: + return None + + # Consecutive error backoff (exponential) + if self._consecutive_errors >= self._max_consecutive_errors: + backoff = min(300, 30 * (2 ** (self._consecutive_errors - self._max_consecutive_errors))) + if now - self._last_tick < backoff: + return None + + self._tick_count += 1 + self._last_tick = now + + # Determine which endpoint to call this tick + endpoint = self._select_endpoint() + if not endpoint: + return None + + # Execute the call + success = False + if endpoint == "packets": + success = self._fetch_packets_incremental() + elif endpoint == "nodes": + success = self._fetch_nodes() + elif endpoint == "edges": + success = self._fetch_edges() + elif endpoint == "stats": + success = self._fetch_stats() + elif endpoint == "counts": + success = self._fetch_counts() + elif endpoint == "traceroutes": + success = self._fetch_traceroutes() + + if success: + self._consecutive_errors = 0 + self._is_loaded = True + self._last_error = None + else: + self._consecutive_errors += 1 + + return endpoint if success else None + + def _select_endpoint(self) -> Optional[str]: + """Select which endpoint to call on this tick based on schedule.""" + # In polite mode, skip frequent packet polling + schedule = self.ENDPOINT_SCHEDULE + if self._polite_mode: + schedule = [(ep, interval) for ep, interval in schedule if ep != "packets"] + + # Find the highest-priority endpoint that's due + for endpoint, interval_ticks in schedule: + if self._tick_count % interval_ticks == 0: + # Skip if capability check failed + if endpoint in ("packets", "traceroutes") and not self._capabilities.get(endpoint, True): + continue + return endpoint + + # If nothing is scheduled this tick, check packets (always welcome) + if not self._polite_mode and self._capabilities.get("packets", True): + return "packets" + + return None + + def _fetch_with_tracking(self, endpoint: str) -> Optional[dict | list]: + """Fetch JSON with response time tracking and rate limit detection.""" + url = f"{self._url}{endpoint}" + headers = { + "Accept": "application/json", + "User-Agent": USER_AGENT, + } + + start = time.time() + try: + req = Request(url, headers=headers) + with urlopen(req, timeout=15) as resp: + elapsed_ms = (time.time() - start) * 1000 + + # Track response time (rolling average) + self._avg_response_ms = (self._avg_response_ms * 0.8) + (elapsed_ms * 0.2) + + # Warn if slowing down + if elapsed_ms > 5000: + logger.warning( + f"Meshview {self._url} slow: {elapsed_ms:.0f}ms on {endpoint}" + ) + + data = json.loads(resp.read().decode("utf-8")) + return data + + except HTTPError as e: + if e.code == 429: + retry_after = int(e.headers.get('Retry-After', 60)) + self._backoff_until = time.time() + retry_after + logger.warning( + f"Rate limited by {self._url}. Backing off {retry_after}s" + ) + self._last_error = f"Rate limited (429), backing off {retry_after}s" + elif e.code == 503: + self._backoff_until = time.time() + 60 + logger.warning(f"Meshview {endpoint}: Service unavailable (503)") + self._last_error = "Service unavailable (503)" + elif e.code == 404: + # Endpoint doesn't exist — disable it + if "packets" in endpoint: + self._capabilities["packets"] = False + elif "traceroutes" in endpoint: + self._capabilities["traceroutes"] = False + logger.info(f"Meshview {endpoint}: Not available (404)") + self._last_error = f"Endpoint not available: {endpoint}" + else: + logger.warning(f"Meshview {endpoint}: HTTP {e.code}") + self._last_error = f"HTTP {e.code} on {endpoint}" + return None + except URLError as e: + logger.warning(f"Meshview {endpoint}: {e.reason}") + self._last_error = f"Connection error: {e.reason}" + return None + except Exception as e: + logger.warning(f"Meshview {endpoint}: {e}") + self._last_error = str(e) + return None + + def _extract_list(self, data: dict | list | None, key: str) -> list[dict]: + """Extract a list from API response, handling wrapper dicts.""" + if data is None: + return [] + if isinstance(data, list): + return data + if isinstance(data, dict) and key in data: + inner = data[key] + return inner if isinstance(inner, list) else [] + return [] + + def _fetch_packets_incremental(self) -> bool: + """Fetch only NEW packets since last call. + + Uses the ?since= parameter if the Meshview API supports it. + Falls back to fetching recent packets and deduping locally. + """ + # Try incremental first + if self._last_packet_timestamp > 0 and self._capabilities.get("packets_since", False): + since_us = int(self._last_packet_timestamp * 1_000_000) + data = self._fetch_with_tracking(f"/api/packets?since={since_us}") + + if data is None: + # Fall back to getting recent packets without since parameter + data = self._fetch_with_tracking("/api/packets?limit=100") + elif self._last_packet_timestamp > 0: + # No since support, get recent packets + data = self._fetch_with_tracking("/api/packets?limit=100") + else: + # First fetch — get recent packets and probe since capability + data = self._fetch_with_tracking("/api/packets?limit=200") + if data is not None: + # Try probing since capability + since_probe = self._fetch_with_tracking(f"/api/packets?since={int(time.time() * 1_000_000 - 60_000_000)}&limit=1") + if since_probe is not None: + self._capabilities["packets_since"] = True + + if data is None: + return False + + new_packets = self._extract_list(data, "packets") + + if new_packets: + # Dedup against existing packets + existing_ids = {p.get("packet_id") or p.get("id") for p in self._packets} + added = 0 + for pkt in new_packets: + pkt_id = pkt.get("packet_id") or pkt.get("id") + if pkt_id and pkt_id not in existing_ids: + self._packets.append(pkt) + existing_ids.add(pkt_id) + added += 1 + + # Track latest timestamp for next incremental + pkt_time = pkt.get("timestamp") or pkt.get("import_time") or 0 + if isinstance(pkt_time, (int, float)): + # Handle microsecond timestamps + if pkt_time > 1e15: + pkt_time = pkt_time / 1_000_000 + if pkt_time > self._last_packet_timestamp: + self._last_packet_timestamp = pkt_time + + if added > 0: + self._data_changed = True + logger.debug(f"Meshview packets: +{added} new ({len(self._packets)} total)") + + # Cap cached packets to prevent unbounded growth (keep last 2000) + if len(self._packets) > 2000: + self._packets = self._packets[-2000:] + + return True + + def _fetch_nodes(self) -> bool: + """Full node refresh (replaces cached nodes).""" + data = self._fetch_with_tracking("/api/nodes") + if data is not None: + self._nodes = self._extract_list(data, "nodes") + self._data_changed = True + logger.debug(f"Meshview nodes: {len(self._nodes)}") + return True + return False + + def _fetch_edges(self) -> bool: + """Full edge refresh.""" + data = self._fetch_with_tracking("/api/edges") + if data is not None: + self._edges = self._extract_list(data, "edges") + self._data_changed = True + logger.debug(f"Meshview edges: {len(self._edges)}") + return True + return False + + def _fetch_stats(self) -> bool: + """Stats refresh.""" + data = self._fetch_with_tracking("/api/stats?period_type=hour&length=24") + if data is not None: + self._stats = data + self._data_changed = True + return True + return False + + def _fetch_counts(self) -> bool: + """Counts refresh.""" + data = self._fetch_with_tracking("/api/stats/count") + if data is not None: + self._counts = data if isinstance(data, dict) else None + self._data_changed = True + return True + return False + + def _fetch_traceroutes(self) -> bool: + """Traceroute refresh.""" + data = self._fetch_with_tracking("/api/traceroutes") + if data is not None: + self._traceroutes = self._extract_list(data, "traceroutes") + self._data_changed = True + return True + return False + + def fetch_all(self) -> bool: + """Fetch all data at once. Used for initial load and force refresh.""" + success = 0 + + # Nodes first + if self._fetch_nodes(): + success += 1 + + # Edges + if self._fetch_edges(): + success += 1 + + # Stats + if self._fetch_stats(): + success += 1 + + # Counts + if self._fetch_counts(): + success += 1 + + # Packets (if supported) + if self._capabilities.get("packets", True): + if self._fetch_packets_incremental(): + success += 1 + + # Traceroutes (if supported) + if self._capabilities.get("traceroutes", True): + if self._fetch_traceroutes(): + success += 1 + + self._last_tick = time.time() + + if success > 0: + self._is_loaded = True + self._last_error = None + logger.info( + f"Meshview refresh: {len(self._nodes)} nodes, {len(self._edges)} edges, {len(self._packets)} packets" + ) + return True + else: + self._last_error = "All endpoints failed" + logger.error(f"Meshview: {self._last_error}") + return False + + def maybe_refresh(self) -> bool: + """Backward compatible refresh check. Now delegates to tick().""" + now = time.time() + if now - self._last_tick >= self._tick_interval: + result = self.tick() + return result is not None + return False