feat: Phase 1 — multi-source data aggregation from Meshview and MeshMonitor APIs

- Add MeshviewSource class for fetching nodes, edges, stats from Meshview API
- Add MeshMonitorDataSource class for fetching nodes, channels, telemetry,
  traceroutes, network stats, topology, packets, solar from MeshMonitor API
- Add MeshSourceManager for managing multiple sources with aggregation
- Add MeshSourceConfig dataclass and mesh_sources list to config
- Integrate source_manager into main.py with periodic refresh
- Add source_manager parameter to MessageRouter (for future Phase 3)
- Add Mesh Sources TUI menu with add/edit/remove/test functionality
- Update config.example.yaml with mesh_sources section

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-04 16:26:58 +00:00
commit b945558ba3
9 changed files with 2830 additions and 1856 deletions

View file

@ -71,7 +71,7 @@ weather:
# === MESHMONITOR INTEGRATION ===
meshmonitor:
enabled: false # Enable MeshMonitor trigger sync
url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:8080)
url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:3333)
inject_into_prompt: true # Include trigger list in LLM prompt
refresh_interval: 300 # Seconds between trigger refreshes
@ -80,5 +80,23 @@ knowledge:
enabled: false # Enable knowledge base search
db_path: "" # Path to knowledge SQLite database
top_k: 5 # Number of chunks to retrieve per query
fts_weight: 0.5 # Weight for FTS5 keyword matches (0-1)
vector_weight: 0.5 # Weight for vector semantic matches (0-1)
# === MESH DATA SOURCES ===
# Connect to Meshview and/or MeshMonitor instances for live mesh
# network analysis. Supports multiple sources. Configure via TUI
# with meshai --config (Mesh Sources menu).
#
# mesh_sources:
# - name: "my-meshview"
# type: meshview
# url: "https://meshview.example.com"
# refresh_interval: 300
# enabled: true
#
# - name: "my-meshmonitor"
# type: meshmonitor
# url: "http://192.168.1.100:3333"
# api_token: "${MM_API_TOKEN}"
# refresh_interval: 300
# enabled: true
mesh_sources: []

File diff suppressed because it is too large Load diff

View file

