Add HTML escaping to status page and prompt injection guard to router

- 5a: Import html.escape and apply to all values rendered into the
  HTML template in _serve_status_page() — uptime, counts, status text,
  node counts, errors. Prevents XSS via crafted node names or errors.
- 5b: Add basic prompt injection detection to _clean_query() with
  configurable safety.prompt_injection_guard (default: on). Detects
  patterns like "ignore all previous", "you are now", "system prompt:",
  etc. Truncates query before the injection phrase and logs a warning.
  Not foolproof but better than nothing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-02-23 20:17:46 +00:00
commit 32cd2b3427
3 changed files with 59 additions and 10 deletions

View file

@ -64,6 +64,7 @@ class SafetyConfig:
emergency_keywords: list[str] = field(
default_factory=lambda: ["emergency", "help", "sos"]
) # Always respond to these
prompt_injection_guard: bool = True # Basic prompt injection detection
@dataclass

View file

@ -33,6 +33,17 @@ class RouteResult:
query: Optional[str] = None # For LLM, the cleaned query
# Patterns that suggest prompt injection attempts
_INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"ignore\s+your\s+instructions", re.IGNORECASE),
re.compile(r"disregard\s+(all\s+)?previous", re.IGNORECASE),
re.compile(r"you\s+are\s+now\b", re.IGNORECASE),
re.compile(r"new\s+instructions?\s*:", re.IGNORECASE),
re.compile(r"system\s*prompt\s*:", re.IGNORECASE),
]
class MessageRouter:
"""Routes incoming messages to appropriate handlers."""
@ -186,12 +197,28 @@ class MessageRouter:
logger.debug(f"Persisted summary for {user_id}")
def _clean_query(self, text: str) -> str:
"""Remove @mention from query text."""
"""Remove @mention and check for prompt injection."""
# Remove @botname mention
cleaned = self._mention_pattern.sub("", text)
# Clean up extra whitespace
cleaned = " ".join(cleaned.split())
return cleaned.strip()
cleaned = cleaned.strip()
# Check for prompt injection if guard is enabled
if self.config.safety.prompt_injection_guard:
for pattern in _INJECTION_PATTERNS:
if pattern.search(cleaned):
logger.warning(
f"Possible prompt injection detected: {cleaned[:80]}..."
)
# Truncate to just the part before the injection pattern
match = pattern.search(cleaned)
cleaned = cleaned[:match.start()].strip()
if not cleaned:
cleaned = "Hello"
break
return cleaned
def _make_command_context(self, message: MeshMessage) -> CommandContext:
"""Create command context from message."""

View file

@ -1,6 +1,7 @@
"""Simple web status page for MeshAI."""
import asyncio
import html as html_module
import json
import logging
import threading
@ -159,6 +160,31 @@ class StatusRequestHandler(BaseHTTPRequestHandler):
include_activity=self.config.show_recent_activity if self.config else False
)
esc = html_module.escape
# Build optional stat rows
rows = ""
if self.config and self.config.show_uptime:
rows += (
'<div class="stat"><span class="stat-label">Uptime</span>'
f'<span class="stat-value">{esc(str(status["uptime"]))}</span></div>'
)
if self.config and self.config.show_message_count:
rows += (
'<div class="stat"><span class="stat-label">Messages</span>'
f'<span class="stat-value">{esc(str(status["messages_received"]))}</span></div>'
'<div class="stat"><span class="stat-label">Responses</span>'
f'<span class="stat-value">{esc(str(status["responses_sent"]))}</span></div>'
)
if self.config and self.config.show_connected_nodes:
rows += (
'<div class="stat"><span class="stat-label">Connected Nodes</span>'
f'<span class="stat-value">{esc(str(status["connected_nodes"]))}</span></div>'
)
status_class = "status-fallback" if status.get("using_fallback") else "status-online"
status_text = "ONLINE (Fallback)" if status.get("using_fallback") else "ONLINE"
html = f"""<!DOCTYPE html>
<html>
<head>
@ -192,17 +218,12 @@ class StatusRequestHandler(BaseHTTPRequestHandler):
<h1>MeshAI Status</h1>
<div class="stat">
<span class="stat-label">Status</span>
<span class="stat-value {'status-fallback' if status.get('using_fallback') else 'status-online'}">
{'ONLINE (Fallback)' if status.get('using_fallback') else 'ONLINE'}
</span>
<span class="stat-value {esc(status_class)}">{esc(status_text)}</span>
</div>
{'<div class="stat"><span class="stat-label">Uptime</span><span class="stat-value">' + status["uptime"] + '</span></div>' if self.config and self.config.show_uptime else ''}
{'<div class="stat"><span class="stat-label">Messages</span><span class="stat-value">' + str(status["messages_received"]) + '</span></div>' if self.config and self.config.show_message_count else ''}
{'<div class="stat"><span class="stat-label">Responses</span><span class="stat-value">' + str(status["responses_sent"]) + '</span></div>' if self.config and self.config.show_message_count else ''}
{'<div class="stat"><span class="stat-label">Connected Nodes</span><span class="stat-value">' + str(status["connected_nodes"]) + '</span></div>' if self.config and self.config.show_connected_nodes else ''}
{rows}
<div class="stat">
<span class="stat-label">Errors</span>
<span class="stat-value">{status["errors"]}</span>
<span class="stat-value">{esc(str(status["errors"]))}</span>
</div>
<div class="footer">Auto-refresh in 30s</div>
</div>