Add comprehensive config options matching fq51bbs

New features:
- Rate limiting (per-user and global)
- Enhanced logging with file rotation
- LLM fallback backend support
- Safety filtering (profanity, blocked phrases, emergency keywords)
- User management (blocklist, allowlist, admin/VIP nodes)
- Custom commands with static responses
- Personality/prompt templates with persona switching
- Web status page with JSON API
- Periodic announcements/broadcasts
- Webhook integrations

New modules:
- rate_limiter.py - Per-user and global rate limiting
- safety.py - Response filtering and user access control
- personality.py - Prompt templates and persona management
- web_status.py - Simple web status dashboard
- announcements.py - Periodic broadcast scheduler
- webhook.py - Webhook notification client
- log_setup.py - Enhanced logging configuration
- backends/fallback.py - LLM fallback wrapper

Config expanded from ~50 to ~200 lines with full documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt 2025-12-15 13:10:02 -07:00
commit 165da72d8d
13 changed files with 1796 additions and 48 deletions

View file

@ -1,51 +1,191 @@
# MeshAI Configuration # MeshAI Configuration
# Copy to config.yaml and edit as needed # LLM-powered Meshtastic assistant
#
# Copy this to config.yaml and customize as needed
# For Docker: mount as /data/config.yaml
# === BOT IDENTITY ===
bot: bot:
name: "ai" # @mention trigger (e.g., @ai) name: ai # Bot's trigger name (users say "ai help")
owner: "K7ZVX" # Owner callsign/name for logging owner: "" # Owner's callsign (optional)
respond_to_mentions: true # Respond to @botname mentions respond_to_mentions: true # Respond when name is mentioned
respond_to_dms: true # Respond to direct messages respond_to_dms: true # Respond to direct messages
# === MESHTASTIC CONNECTION ===
connection: connection:
type: "serial" # serial or tcp type: tcp # serial | tcp
serial_port: "/dev/ttyUSB0" # Serial port (if type=serial) serial_port: /dev/ttyUSB0 # For serial connection
tcp_host: "192.168.1.100" # TCP host (if type=tcp) tcp_host: localhost # For TCP connection (meshtasticd)
tcp_port: 4403 # TCP port (if type=tcp) tcp_port: 4403
# === CHANNEL FILTERING ===
channels: channels:
mode: "all" # "all" or "whitelist" mode: all # all | whitelist
whitelist: [0] # Channel indices (if mode=whitelist) whitelist: # Only respond on these channels (if mode=whitelist)
- 0
# === RESPONSE BEHAVIOR ===
response: response:
delay_min: 2.2 # Minimum delay before responding (seconds) delay_min: 2.2 # Min delay before responding (seconds)
delay_max: 3.0 # Maximum delay before responding (seconds) delay_max: 3.0 # Max delay before responding
max_length: 150 # Max characters per message chunk max_length: 150 # Max chars per message chunk
max_messages: 2 # Max message chunks per response max_messages: 2 # Max message chunks per response
history: # === RATE LIMITING ===
database: "conversations.db" # SQLite database file rate_limits:
max_messages_per_user: 20 # Max conversation history per user messages_per_minute: 10 # Per-user message limit
conversation_timeout: 86400 # Reset conversation after N seconds (24h) global_messages_per_minute: 30 # Total across all users
cooldown_seconds: 5.0 # Min time between responses to same user
burst_allowance: 3 # Allow short bursts before limiting
# === LOGGING ===
logging:
level: INFO # DEBUG | INFO | WARNING | ERROR
file: "" # Log file path (empty = console only)
max_size_mb: 10 # Max log file size
backup_count: 3 # Number of backup log files
log_messages: true # Log incoming messages
log_responses: true # Log outgoing responses
log_api_calls: false # Log raw LLM API requests (verbose)
# === LLM BACKEND ===
llm: llm:
backend: "openai" # openai, anthropic, or google backend: openai # openai | anthropic | google
api_key: "" # API key (or use env: LLM_API_KEY) api_key: "" # API key (or use LLM_API_KEY env var)
base_url: "https://api.openai.com/v1" # API base URL base_url: https://api.openai.com/v1 # API base URL
model: "gpt-4o-mini" # Model to use model: gpt-4o-mini # Model name
system_prompt: | timeout: 30 # Request timeout (seconds)
system_prompt: >-
You are a helpful assistant on a Meshtastic mesh network. You are a helpful assistant on a Meshtastic mesh network.
Keep responses VERY brief - under 250 characters total. Keep responses VERY brief - under 250 characters total.
Be concise but friendly. No markdown formatting. Be concise but friendly. No markdown formatting.
You may have access to web search for current information.
# Fallback backend (optional) - used if primary fails
# fallback:
# backend: openai
# api_key: ""
# base_url: https://api.openai.com/v1
# model: gpt-4o-mini
# timeout: 30
retry_attempts: 2 # Retries before fallback
fallback_on_error: true # Use fallback on errors
fallback_on_timeout: true # Use fallback on timeouts
# === SAFETY & FILTERING ===
safety:
max_response_length: 250 # Hard cap on response length
filter_profanity: false # Basic profanity filter
blocked_phrases: [] # Phrases to filter out of responses
require_mention: true # Only respond when name is mentioned
ignore_self: true # Don't respond to own messages
emergency_keywords: # Always respond to these (bypass rate limits)
- emergency
- help
- sos
# === USER MANAGEMENT ===
users:
blocklist: [] # Never respond to these node IDs
# - "!abc12345"
allowlist_only: false # If true, only respond to allowlist
allowlist: [] # Exclusive users (if allowlist_only=true)
admin_nodes: [] # Nodes with admin command access
vip_nodes: [] # Nodes that bypass rate limits
# === COMMANDS ===
commands:
enabled: true
prefix: "!" # Command prefix (e.g., !weather, !help)
disabled_commands: [] # Built-in commands to disable
# - reset # Disable !reset command
custom_commands: {} # User-defined static response commands
# Example custom commands:
# custom_commands:
# ping:
# response: "Pong! MeshAI online."
# rules:
# response: "Be respectful. Keep it brief. No spam."
# freq:
# response: "Primary: 906.875 MHz | Alt: 903.125 MHz"
# === PERSONALITY ===
personality:
system_prompt: "" # Override llm.system_prompt if set
context_injection: "" # Template with {time}, {sender_name}, {channel}
# Example: "Current time: {time}. Speaking with {sender_name}."
personas: {} # Named personality variants
# Example personas:
# personas:
# serious:
# trigger: "!serious"
# prompt: "Respond formally and technically. No jokes."
# casual:
# trigger: "!casual"
# prompt: "Be casual and use humor when appropriate."
# === CONVERSATION HISTORY ===
history:
database: /data/conversations.db
max_messages_per_user: 50 # Messages to keep per user
conversation_timeout: 86400 # Conversation expiry (seconds, 86400=24h)
auto_cleanup: true # Auto-delete old conversations
cleanup_interval_hours: 24 # How often to run cleanup
max_age_days: 30 # Delete conversations older than this
# === MEMORY OPTIMIZATION ===
memory:
enabled: true # Enable rolling summary memory
window_size: 4 # Recent message pairs to keep in full
summarize_threshold: 8 # Messages before re-summarizing
# === WEB STATUS PAGE ===
web_status:
enabled: false # Enable web status page
port: 8080 # Status page port
show_uptime: true
show_message_count: true
show_connected_nodes: true
show_recent_activity: false # Privacy concern - disabled by default
require_auth: false # Require password
auth_password: "" # Password if require_auth=true
# === ANNOUNCEMENTS ===
announcements:
enabled: false # Enable periodic announcements
interval_hours: 24 # Time between announcements
channel: 0 # Channel to broadcast on
messages: [] # Messages to rotate through
# Example:
# messages:
# - "MeshAI online. Mention 'ai' for help!"
# - "Type !help for available commands."
random_order: true # Randomize message order
# === WEATHER ===
weather: weather:
primary: "openmeteo" # openmeteo, wttr, or llm primary: openmeteo # openmeteo | wttr | llm
fallback: "llm" # openmeteo, wttr, llm, or none fallback: llm # Fallback provider
default_location: "" # Default location if no GPS default_location: "" # Default location if none specified
openmeteo: openmeteo:
url: "https://api.open-meteo.com/v1" url: https://api.open-meteo.com/v1
wttr: wttr:
url: "https://wttr.in" url: https://wttr.in
# === INTEGRATIONS ===
integrations:
weather: # Duplicate of top-level weather (for nesting)
primary: openmeteo
fallback: llm
default_location: ""
openmeteo:
url: https://api.open-meteo.com/v1
wttr:
url: https://wttr.in
webhook:
enabled: false # Enable webhook notifications
url: "" # Webhook URL
events: # Events to send
- message_received
- response_sent
- error

