From a71f92a77aa738e44389510a948ee5281392ff4b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 23 Feb 2026 20:16:25 +0000 Subject: [PATCH] Wire up all unused modules into main application lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 2a: SafetyFilter + UserFilter — check user access before processing, filter LLM responses through SafetyFilter before sending - 2b: RateLimiter — check rate limits before processing, record messages after successful response delivery - 2c: PersonalityManager — pass to MessageRouter, used for system prompt generation instead of raw config.llm.system_prompt - 2d: WebhookClient — start/stop in lifecycle, fire events on message_received, response_sent, error, startup, shutdown - 2e: WebStatusServer — start/stop in lifecycle, record messages, responses, and errors in StatusData - 2f: AnnouncementScheduler — start/stop in lifecycle, uses connector.send_message as callback - 2g: FallbackBackend — wrap primary backend when config.llm.fallback is configured, otherwise use primary directly - 2h: CommandDispatcher — pass prefix, disabled_commands, and custom_commands from config to create_dispatcher() Co-Authored-By: Claude Opus 4.6 --- meshai/main.py | 139 +++++++++++++++++++++++++++++++++++++++++++---- meshai/router.py | 18 +++++- 2 files changed, 143 insertions(+), 14 deletions(-) diff --git a/meshai/main.py b/meshai/main.py index 906f0e4..4c60e57 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -11,7 +11,8 @@ from pathlib import Path from typing import Optional from . import __version__ -from .backends import AnthropicBackend, GoogleBackend, LLMBackend, OpenAIBackend +from .announcements import AnnouncementScheduler +from .backends import AnthropicBackend, FallbackBackend, GoogleBackend, LLMBackend, OpenAIBackend from .cli import run_configurator from .commands import CommandDispatcher from .commands.dispatcher import create_dispatcher @@ -20,8 +21,13 @@ from .config import Config, load_config from .connector import MeshConnector, MeshMessage from .history import ConversationHistory from .memory import ConversationSummary +from .personality import PersonalityManager +from .rate_limiter import RateLimiter from .responder import Responder from .router import MessageRouter, RouteType +from .safety import SafetyFilter, UserFilter +from .web_status import WebStatusServer, get_status_data +from .webhook import WebhookClient logger = logging.getLogger(__name__) @@ -37,6 +43,13 @@ class MeshAI: self.llm: Optional[LLMBackend] = None self.router: Optional[MessageRouter] = None self.responder: Optional[Responder] = None + self.personality: Optional[PersonalityManager] = None + self.safety_filter: Optional[SafetyFilter] = None + self.user_filter: Optional[UserFilter] = None + self.rate_limiter: Optional[RateLimiter] = None + self.webhook: Optional[WebhookClient] = None + self.web_status: Optional[WebStatusServer] = None + self.announcements: Optional[AnnouncementScheduler] = None self._running = False self._loop: Optional[asyncio.AbstractEventLoop] = None self._last_cleanup: float = 0.0 @@ -57,6 +70,14 @@ class MeshAI: self._loop = asyncio.get_event_loop() self._last_cleanup = time.time() + # Start async services + await self.webhook.start() + await self.webhook.on_startup() + await self.announcements.start() + + # Start sync services + self.web_status.start() + # Write PID file self._write_pid() @@ -76,6 +97,16 @@ class MeshAI: logger.info("Stopping MeshAI...") self._running = False + if self.webhook: + await self.webhook.on_shutdown() + await self.webhook.stop() + + if self.announcements: + await self.announcements.stop() + + if self.web_status: + self.web_status.stop() + if self.connector: self.connector.disconnect() @@ -94,8 +125,30 @@ class MeshAI: self.history = ConversationHistory(self.config.history) await self.history.initialize() - # Command dispatcher - self.dispatcher = create_dispatcher() + # Command dispatcher (2h: pass config) + self.dispatcher = create_dispatcher( + prefix=self.config.commands.prefix, + disabled_commands=self.config.commands.disabled_commands, + custom_commands=self.config.commands.custom_commands, + ) + + # Safety and user filters (2a) + self.user_filter = UserFilter( + blocklist=self.config.users.blocklist, + allowlist=self.config.users.allowlist, + allowlist_only=self.config.users.allowlist_only, + admin_nodes=self.config.users.admin_nodes, + ) + self.safety_filter = SafetyFilter(self.config.safety) + + # Rate limiter (2b) + self.rate_limiter = RateLimiter( + self.config.rate_limits, + vip_nodes=self.config.users.vip_nodes, + ) + + # Personality manager (2c) + self.personality = PersonalityManager(self.config.personality) # LLM backend api_key = self.config.resolve_api_key() @@ -107,51 +160,100 @@ class MeshAI: window_size = mem_cfg.window_size if mem_cfg.enabled else 0 summarize_threshold = mem_cfg.summarize_threshold + # Create primary backend backend = self.config.llm.backend.lower() if backend == "openai": - self.llm = OpenAIBackend( + primary = OpenAIBackend( self.config.llm, api_key, window_size, summarize_threshold ) elif backend == "anthropic": - self.llm = AnthropicBackend( + primary = AnthropicBackend( self.config.llm, api_key, window_size, summarize_threshold ) elif backend == "google": - self.llm = GoogleBackend( + primary = GoogleBackend( self.config.llm, api_key, window_size, summarize_threshold ) else: logger.warning(f"Unknown backend '{backend}', defaulting to OpenAI") - self.llm = OpenAIBackend( + primary = OpenAIBackend( self.config.llm, api_key, window_size, summarize_threshold ) + # Wrap in FallbackBackend if fallback is configured (2g) + if self.config.llm.fallback: + self.llm = FallbackBackend( + self.config.llm, api_key, window_size, summarize_threshold + ) + else: + self.llm = primary + # Load persisted summaries into memory cache await self._load_summaries() # Meshtastic connector self.connector = MeshConnector(self.config.connection) - # Message router + # Message router (pass personality manager) self.router = MessageRouter( - self.config, self.connector, self.history, self.dispatcher, self.llm + self.config, self.connector, self.history, self.dispatcher, self.llm, + personality=self.personality, ) # Responder self.responder = Responder(self.config.response, self.connector) + # Webhook client (2d) + self.webhook = WebhookClient(self.config.integrations.webhook) + + # Web status server (2e) + self.web_status = WebStatusServer(self.config.web_status) + + # Announcement scheduler (2f) + async def _send_announcement(text: str, channel: int) -> None: + self.connector.send_message(text=text, channel=channel) + + self.announcements = AnnouncementScheduler( + self.config.announcements, + send_callback=_send_announcement, + ) + async def _on_message(self, message: MeshMessage) -> None: """Handle incoming message.""" try: + # Check user filter (2a) + allowed, reason = self.user_filter.is_allowed(message.sender_id) + if not allowed: + logger.debug(f"Blocked message from {message.sender_id}: {reason}") + return + # Check if we should respond if not self.router.should_respond(message): return + # Check rate limiter (2b) + allowed, reason = self.rate_limiter.is_allowed(message.sender_id) + if not allowed: + logger.debug(f"Rate limited {message.sender_id}: {reason}") + return + logger.info( f"Processing message from {message.sender_name} ({message.sender_id}): " f"{message.text[:50]}..." ) + # Record in web status (2e) + get_status_data().record_message(message.sender_id, message.sender_name) + + # Send webhook event (2d) + await self.webhook.on_message_received( + sender_id=message.sender_id, + sender_name=message.sender_name, + text=message.text, + channel=message.channel, + is_dm=message.is_dm, + ) + # Route the message result = await self.router.route(message) @@ -169,16 +271,18 @@ class MeshAI: if not response: return + # Apply safety filter to LLM responses (2a) + if result.route_type == RouteType.LLM: + response = self.safety_filter.filter_response(response) + # Send response if message.is_dm: - # Reply as DM await self.responder.send_response( text=response, destination=message.sender_id, channel=message.channel, ) else: - # Reply on channel formatted = self.responder.format_channel_response( response, message.sender_name, mention_sender=True ) @@ -188,8 +292,21 @@ class MeshAI: channel=message.channel, ) + # Record response in rate limiter and status (2b, 2e) + self.rate_limiter.record_message(message.sender_id) + get_status_data().record_response() + + # Send webhook event (2d) + await self.webhook.on_response_sent( + recipient_id=message.sender_id if message.is_dm else None, + text=response, + channel=message.channel, + ) + except Exception as e: logger.error(f"Error handling message: {e}", exc_info=True) + get_status_data().record_error(str(e)) + await self.webhook.on_error(str(e)) async def _load_summaries(self) -> None: """Load persisted summaries from database into memory cache.""" diff --git a/meshai/router.py b/meshai/router.py index 6926b84..8e33193 100644 --- a/meshai/router.py +++ b/meshai/router.py @@ -11,6 +11,7 @@ from .commands import CommandContext, CommandDispatcher from .config import Config from .connector import MeshConnector, MeshMessage from .history import ConversationHistory +from .personality import PersonalityManager logger = logging.getLogger(__name__) @@ -42,12 +43,14 @@ class MessageRouter: history: ConversationHistory, dispatcher: CommandDispatcher, llm_backend: LLMBackend, + personality: Optional[PersonalityManager] = None, ): self.config = config self.connector = connector self.history = history self.dispatcher = dispatcher self.llm = llm_backend + self.personality = personality # Compile mention pattern bot_name = re.escape(config.bot.name) @@ -68,6 +71,10 @@ class MessageRouter: # Check if DM if message.is_dm: + # In DMs, let commands through to dispatcher but skip !commands + # that should be handled by other bots (like MeshMonitor) + if self.dispatcher.is_command(message.text): + return True return self.config.bot.respond_to_dms # Check channel filtering @@ -129,11 +136,16 @@ class MessageRouter: # Get conversation history history = await self.history.get_history_for_llm(message.sender_id) - # Generate response with user_id for memory optimization - # Use system prompt only if enabled in config + # Get system prompt from personality manager or config system_prompt = "" if getattr(self.config.llm, 'use_system_prompt', True): - system_prompt = self.config.llm.system_prompt + if self.personality: + system_prompt = self.personality.get_system_prompt( + sender_name=message.sender_name, + channel=message.channel, + ) + else: + system_prompt = self.config.llm.system_prompt try: response = await self.llm.generate(