feat: Phase 3 - LLM mesh health integration, recommendations, and health commands

New files:
- mesh_reporter.py: MeshReporter class for prompt injection
  - build_tier1_summary(): ~500-800 token mesh health summary
  - build_region_detail(): Detailed region breakdown
  - build_node_detail(): Single node info with recommendations
  - build_recommendations(): Optimization suggestions
  - build_lora_compact(): Short format for LoRa messages
  - list_regions_compact(): Region list with scores

- commands/health.py: !health and !region commands
  - !health: Quick mesh summary (no LLM)
  - !region [name]: Region info or list all regions

Modified files:
- router.py: Mesh question detection and prompt injection
  - _is_mesh_question(): Keyword/phrase matching
  - _detect_mesh_scope(): Node/region/mesh scope detection
  - Inject Tier 1/2 data for mesh questions
  - Add mesh awareness instructions to LLM

- main.py: Create MeshReporter, pass to dispatcher/router

- commands/dispatcher.py: Register health/region commands

- mesh_health.py: Fix role type (int -> str)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-04 19:19:42 +00:00
commit 44c74ccfd4
6 changed files with 1546 additions and 784 deletions

View file

@ -157,6 +157,7 @@ def create_dispatcher(
prefix: str = "!",
disabled_commands: Optional[list[str]] = None,
custom_commands: Optional[dict] = None,
mesh_reporter=None,
) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands.
@ -164,6 +165,7 @@ def create_dispatcher(
prefix: Command prefix (default: "!")
disabled_commands: List of command names to disable
custom_commands: Dict of name -> response for custom commands
mesh_reporter: MeshReporter instance for health commands
Returns:
Configured CommandDispatcher
@ -174,6 +176,7 @@ def create_dispatcher(
from .reset import ResetCommand
from .status import StatusCommand
from .weather import WeatherCommand
from .health import HealthCommand, RegionCommand
dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands)
@ -185,6 +188,23 @@ def create_dispatcher(
dispatcher.register(StatusCommand())
dispatcher.register(WeatherCommand())
# Register mesh health commands
health_cmd = HealthCommand(mesh_reporter)
dispatcher.register(health_cmd)
# Register aliases for health command
for alias in getattr(health_cmd, 'aliases', []):
alias_handler = HealthCommand(mesh_reporter)
alias_handler.name = alias
dispatcher.register(alias_handler)
region_cmd = RegionCommand(mesh_reporter)
dispatcher.register(region_cmd)
# Register aliases for region command
for alias in getattr(region_cmd, 'aliases', []):
alias_handler = RegionCommand(mesh_reporter)
alias_handler.name = alias
dispatcher.register(alias_handler)
# Register custom commands
if custom_commands:
for name, response in custom_commands.items():

58
meshai/commands/health.py Normal file
View file

@ -0,0 +1,58 @@
"""Health and region commands for mesh status."""
from .base import CommandContext, CommandHandler
class HealthCommand(CommandHandler):
"""Quick mesh health summary."""
name = "health"
description = "Show mesh health summary"
usage = "!health"
aliases = ["mesh", "status"]
def __init__(self, mesh_reporter=None):
"""Initialize with optional mesh reporter.
Args:
mesh_reporter: MeshReporter instance for health data
"""
self._mesh_reporter = mesh_reporter
async def execute(self, args: str, context: CommandContext) -> str:
"""Return compact mesh health summary."""
if not self._mesh_reporter:
return "Mesh health not available."
return self._mesh_reporter.build_lora_compact("mesh")
class RegionCommand(CommandHandler):
"""Region health information."""
name = "region"
description = "Show region health info"
usage = "!region [name]"
aliases = ["reg"]
def __init__(self, mesh_reporter=None):
"""Initialize with optional mesh reporter.
Args:
mesh_reporter: MeshReporter instance for health data
"""
self._mesh_reporter = mesh_reporter
async def execute(self, args: str, context: CommandContext) -> str:
"""Return region health info."""
if not self._mesh_reporter:
return "Mesh health not available."
args = args.strip()
if not args:
# List all regions
return self._mesh_reporter.list_regions_compact()
# Get specific region detail (compact for LoRa)
return self._mesh_reporter.build_lora_compact("region", args)

View file

@ -1,439 +1,450 @@
"""Main entry point for MeshAI."""
import argparse
import asyncio
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional
from . import __version__
from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
from .commands.status import set_start_time
from .config import Config, load_config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .memory import ConversationSummary
from .responder import Responder
from .router import MessageRouter, RouteType
logger = logging.getLogger(__name__)
class MeshAI:
"""Main application class."""
def __init__(self, config: Config):
self.config = config
self.connector: Optional[MeshConnector] = None
self.history: Optional[ConversationHistory] = None
self.dispatcher: Optional[CommandDispatcher] = None
self.llm: Optional[LLMBackend] = None
self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None
self.knowledge = None
self.source_manager = None
self.health_engine = None
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
self._last_health_compute: float = 0.0
async def start(self) -> None:
"""Start the bot."""
logger.info(f"Starting MeshAI v{__version__}")
set_start_time(time.time())
# Initialize components
await self._init_components()
# Connect to Meshtastic
self.connector.connect()
self.connector.set_message_callback(self._on_message, asyncio.get_event_loop())
# Add own node ID to context ignore list
if self.context and self.connector.my_node_id:
self.context._ignore_nodes.add(self.connector.my_node_id)
self._running = True
self._loop = asyncio.get_event_loop()
self._last_cleanup = time.time()
self._last_health_compute = 0.0
# Write PID file
self._write_pid()
logger.info("MeshAI started successfully")
# Keep running
while self._running:
await asyncio.sleep(1)
# Periodic MeshMonitor refresh
if self.meshmonitor_sync:
self.meshmonitor_sync.maybe_refresh()
# Periodic mesh source refresh and health computation
if self.source_manager:
refreshed = self.source_manager.refresh_all()
# Recompute health after source refresh
if refreshed > 0 and self.health_engine:
self.health_engine.compute(self.source_manager)
self._last_health_compute = time.time()
# Periodic cleanup
if time.time() - self._last_cleanup >= 3600:
await self.history.cleanup_expired()
if self.context:
self.context.prune()
self._last_cleanup = time.time()
async def stop(self) -> None:
"""Stop the bot."""
logger.info("Stopping MeshAI...")
self._running = False
if self.connector:
self.connector.disconnect()
if self.history:
await self.history.close()
if self.llm:
await self.llm.close()
if self.knowledge:
self.knowledge.close()
self._remove_pid()
logger.info("MeshAI stopped")
async def _init_components(self) -> None:
"""Initialize all components."""
# Conversation history
self.history = ConversationHistory(self.config.history)
await self.history.initialize()
# Command dispatcher
self.dispatcher = create_dispatcher(
prefix=self.config.commands.prefix,
disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands,
)
# LLM backend
api_key = self.config.resolve_api_key()
if not api_key:
logger.warning("No API key configured - LLM responses will fail")
# Memory config
mem_cfg = self.config.memory
window_size = mem_cfg.window_size if mem_cfg.enabled else 0
summarize_threshold = mem_cfg.summarize_threshold
# Create backend
backend = self.config.llm.backend.lower()
if backend == "openai":
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "anthropic":
self.llm = AnthropicBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "google":
self.llm = GoogleBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI")
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
# Load persisted summaries into memory cache
await self._load_summaries()
# Meshtastic connector
self.connector = MeshConnector(self.config.connection)
# Passive mesh context buffer
ctx_cfg = self.config.context
if ctx_cfg.enabled:
self.context = MeshContext(
observe_channels=ctx_cfg.observe_channels or None,
ignore_nodes=ctx_cfg.ignore_nodes or None,
max_age=ctx_cfg.max_age,
)
logger.info("Mesh context buffer enabled")
else:
self.context = None
# MeshMonitor trigger sync
mm_cfg = self.config.meshmonitor
if mm_cfg.enabled and mm_cfg.url:
from .meshmonitor import MeshMonitorSync
self.meshmonitor_sync = MeshMonitorSync(
url=mm_cfg.url,
refresh_interval=mm_cfg.refresh_interval,
)
count = self.meshmonitor_sync.load()
logger.info(f"MeshMonitor sync enabled, loaded {count} triggers")
else:
self.meshmonitor_sync = None
# Mesh data sources
enabled_sources = [s for s in self.config.mesh_sources if s.enabled]
if enabled_sources:
from .mesh_sources import MeshSourceManager
self.source_manager = MeshSourceManager(enabled_sources)
# Initial fetch
self.source_manager.refresh_all()
# Log status
for status in self.source_manager.get_status():
if status["is_loaded"]:
logger.info(
f"Mesh source '{status['name']}' ({status['type']}): "
f"{status['node_count']} nodes"
)
else:
logger.warning(
f"Mesh source '{status['name']}' ({status['type']}): "
f"failed - {status.get('last_error', 'unknown error')}"
)
else:
self.source_manager = None
# Mesh health engine
mi_cfg = self.config.mesh_intelligence
if mi_cfg.enabled and self.source_manager:
from .mesh_health import MeshHealthEngine
self.health_engine = MeshHealthEngine(
regions=mi_cfg.regions,
locality_radius=mi_cfg.locality_radius_miles,
offline_threshold_hours=mi_cfg.offline_threshold_hours,
packet_threshold=mi_cfg.packet_threshold,
battery_warning_percent=mi_cfg.battery_warning_percent,
)
# Initial health computation
mesh_health = self.health_engine.compute(self.source_manager)
self._last_health_compute = time.time()
logger.info(
f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, "
f"{mesh_health.total_regions} regions, "
f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})"
)
else:
self.health_engine = None
# Knowledge base
kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch
self.knowledge = KnowledgeSearch(
db_path=kb_cfg.db_path,
top_k=kb_cfg.top_k,
)
else:
self.knowledge = None
# Message router
self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm,
context=self.context,
meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge,
source_manager=self.source_manager,
health_engine=self.health_engine,
)
# Responder
self.responder = Responder(self.config.response, self.connector)
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:
# Passively observe channel broadcasts for context (before filtering)
if self.context and not message.is_dm and message.text:
self.context.observe(
sender_name=message.sender_name,
sender_id=message.sender_id,
text=message.text,
channel=message.channel,
is_dm=False,
)
# Check if we should respond
if not self.router.should_respond(message):
return
logger.info(
f"Processing message from {message.sender_name} ({message.sender_id}): "
f"{message.text[:50]}..."
)
# Route the message
# Check for continuation request first
continuation_messages = self.router.check_continuation(message)
if continuation_messages:
await self.responder.send_response(
continuation_messages,
destination=message.sender_id,
channel=message.channel,
)
return
result = await self.router.route(message)
if result.route_type == RouteType.IGNORE:
return
# Determine response
if result.route_type == RouteType.COMMAND:
messages = result.response # Commands return single string
elif result.route_type == RouteType.LLM:
messages = await self.router.generate_llm_response(message, result.query)
else:
return
if not messages:
return
# Send DM response
await self.responder.send_response(
messages,
destination=message.sender_id,
channel=message.channel,
)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def _load_summaries(self) -> None:
"""Load persisted summaries from database into memory cache."""
memory = self.llm.get_memory()
if not memory:
return
if not self.history or not self.history._db:
return
try:
async with self.history._lock:
cursor = await self.history._db.execute(
"SELECT user_id, summary, message_count, updated_at "
"FROM conversation_summaries"
)
rows = await cursor.fetchall()
loaded = 0
for row in rows:
user_id, summary_text, message_count, updated_at = row
summary = ConversationSummary(
summary=summary_text,
last_updated=updated_at,
message_count=message_count,
)
memory.load_summary(user_id, summary)
loaded += 1
if loaded:
logger.info(f"Loaded {loaded} conversation summaries from database")
except Exception as e:
logger.warning(f"Failed to load summaries from database: {e}")
def _write_pid(self) -> None:
"""Write PID file."""
pid_file = Path("/tmp/meshai.pid")
pid_file.write_text(str(os.getpid()))
def _remove_pid(self) -> None:
"""Remove PID file."""
pid_file = Path("/tmp/meshai.pid")
if pid_file.exists():
pid_file.unlink()
def setup_logging(verbose: bool = False) -> None:
"""Configure logging."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="MeshAI - LLM-powered Meshtastic assistant",
prog="meshai",
)
parser.add_argument(
"--version", "-V", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument(
"--config", "-c", action="store_true", help="Launch configuration tool"
)
parser.add_argument(
"--config-file",
"-f",
type=Path,
default=Path("config.yaml"),
help="Path to config file (default: config.yaml)",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
setup_logging(args.verbose)
# Launch configurator if requested
if args.config:
run_configurator(args.config_file)
return
# Load config
config = load_config(args.config_file)
# Check if config exists
if not args.config_file.exists():
logger.warning(f"Config file not found: {args.config_file}")
logger.info("Run 'meshai --config' to create one, or copy config.example.yaml")
sys.exit(1)
# Create and run bot
bot = MeshAI(config)
# Handle signals
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}")
loop.create_task(bot.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
loop.run_until_complete(bot.start())
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(bot.stop())
loop.close()
if __name__ == "__main__":
main()
"""Main entry point for MeshAI."""
import argparse
import asyncio
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional
from . import __version__
from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
from .commands.status import set_start_time
from .config import Config, load_config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .memory import ConversationSummary
from .responder import Responder
from .router import MessageRouter, RouteType
logger = logging.getLogger(__name__)
class MeshAI:
"""Main application class."""
def __init__(self, config: Config):
self.config = config
self.connector: Optional[MeshConnector] = None
self.history: Optional[ConversationHistory] = None
self.dispatcher: Optional[CommandDispatcher] = None
self.llm: Optional[LLMBackend] = None
self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None
self.knowledge = None
self.source_manager = None
self.health_engine = None
self.mesh_reporter = None
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
self._last_health_compute: float = 0.0
async def start(self) -> None:
"""Start the bot."""
logger.info(f"Starting MeshAI v{__version__}")
set_start_time(time.time())
# Initialize components
await self._init_components()
# Connect to Meshtastic
self.connector.connect()
self.connector.set_message_callback(self._on_message, asyncio.get_event_loop())
# Add own node ID to context ignore list
if self.context and self.connector.my_node_id:
self.context._ignore_nodes.add(self.connector.my_node_id)
self._running = True
self._loop = asyncio.get_event_loop()
self._last_cleanup = time.time()
self._last_health_compute = 0.0
# Write PID file
self._write_pid()
logger.info("MeshAI started successfully")
# Keep running
while self._running:
await asyncio.sleep(1)
# Periodic MeshMonitor refresh
if self.meshmonitor_sync:
self.meshmonitor_sync.maybe_refresh()
# Periodic mesh source refresh and health computation
if self.source_manager:
refreshed = self.source_manager.refresh_all()
# Recompute health after source refresh
if refreshed > 0 and self.health_engine:
self.health_engine.compute(self.source_manager)
self._last_health_compute = time.time()
# Periodic cleanup
if time.time() - self._last_cleanup >= 3600:
await self.history.cleanup_expired()
if self.context:
self.context.prune()
self._last_cleanup = time.time()
async def stop(self) -> None:
"""Stop the bot."""
logger.info("Stopping MeshAI...")
self._running = False
if self.connector:
self.connector.disconnect()
if self.history:
await self.history.close()
if self.llm:
await self.llm.close()
if self.knowledge:
self.knowledge.close()
self._remove_pid()
logger.info("MeshAI stopped")
async def _init_components(self) -> None:
"""Initialize all components."""
# Conversation history
self.history = ConversationHistory(self.config.history)
await self.history.initialize()
# LLM backend
api_key = self.config.resolve_api_key()
if not api_key:
logger.warning("No API key configured - LLM responses will fail")
# Memory config
mem_cfg = self.config.memory
window_size = mem_cfg.window_size if mem_cfg.enabled else 0
summarize_threshold = mem_cfg.summarize_threshold
# Create backend
backend = self.config.llm.backend.lower()
if backend == "openai":
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "anthropic":
self.llm = AnthropicBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "google":
self.llm = GoogleBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI")
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
# Load persisted summaries into memory cache
await self._load_summaries()
# Meshtastic connector
self.connector = MeshConnector(self.config.connection)
# Passive mesh context buffer
ctx_cfg = self.config.context
if ctx_cfg.enabled:
self.context = MeshContext(
observe_channels=ctx_cfg.observe_channels or None,
ignore_nodes=ctx_cfg.ignore_nodes or None,
max_age=ctx_cfg.max_age,
)
logger.info("Mesh context buffer enabled")
else:
self.context = None
# MeshMonitor trigger sync
mm_cfg = self.config.meshmonitor
if mm_cfg.enabled and mm_cfg.url:
from .meshmonitor import MeshMonitorSync
self.meshmonitor_sync = MeshMonitorSync(
url=mm_cfg.url,
refresh_interval=mm_cfg.refresh_interval,
)
count = self.meshmonitor_sync.load()
logger.info(f"MeshMonitor sync enabled, loaded {count} triggers")
else:
self.meshmonitor_sync = None
# Mesh data sources
enabled_sources = [s for s in self.config.mesh_sources if s.enabled]
if enabled_sources:
from .mesh_sources import MeshSourceManager
self.source_manager = MeshSourceManager(enabled_sources)
# Initial fetch
self.source_manager.refresh_all()
# Log status
for status in self.source_manager.get_status():
if status["is_loaded"]:
logger.info(
f"Mesh source '{status['name']}' ({status['type']}): "
f"{status['node_count']} nodes"
)
else:
logger.warning(
f"Mesh source '{status['name']}' ({status['type']}): "
f"failed - {status.get('last_error', 'unknown error')}"
)
else:
self.source_manager = None
# Mesh health engine
mi_cfg = self.config.mesh_intelligence
if mi_cfg.enabled and self.source_manager:
from .mesh_health import MeshHealthEngine
self.health_engine = MeshHealthEngine(
regions=mi_cfg.regions,
locality_radius=mi_cfg.locality_radius_miles,
offline_threshold_hours=mi_cfg.offline_threshold_hours,
packet_threshold=mi_cfg.packet_threshold,
battery_warning_percent=mi_cfg.battery_warning_percent,
)
# Initial health computation
mesh_health = self.health_engine.compute(self.source_manager)
self._last_health_compute = time.time()
logger.info(
f"Mesh intelligence enabled: {mesh_health.total_nodes} nodes, "
f"{mesh_health.total_regions} regions, "
f"score {mesh_health.score.composite:.0f}/100 ({mesh_health.score.tier})"
)
else:
self.health_engine = None
# Mesh reporter (for LLM prompt injection and commands)
if self.health_engine and self.source_manager:
from .mesh_reporter import MeshReporter
self.mesh_reporter = MeshReporter(self.health_engine, self.source_manager)
logger.info("Mesh reporter enabled")
else:
self.mesh_reporter = None
# Knowledge base
kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch
self.knowledge = KnowledgeSearch(
db_path=kb_cfg.db_path,
top_k=kb_cfg.top_k,
)
else:
self.knowledge = None
# Command dispatcher (needs mesh_reporter for health commands)
self.dispatcher = create_dispatcher(
prefix=self.config.commands.prefix,
disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands,
mesh_reporter=self.mesh_reporter,
)
# Message router
self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm,
context=self.context,
meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge,
source_manager=self.source_manager,
health_engine=self.health_engine,
mesh_reporter=self.mesh_reporter,
)
# Responder
self.responder = Responder(self.config.response, self.connector)
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:
# Passively observe channel broadcasts for context (before filtering)
if self.context and not message.is_dm and message.text:
self.context.observe(
sender_name=message.sender_name,
sender_id=message.sender_id,
text=message.text,
channel=message.channel,
is_dm=False,
)
# Check if we should respond
if not self.router.should_respond(message):
return
logger.info(
f"Processing message from {message.sender_name} ({message.sender_id}): "
f"{message.text[:50]}..."
)
# Route the message
# Check for continuation request first
continuation_messages = self.router.check_continuation(message)
if continuation_messages:
await self.responder.send_response(
continuation_messages,
destination=message.sender_id,
channel=message.channel,
)
return
result = await self.router.route(message)
if result.route_type == RouteType.IGNORE:
return
# Determine response
if result.route_type == RouteType.COMMAND:
messages = result.response # Commands return single string
elif result.route_type == RouteType.LLM:
messages = await self.router.generate_llm_response(message, result.query)
else:
return
if not messages:
return
# Send DM response
await self.responder.send_response(
messages,
destination=message.sender_id,
channel=message.channel,
)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def _load_summaries(self) -> None:
"""Load persisted summaries from database into memory cache."""
memory = self.llm.get_memory()
if not memory:
return
if not self.history or not self.history._db:
return
try:
async with self.history._lock:
cursor = await self.history._db.execute(
"SELECT user_id, summary, message_count, updated_at "
"FROM conversation_summaries"
)
rows = await cursor.fetchall()
loaded = 0
for row in rows:
user_id, summary_text, message_count, updated_at = row
summary = ConversationSummary(
summary=summary_text,
last_updated=updated_at,
message_count=message_count,
)
memory.load_summary(user_id, summary)
loaded += 1
if loaded:
logger.info(f"Loaded {loaded} conversation summaries from database")
except Exception as e:
logger.warning(f"Failed to load summaries from database: {e}")
def _write_pid(self) -> None:
"""Write PID file."""
pid_file = Path("/tmp/meshai.pid")
pid_file.write_text(str(os.getpid()))
def _remove_pid(self) -> None:
"""Remove PID file."""
pid_file = Path("/tmp/meshai.pid")
if pid_file.exists():
pid_file.unlink()
def setup_logging(verbose: bool = False) -> None:
"""Configure logging."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="MeshAI - LLM-powered Meshtastic assistant",
prog="meshai",
)
parser.add_argument(
"--version", "-V", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument(
"--config", "-c", action="store_true", help="Launch configuration tool"
)
parser.add_argument(
"--config-file",
"-f",
type=Path,
default=Path("config.yaml"),
help="Path to config file (default: config.yaml)",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
setup_logging(args.verbose)
# Launch configurator if requested
if args.config:
run_configurator(args.config_file)
return
# Load config
config = load_config(args.config_file)
# Check if config exists
if not args.config_file.exists():
logger.warning(f"Config file not found: {args.config_file}")
logger.info("Run 'meshai --config' to create one, or copy config.example.yaml")
sys.exit(1)
# Create and run bot
bot = MeshAI(config)
# Handle signals
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}")
loop.create_task(bot.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
loop.run_until_complete(bot.start())
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(bot.stop())
loop.close()
if __name__ == "__main__":
main()

View file

@ -279,7 +279,7 @@ class MeshHealthEngine:
role = node.get("role") or node.get("hwModel") or ""
# Determine if infrastructure
is_infra = role.upper() in INFRASTRUCTURE_ROLES
is_infra = str(role).upper() in INFRASTRUCTURE_ROLES
# Get position (handle different API formats)
lat = node.get("latitude") or node.get("lat")

545
meshai/mesh_reporter.py Normal file
View file

@ -0,0 +1,545 @@
"""Mesh health reporting for LLM prompt injection and commands."""
import logging
import time
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
def _format_age(timestamp: float) -> str:
"""Format a timestamp as human-readable age."""
if not timestamp:
return "never"
age_seconds = time.time() - timestamp
if age_seconds < 0:
return "just now"
elif age_seconds < 60:
return f"{int(age_seconds)}s ago"
elif age_seconds < 3600:
return f"{int(age_seconds / 60)}m ago"
elif age_seconds < 86400:
return f"{int(age_seconds / 3600)}h ago"
else:
return f"{int(age_seconds / 86400)}d ago"
def _tier_flag(tier: str) -> str:
"""Get warning flag for health tier."""
if tier == "Critical":
return " !!"
elif tier == "Warning":
return " !"
elif tier == "Unhealthy":
return " !"
return ""
class MeshReporter:
"""Builds text blocks for mesh health prompt injection."""
def __init__(self, health_engine, source_manager):
"""Initialize reporter.
Args:
health_engine: MeshHealthEngine instance
source_manager: MeshSourceManager instance
"""
self.health_engine = health_engine
self.source_manager = source_manager
def build_tier1_summary(self) -> str:
"""Build compact mesh summary for LLM injection (~500-800 tokens).
Returns:
Formatted summary string
"""
health = self.health_engine.mesh_health
if not health:
return "LIVE MESH HEALTH DATA: No data available yet."
score = health.score
ts = datetime.fromtimestamp(health.last_computed).strftime("%H:%M %Z")
# Infrastructure stats
infra_online = score.infra_online
infra_total = score.infra_total
infra_pct = int((infra_online / infra_total * 100) if infra_total > 0 else 100)
# Utilization
util = score.util_percent
if util < 15:
util_label = "Low"
elif util < 20:
util_label = "Moderate"
elif util < 25:
util_label = "Elevated"
else:
util_label = "High"
# Power
if score.battery_warnings == 0:
power_label = "Good"
elif score.battery_warnings <= 2:
power_label = "Some low batteries"
else:
power_label = f"{score.battery_warnings} low batteries"
lines = [
f"LIVE MESH HEALTH DATA (as of {ts}):",
"",
f"Overall: {score.composite:.0f}/100 ({score.tier})",
f"Infrastructure: {infra_online}/{infra_total} online ({infra_pct}%)",
f"Channel Utilization: {util:.1f}% avg ({util_label})",
f"Node Behavior: {score.flagged_nodes} nodes flagged",
f"Power/Solar: {power_label} ({score.solar_index:.0f}% solar index)",
"",
"Regions:",
]
# Region summaries
for region in health.regions:
rs = region.score
flag = _tier_flag(rs.tier)
infra_str = f"{rs.infra_online}/{rs.infra_total} infra"
lines.append(f" {region.name}: {rs.composite:.0f}/100 - {infra_str}, {rs.util_percent:.0f}% util{flag}")
# Top issues
issues = self._gather_top_issues(health)
if issues:
lines.append("")
lines.append("Top Issues:")
for i, issue in enumerate(issues[:5], 1):
lines.append(f" {i}. {issue}")
lines.append("")
lines.append(f"{health.total_nodes} nodes across {health.total_regions} regions. User can ask about any region, locality, or node for details.")
return "\n".join(lines)
def _gather_top_issues(self, health) -> list[str]:
"""Gather top issues across all pillars."""
issues = []
# Infrastructure issues (offline nodes)
for region in health.regions:
offline_infra = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if node and node.is_infrastructure and not node.is_online:
offline_infra.append(node.short_name or nid[:4])
if offline_infra:
total_infra = sum(1 for nid in region.node_ids
if health.nodes.get(nid) and health.nodes[nid].is_infrastructure)
online = total_infra - len(offline_infra)
issues.append(f"{region.name}: {online}/{total_infra} infrastructure nodes offline ({', '.join(offline_infra[:3])})")
# Utilization issues
for region in health.regions:
if region.score.util_percent >= 25:
issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Warning)")
elif region.score.util_percent >= 20:
issues.append(f"{region.name}: channel utilization at {region.score.util_percent:.0f}% (Elevated)")
# Behavior issues (high packet nodes)
flagged = self.health_engine.get_flagged_nodes()
for node in flagged[:3]:
threshold = self.health_engine.packet_threshold
ratio = node.non_text_packets / threshold
issues.append(f"Node {node.short_name or node.node_id[:4]} sending {node.non_text_packets} non-text packets/24h ({ratio:.1f}x threshold)")
# Battery issues
battery_warnings = self.health_engine.get_battery_warnings()
for node in battery_warnings[:2]:
issues.append(f"Node {node.short_name or node.node_id[:4]} battery at {node.battery_percent:.0f}%")
return issues
def build_region_detail(self, region_name: str) -> str:
"""Build detailed breakdown for a specific region.
Args:
region_name: Region to get detail for
Returns:
Formatted region detail string
"""
health = self.health_engine.mesh_health
if not health:
return f"REGION DETAIL: {region_name}\nNo data available."
# Find region (fuzzy match)
region = self._find_region(region_name)
if not region:
return f"REGION DETAIL: {region_name}\nRegion not found."
rs = region.score
lines = [
f"REGION DETAIL: {region.name}",
f"Score: {rs.composite:.0f}/100 ({rs.tier})",
"",
f"Infrastructure ({rs.infra_online}/{rs.infra_total}):",
]
# List infrastructure nodes
for nid in region.node_ids:
node = health.nodes.get(nid)
if not node or not node.is_infrastructure:
continue
status = "+" if node.is_online else "X"
age = _format_age(node.last_seen)
bat = f", bat {node.battery_percent:.0f}%" if node.battery_percent else ""
role = node.role or "ROUTER"
lines.append(f" {status} {node.short_name or nid[:4]} ({role}) - last seen {age}{bat}")
if not node.is_online:
lines[-1] += " <- OFFLINE"
# Channel utilization by locality
lines.append("")
lines.append(f"Channel Utilization: {rs.util_percent:.0f}%")
if region.localities:
lines.append(" Localities:")
for loc in region.localities:
node_count = len(loc.node_ids)
lines.append(f" {loc.name}: {loc.score.util_percent:.0f}% - {node_count} nodes")
# Flagged nodes in this region
flagged_in_region = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if node and node.non_text_packets > self.health_engine.packet_threshold:
flagged_in_region.append(node)
if flagged_in_region:
lines.append("")
lines.append("Flagged Nodes:")
for node in flagged_in_region[:5]:
lines.append(f" {node.short_name or node.node_id[:4]}: {node.non_text_packets} non-text pkts/24h")
# Power warnings in this region
low_bat = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if node and node.battery_percent is not None and node.battery_percent < self.health_engine.battery_warning_percent:
low_bat.append(node)
if low_bat:
lines.append("")
lines.append("Power:")
bat_str = ", ".join(f"{n.short_name or n.node_id[:4]} at {n.battery_percent:.0f}%" for n in low_bat[:4])
lines.append(f" Low battery: {bat_str}")
return "\n".join(lines)
def build_node_detail(self, node_identifier: str) -> str:
"""Build detailed info for a specific node.
Args:
node_identifier: Shortname, longname, nodeId, or nodeNum
Returns:
Formatted node detail string
"""
health = self.health_engine.mesh_health
if not health:
return f"NODE DETAIL: {node_identifier}\nNo data available."
# Find node (multiple match strategies)
node = self._find_node(node_identifier)
if not node:
return f"NODE DETAIL: {node_identifier}\nNode not found."
lines = [
f"NODE DETAIL: {node.long_name or node.short_name} ({node.short_name})",
f"ID: {node.node_id}",
f"Hardware: {node.role or 'Unknown'}",
f"Role: {'Infrastructure' if node.is_infrastructure else 'Client'}",
f"Region: {node.region or 'Unknown'} / Locality: {node.locality or 'Unknown'}",
]
if node.latitude and node.longitude:
lines.append(f"Position: {node.latitude:.4f}, {node.longitude:.4f}")
age = _format_age(node.last_seen)
status = "Online" if node.is_online else "OFFLINE"
lines.append(f"Last Seen: {age} ({status})")
# Get source info from source manager
all_nodes = self.source_manager.get_all_nodes()
sources = []
for n in all_nodes:
nid = str(n.get("id") or n.get("nodeId") or n.get("num") or "")
if nid == node.node_id:
sources = n.get("_sources", [])
break
if sources:
lines.append(f"Sources: {', '.join(sources)}")
# Traffic stats
lines.append("")
lines.append("Traffic (24h):")
lines.append(f" Total packets: {node.packet_count_24h}")
lines.append(f" Text messages: {node.text_packet_count_24h}")
lines.append(f" Non-text: {node.non_text_packets}")
# Power
lines.append("")
lines.append("Power:")
if node.battery_percent is not None:
bat_status = "Low" if node.battery_percent < 20 else "OK"
lines.append(f" Battery: {node.battery_percent:.0f}% ({bat_status})")
else:
lines.append(" Battery: N/A")
if node.voltage:
lines.append(f" Voltage: {node.voltage:.2f}V")
lines.append(f" Solar: {'Yes' if node.has_solar else 'Unknown'}")
# Recommendations for this node
recs = self._node_recommendations(node)
if recs:
lines.append("")
lines.append("Recommendations:")
for rec in recs:
lines.append(f" - {rec}")
return "\n".join(lines)
def _node_recommendations(self, node) -> list[str]:
"""Generate recommendations for a specific node."""
recs = []
# High packet count
if node.non_text_packets > self.health_engine.packet_threshold:
ratio = node.non_text_packets / self.health_engine.packet_threshold
recs.append(f"Sending {ratio:.1f}x normal packets. Check position/telemetry intervals.")
# Low battery
if node.battery_percent is not None and node.battery_percent < 20:
recs.append(f"Battery at {node.battery_percent:.0f}%. Consider charging or adding solar.")
# Offline
if not node.is_online:
age = _format_age(node.last_seen)
recs.append(f"Node offline since {age}. Check power and connectivity.")
return recs
def build_recommendations(self, scope: str, scope_value: str = None) -> str:
"""Generate actionable optimization recommendations.
Args:
scope: "mesh", "region", or "node"
scope_value: Region name or node identifier (for scoped recommendations)
Returns:
Formatted recommendations string
"""
health = self.health_engine.mesh_health
if not health:
return ""
recs = []
if scope == "node" and scope_value:
node = self._find_node(scope_value)
if node:
recs.extend(self._node_recommendations(node))
elif scope == "region" and scope_value:
region = self._find_region(scope_value)
if region:
recs.extend(self._region_recommendations(region, health))
else: # mesh scope
recs.extend(self._mesh_recommendations(health))
if not recs:
return ""
lines = ["OPTIMIZATION RECOMMENDATIONS:"]
for rec in recs[:5]:
lines.append(f" - {rec}")
return "\n".join(lines)
def _region_recommendations(self, region, health) -> list[str]:
"""Generate recommendations for a region."""
recs = []
# High utilization
if region.score.util_percent >= 20:
recs.append(f"Channel utilization at {region.score.util_percent:.0f}%. Consider spreading nodes across frequencies or reducing telemetry intervals.")
# Offline infrastructure
offline_count = region.score.infra_total - region.score.infra_online
if offline_count > 0:
recs.append(f"{offline_count} infrastructure node(s) offline. Check power and connectivity.")
# Flagged nodes
flagged = []
for nid in region.node_ids:
node = health.nodes.get(nid)
if node and node.non_text_packets > self.health_engine.packet_threshold:
flagged.append(node)
if flagged:
names = ", ".join(n.short_name or n.node_id[:4] for n in flagged[:3])
recs.append(f"High-traffic nodes ({names}) impacting channel. Review their telemetry settings.")
return recs
def _mesh_recommendations(self, health) -> list[str]:
"""Generate mesh-wide recommendations."""
recs = []
# Overall utilization
if health.score.util_percent >= 20:
recs.append(f"Mesh-wide utilization at {health.score.util_percent:.0f}%. Consider reducing position/telemetry broadcast frequency.")
# Multiple regions with issues
problem_regions = [r for r in health.regions if r.score.composite < 75]
if len(problem_regions) > 1:
names = ", ".join(r.name for r in problem_regions[:3])
recs.append(f"Multiple regions degraded ({names}). Prioritize infrastructure improvements.")
# High packet nodes mesh-wide
flagged = self.health_engine.get_flagged_nodes()
if len(flagged) > 3:
total_excess = sum(n.non_text_packets - self.health_engine.packet_threshold for n in flagged)
recs.append(f"{len(flagged)} nodes exceeding packet threshold ({total_excess} excess packets/day). Review default telemetry intervals.")
# Battery warnings
battery_warnings = self.health_engine.get_battery_warnings()
if len(battery_warnings) > 2:
recs.append(f"{len(battery_warnings)} nodes with low battery. Consider solar additions for remote nodes.")
return recs
def build_lora_compact(self, scope: str, scope_value: str = None) -> str:
"""Build LoRa-optimized compact summary (~200 chars).
Args:
scope: "mesh" or "region"
scope_value: Region name if scope is "region"
Returns:
Compact formatted string
"""
health = self.health_engine.mesh_health
if not health:
return "Mesh: No data"
if scope == "region" and scope_value:
region = self._find_region(scope_value)
if not region:
return f"Region '{scope_value}' not found"
rs = region.score
return f"{region.name} {rs.composite:.0f}/100 | {rs.infra_online}/{rs.infra_total} infra | {rs.util_percent:.0f}% util"
# Mesh summary
s = health.score
lines = [f"Mesh {s.composite:.0f}/100 | {s.infra_online}/{s.infra_total} infra | {s.util_percent:.0f}% util"]
# Add warnings for problem regions/nodes
warnings = []
for region in health.regions:
if region.score.composite < 60:
offline = region.score.infra_total - region.score.infra_online
warnings.append(f"! {region.name} {region.score.composite:.0f}/100 - {offline} infra offline")
battery_warnings = self.health_engine.get_battery_warnings()
for node in battery_warnings[:2]:
warnings.append(f"! {node.short_name or node.node_id[:4]} bat {node.battery_percent:.0f}%")
for w in warnings[:2]:
lines.append(w)
return "\n".join(lines)
def _find_region(self, name: str):
"""Find a region by fuzzy name match."""
health = self.health_engine.mesh_health
if not health:
return None
name_lower = name.lower().strip()
# Exact match first
for region in health.regions:
if region.name.lower() == name_lower:
return region
# Substring match
for region in health.regions:
if name_lower in region.name.lower():
return region
# Try matching against anchor city names
for anchor in self.health_engine.regions:
# Check if search term matches anchor city or region name
anchor_name_lower = anchor.name.lower()
if name_lower in anchor_name_lower:
# Find the corresponding region
for region in health.regions:
if region.name == anchor.name:
return region
return None
def _find_node(self, identifier: str):
"""Find a node by shortname, longname, nodeId, or nodeNum."""
health = self.health_engine.mesh_health
if not health:
return None
identifier = identifier.strip()
id_lower = identifier.lower()
# Try shortname (case-insensitive)
for node in health.nodes.values():
if node.short_name and node.short_name.lower() == id_lower:
return node
# Try longname (substring)
for node in health.nodes.values():
if node.long_name and id_lower in node.long_name.lower():
return node
# Try exact nodeId
if identifier in health.nodes:
return health.nodes[identifier]
# Try hex nodeId with ! prefix
if identifier.startswith("!"):
hex_id = identifier[1:]
for nid, node in health.nodes.items():
if nid.lower() == hex_id.lower():
return node
# Try decimal nodeNum
if identifier.isdigit():
# Convert to hex and search
try:
hex_id = format(int(identifier), 'x')
for nid, node in health.nodes.items():
if hex_id in nid.lower():
return node
except ValueError:
pass
return None
def list_regions_compact(self) -> str:
"""List all regions with scores in compact format."""
health = self.health_engine.mesh_health
if not health or not health.regions:
return "No regions configured."
lines = ["Regions:"]
for region in health.regions:
s = region.score
flag = _tier_flag(s.tier)
lines.append(f" {region.name}: {s.composite:.0f}/100{flag}")
return "\n".join(lines)

View file

@ -1,344 +1,472 @@
"""Message routing logic for MeshAI."""
import asyncio
import logging
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
from .backends.base import LLMBackend
from .commands import CommandContext, CommandDispatcher
from .config import Config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .chunker import chunk_response, ContinuationState
logger = logging.getLogger(__name__)
class RouteType(Enum):
"""Type of message routing."""
IGNORE = auto() # Don't respond
COMMAND = auto() # Bang command
LLM = auto() # Route to LLM
@dataclass
class RouteResult:
"""Result of routing decision."""
route_type: RouteType
response: Optional[str] = None # For commands, the response
query: Optional[str] = None # For LLM, the cleaned query
# advBBS protocol and notification prefixes to ignore
ADVBBS_PREFIXES = (
"MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|",
"BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|",
"advBBS|",
"[MAIL]",
)
# Patterns that suggest prompt injection attempts
_INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE),
re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"you\s+are\s+now\b", re.IGNORECASE),
re.compile(r"new\s+instructions?\s*:", re.IGNORECASE),
re.compile(r"system\s*prompt\s*:", re.IGNORECASE),
]
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
def __init__(
self,
config: Config,
connector: MeshConnector,
history: ConversationHistory,
dispatcher: CommandDispatcher,
llm_backend: LLMBackend,
context: MeshContext = None,
meshmonitor_sync=None,
knowledge=None,
source_manager=None,
health_engine=None,
):
self.config = config
self.connector = connector
self.history = history
self.dispatcher = dispatcher
self.llm = llm_backend
self.context = context
self.meshmonitor_sync = meshmonitor_sync
self.knowledge = knowledge
self.source_manager = source_manager
self.health_engine = health_engine
self.continuations = ContinuationState(max_continuations=3)
def should_respond(self, message: MeshMessage) -> bool:
"""Determine if we should respond to this message.
DM-only bot: ignores all public channel messages.
Commands and conversational LLM responses both work in DMs.
Args:
message: Incoming message
Returns:
True if we should process this message
"""
# Always ignore our own messages
if message.sender_id == self.connector.my_node_id:
return False
# Only respond to DMs
if not message.is_dm:
return False
if not self.config.bot.respond_to_dms:
return False
# Ignore advBBS protocol and notification messages
if self.config.bot.filter_bbs_protocols:
if any(message.text.startswith(p) for p in ADVBBS_PREFIXES):
logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...")
return False
# Ignore messages that MeshMonitor will handle
if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text):
logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...")
return False
return True
def check_continuation(self, message) -> list[str] | None:
"""Check if this is a continuation request and return messages if so.
Returns:
List of messages to send, or None if not a continuation
"""
user_id = message.sender_id
text = message.text.strip()
logger.info(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}")
if self.continuations.has_pending(user_id):
if self.continuations.is_continuation_request(text):
result = self.continuations.get_continuation(user_id)
if result:
messages, _ = result
return messages
# Max continuations reached, return None to fall through
else:
# User asked something new, clear pending continuation
self.continuations.clear(user_id)
return None
async def route(self, message: MeshMessage) -> RouteResult:
"""Route a message and generate response.
Args:
message: Incoming message to route
Returns:
RouteResult with routing decision and any response
"""
text = message.text.strip()
# Check for bang command first
if self.dispatcher.is_command(text):
context = self._make_command_context(message)
response = await self.dispatcher.dispatch(text, context)
return RouteResult(RouteType.COMMAND, response=response)
# Clean up the message (remove @mention)
query = self._clean_query(text)
if not query:
return RouteResult(RouteType.IGNORE)
# Route to LLM
return RouteResult(RouteType.LLM, query=query)
async def generate_llm_response(self, message: MeshMessage, query: str) -> str:
"""Generate LLM response for a message.
Args:
message: Original message
query: Cleaned query text
Returns:
Generated response
"""
# Add user message to history
await self.history.add_message(message.sender_id, "user", query)
# Get conversation history
history = await self.history.get_history_for_llm(message.sender_id)
# Build system prompt in order: identity -> static -> meshmonitor -> context
# 1. Dynamic identity from bot config
bot_name = self.config.bot.name or "MeshAI"
bot_owner = self.config.bot.owner or "Unknown"
identity = (
f"You are {bot_name}, an LLM-powered conversational assistant running on a "
f"Meshtastic mesh network. Your managing operator is {bot_owner}. "
f"You are open source at github.com/zvx-echo6/meshai.\n\n"
f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect "
f"to a Meshtastic node via TCP through meshtasticd.\n\n"
)
# 2. Static system prompt from config
static_prompt = ""
if getattr(self.config.llm, 'use_system_prompt', True):
static_prompt = self.config.llm.system_prompt
system_prompt = identity + static_prompt
# 3. MeshMonitor info (only when enabled)
if (
self.meshmonitor_sync
and self.config.meshmonitor.enabled
and self.config.meshmonitor.inject_into_prompt
):
meshmonitor_intro = (
"\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same "
"meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, "
"traceroutes, security scanning, and auto-responder commands. Its trigger "
"commands are listed below — if someone asks what commands are available, "
"mention both yours and MeshMonitor's. If someone asks where to get "
"MeshMonitor, direct them to github.com/Yeraze/meshmonitor"
)
system_prompt += meshmonitor_intro
commands_summary = self.meshmonitor_sync.get_commands_summary()
if commands_summary:
system_prompt += "\n\n" + commands_summary
# 4. Inject mesh context if available
if self.context:
max_items = getattr(self.config.context, 'max_context_items', 20)
context_block = self.context.get_context_block(max_items=max_items)
if context_block:
system_prompt += (
"\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n"
+ context_block
)
else:
system_prompt += (
"\n\n[No recent mesh traffic observed yet.]"
)
# 5. Knowledge base retrieval
if self.knowledge and query:
results = self.knowledge.search(query)
if results:
chunks = "\n\n".join(
f"[{r['title']}]: {r['excerpt']}" for r in results
)
system_prompt += (
"\n\nREFERENCE KNOWLEDGE - Answer using this information:\n"
+ chunks
)
# DEBUG: Log system prompt status
logger.warning(f"SYSTEM PROMPT LENGTH: {len(system_prompt)} chars")
logger.warning(f"HAS REFERENCE KNOWLEDGE: {'REFERENCE KNOWLEDGE' in system_prompt}")
try:
response = await self.llm.generate(
messages=history,
system_prompt=system_prompt,
max_tokens=500,
)
except asyncio.TimeoutError:
logger.error("LLM request timed out")
response = "Sorry, request timed out. Try again."
except Exception as e:
logger.error(f"LLM generation error: {e}")
response = "Sorry, I encountered an error. Please try again."
# Add assistant response to history
await self.history.add_message(message.sender_id, "assistant", response)
# Persist summary if one was created/updated
await self._persist_summary(message.sender_id)
# Chunk the response with sentence awareness
messages, remaining = chunk_response(
response,
max_chars=self.config.response.max_length,
max_messages=self.config.response.max_messages,
)
# Store remaining content for continuation
if remaining:
logger.info(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining")
self.continuations.store(message.sender_id, remaining)
else:
logger.info(f"No remaining content for {message.sender_id}")
return messages
async def _persist_summary(self, user_id: str) -> None:
"""Persist any cached summary to the database.
Args:
user_id: User identifier
"""
memory = self.llm.get_memory()
if not memory:
return
summary = memory.get_cached_summary(user_id)
if summary:
await self.history.store_summary(
user_id,
summary.summary,
summary.message_count,
)
logger.debug(f"Persisted summary for {user_id}")
def _clean_query(self, text: str) -> str:
"""Clean up query text and check for prompt injection."""
cleaned = " ".join(text.split())
cleaned = cleaned.strip()
# Check for prompt injection
for pattern in _INJECTION_PATTERNS:
if pattern.search(cleaned):
logger.warning(
f"Possible prompt injection detected: {cleaned[:80]}..."
)
match = pattern.search(cleaned)
cleaned = cleaned[:match.start()].strip()
if not cleaned:
cleaned = "Hello"
break
return cleaned
def _make_command_context(self, message: MeshMessage) -> CommandContext:
"""Create command context from message."""
return CommandContext(
sender_id=message.sender_id,
sender_name=message.sender_name,
channel=message.channel,
is_dm=message.is_dm,
position=message.sender_position,
config=self.config,
connector=self.connector,
history=self.history,
)
"""Message routing logic for MeshAI."""
import asyncio
import logging
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
from .backends.base import LLMBackend
from .commands import CommandContext, CommandDispatcher
from .config import Config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .chunker import chunk_response, ContinuationState
logger = logging.getLogger(__name__)
class RouteType(Enum):
"""Type of message routing."""
IGNORE = auto() # Don't respond
COMMAND = auto() # Bang command
LLM = auto() # Route to LLM
@dataclass
class RouteResult:
"""Result of routing decision."""
route_type: RouteType
response: Optional[str] = None # For commands, the response
query: Optional[str] = None # For LLM, the cleaned query
# advBBS protocol and notification prefixes to ignore
ADVBBS_PREFIXES = (
"MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|",
"BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|",
"advBBS|",
"[MAIL]",
)
# Patterns that suggest prompt injection attempts
_INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE),
re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"you\s+are\s+now\b", re.IGNORECASE),
re.compile(r"new\s+instructions?\s*:", re.IGNORECASE),
re.compile(r"system\s*prompt\s*:", re.IGNORECASE),
]
# Keywords that indicate mesh-related questions
_MESH_KEYWORDS = {
"mesh", "network", "health", "nodes", "node", "utilization", "signal",
"coverage", "battery", "solar", "offline", "router", "channel", "packet",
"hop", "optimize", "optimization", "infrastructure", "infra", "relay",
"repeater", "region", "locality", "congestion", "collision", "airtime",
"telemetry", "firmware", "subscribe", "alert", "snr", "rssi",
}
# Phrases that indicate mesh questions
_MESH_PHRASES = [
"how's the mesh",
"hows the mesh",
"mesh status",
"what's wrong",
"whats wrong",
"check node",
"node status",
"network health",
"mesh health",
]
# Mesh awareness instruction for LLM
_MESH_AWARENESS_PROMPT = """
When the user asks about mesh health, network status, or optimization:
- Use the LIVE MESH HEALTH DATA injected above to answer with real numbers
- Be specific: name nodes, cite utilization percentages, reference actual scores
- Give actionable recommendations based on the data
- If asked about a region or node you have detail for, use that detail
- If asked about something the data doesn't cover, say so - don't fabricate
- Keep responses concise - these go over LoRa with limited message size
- Users can run !health for a quick mesh summary or !region [name] for regional info
"""
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
def __init__(
self,
config: Config,
connector: MeshConnector,
history: ConversationHistory,
dispatcher: CommandDispatcher,
llm_backend: LLMBackend,
context: MeshContext = None,
meshmonitor_sync=None,
knowledge=None,
source_manager=None,
health_engine=None,
mesh_reporter=None,
):
self.config = config
self.connector = connector
self.history = history
self.dispatcher = dispatcher
self.llm = llm_backend
self.context = context
self.meshmonitor_sync = meshmonitor_sync
self.knowledge = knowledge
self.source_manager = source_manager
self.health_engine = health_engine
self.mesh_reporter = mesh_reporter
self.continuations = ContinuationState(max_continuations=3)
def should_respond(self, message: MeshMessage) -> bool:
"""Determine if we should respond to this message.
DM-only bot: ignores all public channel messages.
Commands and conversational LLM responses both work in DMs.
Args:
message: Incoming message
Returns:
True if we should process this message
"""
# Always ignore our own messages
if message.sender_id == self.connector.my_node_id:
return False
# Only respond to DMs
if not message.is_dm:
return False
if not self.config.bot.respond_to_dms:
return False
# Ignore advBBS protocol and notification messages
if self.config.bot.filter_bbs_protocols:
if any(message.text.startswith(p) for p in ADVBBS_PREFIXES):
logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...")
return False
# Ignore messages that MeshMonitor will handle
if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text):
logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...")
return False
return True
def check_continuation(self, message) -> list[str] | None:
"""Check if this is a continuation request and return messages if so.
Returns:
List of messages to send, or None if not a continuation
"""
user_id = message.sender_id
text = message.text.strip()
logger.debug(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}")
if self.continuations.has_pending(user_id):
if self.continuations.is_continuation_request(text):
result = self.continuations.get_continuation(user_id)
if result:
messages, _ = result
return messages
# Max continuations reached, return None to fall through
else:
# User asked something new, clear pending continuation
self.continuations.clear(user_id)
return None
async def route(self, message: MeshMessage) -> RouteResult:
"""Route a message and generate response.
Args:
message: Incoming message to route
Returns:
RouteResult with routing decision and any response
"""
text = message.text.strip()
# Check for bang command first
if self.dispatcher.is_command(text):
context = self._make_command_context(message)
response = await self.dispatcher.dispatch(text, context)
return RouteResult(RouteType.COMMAND, response=response)
# Clean up the message (remove @mention)
query = self._clean_query(text)
if not query:
return RouteResult(RouteType.IGNORE)
# Route to LLM
return RouteResult(RouteType.LLM, query=query)
def _is_mesh_question(self, message: str) -> bool:
"""Check if message is asking about mesh health/status.
Args:
message: User message text
Returns:
True if this is a mesh-related question
"""
msg_lower = message.lower()
# Check for mesh phrases
for phrase in _MESH_PHRASES:
if phrase in msg_lower:
return True
# Check for mesh keywords
words = set(re.findall(r'\b\w+\b', msg_lower))
if words & _MESH_KEYWORDS:
return True
return False
def _detect_mesh_scope(self, message: str) -> tuple[str, Optional[str]]:
"""Detect the scope of a mesh question.
Args:
message: User message text
Returns:
Tuple of (scope_type, scope_value):
- ("node", "{identifier}") if asking about specific node
- ("region", "{region_name}") if asking about specific region
- ("mesh", None) for general mesh questions
"""
msg_lower = message.lower()
# Check for node references
if self.health_engine and self.health_engine.mesh_health:
health = self.health_engine.mesh_health
# Look for node shortnames (4 chars, case-insensitive)
for node in health.nodes.values():
if node.short_name:
# Check if shortname appears as a word in message
pattern = r'\b' + re.escape(node.short_name.lower()) + r'\b'
if re.search(pattern, msg_lower):
return ("node", node.short_name)
# Check longname substring
if node.long_name and node.long_name.lower() in msg_lower:
return ("node", node.short_name or node.node_id)
# Check for region references
if self.health_engine:
for anchor in self.health_engine.regions:
anchor_lower = anchor.name.lower()
# Check region name
if anchor_lower in msg_lower:
return ("region", anchor.name)
# Check parts of region name (e.g., "wood river" matches "Wood River - ID")
parts = anchor_lower.replace("-", " ").replace("", " ").split()
for part in parts:
if len(part) > 3 and part in msg_lower:
return ("region", anchor.name)
return ("mesh", None)
async def generate_llm_response(self, message: MeshMessage, query: str) -> str:
"""Generate LLM response for a message.
Args:
message: Original message
query: Cleaned query text
Returns:
Generated response
"""
# Add user message to history
await self.history.add_message(message.sender_id, "user", query)
# Get conversation history
history = await self.history.get_history_for_llm(message.sender_id)
# Build system prompt in order: identity -> static -> meshmonitor -> context -> knowledge -> mesh
# 1. Dynamic identity from bot config
bot_name = self.config.bot.name or "MeshAI"
bot_owner = self.config.bot.owner or "Unknown"
identity = (
f"You are {bot_name}, an LLM-powered conversational assistant running on a "
f"Meshtastic mesh network. Your managing operator is {bot_owner}. "
f"You are open source at github.com/zvx-echo6/meshai.\n\n"
f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect "
f"to a Meshtastic node via TCP through meshtasticd.\n\n"
)
# 2. Static system prompt from config
static_prompt = ""
if getattr(self.config.llm, 'use_system_prompt', True):
static_prompt = self.config.llm.system_prompt
system_prompt = identity + static_prompt
# 3. MeshMonitor info (only when enabled)
if (
self.meshmonitor_sync
and self.config.meshmonitor.enabled
and self.config.meshmonitor.inject_into_prompt
):
meshmonitor_intro = (
"\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same "
"meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, "
"traceroutes, security scanning, and auto-responder commands. Its trigger "
"commands are listed below — if someone asks what commands are available, "
"mention both yours and MeshMonitor's. If someone asks where to get "
"MeshMonitor, direct them to github.com/Yeraze/meshmonitor"
)
system_prompt += meshmonitor_intro
commands_summary = self.meshmonitor_sync.get_commands_summary()
if commands_summary:
system_prompt += "\n\n" + commands_summary
# 4. Inject mesh context if available
if self.context:
max_items = getattr(self.config.context, 'max_context_items', 20)
context_block = self.context.get_context_block(max_items=max_items)
if context_block:
system_prompt += (
"\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n"
+ context_block
)
else:
system_prompt += (
"\n\n[No recent mesh traffic observed yet.]"
)
# 5. Knowledge base retrieval
if self.knowledge and query:
results = self.knowledge.search(query)
if results:
chunks = "\n\n".join(
f"[{r['title']}]: {r['excerpt']}" for r in results
)
system_prompt += (
"\n\nREFERENCE KNOWLEDGE - Answer using this information:\n"
+ chunks
)
# 6. Mesh Intelligence (inject health data for mesh questions)
if (
self.source_manager
and self.mesh_reporter
and self._is_mesh_question(query)
):
scope_type, scope_value = self._detect_mesh_scope(query)
# Always include Tier 1 summary for mesh questions
tier1 = self.mesh_reporter.build_tier1_summary()
system_prompt += "\n\n" + tier1
# Add Tier 2 detail if scoped
if scope_type == "region" and scope_value:
region_detail = self.mesh_reporter.build_region_detail(scope_value)
system_prompt += "\n\n" + region_detail
elif scope_type == "node" and scope_value:
node_detail = self.mesh_reporter.build_node_detail(scope_value)
system_prompt += "\n\n" + node_detail
# Always include relevant recommendations
recommendations = self.mesh_reporter.build_recommendations(scope_type, scope_value)
if recommendations:
system_prompt += "\n\n" + recommendations
# Add mesh awareness instructions
system_prompt += _MESH_AWARENESS_PROMPT
# DEBUG: Log system prompt status
logger.debug(f"System prompt length: {len(system_prompt)} chars")
try:
response = await self.llm.generate(
messages=history,
system_prompt=system_prompt,
max_tokens=500,
)
except asyncio.TimeoutError:
logger.error("LLM request timed out")
response = "Sorry, request timed out. Try again."
except Exception as e:
logger.error(f"LLM generation error: {e}")
response = "Sorry, I encountered an error. Please try again."
# Add assistant response to history
await self.history.add_message(message.sender_id, "assistant", response)
# Persist summary if one was created/updated
await self._persist_summary(message.sender_id)
# Chunk the response with sentence awareness
messages, remaining = chunk_response(
response,
max_chars=self.config.response.max_length,
max_messages=self.config.response.max_messages,
)
# Store remaining content for continuation
if remaining:
logger.debug(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining")
self.continuations.store(message.sender_id, remaining)
return messages
async def _persist_summary(self, user_id: str) -> None:
"""Persist any cached summary to the database.
Args:
user_id: User identifier
"""
memory = self.llm.get_memory()
if not memory:
return
summary = memory.get_cached_summary(user_id)
if summary:
await self.history.store_summary(
user_id,
summary.summary,
summary.message_count,
)
logger.debug(f"Persisted summary for {user_id}")
def _clean_query(self, text: str) -> str:
"""Clean up query text and check for prompt injection."""
cleaned = " ".join(text.split())
cleaned = cleaned.strip()
# Check for prompt injection
for pattern in _INJECTION_PATTERNS:
if pattern.search(cleaned):
logger.warning(
f"Possible prompt injection detected: {cleaned[:80]}..."
)
match = pattern.search(cleaned)
cleaned = cleaned[:match.start()].strip()
if not cleaned:
cleaned = "Hello"
break
return cleaned
def _make_command_context(self, message: MeshMessage) -> CommandContext:
"""Create command context from message."""
return CommandContext(
sender_id=message.sender_id,
sender_name=message.sender_name,
channel=message.channel,
is_dm=message.is_dm,
position=message.sender_position,
config=self.config,
connector=self.connector,
history=self.history,
)