View file

@ -10,7 +10,7 @@ if [ ! -f "$MESHAI_CONFIG" ]; then
mkdir -p /data mkdir -p /data
cat > "$MESHAI_CONFIG" << 'EOF' cat > "$MESHAI_CONFIG" << 'EOF'
# MeshAI Configuration # MeshAI Configuration
# Configure via http://localhost:7681 # Configure via http://localhost:7682
bot: bot:
name: ai name: ai
@ -35,10 +35,28 @@ response:
max_length: 150 max_length: 150
max_messages: 2 max_messages: 2
rate_limits:
messages_per_minute: 10
global_messages_per_minute: 30
cooldown_seconds: 5.0
burst_allowance: 3
logging:
level: INFO
file: /data/meshai.log
max_size_mb: 10
backup_count: 3
log_messages: true
log_responses: true
log_api_calls: false
history: history:
database: /data/conversations.db database: /data/conversations.db
max_messages_per_user: 20 max_messages_per_user: 50
conversation_timeout: 86400 conversation_timeout: 86400
auto_cleanup: true
cleanup_interval_hours: 24
max_age_days: 30
memory: memory:
enabled: true enabled: true
@ -50,10 +68,60 @@ llm:
api_key: "" api_key: ""
base_url: https://api.openai.com/v1 base_url: https://api.openai.com/v1
model: gpt-4o-mini model: gpt-4o-mini
timeout: 30
system_prompt: >- system_prompt: >-
You are a helpful assistant on a Meshtastic mesh network. You are a helpful assistant on a Meshtastic mesh network.
Keep responses VERY brief - under 250 characters total. Keep responses VERY brief - under 250 characters total.
Be concise but friendly. No markdown formatting. Be concise but friendly. No markdown formatting.
retry_attempts: 2
fallback_on_error: true
fallback_on_timeout: true
safety:
max_response_length: 250
filter_profanity: false
blocked_phrases: []
require_mention: true
ignore_self: true
emergency_keywords:
- emergency
- help
- sos
users:
blocklist: []
allowlist_only: false
allowlist: []
admin_nodes: []
vip_nodes: []
commands:
enabled: true
prefix: "!"
disabled_commands: []
custom_commands: {}
personality:
system_prompt: ""
context_injection: ""
personas: {}
web_status:
enabled: false
port: 8080
show_uptime: true
show_message_count: true
show_connected_nodes: true
show_recent_activity: false
require_auth: false
auth_password: ""
announcements:
enabled: false
interval_hours: 24
channel: 0
messages: []
random_order: true
weather: weather:
primary: openmeteo primary: openmeteo
@ -63,6 +131,19 @@ weather:
url: https://api.open-meteo.com/v1 url: https://api.open-meteo.com/v1
wttr: wttr:
url: https://wttr.in url: https://wttr.in
integrations:
weather:
primary: openmeteo
fallback: llm
default_location: ""
webhook:
enabled: false
url: ""
events:
- message_received
- response_sent
- error
EOF EOF
echo "Default config created. Configure via http://localhost:7682" echo "Default config created. Configure via http://localhost:7682"
fi fi
@ -70,6 +151,7 @@ fi
# Start ttyd for web-based config access # Start ttyd for web-based config access
echo "Starting web config interface on port 7682..." echo "Starting web config interface on port 7682..."
ttyd -W -p 7682 \ ttyd -W -p 7682 \
-t enableClipboard=true \
-t titleFixed="MeshAI Config" \ -t titleFixed="MeshAI Config" \
-t 'theme={"background":"#0d1117","foreground":"#c9d1d9","cursor":"#58a6ff","selectionBackground":"#388bfd"}' \ -t 'theme={"background":"#0d1117","foreground":"#c9d1d9","cursor":"#58a6ff","selectionBackground":"#388bfd"}' \
-t fontSize=14 \ -t fontSize=14 \

109
meshai/announcements.py Normal file
View file

