Wire up all unused modules into main application lifecycle

- 2a: SafetyFilter + UserFilter — check user access before processing,
  filter LLM responses through SafetyFilter before sending
- 2b: RateLimiter — check rate limits before processing, record
  messages after successful response delivery
- 2c: PersonalityManager — pass to MessageRouter, used for system
  prompt generation instead of raw config.llm.system_prompt
- 2d: WebhookClient — start/stop in lifecycle, fire events on
  message_received, response_sent, error, startup, shutdown
- 2e: WebStatusServer — start/stop in lifecycle, record messages,
  responses, and errors in StatusData
- 2f: AnnouncementScheduler — start/stop in lifecycle, uses
  connector.send_message as callback
- 2g: FallbackBackend — wrap primary backend when config.llm.fallback
  is configured, otherwise use primary directly
- 2h: CommandDispatcher — pass prefix, disabled_commands, and
  custom_commands from config to create_dispatcher()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-02-23 20:16:25 +00:00
commit a71f92a77a
2 changed files with 143 additions and 14 deletions

View file

@ -11,7 +11,8 @@ from pathlib import Path
from typing import Optional
from . import __version__
from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .announcements import AnnouncementScheduler
from .backends import AnthropicBackend, FallbackBackend, GoogleBackend, LLMBackend, OpenAIBackend
from .cli import run_configurator
from .commands import CommandDispatcher
from .commands.dispatcher import create_dispatcher
@ -20,8 +21,13 @@ from .config import Config, load_config
from .connector import MeshConnector, MeshMessage
from .history import ConversationHistory
from .memory import ConversationSummary
from .personality import PersonalityManager
from .rate_limiter import RateLimiter
from .responder import Responder
from .router import MessageRouter, RouteType
from .safety import SafetyFilter, UserFilter
from .web_status import WebStatusServer, get_status_data
from .webhook import WebhookClient
logger = logging.getLogger(__name__)
@ -37,6 +43,13 @@ class MeshAI:
self.llm: Optional[LLMBackend] = None
self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None
self.personality: Optional[PersonalityManager] = None
self.safety_filter: Optional[SafetyFilter] = None
self.user_filter: Optional[UserFilter] = None
self.rate_limiter: Optional[RateLimiter] = None
self.webhook: Optional[WebhookClient] = None
self.web_status: Optional[WebStatusServer] = None
self.announcements: Optional[AnnouncementScheduler] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._last_cleanup: float = 0.0
@ -57,6 +70,14 @@ class MeshAI:
self._loop = asyncio.get_event_loop()
self._last_cleanup = time.time()
# Start async services
await self.webhook.start()
await self.webhook.on_startup()
await self.announcements.start()
# Start sync services
self.web_status.start()
# Write PID file
self._write_pid()
@ -76,6 +97,16 @@ class MeshAI:
logger.info("Stopping MeshAI...")
self._running = False
if self.webhook:
await self.webhook.on_shutdown()
await self.webhook.stop()
if self.announcements:
await self.announcements.stop()
if self.web_status:
self.web_status.stop()
if self.connector:
self.connector.disconnect()
@ -94,8 +125,30 @@ class MeshAI:
self.history = ConversationHistory(self.config.history)
await self.history.initialize()
# Command dispatcher
self.dispatcher = create_dispatcher()
# Command dispatcher (2h: pass config)
self.dispatcher = create_dispatcher(
prefix=self.config.commands.prefix,
disabled_commands=self.config.commands.disabled_commands,
custom_commands=self.config.commands.custom_commands,
)
# Safety and user filters (2a)
self.user_filter = UserFilter(
blocklist=self.config.users.blocklist,
allowlist=self.config.users.allowlist,
allowlist_only=self.config.users.allowlist_only,
admin_nodes=self.config.users.admin_nodes,
)
self.safety_filter = SafetyFilter(self.config.safety)
# Rate limiter (2b)
self.rate_limiter = RateLimiter(
self.config.rate_limits,
vip_nodes=self.config.users.vip_nodes,
)
# Personality manager (2c)
self.personality = PersonalityManager(self.config.personality)
# LLM backend
api_key = self.config.resolve_api_key()
@ -107,51 +160,100 @@ class MeshAI:
window_size = mem_cfg.window_size if mem_cfg.enabled else 0
summarize_threshold = mem_cfg.summarize_threshold
# Create primary backend
backend = self.config.llm.backend.lower()
if backend == "openai":
self.llm = OpenAIBackend(
primary = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "anthropic":
self.llm = AnthropicBackend(
primary = AnthropicBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
elif backend == "google":
self.llm = GoogleBackend(
primary = GoogleBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI")
self.llm = OpenAIBackend(
primary = OpenAIBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
# Wrap in FallbackBackend if fallback is configured (2g)
if self.config.llm.fallback:
self.llm = FallbackBackend(
self.config.llm, api_key, window_size, summarize_threshold
)
else:
self.llm = primary
# Load persisted summaries into memory cache
await self._load_summaries()
# Meshtastic connector
self.connector = MeshConnector(self.config.connection)
# Message router
# Message router (pass personality manager)
self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm
self.config, self.connector, self.history, self.dispatcher, self.llm,
personality=self.personality,
)
# Responder
self.responder = Responder(self.config.response, self.connector)
# Webhook client (2d)
self.webhook = WebhookClient(self.config.integrations.webhook)
# Web status server (2e)
self.web_status = WebStatusServer(self.config.web_status)
# Announcement scheduler (2f)
async def _send_announcement(text: str, channel: int) -> None:
self.connector.send_message(text=text, channel=channel)
self.announcements = AnnouncementScheduler(
self.config.announcements,
send_callback=_send_announcement,
)
async def _on_message(self, message: MeshMessage) -> None:
"""Handle incoming message."""
try:
# Check user filter (2a)
allowed, reason = self.user_filter.is_allowed(message.sender_id)
if not allowed:
logger.debug(f"Blocked message from {message.sender_id}: {reason}")
return
# Check if we should respond
if not self.router.should_respond(message):
return
# Check rate limiter (2b)
allowed, reason = self.rate_limiter.is_allowed(message.sender_id)
if not allowed:
logger.debug(f"Rate limited {message.sender_id}: {reason}")
return
logger.info(
f"Processing message from {message.sender_name} ({message.sender_id}): "
f"{message.text[:50]}..."
)
# Record in web status (2e)
get_status_data().record_message(message.sender_id, message.sender_name)
# Send webhook event (2d)
await self.webhook.on_message_received(
sender_id=message.sender_id,
sender_name=message.sender_name,
text=message.text,
channel=message.channel,
is_dm=message.is_dm,
)
# Route the message
result = await self.router.route(message)
@ -169,16 +271,18 @@ class MeshAI:
if not response:
return
# Apply safety filter to LLM responses (2a)
if result.route_type == RouteType.LLM:
response = self.safety_filter.filter_response(response)
# Send response
if message.is_dm:
# Reply as DM
await self.responder.send_response(
text=response,
destination=message.sender_id,
channel=message.channel,
)
else:
# Reply on channel
formatted = self.responder.format_channel_response(
response, message.sender_name, mention_sender=True
)
@ -188,8 +292,21 @@ class MeshAI:
channel=message.channel,
)
# Record response in rate limiter and status (2b, 2e)
self.rate_limiter.record_message(message.sender_id)
get_status_data().record_response()
# Send webhook event (2d)
await self.webhook.on_response_sent(
recipient_id=message.sender_id if message.is_dm else None,
text=response,
channel=message.channel,
)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
get_status_data().record_error(str(e))
await self.webhook.on_error(str(e))
async def _load_summaries(self) -> None:
"""Load persisted summaries from database into memory cache."""

View file

@ -11,6 +11,7 @@ from .commands import CommandContext, CommandDispatcher
from .config import Config
from .connector import MeshConnector, MeshMessage
from .history import ConversationHistory
from .personality import PersonalityManager
logger = logging.getLogger(__name__)
@ -42,12 +43,14 @@ class MessageRouter:
history: ConversationHistory,
dispatcher: CommandDispatcher,
llm_backend: LLMBackend,
personality: Optional[PersonalityManager] = None,
):
self.config = config
self.connector = connector
self.history = history
self.dispatcher = dispatcher
self.llm = llm_backend
self.personality = personality
# Compile mention pattern
bot_name = re.escape(config.bot.name)
@ -68,6 +71,10 @@ class MessageRouter:
# Check if DM
if message.is_dm:
# In DMs, let commands through to dispatcher but skip !commands
# that should be handled by other bots (like MeshMonitor)
if self.dispatcher.is_command(message.text):
return True
return self.config.bot.respond_to_dms
# Check channel filtering
@ -129,10 +136,15 @@ class MessageRouter:
# Get conversation history
history = await self.history.get_history_for_llm(message.sender_id)
# Generate response with user_id for memory optimization
# Use system prompt only if enabled in config
# Get system prompt from personality manager or config
system_prompt = ""
if getattr(self.config.llm, 'use_system_prompt', True):
if self.personality:
system_prompt = self.personality.get_system_prompt(
sender_name=message.sender_name,
channel=message.channel,
)
else:
system_prompt = self.config.llm.system_prompt
try: