feat: Dynamic identity system prompt from bot config

- Build system prompt dynamically using bot.name and bot.owner from config
- Reorder prompt: identity -> static prompt -> MeshMonitor (conditional) -> mesh context
- MeshMonitor description only injected when meshmonitor.enabled is true
- Update default system_prompt to static parts only (commands, architecture, rules)
- Fix meshmonitor.py to handle trigger arrays (not just strings)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
root 2026-05-03 05:45:58 +00:00
commit 5f66b69c9c
5 changed files with 296 additions and 298 deletions

View file

@ -1,179 +1,171 @@
"""Dynamic MeshMonitor trigger sync for MeshAI.
Reads MeshMonitor auto-responder triggers from a JSON file and converts
them to compiled regex patterns. The router uses these patterns to ignore
messages that MeshMonitor will handle.
Trigger file format (triggers.json):
{"triggers": ["weather {location}", "trivia", "ask {question}"]}
Or simple list:
["weather {location}", "trivia", "ask {question}"]
"""
import json
import logging
import re
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
def _trigger_to_regex(trigger: str) -> re.Pattern:
"""Convert a MeshMonitor trigger pattern to a compiled regex.
MeshMonitor trigger format:
- "weather {location}" -> matches "weather miami"
- "w {city},{state}" -> matches "w parkland,fl"
- "trivia" -> matches "trivia" exactly
- "ask {question}" -> matches "ask what is mesh"
- "{name:regex}" -> custom regex per param
Args:
trigger: MeshMonitor trigger pattern string
Returns:
Compiled regex pattern (case insensitive)
"""
pattern = trigger.strip()
# Temporarily replace {param} and {param:regex} blocks
param_blocks = []
def _save_param(m):
param_blocks.append(m.group(0))
return f"__PARAM_{len(param_blocks) - 1}__"
pattern = re.sub(r"\{[^}]+\}", _save_param, pattern)
# Escape remaining special chars
pattern = re.escape(pattern)
# Restore param blocks as regex groups
for i, block in enumerate(param_blocks):
placeholder = re.escape(f"__PARAM_{i}__")
# Check for custom regex: {name:regex}
custom_match = re.match(r"\{(\w+):(.+)\}", block)
if custom_match:
_name, custom_regex = custom_match.groups()
replacement = f"({custom_regex})"
else:
# Default: match one or more characters
replacement = r"(.+)"
pattern = pattern.replace(placeholder, replacement)
return re.compile(f"^{pattern}$", re.IGNORECASE)
class MeshMonitorSync:
"""Syncs MeshMonitor auto-responder triggers for MeshAI to ignore.
Reads trigger patterns from a JSON file and compiles them to regex.
Watches the file mtime so edits are picked up without restart.
"""
def __init__(self, triggers_file: str):
self._triggers_file = Path(triggers_file)
self._patterns: list[re.Pattern] = []
self._raw_triggers: list[str] = []
self._file_mtime: float = 0.0
@property
def trigger_list(self) -> list[str]:
"""Get raw trigger strings (for system prompt injection)."""
return self._raw_triggers
def load(self) -> int:
"""Load triggers from JSON file.
Returns:
Number of trigger patterns loaded
"""
if not self._triggers_file.exists():
logger.warning(f"Triggers file not found: {self._triggers_file}")
return 0
try:
self._file_mtime = self._triggers_file.stat().st_mtime
with open(self._triggers_file) as f:
data = json.load(f)
if isinstance(data, list):
raw_triggers = data
elif isinstance(data, dict):
raw_triggers = data.get("triggers", [])
else:
logger.warning(f"Unexpected triggers file format: {type(data)}")
return 0
self._raw_triggers = raw_triggers
self._compile_patterns(raw_triggers)
logger.info(
f"Loaded {len(self._patterns)} MeshMonitor trigger patterns "
f"from {self._triggers_file}"
)
return len(self._patterns)
except Exception as e:
logger.error(f"Failed to load triggers file: {e}")
return 0
def maybe_refresh(self) -> bool:
"""Reload triggers if the file has changed on disk.
Returns:
True if triggers were refreshed
"""
if not self._triggers_file.exists():
return False
mtime = self._triggers_file.stat().st_mtime
if mtime > self._file_mtime:
self.load()
return True
return False
def matches(self, text: str) -> bool:
"""Check if text matches any MeshMonitor trigger pattern.
Args:
text: Incoming message text (stripped)
Returns:
True if the message matches a MeshMonitor trigger
"""
for pattern in self._patterns:
if pattern.match(text):
logger.debug(
f"Message matches MeshMonitor trigger: "
f"{text[:40]}... -> {pattern.pattern}"
)
return True
return False
def _compile_patterns(self, raw_triggers: list[str]) -> None:
"""Compile trigger strings into regex patterns.
Handles comma-separated multi-patterns per trigger.
"""
patterns = []
for trigger in raw_triggers:
# Split multi-pattern triggers on comma
sub_patterns = [t.strip() for t in trigger.split(",")]
for sub in sub_patterns:
if not sub:
continue
try:
compiled = _trigger_to_regex(sub)
patterns.append(compiled)
except Exception as e:
logger.warning(
f"Failed to compile trigger pattern: {e}"
)
self._patterns = patterns
"""MeshMonitor trigger sync via HTTP."""
import json
import logging
import re
import time
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
logger = logging.getLogger(__name__)
def _trigger_to_regex(trigger: str) -> re.Pattern:
"""Convert MeshMonitor trigger pattern to compiled regex.
MeshMonitor patterns:
- !weather {location:.+} -> !weather (.+)
- !ping -> !ping
- !status -> !status
Args:
trigger: MeshMonitor trigger pattern
Returns:
Compiled regex pattern
"""
# Extract just the command part (before any {param} placeholders)
# Replace {name:pattern} with just the pattern for matching
pattern = re.sub(r"\{[^:}]+:([^}]+)\}", r"(\1)", trigger)
# Replace {name} (no pattern) with (.+)
pattern = re.sub(r"\{[^}]+\}", r"(.+)", pattern)
# Escape regex special chars except what we just inserted
# Actually, we need to be careful - just anchor it
pattern = "^" + pattern + "$"
return re.compile(pattern, re.IGNORECASE)
class MeshMonitorSync:
"""Sync auto-responder triggers from MeshMonitor HTTP API."""
def __init__(self, url: str, refresh_interval: int = 300):
"""Initialize MeshMonitor sync.
Args:
url: Base URL of MeshMonitor (e.g., http://100.64.0.11:3333)
refresh_interval: Seconds between refresh checks (default 5 minutes)
"""
self._url = url.rstrip("/")
self._refresh_interval = refresh_interval
self._patterns: list[re.Pattern] = []
self._raw_triggers: list[str] = []
self._last_refresh: float = 0.0
self._last_error: Optional[str] = None
@property
def raw_triggers(self) -> list[str]:
"""Get raw trigger patterns (for display)."""
return list(self._raw_triggers)
@property
def last_error(self) -> Optional[str]:
"""Get last error message if any."""
return self._last_error
def load(self) -> int:
"""Fetch triggers from MeshMonitor API.
Returns:
Number of triggers loaded
"""
endpoint = f"{self._url}/api/settings"
try:
req = Request(endpoint, headers={"Accept": "application/json"})
with urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
# autoResponderTriggers is a JSON string inside the response
triggers_json = data.get("autoResponderTriggers", "[]")
if isinstance(triggers_json, str):
triggers = json.loads(triggers_json)
else:
triggers = triggers_json
# Extract trigger patterns
self._raw_triggers = []
self._patterns = []
for item in triggers:
if isinstance(item, dict):
trigger_value = item.get("trigger", "")
else:
trigger_value = item
# Handle both string and list of strings
if isinstance(trigger_value, list):
trigger_list = trigger_value
elif isinstance(trigger_value, str) and trigger_value:
trigger_list = [trigger_value]
else:
continue
for trigger in trigger_list:
if not trigger:
continue
self._raw_triggers.append(trigger)
try:
self._patterns.append(_trigger_to_regex(trigger))
except re.error as e:
logger.warning(f"Invalid trigger pattern '{trigger}': {e}")
self._last_refresh = time.time()
self._last_error = None
logger.info(f"Loaded {len(self._patterns)} MeshMonitor triggers")
return len(self._patterns)
except HTTPError as e:
self._last_error = f"HTTP {e.code}: {e.reason}"
logger.error(f"Failed to fetch MeshMonitor triggers: {self._last_error}")
return 0
except URLError as e:
self._last_error = f"Connection error: {e.reason}"
logger.error(f"Failed to fetch MeshMonitor triggers: {self._last_error}")
return 0
except json.JSONDecodeError as e:
self._last_error = f"Invalid JSON: {e}"
logger.error(f"Failed to parse MeshMonitor response: {self._last_error}")
return 0
except Exception as e:
self._last_error = str(e)
logger.error(f"Failed to fetch MeshMonitor triggers: {self._last_error}")
return 0
def maybe_refresh(self) -> bool:
"""Refresh triggers if interval has passed.
Returns:
True if refresh was performed
"""
if time.time() - self._last_refresh >= self._refresh_interval:
self.load()
return True
return False
def matches(self, text: str) -> bool:
"""Check if text matches any MeshMonitor trigger.
Args:
text: Message text to check
Returns:
True if MeshMonitor will handle this message
"""
text = text.strip()
for pattern in self._patterns:
if pattern.match(text):
return True
return False
def get_commands_summary(self) -> str:
"""Get a summary of MeshMonitor commands for prompt injection.
Returns:
Human-readable summary of available commands
"""
if not self._raw_triggers:
return ""
lines = ["MeshMonitor handles these commands (do not respond to them):"]
for trigger in self._raw_triggers:
lines.append(f" - {trigger}")
return "\n".join(lines)