@ -0,0 +1,109 @@
"""Periodic announcements/broadcasts for MeshAI."""
import asyncio
import logging
import random
from typing import Callable, Optional
from .config import AnnouncementsConfig
logger = logging.getLogger(__name__)
class AnnouncementScheduler:
"""Scheduler for periodic announcements."""
def __init__(
self,
config: AnnouncementsConfig,
send_callback: Callable[[str, int], asyncio.coroutine],
):
"""Initialize the announcement scheduler.
Args:
config: Announcements configuration
send_callback: Async callback to send messages: (text, channel) -> None
"""
self.config = config
self._send_callback = send_callback
self._task: Optional[asyncio.Task] = None
self._message_index = 0
self._running = False
async def start(self):
"""Start the announcement scheduler."""
if not self.config.enabled or not self.config.messages:
logger.debug("Announcements disabled or no messages configured")
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
logger.info(
f"Announcement scheduler started (every {self.config.interval_hours}h)"
)
async def stop(self):
"""Stop the announcement scheduler."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info("Announcement scheduler stopped")
async def _run_loop(self):
"""Main loop for sending periodic announcements."""
# Wait a bit before first announcement
await asyncio.sleep(60) # 1 minute initial delay
while self._running:
try:
# Get next message
message = self._get_next_message()
if message:
logger.info(f"Sending announcement to channel {self.config.channel}")
await self._send_callback(message, self.config.channel)
# Wait for next interval
await asyncio.sleep(self.config.interval_hours * 3600)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in announcement loop: {e}")
await asyncio.sleep(300) # Wait 5 min on error
def _get_next_message(self) -> Optional[str]:
"""Get the next announcement message."""
if not self.config.messages:
return None
if self.config.random_order:
return random.choice(self.config.messages)
else:
message = self.config.messages[self._message_index]
self._message_index = (self._message_index + 1) % len(self.config.messages)
return message
async def send_now(self, message: Optional[str] = None) -> bool:
"""Send an announcement immediately.
Args:
message: Optional specific message, or use next in rotation
Returns:
True if sent successfully
"""
msg = message or self._get_next_message()
if not msg:
return False
try:
await self._send_callback(msg, self.config.channel)
return True
except Exception as e:
logger.error(f"Failed to send announcement: {e}")
return False

View file

@ -4,5 +4,13 @@ from .base import LLMBackend
from .openai_backend import OpenAIBackend from .openai_backend import OpenAIBackend
from .anthropic_backend import AnthropicBackend from .anthropic_backend import AnthropicBackend
from .google_backend import GoogleBackend from .google_backend import GoogleBackend
from .fallback import FallbackBackend, create_backend
__all__ = ["LLMBackend", "OpenAIBackend", "AnthropicBackend", "GoogleBackend"] __all__ = [
"LLMBackend",
"OpenAIBackend",
"AnthropicBackend",
"GoogleBackend",
"FallbackBackend",
"create_backend",
]

218
meshai/backends/fallback.py Normal file
View file

@ -0,0 +1,218 @@
"""Fallback-aware LLM backend wrapper."""
import asyncio
import logging
from typing import Optional
from ..config import LLMConfig, LLMBackendConfig
from .base import LLMBackend
from .openai_backend import OpenAIBackend
from .anthropic_backend import AnthropicBackend
from .google_backend import GoogleBackend
logger = logging.getLogger(__name__)
def create_backend(
backend_type: str,
api_key: str,
base_url: str,
model: str,
timeout: int,
window_size: int = 0,
summarize_threshold: int = 8,
) -> LLMBackend:
"""Create an LLM backend instance.
Args:
backend_type: Type of backend (openai, anthropic, google)
api_key: API key for the backend
base_url: Base URL for the API
model: Model name to use
timeout: Request timeout in seconds
window_size: Memory window size
summarize_threshold: When to summarize older messages
Returns:
Configured LLM backend instance
"""
# Create a minimal config object for the backend
from dataclasses import dataclass
@dataclass
class MinimalLLMConfig:
backend: str
api_key: str
base_url: str
model: str
system_prompt: str = ""
config = MinimalLLMConfig(
backend=backend_type,
api_key=api_key,
base_url=base_url,
model=model,
)
backend_type = backend_type.lower()
if backend_type == "openai":
return OpenAIBackend(config, api_key, window_size, summarize_threshold)
elif backend_type == "anthropic":
return AnthropicBackend(config, api_key, window_size, summarize_threshold)
elif backend_type == "google":
return GoogleBackend(config, api_key, window_size, summarize_threshold)
else:
logger.warning(f"Unknown backend '{backend_type}', defaulting to OpenAI")
return OpenAIBackend(config, api_key, window_size, summarize_threshold)
class FallbackBackend(LLMBackend):
"""LLM backend with automatic fallback support."""
def __init__(
self,
config: LLMConfig,
api_key: str,
window_size: int = 0,
summarize_threshold: int = 8,
):
self.config = config
self.api_key = api_key
self.window_size = window_size
self.summarize_threshold = summarize_threshold
# Create primary backend
self.primary = create_backend(
backend_type=config.backend,
api_key=api_key,
base_url=config.base_url,
model=config.model,
timeout=config.timeout,
window_size=window_size,
summarize_threshold=summarize_threshold,
)
# Create fallback backend if configured
self.fallback: Optional[LLMBackend] = None
if config.fallback:
fb = config.fallback
fb_api_key = fb.api_key or api_key # Use primary key if not specified
self.fallback = create_backend(
backend_type=fb.backend,
api_key=fb_api_key,
base_url=fb.base_url,
model=fb.model,
timeout=fb.timeout,
window_size=window_size,
summarize_threshold=summarize_threshold,
)
self._using_fallback = False
@property
def using_fallback(self) -> bool:
"""Whether we're currently using the fallback backend."""
return self._using_fallback
def get_memory(self):
"""Get memory from the active backend."""
if self._using_fallback and self.fallback:
return self.fallback.get_memory()
return self.primary.get_memory()
async def generate(
self,
messages: list[dict],
system_prompt: str,
max_tokens: int = 300,
user_id: Optional[str] = None,
) -> str:
"""Generate with automatic fallback."""
last_error = None
# Try primary
for attempt in range(self.config.retry_attempts):
try:
result = await asyncio.wait_for(
self.primary.generate(messages, system_prompt, max_tokens, user_id),
timeout=self.config.timeout,
)
self._using_fallback = False
return result
except asyncio.TimeoutError as e:
logger.warning(f"Primary backend timeout (attempt {attempt + 1})")
last_error = e
if not self.config.fallback_on_timeout:
raise
except Exception as e:
logger.warning(f"Primary backend error (attempt {attempt + 1}): {e}")
last_error = e
if not self.config.fallback_on_error:
raise
# Try fallback if available
if self.fallback:
logger.info("Switching to fallback backend")
try:
result = await asyncio.wait_for(
self.fallback.generate(messages, system_prompt, max_tokens, user_id),
timeout=self.config.fallback.timeout if self.config.fallback else 30,
)
self._using_fallback = True
return result
except Exception as e:
logger.error(f"Fallback backend also failed: {e}")
raise
# No fallback, raise the last error
if last_error:
raise last_error
raise RuntimeError("All LLM backends failed")
async def generate_with_search(
self,
query: str,
system_prompt: Optional[str] = None,
) -> str:
"""Generate with search using automatic fallback."""
last_error = None
# Try primary
try:
result = await asyncio.wait_for(
self.primary.generate_with_search(query, system_prompt),
timeout=self.config.timeout,
)
self._using_fallback = False
return result
except asyncio.TimeoutError as e:
logger.warning("Primary backend timeout for search")
last_error = e
if not self.config.fallback_on_timeout:
raise
except Exception as e:
logger.warning(f"Primary backend search error: {e}")
last_error = e
if not self.config.fallback_on_error:
raise
# Try fallback
if self.fallback:
logger.info("Switching to fallback backend for search")
try:
result = await self.fallback.generate_with_search(query, system_prompt)
self._using_fallback = True
return result
except Exception as e:
logger.error(f"Fallback search also failed: {e}")
raise
if last_error:
raise last_error
raise RuntimeError("All LLM backends failed")
async def close(self) -> None:
"""Close both backends."""
await self.primary.close()
if self.fallback:
await self.fallback.close()

View file

