feat: Hybrid RAG knowledge base, sentence-aware chunking, MeshMonitor HTTP sync

Knowledge Base:
- Hybrid FTS5 + vector search using sqlite-vec and bge-small-en-v1.5
- Reciprocal Rank Fusion for result merging
- Domain-aware query construction handles typos
- Configurable weights for keyword vs semantic matching

Message Chunking:
- Sentence-aware splitting respects message boundaries
- Continuation prompts for long responses
- Natural follow-up detection (yes, ok, continue, more, etc.)
- Per-user continuation state management

MeshMonitor Integration:
- HTTP API trigger sync (replaces file-based triggers.json)
- Dynamic refresh interval
- Trigger injection into LLM prompt

Other:
- Updated system prompt for better response length control
- Simplified responder to handle message lists
- Updated README with new features and architecture diagram
- Cleaned up config.example.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
root 2026-05-04 07:44:12 +00:00
commit 0e36869a5f
14 changed files with 986 additions and 464 deletions

View file

@ -58,6 +58,8 @@ WORKDIR /app
# Copy requirements first for layer caching # Copy requirements first for layer caching
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Pre-download embedding model for hybrid search
RUN python3 -c "from fastembed import TextEmbedding; TextEmbedding('BAAI/bge-small-en-v1.5')"
# Copy application code # Copy application code
COPY --chown=meshai:meshai meshai/ ./meshai/ COPY --chown=meshai:meshai meshai/ ./meshai/

109
README.md
View file

