diff --git a/meshai/connector.py b/meshai/connector.py index e66ed6e..77caa24 100644 --- a/meshai/connector.py +++ b/meshai/connector.py @@ -2,6 +2,7 @@ import asyncio import logging +import threading from dataclasses import dataclass, field from typing import Callable, Optional @@ -46,6 +47,7 @@ class MeshConnector: self._node_names: dict[str, str] = {} self._connected = False self._loop: Optional[asyncio.AbstractEventLoop] = None + self._lock = threading.Lock() @property def connected(self) -> bool: @@ -127,34 +129,36 @@ class MeshConnector: if not self._interface: return - for node_id, node in self._interface.nodes.items(): - # Cache name - if user := node.get("user"): - name = user.get("shortName") or user.get("longName") or node_id - self._node_names[node_id] = name + with self._lock: + for node_id, node in self._interface.nodes.items(): + # Cache name + if user := node.get("user"): + name = user.get("shortName") or user.get("longName") or node_id + self._node_names[node_id] = name - # Cache position - if position := node.get("position"): - lat = position.get("latitude") - lon = position.get("longitude") - if lat is not None and lon is not None: - self._node_positions[node_id] = (lat, lon) + # Cache position + if position := node.get("position"): + lat = position.get("latitude") + lon = position.get("longitude") + if lat is not None and lon is not None: + self._node_positions[node_id] = (lat, lon) def _on_node_update(self, node, interface) -> None: """Handle node info updates.""" node_id = f"!{node['num']:08x}" - # Update name cache - if user := node.get("user"): - name = user.get("shortName") or user.get("longName") or node_id - self._node_names[node_id] = name + with self._lock: + # Update name cache + if user := node.get("user"): + name = user.get("shortName") or user.get("longName") or node_id + self._node_names[node_id] = name - # Update position cache - if position := node.get("position"): - lat = position.get("latitude") - lon = position.get("longitude") - if lat is not None and lon is not None: - self._node_positions[node_id] = (lat, lon) + # Update position cache + if position := node.get("position"): + lat = position.get("latitude") + lon = position.get("longitude") + if lat is not None and lon is not None: + self._node_positions[node_id] = (lat, lon) def _on_receive(self, packet, interface) -> None: """Handle incoming text message.""" @@ -175,8 +179,11 @@ class MeshConnector: # Determine if DM (sent directly to us, not broadcast) is_dm = to_num == self._my_node_id - # Get sender name - sender_name = self._node_names.get(sender_num, sender_num) + with self._lock: + # Get sender name + sender_name = self._node_names.get(sender_num, sender_num) + # Get position if available + position = self._node_positions.get(sender_num) # Create message object msg = MeshMessage( @@ -189,8 +196,8 @@ class MeshConnector: ) # Attach position if available - if sender_num in self._node_positions: - msg._position = self._node_positions[sender_num] + if position: + msg._position = position # Schedule callback on event loop self._loop.call_soon_threadsafe( @@ -258,7 +265,8 @@ class MeshConnector: Returns: Tuple of (latitude, longitude) or None if not available """ - return self._node_positions.get(node_id) + with self._lock: + return self._node_positions.get(node_id) def get_node_name(self, node_id: str) -> str: """Get cached name for a node. @@ -269,4 +277,5 @@ class MeshConnector: Returns: Node name or the node ID if name not available """ - return self._node_names.get(node_id, node_id) + with self._lock: + return self._node_names.get(node_id, node_id) diff --git a/meshai/web_status.py b/meshai/web_status.py index 19611c6..7824431 100644 --- a/meshai/web_status.py +++ b/meshai/web_status.py @@ -3,6 +3,7 @@ import asyncio import json import logging +import threading import time from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer @@ -18,6 +19,7 @@ class StatusData: """Container for status information.""" def __init__(self): + self._lock = threading.Lock() self.start_time = time.time() self.message_count = 0 self.response_count = 0 @@ -29,31 +31,34 @@ class StatusData: def record_message(self, sender_id: str, sender_name: str): """Record an incoming message.""" - self.message_count += 1 - self.last_message_time = time.time() - self.connected_nodes.add(sender_id) + with self._lock: + self.message_count += 1 + self.last_message_time = time.time() + self.connected_nodes.add(sender_id) - self.recent_activity.append({ - "type": "message", - "time": datetime.now().isoformat(), - "sender": sender_name, - }) - # Keep only last 20 activities - self.recent_activity = self.recent_activity[-20:] + self.recent_activity.append({ + "type": "message", + "time": datetime.now().isoformat(), + "sender": sender_name, + }) + # Keep only last 20 activities + self.recent_activity = self.recent_activity[-20:] def record_response(self): """Record an outgoing response.""" - self.response_count += 1 + with self._lock: + self.response_count += 1 def record_error(self, error: str): """Record an error.""" - self.error_count += 1 - self.recent_activity.append({ - "type": "error", - "time": datetime.now().isoformat(), - "error": error[:100], - }) - self.recent_activity = self.recent_activity[-20:] + with self._lock: + self.error_count += 1 + self.recent_activity.append({ + "type": "error", + "time": datetime.now().isoformat(), + "error": error[:100], + }) + self.recent_activity = self.recent_activity[-20:] def get_uptime(self) -> str: """Get formatted uptime string.""" @@ -75,22 +80,23 @@ class StatusData: def to_dict(self, include_activity: bool = False) -> dict: """Convert to dictionary for JSON response.""" - data = { - "status": "online", - "uptime": self.get_uptime(), - "uptime_seconds": int(time.time() - self.start_time), - "messages_received": self.message_count, - "responses_sent": self.response_count, - "errors": self.error_count, - "connected_nodes": len(self.connected_nodes), - "using_fallback": self.using_fallback, - } + with self._lock: + data = { + "status": "online", + "uptime": self.get_uptime(), + "uptime_seconds": int(time.time() - self.start_time), + "messages_received": self.message_count, + "responses_sent": self.response_count, + "errors": self.error_count, + "connected_nodes": len(self.connected_nodes), + "using_fallback": self.using_fallback, + } - if self.last_message_time: - data["last_message_ago"] = int(time.time() - self.last_message_time) + if self.last_message_time: + data["last_message_ago"] = int(time.time() - self.last_message_time) - if include_activity: - data["recent_activity"] = self.recent_activity + if include_activity: + data["recent_activity"] = list(self.recent_activity) return data