@ -8,11 +8,38 @@ from .base import CommandContext, CommandHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class CustomCommandHandler(CommandHandler):
"""Handler for user-defined static response commands."""
def __init__(self, name: str, response: str, description: str = "Custom command"):
self._name = name
self._response = response
self._description = description
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str:
return self._description
@property
def usage(self) -> str:
return f"!{self._name}"
async def execute(self, args: str, context: CommandContext) -> str:
return self._response
class CommandDispatcher: class CommandDispatcher:
"""Registry and dispatcher for bang commands.""" """Registry and dispatcher for bang commands."""
def __init__(self): def __init__(self, prefix: str = "!", disabled_commands: Optional[list[str]] = None):
self._commands: dict[str, CommandHandler] = {} self._commands: dict[str, CommandHandler] = {}
self._custom_commands: dict[str, str] = {}
self.prefix = prefix
self.disabled_commands = set(c.upper() for c in (disabled_commands or []))
def register(self, handler: CommandHandler) -> None: def register(self, handler: CommandHandler) -> None:
"""Register a command handler. """Register a command handler.
@ -21,9 +48,40 @@ class CommandDispatcher:
handler: CommandHandler instance to register handler: CommandHandler instance to register
""" """
name = handler.name.upper() name = handler.name.upper()
if name in self.disabled_commands:
logger.debug(f"Skipping disabled command: !{handler.name}")
return
self._commands[name] = handler self._commands[name] = handler
logger.debug(f"Registered command: !{handler.name}") logger.debug(f"Registered command: !{handler.name}")
def register_custom(self, name: str, response: str, description: str = "Custom command") -> None:
"""Register a custom static response command.
Args:
name: Command name (without prefix)
response: Static response text
description: Command description for help
"""
handler = CustomCommandHandler(name, response, description)
self.register(handler)
self._custom_commands[name.upper()] = response
def unregister(self, name: str) -> bool:
"""Unregister a command.
Args:
name: Command name to remove
Returns:
True if command was removed, False if not found
"""
name = name.upper()
if name in self._commands:
del self._commands[name]
self._custom_commands.pop(name, None)
return True
return False
def get_commands(self) -> list[CommandHandler]: def get_commands(self) -> list[CommandHandler]:
"""Get all registered command handlers.""" """Get all registered command handlers."""
return list(self._commands.values()) return list(self._commands.values())
@ -35,25 +93,25 @@ class CommandDispatcher:
text: Message text to check text: Message text to check
Returns: Returns:
True if text starts with ! True if text starts with command prefix
""" """
return text.strip().startswith("!") return text.strip().startswith(self.prefix)
def parse(self, text: str) -> tuple[Optional[str], str]: def parse(self, text: str) -> tuple[Optional[str], str]:
"""Parse command and arguments from text. """Parse command and arguments from text.
Args: Args:
text: Message text starting with ! text: Message text starting with command prefix
Returns: Returns:
Tuple of (command_name, arguments) or (None, "") if invalid Tuple of (command_name, arguments) or (None, "") if invalid
""" """
text = text.strip() text = text.strip()
if not text.startswith("!"): if not text.startswith(self.prefix):
return None, "" return None, ""
# Remove ! prefix # Remove prefix
text = text[1:] text = text[len(self.prefix):]
# Split into command and args # Split into command and args
parts = text.split(maxsplit=1) parts = text.split(maxsplit=1)
@ -96,21 +154,48 @@ class CommandDispatcher:
return f"Error: {str(e)[:100]}" return f"Error: {str(e)[:100]}"
def create_dispatcher() -> CommandDispatcher: def create_dispatcher(
"""Create and populate command dispatcher with default commands.""" prefix: str = "!",
disabled_commands: Optional[list[str]] = None,
custom_commands: Optional[dict] = None,
) -> CommandDispatcher:
"""Create and populate command dispatcher with default commands.
Args:
prefix: Command prefix (default: "!")
disabled_commands: List of command names to disable
custom_commands: Dict of name -> response for custom commands
Returns:
Configured CommandDispatcher
"""
from .help import HelpCommand from .help import HelpCommand
from .ping import PingCommand from .ping import PingCommand
from .reset import ResetCommand from .reset import ResetCommand
from .status import StatusCommand from .status import StatusCommand
from .weather import WeatherCommand from .weather import WeatherCommand
dispatcher = CommandDispatcher() dispatcher = CommandDispatcher(prefix=prefix, disabled_commands=disabled_commands)
# Register all commands # Register all built-in commands
dispatcher.register(HelpCommand(dispatcher)) dispatcher.register(HelpCommand(dispatcher))
dispatcher.register(PingCommand()) dispatcher.register(PingCommand())
dispatcher.register(ResetCommand()) dispatcher.register(ResetCommand())
dispatcher.register(StatusCommand()) dispatcher.register(StatusCommand())
dispatcher.register(WeatherCommand()) dispatcher.register(WeatherCommand())
# Register custom commands
if custom_commands:
for name, response in custom_commands.items():
if isinstance(response, dict):
# Support dict format: {response: "...", description: "..."}
dispatcher.register_custom(
name,
response.get("response", ""),
response.get("description", "Custom command"),
)
else:
# Simple string response
dispatcher.register_custom(name, str(response))
return dispatcher return dispatcher

View file

