From ab7392c5185cc162853a8308091b4042109fac55 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Tue, 12 May 2026 21:57:11 +0000 Subject: [PATCH] feat: Add MQTT source adapter --- config.example.yaml | 356 +++--- dashboard-frontend/src/pages/Config.tsx | 32 +- meshai/config.py | 66 +- meshai/main.py | 1445 ++++++++++++----------- meshai/mesh_data_store.py | 79 +- meshai/sources/mqtt_source.py | 435 +++++++ pyproject.toml | 1 + requirements.txt | 1 + 8 files changed, 1515 insertions(+), 900 deletions(-) create mode 100644 meshai/sources/mqtt_source.py diff --git a/config.example.yaml b/config.example.yaml index cb45e3f..81975bf 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,173 +1,183 @@ -# MeshAI Configuration -# LLM-powered Meshtastic assistant -# -# Copy this to config.yaml and customize as needed -# For Docker: mount as /data/config.yaml - -# === BOT IDENTITY === -bot: - name: ai # Bot's display name - owner: "" # Owner's callsign (optional) - respond_to_dms: true # Respond to direct messages - filter_bbs_protocols: true # Ignore advBBS sync/notification messages - -# === MESHTASTIC CONNECTION === -connection: - type: tcp # serial | tcp - serial_port: /dev/ttyUSB0 # For serial connection - tcp_host: localhost # For TCP connection (meshtasticd) - tcp_port: 4403 - -# === RESPONSE BEHAVIOR === -response: - delay_min: 2.2 # Min delay before responding (seconds) - delay_max: 3.0 # Max delay before responding - max_length: 200 # Max chars per message chunk - max_messages: 3 # Max message chunks per response - -# === CONVERSATION HISTORY === -history: - database: /data/conversations.db - max_messages_per_user: 50 # Messages to keep per user - conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h) - auto_cleanup: true # Auto-delete old conversations - cleanup_interval_hours: 24 # How often to run cleanup - max_age_days: 30 # Delete conversations older than this - -# === MEMORY OPTIMIZATION === -memory: - enabled: true # Enable rolling summary memory - window_size: 4 # Recent message pairs to keep in full - summarize_threshold: 8 # Messages before re-summarizing - -# === MESH CONTEXT === -context: - enabled: true # Observe channel traffic for LLM context - observe_channels: [] # Channel indices to observe (empty = all) - ignore_nodes: [] # Node IDs to exclude from observation - max_age: 2592000 # Max age in seconds (default 30 days) - max_context_items: 20 # Max observations injected into LLM context - -# === LLM BACKEND === -llm: - backend: openai # openai | anthropic | google - api_key: "" # API key (or use LLM_API_KEY env var) - base_url: https://api.openai.com/v1 # API base URL - model: gpt-4o-mini # Model name - timeout: 30 # Request timeout (seconds) - system_prompt: >- - You are a helpful assistant on a Meshtastic mesh network. - Keep responses very brief - 1-2 short sentences, under 300 characters. - Only give longer answers if the user explicitly asks for detail or explanation. - Be concise but friendly. No markdown formatting. - google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries) - -# === WEATHER === -weather: - primary: openmeteo # openmeteo | wttr | llm - fallback: llm # openmeteo | wttr | llm | none - default_location: "" # Default location for !weather (optional) - -# === MESHMONITOR INTEGRATION === -meshmonitor: - enabled: false # Enable MeshMonitor trigger sync - url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333) - inject_into_prompt: true # Include trigger list in LLM prompt - refresh_interval: 300 # Seconds between trigger refreshes - -# === KNOWLEDGE BASE (RAG) === -knowledge: - enabled: false # Enable knowledge base search - db_path: "" # Path to knowledge SQLite database - top_k: 5 # Number of chunks to retrieve per query - -# === MESH DATA SOURCES === -# Connect to Meshview and/or MeshMonitor instances for live mesh -# network analysis. Supports multiple sources. Configure via TUI -# with meshai --config (Mesh Sources menu). -# -# mesh_sources: -# - name: "my-meshview" -# type: meshview -# url: "https://meshview.example.com" -# refresh_interval: 300 -# enabled: true -# -# - name: "my-meshmonitor" -# type: meshmonitor -# url: "http://192.168.1.100:3333" -# api_token: "${MM_API_TOKEN}" -# refresh_interval: 300 -# enabled: true -mesh_sources: [] - -# === MESH INTELLIGENCE === -# Geographic clustering and health scoring for mesh analysis. -# Requires mesh_sources to be configured with at least one data source. -# -# mesh_intelligence: -# enabled: true -# region_radius_miles: 40.0 # Radius for region clustering -# locality_radius_miles: 8.0 # Radius for locality clustering -# offline_threshold_hours: 24 # Hours before node considered offline -# packet_threshold: 500 # Non-text packets per 24h to flag -# battery_warning_percent: 20 # Battery level for warnings -# infra_overrides: [] # Node IDs to exclude from infrastructure -# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"} -mesh_intelligence: - enabled: false - region_radius_miles: 40.0 - locality_radius_miles: 8.0 - offline_threshold_hours: 24 - packet_threshold: 500 - battery_warning_percent: 20 - infra_overrides: [] - region_labels: {} - -# === ENVIRONMENTAL FEEDS === -# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo. -# Provides weather alerts, HF propagation assessment, and tropospheric ducting. -# -environmental: - enabled: false - nws_zones: - - "IDZ016" # Western Magic Valley - - "IDZ030" # Southern Twin Falls County - - # NWS Weather Alerts (api.weather.gov) - nws: - enabled: true - tick_seconds: 60 - areas: ["ID"] - severity_min: "moderate" - user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS - - # NOAA Space Weather (services.swpc.noaa.gov) - swpc: - enabled: true - - # Tropospheric ducting assessment (Open-Meteo GFS, no auth) - ducting: - enabled: true - tick_seconds: 10800 # 3 hours - latitude: 42.56 # center of mesh coverage area - longitude: -114.47 - - # NIFC Fire Perimeters (Phase 2) - fires: - enabled: false - tick_seconds: 600 - state: "US-ID" - - # Avalanche Advisories (Phase 2) - avalanche: - enabled: false - tick_seconds: 1800 - center_ids: ["SNFAC"] - season_months: [12, 1, 2, 3, 4] - -# === WEB DASHBOARD === -dashboard: - enabled: true - port: 8080 - host: "0.0.0.0" +# MeshAI Configuration +# LLM-powered Meshtastic assistant +# +# Copy this to config.yaml and customize as needed +# For Docker: mount as /data/config.yaml + +# === BOT IDENTITY === +bot: + name: ai # Bot's display name + owner: "" # Owner's callsign (optional) + respond_to_dms: true # Respond to direct messages + filter_bbs_protocols: true # Ignore advBBS sync/notification messages + +# === MESHTASTIC CONNECTION === +connection: + type: tcp # serial | tcp + serial_port: /dev/ttyUSB0 # For serial connection + tcp_host: localhost # For TCP connection (meshtasticd) + tcp_port: 4403 + +# === RESPONSE BEHAVIOR === +response: + delay_min: 2.2 # Min delay before responding (seconds) + delay_max: 3.0 # Max delay before responding + max_length: 200 # Max chars per message chunk + max_messages: 3 # Max message chunks per response + +# === CONVERSATION HISTORY === +history: + database: /data/conversations.db + max_messages_per_user: 50 # Messages to keep per user + conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h) + auto_cleanup: true # Auto-delete old conversations + cleanup_interval_hours: 24 # How often to run cleanup + max_age_days: 30 # Delete conversations older than this + +# === MEMORY OPTIMIZATION === +memory: + enabled: true # Enable rolling summary memory + window_size: 4 # Recent message pairs to keep in full + summarize_threshold: 8 # Messages before re-summarizing + +# === MESH CONTEXT === +context: + enabled: true # Observe channel traffic for LLM context + observe_channels: [] # Channel indices to observe (empty = all) + ignore_nodes: [] # Node IDs to exclude from observation + max_age: 2592000 # Max age in seconds (default 30 days) + max_context_items: 20 # Max observations injected into LLM context + +# === LLM BACKEND === +llm: + backend: openai # openai | anthropic | google + api_key: "" # API key (or use LLM_API_KEY env var) + base_url: https://api.openai.com/v1 # API base URL + model: gpt-4o-mini # Model name + timeout: 30 # Request timeout (seconds) + system_prompt: >- + You are a helpful assistant on a Meshtastic mesh network. + Keep responses very brief - 1-2 short sentences, under 300 characters. + Only give longer answers if the user explicitly asks for detail or explanation. + Be concise but friendly. No markdown formatting. + google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries) + +# === WEATHER === +weather: + primary: openmeteo # openmeteo | wttr | llm + fallback: llm # openmeteo | wttr | llm | none + default_location: "" # Default location for !weather (optional) + +# === MESHMONITOR INTEGRATION === +meshmonitor: + enabled: false # Enable MeshMonitor trigger sync + url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333) + inject_into_prompt: true # Include trigger list in LLM prompt + refresh_interval: 300 # Seconds between trigger refreshes + +# === KNOWLEDGE BASE (RAG) === +knowledge: + enabled: false # Enable knowledge base search + db_path: "" # Path to knowledge SQLite database + top_k: 5 # Number of chunks to retrieve per query + +# === MESH DATA SOURCES === +# Connect to Meshview and/or MeshMonitor instances for live mesh +# network analysis. Supports multiple sources. Configure via TUI +# with meshai --config (Mesh Sources menu). +# +# mesh_sources: +# - name: "my-meshview" +# type: meshview +# url: "https://meshview.example.com" +# refresh_interval: 300 +# enabled: true +# +# - name: "my-meshmonitor" +# type: meshmonitor +# url: "http://192.168.1.100:3333" +# api_token: "${MM_API_TOKEN}" +# refresh_interval: 300 +# enabled: true +# +# - name: "mqtt-broker" +# type: mqtt +# host: "mqtt.meshtastic.org" +# port: 1883 +# username: "meshdev" +# password: "large4cats" +# topic_root: "msh/US" +# use_tls: false +# enabled: true +mesh_sources: [] + +# === MESH INTELLIGENCE === +# Geographic clustering and health scoring for mesh analysis. +# Requires mesh_sources to be configured with at least one data source. +# +# mesh_intelligence: +# enabled: true +# region_radius_miles: 40.0 # Radius for region clustering +# locality_radius_miles: 8.0 # Radius for locality clustering +# offline_threshold_hours: 24 # Hours before node considered offline +# packet_threshold: 500 # Non-text packets per 24h to flag +# battery_warning_percent: 20 # Battery level for warnings +# infra_overrides: [] # Node IDs to exclude from infrastructure +# region_labels: {} # Override auto-names: {"Twin Falls": "Magic Valley"} +mesh_intelligence: + enabled: false + region_radius_miles: 40.0 + locality_radius_miles: 8.0 + offline_threshold_hours: 24 + packet_threshold: 500 + battery_warning_percent: 20 + infra_overrides: [] + region_labels: {} + +# === ENVIRONMENTAL FEEDS === +# Live situational awareness from NWS, NOAA Space Weather, and Open-Meteo. +# Provides weather alerts, HF propagation assessment, and tropospheric ducting. +# +environmental: + enabled: false + nws_zones: + - "IDZ016" # Western Magic Valley + - "IDZ030" # Southern Twin Falls County + + # NWS Weather Alerts (api.weather.gov) + nws: + enabled: true + tick_seconds: 60 + areas: ["ID"] + severity_min: "moderate" + user_agent: "(meshai.example.com, ops@example.com)" # REQUIRED by NWS + + # NOAA Space Weather (services.swpc.noaa.gov) + swpc: + enabled: true + + # Tropospheric ducting assessment (Open-Meteo GFS, no auth) + ducting: + enabled: true + tick_seconds: 10800 # 3 hours + latitude: 42.56 # center of mesh coverage area + longitude: -114.47 + + # NIFC Fire Perimeters (Phase 2) + fires: + enabled: false + tick_seconds: 600 + state: "US-ID" + + # Avalanche Advisories (Phase 2) + avalanche: + enabled: false + tick_seconds: 1800 + center_ids: ["SNFAC"] + season_months: [12, 1, 2, 3, 4] + +# === WEB DASHBOARD === +dashboard: + enabled: true + port: 8080 + host: "0.0.0.0" diff --git a/dashboard-frontend/src/pages/Config.tsx b/dashboard-frontend/src/pages/Config.tsx index 06f981d..caf2d0b 100644 --- a/dashboard-frontend/src/pages/Config.tsx +++ b/dashboard-frontend/src/pages/Config.tsx @@ -111,6 +111,13 @@ interface MeshSourceConfig { refresh_interval: number polite_mode: boolean enabled: boolean + // MQTT-specific fields + host?: string + port?: number + username?: string + password?: string + topic_root?: string + use_tls?: boolean } interface RegionAnchor { @@ -697,13 +704,30 @@ function MeshSourceCard({ source, onChange, onDelete }: { options={[ { value: 'meshview', label: 'MeshView' }, { value: 'meshmonitor', label: 'MeshMonitor' }, + { value: 'mqtt', label: 'MQTT Broker' }, ]} /> - onChange({ ...source, url: v })} /> + {source.type !== 'mqtt' && ( + onChange({ ...source, url: v })} /> + )} {source.type === 'meshmonitor' && ( onChange({ ...source, api_token: v })} type="password" /> )} + {source.type === 'mqtt' && ( + <> +
+ onChange({ ...source, host: v })} /> + onChange({ ...source, port: v })} min={1} max={65535} /> +
+
+ onChange({ ...source, username: v })} /> + onChange({ ...source, password: v })} type="password" /> +
+ onChange({ ...source, topic_root: v })} /> + onChange({ ...source, use_tls: v })} /> + + )} onChange({ ...source, refresh_interval: v })} min={10} /> onChange({ ...source, enabled: v })} /> onChange({ ...source, polite_mode: v })} /> @@ -723,6 +747,12 @@ function MeshSourcesSection({ data, onChange }: { data: MeshSourceConfig[]; onCh refresh_interval: 30, polite_mode: false, enabled: true, + host: '', + port: 1883, + username: '', + password: '', + topic_root: 'msh/US', + use_tls: false, }]) } diff --git a/meshai/config.py b/meshai/config.py index e13a256..fb02ddf 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -60,6 +60,14 @@ class MemoryConfig: """Rolling summary memory settings.""" enabled: bool = True # Enable memory optimization + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection window_size: int = 4 # Recent message pairs to keep in full summarize_threshold: int = 8 # Messages before re-summarizing @@ -69,6 +77,14 @@ class ContextConfig: """Passive mesh context settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection observe_channels: list[int] = field(default_factory=list) # Empty = all channels ignore_nodes: list[str] = field(default_factory=list) # Node IDs to ignore max_age: int = 2_592_000 # 30 days in seconds @@ -80,6 +96,14 @@ class CommandsConfig: """Command settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection prefix: str = "!" disabled_commands: list[str] = field(default_factory=list) custom_commands: dict = field(default_factory=dict) @@ -179,12 +203,20 @@ class MeshSourceConfig: """Configuration for a mesh data source.""" name: str = "" - type: str = "" # "meshview" or "meshmonitor" + type: str = "" # "meshview", "meshmonitor", or "mqtt" url: str = "" api_token: str = "" # MeshMonitor only, supports ${ENV_VAR} refresh_interval: int = 30 # Tick interval in seconds (default 30) polite_mode: bool = False # Reduces polling frequency for shared instances enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection @dataclass @@ -263,6 +295,14 @@ class NWSConfig: """NWS weather alerts settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection tick_seconds: int = 60 areas: list = field(default_factory=lambda: ["ID"]) severity_min: str = "moderate" @@ -274,6 +314,14 @@ class SWPCConfig: """NOAA Space Weather settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection @dataclass @@ -281,6 +329,14 @@ class DuctingConfig: """Tropospheric ducting settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection tick_seconds: int = 10800 # 3 hours latitude: float = 42.56 # Twin Falls area default longitude: float = -114.47 @@ -323,6 +379,14 @@ class DashboardConfig: """Web dashboard settings.""" enabled: bool = True + + # MQTT-specific fields (type=mqtt only) + host: str = "" # MQTT broker hostname + port: int = 1883 # MQTT broker port (1883 plain, 8883 TLS) + username: str = "" # MQTT username (optional) + password: str = "" # MQTT password (optional, supports ) + topic_root: str = "msh/US" # Topic root to subscribe to + use_tls: bool = False # Enable TLS for MQTT connection port: int = 8080 host: str = "0.0.0.0" diff --git a/meshai/main.py b/meshai/main.py index 452857d..df83d3b 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -1,721 +1,724 @@ -"""Main entry point for MeshAI.""" - -import argparse -import asyncio -import logging -import os -import signal -import sys -import time -from pathlib import Path -from typing import Optional - -from . import __version__ -from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend -from .cli import run_configurator -from .commands import CommandDispatcher -from .commands.dispatcher import create_dispatcher -from .commands.status import set_start_time -from .config import Config, load_config -from .connector import MeshConnector, MeshMessage -from .context import MeshContext -from .history import ConversationHistory -from .memory import ConversationSummary -from .responder import Responder -from .router import MessageRouter, RouteType - -logger = logging.getLogger(__name__) - - -class MeshAI: - """Main application class.""" - - def __init__(self, config: Config): - self.config = config - self.connector: Optional[MeshConnector] = None - self.history: Optional[ConversationHistory] = None - self.dispatcher: Optional[CommandDispatcher] = None - self.llm: Optional[LLMBackend] = None - self.context: Optional[MeshContext] = None - self.meshmonitor_sync = None - self.knowledge = None - self.data_store = None # Replaces source_manager - self.health_engine = None - self.mesh_reporter = None - self.subscription_manager = None - self.alert_engine = None - self.env_store = None # Environmental feeds store - self._last_sub_check: float = 0.0 - self.router: Optional[MessageRouter] = None - self.responder: Optional[Responder] = None - self._running = False - self._loop: Optional[asyncio.AbstractEventLoop] = None - self._last_cleanup: float = 0.0 - self._last_health_compute: float = 0.0 - self.broadcaster = None # Dashboard WebSocket broadcaster - - async def start(self) -> None: - """Start the bot.""" - logger.info(f"Starting MeshAI v{__version__}") - set_start_time(time.time()) - - # Initialize components - await self._init_components() - - # Connect to Meshtastic - self.connector.connect() - self.connector.set_message_callback(self._on_message, asyncio.get_event_loop()) - - # Add own node ID to context ignore list - if self.context and self.connector.my_node_id: - self.context._ignore_nodes.add(self.connector.my_node_id) - - self._running = True - self._loop = asyncio.get_event_loop() - self._last_cleanup = time.time() - self._last_health_compute = 0.0 - - # Write PID file - self._write_pid() - - logger.info("MeshAI started successfully") - - # Keep running - while self._running: - await asyncio.sleep(1) - - # Periodic MeshMonitor refresh - if self.meshmonitor_sync: - self.meshmonitor_sync.maybe_refresh() - - # Periodic data store refresh and health computation - if self.data_store: - refreshed = self.data_store.refresh() - # Recompute health after refresh - if refreshed and self.health_engine: - self.health_engine.compute(self.data_store) - self._last_health_compute = time.time() - - # Broadcast health update to dashboard - if self.broadcaster and self.health_engine.mesh_health: - try: - mh = self.health_engine.mesh_health - health_dict = { - "score": round(mh.score.composite, 1), - "tier": mh.score.tier, - "total_nodes": mh.total_nodes, - "total_regions": mh.total_regions, - "infra_online": mh.score.infra_online, - "infra_total": mh.score.infra_total, - "last_computed": mh.last_computed, - } - await self.broadcaster.broadcast("health_update", health_dict) - except Exception as e: - logger.debug("Dashboard broadcast error: %s", e) - - # Check for alertable conditions - if self.alert_engine: - alerts = self.alert_engine.check() - if alerts: - await self._dispatch_alerts(alerts) - - # Broadcast alerts to dashboard - if self.broadcaster: - for alert in alerts: - try: - await self.broadcaster.broadcast("alert_fired", alert) - except Exception: - pass - - # Environmental feed refresh - if self.env_store: - try: - env_changed = self.env_store.refresh() - if env_changed and self.alert_engine: - env_alerts = self.alert_engine.check_environmental(self.env_store) - if env_alerts: - await self._dispatch_alerts(env_alerts) - if self.broadcaster: - for ea in env_alerts: - await self.broadcaster.broadcast("alert_fired", ea) - - # Broadcast env updates to dashboard - if env_changed and self.broadcaster: - await self.broadcaster.broadcast("env_update", { - "active_count": len(self.env_store.get_active()), - "swpc": self.env_store.get_swpc_status(), - "ducting": self.env_store.get_ducting_status(), - }) - except Exception as e: - logger.debug("Env refresh error: %s", e) - - # Check scheduled subscriptions (every 60 seconds) - if self.subscription_manager and self.mesh_reporter: - if time.time() - self._last_sub_check >= 60: - await self._check_scheduled_subs() - self._last_sub_check = time.time() - - # Periodic cleanup - if time.time() - self._last_cleanup >= 3600: - await self.history.cleanup_expired() - if self.context: - self.context.prune() - self._last_cleanup = time.time() - - async def stop(self) -> None: - """Stop the bot.""" - logger.info("Stopping MeshAI...") - self._running = False - - if self.connector: - self.connector.disconnect() - - if self.history: - await self.history.close() - - if self.llm: - await self.llm.close() - if self.knowledge: - self.knowledge.close() - if self.data_store: - self.data_store.close() - if self.subscription_manager: - self.subscription_manager.close() - - self._remove_pid() - logger.info("MeshAI stopped") - - async def _init_components(self) -> None: - """Initialize all components.""" - # Conversation history - self.history = ConversationHistory(self.config.history) - await self.history.initialize() - - # LLM backend - api_key = self.config.resolve_api_key() - if not api_key: - logger.warning("No API key configured - LLM responses will fail") - - # Memory config - mem_cfg = self.config.memory - window_size = mem_cfg.window_size if mem_cfg.enabled else 0 - summarize_threshold = mem_cfg.summarize_threshold - - # Create backend - backend = self.config.llm.backend.lower() - if backend == "openai": - self.llm = OpenAIBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - elif backend == "anthropic": - self.llm = AnthropicBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - elif backend == "google": - self.llm = GoogleBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - else: - logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI") - self.llm = OpenAIBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - - # Load persisted summaries into memory cache - await self._load_summaries() - - # Meshtastic connector - self.connector = MeshConnector(self.config.connection) - - # Passive mesh context buffer - ctx_cfg = self.config.context - if ctx_cfg.enabled: - self.context = MeshContext( - observe_channels=ctx_cfg.observe_channels or None, - ignore_nodes=ctx_cfg.ignore_nodes or None, - max_age=ctx_cfg.max_age, - ) - logger.info("Mesh context buffer enabled") - else: - self.context = None - - # MeshMonitor trigger sync - mm_cfg = self.config.meshmonitor - if mm_cfg.enabled and mm_cfg.url: - from .meshmonitor import MeshMonitorSync - self.meshmonitor_sync = MeshMonitorSync( - url=mm_cfg.url, - refresh_interval=mm_cfg.refresh_interval, - ) - count = self.meshmonitor_sync.load() - logger.info(f"MeshMonitor sync enabled, loaded {count} triggers") - else: - self.meshmonitor_sync = None - - # Mesh data store (replaces MeshSourceManager) - # mesh_sources may be dicts or MeshSourceConfig objects depending on config version - enabled_sources = [ - s for s in self.config.mesh_sources - if (s.enabled if hasattr(s, 'enabled') else s.get('enabled', True)) - ] - if enabled_sources: - from .mesh_data_store import MeshDataStore - self.data_store = MeshDataStore( - source_configs=enabled_sources, - db_path="/data/mesh_history.db", - ) - # Initial fetch and backfill - self.data_store.force_refresh() - # Log status - for status in self.data_store.get_status(): - if status["is_loaded"]: - logger.info( - f"Mesh source '{status['name']}' ({status['type']}): " - f"{status['node_count']} nodes" - ) - else: - logger.warning( - f"Mesh source '{status['name']}' ({status['type']}): " - f"failed - {status.get('last_error', 'unknown error')}" - ) - else: - self.data_store = None - - # Mesh health engine - mi_cfg = self.config.mesh_intelligence - if mi_cfg.enabled and self.data_store: - from .mesh_health import MeshHealthEngine - self.health_engine = MeshHealthEngine( - regions=mi_cfg.regions, - locality_radius=mi_cfg.locality_radius_miles, - offline_threshold_hours=mi_cfg.offline_threshold_hours, - packet_threshold=mi_cfg.packet_threshold, - battery_warning_percent=mi_cfg.battery_warning_percent, - ) - # Initial health computation - mesh_health = self.health_engine.compute(self.data_store) - self._last_health_compute = time.time() - logger.info( - f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " - f"{mesh_health.total_regions} regions, " - f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})" - ) - else: - self.health_engine = None - - # Mesh reporter (for LLM prompt injection and commands) - if self.health_engine and self.data_store: - from .mesh_reporter import MeshReporter - mi_regions = self.config.mesh_intelligence.regions if self.config.mesh_intelligence else [] - self.mesh_reporter = MeshReporter(self.health_engine, self.data_store, region_configs=mi_regions) - logger.info("Mesh reporter enabled") - else: - self.mesh_reporter = None - - # Subscription manager (uses same db as data_store) - if self.data_store: - from .subscriptions import SubscriptionManager - self.subscription_manager = SubscriptionManager(db_path="/data/mesh_history.db") - logger.info("Subscription manager enabled") - else: - self.subscription_manager = None - - # Alert engine (needs health engine, reporter, and subscription manager) - if self.health_engine and self.mesh_reporter and self.subscription_manager: - from .alert_engine import AlertEngine - mi = self.config.mesh_intelligence - self.alert_engine = AlertEngine( - health_engine=self.health_engine, - reporter=self.mesh_reporter, - subscription_manager=self.subscription_manager, - config=mi, - db_path="/data/mesh_history.db", - timezone=self.config.timezone, - ) - logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") - - # Environmental feeds - env_cfg = self.config.environmental - if env_cfg.enabled: - from .env.store import EnvironmentalStore - # Pass region anchors for fire proximity calculation - region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else [] - self.env_store = EnvironmentalStore(config=env_cfg, region_anchors=region_anchors) - logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)") - else: - self.env_store = None - - # Knowledge base (optional - Qdrant with SQLite fallback) - kb_cfg = self.config.knowledge - self.knowledge = None - if kb_cfg.enabled: - # Try Qdrant first if configured - if kb_cfg.backend in ("qdrant", "auto") and kb_cfg.qdrant_host: - try: - from .knowledge import QdrantKnowledgeSearch - qdrant = QdrantKnowledgeSearch( - qdrant_host=kb_cfg.qdrant_host, - qdrant_port=kb_cfg.qdrant_port, - collection=kb_cfg.qdrant_collection, - tei_host=kb_cfg.tei_host, - tei_port=kb_cfg.tei_port, - sparse_host=kb_cfg.sparse_host, - sparse_port=kb_cfg.sparse_port, - use_sparse=kb_cfg.use_sparse, - top_k=kb_cfg.top_k, - ) - if qdrant.available: - self.knowledge = qdrant - logger.info("Using Qdrant knowledge backend (RECON hybrid)") - except Exception as e: - logger.warning(f"Qdrant knowledge unavailable: {e}") - - # Fall back to SQLite if Qdrant failed or not configured - if not self.knowledge and kb_cfg.backend in ("sqlite", "auto") and kb_cfg.db_path: - try: - from .knowledge import KnowledgeSearch - self.knowledge = KnowledgeSearch( - db_path=kb_cfg.db_path, - top_k=kb_cfg.top_k, - ) - except ImportError as e: - logger.warning(f"SQLite knowledge disabled - missing dependencies: {e}") - - # Command dispatcher (needs mesh_reporter for health commands) - self.dispatcher = create_dispatcher( - prefix=self.config.commands.prefix, - disabled_commands=self.config.commands.disabled_commands, - custom_commands=self.config.commands.custom_commands, - mesh_reporter=self.mesh_reporter, - data_store=self.data_store, - health_engine=self.health_engine, - subscription_manager=self.subscription_manager, - env_store=self.env_store, - ) - - # Message router - self.router = MessageRouter( - self.config, self.connector, self.history, self.dispatcher, self.llm, - context=self.context, - meshmonitor_sync=self.meshmonitor_sync, - knowledge=self.knowledge, - source_manager=self.data_store, - health_engine=self.health_engine, - mesh_reporter=self.mesh_reporter, - env_store=self.env_store, - ) - - # Responder - self.responder = Responder(self.config.response, self.connector) - - # Dashboard - if hasattr(self.config, 'dashboard') and self.config.dashboard.enabled: - try: - from .dashboard.server import start_dashboard - self.broadcaster = await start_dashboard(self) - logger.info("Dashboard started on port %d", self.config.dashboard.port) - except Exception as e: - logger.warning("Dashboard failed to start: %s", e) - self.broadcaster = None - else: - self.broadcaster = None - - async def _on_message(self, message: MeshMessage) -> None: - """Handle incoming message.""" - try: - # Passively observe channel broadcasts for context (before filtering) - if self.context and not message.is_dm and message.text: - self.context.observe( - sender_name=message.sender_name, - sender_id=message.sender_id, - text=message.text, - channel=message.channel, - is_dm=False, - ) - - # Check if we should respond - if not self.router.should_respond(message): - return - - logger.info( - f"Processing message from {message.sender_name} ({message.sender_id}): " - f"{message.text[:50]}..." - ) - - # Route the message - # Check for continuation request first - continuation_messages = self.router.check_continuation(message) - if continuation_messages: - await self.responder.send_response( - continuation_messages, - destination=message.sender_id, - channel=message.channel, - ) - return - - result = await self.router.route(message) - - if result.route_type == RouteType.IGNORE: - return - - # Determine response - if result.route_type == RouteType.COMMAND: - if isinstance(result.response, list): - # Command returned pre-split messages — send directly - messages = result.response - else: - # Single string — chunk it - from .chunker import chunk_response - messages, remaining = chunk_response( - result.response, - max_chars=self.config.response.max_length, - max_messages=self.config.response.max_messages, - ) - if remaining: - self.router.continuations.store(message.sender_id, remaining) - elif result.route_type == RouteType.LLM: - messages = await self.router.generate_llm_response(message, result.query) - else: - return - - if not messages: - return - - # Send DM response - await self.responder.send_response( - messages, - destination=message.sender_id, - channel=message.channel, - ) - - except Exception as e: - logger.error(f"Error handling message: {e}", exc_info=True) - - async def _load_summaries(self) -> None: - """Load persisted summaries from database into memory cache.""" - memory = self.llm.get_memory() - if not memory: - return - - if not self.history or not self.history._db: - return - - try: - async with self.history._lock: - cursor = await self.history._db.execute( - "SELECT user_id, summary, message_count, updated_at " - "FROM conversation_summaries" - ) - rows = await cursor.fetchall() - - loaded = 0 - for row in rows: - user_id, summary_text, message_count, updated_at = row - summary = ConversationSummary( - summary=summary_text, - last_updated=updated_at, - message_count=message_count, - ) - memory.load_summary(user_id, summary) - loaded += 1 - - if loaded: - logger.info(f"Loaded {loaded} conversation summaries from database") - - except Exception as e: - logger.warning(f"Failed to load summaries from database: {e}") - - def _write_pid(self) -> None: - """Write PID file.""" - pid_file = Path("/tmp/meshai.pid") - pid_file.write_text(str(os.getpid())) - - def _remove_pid(self) -> None: - """Remove PID file.""" - pid_file = Path("/tmp/meshai.pid") - if pid_file.exists(): - pid_file.unlink() - - async def _dispatch_alerts(self, alerts: list[dict]) -> None: - """Dispatch alerts to subscribers and alert channel.""" - mi = self.config.mesh_intelligence - alert_channel = getattr(mi, 'alert_channel', -1) - - for alert in alerts: - message = alert["message"] - logger.info(f"ALERT: {message}") - - # Send to alert channel if configured - if alert_channel >= 0 and self.connector: - try: - self.connector.send_message( - text=message, - destination=None, # Broadcast - channel=alert_channel, - ) - logger.info(f"Alert sent to channel {alert_channel}") - except Exception as e: - logger.error(f"Failed to send channel alert: {e}") - - # Send DMs to matching subscribers - if self.alert_engine and self.subscription_manager: - subscribers = self.alert_engine.get_subscribers_for_alert(alert) - for sub in subscribers: - user_id = sub["user_id"] - try: - await self._send_sub_dm(user_id, message) - logger.info(f"Alert DM sent to {user_id}: {alert['type']}") - except Exception as e: - logger.error(f"Failed to send alert DM to {user_id}: {e}") - - self.alert_engine.clear_pending() - - async def _check_scheduled_subs(self) -> None: - """Check for and deliver due scheduled reports.""" - from datetime import datetime - from zoneinfo import ZoneInfo - - tz = ZoneInfo(self.config.timezone) - now = datetime.now(tz) - current_hhmm = now.strftime("%H%M") - current_day = now.strftime("%a").lower() - - due_subs = self.subscription_manager.get_due_subscriptions(current_hhmm, current_day) - - for sub in due_subs: - try: - # Generate report based on scope - report = self._generate_sub_report(sub) - if not report: - continue - - # Send DM to subscriber - user_id = sub["user_id"] - await self._send_sub_dm(user_id, report) - - # Mark as sent - self.subscription_manager.mark_sent(sub["id"]) - logger.info(f"Delivered {sub['sub_type']} report to {user_id}") - - except Exception as e: - logger.error(f"Error delivering subscription {sub['id']}: {e}") - - def _generate_sub_report(self, sub: dict) -> str: - """Generate report content for a subscription.""" - if not self.mesh_reporter: - return None - - sub_type = sub["sub_type"] - scope_type = sub.get("scope_type", "mesh") - scope_value = sub.get("scope_value") - - if scope_type == "region" and scope_value: - # Region-scoped report - region = self.mesh_reporter._find_region(scope_value) - if region: - return self.mesh_reporter.build_region_compact(region.name) - return None - elif scope_type == "node" and scope_value: - # Node-scoped report - return self.mesh_reporter.build_node_compact(scope_value) - else: - # Mesh-wide report - return self.mesh_reporter.build_lora_compact(scope="mesh") - - async def _send_sub_dm(self, node_num: str, message: str) -> None: - """Send a subscription DM to a node.""" - if not self.connector: - return - - # Convert node_num to destination format - try: - dest = int(node_num) - except ValueError: - dest = node_num - - # Send via responder for proper chunking - if self.responder: - await self.responder.send_response( - message, - destination=dest, - channel=0, # DM channel - ) - else: - # Fallback to direct send - self.connector.send_message(message, destination=dest) - - -def setup_logging(verbose: bool = False) -> None: - """Configure logging.""" - level = logging.DEBUG if verbose else logging.INFO - logging.basicConfig( - level=level, - format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - -def main() -> None: - """Main entry point.""" - parser = argparse.ArgumentParser( - description="MeshAI - LLM-powered Meshtastic assistant", - prog="meshai", - ) - parser.add_argument( - "--version", "-V", action="version", version=f"%(prog)s {__version__}" - ) - parser.add_argument( - "--config", "-c", action="store_true", help="Launch configuration tool" - ) - parser.add_argument( - "--config-file", - "-f", - type=Path, - default=Path("config.yaml"), - help="Path to config file (default: config.yaml)", - ) - parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") - - args = parser.parse_args() - - setup_logging(args.verbose) - - # Launch configurator if requested - if args.config: - run_configurator(args.config_file) - return - - # Load config - config = load_config(args.config_file) - - # Check if config exists - if not args.config_file.exists(): - logger.warning(f"Config file not found: {args.config_file}") - logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") - sys.exit(1) - - # Create and run bot - bot = MeshAI(config) - - # Handle signals - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - def signal_handler(sig, frame): - logger.info(f"Received signal {sig}") - loop.create_task(bot.stop()) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - try: - loop.run_until_complete(bot.start()) - except KeyboardInterrupt: - pass - finally: - loop.run_until_complete(bot.stop()) - loop.close() - - -if __name__ == "__main__": - main() +"""Main entry point for MeshAI.""" + +import argparse +import asyncio +import logging +import os +import signal +import sys +import time +from pathlib import Path +from typing import Optional + +from . import __version__ +from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend +from .cli import run_configurator +from .commands import CommandDispatcher +from .commands.dispatcher import create_dispatcher +from .commands.status import set_start_time +from .config import Config, load_config +from .connector import MeshConnector, MeshMessage +from .context import MeshContext +from .history import ConversationHistory +from .memory import ConversationSummary +from .responder import Responder +from .router import MessageRouter, RouteType + +logger = logging.getLogger(__name__) + + +class MeshAI: + """Main application class.""" + + def __init__(self, config: Config): + self.config = config + self.connector: Optional[MeshConnector] = None + self.history: Optional[ConversationHistory] = None + self.dispatcher: Optional[CommandDispatcher] = None + self.llm: Optional[LLMBackend] = None + self.context: Optional[MeshContext] = None + self.meshmonitor_sync = None + self.knowledge = None + self.data_store = None # Replaces source_manager + self.health_engine = None + self.mesh_reporter = None + self.subscription_manager = None + self.alert_engine = None + self.env_store = None # Environmental feeds store + self._last_sub_check: float = 0.0 + self.router: Optional[MessageRouter] = None + self.responder: Optional[Responder] = None + self._running = False + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._last_cleanup: float = 0.0 + self._last_health_compute: float = 0.0 + self.broadcaster = None # Dashboard WebSocket broadcaster + + async def start(self) -> None: + """Start the bot.""" + logger.info(f"Starting MeshAI v{__version__}") + set_start_time(time.time()) + + # Initialize components + await self._init_components() + + # Connect to Meshtastic + self.connector.connect() + self.connector.set_message_callback(self._on_message, asyncio.get_event_loop()) + + # Add own node ID to context ignore list + if self.context and self.connector.my_node_id: + self.context._ignore_nodes.add(self.connector.my_node_id) + + self._running = True + self._loop = asyncio.get_event_loop() + self._last_cleanup = time.time() + self._last_health_compute = 0.0 + + # Write PID file + self._write_pid() + + logger.info("MeshAI started successfully") + + # Keep running + while self._running: + await asyncio.sleep(1) + + # Periodic MeshMonitor refresh + if self.meshmonitor_sync: + self.meshmonitor_sync.maybe_refresh() + + # Periodic data store refresh and health computation + if self.data_store: + refreshed = self.data_store.refresh() + # Recompute health after refresh + if refreshed and self.health_engine: + self.health_engine.compute(self.data_store) + self._last_health_compute = time.time() + + # Broadcast health update to dashboard + if self.broadcaster and self.health_engine.mesh_health: + try: + mh = self.health_engine.mesh_health + health_dict = { + "score": round(mh.score.composite, 1), + "tier": mh.score.tier, + "total_nodes": mh.total_nodes, + "total_regions": mh.total_regions, + "infra_online": mh.score.infra_online, + "infra_total": mh.score.infra_total, + "last_computed": mh.last_computed, + } + await self.broadcaster.broadcast("health_update", health_dict) + except Exception as e: + logger.debug("Dashboard broadcast error: %s", e) + + # Check for alertable conditions + if self.alert_engine: + alerts = self.alert_engine.check() + if alerts: + await self._dispatch_alerts(alerts) + + # Broadcast alerts to dashboard + if self.broadcaster: + for alert in alerts: + try: + await self.broadcaster.broadcast("alert_fired", alert) + except Exception: + pass + + # Environmental feed refresh + if self.env_store: + try: + env_changed = self.env_store.refresh() + if env_changed and self.alert_engine: + env_alerts = self.alert_engine.check_environmental(self.env_store) + if env_alerts: + await self._dispatch_alerts(env_alerts) + if self.broadcaster: + for ea in env_alerts: + await self.broadcaster.broadcast("alert_fired", ea) + + # Broadcast env updates to dashboard + if env_changed and self.broadcaster: + await self.broadcaster.broadcast("env_update", { + "active_count": len(self.env_store.get_active()), + "swpc": self.env_store.get_swpc_status(), + "ducting": self.env_store.get_ducting_status(), + }) + except Exception as e: + logger.debug("Env refresh error: %s", e) + + # Check scheduled subscriptions (every 60 seconds) + if self.subscription_manager and self.mesh_reporter: + if time.time() - self._last_sub_check >= 60: + await self._check_scheduled_subs() + self._last_sub_check = time.time() + + # Periodic cleanup + if time.time() - self._last_cleanup >= 3600: + await self.history.cleanup_expired() + if self.context: + self.context.prune() + self._last_cleanup = time.time() + + async def stop(self) -> None: + """Stop the bot.""" + logger.info("Stopping MeshAI...") + self._running = False + + if self.connector: + self.connector.disconnect() + + if self.history: + await self.history.close() + + if self.llm: + await self.llm.close() + if self.knowledge: + self.knowledge.close() + if self.data_store: + await self.data_store.stop_mqtt_sources() + self.data_store.close() + if self.subscription_manager: + self.subscription_manager.close() + + self._remove_pid() + logger.info("MeshAI stopped") + + async def _init_components(self) -> None: + """Initialize all components.""" + # Conversation history + self.history = ConversationHistory(self.config.history) + await self.history.initialize() + + # LLM backend + api_key = self.config.resolve_api_key() + if not api_key: + logger.warning("No API key configured - LLM responses will fail") + + # Memory config + mem_cfg = self.config.memory + window_size = mem_cfg.window_size if mem_cfg.enabled else 0 + summarize_threshold = mem_cfg.summarize_threshold + + # Create backend + backend = self.config.llm.backend.lower() + if backend == "openai": + self.llm = OpenAIBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + elif backend == "anthropic": + self.llm = AnthropicBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + elif backend == "google": + self.llm = GoogleBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + else: + logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI") + self.llm = OpenAIBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + + # Load persisted summaries into memory cache + await self._load_summaries() + + # Meshtastic connector + self.connector = MeshConnector(self.config.connection) + + # Passive mesh context buffer + ctx_cfg = self.config.context + if ctx_cfg.enabled: + self.context = MeshContext( + observe_channels=ctx_cfg.observe_channels or None, + ignore_nodes=ctx_cfg.ignore_nodes or None, + max_age=ctx_cfg.max_age, + ) + logger.info("Mesh context buffer enabled") + else: + self.context = None + + # MeshMonitor trigger sync + mm_cfg = self.config.meshmonitor + if mm_cfg.enabled and mm_cfg.url: + from .meshmonitor import MeshMonitorSync + self.meshmonitor_sync = MeshMonitorSync( + url=mm_cfg.url, + refresh_interval=mm_cfg.refresh_interval, + ) + count = self.meshmonitor_sync.load() + logger.info(f"MeshMonitor sync enabled, loaded {count} triggers") + else: + self.meshmonitor_sync = None + + # Mesh data store (replaces MeshSourceManager) + # mesh_sources may be dicts or MeshSourceConfig objects depending on config version + enabled_sources = [ + s for s in self.config.mesh_sources + if (s.enabled if hasattr(s, 'enabled') else s.get('enabled', True)) + ] + if enabled_sources: + from .mesh_data_store import MeshDataStore + self.data_store = MeshDataStore( + source_configs=enabled_sources, + db_path="/data/mesh_history.db", + ) + # Initial fetch and backfill + self.data_store.force_refresh() + # Start MQTT source subscription loops + await self.data_store.start_mqtt_sources() + # Log status + for status in self.data_store.get_status(): + if status["is_loaded"]: + logger.info( + f"Mesh source '{status['name']}' ({status['type']}): " + f"{status['node_count']} nodes" + ) + else: + logger.warning( + f"Mesh source '{status['name']}' ({status['type']}): " + f"failed - {status.get('last_error', 'unknown error')}" + ) + else: + self.data_store = None + + # Mesh health engine + mi_cfg = self.config.mesh_intelligence + if mi_cfg.enabled and self.data_store: + from .mesh_health import MeshHealthEngine + self.health_engine = MeshHealthEngine( + regions=mi_cfg.regions, + locality_radius=mi_cfg.locality_radius_miles, + offline_threshold_hours=mi_cfg.offline_threshold_hours, + packet_threshold=mi_cfg.packet_threshold, + battery_warning_percent=mi_cfg.battery_warning_percent, + ) + # Initial health computation + mesh_health = self.health_engine.compute(self.data_store) + self._last_health_compute = time.time() + logger.info( + f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " + f"{mesh_health.total_regions} regions, " + f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})" + ) + else: + self.health_engine = None + + # Mesh reporter (for LLM prompt injection and commands) + if self.health_engine and self.data_store: + from .mesh_reporter import MeshReporter + mi_regions = self.config.mesh_intelligence.regions if self.config.mesh_intelligence else [] + self.mesh_reporter = MeshReporter(self.health_engine, self.data_store, region_configs=mi_regions) + logger.info("Mesh reporter enabled") + else: + self.mesh_reporter = None + + # Subscription manager (uses same db as data_store) + if self.data_store: + from .subscriptions import SubscriptionManager + self.subscription_manager = SubscriptionManager(db_path="/data/mesh_history.db") + logger.info("Subscription manager enabled") + else: + self.subscription_manager = None + + # Alert engine (needs health engine, reporter, and subscription manager) + if self.health_engine and self.mesh_reporter and self.subscription_manager: + from .alert_engine import AlertEngine + mi = self.config.mesh_intelligence + self.alert_engine = AlertEngine( + health_engine=self.health_engine, + reporter=self.mesh_reporter, + subscription_manager=self.subscription_manager, + config=mi, + db_path="/data/mesh_history.db", + timezone=self.config.timezone, + ) + logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") + + # Environmental feeds + env_cfg = self.config.environmental + if env_cfg.enabled: + from .env.store import EnvironmentalStore + # Pass region anchors for fire proximity calculation + region_anchors = self.config.mesh_intelligence.regions if self.config.mesh_intelligence.enabled else [] + self.env_store = EnvironmentalStore(config=env_cfg, region_anchors=region_anchors) + logger.info(f"Environmental feeds enabled ({len(self.env_store._adapters)} adapters)") + else: + self.env_store = None + + # Knowledge base (optional - Qdrant with SQLite fallback) + kb_cfg = self.config.knowledge + self.knowledge = None + if kb_cfg.enabled: + # Try Qdrant first if configured + if kb_cfg.backend in ("qdrant", "auto") and kb_cfg.qdrant_host: + try: + from .knowledge import QdrantKnowledgeSearch + qdrant = QdrantKnowledgeSearch( + qdrant_host=kb_cfg.qdrant_host, + qdrant_port=kb_cfg.qdrant_port, + collection=kb_cfg.qdrant_collection, + tei_host=kb_cfg.tei_host, + tei_port=kb_cfg.tei_port, + sparse_host=kb_cfg.sparse_host, + sparse_port=kb_cfg.sparse_port, + use_sparse=kb_cfg.use_sparse, + top_k=kb_cfg.top_k, + ) + if qdrant.available: + self.knowledge = qdrant + logger.info("Using Qdrant knowledge backend (RECON hybrid)") + except Exception as e: + logger.warning(f"Qdrant knowledge unavailable: {e}") + + # Fall back to SQLite if Qdrant failed or not configured + if not self.knowledge and kb_cfg.backend in ("sqlite", "auto") and kb_cfg.db_path: + try: + from .knowledge import KnowledgeSearch + self.knowledge = KnowledgeSearch( + db_path=kb_cfg.db_path, + top_k=kb_cfg.top_k, + ) + except ImportError as e: + logger.warning(f"SQLite knowledge disabled - missing dependencies: {e}") + + # Command dispatcher (needs mesh_reporter for health commands) + self.dispatcher = create_dispatcher( + prefix=self.config.commands.prefix, + disabled_commands=self.config.commands.disabled_commands, + custom_commands=self.config.commands.custom_commands, + mesh_reporter=self.mesh_reporter, + data_store=self.data_store, + health_engine=self.health_engine, + subscription_manager=self.subscription_manager, + env_store=self.env_store, + ) + + # Message router + self.router = MessageRouter( + self.config, self.connector, self.history, self.dispatcher, self.llm, + context=self.context, + meshmonitor_sync=self.meshmonitor_sync, + knowledge=self.knowledge, + source_manager=self.data_store, + health_engine=self.health_engine, + mesh_reporter=self.mesh_reporter, + env_store=self.env_store, + ) + + # Responder + self.responder = Responder(self.config.response, self.connector) + + # Dashboard + if hasattr(self.config, 'dashboard') and self.config.dashboard.enabled: + try: + from .dashboard.server import start_dashboard + self.broadcaster = await start_dashboard(self) + logger.info("Dashboard started on port %d", self.config.dashboard.port) + except Exception as e: + logger.warning("Dashboard failed to start: %s", e) + self.broadcaster = None + else: + self.broadcaster = None + + async def _on_message(self, message: MeshMessage) -> None: + """Handle incoming message.""" + try: + # Passively observe channel broadcasts for context (before filtering) + if self.context and not message.is_dm and message.text: + self.context.observe( + sender_name=message.sender_name, + sender_id=message.sender_id, + text=message.text, + channel=message.channel, + is_dm=False, + ) + + # Check if we should respond + if not self.router.should_respond(message): + return + + logger.info( + f"Processing message from {message.sender_name} ({message.sender_id}): " + f"{message.text[:50]}..." + ) + + # Route the message + # Check for continuation request first + continuation_messages = self.router.check_continuation(message) + if continuation_messages: + await self.responder.send_response( + continuation_messages, + destination=message.sender_id, + channel=message.channel, + ) + return + + result = await self.router.route(message) + + if result.route_type == RouteType.IGNORE: + return + + # Determine response + if result.route_type == RouteType.COMMAND: + if isinstance(result.response, list): + # Command returned pre-split messages — send directly + messages = result.response + else: + # Single string — chunk it + from .chunker import chunk_response + messages, remaining = chunk_response( + result.response, + max_chars=self.config.response.max_length, + max_messages=self.config.response.max_messages, + ) + if remaining: + self.router.continuations.store(message.sender_id, remaining) + elif result.route_type == RouteType.LLM: + messages = await self.router.generate_llm_response(message, result.query) + else: + return + + if not messages: + return + + # Send DM response + await self.responder.send_response( + messages, + destination=message.sender_id, + channel=message.channel, + ) + + except Exception as e: + logger.error(f"Error handling message: {e}", exc_info=True) + + async def _load_summaries(self) -> None: + """Load persisted summaries from database into memory cache.""" + memory = self.llm.get_memory() + if not memory: + return + + if not self.history or not self.history._db: + return + + try: + async with self.history._lock: + cursor = await self.history._db.execute( + "SELECT user_id, summary, message_count, updated_at " + "FROM conversation_summaries" + ) + rows = await cursor.fetchall() + + loaded = 0 + for row in rows: + user_id, summary_text, message_count, updated_at = row + summary = ConversationSummary( + summary=summary_text, + last_updated=updated_at, + message_count=message_count, + ) + memory.load_summary(user_id, summary) + loaded += 1 + + if loaded: + logger.info(f"Loaded {loaded} conversation summaries from database") + + except Exception as e: + logger.warning(f"Failed to load summaries from database: {e}") + + def _write_pid(self) -> None: + """Write PID file.""" + pid_file = Path("/tmp/meshai.pid") + pid_file.write_text(str(os.getpid())) + + def _remove_pid(self) -> None: + """Remove PID file.""" + pid_file = Path("/tmp/meshai.pid") + if pid_file.exists(): + pid_file.unlink() + + async def _dispatch_alerts(self, alerts: list[dict]) -> None: + """Dispatch alerts to subscribers and alert channel.""" + mi = self.config.mesh_intelligence + alert_channel = getattr(mi, 'alert_channel', -1) + + for alert in alerts: + message = alert["message"] + logger.info(f"ALERT: {message}") + + # Send to alert channel if configured + if alert_channel >= 0 and self.connector: + try: + self.connector.send_message( + text=message, + destination=None, # Broadcast + channel=alert_channel, + ) + logger.info(f"Alert sent to channel {alert_channel}") + except Exception as e: + logger.error(f"Failed to send channel alert: {e}") + + # Send DMs to matching subscribers + if self.alert_engine and self.subscription_manager: + subscribers = self.alert_engine.get_subscribers_for_alert(alert) + for sub in subscribers: + user_id = sub["user_id"] + try: + await self._send_sub_dm(user_id, message) + logger.info(f"Alert DM sent to {user_id}: {alert['type']}") + except Exception as e: + logger.error(f"Failed to send alert DM to {user_id}: {e}") + + self.alert_engine.clear_pending() + + async def _check_scheduled_subs(self) -> None: + """Check for and deliver due scheduled reports.""" + from datetime import datetime + from zoneinfo import ZoneInfo + + tz = ZoneInfo(self.config.timezone) + now = datetime.now(tz) + current_hhmm = now.strftime("%H%M") + current_day = now.strftime("%a").lower() + + due_subs = self.subscription_manager.get_due_subscriptions(current_hhmm, current_day) + + for sub in due_subs: + try: + # Generate report based on scope + report = self._generate_sub_report(sub) + if not report: + continue + + # Send DM to subscriber + user_id = sub["user_id"] + await self._send_sub_dm(user_id, report) + + # Mark as sent + self.subscription_manager.mark_sent(sub["id"]) + logger.info(f"Delivered {sub['sub_type']} report to {user_id}") + + except Exception as e: + logger.error(f"Error delivering subscription {sub['id']}: {e}") + + def _generate_sub_report(self, sub: dict) -> str: + """Generate report content for a subscription.""" + if not self.mesh_reporter: + return None + + sub_type = sub["sub_type"] + scope_type = sub.get("scope_type", "mesh") + scope_value = sub.get("scope_value") + + if scope_type == "region" and scope_value: + # Region-scoped report + region = self.mesh_reporter._find_region(scope_value) + if region: + return self.mesh_reporter.build_region_compact(region.name) + return None + elif scope_type == "node" and scope_value: + # Node-scoped report + return self.mesh_reporter.build_node_compact(scope_value) + else: + # Mesh-wide report + return self.mesh_reporter.build_lora_compact(scope="mesh") + + async def _send_sub_dm(self, node_num: str, message: str) -> None: + """Send a subscription DM to a node.""" + if not self.connector: + return + + # Convert node_num to destination format + try: + dest = int(node_num) + except ValueError: + dest = node_num + + # Send via responder for proper chunking + if self.responder: + await self.responder.send_response( + message, + destination=dest, + channel=0, # DM channel + ) + else: + # Fallback to direct send + self.connector.send_message(message, destination=dest) + + +def setup_logging(verbose: bool = False) -> None: + """Configure logging.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + +def main() -> None: + """Main entry point.""" + parser = argparse.ArgumentParser( + description="MeshAI - LLM-powered Meshtastic assistant", + prog="meshai", + ) + parser.add_argument( + "--version", "-V", action="version", version=f"%(prog)s {__version__}" + ) + parser.add_argument( + "--config", "-c", action="store_true", help="Launch configuration tool" + ) + parser.add_argument( + "--config-file", + "-f", + type=Path, + default=Path("config.yaml"), + help="Path to config file (default: config.yaml)", + ) + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + setup_logging(args.verbose) + + # Launch configurator if requested + if args.config: + run_configurator(args.config_file) + return + + # Load config + config = load_config(args.config_file) + + # Check if config exists + if not args.config_file.exists(): + logger.warning(f"Config file not found: {args.config_file}") + logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") + sys.exit(1) + + # Create and run bot + bot = MeshAI(config) + + # Handle signals + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}") + loop.create_task(bot.stop()) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + loop.run_until_complete(bot.start()) + except KeyboardInterrupt: + pass + finally: + loop.run_until_complete(bot.stop()) + loop.close() + + +if __name__ == "__main__": + main() diff --git a/meshai/mesh_data_store.py b/meshai/mesh_data_store.py index 51436a7..ac01870 100644 --- a/meshai/mesh_data_store.py +++ b/meshai/mesh_data_store.py @@ -27,6 +27,7 @@ from .mesh_models import ( ) from .sources.meshmonitor_data import MeshMonitorDataSource from .sources.meshview import MeshviewSource +from .sources.mqtt_source import MQTTSource logger = logging.getLogger(__name__) @@ -236,7 +237,7 @@ class MeshDataStore: source_configs: List of source configurations db_path: Path to SQLite database for historical data """ - self._sources: dict[str, MeshviewSource | MeshMonitorDataSource] = {} + self._sources: dict[str, MeshviewSource | MeshMonitorDataSource | MQTTSource] = {} self._db_path = db_path self._db: Optional[sqlite3.Connection] = None @@ -316,6 +317,42 @@ class MeshDataStore: ) logger.info(f"Registered MeshMonitor source '{name}' -> {url} (polite={polite})") + elif src_type == "mqtt": + # Extract MQTT-specific config + if isinstance(cfg, dict): + host = cfg.get('host', '') + port = cfg.get('port', 1883) + username = cfg.get('username', '') + password = cfg.get('password', '') + topic_root = cfg.get('topic_root', 'msh/US') + use_tls = cfg.get('use_tls', False) + else: + host = getattr(cfg, 'host', '') + port = getattr(cfg, 'port', 1883) + username = getattr(cfg, 'username', '') + password = getattr(cfg, 'password', '') + topic_root = getattr(cfg, 'topic_root', 'msh/US') + use_tls = getattr(cfg, 'use_tls', False) + + if not host: + logger.warning(f"MQTT source '{name}' missing host, skipping") + return + + self._sources[name] = MQTTSource( + host=host, + port=port, + username=username, + password=password, + topic_root=topic_root, + use_tls=use_tls, + name=name, + ) + # Track MQTT sources separately for async start + if not hasattr(self, '_mqtt_sources'): + self._mqtt_sources = [] + self._mqtt_sources.append(name) + logger.info(f"Registered MQTT source '{name}' -> {host}:{port} topic={topic_root}") + else: logger.warning(f"Unknown source type '{src_type}' for '{name}'") @@ -359,6 +396,24 @@ class MeshDataStore: # ========================================================================= + async def start_mqtt_sources(self) -> None: + """Start all MQTT source subscription loops.""" + if not hasattr(self, '_mqtt_sources'): + return + for name in self._mqtt_sources: + source = self._sources.get(name) + if source and hasattr(source, 'start'): + await source.start() + + async def stop_mqtt_sources(self) -> None: + """Stop all MQTT source subscription loops.""" + if not hasattr(self, '_mqtt_sources'): + return + for name in self._mqtt_sources: + source = self._sources.get(name) + if source and hasattr(source, 'stop'): + await source.stop() + def _purge_stale_nodes(self): """Remove nodes not heard from in more than 7 days. @@ -2120,11 +2175,19 @@ class MeshDataStore: """Get status of all sources.""" status_list = [] for name, source in self._sources.items(): + # Determine source type + if isinstance(source, MeshviewSource): + src_type = "meshview" + elif isinstance(source, MeshMonitorDataSource): + src_type = "meshmonitor" + elif isinstance(source, MQTTSource): + src_type = "mqtt" + else: + src_type = "unknown" + status = { "name": name, - "type": "meshview" - if isinstance(source, MeshviewSource) - else "meshmonitor", + "type": src_type, "enabled": True, "is_loaded": source.is_loaded, "last_refresh": source.last_refresh, @@ -2138,6 +2201,14 @@ class MeshDataStore: status["telemetry_count"] = len(source.telemetry) status["traceroute_count"] = len(source.traceroutes) status["channel_count"] = len(source.channels) + elif isinstance(source, MQTTSource): + health = source.health_status + status["is_connected"] = health.get("is_connected", False) + status["message_count"] = health.get("message_count", 0) + status["last_message"] = health.get("last_message", 0) + status["host"] = health.get("host", "") + status["port"] = health.get("port", 0) + status["topic_root"] = health.get("topic_root", "") status_list.append(status) diff --git a/meshai/sources/mqtt_source.py b/meshai/sources/mqtt_source.py new file mode 100644 index 0000000..b983e47 --- /dev/null +++ b/meshai/sources/mqtt_source.py @@ -0,0 +1,435 @@ +"""MQTT source adapter for Meshtastic broker subscriptions. + +Push-based source that subscribes to MQTT topics and decodes +ServiceEnvelope-wrapped MeshPackets. Provides live node/packet +data without polling. +""" + +import asyncio +import logging +import os +import time +from dataclasses import dataclass, field +from typing import Optional + +logger = logging.getLogger(__name__) + +# Port number to name mapping (from portnums_pb2) +PORTNUM_NAMES = { + 0: "UNKNOWN_APP", + 1: "TEXT_MESSAGE_APP", + 2: "REMOTE_HARDWARE_APP", + 3: "POSITION_APP", + 4: "NODEINFO_APP", + 5: "ROUTING_APP", + 6: "ADMIN_APP", + 7: "TEXT_MESSAGE_COMPRESSED_APP", + 8: "WAYPOINT_APP", + 9: "AUDIO_APP", + 10: "DETECTION_SENSOR_APP", + 11: "ALERT_APP", + 32: "REPLY_APP", + 33: "IP_TUNNEL_APP", + 34: "PAXCOUNTER_APP", + 64: "SERIAL_APP", + 65: "STORE_FORWARD_APP", + 66: "RANGE_TEST_APP", + 67: "TELEMETRY_APP", + 68: "ZPS_APP", + 69: "SIMULATOR_APP", + 70: "TRACEROUTE_APP", + 71: "NEIGHBORINFO_APP", + 72: "ATAK_PLUGIN", + 73: "MAP_REPORT_APP", + 74: "POWERSTRESS_APP", + 256: "PRIVATE_APP", + 257: "ATAK_FORWARDER", +} + + +@dataclass +class MQTTNodeInfo: + """Cached node info from MQTT.""" + + node_num: int + node_id_hex: str = "" + short_name: str = "" + long_name: str = "" + hw_model: str = "" + role: int = 0 + latitude: Optional[float] = None + longitude: Optional[float] = None + altitude: Optional[float] = None + last_heard: float = 0.0 + battery_percent: Optional[float] = None + voltage: Optional[float] = None + channel_utilization: Optional[float] = None + air_util_tx: Optional[float] = None + snr: Optional[float] = None + rssi: Optional[int] = None + via_mqtt: bool = True + + +@dataclass +class MQTTPacketInfo: + """Packet received from MQTT.""" + + packet_id: int + from_node: int + to_node: int + portnum: int + portnum_name: str + channel: int + timestamp: float + snr: Optional[float] = None + rssi: Optional[int] = None + hop_limit: Optional[int] = None + hop_start: Optional[int] = None + payload_size: int = 0 + gateway_id: str = "" + + +class MQTTSource: + """MQTT source adapter subscribing to Meshtastic broker topics. + + Maintains a subscription loop that processes ServiceEnvelope messages + and updates node/packet caches. Unlike poll-based sources, this is + push-based and receives data as it arrives. + """ + + def __init__( + self, + host: str, + port: int = 1883, + username: str = "", + password: str = "", + topic_root: str = "msh/US", + use_tls: bool = False, + name: str = "mqtt", + ): + """Initialize MQTT source. + + Args: + host: MQTT broker hostname + port: MQTT broker port (1883 for plain, 8883 for TLS) + username: MQTT username (optional) + password: MQTT password (optional, supports ${ENV_VAR}) + topic_root: Topic root to subscribe to (default: msh/US) + use_tls: Enable TLS for connection + name: Source name for logging/attribution + """ + self._host = host + self._port = port + self._username = username + self._password = self._resolve_env(password) + self._topic_root = topic_root.rstrip("/") + self._use_tls = use_tls + self._name = name + + # State + self._nodes: dict[int, MQTTNodeInfo] = {} + self._packets: list[MQTTPacketInfo] = [] + self._max_packets = 1000 # Ring buffer + self._is_connected: bool = False + self._is_loaded: bool = False + self._last_message: float = 0.0 + self._last_error: str = "" + self._message_count: int = 0 + self._data_changed: bool = False + + # Subscription task + self._task: Optional[asyncio.Task] = None + self._stop_event: Optional[asyncio.Event] = None + + # Retry settings + self._retry_delay = 5 # Initial retry delay + self._max_retry_delay = 300 # Max 5 minutes between retries + + def _resolve_env(self, value: str) -> str: + """Resolve ${ENV_VAR} references in value.""" + if value and value.startswith("${") and value.endswith("}"): + env_var = value[2:-1] + return os.environ.get(env_var, "") + return value + + @property + def nodes(self) -> dict[int, MQTTNodeInfo]: + """Return cached nodes.""" + return self._nodes + + @property + def packets(self) -> list[dict]: + """Return packets as dicts for compatibility.""" + return [ + { + "packet_id": p.packet_id, + "from_node": p.from_node, + "to_node": p.to_node, + "portnum": p.portnum, + "portnum_name": p.portnum_name, + "channel": p.channel, + "timestamp": p.timestamp, + "snr": p.snr, + "rssi": p.rssi, + "hop_limit": p.hop_limit, + "hop_start": p.hop_start, + "payload_size": p.payload_size, + "gateway_id": p.gateway_id, + } + for p in self._packets + ] + + @property + def is_loaded(self) -> bool: + """Return True if we have received any data.""" + return self._is_loaded + + @property + def data_changed(self) -> bool: + """Return True if data changed since last check, then reset.""" + changed = self._data_changed + self._data_changed = False + return changed + + @property + def health_status(self) -> dict: + """Return health status for dashboard.""" + return { + "name": self._name, + "type": "mqtt", + "host": self._host, + "port": self._port, + "topic_root": self._topic_root, + "is_connected": self._is_connected, + "is_loaded": self._is_loaded, + "last_message": self._last_message, + "last_error": self._last_error, + "message_count": self._message_count, + "node_count": len(self._nodes), + "packet_count": len(self._packets), + } + + async def start(self) -> None: + """Start the subscription loop.""" + if self._task is not None: + logger.warning(f"MQTT source '{self._name}' already started") + return + + self._stop_event = asyncio.Event() + self._task = asyncio.create_task(self._subscription_loop()) + logger.info(f"Started MQTT source '{self._name}' -> {self._host}:{self._port}") + + async def stop(self) -> None: + """Stop the subscription loop.""" + if self._stop_event: + self._stop_event.set() + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._is_connected = False + logger.info(f"Stopped MQTT source '{self._name}'") + + async def _subscription_loop(self) -> None: + """Main subscription loop with reconnection logic.""" + try: + import aiomqtt + except ImportError: + logger.error("aiomqtt not installed. Run: pip install aiomqtt") + self._last_error = "aiomqtt not installed" + return + + retry_delay = self._retry_delay + + while not self._stop_event.is_set(): + try: + # Build connection kwargs + kwargs = { + "hostname": self._host, + "port": self._port, + } + if self._username: + kwargs["username"] = self._username + if self._password: + kwargs["password"] = self._password + + # TLS setup + if self._use_tls: + import ssl + tls_context = ssl.create_default_context() + kwargs["tls_context"] = tls_context + + async with aiomqtt.Client(**kwargs) as client: + self._is_connected = True + self._last_error = "" + retry_delay = self._retry_delay # Reset on successful connect + logger.info(f"MQTT '{self._name}' connected to {self._host}:{self._port}") + + # Subscribe to all topics under root + # Meshtastic uses: msh/{region}/{channel}/json/{node_id} + # and: msh/{region}/{channel}/!{node_id} + topic = f"{self._topic_root}/#" + await client.subscribe(topic) + logger.info(f"MQTT '{self._name}' subscribed to {topic}") + + async for message in client.messages: + if self._stop_event.is_set(): + break + await self._process_message(message) + + except asyncio.CancelledError: + break + except Exception as e: + self._is_connected = False + self._last_error = str(e) + logger.warning(f"MQTT '{self._name}' error: {e}. Retrying in {retry_delay}s") + + # Exponential backoff + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, self._max_retry_delay) + + async def _process_message(self, message) -> None: + """Process an incoming MQTT message.""" + try: + topic = str(message.topic) + payload = message.payload + + # Skip JSON topics (we want binary ServiceEnvelope) + if "/json/" in topic: + return + + # Skip map reports (stat/ or map/ topics) + if "/stat/" in topic or "/map/" in topic: + return + + # Parse ServiceEnvelope + from meshtastic.protobuf import mqtt_pb2 + + envelope = mqtt_pb2.ServiceEnvelope() + envelope.ParseFromString(payload) + + if not envelope.packet: + return + + packet = envelope.packet + gateway_id = envelope.gateway_id or "" + channel_id = envelope.channel_id or "" + + # Update stats + self._last_message = time.time() + self._message_count += 1 + self._is_loaded = True + self._data_changed = True + + # Extract packet info + pkt_info = MQTTPacketInfo( + packet_id=packet.id, + from_node=packet.from_, + to_node=packet.to, + portnum=packet.decoded.portnum if packet.HasField("decoded") else 0, + portnum_name=PORTNUM_NAMES.get( + packet.decoded.portnum if packet.HasField("decoded") else 0, + "UNKNOWN" + ), + channel=packet.channel, + timestamp=time.time(), + snr=packet.rx_snr if packet.rx_snr else None, + rssi=packet.rx_rssi if packet.rx_rssi else None, + hop_limit=packet.hop_limit if packet.hop_limit else None, + hop_start=packet.hop_start if packet.hop_start else None, + payload_size=len(packet.decoded.payload) if packet.HasField("decoded") else 0, + gateway_id=gateway_id, + ) + + # Add to packet ring buffer + self._packets.append(pkt_info) + if len(self._packets) > self._max_packets: + self._packets = self._packets[-self._max_packets:] + + # Process decoded payload by portnum + if packet.HasField("decoded"): + await self._process_decoded(packet, gateway_id) + + except Exception as e: + logger.debug(f"MQTT message parse error: {e}") + + async def _process_decoded(self, packet, gateway_id: str) -> None: + """Process decoded packet payload.""" + decoded = packet.decoded + portnum = decoded.portnum + from_node = packet.from_ + + # Ensure node exists in cache + if from_node not in self._nodes: + self._nodes[from_node] = MQTTNodeInfo( + node_num=from_node, + node_id_hex=f"!{from_node:08x}", + ) + + node = self._nodes[from_node] + node.last_heard = time.time() + node.snr = packet.rx_snr if packet.rx_snr else node.snr + node.rssi = packet.rx_rssi if packet.rx_rssi else node.rssi + + # NODEINFO_APP (4) + if portnum == 4: + from meshtastic.protobuf import mesh_pb2 + user = mesh_pb2.User() + try: + user.ParseFromString(decoded.payload) + node.short_name = user.short_name or node.short_name + node.long_name = user.long_name or node.long_name + node.hw_model = mesh_pb2.HardwareModel.Name(user.hw_model) if user.hw_model else "" + node.role = user.role + except Exception: + pass + + # POSITION_APP (3) + elif portnum == 3: + from meshtastic.protobuf import mesh_pb2 + pos = mesh_pb2.Position() + try: + pos.ParseFromString(decoded.payload) + if pos.latitude_i: + node.latitude = pos.latitude_i * 1e-7 + if pos.longitude_i: + node.longitude = pos.longitude_i * 1e-7 + if pos.altitude: + node.altitude = pos.altitude + except Exception: + pass + + # TELEMETRY_APP (67) + elif portnum == 67: + from meshtastic.protobuf import telemetry_pb2 + telem = telemetry_pb2.Telemetry() + try: + telem.ParseFromString(decoded.payload) + if telem.HasField("device_metrics"): + dm = telem.device_metrics + if dm.battery_level and dm.battery_level <= 100: + node.battery_percent = dm.battery_level + if dm.voltage: + node.voltage = dm.voltage + if dm.channel_utilization: + node.channel_utilization = dm.channel_utilization + if dm.air_util_tx: + node.air_util_tx = dm.air_util_tx + except Exception: + pass + + # Compatibility methods for MeshDataStore integration + + def tick(self) -> Optional[str]: + """Tick method for compatibility. MQTT is push-based, not polled. + + Returns None since we do not poll endpoints. + """ + return None + + def maybe_refresh(self) -> bool: + """Check if data changed (for legacy compatibility).""" + return self.data_changed diff --git a/pyproject.toml b/pyproject.toml index f528c50..6bdd4b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "httpx>=0.25.0", "fastapi>=0.110.0", "uvicorn[standard]>=0.27.0", + "aiomqtt>=2.0.0", ] [project.optional-dependencies] diff --git a/requirements.txt b/requirements.txt index 79e572e..b992642 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ sqlite-vec>=0.1.0 numpy fastapi>=0.110.0 uvicorn[standard]>=0.27.0 +aiomqtt>=2.0.0