@ -1,290 +1,315 @@
"""Configuration management for MeshAI."""
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import yaml
_config_logger = logging.getLogger(__name__)
@dataclass
class BotConfig:
"""Bot identity and trigger settings."""
name: str = "ai"
owner: str = ""
respond_to_dms: bool = True
filter_bbs_protocols: bool = True
@dataclass
class ConnectionConfig:
"""Meshtastic connection settings."""
type: str = "serial" # serial or tcp
serial_port: str = "/dev/ttyUSB0"
tcp_host: str = "192.168.1.100"
tcp_port: int = 4403
@dataclass
class ResponseConfig:
"""Response behavior settings."""
delay_min: float = 2.2
delay_max: float = 3.0
max_length: int = 150
max_messages: int = 2
@dataclass
class HistoryConfig:
"""Conversation history settings."""
database: str = "conversations.db"
max_messages_per_user: int = 50
conversation_timeout: int = 86400 # 24 hours
# Cleanup settings
auto_cleanup: bool = True
cleanup_interval_hours: int = 24
max_age_days: int = 30 # Delete conversations older than this
@dataclass
class MemoryConfig:
"""Rolling summary memory settings."""
enabled: bool = True # Enable memory optimization
window_size: int = 4 # Recent message pairs to keep in full
summarize_threshold: int = 8 # Messages before re-summarizing
@dataclass
class ContextConfig:
"""Passive mesh context settings."""
enabled: bool = True
observe_channels: list[int] = field(default_factory=list) # Empty = all channels
ignore_nodes: list[str] = field(default_factory=list) # Node IDs to ignore
max_age: int = 2_592_000 # 30 days in seconds
max_context_items: int = 20 # Max observations injected into LLM context
@dataclass
class CommandsConfig:
"""Command settings."""
enabled: bool = True
prefix: str = "!"
disabled_commands: list[str] = field(default_factory=list)
custom_commands: dict = field(default_factory=dict)
@dataclass
class LLMConfig:
"""LLM backend settings."""
backend: str = "openai" # openai, anthropic, google
api_key: str = ""
base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4o-mini"
timeout: int = 30
system_prompt: str = (
"YOUR COMMANDS (handled directly by you via DM):\n"
"!help — List available commands.\n"
"!ping — Connectivity test, responds with pong.\n"
"!status — Shows your version, uptime, user count, and message count.\n"
"!weather [location] — Weather lookup using Open-Meteo API.\n"
"!reset — Clears conversation history and memory.\n"
"!clear — Same as !reset.\n\n"
"YOUR ARCHITECTURE: Modular Python — pluggable LLM backends (OpenAI, Anthropic, "
"Google, local), per-user SQLite conversation history, rolling summary memory, "
"passive mesh context buffer (observes channel traffic), smart chunking for LoRa "
"message limits, prompt injection defense, advBBS filtering.\n\n"
"RESPONSE RULES:\n"
"- Keep responses very brief — 1-2 short sentences, under 300 characters. Only give longer answers if the user explicitly asks for detail or explanation.\n"
"- Be concise but friendly. No markdown formatting.\n"
"- If asked about mesh activity and no recent traffic is shown, say you haven't "
"observed any yet.\n"
"- When asked about yourself or commands, answer conversationally. Don't dump lists.\n"
"- You are part of the freq51 mesh in the Twin Falls, Idaho area."
)
use_system_prompt: bool = True # Toggle to disable sending system prompt
web_search: bool = False # Enable web search (Open WebUI feature)
google_grounding: bool = False # Enable Google Search grounding (Gemini only)
@dataclass
class OpenMeteoConfig:
"""Open-Meteo weather provider settings."""
url: str = "https://api.open-meteo.com/v1"
@dataclass
class WttrConfig:
"""wttr.in weather provider settings."""
url: str = "https://wttr.in"
@dataclass
class WeatherConfig:
"""Weather command settings."""
primary: str = "openmeteo" # openmeteo, wttr, llm
fallback: str = "llm" # openmeteo, wttr, llm, none
default_location: str = ""
openmeteo: OpenMeteoConfig = field(default_factory=OpenMeteoConfig)
wttr: WttrConfig = field(default_factory=WttrConfig)
@dataclass
class MeshMonitorConfig:
"""MeshMonitor trigger sync settings."""
enabled: bool = False
url: str = "" # e.g., http://100.64.0.11:3333
inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands
refresh_interval: int = 300 # Seconds between refreshes
@dataclass
class KnowledgeConfig:
"""FTS5 knowledge base settings."""
enabled: bool = False
db_path: str = ""
top_k: int = 5
@dataclass
class Config:
"""Main configuration container."""
bot: BotConfig = field(default_factory=BotConfig)
connection: ConnectionConfig = field(default_factory=ConnectionConfig)
response: ResponseConfig = field(default_factory=ResponseConfig)
history: HistoryConfig = field(default_factory=HistoryConfig)
memory: MemoryConfig = field(default_factory=MemoryConfig)
context: ContextConfig = field(default_factory=ContextConfig)
commands: CommandsConfig = field(default_factory=CommandsConfig)
llm: LLMConfig = field(default_factory=LLMConfig)
weather: WeatherConfig = field(default_factory=WeatherConfig)
meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig)
knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
_config_path: Optional[Path] = field(default=None, repr=False)
def resolve_api_key(self) -> str:
"""Resolve API key from config or environment."""
if self.llm.api_key:
# Check if it's an env var reference like ${LLM_API_KEY}
if self.llm.api_key.startswith("${") and self.llm.api_key.endswith("}"):
env_var = self.llm.api_key[2:-1]
return os.environ.get(env_var, "")
return self.llm.api_key
# Fall back to common env vars
for env_var in ["LLM_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"]:
if value := os.environ.get(env_var):
return value
return ""
def _dict_to_dataclass(cls, data: dict):
"""Recursively convert dict to dataclass, handling nested structures."""
if data is None:
return cls()
field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()}
kwargs = {}
for key, value in data.items():
if key.startswith("_"):
continue
if key not in field_types:
continue
field_type = field_types[key]
# Handle nested dataclasses
if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(field_type, value)
else:
kwargs[key] = value
return cls(**kwargs)
def _dataclass_to_dict(obj) -> dict:
"""Recursively convert dataclass to dict for YAML serialization."""
if not hasattr(obj, "__dataclass_fields__"):
return obj
result = {}
for field_name in obj.__dataclass_fields__:
if field_name.startswith("_"):
continue
value = getattr(obj, field_name)
if hasattr(value, "__dataclass_fields__"):
result[field_name] = _dataclass_to_dict(value)
elif isinstance(value, list):
result[field_name] = list(value)
else:
result[field_name] = value
return result
def load_config(config_path: Optional[Path] = None) -> Config:
"""Load configuration from YAML file.
Args:
config_path: Path to config file. Defaults to ./config.yaml
Returns:
Config object with loaded settings
"""
if config_path is None:
config_path = Path("config.yaml")
config_path = Path(config_path)
if not config_path.exists():
# Return default config if file doesn't exist
config = Config()
config._config_path = config_path
return config
with open(config_path, "r") as f:
data = yaml.safe_load(f) or {}
config = _dict_to_dataclass(Config, data)
config._config_path = config_path
return config
def save_config(config: Config, config_path: Optional[Path] = None) -> None:
"""Save configuration to YAML file.
Args:
config: Config object to save
config_path: Path to save to. Uses config._config_path if not specified
"""
if config_path is None:
config_path = config._config_path or Path("config.yaml")
config_path = Path(config_path)
data = _dataclass_to_dict(config)
# Add header comment
header = "# MeshAI Configuration\n# Generated by meshai --config\n\n"
with open(config_path, "w") as f:
f.write(header)
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
"""Configuration management for MeshAI."""
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import yaml
_config_logger = logging.getLogger(__name__)
@dataclass
class BotConfig:
"""Bot identity and trigger settings."""
name: str = "ai"
owner: str = ""
respond_to_dms: bool = True
filter_bbs_protocols: bool = True
@dataclass
class ConnectionConfig:
"""Meshtastic connection settings."""
type: str = "serial" # serial or tcp
serial_port: str = "/dev/ttyUSB0"
tcp_host: str = "192.168.1.100"
tcp_port: int = 4403
@dataclass
class ResponseConfig:
"""Response behavior settings."""
delay_min: float = 2.2
delay_max: float = 3.0
max_length: int = 150
max_messages: int = 2
@dataclass
class HistoryConfig:
"""Conversation history settings."""
database: str = "conversations.db"
max_messages_per_user: int = 50
conversation_timeout: int = 86400 # 24 hours
# Cleanup settings
auto_cleanup: bool = True
cleanup_interval_hours: int = 24
max_age_days: int = 30 # Delete conversations older than this
@dataclass
class MemoryConfig:
"""Rolling summary memory settings."""
enabled: bool = True # Enable memory optimization
window_size: int = 4 # Recent message pairs to keep in full
summarize_threshold: int = 8 # Messages before re-summarizing
@dataclass
class ContextConfig:
"""Passive mesh context settings."""
enabled: bool = True
observe_channels: list[int] = field(default_factory=list) # Empty = all channels
ignore_nodes: list[str] = field(default_factory=list) # Node IDs to ignore
max_age: int = 2_592_000 # 30 days in seconds
max_context_items: int = 20 # Max observations injected into LLM context
@dataclass
class CommandsConfig:
"""Command settings."""
enabled: bool = True
prefix: str = "!"
disabled_commands: list[str] = field(default_factory=list)
custom_commands: dict = field(default_factory=dict)
@dataclass
class LLMConfig:
"""LLM backend settings."""
backend: str = "openai" # openai, anthropic, google
api_key: str = ""
base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4o-mini"
timeout: int = 30
system_prompt: str = (
"YOUR COMMANDS (handled directly by you via DM):\n"
"!help — List available commands.\n"
"!ping — Connectivity test, responds with pong.\n"
"!status — Shows your version, uptime, user count, and message count.\n"
"!weather [location] — Weather lookup using Open-Meteo API.\n"
"!reset — Clears conversation history and memory.\n"
"!clear — Same as !reset.\n\n"
"YOUR ARCHITECTURE: Modular Python — pluggable LLM backends (OpenAI, Anthropic, "
"Google, local), per-user SQLite conversation history, rolling summary memory, "
"passive mesh context buffer (observes channel traffic), smart chunking for LoRa "
"message limits, prompt injection defense, advBBS filtering.\n\n"
"RESPONSE RULES:\n"
"- Keep responses very brief — 1-2 short sentences, under 300 characters. Only give longer answers if the user explicitly asks for detail or explanation.\n"
"- Be concise but friendly. No markdown formatting.\n"
"- If asked about mesh activity and no recent traffic is shown, say you haven't "
"observed any yet.\n"
"- When asked about yourself or commands, answer conversationally. Don't dump lists.\n"
"- You are part of the freq51 mesh in the Twin Falls, Idaho area."
)
use_system_prompt: bool = True # Toggle to disable sending system prompt
web_search: bool = False # Enable web search (Open WebUI feature)
google_grounding: bool = False # Enable Google Search grounding (Gemini only)
@dataclass
class OpenMeteoConfig:
"""Open-Meteo weather provider settings."""
url: str = "https://api.open-meteo.com/v1"
@dataclass
class WttrConfig:
"""wttr.in weather provider settings."""
url: str = "https://wttr.in"
@dataclass
class WeatherConfig:
"""Weather command settings."""
primary: str = "openmeteo" # openmeteo, wttr, llm
fallback: str = "llm" # openmeteo, wttr, llm, none
default_location: str = ""
openmeteo: OpenMeteoConfig = field(default_factory=OpenMeteoConfig)
wttr: WttrConfig = field(default_factory=WttrConfig)
@dataclass
class MeshMonitorConfig:
"""MeshMonitor trigger sync settings."""
enabled: bool = False
url: str = "" # e.g., http://100.64.0.11:3333
inject_into_prompt: bool = True # Tell LLM about MeshMonitor commands
refresh_interval: int = 300 # Seconds between refreshes
@dataclass
class KnowledgeConfig:
"""FTS5 knowledge base settings."""
enabled: bool = False
db_path: str = ""
top_k: int = 5
@dataclass
class MeshSourceConfig:
"""Configuration for a mesh data source."""
name: str = ""
type: str = "" # "meshview" or "meshmonitor"
url: str = ""
api_token: str = "" # MeshMonitor only, supports ${ENV_VAR}
refresh_interval: int = 300
enabled: bool = True
@dataclass
class Config:
"""Main configuration container."""
bot: BotConfig = field(default_factory=BotConfig)
connection: ConnectionConfig = field(default_factory=ConnectionConfig)
response: ResponseConfig = field(default_factory=ResponseConfig)
history: HistoryConfig = field(default_factory=HistoryConfig)
memory: MemoryConfig = field(default_factory=MemoryConfig)
context: ContextConfig = field(default_factory=ContextConfig)
commands: CommandsConfig = field(default_factory=CommandsConfig)
llm: LLMConfig = field(default_factory=LLMConfig)
weather: WeatherConfig = field(default_factory=WeatherConfig)
meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig)
knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
mesh_sources: list[MeshSourceConfig] = field(default_factory=list)
_config_path: Optional[Path] = field(default=None, repr=False)
def resolve_api_key(self) -> str:
"""Resolve API key from config or environment."""
if self.llm.api_key:
# Check if it's an env var reference like ${LLM_API_KEY}
if self.llm.api_key.startswith("${") and self.llm.api_key.endswith("}"):
env_var = self.llm.api_key[2:-1]
return os.environ.get(env_var, "")
return self.llm.api_key
# Fall back to common env vars
for env_var in ["LLM_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"]:
if value := os.environ.get(env_var):
return value
return ""
def _dict_to_dataclass(cls, data: dict):
"""Recursively convert dict to dataclass, handling nested structures."""
if data is None:
return cls()
field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()}
kwargs = {}
for key, value in data.items():
if key.startswith("_"):
continue
if key not in field_types:
continue
field_type = field_types[key]
# Handle nested dataclasses
if hasattr(field_type, "__dataclass_fields__") and isinstance(value, dict):
kwargs[key] = _dict_to_dataclass(field_type, value)
# Handle list of MeshSourceConfig
elif key == "mesh_sources" and isinstance(value, list):
kwargs[key] = [
_dict_to_dataclass(MeshSourceConfig, item)
if isinstance(item, dict) else item
for item in value
]
else:
kwargs[key] = value
return cls(**kwargs)
def _dataclass_to_dict(obj) -> dict:
"""Recursively convert dataclass to dict for YAML serialization."""
if not hasattr(obj, "__dataclass_fields__"):
return obj
result = {}
for field_name in obj.__dataclass_fields__:
if field_name.startswith("_"):
continue
value = getattr(obj, field_name)
if hasattr(value, "__dataclass_fields__"):
result[field_name] = _dataclass_to_dict(value)
elif isinstance(value, list):
# Handle list of dataclasses (like mesh_sources)
result[field_name] = [
_dataclass_to_dict(item) if hasattr(item, "__dataclass_fields__") else item
for item in value
]
else:
result[field_name] = value
return result
def load_config(config_path: Optional[Path] = None) -> Config:
"""Load configuration from YAML file.
Args:
config_path: Path to config file. Defaults to ./config.yaml
Returns:
Config object with loaded settings
"""
if config_path is None:
config_path = Path("config.yaml")
config_path = Path(config_path)
if not config_path.exists():
# Return default config if file doesn't exist
config = Config()
config._config_path = config_path
return config
with open(config_path, "r") as f:
data = yaml.safe_load(f) or {}
config = _dict_to_dataclass(Config, data)
config._config_path = config_path
return config
def save_config(config: Config, config_path: Optional[Path] = None) -> None:
"""Save configuration to YAML file.
Args:
config: Config object to save
config_path: Path to save to. Uses config._config_path if not specified
"""
if config_path is None:
config_path = config._config_path or Path("config.yaml")
config_path = Path(config_path)
data = _dataclass_to_dict(config)
# Add header comment
header = "# MeshAI Configuration\n# Generated by meshai --config\n\n"
with open(config_path, "w") as f:
f.write(header)
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)