@ -18,6 +18,131 @@ class BotConfig:
respond_to_dms: bool = True respond_to_dms: bool = True
@dataclass
class RateLimitsConfig:
"""Rate limiting settings."""
messages_per_minute: int = 10 # Per-user message limit
global_messages_per_minute: int = 30 # Total across all users
cooldown_seconds: float = 5.0 # Min time between responses to same user
burst_allowance: int = 3 # Allow short bursts before limiting
@dataclass
class LoggingConfig:
"""Logging settings."""
level: str = "INFO" # DEBUG | INFO | WARNING | ERROR
file: str = "" # Empty = console only
max_size_mb: int = 10
backup_count: int = 3
log_messages: bool = True # Log incoming messages
log_responses: bool = True # Log outgoing responses
log_api_calls: bool = False # Log raw LLM API requests (verbose)
@dataclass
class LLMBackendConfig:
"""Single LLM backend configuration."""
backend: str = "openai" # openai, anthropic, google
api_key: str = ""
base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4o-mini"
timeout: int = 30
@dataclass
class SafetyConfig:
"""Response filtering and safety settings."""
max_response_length: int = 250 # Hard cap on response length
filter_profanity: bool = False # Basic profanity filter
blocked_phrases: list[str] = field(default_factory=list) # Phrases to filter out
require_mention: bool = True # Only respond when mentioned by name
ignore_self: bool = True # Don't respond to own messages
emergency_keywords: list[str] = field(
default_factory=lambda: ["emergency", "help", "sos"]
) # Always respond to these
@dataclass
class UsersConfig:
"""User management settings."""
blocklist: list[str] = field(default_factory=list) # Never respond to these node IDs
allowlist_only: bool = False # If true, only respond to allowlist
allowlist: list[str] = field(default_factory=list) # Exclusive users
admin_nodes: list[str] = field(default_factory=list) # Nodes with admin commands
vip_nodes: list[str] = field(default_factory=list) # Skip rate limits
@dataclass
class CustomCommandConfig:
"""Custom static command definition."""
response: str = ""
@dataclass
class CommandsConfig:
"""Command customization settings."""
enabled: bool = True
prefix: str = "!" # Command prefix
custom_commands: dict = field(default_factory=dict) # name -> response mapping
disabled_commands: list[str] = field(default_factory=list) # Built-in commands to disable
@dataclass
class PersonalityConfig:
"""Personality and prompt settings."""
system_prompt: str = (
"You are a helpful assistant on a Meshtastic mesh network. "
"Keep responses VERY brief - under 250 characters total. "
"Be concise but friendly. No markdown formatting."
)
context_injection: str = "" # Template with {time}, {sender_name}, {channel}
personas: dict = field(default_factory=dict) # trigger -> prompt mapping
@dataclass
class WebStatusConfig:
"""Web status page settings."""
enabled: bool = False
port: int = 8080
show_uptime: bool = True
show_message_count: bool = True
show_connected_nodes: bool = True
show_recent_activity: bool = False # Privacy concern
require_auth: bool = False
auth_password: str = ""
@dataclass
class AnnouncementsConfig:
"""Periodic announcement settings."""
enabled: bool = False
interval_hours: int = 24
channel: int = 0
messages: list[str] = field(default_factory=list)
random_order: bool = True
@dataclass
class WebhookConfig:
"""Webhook integration settings."""
enabled: bool = False
url: str = ""
events: list[str] = field(
default_factory=lambda: ["message_received", "response_sent", "error"]
)
@dataclass @dataclass
class ConnectionConfig: class ConnectionConfig:
"""Meshtastic connection settings.""" """Meshtastic connection settings."""
@ -51,9 +176,14 @@ class HistoryConfig:
"""Conversation history settings.""" """Conversation history settings."""
database: str = "conversations.db" database: str = "conversations.db"
max_messages_per_user: int = 20 max_messages_per_user: int = 50
conversation_timeout: int = 86400 # 24 hours conversation_timeout: int = 86400 # 24 hours
# Cleanup settings
auto_cleanup: bool = True
cleanup_interval_hours: int = 24
max_age_days: int = 30 # Delete conversations older than this
@dataclass @dataclass
class MemoryConfig: class MemoryConfig:
@ -66,18 +196,28 @@ class MemoryConfig:
@dataclass @dataclass
class LLMConfig: class LLMConfig:
"""LLM backend settings.""" """LLM backend settings with fallback support."""
# Primary backend (backwards compatible with old config)
backend: str = "openai" # openai, anthropic, google backend: str = "openai" # openai, anthropic, google
api_key: str = "" api_key: str = ""
base_url: str = "https://api.openai.com/v1" base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4o-mini" model: str = "gpt-4o-mini"
timeout: int = 30
# System prompt (kept for backwards compat, personality.system_prompt preferred)
system_prompt: str = ( system_prompt: str = (
"You are a helpful assistant on a Meshtastic mesh network. " "You are a helpful assistant on a Meshtastic mesh network. "
"Keep responses VERY brief - under 250 characters total. " "Keep responses VERY brief - under 250 characters total. "
"Be concise but friendly. No markdown formatting." "Be concise but friendly. No markdown formatting."
) )
# Fallback settings
fallback: Optional[LLMBackendConfig] = None
retry_attempts: int = 2
fallback_on_error: bool = True
fallback_on_timeout: bool = True
@dataclass @dataclass
class OpenMeteoConfig: class OpenMeteoConfig:
@ -104,6 +244,14 @@ class WeatherConfig:
wttr: WttrConfig = field(default_factory=WttrConfig) wttr: WttrConfig = field(default_factory=WttrConfig)
@dataclass
class IntegrationsConfig:
"""External integrations settings."""
weather: WeatherConfig = field(default_factory=WeatherConfig)
webhook: WebhookConfig = field(default_factory=WebhookConfig)
@dataclass @dataclass
class Config: class Config:
"""Main configuration container.""" """Main configuration container."""
@ -115,10 +263,29 @@ class Config:
history: HistoryConfig = field(default_factory=HistoryConfig) history: HistoryConfig = field(default_factory=HistoryConfig)
memory: MemoryConfig = field(default_factory=MemoryConfig) memory: MemoryConfig = field(default_factory=MemoryConfig)
llm: LLMConfig = field(default_factory=LLMConfig) llm: LLMConfig = field(default_factory=LLMConfig)
# New config sections
rate_limits: RateLimitsConfig = field(default_factory=RateLimitsConfig)
logging: LoggingConfig = field(default_factory=LoggingConfig)
safety: SafetyConfig = field(default_factory=SafetyConfig)
users: UsersConfig = field(default_factory=UsersConfig)
commands: CommandsConfig = field(default_factory=CommandsConfig)
personality: PersonalityConfig = field(default_factory=PersonalityConfig)
web_status: WebStatusConfig = field(default_factory=WebStatusConfig)
announcements: AnnouncementsConfig = field(default_factory=AnnouncementsConfig)
integrations: IntegrationsConfig = field(default_factory=IntegrationsConfig)
# Keep weather at top level for backwards compatibility
weather: WeatherConfig = field(default_factory=WeatherConfig) weather: WeatherConfig = field(default_factory=WeatherConfig)
_config_path: Optional[Path] = field(default=None, repr=False) _config_path: Optional[Path] = field(default=None, repr=False)
def get_system_prompt(self) -> str:
"""Get effective system prompt, preferring personality config."""
if self.personality.system_prompt:
return self.personality.system_prompt
return self.llm.system_prompt
def resolve_api_key(self) -> str: def resolve_api_key(self) -> str:
"""Resolve API key from config or environment.""" """Resolve API key from config or environment."""
if self.llm.api_key: if self.llm.api_key:

122
meshai/log_setup.py Normal file
View file