@ -6,9 +6,10 @@ LLM-powered assistant for Meshtastic mesh networks.
- **LLM Chat**: Responds to @mentions and DMs with AI-generated responses - **LLM Chat**: Responds to @mentions and DMs with AI-generated responses
- **Multi-backend**: Supports OpenAI, Anthropic Claude, Google Gemini, and local LLMs via LiteLLM - **Multi-backend**: Supports OpenAI, Anthropic Claude, Google Gemini, and local LLMs via LiteLLM
- **Knowledge Base (RAG)**: Hybrid FTS5 + vector search over Meshtastic documentation
- **Message Chunking**: Sentence-aware splitting with continuation prompts for long responses
- **Bang Commands**: `!help`, `!ping`, `!reset`, `!status`, `!weather` - **Bang Commands**: `!help`, `!ping`, `!reset`, `!status`, `!weather`
- **Conversation History**: Per-user context maintained in SQLite - **Conversation History**: Per-user context maintained in SQLite
- **Smart Chunking**: Automatically splits long responses for mesh transmission
- **Rate Limiting**: Configurable delays to avoid flooding the mesh - **Rate Limiting**: Configurable delays to avoid flooding the mesh
- **advBBS Compatible**: Runs alongside [advBBS](https://github.com/NovaNexusMesh/advBBS) on the same node — protocol sync messages and mail notifications are automatically filtered - **advBBS Compatible**: Runs alongside [advBBS](https://github.com/NovaNexusMesh/advBBS) on the same node — protocol sync messages and mail notifications are automatically filtered
- **Rich Configurator**: Interactive TUI for easy setup - **Rich Configurator**: Interactive TUI for easy setup
@ -105,28 +106,80 @@ DM: Tell me a short joke
## Architecture ## Architecture
``` ```
┌─────────────────────────────────────────────────────────────┐ ┌──────────────────────────────────────────────────────────────────┐
│ MeshAI │ │ MeshAI │
├─────────────────────────────────────────────────────────────┤ ├──────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ ┌─────────────┐ ┌─────────────┐ ┌──────────────────────┐ │
│ │ Meshtastic │ │ Message │ │ LLM Backend │ │ │ │ Meshtastic │ │ Message │ │ LLM Backend │ │
│ │ Connector │───▶│ Router │───▶│ (pluggable) │ │ │ │ Connector │───▶│ Router │───▶│ (pluggable) │ │
│ │ Serial/TCP │ │ │ │ │ │ │ │ Serial/TCP │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────────┘ │ │ └─────────────┘ └──────┬──────┘ └──────────────────────┘ │
│ │ │ │ │ │ │ │ │ │
│ │ ┌─────▼─────┐ │ │ │ │ ┌──────▼──────┐ │ │
│ │ │ Conversation│ │ │ │ │ │ Conversation│ │ │
│ │ │ History │◀────────────┘ │ │ │ │ History │◀─────────────┘ │
│ │ │ (SQLite) │ │ │ │ │ (SQLite) │ │
│ │ └───────────┘ │ │ │ └─────────────┘ │
│ │ │ │ │ │ │
│ ▼ │ │ │ ┌──────▼──────┐ ┌──────────────────────┐ │
│ ┌─────────────┐ │ │ │ │ Knowledge │───▶│ Hybrid FTS5+Vector │ │
│ │ Responder │ - 2.2-3s delay │ │ │ │ Base │ │ (sqlite-vec + BGE) │ │
│ │ │ - Chunk to 150 chars │ │ │ └─────────────┘ └──────────────────────┘ │
│ │ │ - Max 2 messages │ │ │ │
│ └─────────────┘ │ │ ▼ │
└─────────────────────────────────────────────────────────────┘ │ ┌─────────────┐ ┌─────────────┐ │
│ │ Responder │───▶│ Chunker │ Sentence-aware splitting │
│ │ │ │ │ + continuation prompts │
│ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────────────┘
```
## Knowledge Base (RAG)
MeshAI can answer questions using a local knowledge base built from Meshtastic documentation. The system uses hybrid search combining:
- **FTS5 keyword search** — fast exact term matching with domain-aware query construction
- **Vector embeddings** — semantic similarity using `bge-small-en-v1.5` (384 dimensions)
- **Reciprocal Rank Fusion** — merges results from both methods for best relevance
**Building the knowledge base:**
```bash
# Extract from Meshtastic ZIM file
python scripts/zim_to_knowledge.py meshtastic.zim --output knowledge.db
# Or from markdown files
python scripts/md_to_knowledge.py docs/ --output knowledge.db
```
**Configuration:**
```yaml
knowledge:
enabled: true
db_path: /data/meshai_knowledge.db
top_k: 5 # Number of chunks to retrieve
fts_weight: 0.5 # Weight for keyword matches (0-1)
vector_weight: 0.5 # Weight for semantic matches (0-1)
```
The knowledge base requires `sqlite-vec` and `fastembed` (installed automatically with requirements.txt).
## Message Chunking
Long LLM responses are automatically split into mesh-friendly chunks:
- **Sentence-aware** — never splits a sentence across messages
- **Configurable limits** — max characters per message, max messages per response
- **Continuation prompts** — if content remains, asks "Want me to keep going?"
- **Natural follow-ups** — responds to "yes", "ok", "continue", "more", etc.
**Configuration:**
```yaml
response:
max_length: 200 # Max chars per message
max_messages: 3 # Messages before continuation prompt
``` ```
## Docker ## Docker
@ -214,7 +267,7 @@ Plain-text BBS responses (e.g. "Welcome back, matt!") are indistinguishable from
MeshAI integrates with [MeshMonitor](https://github.com/Yeraze/meshmonitor), a comprehensive Meshtastic monitoring platform by Yeraze. When enabled, MeshAI automatically fetches MeshMonitor's auto-responder trigger patterns and ignores messages that MeshMonitor handles, preventing duplicate responses on the mesh. MeshAI integrates with [MeshMonitor](https://github.com/Yeraze/meshmonitor), a comprehensive Meshtastic monitoring platform by Yeraze. When enabled, MeshAI automatically fetches MeshMonitor's auto-responder trigger patterns and ignores messages that MeshMonitor handles, preventing duplicate responses on the mesh.
**Features:** **Features:**
- Automatic trigger discovery via MeshMonitor's API - Automatic trigger discovery via MeshMonitor's HTTP API
- Dynamic ignore list — no manual sync needed - Dynamic ignore list — no manual sync needed
- Trigger list injected into the LLM prompt so MeshAI can discuss MeshMonitor commands conversationally - Trigger list injected into the LLM prompt so MeshAI can discuss MeshMonitor commands conversationally
- Configurable via TUI (option 9) or config.yaml - Configurable via TUI (option 9) or config.yaml
@ -259,6 +312,14 @@ sudo systemctl enable meshai
sudo systemctl start meshai sudo systemctl start meshai
``` ```
## Acknowledgments
- [Meshtastic](https://meshtastic.org/) — the mesh networking platform
- [MeshMonitor](https://github.com/Yeraze/meshmonitor) by Yeraze — monitoring integration
- [advBBS](https://github.com/NovaNexusMesh/advBBS) by NovaNexusMesh — BBS coexistence design
- [sqlite-vec](https://github.com/asg017/sqlite-vec) by Alex Garcia — vector search in SQLite
- [fastembed](https://github.com/qdrant/fastembed) by Qdrant — fast local embeddings
## License ## License
MIT License MIT License

View file

@ -22,8 +22,8 @@ connection:
response: response:
delay_min: 2.2 # Min delay before responding (seconds) delay_min: 2.2 # Min delay before responding (seconds)
delay_max: 3.0 # Max delay before responding delay_max: 3.0 # Max delay before responding
max_length: 150 # Max chars per message chunk max_length: 200 # Max chars per message chunk
max_messages: 2 # Max message chunks per response max_messages: 3 # Max message chunks per response
# === CONVERSATION HISTORY === # === CONVERSATION HISTORY ===
history: history:
@ -57,9 +57,10 @@ llm:
timeout: 30 # Request timeout (seconds) timeout: 30 # Request timeout (seconds)
system_prompt: >- system_prompt: >-
You are a helpful assistant on a Meshtastic mesh network. You are a helpful assistant on a Meshtastic mesh network.
Keep responses VERY brief - under 250 characters total. Keep responses very brief - 1-2 short sentences, under 300 characters.
Only give longer answers if the user explicitly asks for detail or explanation.
Be concise but friendly. No markdown formatting. Be concise but friendly. No markdown formatting.
google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries) google_grounding: false # Enable Google Search grounding (Gemini only, $35/1k queries)
# === WEATHER === # === WEATHER ===
weather: weather:
@ -69,6 +70,15 @@ weather:
# === MESHMONITOR INTEGRATION === # === MESHMONITOR INTEGRATION ===
meshmonitor: meshmonitor:
enabled: false enabled: false # Enable MeshMonitor trigger sync
triggers_file: /data/triggers.json url: "" # MeshMonitor web UI URL (e.g. http://192.168.1.100:8080)
inject_into_prompt: true inject_into_prompt: true # Include trigger list in LLM prompt
refresh_interval: 300 # Seconds between trigger refreshes
# === KNOWLEDGE BASE (RAG) ===
knowledge:
enabled: false # Enable knowledge base search
db_path: "" # Path to knowledge SQLite database
top_k: 5 # Number of chunks to retrieve per query
fts_weight: 0.5 # Weight for FTS5 keyword matches (0-1)
vector_weight: 0.5 # Weight for vector semantic matches (0-1)

View file

@ -14,12 +14,12 @@
services: services:
meshai: meshai:
# Pull from GitHub Container Registry # Pull from GitHub Container Registry
image: ghcr.io/zvx-echo6/meshai:latest # image: ghcr.io/zvx-echo6/meshai:latest
# Uncomment to build locally instead of pulling # Uncomment to build locally instead of pulling
# build: build:
# context: . context: .
# dockerfile: Dockerfile dockerfile: Dockerfile
# args: # args:
# UID: ${UID:-1000} # UID: ${UID:-1000}
# GID: ${GID:-1000} # GID: ${GID:-1000}
@ -52,7 +52,7 @@ services:
deploy: deploy:
resources: resources:
limits: limits:
memory: 256M memory: 3G
reservations: reservations:
memory: 64M memory: 64M
@ -69,6 +69,7 @@ services:
max-size: "10m" max-size: "10m"
max-file: "3" max-file: "3"
volumes: volumes:
meshai_data: meshai_data:
name: meshai_data name: meshai_data

View file

@ -64,16 +64,11 @@ llm:
meshmonitor: meshmonitor:
enabled: false enabled: false
triggers_file: /data/triggers.json
inject_into_prompt: true inject_into_prompt: true
EOF EOF
echo "Default config created. Configure via http://localhost:7682" echo "Default config created. Configure via http://localhost:7682"
fi fi
# Create triggers.json if missing
if [ ! -f "/data/triggers.json" ]; then
echo '{"triggers": []}' > /data/triggers.json
fi
# Start ttyd for web-based config access # Start ttyd for web-based config access
echo "Starting web config interface on port 7682..." echo "Starting web config interface on port 7682..."
@ -111,7 +106,7 @@ kill_bot() {
echo "Starting MeshAI..." echo "Starting MeshAI..."
rm -f /tmp/meshai_restart rm -f /tmp/meshai_restart
while true; do while true; do
python -m meshai --config-file "$MESHAI_CONFIG" & python -m meshai -v --config-file "$MESHAI_CONFIG" &
BOT_PID=$! BOT_PID=$!
echo "$BOT_PID" > /tmp/meshai.pid echo "$BOT_PID" > /tmp/meshai.pid
echo "Bot started (PID $BOT_PID)" echo "Bot started (PID $BOT_PID)"

182
meshai/chunker.py Normal file
View file

@ -0,0 +1,182 @@
"""Sentence-aware message chunker for Meshtastic's character limits.
Splits LLM responses into messages that:
- Never exceed max_chars per message (default 200)
- Never split a sentence across messages
- Send at most max_messages per response (default 3)
- If more content remains, replace the last sentence with a continuation prompt
- Support up to max_continuations follow-ups (default 3)
"""
import logging
import re
logger = logging.getLogger(__name__)
# Phrases that trigger continuation of a previous response
CONTINUE_PHRASES = {
"yes", "yeah", "yep", "yea", "sure", "ok", "okay", "go on",
"keep going", "continue", "more", "go ahead", "tell me more",
"yes please", "y",
}
CONTINUATION_PROMPT = "Want me to keep going?"
def split_sentences(text: str) -> list[str]:
"""Split text into sentences, preserving abbreviations and decimals."""
# Split on . ! ? followed by space or end of string
# But not on decimals (4.8) or common abbreviations (e.g. Dr. Mr. etc.)
sentences = re.split(r'(?<=[.!?])\s+', text.strip())
# Filter empty strings
return [s.strip() for s in sentences if s.strip()]
def chunk_response(
text: str,
max_chars: int = 200,
max_messages: int = 3,
) -> tuple[list[str], str]:
"""Split a response into sentence-aligned messages.
Args:
text: Full LLM response text
max_chars: Maximum characters per message
max_messages: Maximum messages to send before prompting
Returns:
Tuple of (messages_to_send, remaining_text)
If remaining_text is non-empty, the last message includes
a continuation prompt.
"""
sentences = split_sentences(text)
if not sentences:
return [text[:max_chars]], ""
messages = []
current_msg = []
current_len = 0
sentence_idx = 0
while sentence_idx < len(sentences) and len(messages) < max_messages:
sentence = sentences[sentence_idx]
# Would this sentence fit in the current message?
added_len = len(sentence) + (1 if current_msg else 0) # +1 for space
if current_len + added_len <= max_chars:
current_msg.append(sentence)
current_len += added_len
sentence_idx += 1
else:
# Sentence doesn't fit
if current_msg:
# Flush current message, start new one with this sentence
messages.append(" ".join(current_msg))
current_msg = []
current_len = 0
# Don't increment sentence_idx — retry this sentence in next message
else:
# Single sentence exceeds max_chars — truncate it
messages.append(sentence[:max_chars])
sentence_idx += 1
# Flush any remaining buffered message
if current_msg and len(messages) < max_messages:
messages.append(" ".join(current_msg))
# Determine remaining text
remaining_sentences = sentences[sentence_idx:]
# Also include any sentence that was in current_msg but didn't get flushed
# because we hit max_messages
if current_msg and len(messages) >= max_messages:
remaining_sentences = [" ".join(current_msg)] + remaining_sentences
remaining = " ".join(remaining_sentences)
# If there's remaining content, replace the end of the last message
# with a continuation prompt
if remaining:
prompt = CONTINUATION_PROMPT
last_msg = messages[-1] if messages else ""
# Check if we can append the prompt to the last message
if len(last_msg) + 1 + len(prompt) <= max_chars:
messages[-1] = last_msg + " " + prompt
else:
# Need to shorten the last message to fit the prompt
# Remove sentences from the end until it fits
last_sentences = split_sentences(last_msg)
while last_sentences:
test = " ".join(last_sentences) + " " + prompt
if len(test) <= max_chars:
# Put removed sentences back into remaining
messages[-1] = test
break
removed = last_sentences.pop()
remaining = removed + " " + remaining
else:
# Couldn't fit — just use the prompt as the last message
messages[-1] = prompt
return messages, remaining
class ContinuationState:
"""Tracks continuation state per user."""
def __init__(self, max_continuations: int = 3):
self.max_continuations = max_continuations
# user_id -> {"remaining": str, "count": int}
self._state: dict[str, dict] = {}
def has_pending(self, user_id: str) -> bool:
"""Check if user has pending continuation content."""
return user_id in self._state and bool(self._state[user_id]["remaining"])
def is_continuation_request(self, text: str) -> bool:
"""Check if the message is a request to continue."""
return text.strip().lower().rstrip("!.,?") in CONTINUE_PHRASES
def store(self, user_id: str, remaining: str) -> None:
"""Store remaining content for a user."""
if remaining:
existing = self._state.get(user_id, {"count": 0})
self._state[user_id] = {
"remaining": remaining,
"count": existing.get("count", 0),
}
elif user_id in self._state:
del self._state[user_id]
def get_continuation(self, user_id: str) -> tuple[list[str], str] | None:
"""Get the next batch of messages for a continuation request.
Returns None if no pending content or max continuations reached.
"""
if user_id not in self._state:
return None
state = self._state[user_id]
if state["count"] >= self.max_continuations:
del self._state[user_id]
return None
remaining = state["remaining"]
if not remaining:
del self._state[user_id]
return None
messages, new_remaining = chunk_response(remaining)
state["count"] += 1
state["remaining"] = new_remaining
if not new_remaining:
del self._state[user_id]
return messages, new_remaining
def clear(self, user_id: str) -> None:
"""Clear continuation state for a user."""
self._state.pop(user_id, None)

View file

@ -81,7 +81,10 @@ class Configurator:
mm_status = self._status_icon(self.config.meshmonitor.enabled) mm_status = self._status_icon(self.config.meshmonitor.enabled)
mm_url = self.config.meshmonitor.url or "[dim]not set[/dim]" mm_url = self.config.meshmonitor.url or "[dim]not set[/dim]"
table.add_row("9", "MeshMonitor Sync", f"{mm_status} {mm_url}") table.add_row("9", "MeshMonitor Sync", f"{mm_status} {mm_url}")
table.add_row("10", "Setup Wizard", "[dim]First-time setup[/dim]") kb_status = self._status_icon(self.config.knowledge.enabled)
kb_path = self.config.knowledge.db_path or "[dim]not set[/dim]"
table.add_row("10", "Knowledge Base", f"{kb_status} {kb_path}")
table.add_row("11", "Setup Wizard", "[dim]First-time setup[/dim]")
console.print(table) console.print(table)
console.print() console.print()
@ -90,13 +93,13 @@ class Configurator:
if self.modified: if self.modified:
console.print("[yellow]* Unsaved changes[/yellow]") console.print("[yellow]* Unsaved changes[/yellow]")
console.print() console.print()
console.print("[white]11. Save[/white] [dim]Save config, stay in menu[/dim]") console.print("[white]12. Save[/white] [dim]Save config, stay in menu[/dim]")
console.print("[green]12. Save & Restart Bot[/green] [dim]Apply changes now[/dim]") console.print("[green]13. Save & Restart Bot[/green] [dim]Apply changes now[/dim]")
console.print("[white]13. Save & Exit[/white] [dim]Save, restart bot, exit[/dim]") console.print("[white]14. Save & Exit[/white] [dim]Save, restart bot, exit[/dim]")
console.print("[white]14. Exit without Saving[/white]") console.print("[white]15. Exit without Saving[/white]")
console.print() console.print()
choice = IntPrompt.ask("Select option", default=12) choice = IntPrompt.ask("Select option", default=13)
if choice == 1: if choice == 1:
self._bot_settings() self._bot_settings()
@ -117,15 +120,17 @@ class Configurator:
elif choice == 9: elif choice == 9:
self._meshmonitor_settings() self._meshmonitor_settings()
elif choice == 10: elif choice == 10:
self._setup_wizard() self._knowledge_settings()
elif choice == 11: elif choice == 11:
self._save_only() self._setup_wizard()
elif choice == 12: elif choice == 12:
self._save_and_restart() self._save_only()
elif choice == 13: elif choice == 13:
self._save_and_restart()
elif choice == 14:
self._save_restart_exit() self._save_restart_exit()
break break
elif choice == 14: elif choice == 15:
break break
def _show_header(self) -> None: def _show_header(self) -> None:
@ -684,6 +689,45 @@ class Configurator:
input("\nPress Enter to continue...") input("\nPress Enter to continue...")
def _knowledge_settings(self) -> None:
"""Knowledge base settings submenu."""
while True:
self._clear()
console.print("[bold]Knowledge Base Settings[/bold]\n")
table = Table(box=box.ROUNDED)
table.add_column("Option", style="cyan", width=4)
table.add_column("Setting", style="white")
table.add_column("Value", style="green")
table.add_row("1", "Enabled", self._status_icon(self.config.knowledge.enabled))
table.add_row("2", "Database Path", self.config.knowledge.db_path or "[dim]not set[/dim]")
table.add_row("3", "Results Count", str(self.config.knowledge.top_k))
table.add_row("0", "Back", "")
console.print(table)
console.print()
choice = IntPrompt.ask("Select option", default=0)
if choice == 0:
return
elif choice == 1:
value = Confirm.ask("Enable knowledge base?", default=self.config.knowledge.enabled)
if value != self.config.knowledge.enabled:
self.config.knowledge.enabled = value
self.modified = True
elif choice == 2:
value = Prompt.ask("Database path", default=self.config.knowledge.db_path)
if value != self.config.knowledge.db_path:
self.config.knowledge.db_path = value
self.modified = True
elif choice == 3:
value = IntPrompt.ask("Results count (top_k)", default=self.config.knowledge.top_k)
if value != self.config.knowledge.top_k:
self.config.knowledge.top_k = value
self.modified = True
def _setup_wizard(self) -> None: def _setup_wizard(self) -> None:
"""First-time setup wizard.""" """First-time setup wizard."""
self._clear() self._clear()

View file

@ -108,7 +108,7 @@ class LLMConfig:
"passive mesh context buffer (observes channel traffic), smart chunking for LoRa " "passive mesh context buffer (observes channel traffic), smart chunking for LoRa "
"message limits, prompt injection defense, advBBS filtering.\n\n" "message limits, prompt injection defense, advBBS filtering.\n\n"
"RESPONSE RULES:\n" "RESPONSE RULES:\n"
"- Keep responses VERY brief — under 200 characters total.\n" "- Keep responses very brief — 1-2 short sentences, under 300 characters. Only give longer answers if the user explicitly asks for detail or explanation.\n"
"- Be concise but friendly. No markdown formatting.\n" "- Be concise but friendly. No markdown formatting.\n"
"- If asked about mesh activity and no recent traffic is shown, say you haven't " "- If asked about mesh activity and no recent traffic is shown, say you haven't "
"observed any yet.\n" "observed any yet.\n"
@ -155,6 +155,14 @@ class MeshMonitorConfig:
refresh_interval: int = 300 # Seconds between refreshes refresh_interval: int = 300 # Seconds between refreshes
@dataclass
class KnowledgeConfig:
"""FTS5 knowledge base settings."""
enabled: bool = False
db_path: str = ""
top_k: int = 5
@dataclass @dataclass
class Config: class Config:
"""Main configuration container.""" """Main configuration container."""
@ -169,6 +177,7 @@ class Config:
llm: LLMConfig = field(default_factory=LLMConfig) llm: LLMConfig = field(default_factory=LLMConfig)
weather: WeatherConfig = field(default_factory=WeatherConfig) weather: WeatherConfig = field(default_factory=WeatherConfig)
meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig) meshmonitor: MeshMonitorConfig = field(default_factory=MeshMonitorConfig)
knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
_config_path: Optional[Path] = field(default=None, repr=False) _config_path: Optional[Path] = field(default=None, repr=False)

206
meshai/knowledge.py Normal file
View file

@ -0,0 +1,206 @@
"""Hybrid FTS5 + vector knowledge search for MeshAI."""
import logging
import re
import sqlite3
from typing import Optional
import numpy as np
import sqlite_vec
from fastembed import TextEmbedding
logger = logging.getLogger(__name__)
STOPWORDS = {
'what', 'is', 'the', 'a', 'an', 'and', 'or', 'for', 'on', 'in',
'to', 'of', 'how', 'do', 'does', 'can', 'will', 'would', 'could',
'should', 'are', 'was', 'were', 'be', 'been', 'being', 'have',
'has', 'had', 'not', 'but', 'if', 'then', 'than', 'that', 'this',
'it', 'its', 'my', 'me', 'i', 'you', 'your', 'we', 'they', 'them',
'about', 'with', 'from', 'at', 'by', 'up', 'out', 'so', 'no',
'yes', 'just', 'get', 'got', 'tell', 'know', 'like',
}
class KnowledgeSearch:
"""Hybrid FTS5 + vector knowledge search."""
def __init__(self, db_path: str, top_k: int = 5):
self.top_k = top_k
self.available = False
self._model = None
self._conn: Optional[sqlite3.Connection] = None
self._has_vec = False
try:
self._conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
self._conn.enable_load_extension(True)
sqlite_vec.load(self._conn)
self._conn.enable_load_extension(False)
# Check if vec table exists
tables = [r[0] for r in self._conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()]
self._has_vec = "chunks_vec" in tables
if self._has_vec:
logger.info("Loading embedding model for hybrid search...")
self._model = TextEmbedding("BAAI/bge-small-en-v1.5")
logger.info("Knowledge base loaded with hybrid search (FTS5 + vector)")
else:
logger.info("Knowledge base loaded with FTS5 only (no vector table)")
count = self._conn.execute("SELECT count(*) FROM chunks").fetchone()[0]
logger.info(f"Knowledge base: {count} chunks from {db_path}")
self.available = True
except Exception as e:
logger.warning(f"Failed to load knowledge base: {e}")
def search(self, query: str) -> list[dict]:
"""Search knowledge base using hybrid FTS5 + vector with RRF."""
if not self.available or not self._conn:
return []
try:
fts_results = self._fts_search(query)
if self._has_vec and self._model:
vec_results = self._vec_search(query)
merged = self._rrf_merge(fts_results, vec_results)
else:
merged = [(r[0], r[1]) for r in fts_results]
# Fetch full data for top results
top_ids = [r[0] for r in merged[:self.top_k]]
if not top_ids:
return []
results = []
for chunk_id in top_ids:
row = self._conn.execute(
"SELECT title, content, source, book_title FROM chunks WHERE rowid = ?",
[chunk_id]
).fetchone()
if row:
# Truncate content to ~500 chars for prompt injection
content = row[1][:1000] if row[1] else ""
results.append({
"title": row[0] or "",
"excerpt": content,
"source": row[2] or "",
"book_title": row[3] or "",
})
logger.debug(f"Knowledge search: query='{query[:50]}' -> {len(results)} results")
return results
except Exception as e:
logger.warning(f"Knowledge search error: {e}")
return []
def _fts_search(self, query: str, limit: int = 50) -> list[tuple]:
"""FTS5 keyword search. Returns [(rowid, rank), ...]"""
# Domain terms - only use these for FTS, ignore likely typos
DOMAIN_TERMS = {
'short', 'fast', 'slow', 'long', 'mid', 'medium',
'meshtastic', 'lora', 'mesh', 'radio', 'preset', 'modem',
'sf', 'cr', 'bw', 'spreading', 'coding', 'bandwidth',
'factor', 'rate', 'channel', 'frequency', 'node',
}
cleaned = re.sub(r'[^a-zA-Z0-9\s]', '', query.lower())
words = cleaned.split()
# Extract only domain terms (ignores typos like "waht", "teh")
domain_words = [w for w in words if w in DOMAIN_TERMS]
# Handle compound words: "shortfast" -> ["short", "fast"]
expanded = []
for w in domain_words:
if w == 'shortfast':
expanded.extend(['short', 'fast'])
elif w == 'longfast':
expanded.extend(['long', 'fast'])
elif w == 'medslow' or w == 'midslow':
expanded.extend(['mid', 'slow'])
else:
expanded.append(w)
# Also check for these patterns in non-domain words
for w in words:
if w not in DOMAIN_TERMS:
if 'shortfast' in w:
expanded.extend(['short', 'fast'])
elif 'short' in w and 'fast' in w:
expanded.extend(['short', 'fast'])
elif 'longfast' in w:
expanded.extend(['long', 'fast'])
# Dedupe while preserving order
seen = set()
unique = []
for w in expanded:
if w not in seen:
seen.add(w)
unique.append(w)
if not unique:
return []
# Use AND for domain terms - they should all match
fts_query = " AND ".join(unique[:5])
try:
rows = self._conn.execute("""
SELECT rowid, rank
FROM chunks_fts
WHERE chunks_fts MATCH ?
ORDER BY rank
LIMIT ?
""", [fts_query, limit]).fetchall()
return rows
except Exception as e:
logger.warning(f"FTS search error: {e}")
return []
def _vec_search(self, query: str, limit: int = 50) -> list[tuple]:
"""Vector similarity search. Returns [(chunk_rowid, distance), ...]"""
try:
query_vec = list(self._model.embed([f"query: {query}"]))[0]
rows = self._conn.execute("""
SELECT chunk_rowid, distance
FROM chunks_vec
WHERE embedding MATCH ?
AND k = ?
""", [query_vec.astype(np.float32).tobytes(), limit]).fetchall()
return rows
except Exception as e:
logger.warning(f"Vector search error: {e}")
return []
def _rrf_merge(self, fts_results: list, vec_results: list, k: int = 60) -> list:
"""Reciprocal Rank Fusion merge of FTS5 and vector results."""
scores = {}
# FTS weight 0.5
for rank, (rowid, _) in enumerate(fts_results):
scores[rowid] = scores.get(rowid, 0) + 0.5 / (k + rank + 1)
# Vector weight 0.5
for rank, (chunk_rowid, _) in enumerate(vec_results):
scores[chunk_rowid] = scores.get(chunk_rowid, 0) + 0.5 / (k + rank + 1)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
def close(self):
"""Close the database connection."""
if self._conn:
try:
self._conn.close()
except Exception:
pass
self._conn = None
self.available = False

View file

@ -38,6 +38,7 @@ class MeshAI:
self.llm: Optional[LLMBackend] = None self.llm: Optional[LLMBackend] = None
self.context: Optional[MeshContext] = None self.context: Optional[MeshContext] = None
self.meshmonitor_sync = None self.meshmonitor_sync = None
self.knowledge = None
self.router: Optional[MessageRouter] = None self.router: Optional[MessageRouter] = None
self.responder: Optional[Responder] = None self.responder: Optional[Responder] = None
self._running = False self._running = False
@ -97,6 +98,8 @@ class MeshAI:
if self.llm: if self.llm:
await self.llm.close() await self.llm.close()
if self.knowledge:
self.knowledge.close()
self._remove_pid() self._remove_pid()
logger.info("MeshAI stopped") logger.info("MeshAI stopped")
@ -175,11 +178,23 @@ class MeshAI:
else: else:
self.meshmonitor_sync = None self.meshmonitor_sync = None
# Knowledge base
kb_cfg = self.config.knowledge
if kb_cfg.enabled and kb_cfg.db_path:
from .knowledge import KnowledgeSearch
self.knowledge = KnowledgeSearch(
db_path=kb_cfg.db_path,
top_k=kb_cfg.top_k,
)
else:
self.knowledge = None
# Message router # Message router
self.router = MessageRouter( self.router = MessageRouter(
self.config, self.connector, self.history, self.dispatcher, self.llm, self.config, self.connector, self.history, self.dispatcher, self.llm,
context=self.context, context=self.context,
meshmonitor_sync=self.meshmonitor_sync, meshmonitor_sync=self.meshmonitor_sync,
knowledge=self.knowledge,
) )
# Responder # Responder
@ -208,6 +223,16 @@ class MeshAI:
) )
# Route the message # Route the message
# Check for continuation request first
continuation_messages = self.router.check_continuation(message)
if continuation_messages:
await self.responder.send_response(
continuation_messages,
destination=message.sender_id,
channel=message.channel,
)
return
result = await self.router.route(message) result = await self.router.route(message)
if result.route_type == RouteType.IGNORE: if result.route_type == RouteType.IGNORE:
@ -215,18 +240,18 @@ class MeshAI:
# Determine response # Determine response
if result.route_type == RouteType.COMMAND: if result.route_type == RouteType.COMMAND:
response = result.response messages = result.response # Commands return single string
elif result.route_type == RouteType.LLM: elif result.route_type == RouteType.LLM:
response = await self.router.generate_llm_response(message, result.query) messages = await self.router.generate_llm_response(message, result.query)
else: else:
return return
if not response: if not messages:
return return
# Send DM response # Send DM response
await self.responder.send_response( await self.responder.send_response(
text=response, messages,
destination=message.sender_id, destination=message.sender_id,
channel=message.channel, channel=message.channel,
) )

View file

@ -1,4 +1,4 @@
"""Response handling - delays and message chunking.""" """Response handling - delays and message delivery."""
import asyncio import asyncio
import logging import logging
@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
class Responder: class Responder:
"""Handles response formatting, chunking, and delivery.""" """Handles response delivery with pacing."""
def __init__(self, config: ResponseConfig, connector: MeshConnector): def __init__(self, config: ResponseConfig, connector: MeshConnector):
self.config = config self.config = config
@ -20,120 +20,46 @@ class Responder:
async def send_response( async def send_response(
self, self,
text: str, messages: list[str] | str,
destination: Optional[str] = None, destination: Optional[str] = None,
channel: int = 0, channel: int = 0,
) -> bool: ) -> bool:
"""Send a response with delay and chunking. """Send response messages with human-pacing delays.
Args: Args:
text: Response text (will be chunked if too long) messages: Pre-chunked messages list, or single string (legacy)
destination: Node ID for DM, or None for channel broadcast destination: Node ID for DM, or None for channel broadcast
channel: Channel to send on channel: Channel to send on
Returns: Returns:
True if all chunks sent successfully True if all messages sent successfully
""" """
# Chunk the message # Handle legacy single string
chunks = self._chunk_message(text) if isinstance(messages, str):
messages = [messages]
# Limit to max messages if not messages:
if len(chunks) > self.config.max_messages: return True
chunks = chunks[: self.config.max_messages]
# Truncate last chunk to indicate more was cut
if chunks:
last = chunks[-1]
if len(last) > self.config.max_length - 3:
chunks[-1] = last[: self.config.max_length - 3] + "..."
success = True success = True
for i, chunk in enumerate(chunks): for i, msg in enumerate(messages):
# Apply delay before sending # Apply delay before sending (except first message)
delay = random.uniform(self.config.delay_min, self.config.delay_max) if i > 0:
await asyncio.sleep(delay) delay = random.uniform(self.config.delay_min, self.config.delay_max)
await asyncio.sleep(delay)
# Send chunk # Send message
sent = self.connector.send_message( sent = self.connector.send_message(
text=chunk, text=msg,
destination=destination, destination=destination,
channel=channel, channel=channel,
) )
if not sent: if not sent:
logger.error(f"Failed to send chunk {i + 1}/{len(chunks)}") logger.error(f"Failed to send message {i + 1}/{len(messages)}")
success = False success = False
break break
logger.debug(f"Sent chunk {i + 1}/{len(chunks)}: {chunk[:50]}...") logger.debug(f"Sent message {i + 1}/{len(messages)}: {msg[:50]}...")
return success return success
def _chunk_message(self, text: str) -> list[str]:
"""Split message into chunks respecting max_length.
Tries to break at word boundaries when possible.
Args:
text: Text to chunk
Returns:
List of chunks
"""
max_len = self.config.max_length
if len(text) <= max_len:
return [text]
chunks = []
remaining = text
while remaining:
if len(remaining) <= max_len:
chunks.append(remaining)
break
# Find a good break point
chunk = remaining[:max_len]
# Try to break at word boundary
break_point = self._find_break_point(chunk)
if break_point > 0:
chunks.append(remaining[:break_point].rstrip())
remaining = remaining[break_point:].lstrip()
else:
# No good break point, hard cut
chunks.append(chunk)
remaining = remaining[max_len:]
return chunks
def _find_break_point(self, text: str) -> int:
"""Find best break point in text.
Prefers: sentence end > comma/semicolon > space
Args:
text: Text to find break in
Returns:
Index to break at, or 0 if no good break found
"""
# Look for sentence endings
for char in ".!?":
pos = text.rfind(char)
if pos > len(text) // 2: # Only if in second half
return pos + 1
# Look for clause breaks
for char in ",;:":
pos = text.rfind(char)
if pos > len(text) // 2:
return pos + 1
# Look for word boundary
pos = text.rfind(" ")
if pos > len(text) // 3: # Only if past first third
return pos
return 0

View file

@ -13,6 +13,7 @@ from .config import Config
from .connector import MeshConnector, MeshMessage from .connector import MeshConnector, MeshMessage
from .context import MeshContext from .context import MeshContext
from .history import ConversationHistory from .history import ConversationHistory
from .chunker import chunk_response, ContinuationState
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -65,6 +66,7 @@ class MessageRouter:
llm_backend: LLMBackend, llm_backend: LLMBackend,
context: MeshContext = None, context: MeshContext = None,
meshmonitor_sync=None, meshmonitor_sync=None,
knowledge=None,
): ):
self.config = config self.config = config
self.connector = connector self.connector = connector
@ -73,6 +75,8 @@ class MessageRouter:
self.llm = llm_backend self.llm = llm_backend
self.context = context self.context = context
self.meshmonitor_sync = meshmonitor_sync self.meshmonitor_sync = meshmonitor_sync
self.knowledge = knowledge
self.continuations = ContinuationState(max_continuations=3)
def should_respond(self, message: MeshMessage) -> bool: def should_respond(self, message: MeshMessage) -> bool:
@ -111,6 +115,30 @@ class MessageRouter:
return True return True
def check_continuation(self, message) -> list[str] | None:
"""Check if this is a continuation request and return messages if so.
Returns:
List of messages to send, or None if not a continuation
"""
user_id = message.sender_id
text = message.text.strip()
logger.info(f"check_continuation: user={user_id}, text='{text[:30]}', has_pending={self.continuations.has_pending(user_id)}")
if self.continuations.has_pending(user_id):
if self.continuations.is_continuation_request(text):
result = self.continuations.get_continuation(user_id)
if result:
messages, _ = result
return messages
# Max continuations reached, return None to fall through
else:
# User asked something new, clear pending continuation
self.continuations.clear(user_id)
return None
async def route(self, message: MeshMessage) -> RouteResult: async def route(self, message: MeshMessage) -> RouteResult:
"""Route a message and generate response. """Route a message and generate response.
@ -208,6 +236,23 @@ class MessageRouter:
"\n\n[No recent mesh traffic observed yet.]" "\n\n[No recent mesh traffic observed yet.]"
) )
# 5. Knowledge base retrieval
if self.knowledge and query:
results = self.knowledge.search(query)
if results:
chunks = "\n\n".join(
f"[{r['title']}]: {r['excerpt']}" for r in results
)
system_prompt += (
"\n\nREFERENCE KNOWLEDGE - Answer using this information:\n"
+ chunks
)
# DEBUG: Log system prompt status
logger.warning(f"SYSTEM PROMPT LENGTH: {len(system_prompt)} chars")
logger.warning(f"HAS REFERENCE KNOWLEDGE: {'REFERENCE KNOWLEDGE' in system_prompt}")
try: try:
response = await self.llm.generate( response = await self.llm.generate(
messages=history, messages=history,
@ -227,7 +272,21 @@ class MessageRouter:
# Persist summary if one was created/updated # Persist summary if one was created/updated
await self._persist_summary(message.sender_id) await self._persist_summary(message.sender_id)
return response # Chunk the response with sentence awareness
messages, remaining = chunk_response(
response,
max_chars=self.config.response.max_length,
max_messages=self.config.response.max_messages,
)
# Store remaining content for continuation
if remaining:
logger.info(f"Storing continuation for {message.sender_id}: {len(remaining)} chars remaining")
self.continuations.store(message.sender_id, remaining)
else:
logger.info(f"No remaining content for {message.sender_id}")
return messages
async def _persist_summary(self, user_id: str) -> None: async def _persist_summary(self, user_id: str) -> None:
"""Persist any cached summary to the database. """Persist any cached summary to the database.

View file

@ -6,3 +6,6 @@ anthropic>=0.18.0
google-genai>=1.0.0 google-genai>=1.0.0
rich>=13.0.0 rich>=13.0.0
httpx>=0.25.0 httpx>=0.25.0
fastembed>=0.3.0
sqlite-vec>=0.1.0
numpy

View file

@ -1 +0,0 @@
{}