diff --git a/meshai/commands/dispatcher.py b/meshai/commands/dispatcher.py index 0d56fd6..4db50bd 100644 --- a/meshai/commands/dispatcher.py +++ b/meshai/commands/dispatcher.py @@ -157,6 +157,7 @@ def create_dispatcher( prefix: str = "!", disabled_commands: Optional[list[str]] = None, custom_commands: Optional[dict] = None, + mesh_reporter=None, ) -> CommandDispatcher: """Create and populate command dispatcher with default commands. @@ -164,6 +165,7 @@ def create_dispatcher( prefix: Command prefix (default: "!") disabled_commands: List of command names to disable custom_commands: Dict of name -> response for custom commands + mesh_reporter: MeshReporter instance for health commands Returns: Configured CommandDispatcher @@ -174,6 +176,7 @@ def create_dispatcher( from .reset import ResetCommand from .status import StatusCommand from .weather import WeatherCommand + from .health import HealthCommand, RegionCommand dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands) @@ -185,6 +188,23 @@ def create_dispatcher( dispatcher.register(StatusCommand()) dispatcher.register(WeatherCommand()) + # Register mesh health commands + health_cmd = HealthCommand(mesh_reporter) + dispatcher.register(health_cmd) + # Register aliases for health command + for alias in getattr(health_cmd, 'aliases', []): + alias_handler = HealthCommand(mesh_reporter) + alias_handler.name = alias + dispatcher.register(alias_handler) + + region_cmd = RegionCommand(mesh_reporter) + dispatcher.register(region_cmd) + # Register aliases for region command + for alias in getattr(region_cmd, 'aliases', []): + alias_handler = RegionCommand(mesh_reporter) + alias_handler.name = alias + dispatcher.register(alias_handler) + # Register custom commands if custom_commands: for name, response in custom_commands.items(): diff --git a/meshai/commands/health.py b/meshai/commands/health.py new file mode 100644 index 0000000..588ba74 --- /dev/null +++ b/meshai/commands/health.py @@ -0,0 +1,58 @@ +"""Health and region commands for mesh status.""" + +from .base import CommandContext, CommandHandler + + +class HealthCommand(CommandHandler): + """Quick mesh health summary.""" + + name = "health" + description = "Show mesh health summary" + usage = "!health" + aliases = ["mesh", "status"] + + def __init__(self, mesh_reporter=None): + """Initialize with optional mesh reporter. + + Args: + mesh_reporter: MeshReporter instance for health data + """ + self._mesh_reporter = mesh_reporter + + async def execute(self, args: str, context: CommandContext) -> str: + """Return compact mesh health summary.""" + if not self._mesh_reporter: + return "Mesh health not available." + + return self._mesh_reporter.build_lora_compact("mesh") + + +class RegionCommand(CommandHandler): + """Region health information.""" + + name = "region" + description = "Show region health info" + usage = "!region [name]" + aliases = ["reg"] + + def __init__(self, mesh_reporter=None): + """Initialize with optional mesh reporter. + + Args: + mesh_reporter: MeshReporter instance for health data + """ + self._mesh_reporter = mesh_reporter + + async def execute(self, args: str, context: CommandContext) -> str: + """Return region health info.""" + if not self._mesh_reporter: + return "Mesh health not available." + + args = args.strip() + + if not args: + # List all regions + return self._mesh_reporter.list_regions_compact() + + # Get specific region detail (compact for LoRa) + return self._mesh_reporter.build_lora_compact("region", args) diff --git a/meshai/main.py b/meshai/main.py index 71629a5..89728d5 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -1,439 +1,450 @@ -"""Main entry point for MeshAI.""" - -import argparse -import asyncio -import logging -import os -import signal -import sys -import time -from pathlib import Path -from typing import Optional - -from . import __version__ -from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend -from .cli import run_configurator -from .commands import CommandDispatcher -from .commands.dispatcher import create_dispatcher -from .commands.status import set_start_time -from .config import Config, load_config -from .connector import MeshConnector, MeshMessage -from .context import MeshContext -from .history import ConversationHistory -from .memory import ConversationSummary -from .responder import Responder -from .router import MessageRouter, RouteType - -logger = logging.getLogger(__name__) - - -class MeshAI: - """Main application class.""" - - def __init__(self, config: Config): - self.config = config - self.connector: Optional[MeshConnector] = None - self.history: Optional[ConversationHistory] = None - self.dispatcher: Optional[CommandDispatcher] = None - self.llm: Optional[LLMBackend] = None - self.context: Optional[MeshContext] = None - self.meshmonitor_sync = None - self.knowledge = None - self.source_manager = None - self.health_engine = None - self.router: Optional[MessageRouter] = None - self.responder: Optional[Responder] = None - self._running = False - self._loop: Optional[asyncio.AbstractEventLoop] = None - self._last_cleanup: float = 0.0 - self._last_health_compute: float = 0.0 - - async def start(self) -> None: - """Start the bot.""" - logger.info(f"Starting MeshAI v{__version__}") - set_start_time(time.time()) - - # Initialize components - await self._init_components() - - # Connect to Meshtastic - self.connector.connect() - self.connector.set_message_callback(self._on_message, asyncio.get_event_loop()) - - # Add own node ID to context ignore list - if self.context and self.connector.my_node_id: - self.context._ignore_nodes.add(self.connector.my_node_id) - - self._running = True - self._loop = asyncio.get_event_loop() - self._last_cleanup = time.time() - self._last_health_compute = 0.0 - - # Write PID file - self._write_pid() - - logger.info("MeshAI started successfully") - - # Keep running - while self._running: - await asyncio.sleep(1) - - # Periodic MeshMonitor refresh - if self.meshmonitor_sync: - self.meshmonitor_sync.maybe_refresh() - - # Periodic mesh source refresh and health computation - if self.source_manager: - refreshed = self.source_manager.refresh_all() - # Recompute health after source refresh - if refreshed > 0 and self.health_engine: - self.health_engine.compute(self.source_manager) - self._last_health_compute = time.time() - - # Periodic cleanup - if time.time() - self._last_cleanup >= 3600: - await self.history.cleanup_expired() - if self.context: - self.context.prune() - self._last_cleanup = time.time() - - async def stop(self) -> None: - """Stop the bot.""" - logger.info("Stopping MeshAI...") - self._running = False - - if self.connector: - self.connector.disconnect() - - if self.history: - await self.history.close() - - if self.llm: - await self.llm.close() - if self.knowledge: - self.knowledge.close() - - self._remove_pid() - logger.info("MeshAI stopped") - - async def _init_components(self) -> None: - """Initialize all components.""" - # Conversation history - self.history = ConversationHistory(self.config.history) - await self.history.initialize() - - # Command dispatcher - self.dispatcher = create_dispatcher( - prefix=self.config.commands.prefix, - disabled_commands=self.config.commands.disabled_commands, - custom_commands=self.config.commands.custom_commands, - ) - - # LLM backend - api_key = self.config.resolve_api_key() - if not api_key: - logger.warning("No API key configured - LLM responses will fail") - - # Memory config - mem_cfg = self.config.memory - window_size = mem_cfg.window_size if mem_cfg.enabled else 0 - summarize_threshold = mem_cfg.summarize_threshold - - # Create backend - backend = self.config.llm.backend.lower() - if backend == "openai": - self.llm = OpenAIBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - elif backend == "anthropic": - self.llm = AnthropicBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - elif backend == "google": - self.llm = GoogleBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - else: - logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI") - self.llm = OpenAIBackend( - self.config.llm, api_key, window_size, summarize_threshold - ) - - # Load persisted summaries into memory cache - await self._load_summaries() - - # Meshtastic connector - self.connector = MeshConnector(self.config.connection) - - # Passive mesh context buffer - ctx_cfg = self.config.context - if ctx_cfg.enabled: - self.context = MeshContext( - observe_channels=ctx_cfg.observe_channels or None, - ignore_nodes=ctx_cfg.ignore_nodes or None, - max_age=ctx_cfg.max_age, - ) - logger.info("Mesh context buffer enabled") - else: - self.context = None - - # MeshMonitor trigger sync - mm_cfg = self.config.meshmonitor - if mm_cfg.enabled and mm_cfg.url: - from .meshmonitor import MeshMonitorSync - self.meshmonitor_sync = MeshMonitorSync( - url=mm_cfg.url, - refresh_interval=mm_cfg.refresh_interval, - ) - count = self.meshmonitor_sync.load() - logger.info(f"MeshMonitor sync enabled, loaded {count} triggers") - else: - self.meshmonitor_sync = None - - # Mesh data sources - enabled_sources = [s for s in self.config.mesh_sources if s.enabled] - if enabled_sources: - from .mesh_sources import MeshSourceManager - self.source_manager = MeshSourceManager(enabled_sources) - # Initial fetch - self.source_manager.refresh_all() - # Log status - for status in self.source_manager.get_status(): - if status["is_loaded"]: - logger.info( - f"Mesh source '{status['name']}' ({status['type']}): " - f"{status['node_count']} nodes" - ) - else: - logger.warning( - f"Mesh source '{status['name']}' ({status['type']}): " - f"failed - {status.get('last_error', 'unknown error')}" - ) - else: - self.source_manager = None - - # Mesh health engine - mi_cfg = self.config.mesh_intelligence - if mi_cfg.enabled and self.source_manager: - from .mesh_health import MeshHealthEngine - self.health_engine = MeshHealthEngine( - regions=mi_cfg.regions, - locality_radius=mi_cfg.locality_radius_miles, - offline_threshold_hours=mi_cfg.offline_threshold_hours, - packet_threshold=mi_cfg.packet_threshold, - battery_warning_percent=mi_cfg.battery_warning_percent, - ) - # Initial health computation - mesh_health = self.health_engine.compute(self.source_manager) - self._last_health_compute = time.time() - logger.info( - f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " - f"{mesh_health.total_regions} regions, " - f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})" - ) - else: - self.health_engine = None - - # Knowledge base - kb_cfg = self.config.knowledge - if kb_cfg.enabled and kb_cfg.db_path: - from .knowledge import KnowledgeSearch - self.knowledge = KnowledgeSearch( - db_path=kb_cfg.db_path, - top_k=kb_cfg.top_k, - ) - else: - self.knowledge = None - - # Message router - self.router = MessageRouter( - self.config, self.connector, self.history, self.dispatcher, self.llm, - context=self.context, - meshmonitor_sync=self.meshmonitor_sync, - knowledge=self.knowledge, - source_manager=self.source_manager, - health_engine=self.health_engine, - ) - - # Responder - self.responder = Responder(self.config.response, self.connector) - - async def _on_message(self, message: MeshMessage) -> None: - """Handle incoming message.""" - try: - # Passively observe channel broadcasts for context (before filtering) - if self.context and not message.is_dm and message.text: - self.context.observe( - sender_name=message.sender_name, - sender_id=message.sender_id, - text=message.text, - channel=message.channel, - is_dm=False, - ) - - # Check if we should respond - if not self.router.should_respond(message): - return - - logger.info( - f"Processing message from {message.sender_name} ({message.sender_id}): " - f"{message.text[:50]}..." - ) - - # Route the message - # Check for continuation request first - continuation_messages = self.router.check_continuation(message) - if continuation_messages: - await self.responder.send_response( - continuation_messages, - destination=message.sender_id, - channel=message.channel, - ) - return - - result = await self.router.route(message) - - if result.route_type == RouteType.IGNORE: - return - - # Determine response - if result.route_type == RouteType.COMMAND: - messages = result.response # Commands return single string - elif result.route_type == RouteType.LLM: - messages = await self.router.generate_llm_response(message, result.query) - else: - return - - if not messages: - return - - # Send DM response - await self.responder.send_response( - messages, - destination=message.sender_id, - channel=message.channel, - ) - - except Exception as e: - logger.error(f"Error handling message: {e}", exc_info=True) - - async def _load_summaries(self) -> None: - """Load persisted summaries from database into memory cache.""" - memory = self.llm.get_memory() - if not memory: - return - - if not self.history or not self.history._db: - return - - try: - async with self.history._lock: - cursor = await self.history._db.execute( - "SELECT user_id, summary, message_count, updated_at " - "FROM conversation_summaries" - ) - rows = await cursor.fetchall() - - loaded = 0 - for row in rows: - user_id, summary_text, message_count, updated_at = row - summary = ConversationSummary( - summary=summary_text, - last_updated=updated_at, - message_count=message_count, - ) - memory.load_summary(user_id, summary) - loaded += 1 - - if loaded: - logger.info(f"Loaded {loaded} conversation summaries from database") - - except Exception as e: - logger.warning(f"Failed to load summaries from database: {e}") - - def _write_pid(self) -> None: - """Write PID file.""" - pid_file = Path("/tmp/meshai.pid") - pid_file.write_text(str(os.getpid())) - - def _remove_pid(self) -> None: - """Remove PID file.""" - pid_file = Path("/tmp/meshai.pid") - if pid_file.exists(): - pid_file.unlink() - - -def setup_logging(verbose: bool = False) -> None: - """Configure logging.""" - level = logging.DEBUG if verbose else logging.INFO - logging.basicConfig( - level=level, - format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - -def main() -> None: - """Main entry point.""" - parser = argparse.ArgumentParser( - description="MeshAI - LLM-powered Meshtastic assistant", - prog="meshai", - ) - parser.add_argument( - "--version", "-V", action="version", version=f"%(prog)s {__version__}" - ) - parser.add_argument( - "--config", "-c", action="store_true", help="Launch configuration tool" - ) - parser.add_argument( - "--config-file", - "-f", - type=Path, - default=Path("config.yaml"), - help="Path to config file (default: config.yaml)", - ) - parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") - - args = parser.parse_args() - - setup_logging(args.verbose) - - # Launch configurator if requested - if args.config: - run_configurator(args.config_file) - return - - # Load config - config = load_config(args.config_file) - - # Check if config exists - if not args.config_file.exists(): - logger.warning(f"Config file not found: {args.config_file}") - logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") - sys.exit(1) - - # Create and run bot - bot = MeshAI(config) - - # Handle signals - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - def signal_handler(sig, frame): - logger.info(f"Received signal {sig}") - loop.create_task(bot.stop()) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - try: - loop.run_until_complete(bot.start()) - except KeyboardInterrupt: - pass - finally: - loop.run_until_complete(bot.stop()) - loop.close() - - -if __name__ == "__main__": - main() +"""Main entry point for MeshAI.""" + +import argparse +import asyncio +import logging +import os +import signal +import sys +import time +from pathlib import Path +from typing import Optional + +from . import __version__ +from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend +from .cli import run_configurator +from .commands import CommandDispatcher +from .commands.dispatcher import create_dispatcher +from .commands.status import set_start_time +from .config import Config, load_config +from .connector import MeshConnector, MeshMessage +from .context import MeshContext +from .history import ConversationHistory +from .memory import ConversationSummary +from .responder import Responder +from .router import MessageRouter, RouteType + +logger = logging.getLogger(__name__) + + +class MeshAI: + """Main application class.""" + + def __init__(self, config: Config): + self.config = config + self.connector: Optional[MeshConnector] = None + self.history: Optional[ConversationHistory] = None + self.dispatcher: Optional[CommandDispatcher] = None + self.llm: Optional[LLMBackend] = None + self.context: Optional[MeshContext] = None + self.meshmonitor_sync = None + self.knowledge = None + self.source_manager = None + self.health_engine = None + self.mesh_reporter = None + self.router: Optional[MessageRouter] = None + self.responder: Optional[Responder] = None + self._running = False + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._last_cleanup: float = 0.0 + self._last_health_compute: float = 0.0 + + async def start(self) -> None: + """Start the bot.""" + logger.info(f"Starting MeshAI v{__version__}") + set_start_time(time.time()) + + # Initialize components + await self._init_components() + + # Connect to Meshtastic + self.connector.connect() + self.connector.set_message_callback(self._on_message, asyncio.get_event_loop()) + + # Add own node ID to context ignore list + if self.context and self.connector.my_node_id: + self.context._ignore_nodes.add(self.connector.my_node_id) + + self._running = True + self._loop = asyncio.get_event_loop() + self._last_cleanup = time.time() + self._last_health_compute = 0.0 + + # Write PID file + self._write_pid() + + logger.info("MeshAI started successfully") + + # Keep running + while self._running: + await asyncio.sleep(1) + + # Periodic MeshMonitor refresh + if self.meshmonitor_sync: + self.meshmonitor_sync.maybe_refresh() + + # Periodic mesh source refresh and health computation + if self.source_manager: + refreshed = self.source_manager.refresh_all() + # Recompute health after source refresh + if refreshed > 0 and self.health_engine: + self.health_engine.compute(self.source_manager) + self._last_health_compute = time.time() + + # Periodic cleanup + if time.time() - self._last_cleanup >= 3600: + await self.history.cleanup_expired() + if self.context: + self.context.prune() + self._last_cleanup = time.time() + + async def stop(self) -> None: + """Stop the bot.""" + logger.info("Stopping MeshAI...") + self._running = False + + if self.connector: + self.connector.disconnect() + + if self.history: + await self.history.close() + + if self.llm: + await self.llm.close() + if self.knowledge: + self.knowledge.close() + + self._remove_pid() + logger.info("MeshAI stopped") + + async def _init_components(self) -> None: + """Initialize all components.""" + # Conversation history + self.history = ConversationHistory(self.config.history) + await self.history.initialize() + + # LLM backend + api_key = self.config.resolve_api_key() + if not api_key: + logger.warning("No API key configured - LLM responses will fail") + + # Memory config + mem_cfg = self.config.memory + window_size = mem_cfg.window_size if mem_cfg.enabled else 0 + summarize_threshold = mem_cfg.summarize_threshold + + # Create backend + backend = self.config.llm.backend.lower() + if backend == "openai": + self.llm = OpenAIBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + elif backend == "anthropic": + self.llm = AnthropicBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + elif backend == "google": + self.llm = GoogleBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + else: + logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI") + self.llm = OpenAIBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + + # Load persisted summaries into memory cache + await self._load_summaries() + + # Meshtastic connector + self.connector = MeshConnector(self.config.connection) + + # Passive mesh context buffer + ctx_cfg = self.config.context + if ctx_cfg.enabled: + self.context = MeshContext( + observe_channels=ctx_cfg.observe_channels or None, + ignore_nodes=ctx_cfg.ignore_nodes or None, + max_age=ctx_cfg.max_age, + ) + logger.info("Mesh context buffer enabled") + else: + self.context = None + + # MeshMonitor trigger sync + mm_cfg = self.config.meshmonitor + if mm_cfg.enabled and mm_cfg.url: + from .meshmonitor import MeshMonitorSync + self.meshmonitor_sync = MeshMonitorSync( + url=mm_cfg.url, + refresh_interval=mm_cfg.refresh_interval, + ) + count = self.meshmonitor_sync.load() + logger.info(f"MeshMonitor sync enabled, loaded {count} triggers") + else: + self.meshmonitor_sync = None + + # Mesh data sources + enabled_sources = [s for s in self.config.mesh_sources if s.enabled] + if enabled_sources: + from .mesh_sources import MeshSourceManager + self.source_manager = MeshSourceManager(enabled_sources) + # Initial fetch + self.source_manager.refresh_all() + # Log status + for status in self.source_manager.get_status(): + if status["is_loaded"]: + logger.info( + f"Mesh source '{status['name']}' ({status['type']}): " + f"{status['node_count']} nodes" + ) + else: + logger.warning( + f"Mesh source '{status['name']}' ({status['type']}): " + f"failed - {status.get('last_error', 'unknown error')}" + ) + else: + self.source_manager = None + + # Mesh health engine + mi_cfg = self.config.mesh_intelligence + if mi_cfg.enabled and self.source_manager: + from .mesh_health import MeshHealthEngine + self.health_engine = MeshHealthEngine( + regions=mi_cfg.regions, + locality_radius=mi_cfg.locality_radius_miles, + offline_threshold_hours=mi_cfg.offline_threshold_hours, + packet_threshold=mi_cfg.packet_threshold, + battery_warning_percent=mi_cfg.battery_warning_percent, + ) + # Initial health computation + mesh_health = self.health_engine.compute(self.source_manager) + self._last_health_compute = time.time() + logger.info( + f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, " + f"{mesh_health.total_regions} regions, " + f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})" + ) + else: + self.health_engine = None + + # Mesh reporter (for LLM prompt injection and commands) + if self.health_engine and self.source_manager: + from .mesh_reporter import MeshReporter + self.mesh_reporter = MeshReporter(self.health_engine, self.source_manager) + logger.info("Mesh reporter enabled") + else: + self.mesh_reporter = None + + # Knowledge base + kb_cfg = self.config.knowledge + if kb_cfg.enabled and kb_cfg.db_path: + from .knowledge import KnowledgeSearch + self.knowledge = KnowledgeSearch( + db_path=kb_cfg.db_path, + top_k=kb_cfg.top_k, + ) + else: + self.knowledge = None + + # Command dispatcher (needs mesh_reporter for health commands) + self.dispatcher = create_dispatcher( + prefix=self.config.commands.prefix, + disabled_commands=self.config.commands.disabled_commands, + custom_commands=self.config.commands.custom_commands, + mesh_reporter=self.mesh_reporter, + ) + + # Message router + self.router = MessageRouter( + self.config, self.connector, self.history, self.dispatcher, self.llm, + context=self.context, + meshmonitor_sync=self.meshmonitor_sync, + knowledge=self.knowledge, + source_manager=self.source_manager, + health_engine=self.health_engine, + mesh_reporter=self.mesh_reporter, + ) + + # Responder + self.responder = Responder(self.config.response, self.connector) + + async def _on_message(self, message: MeshMessage) -> None: + """Handle incoming message.""" + try: + # Passively observe channel broadcasts for context (before filtering) + if self.context and not message.is_dm and message.text: + self.context.observe( + sender_name=message.sender_name, + sender_id=message.sender_id, + text=message.text, + channel=message.channel, + is_dm=False, + ) + + # Check if we should respond + if not self.router.should_respond(message): + return + + logger.info( + f"Processing message from {message.sender_name} ({message.sender_id}): " + f"{message.text[:50]}..." + ) + + # Route the message + # Check for continuation request first + continuation_messages = self.router.check_continuation(message) + if continuation_messages: + await self.responder.send_response( + continuation_messages, + destination=message.sender_id, + channel=message.channel, + ) + return + + result = await self.router.route(message) + + if result.route_type == RouteType.IGNORE: + return + + # Determine response + if result.route_type == RouteType.COMMAND: + messages = result.response # Commands return single string + elif result.route_type == RouteType.LLM: + messages = await self.router.generate_llm_response(message, result.query) + else: + return + + if not messages: + return + + # Send DM response + await self.responder.send_response( + messages, + destination=message.sender_id, + channel=message.channel, + ) + + except Exception as e: + logger.error(f"Error handling message: {e}", exc_info=True) + + async def _load_summaries(self) -> None: + """Load persisted summaries from database into memory cache.""" + memory = self.llm.get_memory() + if not memory: + return + + if not self.history or not self.history._db: + return + + try: + async with self.history._lock: + cursor = await self.history._db.execute( + "SELECT user_id, summary, message_count, updated_at " + "FROM conversation_summaries" + ) + rows = await cursor.fetchall() + + loaded = 0 + for row in rows: + user_id, summary_text, message_count, updated_at = row + summary = ConversationSummary( + summary=summary_text, + last_updated=updated_at, + message_count=message_count, + ) + memory.load_summary(user_id, summary) + loaded += 1 + + if loaded: + logger.info(f"Loaded {loaded} conversation summaries from database") + + except Exception as e: + logger.warning(f"Failed to load summaries from database: {e}") + + def _write_pid(self) -> None: + """Write PID file.""" + pid_file = Path("/tmp/meshai.pid") + pid_file.write_text(str(os.getpid())) + + def _remove_pid(self) -> None: + """Remove PID file.""" + pid_file = Path("/tmp/meshai.pid") + if pid_file.exists(): + pid_file.unlink() + + +def setup_logging(verbose: bool = False) -> None: + """Configure logging.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + +def main() -> None: + """Main entry point.""" + parser = argparse.ArgumentParser( + description="MeshAI - LLM-powered Meshtastic assistant", + prog="meshai", + ) + parser.add_argument( + "--version", "-V", action="version", version=f"%(prog)s {__version__}" + ) + parser.add_argument( + "--config", "-c", action="store_true", help="Launch configuration tool" + ) + parser.add_argument( + "--config-file", + "-f", + type=Path, + default=Path("config.yaml"), + help="Path to config file (default: config.yaml)", + ) + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + setup_logging(args.verbose) + + # Launch configurator if requested + if args.config: + run_configurator(args.config_file) + return + + # Load config + config = load_config(args.config_file) + + # Check if config exists + if not args.config_file.exists(): + logger.warning(f"Config file not found: {args.config_file}") + logger.info("Run 'meshai --config' to create one, or copy config.example.yaml") + sys.exit(1) + + # Create and run bot + bot = MeshAI(config) + + # Handle signals + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}") + loop.create_task(bot.stop()) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + loop.run_until_complete(bot.start()) + except KeyboardInterrupt: + pass + finally: + loop.run_until_complete(bot.stop()) + loop.close() + + +if __name__ == "__main__": + main() diff --git a/meshai/mesh_health.py b/meshai/mesh_health.py index 7822887..a9c9aa4 100644 --- a/meshai/mesh_health.py +++ b/meshai/mesh_health.py @@ -279,7 +279,7 @@ class MeshHealthEngine: role = node.get("role") or node.get("hwModel") or "" # Determine if infrastructure - is_infra = role.upper() in INFRASTRUCTURE_ROLES + is_infra = str(role).upper() in INFRASTRUCTURE_ROLES # Get position (handle different API formats) lat = node.get("latitude") or node.get("lat") diff --git a/meshai/mesh_reporter.py b/meshai/mesh_reporter.py new file mode 100644 index 0000000..8b7e74c --- /dev/null +++ b/meshai/mesh_reporter.py @@ -0,0 +1,545 @@ +"""Mesh health reporting for LLM prompt injection and commands.""" + +import logging +import time +from datetime import datetime +from typing import Optional + +logger = logging.getLogger(__name__) + + +def _format_age(timestamp: float) -> str: + """Format a timestamp as human-readable age.""" + if not timestamp: + return "never" + + age_seconds = time.time() - timestamp + if age_seconds < 0: + return "just now" + elif age_seconds < 60: + return f"{int(age_seconds)}s ago" + elif age_seconds < 3600: + return f"{int(age_seconds / 60)}m ago" + elif age_seconds < 86400: + return f"{int(age_seconds / 3600)}h ago" + else: + return f"{int(age_seconds / 86400)}d ago" + + +def _tier_flag(tier: str) -> str: + """Get warning flag for health tier.""" + if tier == "Critical": + return " !!" + elif tier == "Warning": + return " !" + elif tier == "Unhealthy": + return " !" + return "" + + +class MeshReporter: + """Builds text blocks for mesh health prompt injection.""" + + def __init__(self, health_engine, source_manager): + """Initialize reporter. + + Args: + health_engine: MeshHealthEngine instance + source_manager: MeshSourceManager instance + """ + self.health_engine = health_engine + self.source_manager = source_manager + + def build_tier1_summary(self) -> str: + """Build compact mesh summary for LLM injection (~500-800 tokens). + + Returns: + Formatted summary string + """ + health = self.health_engine.mesh_health + if not health: + return "LIVE MESH HEALTH DATA: No data available yet." + + score = health.score + ts = datetime.fromtimestamp(health.last_computed).strftime("%H:%M %Z") + + # Infrastructure stats + infra_online = score.infra_online + infra_total = score.infra_total + infra_pct = int((infra_online / infra_total * 100) if infra_total > 0 else 100) + + # Utilization + util = score.util_percent + if util < 15: + util_label = "Low" + elif util < 20: + util_label = "Moderate" + elif util < 25: + util_label = "Elevated" + else: + util_label = "High" + + # Power + if score.battery_warnings == 0: + power_label = "Good" + elif score.battery_warnings <= 2: + power_label = "Some low batteries" + else: + power_label = f"{score.battery_warnings} low batteries" + + lines = [ + f"LIVE MESH HEALTH DATA (as of {ts}):", + "", + f"Overall: {score.composite:.0f}/100 ({score.tier})", + f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)", + f"Channel Utilization: {util:.1f}% avg ({util_label})", + f"Node Behavior: {score.flagged_nodes} nodes flagged", + f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)", + "", + "Regions:", + ] + + # Region summaries + for region in health.regions: + rs = region.score + flag = _tier_flag(rs.tier) + infra_str = f"{rs.infra_online}/{rs.infra_total} infra" + lines.append(f" {region.name}: {rs.composite:.0f}/100 - {infra_str}, {rs.util_percent:.0f}% util{flag}") + + # Top issues + issues = self._gather_top_issues(health) + if issues: + lines.append("") + lines.append("Top Issues:") + for i, issue in enumerate(issues[:5], 1): + lines.append(f" {i}. {issue}") + + lines.append("") + lines.append(f"{health.total_nodes} nodes across {health.total_regions} regions. User can ask about any region, locality, or node for details.") + + return "\n".join(lines) + + def _gather_top_issues(self, health) -> list[str]: + """Gather top issues across all pillars.""" + issues = [] + + # Infrastructure issues (offline nodes) + for region in health.regions: + offline_infra = [] + for nid in region.node_ids: + node = health.nodes.get(nid) + if node and node.is_infrastructure and not node.is_online: + offline_infra.append(node.short_name or nid[:4]) + if offline_infra: + total_infra = sum(1 for nid in region.node_ids + if health.nodes.get(nid) and health.nodes[nid].is_infrastructure) + online = total_infra - len(offline_infra) + issues.append(f"{region.name}: {online}/{total_infra} infrastructure nodes offline ({', '.join(offline_infra[:3])})") + + # Utilization issues + for region in health.regions: + if region.score.util_percent >= 25: + issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Warning)") + elif region.score.util_percent >= 20: + issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Elevated)") + + # Behavior issues (high packet nodes) + flagged = self.health_engine.get_flagged_nodes() + for node in flagged[:3]: + threshold = self.health_engine.packet_threshold + ratio = node.non_text_packets / threshold + issues.append(f"Node {node.short_name or node.node_id[:4]} sending {node.non_text_packets} non-text packets/24h ({ratio:.1f}x threshold)") + + # Battery issues + battery_warnings = self.health_engine.get_battery_warnings() + for node in battery_warnings[:2]: + issues.append(f"Node {node.short_name or node.node_id[:4]} battery at {node.battery_percent:.0f}%") + + return issues + + def build_region_detail(self, region_name: str) -> str: + """Build detailed breakdown for a specific region. + + Args: + region_name: Region to get detail for + + Returns: + Formatted region detail string + """ + health = self.health_engine.mesh_health + if not health: + return f"REGION DETAIL: {region_name}\nNo data available." + + # Find region (fuzzy match) + region = self._find_region(region_name) + if not region: + return f"REGION DETAIL: {region_name}\nRegion not found." + + rs = region.score + lines = [ + f"REGION DETAIL: {region.name}", + f"Score: {rs.composite:.0f}/100 ({rs.tier})", + "", + f"Infrastructure ({rs.infra_online}/{rs.infra_total}):", + ] + + # List infrastructure nodes + for nid in region.node_ids: + node = health.nodes.get(nid) + if not node or not node.is_infrastructure: + continue + status = "+" if node.is_online else "X" + age = _format_age(node.last_seen) + bat = f", bat {node.battery_percent:.0f}%" if node.battery_percent else "" + role = node.role or "ROUTER" + lines.append(f" {status} {node.short_name or nid[:4]} ({role}) - last seen {age}{bat}") + if not node.is_online: + lines[-1] += " <- OFFLINE" + + # Channel utilization by locality + lines.append("") + lines.append(f"Channel Utilization: {rs.util_percent:.0f}%") + if region.localities: + lines.append(" Localities:") + for loc in region.localities: + node_count = len(loc.node_ids) + lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes") + + # Flagged nodes in this region + flagged_in_region = [] + for nid in region.node_ids: + node = health.nodes.get(nid) + if node and node.non_text_packets > self.health_engine.packet_threshold: + flagged_in_region.append(node) + + if flagged_in_region: + lines.append("") + lines.append("Flagged Nodes:") + for node in flagged_in_region[:5]: + lines.append(f" {node.short_name or node.node_id[:4]}: {node.non_text_packets} non-text pkts/24h") + + # Power warnings in this region + low_bat = [] + for nid in region.node_ids: + node = health.nodes.get(nid) + if node and node.battery_percent is not None and node.battery_percent < self.health_engine.battery_warning_percent: + low_bat.append(node) + + if low_bat: + lines.append("") + lines.append("Power:") + bat_str = ", ".join(f"{n.short_name or n.node_id[:4]} at {n.battery_percent:.0f}%" for n in low_bat[:4]) + lines.append(f" Low battery: {bat_str}") + + return "\n".join(lines) + + def build_node_detail(self, node_identifier: str) -> str: + """Build detailed info for a specific node. + + Args: + node_identifier: Shortname, longname, nodeId, or nodeNum + + Returns: + Formatted node detail string + """ + health = self.health_engine.mesh_health + if not health: + return f"NODE DETAIL: {node_identifier}\nNo data available." + + # Find node (multiple match strategies) + node = self._find_node(node_identifier) + if not node: + return f"NODE DETAIL: {node_identifier}\nNode not found." + + lines = [ + f"NODE DETAIL: {node.long_name or node.short_name} ({node.short_name})", + f"ID: {node.node_id}", + f"Hardware: {node.role or 'Unknown'}", + f"Role: {'Infrastructure' if node.is_infrastructure else 'Client'}", + f"Region: {node.region or 'Unknown'} / Locality: {node.locality or 'Unknown'}", + ] + + if node.latitude and node.longitude: + lines.append(f"Position: {node.latitude:.4f}, {node.longitude:.4f}") + + age = _format_age(node.last_seen) + status = "Online" if node.is_online else "OFFLINE" + lines.append(f"Last Seen: {age} ({status})") + + # Get source info from source manager + all_nodes = self.source_manager.get_all_nodes() + sources = [] + for n in all_nodes: + nid = str(n.get("id") or n.get("nodeId") or n.get("num") or "") + if nid == node.node_id: + sources = n.get("_sources", []) + break + if sources: + lines.append(f"Sources: {', '.join(sources)}") + + # Traffic stats + lines.append("") + lines.append("Traffic (24h):") + lines.append(f" Total packets: {node.packet_count_24h}") + lines.append(f" Text messages: {node.text_packet_count_24h}") + lines.append(f" Non-text: {node.non_text_packets}") + + # Power + lines.append("") + lines.append("Power:") + if node.battery_percent is not None: + bat_status = "Low" if node.battery_percent < 20 else "OK" + lines.append(f" Battery: {node.battery_percent:.0f}% ({bat_status})") + else: + lines.append(" Battery: N/A") + if node.voltage: + lines.append(f" Voltage: {node.voltage:.2f}V") + lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}") + + # Recommendations for this node + recs = self._node_recommendations(node) + if recs: + lines.append("") + lines.append("Recommendations:") + for rec in recs: + lines.append(f" - {rec}") + + return "\n".join(lines) + + def _node_recommendations(self, node) -> list[str]: + """Generate recommendations for a specific node.""" + recs = [] + + # High packet count + if node.non_text_packets > self.health_engine.packet_threshold: + ratio = node.non_text_packets / self.health_engine.packet_threshold + recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.") + + # Low battery + if node.battery_percent is not None and node.battery_percent < 20: + recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.") + + # Offline + if not node.is_online: + age = _format_age(node.last_seen) + recs.append(f"Node offline since {age}. Check power and connectivity.") + + return recs + + def build_recommendations(self, scope: str, scope_value: str = None) -> str: + """Generate actionable optimization recommendations. + + Args: + scope: "mesh", "region", or "node" + scope_value: Region name or node identifier (for scoped recommendations) + + Returns: + Formatted recommendations string + """ + health = self.health_engine.mesh_health + if not health: + return "" + + recs = [] + + if scope == "node" and scope_value: + node = self._find_node(scope_value) + if node: + recs.extend(self._node_recommendations(node)) + + elif scope == "region" and scope_value: + region = self._find_region(scope_value) + if region: + recs.extend(self._region_recommendations(region, health)) + + else: # mesh scope + recs.extend(self._mesh_recommendations(health)) + + if not recs: + return "" + + lines = ["OPTIMIZATION RECOMMENDATIONS:"] + for rec in recs[:5]: + lines.append(f" - {rec}") + + return "\n".join(lines) + + def _region_recommendations(self, region, health) -> list[str]: + """Generate recommendations for a region.""" + recs = [] + + # High utilization + if region.score.util_percent >= 20: + recs.append(f"Channel utilization at {region.score.util_percent:.0f}%. Consider spreading nodes across frequencies or reducing telemetry intervals.") + + # Offline infrastructure + offline_count = region.score.infra_total - region.score.infra_online + if offline_count > 0: + recs.append(f"{offline_count} infrastructure node(s) offline. Check power and connectivity.") + + # Flagged nodes + flagged = [] + for nid in region.node_ids: + node = health.nodes.get(nid) + if node and node.non_text_packets > self.health_engine.packet_threshold: + flagged.append(node) + if flagged: + names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3]) + recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.") + + return recs + + def _mesh_recommendations(self, health) -> list[str]: + """Generate mesh-wide recommendations.""" + recs = [] + + # Overall utilization + if health.score.util_percent >= 20: + recs.append(f"Mesh-wide utilization at {health.score.util_percent:.0f}%. Consider reducing position/telemetry broadcast frequency.") + + # Multiple regions with issues + problem_regions = [r for r in health.regions if r.score.composite < 75] + if len(problem_regions) > 1: + names = ", ".join(r.name for r in problem_regions[:3]) + recs.append(f"Multiple regions degraded ({names}). Prioritize infrastructure improvements.") + + # High packet nodes mesh-wide + flagged = self.health_engine.get_flagged_nodes() + if len(flagged) > 3: + total_excess = sum(n.non_text_packets - self.health_engine.packet_threshold for n in flagged) + recs.append(f"{len(flagged)} nodes exceeding packet threshold ({total_excess} excess packets/day). Review default telemetry intervals.") + + # Battery warnings + battery_warnings = self.health_engine.get_battery_warnings() + if len(battery_warnings) > 2: + recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.") + + return recs + + def build_lora_compact(self, scope: str, scope_value: str = None) -> str: + """Build LoRa-optimized compact summary (~200 chars). + + Args: + scope: "mesh" or "region" + scope_value: Region name if scope is "region" + + Returns: + Compact formatted string + """ + health = self.health_engine.mesh_health + if not health: + return "Mesh: No data" + + if scope == "region" and scope_value: + region = self._find_region(scope_value) + if not region: + return f"Region '{scope_value}' not found" + rs = region.score + return f"{region.name} {rs.composite:.0f}/100 | {rs.infra_online}/{rs.infra_total} infra | {rs.util_percent:.0f}% util" + + # Mesh summary + s = health.score + lines = [f"Mesh {s.composite:.0f}/100 | {s.infra_online}/{s.infra_total} infra | {s.util_percent:.0f}% util"] + + # Add warnings for problem regions/nodes + warnings = [] + for region in health.regions: + if region.score.composite < 60: + offline = region.score.infra_total - region.score.infra_online + warnings.append(f"! {region.name} {region.score.composite:.0f}/100 - {offline} infra offline") + + battery_warnings = self.health_engine.get_battery_warnings() + for node in battery_warnings[:2]: + warnings.append(f"! {node.short_name or node.node_id[:4]} bat {node.battery_percent:.0f}%") + + for w in warnings[:2]: + lines.append(w) + + return "\n".join(lines) + + def _find_region(self, name: str): + """Find a region by fuzzy name match.""" + health = self.health_engine.mesh_health + if not health: + return None + + name_lower = name.lower().strip() + + # Exact match first + for region in health.regions: + if region.name.lower() == name_lower: + return region + + # Substring match + for region in health.regions: + if name_lower in region.name.lower(): + return region + + # Try matching against anchor city names + for anchor in self.health_engine.regions: + # Check if search term matches anchor city or region name + anchor_name_lower = anchor.name.lower() + if name_lower in anchor_name_lower: + # Find the corresponding region + for region in health.regions: + if region.name == anchor.name: + return region + + return None + + def _find_node(self, identifier: str): + """Find a node by shortname, longname, nodeId, or nodeNum.""" + health = self.health_engine.mesh_health + if not health: + return None + + identifier = identifier.strip() + id_lower = identifier.lower() + + # Try shortname (case-insensitive) + for node in health.nodes.values(): + if node.short_name and node.short_name.lower() == id_lower: + return node + + # Try longname (substring) + for node in health.nodes.values(): + if node.long_name and id_lower in node.long_name.lower(): + return node + + # Try exact nodeId + if identifier in health.nodes: + return health.nodes[identifier] + + # Try hex nodeId with ! prefix + if identifier.startswith("!"): + hex_id = identifier[1:] + for nid, node in health.nodes.items(): + if nid.lower() == hex_id.lower(): + return node + + # Try decimal nodeNum + if identifier.isdigit(): + # Convert to hex and search + try: + hex_id = format(int(identifier), 'x') + for nid, node in health.nodes.items(): + if hex_id in nid.lower(): + return node + except ValueError: + pass + + return None + + def list_regions_compact(self) -> str: + """List all regions with scores in compact format.""" + health = self.health_engine.mesh_health + if not health or not health.regions: + return "No regions configured." + + lines = ["Regions:"] + for region in health.regions: + s = region.score + flag = _tier_flag(s.tier) + lines.append(f" {region.name}: {s.composite:.0f}/100{flag}") + + return "\n".join(lines) diff --git a/meshai/router.py b/meshai/router.py index f008c40..4bc5315 100644 --- a/meshai/router.py +++ b/meshai/router.py @@ -1,344 +1,472 @@ -"""Message routing logic for MeshAI.""" - -import asyncio -import logging -import re -from dataclasses import dataclass -from enum import Enum, auto -from typing import Optional - -from .backends.base import LLMBackend -from .commands import CommandContext, CommandDispatcher -from .config import Config -from .connector import MeshConnector, MeshMessage -from .context import MeshContext -from .history import ConversationHistory -from .chunker import chunk_response, ContinuationState - -logger = logging.getLogger(__name__) - - -class RouteType(Enum): - """Type of message routing.""" - - IGNORE = auto() # Don't respond - COMMAND = auto() # Bang command - LLM = auto() # Route to LLM - - -@dataclass -class RouteResult: - """Result of routing decision.""" - - route_type: RouteType - response: Optional[str] = None # For commands, the response - query: Optional[str] = None # For LLM, the cleaned query - - -# advBBS protocol and notification prefixes to ignore -ADVBBS_PREFIXES = ( - "MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|", - "BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|", - "advBBS|", - "[MAIL]", -) - -# Patterns that suggest prompt injection attempts -_INJECTION_PATTERNS = [ - re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE), - re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE), - re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE), - re.compile(r"you\s+are\s+now\b", re.IGNORECASE), - re.compile(r"new\s+instructions?\s*:", re.IGNORECASE), - re.compile(r"system\s*prompt\s*:", re.IGNORECASE), -] - - -class MessageRouter: - """Routes incoming messages to appropriate handlers.""" - - def __init__( - self, - config: Config, - connector: MeshConnector, - history: ConversationHistory, - dispatcher: CommandDispatcher, - llm_backend: LLMBackend, - context: MeshContext = None, - meshmonitor_sync=None, - knowledge=None, - source_manager=None, - health_engine=None, - ): - self.config = config - self.connector = connector - self.history = history - self.dispatcher = dispatcher - self.llm = llm_backend - self.context = context - self.meshmonitor_sync = meshmonitor_sync - self.knowledge = knowledge - self.source_manager = source_manager - self.health_engine = health_engine - self.continuations = ContinuationState(max_continuations=3) - - - def should_respond(self, message: MeshMessage) -> bool: - """Determine if we should respond to this message. - - DM-only bot: ignores all public channel messages. - Commands and conversational LLM responses both work in DMs. - - Args: - message: Incoming message - - Returns: - True if we should process this message - """ - # Always ignore our own messages - if message.sender_id == self.connector.my_node_id: - return False - - # Only respond to DMs - if not message.is_dm: - return False - - if not self.config.bot.respond_to_dms: - return False - - # Ignore advBBS protocol and notification messages - if self.config.bot.filter_bbs_protocols: - if any(message.text.startswith(p) for p in ADVBBS_PREFIXES): - logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...") - return False - - # Ignore messages that MeshMonitor will handle - if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text): - logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...") - return False - - return True - - def check_continuation(self, message) -> list[str] | None: - """Check if this is a continuation request and return messages if so. - - Returns: - List of messages to send, or None if not a continuation - """ - user_id = message.sender_id - text = message.text.strip() - - logger.info(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}") - - if self.continuations.has_pending(user_id): - if self.continuations.is_continuation_request(text): - result = self.continuations.get_continuation(user_id) - if result: - messages, _ = result - return messages - # Max continuations reached, return None to fall through - else: - # User asked something new, clear pending continuation - self.continuations.clear(user_id) - - return None - - async def route(self, message: MeshMessage) -> RouteResult: - """Route a message and generate response. - - Args: - message: Incoming message to route - - Returns: - RouteResult with routing decision and any response - """ - text = message.text.strip() - - # Check for bang command first - if self.dispatcher.is_command(text): - context = self._make_command_context(message) - response = await self.dispatcher.dispatch(text, context) - return RouteResult(RouteType.COMMAND, response=response) - - # Clean up the message (remove @mention) - query = self._clean_query(text) - - if not query: - return RouteResult(RouteType.IGNORE) - - # Route to LLM - return RouteResult(RouteType.LLM, query=query) - - async def generate_llm_response(self, message: MeshMessage, query: str) -> str: - """Generate LLM response for a message. - - Args: - message: Original message - query: Cleaned query text - - Returns: - Generated response - """ - # Add user message to history - await self.history.add_message(message.sender_id, "user", query) - - # Get conversation history - history = await self.history.get_history_for_llm(message.sender_id) - - # Build system prompt in order: identity -> static -> meshmonitor -> context - - # 1. Dynamic identity from bot config - bot_name = self.config.bot.name or "MeshAI" - bot_owner = self.config.bot.owner or "Unknown" - - identity = ( - f"You are {bot_name}, an LLM-powered conversational assistant running on a " - f"Meshtastic mesh network. Your managing operator is {bot_owner}. " - f"You are open source at github.com/zvx-echo6/meshai.\n\n" - f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect " - f"to a Meshtastic node via TCP through meshtasticd.\n\n" - ) - - # 2. Static system prompt from config - static_prompt = "" - if getattr(self.config.llm, 'use_system_prompt', True): - static_prompt = self.config.llm.system_prompt - - system_prompt = identity + static_prompt - - # 3. MeshMonitor info (only when enabled) - if ( - self.meshmonitor_sync - and self.config.meshmonitor.enabled - and self.config.meshmonitor.inject_into_prompt - ): - meshmonitor_intro = ( - "\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same " - "meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, " - "traceroutes, security scanning, and auto-responder commands. Its trigger " - "commands are listed below — if someone asks what commands are available, " - "mention both yours and MeshMonitor's. If someone asks where to get " - "MeshMonitor, direct them to github.com/Yeraze/meshmonitor" - ) - system_prompt += meshmonitor_intro - - commands_summary = self.meshmonitor_sync.get_commands_summary() - if commands_summary: - system_prompt += "\n\n" + commands_summary - - # 4. Inject mesh context if available - if self.context: - max_items = getattr(self.config.context, 'max_context_items', 20) - context_block = self.context.get_context_block(max_items=max_items) - if context_block: - system_prompt += ( - "\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n" - + context_block - ) - else: - system_prompt += ( - "\n\n[No recent mesh traffic observed yet.]" - ) - - - - # 5. Knowledge base retrieval - if self.knowledge and query: - results = self.knowledge.search(query) - if results: - chunks = "\n\n".join( - f"[{r['title']}]: {r['excerpt']}" for r in results - ) - system_prompt += ( - "\n\nREFERENCE KNOWLEDGE - Answer using this information:\n" - + chunks - ) - - # DEBUG: Log system prompt status - logger.warning(f"SYSTEM PROMPT LENGTH: {len(system_prompt)} chars") - logger.warning(f"HAS REFERENCE KNOWLEDGE: {'REFERENCE KNOWLEDGE' in system_prompt}") - try: - response = await self.llm.generate( - messages=history, - system_prompt=system_prompt, - max_tokens=500, - ) - except asyncio.TimeoutError: - logger.error("LLM request timed out") - response = "Sorry, request timed out. Try again." - except Exception as e: - logger.error(f"LLM generation error: {e}") - response = "Sorry, I encountered an error. Please try again." - - # Add assistant response to history - await self.history.add_message(message.sender_id, "assistant", response) - - # Persist summary if one was created/updated - await self._persist_summary(message.sender_id) - - # Chunk the response with sentence awareness - messages, remaining = chunk_response( - response, - max_chars=self.config.response.max_length, - max_messages=self.config.response.max_messages, - ) - - # Store remaining content for continuation - if remaining: - logger.info(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining") - self.continuations.store(message.sender_id, remaining) - else: - logger.info(f"No remaining content for {message.sender_id}") - - return messages - - async def _persist_summary(self, user_id: str) -> None: - """Persist any cached summary to the database. - - Args: - user_id: User identifier - """ - memory = self.llm.get_memory() - if not memory: - return - - summary = memory.get_cached_summary(user_id) - if summary: - await self.history.store_summary( - user_id, - summary.summary, - summary.message_count, - ) - logger.debug(f"Persisted summary for {user_id}") - - def _clean_query(self, text: str) -> str: - """Clean up query text and check for prompt injection.""" - cleaned = " ".join(text.split()) - cleaned = cleaned.strip() - - # Check for prompt injection - for pattern in _INJECTION_PATTERNS: - if pattern.search(cleaned): - logger.warning( - f"Possible prompt injection detected: {cleaned[:80]}..." - ) - match = pattern.search(cleaned) - cleaned = cleaned[:match.start()].strip() - if not cleaned: - cleaned = "Hello" - break - - return cleaned - - def _make_command_context(self, message: MeshMessage) -> CommandContext: - """Create command context from message.""" - return CommandContext( - sender_id=message.sender_id, - sender_name=message.sender_name, - channel=message.channel, - is_dm=message.is_dm, - position=message.sender_position, - config=self.config, - connector=self.connector, - history=self.history, - ) +"""Message routing logic for MeshAI.""" + +import asyncio +import logging +import re +from dataclasses import dataclass +from enum import Enum, auto +from typing import Optional + +from .backends.base import LLMBackend +from .commands import CommandContext, CommandDispatcher +from .config import Config +from .connector import MeshConnector, MeshMessage +from .context import MeshContext +from .history import ConversationHistory +from .chunker import chunk_response, ContinuationState + +logger = logging.getLogger(__name__) + + +class RouteType(Enum): + """Type of message routing.""" + + IGNORE = auto() # Don't respond + COMMAND = auto() # Bang command + LLM = auto() # Route to LLM + + +@dataclass +class RouteResult: + """Result of routing decision.""" + + route_type: RouteType + response: Optional[str] = None # For commands, the response + query: Optional[str] = None # For LLM, the cleaned query + + +# advBBS protocol and notification prefixes to ignore +ADVBBS_PREFIXES = ( + "MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|", + "BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|", + "advBBS|", + "[MAIL]", +) + +# Patterns that suggest prompt injection attempts +_INJECTION_PATTERNS = [ + re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE), + re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE), + re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE), + re.compile(r"you\s+are\s+now\b", re.IGNORECASE), + re.compile(r"new\s+instructions?\s*:", re.IGNORECASE), + re.compile(r"system\s*prompt\s*:", re.IGNORECASE), +] + +# Keywords that indicate mesh-related questions +_MESH_KEYWORDS = { + "mesh", "network", "health", "nodes", "node", "utilization", "signal", + "coverage", "battery", "solar", "offline", "router", "channel", "packet", + "hop", "optimize", "optimization", "infrastructure", "infra", "relay", + "repeater", "region", "locality", "congestion", "collision", "airtime", + "telemetry", "firmware", "subscribe", "alert", "snr", "rssi", +} + +# Phrases that indicate mesh questions +_MESH_PHRASES = [ + "how's the mesh", + "hows the mesh", + "mesh status", + "what's wrong", + "whats wrong", + "check node", + "node status", + "network health", + "mesh health", +] + +# Mesh awareness instruction for LLM +_MESH_AWARENESS_PROMPT = """ +When the user asks about mesh health, network status, or optimization: +- Use the LIVE MESH HEALTH DATA injected above to answer with real numbers +- Be specific: name nodes, cite utilization percentages, reference actual scores +- Give actionable recommendations based on the data +- If asked about a region or node you have detail for, use that detail +- If asked about something the data doesn't cover, say so - don't fabricate +- Keep responses concise - these go over LoRa with limited message size +- Users can run !health for a quick mesh summary or !region [name] for regional info +""" + + +class MessageRouter: + """Routes incoming messages to appropriate handlers.""" + + def __init__( + self, + config: Config, + connector: MeshConnector, + history: ConversationHistory, + dispatcher: CommandDispatcher, + llm_backend: LLMBackend, + context: MeshContext = None, + meshmonitor_sync=None, + knowledge=None, + source_manager=None, + health_engine=None, + mesh_reporter=None, + ): + self.config = config + self.connector = connector + self.history = history + self.dispatcher = dispatcher + self.llm = llm_backend + self.context = context + self.meshmonitor_sync = meshmonitor_sync + self.knowledge = knowledge + self.source_manager = source_manager + self.health_engine = health_engine + self.mesh_reporter = mesh_reporter + self.continuations = ContinuationState(max_continuations=3) + + def should_respond(self, message: MeshMessage) -> bool: + """Determine if we should respond to this message. + + DM-only bot: ignores all public channel messages. + Commands and conversational LLM responses both work in DMs. + + Args: + message: Incoming message + + Returns: + True if we should process this message + """ + # Always ignore our own messages + if message.sender_id == self.connector.my_node_id: + return False + + # Only respond to DMs + if not message.is_dm: + return False + + if not self.config.bot.respond_to_dms: + return False + + # Ignore advBBS protocol and notification messages + if self.config.bot.filter_bbs_protocols: + if any(message.text.startswith(p) for p in ADVBBS_PREFIXES): + logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...") + return False + + # Ignore messages that MeshMonitor will handle + if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text): + logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...") + return False + + return True + + def check_continuation(self, message) -> list[str] | None: + """Check if this is a continuation request and return messages if so. + + Returns: + List of messages to send, or None if not a continuation + """ + user_id = message.sender_id + text = message.text.strip() + + logger.debug(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}") + + if self.continuations.has_pending(user_id): + if self.continuations.is_continuation_request(text): + result = self.continuations.get_continuation(user_id) + if result: + messages, _ = result + return messages + # Max continuations reached, return None to fall through + else: + # User asked something new, clear pending continuation + self.continuations.clear(user_id) + + return None + + async def route(self, message: MeshMessage) -> RouteResult: + """Route a message and generate response. + + Args: + message: Incoming message to route + + Returns: + RouteResult with routing decision and any response + """ + text = message.text.strip() + + # Check for bang command first + if self.dispatcher.is_command(text): + context = self._make_command_context(message) + response = await self.dispatcher.dispatch(text, context) + return RouteResult(RouteType.COMMAND, response=response) + + # Clean up the message (remove @mention) + query = self._clean_query(text) + + if not query: + return RouteResult(RouteType.IGNORE) + + # Route to LLM + return RouteResult(RouteType.LLM, query=query) + + def _is_mesh_question(self, message: str) -> bool: + """Check if message is asking about mesh health/status. + + Args: + message: User message text + + Returns: + True if this is a mesh-related question + """ + msg_lower = message.lower() + + # Check for mesh phrases + for phrase in _MESH_PHRASES: + if phrase in msg_lower: + return True + + # Check for mesh keywords + words = set(re.findall(r'\b\w+\b', msg_lower)) + if words & _MESH_KEYWORDS: + return True + + return False + + def _detect_mesh_scope(self, message: str) -> tuple[str, Optional[str]]: + """Detect the scope of a mesh question. + + Args: + message: User message text + + Returns: + Tuple of (scope_type, scope_value): + - ("node", "{identifier}") if asking about specific node + - ("region", "{region_name}") if asking about specific region + - ("mesh", None) for general mesh questions + """ + msg_lower = message.lower() + + # Check for node references + if self.health_engine and self.health_engine.mesh_health: + health = self.health_engine.mesh_health + + # Look for node shortnames (4 chars, case-insensitive) + for node in health.nodes.values(): + if node.short_name: + # Check if shortname appears as a word in message + pattern = r'\b' + re.escape(node.short_name.lower()) + r'\b' + if re.search(pattern, msg_lower): + return ("node", node.short_name) + + # Check longname substring + if node.long_name and node.long_name.lower() in msg_lower: + return ("node", node.short_name or node.node_id) + + # Check for region references + if self.health_engine: + for anchor in self.health_engine.regions: + anchor_lower = anchor.name.lower() + # Check region name + if anchor_lower in msg_lower: + return ("region", anchor.name) + + # Check parts of region name (e.g., "wood river" matches "Wood River - ID") + parts = anchor_lower.replace("-", " ").replace("–", " ").split() + for part in parts: + if len(part) > 3 and part in msg_lower: + return ("region", anchor.name) + + return ("mesh", None) + + async def generate_llm_response(self, message: MeshMessage, query: str) -> str: + """Generate LLM response for a message. + + Args: + message: Original message + query: Cleaned query text + + Returns: + Generated response + """ + # Add user message to history + await self.history.add_message(message.sender_id, "user", query) + + # Get conversation history + history = await self.history.get_history_for_llm(message.sender_id) + + # Build system prompt in order: identity -> static -> meshmonitor -> context -> knowledge -> mesh + + # 1. Dynamic identity from bot config + bot_name = self.config.bot.name or "MeshAI" + bot_owner = self.config.bot.owner or "Unknown" + + identity = ( + f"You are {bot_name}, an LLM-powered conversational assistant running on a " + f"Meshtastic mesh network. Your managing operator is {bot_owner}. " + f"You are open source at github.com/zvx-echo6/meshai.\n\n" + f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect " + f"to a Meshtastic node via TCP through meshtasticd.\n\n" + ) + + # 2. Static system prompt from config + static_prompt = "" + if getattr(self.config.llm, 'use_system_prompt', True): + static_prompt = self.config.llm.system_prompt + + system_prompt = identity + static_prompt + + # 3. MeshMonitor info (only when enabled) + if ( + self.meshmonitor_sync + and self.config.meshmonitor.enabled + and self.config.meshmonitor.inject_into_prompt + ): + meshmonitor_intro = ( + "\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same " + "meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, " + "traceroutes, security scanning, and auto-responder commands. Its trigger " + "commands are listed below — if someone asks what commands are available, " + "mention both yours and MeshMonitor's. If someone asks where to get " + "MeshMonitor, direct them to github.com/Yeraze/meshmonitor" + ) + system_prompt += meshmonitor_intro + + commands_summary = self.meshmonitor_sync.get_commands_summary() + if commands_summary: + system_prompt += "\n\n" + commands_summary + + # 4. Inject mesh context if available + if self.context: + max_items = getattr(self.config.context, 'max_context_items', 20) + context_block = self.context.get_context_block(max_items=max_items) + if context_block: + system_prompt += ( + "\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n" + + context_block + ) + else: + system_prompt += ( + "\n\n[No recent mesh traffic observed yet.]" + ) + + # 5. Knowledge base retrieval + if self.knowledge and query: + results = self.knowledge.search(query) + if results: + chunks = "\n\n".join( + f"[{r['title']}]: {r['excerpt']}" for r in results + ) + system_prompt += ( + "\n\nREFERENCE KNOWLEDGE - Answer using this information:\n" + + chunks + ) + + # 6. Mesh Intelligence (inject health data for mesh questions) + if ( + self.source_manager + and self.mesh_reporter + and self._is_mesh_question(query) + ): + scope_type, scope_value = self._detect_mesh_scope(query) + + # Always include Tier 1 summary for mesh questions + tier1 = self.mesh_reporter.build_tier1_summary() + system_prompt += "\n\n" + tier1 + + # Add Tier 2 detail if scoped + if scope_type == "region" and scope_value: + region_detail = self.mesh_reporter.build_region_detail(scope_value) + system_prompt += "\n\n" + region_detail + elif scope_type == "node" and scope_value: + node_detail = self.mesh_reporter.build_node_detail(scope_value) + system_prompt += "\n\n" + node_detail + + # Always include relevant recommendations + recommendations = self.mesh_reporter.build_recommendations(scope_type, scope_value) + if recommendations: + system_prompt += "\n\n" + recommendations + + # Add mesh awareness instructions + system_prompt += _MESH_AWARENESS_PROMPT + + # DEBUG: Log system prompt status + logger.debug(f"System prompt length: {len(system_prompt)} chars") + + try: + response = await self.llm.generate( + messages=history, + system_prompt=system_prompt, + max_tokens=500, + ) + except asyncio.TimeoutError: + logger.error("LLM request timed out") + response = "Sorry, request timed out. Try again." + except Exception as e: + logger.error(f"LLM generation error: {e}") + response = "Sorry, I encountered an error. Please try again." + + # Add assistant response to history + await self.history.add_message(message.sender_id, "assistant", response) + + # Persist summary if one was created/updated + await self._persist_summary(message.sender_id) + + # Chunk the response with sentence awareness + messages, remaining = chunk_response( + response, + max_chars=self.config.response.max_length, + max_messages=self.config.response.max_messages, + ) + + # Store remaining content for continuation + if remaining: + logger.debug(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining") + self.continuations.store(message.sender_id, remaining) + + return messages + + async def _persist_summary(self, user_id: str) -> None: + """Persist any cached summary to the database. + + Args: + user_id: User identifier + """ + memory = self.llm.get_memory() + if not memory: + return + + summary = memory.get_cached_summary(user_id) + if summary: + await self.history.store_summary( + user_id, + summary.summary, + summary.message_count, + ) + logger.debug(f"Persisted summary for {user_id}") + + def _clean_query(self, text: str) -> str: + """Clean up query text and check for prompt injection.""" + cleaned = " ".join(text.split()) + cleaned = cleaned.strip() + + # Check for prompt injection + for pattern in _INJECTION_PATTERNS: + if pattern.search(cleaned): + logger.warning( + f"Possible prompt injection detected: {cleaned[:80]}..." + ) + match = pattern.search(cleaned) + cleaned = cleaned[:match.start()].strip() + if not cleaned: + cleaned = "Hello" + break + + return cleaned + + def _make_command_context(self, message: MeshMessage) -> CommandContext: + """Create command context from message.""" + return CommandContext( + sender_id=message.sender_id, + sender_name=message.sender_name, + channel=message.channel, + is_dm=message.is_dm, + position=message.sender_position, + config=self.config, + connector=self.connector, + history=self.history, + )