@ -0,0 +1,122 @@
"""Enhanced logging setup for MeshAI."""
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
from .config import LoggingConfig
# Custom log levels for message tracking
MESSAGE_IN = 25 # Between INFO (20) and WARNING (30)
MESSAGE_OUT = 26
API_CALL = 15 # Between DEBUG (10) and INFO (20)
logging.addLevelName(MESSAGE_IN, "MSG_IN")
logging.addLevelName(MESSAGE_OUT, "MSG_OUT")
logging.addLevelName(API_CALL, "API")
class MeshAILogger(logging.Logger):
"""Custom logger with message tracking methods."""
def message_in(self, sender: str, text: str, channel: int = 0):
"""Log an incoming message."""
if self.isEnabledFor(MESSAGE_IN):
self._log(MESSAGE_IN, f"[CH{channel}] {sender}: {text}", ())
def message_out(self, recipient: str, text: str, channel: int = 0):
"""Log an outgoing message."""
if self.isEnabledFor(MESSAGE_OUT):
self._log(MESSAGE_OUT, f"[CH{channel}] -> {recipient}: {text}", ())
def api_call(self, backend: str, model: str, tokens: Optional[int] = None):
"""Log an API call."""
if self.isEnabledFor(API_CALL):
msg = f"API call to {backend}/{model}"
if tokens:
msg += f" ({tokens} tokens)"
self._log(API_CALL, msg, ())
# Set the custom logger class
logging.setLoggerClass(MeshAILogger)
def setup_logging(config: LoggingConfig, verbose: bool = False) -> logging.Logger:
"""Configure logging based on config.
Args:
config: Logging configuration
verbose: Override to enable DEBUG level
Returns:
The configured root logger
"""
# Determine log level
if verbose:
level = logging.DEBUG
else:
level_name = config.level.upper()
level = getattr(logging, level_name, logging.INFO)
# Create formatter
formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Get root logger
root_logger = logging.getLogger()
root_logger.setLevel(level)
# Clear existing handlers
root_logger.handlers.clear()
# Console handler (always)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# File handler (if configured)
if config.file:
log_path = Path(config.file)
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = RotatingFileHandler(
log_path,
maxBytes=config.max_size_mb * 1024 * 1024,
backupCount=config.backup_count,
)
file_handler.setLevel(level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Configure message logging levels based on config
meshai_logger = logging.getLogger("meshai")
if not config.log_messages:
# Disable message logging
meshai_logger.addFilter(lambda r: r.levelno not in (MESSAGE_IN, MESSAGE_OUT))
if not config.log_api_calls:
# Disable API call logging (it's DEBUG level anyway)
meshai_logger.addFilter(lambda r: r.levelno != API_CALL)
return root_logger
def get_logger(name: str = "meshai") -> MeshAILogger:
"""Get a MeshAI logger instance.
Args:
name: Logger name (will be prefixed with 'meshai.')
Returns:
Configured logger instance
"""
if not name.startswith("meshai"):
name = f"meshai.{name}"
return logging.getLogger(name)

119
meshai/personality.py Normal file
View file

@ -0,0 +1,119 @@
"""Personality and prompt template handling for MeshAI."""
import re
from datetime import datetime
from typing import Optional
from .config import PersonalityConfig
class PersonalityManager:
"""Manages personality switching and prompt templating."""
def __init__(self, config: PersonalityConfig):
self.config = config
self._current_persona: Optional[str] = None
self._persona_prompts: dict[str, str] = {}
# Parse personas from config
for name, persona_data in config.personas.items():
if isinstance(persona_data, dict):
self._persona_prompts[name] = persona_data.get("prompt", "")
else:
self._persona_prompts[name] = str(persona_data)
def get_system_prompt(
self,
sender_name: str = "",
channel: int = 0,
extra_context: Optional[dict] = None,
) -> str:
"""Get the current system prompt with context injection.
Args:
sender_name: Name of the message sender
channel: Channel number
extra_context: Additional context variables
Returns:
Formatted system prompt
"""
# Start with base prompt or persona prompt
if self._current_persona and self._current_persona in self._persona_prompts:
base_prompt = self._persona_prompts[self._current_persona]
else:
base_prompt = self.config.system_prompt
# Apply context injection if configured
if self.config.context_injection:
context_vars = {
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"sender_name": sender_name,
"channel": str(channel),
"persona": self._current_persona or "default",
}
if extra_context:
context_vars.update(extra_context)
try:
context = self.config.context_injection.format(**context_vars)
base_prompt = f"{base_prompt}\n\n{context}"
except KeyError as e:
# Ignore missing context variables
pass
return base_prompt
def check_persona_trigger(self, text: str) -> Optional[str]:
"""Check if text contains a persona switch trigger.
Args:
text: Message text to check
Returns:
Persona name if triggered, None otherwise
"""
text_lower = text.lower().strip()
for name, persona_data in self.config.personas.items():
trigger = None
if isinstance(persona_data, dict):
trigger = persona_data.get("trigger", f"!{name}")
else:
trigger = f"!{name}"
if trigger and text_lower.startswith(trigger.lower()):
return name
return None
def switch_persona(self, persona_name: Optional[str]) -> bool:
"""Switch to a different persona.
Args:
persona_name: Name of persona to switch to, or None for default
Returns:
True if switch was successful
"""
if persona_name is None:
self._current_persona = None
return True
if persona_name in self._persona_prompts:
self._current_persona = persona_name
return True
return False
def get_current_persona(self) -> Optional[str]:
"""Get the name of the current persona."""
return self._current_persona
def list_personas(self) -> list[str]:
"""List available persona names."""
return list(self._persona_prompts.keys())
def reset(self) -> None:
"""Reset to default persona."""
self._current_persona = None

115
meshai/rate_limiter.py Normal file
View file

@ -0,0 +1,115 @@
"""Rate limiting for MeshAI."""
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
from .config import RateLimitsConfig
@dataclass
class UserRateState:
"""Rate limit state for a single user."""
message_times: list[float] = field(default_factory=list)
last_response_time: float = 0.0
burst_count: int = 0
class RateLimiter:
"""Rate limiter for message processing."""
def __init__(self, config: RateLimitsConfig, vip_nodes: Optional[list[str]] = None):
self.config = config
self.vip_nodes = set(vip_nodes or [])
self._user_states: dict[str, UserRateState] = defaultdict(UserRateState)
self._global_times: list[float] = []
def is_allowed(self, user_id: str) -> tuple[bool, Optional[str]]:
"""Check if a message from user is allowed.
Args:
user_id: The user's node ID
Returns:
Tuple of (allowed, reason). If not allowed, reason explains why.
"""
# VIP users bypass rate limits
if user_id in self.vip_nodes:
return True, None
now = time.time()
state = self._user_states[user_id]
# Clean old timestamps (older than 1 minute)
cutoff = now - 60.0
state.message_times = [t for t in state.message_times if t > cutoff]
self._global_times = [t for t in self._global_times if t > cutoff]
# Check cooldown (minimum time between responses to same user)
if state.last_response_time > 0:
elapsed = now - state.last_response_time
if elapsed < self.config.cooldown_seconds:
remaining = self.config.cooldown_seconds - elapsed
return False, f"Cooldown: wait {remaining:.1f}s"
# Check per-user rate limit
if len(state.message_times) >= self.config.messages_per_minute:
# Check burst allowance
if state.burst_count >= self.config.burst_allowance:
return False, "Rate limit exceeded (per-user)"
state.burst_count += 1
else:
state.burst_count = 0
# Check global rate limit
if len(self._global_times) >= self.config.global_messages_per_minute:
return False, "Rate limit exceeded (global)"
return True, None
def record_message(self, user_id: str) -> None:
"""Record that a message was processed for a user."""
now = time.time()
state = self._user_states[user_id]
state.message_times.append(now)
state.last_response_time = now
self._global_times.append(now)
def get_user_stats(self, user_id: str) -> dict:
"""Get rate limit stats for a user."""
now = time.time()
state = self._user_states[user_id]
cutoff = now - 60.0
recent_count = len([t for t in state.message_times if t > cutoff])
return {
"messages_last_minute": recent_count,
"limit": self.config.messages_per_minute,
"remaining": max(0, self.config.messages_per_minute - recent_count),
"is_vip": user_id in self.vip_nodes,
}
def get_global_stats(self) -> dict:
"""Get global rate limit stats."""
now = time.time()
cutoff = now - 60.0
recent_count = len([t for t in self._global_times if t > cutoff])
return {
"messages_last_minute": recent_count,
"limit": self.config.global_messages_per_minute,
"remaining": max(0, self.config.global_messages_per_minute - recent_count),
}
def reset_user(self, user_id: str) -> None:
"""Reset rate limit state for a user."""
if user_id in self._user_states:
del self._user_states[user_id]
def reset_all(self) -> None:
"""Reset all rate limit state."""
self._user_states.clear()
self._global_times.clear()

142
meshai/safety.py Normal file
View file

@ -0,0 +1,142 @@
"""Response filtering and safety for MeshAI."""
import re
from typing import Optional
from .config import SafetyConfig
# Basic profanity list (expand as needed)
PROFANITY_PATTERNS = [
r"\bf+u+c+k+\w*\b",
r"\bs+h+i+t+\w*\b",
r"\ba+s+s+h+o+l+e+\w*\b",
r"\bb+i+t+c+h+\w*\b",
r"\bc+u+n+t+\w*\b",
r"\bd+a+m+n+\w*\b",
]
class SafetyFilter:
"""Filter for response safety and content moderation."""
def __init__(self, config: SafetyConfig):
self.config = config
self._profanity_regex = None
if config.filter_profanity:
self._profanity_regex = re.compile(
"|".join(PROFANITY_PATTERNS), re.IGNORECASE
)
def filter_response(self, text: str) -> str:
"""Filter a response for safety.
Args:
text: The response text to filter
Returns:
Filtered text
"""
# Truncate to max length
if len(text) > self.config.max_response_length:
text = text[: self.config.max_response_length - 3] + "..."
# Filter profanity
if self._profanity_regex:
text = self._profanity_regex.sub("***", text)
# Filter blocked phrases
for phrase in self.config.blocked_phrases:
text = text.replace(phrase, "[filtered]")
text = text.replace(phrase.lower(), "[filtered]")
text = text.replace(phrase.upper(), "[filtered]")
return text
def should_respond(
self,
text: str,
sender_id: str,
own_id: str,
is_mentioned: bool,
is_dm: bool,
) -> tuple[bool, Optional[str]]:
"""Check if we should respond to this message.
Args:
text: Message text
sender_id: Sender's node ID
own_id: Our own node ID
is_mentioned: Whether our name is mentioned
is_dm: Whether this is a direct message
Returns:
Tuple of (should_respond, reason). Reason is None if we should respond.
"""
# Never respond to self
if self.config.ignore_self and sender_id == own_id:
return False, "Self message"
# Check for emergency keywords (always respond)
text_lower = text.lower()
for keyword in self.config.emergency_keywords:
if keyword.lower() in text_lower:
return True, None
# Check mention requirement
if self.config.require_mention and not is_mentioned and not is_dm:
return False, "Not mentioned"
return True, None
def contains_emergency(self, text: str) -> bool:
"""Check if text contains emergency keywords."""
text_lower = text.lower()
return any(kw.lower() in text_lower for kw in self.config.emergency_keywords)
class UserFilter:
"""Filter for user access control."""
def __init__(
self,
blocklist: list[str],
allowlist: list[str],
allowlist_only: bool,
admin_nodes: list[str],
):
self.blocklist = set(blocklist)
self.allowlist = set(allowlist)
self.allowlist_only = allowlist_only
self.admin_nodes = set(admin_nodes)
def is_allowed(self, user_id: str) -> tuple[bool, Optional[str]]:
"""Check if user is allowed to interact.
Args:
user_id: The user's node ID
Returns:
Tuple of (allowed, reason)
"""
# Check blocklist first
if user_id in self.blocklist:
return False, "User is blocked"
# If allowlist_only mode, check allowlist
if self.allowlist_only:
if user_id not in self.allowlist:
return False, "User not in allowlist"
return True, None
def is_admin(self, user_id: str) -> bool:
"""Check if user is an admin."""
return user_id in self.admin_nodes
def add_to_blocklist(self, user_id: str) -> None:
"""Add a user to the blocklist."""
self.blocklist.add(user_id)
def remove_from_blocklist(self, user_id: str) -> None:
"""Remove a user from the blocklist."""
self.blocklist.discard(user_id)

265
meshai/web_status.py Normal file
View file

@ -0,0 +1,265 @@
"""Simple web status page for MeshAI."""
import asyncio
import json
import logging
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from typing import Callable, Optional
from .config import WebStatusConfig
logger = logging.getLogger(__name__)
class StatusData:
"""Container for status information."""
def __init__(self):
self.start_time = time.time()
self.message_count = 0
self.response_count = 0
self.error_count = 0
self.connected_nodes: set[str] = set()
self.recent_activity: list[dict] = []
self.last_message_time: Optional[float] = None
self.using_fallback = False
def record_message(self, sender_id: str, sender_name: str):
"""Record an incoming message."""
self.message_count += 1
self.last_message_time = time.time()
self.connected_nodes.add(sender_id)
self.recent_activity.append({
"type": "message",
"time": datetime.now().isoformat(),
"sender": sender_name,
})
# Keep only last 20 activities
self.recent_activity = self.recent_activity[-20:]
def record_response(self):
"""Record an outgoing response."""
self.response_count += 1
def record_error(self, error: str):
"""Record an error."""
self.error_count += 1
self.recent_activity.append({
"type": "error",
"time": datetime.now().isoformat(),
"error": error[:100],
})
self.recent_activity = self.recent_activity[-20:]
def get_uptime(self) -> str:
"""Get formatted uptime string."""
elapsed = int(time.time() - self.start_time)
days, remainder = divmod(elapsed, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
parts.append(f"{seconds}s")
return " ".join(parts)
def to_dict(self, include_activity: bool = False) -> dict:
"""Convert to dictionary for JSON response."""
data = {
"status": "online",
"uptime": self.get_uptime(),
"uptime_seconds": int(time.time() - self.start_time),
"messages_received": self.message_count,
"responses_sent": self.response_count,
"errors": self.error_count,
"connected_nodes": len(self.connected_nodes),
"using_fallback": self.using_fallback,
}
if self.last_message_time:
data["last_message_ago"] = int(time.time() - self.last_message_time)
if include_activity:
data["recent_activity"] = self.recent_activity
return data
# Global status data instance
_status_data = StatusData()
def get_status_data() -> StatusData:
"""Get the global status data instance."""
return _status_data
class StatusRequestHandler(BaseHTTPRequestHandler):
"""HTTP request handler for status page."""
config: WebStatusConfig = None
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def do_GET(self):
"""Handle GET requests."""
if self.path == "/" or self.path == "/status":
self._serve_status_page()
elif self.path == "/api/status":
self._serve_json_status()
elif self.path == "/health":
self._serve_health()
else:
self.send_error(404)
def _check_auth(self) -> bool:
"""Check authentication if required."""
if not self.config or not self.config.require_auth:
return True
auth_header = self.headers.get("Authorization", "")
if auth_header.startswith("Basic "):
import base64
try:
decoded = base64.b64decode(auth_header[6:]).decode()
_, password = decoded.split(":", 1)
return password == self.config.auth_password
except Exception:
pass
return False
def _serve_status_page(self):
"""Serve HTML status page."""
if not self._check_auth():
self.send_response(401)
self.send_header("WWW-Authenticate", 'Basic realm="MeshAI Status"')
self.end_headers()
return
status = _status_data.to_dict(
include_activity=self.config.show_recent_activity if self.config else False
)
html = f"""<!DOCTYPE html>
<html>
<head>
<title>MeshAI Status</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, monospace;
background: #0d1117;
color: #c9d1d9;
margin: 0;
padding: 20px;
}}
.container {{ max-width: 600px; margin: 0 auto; }}
h1 {{ color: #58a6ff; border-bottom: 1px solid #30363d; padding-bottom: 10px; }}
.stat {{
display: flex;
justify-content: space-between;
padding: 10px 0;
border-bottom: 1px solid #21262d;
}}
.stat-label {{ color: #8b949e; }}
.stat-value {{ color: #58a6ff; font-weight: bold; }}
.status-online {{ color: #3fb950; }}
.status-fallback {{ color: #d29922; }}
.footer {{ margin-top: 20px; color: #484f58; font-size: 12px; }}
</style>
</head>
<body>
<div class="container">
<h1>MeshAI Status</h1>
<div class="stat">
<span class="stat-label">Status</span>
<span class="stat-value {'status-fallback' if status.get('using_fallback') else 'status-online'}">
{'ONLINE (Fallback)' if status.get('using_fallback') else 'ONLINE'}
</span>
</div>
{'<div class="stat"><span class="stat-label">Uptime</span><span class="stat-value">' + status["uptime"] + '</span></div>' if self.config and self.config.show_uptime else ''}
{'<div class="stat"><span class="stat-label">Messages</span><span class="stat-value">' + str(status["messages_received"]) + '</span></div>' if self.config and self.config.show_message_count else ''}
{'<div class="stat"><span class="stat-label">Responses</span><span class="stat-value">' + str(status["responses_sent"]) + '</span></div>' if self.config and self.config.show_message_count else ''}
{'<div class="stat"><span class="stat-label">Connected Nodes</span><span class="stat-value">' + str(status["connected_nodes"]) + '</span></div>' if self.config and self.config.show_connected_nodes else ''}
<div class="stat">
<span class="stat-label">Errors</span>
<span class="stat-value">{status["errors"]}</span>
</div>
<div class="footer">Auto-refresh in 30s</div>
</div>
<script>setTimeout(() => location.reload(), 30000);</script>
</body>
</html>"""
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(html.encode())
def _serve_json_status(self):
"""Serve JSON status."""
if not self._check_auth():
self.send_response(401)
self.end_headers()
return
status = _status_data.to_dict(
include_activity=self.config.show_recent_activity if self.config else False
)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(status, indent=2).encode())
def _serve_health(self):
"""Serve simple health check."""
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
class WebStatusServer:
"""Web status server manager."""
def __init__(self, config: WebStatusConfig):
self.config = config
self._server: Optional[HTTPServer] = None
self._thread: Optional[Thread] = None
def start(self):
"""Start the web status server."""
if not self.config.enabled:
return
StatusRequestHandler.config = self.config
try:
self._server = HTTPServer(("0.0.0.0", self.config.port), StatusRequestHandler)
self._thread = Thread(target=self._server.serve_forever, daemon=True)
self._thread.start()
logger.info(f"Web status server started on port {self.config.port}")
except Exception as e:
logger.error(f"Failed to start web status server: {e}")
def stop(self):
"""Stop the web status server."""
if self._server:
self._server.shutdown()
self._server = None
self._thread = None
logger.info("Web status server stopped")

176
meshai/webhook.py Normal file
View file

@ -0,0 +1,176 @@
"""Webhook integration for MeshAI."""
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Optional
import httpx
from .config import WebhookConfig
logger = logging.getLogger(__name__)
class WebhookClient:
"""Client for sending webhook notifications."""
def __init__(self, config: WebhookConfig):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
self._queue: asyncio.Queue = asyncio.Queue()
self._task: Optional[asyncio.Task] = None
async def start(self):
"""Start the webhook client."""
if not self.config.enabled or not self.config.url:
logger.debug("Webhooks disabled or no URL configured")
return
self._client = httpx.AsyncClient(timeout=10.0)
self._task = asyncio.create_task(self._process_queue())
logger.info(f"Webhook client started: {self.config.url}")
async def stop(self):
"""Stop the webhook client."""
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
if self._client:
await self._client.aclose()
self._client = None
logger.info("Webhook client stopped")
def _should_send(self, event_type: str) -> bool:
"""Check if this event type should be sent."""
if not self.config.enabled:
return False
return event_type in self.config.events
async def send_event(
self,
event_type: str,
data: dict[str, Any],
immediate: bool = False,
):
"""Send a webhook event.
Args:
event_type: Type of event (message_received, response_sent, error)
data: Event data
immediate: If True, send immediately instead of queuing
"""
if not self._should_send(event_type):
return
payload = {
"event": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
"data": data,
}
if immediate:
await self._send_payload(payload)
else:
await self._queue.put(payload)
async def _process_queue(self):
"""Process queued webhook payloads."""
while True:
try:
payload = await self._queue.get()
await self._send_payload(payload)
self._queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error processing webhook queue: {e}")
async def _send_payload(self, payload: dict):
"""Send a webhook payload."""
if not self._client:
return
try:
response = await self._client.post(
self.config.url,
json=payload,
headers={"Content-Type": "application/json"},
)
if response.status_code >= 400:
logger.warning(
f"Webhook returned {response.status_code}: {response.text[:100]}"
)
except Exception as e:
logger.error(f"Failed to send webhook: {e}")
# Convenience methods for common events
async def on_message_received(
self,
sender_id: str,
sender_name: str,
text: str,
channel: int,
is_dm: bool,
):
"""Send message_received event."""
await self.send_event(
"message_received",
{
"sender_id": sender_id,
"sender_name": sender_name,
"text": text,
"channel": channel,
"is_dm": is_dm,
},
)
async def on_response_sent(
self,
recipient_id: Optional[str],
text: str,
channel: int,
):
"""Send response_sent event."""
await self.send_event(
"response_sent",
{
"recipient_id": recipient_id,
"text": text,
"channel": channel,
},
)
async def on_error(self, error: str, context: Optional[dict] = None):
"""Send error event."""
await self.send_event(
"error",
{
"error": error,
"context": context or {},
},
)
async def on_startup(self):
"""Send startup event."""
await self.send_event(
"startup",
{"message": "MeshAI started"},
immediate=True,
)
async def on_shutdown(self):
"""Send shutdown event."""
await self.send_event(
"shutdown",
{"message": "MeshAI stopping"},
immediate=True,
)