View file

@ -1,381 +1,409 @@
"""Main entry point for MeshAI."""
import argparse
import asyncio
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional
from . import __version__
from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
from .commands.status import set_start_time
from .config import Config, load_config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .memory import ConversationSummary
from .responder import Responder
from .router import MessageRouter, RouteType
logger = logging.getLogger(__name__)
class MeshAI:
"""Main application class."""
def __init__(self, config: Config):
self.config = config
self.connector: Optional[MeshConnector] = None
self.history: Optional[ConversationHistory] = None
self.dispatcher: Optional[CommandDispatcher] = None
self.llm: Optional[LLMBackend] = None
self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None
self.knowledge = None
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
async def start(self) -> None:
"""Start the bot."""
logger.info(f"Starting MeshAI v{__version__}")
set_start_time(time.time())
# Initialize components
await self._init_components()
# Connect to Meshtastic
self.connector.connect()
self.connector.set_message_callback(self._on_message, asyncio.get_event_loop())
# Add own node ID to context ignore list
if self.context and self.connector.my_node_id:
self.context._ignore_nodes.add(self.connector.my_node_id)
self._running = True
self._loop = asyncio.get_event_loop()
self._last_cleanup = time.time()
# Write PID file
self._write_pid()
logger.info("MeshAI started successfully")
# Keep running
while self._running:
await asyncio.sleep(1)
# Periodic MeshMonitor refresh
if self.meshmonitor_sync:
self.meshmonitor_sync.maybe_refresh()
# Periodic cleanup
if time.time() - self._last_cleanup >= 3600:
await self.history.cleanup_expired()
if self.context:
self.context.prune()
self._last_cleanup = time.time()
async def stop(self) -> None:
"""Stop the bot."""
logger.info("Stopping MeshAI...")
self._running = False
if self.connector:
self.connector.disconnect()
if self.history:
await self.history.close()
if self.llm:
await self.llm.close()
if self.knowledge:
self.knowledge.close()
self._remove_pid()
logger.info("MeshAI stopped")
async def _init_components(self) -> None:
"""Initialize all components."""
# Conversation history
self.history = ConversationHistory(self.config.history)
await self.history.initialize()
# Command dispatcher
self.dispatcher = create_dispatcher(
prefix=self.config.commands.prefix,
disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands,
)
# LLM backend
api_key = self.config.resolve_api_key()
if not api_key:
logger.warning("No API key configured - LLM responses will fail")
# Memory config
mem_cfg = self.config.memory
window_size = mem_cfg.window_size if mem_cfg.enabled else 0
summarize_threshold = mem_cfg.summarize_threshold
# Create backend
backend = self.config.llm.backend.lower()
if backend == "openai":
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "anthropic":
self.llm = AnthropicBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "google":
self.llm = GoogleBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI")
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
# Load persisted summaries into memory cache
await self._load_summaries()
# Meshtastic connector
self.connector = MeshConnector(self.config.connection)
# Passive mesh context buffer
ctx_cfg = self.config.context
if ctx_cfg.enabled:
self.context = MeshContext(
observe_channels=ctx_cfg.observe_channels or None,
ignore_nodes=ctx_cfg.ignore_nodes or None,
max_age=ctx_cfg.max_age,
)
logger.info("Mesh context buffer enabled")
else:
self.context = None
# MeshMonitor trigger sync
mm_cfg = self.config.meshmonitor
if mm_cfg.enabled and mm_cfg.url:
from .meshmonitor import MeshMonitorSync
self.meshmonitor_sync = MeshMonitorSync(
url=mm_cfg.url,
refresh_interval=mm_cfg.refresh_interval,
)
count = self.meshmonitor_sync.load()
logger.info(f"MeshMonitor sync enabled, loaded {count} triggers")
else:
self.meshmonitor_sync = None
# Knowledge base
kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch
self.knowledge = KnowledgeSearch(
db_path=kb_cfg.db_path,
top_k=kb_cfg.top_k,
)
else:
self.knowledge = None
# Message router
self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm,
context=self.context,
meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge,
)
# Responder
self.responder = Responder(self.config.response, self.connector)
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:
# Passively observe channel broadcasts for context (before filtering)
if self.context and not message.is_dm and message.text:
self.context.observe(
sender_name=message.sender_name,
sender_id=message.sender_id,
text=message.text,
channel=message.channel,
is_dm=False,
)
# Check if we should respond
if not self.router.should_respond(message):
return
logger.info(
f"Processing message from {message.sender_name} ({message.sender_id}): "
f"{message.text[:50]}..."
)
# Route the message
# Check for continuation request first
continuation_messages = self.router.check_continuation(message)
if continuation_messages:
await self.responder.send_response(
continuation_messages,
destination=message.sender_id,
channel=message.channel,
)
return
result = await self.router.route(message)
if result.route_type == RouteType.IGNORE:
return
# Determine response
if result.route_type == RouteType.COMMAND:
messages = result.response # Commands return single string
elif result.route_type == RouteType.LLM:
messages = await self.router.generate_llm_response(message, result.query)
else:
return
if not messages:
return
# Send DM response
await self.responder.send_response(
messages,
destination=message.sender_id,
channel=message.channel,
)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def _load_summaries(self) -> None:
"""Load persisted summaries from database into memory cache."""
memory = self.llm.get_memory()
if not memory:
return
if not self.history or not self.history._db:
return
try:
async with self.history._lock:
cursor = await self.history._db.execute(
"SELECT user_id, summary, message_count, updated_at "
"FROM conversation_summaries"
)
rows = await cursor.fetchall()
loaded = 0
for row in rows:
user_id, summary_text, message_count, updated_at = row
summary = ConversationSummary(
summary=summary_text,
last_updated=updated_at,
message_count=message_count,
)
memory.load_summary(user_id, summary)
loaded += 1
if loaded:
logger.info(f"Loaded {loaded} conversation summaries from database")
except Exception as e:
logger.warning(f"Failed to load summaries from database: {e}")
def _write_pid(self) -> None:
"""Write PID file."""
pid_file = Path("/tmp/meshai.pid")
pid_file.write_text(str(os.getpid()))
def _remove_pid(self) -> None:
"""Remove PID file."""
pid_file = Path("/tmp/meshai.pid")
if pid_file.exists():
pid_file.unlink()
def setup_logging(verbose: bool = False) -> None:
"""Configure logging."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="MeshAI - LLM-powered Meshtastic assistant",
prog="meshai",
)
parser.add_argument(
"--version", "-V", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument(
"--config", "-c", action="store_true", help="Launch configuration tool"
)
parser.add_argument(
"--config-file",
"-f",
type=Path,
default=Path("config.yaml"),
help="Path to config file (default: config.yaml)",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
setup_logging(args.verbose)
# Launch configurator if requested
if args.config:
run_configurator(args.config_file)
return
# Load config
config = load_config(args.config_file)
# Check if config exists
if not args.config_file.exists():
logger.warning(f"Config file not found: {args.config_file}")
logger.info("Run 'meshai --config' to create one, or copy config.example.yaml")
sys.exit(1)
# Create and run bot
bot = MeshAI(config)
# Handle signals
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}")
loop.create_task(bot.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
loop.run_until_complete(bot.start())
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(bot.stop())
loop.close()
if __name__ == "__main__":
main()
"""Main entry point for MeshAI."""
import argparse
import asyncio
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional
from . import __version__
from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
from .commands.status import set_start_time
from .config import Config, load_config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .memory import ConversationSummary
from .responder import Responder
from .router import MessageRouter, RouteType
logger = logging.getLogger(__name__)
class MeshAI:
"""Main application class."""
def __init__(self, config: Config):
self.config = config
self.connector: Optional[MeshConnector] = None
self.history: Optional[ConversationHistory] = None
self.dispatcher: Optional[CommandDispatcher] = None
self.llm: Optional[LLMBackend] = None
self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None
self.knowledge = None
self.source_manager = None
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
async def start(self) -> None:
"""Start the bot."""
logger.info(f"Starting MeshAI v{__version__}")
set_start_time(time.time())
# Initialize components
await self._init_components()
# Connect to Meshtastic
self.connector.connect()
self.connector.set_message_callback(self._on_message, asyncio.get_event_loop())
# Add own node ID to context ignore list
if self.context and self.connector.my_node_id:
self.context._ignore_nodes.add(self.connector.my_node_id)
self._running = True
self._loop = asyncio.get_event_loop()
self._last_cleanup = time.time()
# Write PID file
self._write_pid()
logger.info("MeshAI started successfully")
# Keep running
while self._running:
await asyncio.sleep(1)
# Periodic MeshMonitor refresh
if self.meshmonitor_sync:
self.meshmonitor_sync.maybe_refresh()
# Periodic mesh source refresh
if self.source_manager:
self.source_manager.refresh_all()
# Periodic cleanup
if time.time() - self._last_cleanup >= 3600:
await self.history.cleanup_expired()
if self.context:
self.context.prune()
self._last_cleanup = time.time()
async def stop(self) -> None:
"""Stop the bot."""
logger.info("Stopping MeshAI...")
self._running = False
if self.connector:
self.connector.disconnect()
if self.history:
await self.history.close()
if self.llm:
await self.llm.close()
if self.knowledge:
self.knowledge.close()
self._remove_pid()
logger.info("MeshAI stopped")
async def _init_components(self) -> None:
"""Initialize all components."""
# Conversation history
self.history = ConversationHistory(self.config.history)
await self.history.initialize()
# Command dispatcher
self.dispatcher = create_dispatcher(
prefix=self.config.commands.prefix,
disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands,
)
# LLM backend
api_key = self.config.resolve_api_key()
if not api_key:
logger.warning("No API key configured - LLM responses will fail")
# Memory config
mem_cfg = self.config.memory
window_size = mem_cfg.window_size if mem_cfg.enabled else 0
summarize_threshold = mem_cfg.summarize_threshold
# Create backend
backend = self.config.llm.backend.lower()
if backend == "openai":
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "anthropic":
self.llm = AnthropicBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "google":
self.llm = GoogleBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI")
self.llm = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
# Load persisted summaries into memory cache
await self._load_summaries()
# Meshtastic connector
self.connector = MeshConnector(self.config.connection)
# Passive mesh context buffer
ctx_cfg = self.config.context
if ctx_cfg.enabled:
self.context = MeshContext(
observe_channels=ctx_cfg.observe_channels or None,
ignore_nodes=ctx_cfg.ignore_nodes or None,
max_age=ctx_cfg.max_age,
)
logger.info("Mesh context buffer enabled")
else:
self.context = None
# MeshMonitor trigger sync
mm_cfg = self.config.meshmonitor
if mm_cfg.enabled and mm_cfg.url:
from .meshmonitor import MeshMonitorSync
self.meshmonitor_sync = MeshMonitorSync(
url=mm_cfg.url,
refresh_interval=mm_cfg.refresh_interval,
)
count = self.meshmonitor_sync.load()
logger.info(f"MeshMonitor sync enabled, loaded {count} triggers")
else:
self.meshmonitor_sync = None
# Mesh data sources
enabled_sources = [s for s in self.config.mesh_sources if s.enabled]
if enabled_sources:
from .mesh_sources import MeshSourceManager
self.source_manager = MeshSourceManager(enabled_sources)
# Initial fetch
self.source_manager.refresh_all()
# Log status
for status in self.source_manager.get_status():
if status["is_loaded"]:
logger.info(
f"Mesh source '{status['name']}' ({status['type']}): "
f"{status['node_count']} nodes"
)
else:
logger.warning(
f"Mesh source '{status['name']}' ({status['type']}): "
f"failed - {status.get('last_error', 'unknown error')}"
)
else:
self.source_manager = None
# Knowledge base
kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch
self.knowledge = KnowledgeSearch(
db_path=kb_cfg.db_path,
top_k=kb_cfg.top_k,
)
else:
self.knowledge = None
# Message router
self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm,
context=self.context,
meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge,
source_manager=self.source_manager,
)
# Responder
self.responder = Responder(self.config.response, self.connector)
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:
# Passively observe channel broadcasts for context (before filtering)
if self.context and not message.is_dm and message.text:
self.context.observe(
sender_name=message.sender_name,
sender_id=message.sender_id,
text=message.text,
channel=message.channel,
is_dm=False,
)
# Check if we should respond
if not self.router.should_respond(message):
return
logger.info(
f"Processing message from {message.sender_name} ({message.sender_id}): "
f"{message.text[:50]}..."
)
# Route the message
# Check for continuation request first
continuation_messages = self.router.check_continuation(message)
if continuation_messages:
await self.responder.send_response(
continuation_messages,
destination=message.sender_id,
channel=message.channel,
)
return
result = await self.router.route(message)
if result.route_type == RouteType.IGNORE:
return
# Determine response
if result.route_type == RouteType.COMMAND:
messages = result.response # Commands return single string
elif result.route_type == RouteType.LLM:
messages = await self.router.generate_llm_response(message, result.query)
else:
return
if not messages:
return
# Send DM response
await self.responder.send_response(
messages,
destination=message.sender_id,
channel=message.channel,
)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def _load_summaries(self) -> None:
"""Load persisted summaries from database into memory cache."""
memory = self.llm.get_memory()
if not memory:
return
if not self.history or not self.history._db:
return
try:
async with self.history._lock:
cursor = await self.history._db.execute(
"SELECT user_id, summary, message_count, updated_at "
"FROM conversation_summaries"
)
rows = await cursor.fetchall()
loaded = 0
for row in rows:
user_id, summary_text, message_count, updated_at = row
summary = ConversationSummary(
summary=summary_text,
last_updated=updated_at,
message_count=message_count,
)
memory.load_summary(user_id, summary)
loaded += 1
if loaded:
logger.info(f"Loaded {loaded} conversation summaries from database")
except Exception as e:
logger.warning(f"Failed to load summaries from database: {e}")
def _write_pid(self) -> None:
"""Write PID file."""
pid_file = Path("/tmp/meshai.pid")
pid_file.write_text(str(os.getpid()))
def _remove_pid(self) -> None:
"""Remove PID file."""
pid_file = Path("/tmp/meshai.pid")
if pid_file.exists():
pid_file.unlink()
def setup_logging(verbose: bool = False) -> None:
"""Configure logging."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="MeshAI - LLM-powered Meshtastic assistant",
prog="meshai",
)
parser.add_argument(
"--version", "-V", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument(
"--config", "-c", action="store_true", help="Launch configuration tool"
)
parser.add_argument(
"--config-file",
"-f",
type=Path,
default=Path("config.yaml"),
help="Path to config file (default: config.yaml)",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
setup_logging(args.verbose)
# Launch configurator if requested
if args.config:
run_configurator(args.config_file)
return
# Load config
config = load_config(args.config_file)
# Check if config exists
if not args.config_file.exists():
logger.warning(f"Config file not found: {args.config_file}")
logger.info("Run 'meshai --config' to create one, or copy config.example.yaml")
sys.exit(1)
# Create and run bot
bot = MeshAI(config)
# Handle signals
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}")
loop.create_task(bot.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
loop.run_until_complete(bot.start())
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(bot.stop())
loop.close()
if __name__ == "__main__":
main()

197
meshai/mesh_sources.py Normal file
View file

@ -0,0 +1,197 @@
"""Mesh data source manager."""
import logging
import time
from typing import Optional
from .config import MeshSourceConfig
from .sources.meshview import MeshviewSource
from .sources.meshmonitor_data import MeshMonitorDataSource
logger = logging.getLogger(__name__)
class MeshSourceManager:
"""Manages multiple mesh data sources."""
def __init__(self, source_configs: list[MeshSourceConfig]):
"""Initialize source manager.
Args:
source_configs: List of source configurations
"""
self._sources: dict[str, MeshviewSource | MeshMonitorDataSource] = {}
for cfg in source_configs:
if not cfg.enabled:
continue
name = cfg.name
if not name:
logger.warning("Skipping source with empty name")
continue
if name in self._sources:
logger.warning(f"Duplicate source name '{name}', skipping")
continue
try:
if cfg.type == "meshview":
self._sources[name] = MeshviewSource(
url=cfg.url,
refresh_interval=cfg.refresh_interval,
)
logger.info(f"Created Meshview source '{name}' -> {cfg.url}")
elif cfg.type == "meshmonitor":
self._sources[name] = MeshMonitorDataSource(
url=cfg.url,
api_token=cfg.api_token,
refresh_interval=cfg.refresh_interval,
)
logger.info(f"Created MeshMonitor source '{name}' -> {cfg.url}")
else:
logger.warning(f"Unknown source type '{cfg.type}' for '{name}'")
except Exception as e:
logger.error(f"Failed to create source '{name}': {e}")
def refresh_all(self) -> int:
"""Call maybe_refresh() on all sources.
Returns:
Number of sources that refreshed
"""
refreshed = 0
for name, source in self._sources.items():
try:
if source.maybe_refresh():
refreshed += 1
except Exception as e:
logger.error(f"Error refreshing source '{name}': {e}")
return refreshed
def get_source(self, name: str) -> Optional[MeshviewSource | MeshMonitorDataSource]:
"""Get a specific source by name.
Args:
name: Source name
Returns:
Source instance or None if not found
"""
return self._sources.get(name)
def get_all_nodes(self) -> list[dict]:
"""Get nodes from all sources, tagged with source name.
Returns:
List of node dicts with '_source' field added
"""
all_nodes = []
for name, source in self._sources.items():
for node in source.nodes:
tagged = dict(node)
tagged["_source"] = name
all_nodes.append(tagged)
return all_nodes
def get_all_edges(self) -> list[dict]:
"""Get edges from all Meshview sources, tagged with source name.
Returns:
List of edge dicts with '_source' field added
"""
all_edges = []
for name, source in self._sources.items():
if isinstance(source, MeshviewSource):
for edge in source.edges:
tagged = dict(edge)
tagged["_source"] = name
all_edges.append(tagged)
return all_edges
def get_all_telemetry(self) -> list[dict]:
"""Get telemetry from all MeshMonitor sources, tagged with source name.
Returns:
List of telemetry dicts with '_source' field added
"""
all_telemetry = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
for item in source.telemetry:
tagged = dict(item)
tagged["_source"] = name
all_telemetry.append(tagged)
return all_telemetry
def get_all_traceroutes(self) -> list[dict]:
"""Get traceroutes from all MeshMonitor sources, tagged with source name.
Returns:
List of traceroute dicts with '_source' field added
"""
all_traceroutes = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
for item in source.traceroutes:
tagged = dict(item)
tagged["_source"] = name
all_traceroutes.append(tagged)
return all_traceroutes
def get_all_channels(self) -> list[dict]:
"""Get channels from all MeshMonitor sources, tagged with source name.
Returns:
List of channel dicts with '_source' field added
"""
all_channels = []
for name, source in self._sources.items():
if isinstance(source, MeshMonitorDataSource):
for item in source.channels:
tagged = dict(item)
tagged["_source"] = name
all_channels.append(tagged)
return all_channels
def get_status(self) -> list[dict]:
"""Get status of all sources for TUI display.
Returns:
List of status dicts with source info
"""
status_list = []
for name, source in self._sources.items():
status = {
"name": name,
"type": "meshview" if isinstance(source, MeshviewSource) else "meshmonitor",
"enabled": True,
"is_loaded": source.is_loaded,
"last_refresh": source.last_refresh,
"last_error": source.last_error,
"node_count": len(source.nodes),
}
if isinstance(source, MeshviewSource):
status["edge_count"] = len(source.edges)
elif isinstance(source, MeshMonitorDataSource):
status["telemetry_count"] = len(source.telemetry)
status["traceroute_count"] = len(source.traceroutes)
status["channel_count"] = len(source.channels)
status_list.append(status)
return status_list
@property
def source_count(self) -> int:
"""Get number of active sources."""
return len(self._sources)
@property
def source_names(self) -> list[str]:
"""Get list of source names."""
return list(self._sources.keys())

View file

@ -1,340 +1,342 @@
"""Message routing logic for MeshAI."""
import asyncio
import logging
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
from .backends.base import LLMBackend
from .commands import CommandContext, CommandDispatcher
from .config import Config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .chunker import chunk_response, ContinuationState
logger = logging.getLogger(__name__)
class RouteType(Enum):
"""Type of message routing."""
IGNORE = auto() # Don't respond
COMMAND = auto() # Bang command
LLM = auto() # Route to LLM
@dataclass
class RouteResult:
"""Result of routing decision."""
route_type: RouteType
response: Optional[str] = None # For commands, the response
query: Optional[str] = None # For LLM, the cleaned query
# advBBS protocol and notification prefixes to ignore
ADVBBS_PREFIXES = (
"MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|",
"BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|",
"advBBS|",
"[MAIL]",
)
# Patterns that suggest prompt injection attempts
_INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE),
re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"you\s+are\s+now\b", re.IGNORECASE),
re.compile(r"new\s+instructions?\s*:", re.IGNORECASE),
re.compile(r"system\s*prompt\s*:", re.IGNORECASE),
]
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
def __init__(
self,
config: Config,
connector: MeshConnector,
history: ConversationHistory,
dispatcher: CommandDispatcher,
llm_backend: LLMBackend,
context: MeshContext = None,
meshmonitor_sync=None,
knowledge=None,
):
self.config = config
self.connector = connector
self.history = history
self.dispatcher = dispatcher
self.llm = llm_backend
self.context = context
self.meshmonitor_sync = meshmonitor_sync
self.knowledge = knowledge
self.continuations = ContinuationState(max_continuations=3)
def should_respond(self, message: MeshMessage) -> bool:
"""Determine if we should respond to this message.
DM-only bot: ignores all public channel messages.
Commands and conversational LLM responses both work in DMs.
Args:
message: Incoming message
Returns:
True if we should process this message
"""
# Always ignore our own messages
if message.sender_id == self.connector.my_node_id:
return False
# Only respond to DMs
if not message.is_dm:
return False
if not self.config.bot.respond_to_dms:
return False
# Ignore advBBS protocol and notification messages
if self.config.bot.filter_bbs_protocols:
if any(message.text.startswith(p) for p in ADVBBS_PREFIXES):
logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...")
return False
# Ignore messages that MeshMonitor will handle
if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text):
logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...")
return False
return True
def check_continuation(self, message) -> list[str] | None:
"""Check if this is a continuation request and return messages if so.
Returns:
List of messages to send, or None if not a continuation
"""
user_id = message.sender_id
text = message.text.strip()
logger.info(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}")
if self.continuations.has_pending(user_id):
if self.continuations.is_continuation_request(text):
result = self.continuations.get_continuation(user_id)
if result:
messages, _ = result
return messages
# Max continuations reached, return None to fall through
else:
# User asked something new, clear pending continuation
self.continuations.clear(user_id)
return None
async def route(self, message: MeshMessage) -> RouteResult:
"""Route a message and generate response.
Args:
message: Incoming message to route
Returns:
RouteResult with routing decision and any response
"""
text = message.text.strip()
# Check for bang command first
if self.dispatcher.is_command(text):
context = self._make_command_context(message)
response = await self.dispatcher.dispatch(text, context)
return RouteResult(RouteType.COMMAND, response=response)
# Clean up the message (remove @mention)
query = self._clean_query(text)
if not query:
return RouteResult(RouteType.IGNORE)
# Route to LLM
return RouteResult(RouteType.LLM, query=query)
async def generate_llm_response(self, message: MeshMessage, query: str) -> str:
"""Generate LLM response for a message.
Args:
message: Original message
query: Cleaned query text
Returns:
Generated response
"""
# Add user message to history
await self.history.add_message(message.sender_id, "user", query)
# Get conversation history
history = await self.history.get_history_for_llm(message.sender_id)
# Build system prompt in order: identity -> static -> meshmonitor -> context
# 1. Dynamic identity from bot config
bot_name = self.config.bot.name or "MeshAI"
bot_owner = self.config.bot.owner or "Unknown"
identity = (
f"You are {bot_name}, an LLM-powered conversational assistant running on a "
f"Meshtastic mesh network. Your managing operator is {bot_owner}. "
f"You are open source at github.com/zvx-echo6/meshai.\n\n"
f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect "
f"to a Meshtastic node via TCP through meshtasticd.\n\n"
)
# 2. Static system prompt from config
static_prompt = ""
if getattr(self.config.llm, 'use_system_prompt', True):
static_prompt = self.config.llm.system_prompt
system_prompt = identity + static_prompt
# 3. MeshMonitor info (only when enabled)
if (
self.meshmonitor_sync
and self.config.meshmonitor.enabled
and self.config.meshmonitor.inject_into_prompt
):
meshmonitor_intro = (
"\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same "
"meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, "
"traceroutes, security scanning, and auto-responder commands. Its trigger "
"commands are listed below — if someone asks what commands are available, "
"mention both yours and MeshMonitor's. If someone asks where to get "
"MeshMonitor, direct them to github.com/Yeraze/meshmonitor"
)
system_prompt += meshmonitor_intro
commands_summary = self.meshmonitor_sync.get_commands_summary()
if commands_summary:
system_prompt += "\n\n" + commands_summary
# 4. Inject mesh context if available
if self.context:
max_items = getattr(self.config.context, 'max_context_items', 20)
context_block = self.context.get_context_block(max_items=max_items)
if context_block:
system_prompt += (
"\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n"
+ context_block
)
else:
system_prompt += (
"\n\n[No recent mesh traffic observed yet.]"
)
# 5. Knowledge base retrieval
if self.knowledge and query:
results = self.knowledge.search(query)
if results:
chunks = "\n\n".join(
f"[{r['title']}]: {r['excerpt']}" for r in results
)
system_prompt += (
"\n\nREFERENCE KNOWLEDGE - Answer using this information:\n"
+ chunks
)
# DEBUG: Log system prompt status
logger.warning(f"SYSTEM PROMPT LENGTH: {len(system_prompt)} chars")
logger.warning(f"HAS REFERENCE KNOWLEDGE: {'REFERENCE KNOWLEDGE' in system_prompt}")
try:
response = await self.llm.generate(
messages=history,
system_prompt=system_prompt,
max_tokens=500,
)
except asyncio.TimeoutError:
logger.error("LLM request timed out")
response = "Sorry, request timed out. Try again."
except Exception as e:
logger.error(f"LLM generation error: {e}")
response = "Sorry, I encountered an error. Please try again."
# Add assistant response to history
await self.history.add_message(message.sender_id, "assistant", response)
# Persist summary if one was created/updated
await self._persist_summary(message.sender_id)
# Chunk the response with sentence awareness
messages, remaining = chunk_response(
response,
max_chars=self.config.response.max_length,
max_messages=self.config.response.max_messages,
)
# Store remaining content for continuation
if remaining:
logger.info(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining")
self.continuations.store(message.sender_id, remaining)
else:
logger.info(f"No remaining content for {message.sender_id}")
return messages
async def _persist_summary(self, user_id: str) -> None:
"""Persist any cached summary to the database.
Args:
user_id: User identifier
"""
memory = self.llm.get_memory()
if not memory:
return
summary = memory.get_cached_summary(user_id)
if summary:
await self.history.store_summary(
user_id,
summary.summary,
summary.message_count,
)
logger.debug(f"Persisted summary for {user_id}")
def _clean_query(self, text: str) -> str:
"""Clean up query text and check for prompt injection."""
cleaned = " ".join(text.split())
cleaned = cleaned.strip()
# Check for prompt injection
for pattern in _INJECTION_PATTERNS:
if pattern.search(cleaned):
logger.warning(
f"Possible prompt injection detected: {cleaned[:80]}..."
)
match = pattern.search(cleaned)
cleaned = cleaned[:match.start()].strip()
if not cleaned:
cleaned = "Hello"
break
return cleaned
def _make_command_context(self, message: MeshMessage) -> CommandContext:
"""Create command context from message."""
return CommandContext(
sender_id=message.sender_id,
sender_name=message.sender_name,
channel=message.channel,
is_dm=message.is_dm,
position=message.sender_position,
config=self.config,
connector=self.connector,
history=self.history,
)
"""Message routing logic for MeshAI."""
import asyncio
import logging
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
from .backends.base import LLMBackend
from .commands import CommandContext, CommandDispatcher
from .config import Config
from .connector import MeshConnector, MeshMessage
from .context import MeshContext
from .history import ConversationHistory
from .chunker import chunk_response, ContinuationState
logger = logging.getLogger(__name__)
class RouteType(Enum):
"""Type of message routing."""
IGNORE = auto() # Don't respond
COMMAND = auto() # Bang command
LLM = auto() # Route to LLM
@dataclass
class RouteResult:
"""Result of routing decision."""
route_type: RouteType
response: Optional[str] = None # For commands, the response
query: Optional[str] = None # For LLM, the cleaned query
# advBBS protocol and notification prefixes to ignore
ADVBBS_PREFIXES = (
"MAILREQ|", "MAILACK|", "MAILNAK|", "MAILDAT|", "MAILDLV|",
"BOARDREQ|", "BOARDACK|", "BOARDNAK|", "BOARDDAT|", "BOARDDLV|",
"advBBS|",
"[MAIL]",
)
# Patterns that suggest prompt injection attempts
_INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE),
re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"you\s+are\s+now\b", re.IGNORECASE),
re.compile(r"new\s+instructions?\s*:", re.IGNORECASE),
re.compile(r"system\s*prompt\s*:", re.IGNORECASE),
]
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
def __init__(
self,
config: Config,
connector: MeshConnector,
history: ConversationHistory,
dispatcher: CommandDispatcher,
llm_backend: LLMBackend,
context: MeshContext = None,
meshmonitor_sync=None,
knowledge=None,
source_manager=None,
):
self.config = config
self.connector = connector
self.history = history
self.dispatcher = dispatcher
self.llm = llm_backend
self.context = context
self.meshmonitor_sync = meshmonitor_sync
self.knowledge = knowledge
self.source_manager = source_manager # For future use in Phase 3
self.continuations = ContinuationState(max_continuations=3)
def should_respond(self, message: MeshMessage) -> bool:
"""Determine if we should respond to this message.
DM-only bot: ignores all public channel messages.
Commands and conversational LLM responses both work in DMs.
Args:
message: Incoming message
Returns:
True if we should process this message
"""
# Always ignore our own messages
if message.sender_id == self.connector.my_node_id:
return False
# Only respond to DMs
if not message.is_dm:
return False
if not self.config.bot.respond_to_dms:
return False
# Ignore advBBS protocol and notification messages
if self.config.bot.filter_bbs_protocols:
if any(message.text.startswith(p) for p in ADVBBS_PREFIXES):
logger.debug(f"Ignoring advBBS message from {message.sender_id}: {message.text[:40]}...")
return False
# Ignore messages that MeshMonitor will handle
if self.meshmonitor_sync and self.meshmonitor_sync.matches(message.text):
logger.debug(f"Ignoring MeshMonitor-handled message: {message.text[:40]}...")
return False
return True
def check_continuation(self, message) -> list[str] | None:
"""Check if this is a continuation request and return messages if so.
Returns:
List of messages to send, or None if not a continuation
"""
user_id = message.sender_id
text = message.text.strip()
logger.info(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}")
if self.continuations.has_pending(user_id):
if self.continuations.is_continuation_request(text):
result = self.continuations.get_continuation(user_id)
if result:
messages, _ = result
return messages
# Max continuations reached, return None to fall through
else:
# User asked something new, clear pending continuation
self.continuations.clear(user_id)
return None
async def route(self, message: MeshMessage) -> RouteResult:
"""Route a message and generate response.
Args:
message: Incoming message to route
Returns:
RouteResult with routing decision and any response
"""
text = message.text.strip()
# Check for bang command first
if self.dispatcher.is_command(text):
context = self._make_command_context(message)
response = await self.dispatcher.dispatch(text, context)
return RouteResult(RouteType.COMMAND, response=response)
# Clean up the message (remove @mention)
query = self._clean_query(text)
if not query:
return RouteResult(RouteType.IGNORE)
# Route to LLM
return RouteResult(RouteType.LLM, query=query)
async def generate_llm_response(self, message: MeshMessage, query: str) -> str:
"""Generate LLM response for a message.
Args:
message: Original message
query: Cleaned query text
Returns:
Generated response
"""
# Add user message to history
await self.history.add_message(message.sender_id, "user", query)
# Get conversation history
history = await self.history.get_history_for_llm(message.sender_id)
# Build system prompt in order: identity -> static -> meshmonitor -> context
# 1. Dynamic identity from bot config
bot_name = self.config.bot.name or "MeshAI"
bot_owner = self.config.bot.owner or "Unknown"
identity = (
f"You are {bot_name}, an LLM-powered conversational assistant running on a "
f"Meshtastic mesh network. Your managing operator is {bot_owner}. "
f"You are open source at github.com/zvx-echo6/meshai.\n\n"
f"IDENTITY: Your name is {bot_name}. You respond to DMs only. You connect "
f"to a Meshtastic node via TCP through meshtasticd.\n\n"
)
# 2. Static system prompt from config
static_prompt = ""
if getattr(self.config.llm, 'use_system_prompt', True):
static_prompt = self.config.llm.system_prompt
system_prompt = identity + static_prompt
# 3. MeshMonitor info (only when enabled)
if (
self.meshmonitor_sync
and self.config.meshmonitor.enabled
and self.config.meshmonitor.inject_into_prompt
):
meshmonitor_intro = (
"\n\nMESHMONITOR: You run alongside MeshMonitor (by Yeraze) on the same "
"meshtasticd node. MeshMonitor handles web dashboard, maps, telemetry, "
"traceroutes, security scanning, and auto-responder commands. Its trigger "
"commands are listed below — if someone asks what commands are available, "
"mention both yours and MeshMonitor's. If someone asks where to get "
"MeshMonitor, direct them to github.com/Yeraze/meshmonitor"
)
system_prompt += meshmonitor_intro
commands_summary = self.meshmonitor_sync.get_commands_summary()
if commands_summary:
system_prompt += "\n\n" + commands_summary
# 4. Inject mesh context if available
if self.context:
max_items = getattr(self.config.context, 'max_context_items', 20)
context_block = self.context.get_context_block(max_items=max_items)
if context_block:
system_prompt += (
"\n\n--- Recent mesh traffic (for context only, not messages to you) ---\n"
+ context_block
)
else:
system_prompt += (
"\n\n[No recent mesh traffic observed yet.]"
)
# 5. Knowledge base retrieval
if self.knowledge and query:
results = self.knowledge.search(query)
if results:
chunks = "\n\n".join(
f"[{r['title']}]: {r['excerpt']}" for r in results
)
system_prompt += (
"\n\nREFERENCE KNOWLEDGE - Answer using this information:\n"
+ chunks
)
# DEBUG: Log system prompt status
logger.warning(f"SYSTEM PROMPT LENGTH: {len(system_prompt)} chars")
logger.warning(f"HAS REFERENCE KNOWLEDGE: {'REFERENCE KNOWLEDGE' in system_prompt}")
try:
response = await self.llm.generate(
messages=history,
system_prompt=system_prompt,
max_tokens=500,
)
except asyncio.TimeoutError:
logger.error("LLM request timed out")
response = "Sorry, request timed out. Try again."
except Exception as e:
logger.error(f"LLM generation error: {e}")
response = "Sorry, I encountered an error. Please try again."
# Add assistant response to history
await self.history.add_message(message.sender_id, "assistant", response)
# Persist summary if one was created/updated
await self._persist_summary(message.sender_id)
# Chunk the response with sentence awareness
messages, remaining = chunk_response(
response,
max_chars=self.config.response.max_length,
max_messages=self.config.response.max_messages,
)
# Store remaining content for continuation
if remaining:
logger.info(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining")
self.continuations.store(message.sender_id, remaining)
else:
logger.info(f"No remaining content for {message.sender_id}")
return messages
async def _persist_summary(self, user_id: str) -> None:
"""Persist any cached summary to the database.
Args:
user_id: User identifier
"""
memory = self.llm.get_memory()
if not memory:
return
summary = memory.get_cached_summary(user_id)
if summary:
await self.history.store_summary(
user_id,
summary.summary,
summary.message_count,
)
logger.debug(f"Persisted summary for {user_id}")
def _clean_query(self, text: str) -> str:
"""Clean up query text and check for prompt injection."""
cleaned = " ".join(text.split())
cleaned = cleaned.strip()
# Check for prompt injection
for pattern in _INJECTION_PATTERNS:
if pattern.search(cleaned):
logger.warning(
f"Possible prompt injection detected: {cleaned[:80]}..."
)
match = pattern.search(cleaned)
cleaned = cleaned[:match.start()].strip()
if not cleaned:
cleaned = "Hello"
break
return cleaned
def _make_command_context(self, message: MeshMessage) -> CommandContext:
"""Create command context from message."""
return CommandContext(
sender_id=message.sender_id,
sender_name=message.sender_name,
channel=message.channel,
is_dm=message.is_dm,
position=message.sender_position,
config=self.config,
connector=self.connector,
history=self.history,
)

View file

@ -0,0 +1 @@
"""Mesh data source connectors."""

View file

@ -0,0 +1,257 @@
"""MeshMonitor API data source."""
import json
import logging
import os
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
logger = logging.getLogger(__name__)
class MeshMonitorDataSource:
"""Fetches mesh data from a MeshMonitor instance."""
def __init__(self, url: str, api_token: str, refresh_interval: int = 300):
"""Initialize MeshMonitor data source.
Args:
url: Base URL of MeshMonitor instance (e.g., http://192.168.1.100:3333)
api_token: API token for authentication. Supports ${ENV_VAR} format.
refresh_interval: Seconds between refresh checks (default 5 minutes)
"""
self._url = url.rstrip("/")
self._api_token = self._resolve_token(api_token)
self._refresh_interval = refresh_interval
# Cached data
self._nodes: list[dict] = []
self._channels: list[dict] = []
self._telemetry: list[dict] = []
self._traceroutes: list[dict] = []
self._network_stats: Optional[dict] = None
self._topology: Optional[dict] = None
self._packets: list[dict] = []
self._solar: list[dict] = []
self._last_refresh: float = 0.0
self._last_error: Optional[str] = None
self._is_loaded: bool = False
def _resolve_token(self, token: str) -> str:
"""Resolve token, supporting ${ENV_VAR} format.
Args:
token: API token or env var reference
Returns:
Resolved token value
"""
if token.startswith("${") and token.endswith("}"):
env_var = token[2:-1]
return os.environ.get(env_var, "")
return token
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def channels(self) -> list[dict]:
"""Get cached channels list."""
return self._channels
@property
def telemetry(self) -> list[dict]:
"""Get cached telemetry list."""
return self._telemetry
@property
def traceroutes(self) -> list[dict]:
"""Get cached traceroutes list."""
return self._traceroutes
@property
def network_stats(self) -> Optional[dict]:
"""Get cached network stats."""
return self._network_stats
@property
def topology(self) -> Optional[dict]:
"""Get cached topology."""
return self._topology
@property
def packets(self) -> list[dict]:
"""Get cached packets list."""
return self._packets
@property
def solar(self) -> list[dict]:
"""Get cached solar estimates list."""
return self._solar
@property
def last_refresh(self) -> float:
"""Get last refresh timestamp (epoch)."""
return self._last_refresh
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
def _fetch_json(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON from an endpoint with Bearer auth.
Args:
endpoint: API endpoint path (e.g., /api/v1/nodes)
Returns:
Parsed JSON data or None on error
"""
url = f"{self._url}{endpoint}"
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {self._api_token}",
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
# MeshMonitor wraps responses in {"success": true, "data": [...]}
# Extract the actual data if wrapped
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
except HTTPError as e:
logger.warning(f"MeshMonitor {endpoint}: HTTP {e.code} {e.reason}")
return None
except URLError as e:
logger.warning(f"MeshMonitor {endpoint}: Connection error - {e.reason}")
return None
except json.JSONDecodeError as e:
logger.warning(f"MeshMonitor {endpoint}: Invalid JSON - {e}")
return None
except Exception as e:
logger.warning(f"MeshMonitor {endpoint}: {e}")
return None
def fetch_all(self) -> bool:
"""Fetch all data from MeshMonitor API.
Fetches all endpoints independently. One failure doesn't block others.
Returns:
True if at least one endpoint succeeded
"""
success_count = 0
errors = []
# Fetch nodes
data = self._fetch_json("/api/v1/nodes")
if data is not None:
self._nodes = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._nodes)} nodes")
else:
errors.append("nodes")
# Fetch channels
data = self._fetch_json("/api/v1/channels")
if data is not None:
self._channels = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._channels)} channels")
else:
errors.append("channels")
# Fetch telemetry
data = self._fetch_json("/api/v1/telemetry")
if data is not None:
self._telemetry = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._telemetry)} telemetry records")
else:
errors.append("telemetry")
# Fetch traceroutes
data = self._fetch_json("/api/v1/traceroutes")
if data is not None:
self._traceroutes = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._traceroutes)} traceroutes")
else:
errors.append("traceroutes")
# Fetch network stats
data = self._fetch_json("/api/v1/network")
if data is not None:
self._network_stats = data if isinstance(data, dict) else None
success_count += 1
logger.debug("MeshMonitor: fetched network stats")
else:
errors.append("network")
# Fetch topology
data = self._fetch_json("/api/v1/network/topology")
if data is not None:
self._topology = data if isinstance(data, dict) else None
success_count += 1
logger.debug("MeshMonitor: fetched topology")
else:
errors.append("topology")
# Fetch packets
data = self._fetch_json("/api/v1/packets")
if data is not None:
self._packets = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._packets)} packets")
else:
errors.append("packets")
# Fetch solar estimates
data = self._fetch_json("/api/v1/solar")
if data is not None:
self._solar = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"MeshMonitor: fetched {len(self._solar)} solar estimates")
else:
errors.append("solar")
# Update state
self._last_refresh = time.time()
if success_count > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"MeshMonitor refresh: {len(self._nodes)} nodes, "
f"{len(self._telemetry)} telemetry, {len(self._traceroutes)} traceroutes"
)
return True
else:
self._last_error = f"All endpoints failed: {', '.join(errors)}"
logger.error(f"MeshMonitor: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Refresh data if interval has elapsed.
Returns:
True if refresh was performed
"""
if time.time() - self._last_refresh >= self._refresh_interval:
return self.fetch_all()
return False

166
meshai/sources/meshview.py Normal file
View file

@ -0,0 +1,166 @@
"""Meshview API data source."""
import json
import logging
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
logger = logging.getLogger(__name__)
class MeshviewSource:
"""Fetches mesh data from a Meshview instance."""
def __init__(self, url: str, refresh_interval: int = 300):
"""Initialize Meshview source.
Args:
url: Base URL of Meshview instance (e.g., https://meshview.example.com)
refresh_interval: Seconds between refresh checks (default 5 minutes)
"""
self._url = url.rstrip("/")
self._refresh_interval = refresh_interval
self._nodes: list[dict] = []
self._edges: list[dict] = []
self._stats: Optional[dict | list] = None
self._counts: Optional[dict] = None
self._last_refresh: float = 0.0
self._last_error: Optional[str] = None
self._is_loaded: bool = False
@property
def nodes(self) -> list[dict]:
"""Get cached nodes list."""
return self._nodes
@property
def edges(self) -> list[dict]:
"""Get cached edges list."""
return self._edges
@property
def stats(self) -> Optional[dict | list]:
"""Get cached stats."""
return self._stats
@property
def counts(self) -> Optional[dict]:
"""Get cached counts."""
return self._counts
@property
def last_refresh(self) -> float:
"""Get last refresh timestamp (epoch)."""
return self._last_refresh
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
@property
def is_loaded(self) -> bool:
"""Check if data has been successfully loaded."""
return self._is_loaded
def _fetch_json(self, endpoint: str) -> Optional[dict | list]:
"""Fetch JSON from an endpoint.
Args:
endpoint: API endpoint path (e.g., /api/nodes)
Returns:
Parsed JSON data or None on error
"""
url = f"{self._url}{endpoint}"
try:
req = Request(url, headers={"Accept": "application/json"})
with urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except HTTPError as e:
logger.warning(f"Meshview {endpoint}: HTTP {e.code} {e.reason}")
return None
except URLError as e:
logger.warning(f"Meshview {endpoint}: Connection error - {e.reason}")
return None
except json.JSONDecodeError as e:
logger.warning(f"Meshview {endpoint}: Invalid JSON - {e}")
return None
except Exception as e:
logger.warning(f"Meshview {endpoint}: {e}")
return None
def fetch_all(self) -> bool:
"""Fetch all data from Meshview API.
Fetches nodes, edges, stats, and counts independently.
One failure doesn't block others.
Returns:
True if at least one endpoint succeeded
"""
success_count = 0
errors = []
# Fetch nodes
data = self._fetch_json("/api/nodes")
if data is not None:
self._nodes = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"Meshview: fetched {len(self._nodes)} nodes")
else:
errors.append("nodes")
# Fetch edges
data = self._fetch_json("/api/edges")
if data is not None:
self._edges = data if isinstance(data, list) else []
success_count += 1
logger.debug(f"Meshview: fetched {len(self._edges)} edges")
else:
errors.append("edges")
# Fetch stats (24h hourly)
data = self._fetch_json("/api/stats?period_type=hour&length=24")
if data is not None:
self._stats = data
success_count += 1
logger.debug("Meshview: fetched stats")
else:
errors.append("stats")
# Fetch counts
data = self._fetch_json("/api/stats/count")
if data is not None:
self._counts = data if isinstance(data, dict) else None
success_count += 1
logger.debug("Meshview: fetched counts")
else:
errors.append("counts")
# Update state
self._last_refresh = time.time()
if success_count > 0:
self._is_loaded = True
self._last_error = None
logger.info(
f"Meshview refresh: {len(self._nodes)} nodes, {len(self._edges)} edges"
)
return True
else:
self._last_error = f"All endpoints failed: {', '.join(errors)}"
logger.error(f"Meshview: {self._last_error}")
return False
def maybe_refresh(self) -> bool:
"""Refresh data if interval has elapsed.
Returns:
True if refresh was performed
"""
if time.time() - self._last_refresh >= self._refresh_interval:
return self.fetch_all()
return False