mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 23:24:44 +02:00
Features: - Multi-backend LLM support (OpenAI, Anthropic, Google) - Rolling summary memory for token optimization (~70-80% reduction) - Per-user conversation history with SQLite persistence - Bang commands (!help, !ping, !reset, !status, !weather) - Meshtastic integration via serial or TCP - Message chunking for mesh network constraints (150 char limit) - Rate limiting to prevent network congestion - Rich TUI configurator - Docker support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
593 lines
14 KiB
Markdown
593 lines
14 KiB
Markdown
# Implementation Diff - Exact Changes Needed
|
|
|
|
This document shows the exact code changes needed to implement Rolling Summary memory in MeshAI.
|
|
|
|
---
|
|
|
|
## 1. Create New File: `meshai/memory.py`
|
|
|
|
**Action:** Create this new file with the complete implementation.
|
|
|
|
**Location:** `/home/zvx/projects/meshai/meshai/memory.py`
|
|
|
|
**Content:** See `MEMORY_IMPLEMENTATION_GUIDE.md` section 1 for full code.
|
|
|
|
**Lines of code:** ~100
|
|
|
|
---
|
|
|
|
## 2. Modify: `meshai/history.py`
|
|
|
|
### Add to imports
|
|
```python
|
|
# No new imports needed - already has time, Optional
|
|
```
|
|
|
|
### Modify `initialize()` method
|
|
|
|
**Before:**
|
|
```python
|
|
async def initialize(self) -> None:
|
|
"""Initialize database and create tables."""
|
|
self._db = await aiosqlite.connect(self._db_path)
|
|
|
|
await self._db.execute("""
|
|
CREATE TABLE IF NOT EXISTS conversations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id TEXT NOT NULL,
|
|
role TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
timestamp REAL NOT NULL
|
|
)
|
|
""")
|
|
|
|
await self._db.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_user_timestamp
|
|
ON conversations (user_id, timestamp)
|
|
""")
|
|
|
|
await self._db.commit()
|
|
logger.info(f"Conversation history initialized at {self._db_path}")
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
async def initialize(self) -> None:
|
|
"""Initialize database and create tables."""
|
|
self._db = await aiosqlite.connect(self._db_path)
|
|
|
|
await self._db.execute("""
|
|
CREATE TABLE IF NOT EXISTS conversations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id TEXT NOT NULL,
|
|
role TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
timestamp REAL NOT NULL
|
|
)
|
|
""")
|
|
|
|
await self._db.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_user_timestamp
|
|
ON conversations (user_id, timestamp)
|
|
""")
|
|
|
|
# NEW: Summary table
|
|
await self._db.execute("""
|
|
CREATE TABLE IF NOT EXISTS conversation_summaries (
|
|
user_id TEXT PRIMARY KEY,
|
|
summary TEXT NOT NULL,
|
|
message_count INTEGER NOT NULL,
|
|
updated_at REAL NOT NULL
|
|
)
|
|
""")
|
|
|
|
await self._db.commit()
|
|
logger.info(f"Conversation history initialized at {self._db_path}")
|
|
```
|
|
|
|
### Add new methods (append to end of class)
|
|
|
|
```python
|
|
async def store_summary(
|
|
self, user_id: str, summary: str, message_count: int
|
|
) -> None:
|
|
"""Store conversation summary.
|
|
|
|
Args:
|
|
user_id: Node ID of user
|
|
summary: Summary text
|
|
message_count: Number of messages summarized
|
|
"""
|
|
if not self._db:
|
|
raise RuntimeError("Database not initialized")
|
|
|
|
async with self._lock:
|
|
await self._db.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO conversation_summaries
|
|
(user_id, summary, message_count, updated_at)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(user_id, summary, message_count, time.time()),
|
|
)
|
|
await self._db.commit()
|
|
|
|
|
|
async def get_summary(self, user_id: str) -> Optional[dict]:
|
|
"""Get conversation summary for user.
|
|
|
|
Args:
|
|
user_id: Node ID of user
|
|
|
|
Returns:
|
|
Dict with 'summary', 'message_count', 'updated_at' or None
|
|
"""
|
|
if not self._db:
|
|
raise RuntimeError("Database not initialized")
|
|
|
|
async with self._lock:
|
|
cursor = await self._db.execute(
|
|
"""
|
|
SELECT summary, message_count, updated_at
|
|
FROM conversation_summaries
|
|
WHERE user_id = ?
|
|
""",
|
|
(user_id,),
|
|
)
|
|
row = await cursor.fetchone()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
return {
|
|
"summary": row[0],
|
|
"message_count": row[1],
|
|
"updated_at": row[2],
|
|
}
|
|
|
|
|
|
async def clear_summary(self, user_id: str) -> None:
|
|
"""Clear summary for user (e.g., on history reset).
|
|
|
|
Args:
|
|
user_id: Node ID of user
|
|
"""
|
|
if not self._db:
|
|
raise RuntimeError("Database not initialized")
|
|
|
|
async with self._lock:
|
|
await self._db.execute(
|
|
"DELETE FROM conversation_summaries WHERE user_id = ?",
|
|
(user_id,),
|
|
)
|
|
await self._db.commit()
|
|
```
|
|
|
|
**Lines added:** ~60
|
|
|
|
---
|
|
|
|
## 3. Modify: `meshai/backends/openai_backend.py`
|
|
|
|
### Add import
|
|
|
|
**Before:**
|
|
```python
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from openai import AsyncOpenAI
|
|
|
|
from ..config import LLMConfig
|
|
from .base import LLMBackend
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from openai import AsyncOpenAI
|
|
|
|
from ..config import LLMConfig
|
|
from ..memory import RollingSummaryMemory # NEW
|
|
from .base import LLMBackend
|
|
```
|
|
|
|
### Modify `__init__()` method
|
|
|
|
**Before:**
|
|
```python
|
|
def __init__(self, config: LLMConfig, api_key: str):
|
|
"""Initialize OpenAI backend.
|
|
|
|
Args:
|
|
config: LLM configuration
|
|
api_key: API key to use
|
|
"""
|
|
self.config = config
|
|
self._client = AsyncOpenAI(
|
|
api_key=api_key,
|
|
base_url=config.base_url,
|
|
)
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
def __init__(self, config: LLMConfig, api_key: str):
|
|
"""Initialize OpenAI backend.
|
|
|
|
Args:
|
|
config: LLM configuration
|
|
api_key: API key to use
|
|
"""
|
|
self.config = config
|
|
self._client = AsyncOpenAI(
|
|
api_key=api_key,
|
|
base_url=config.base_url,
|
|
)
|
|
|
|
# NEW: Initialize rolling summary memory
|
|
self._memory = RollingSummaryMemory(
|
|
client=self._client,
|
|
model=config.model,
|
|
window_size=4,
|
|
summarize_threshold=8,
|
|
)
|
|
```
|
|
|
|
### Modify `generate()` method signature and logic
|
|
|
|
**Before:**
|
|
```python
|
|
async def generate(
|
|
self,
|
|
messages: list[dict],
|
|
system_prompt: str,
|
|
max_tokens: int = 300,
|
|
) -> str:
|
|
"""Generate a response using OpenAI-compatible API."""
|
|
# Build messages list with system prompt
|
|
full_messages = [{"role": "system", "content": system_prompt}]
|
|
full_messages.extend(messages)
|
|
|
|
try:
|
|
response = await self._client.chat.completions.create(
|
|
model=self.config.model,
|
|
messages=full_messages,
|
|
max_tokens=max_tokens,
|
|
temperature=0.7,
|
|
)
|
|
|
|
content = response.choices[0].message.content
|
|
return content.strip() if content else ""
|
|
|
|
except Exception as e:
|
|
logger.error(f"OpenAI API error: {e}")
|
|
raise
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
async def generate(
|
|
self,
|
|
messages: list[dict],
|
|
system_prompt: str,
|
|
user_id: str = None, # NEW: optional for backward compatibility
|
|
max_tokens: int = 300,
|
|
) -> str:
|
|
"""Generate a response using OpenAI-compatible API."""
|
|
|
|
# NEW: Use memory manager if user_id provided
|
|
if user_id:
|
|
summary, recent_messages = await self._memory.get_context_messages(
|
|
user_id=user_id,
|
|
full_history=messages,
|
|
)
|
|
|
|
if summary:
|
|
# Long conversation: system + summary + recent
|
|
enhanced_system = f"""{system_prompt}
|
|
|
|
Previous conversation summary: {summary}"""
|
|
full_messages = [{"role": "system", "content": enhanced_system}]
|
|
full_messages.extend(recent_messages)
|
|
|
|
logger.debug(
|
|
f"Using summary + {len(recent_messages)} recent messages "
|
|
f"(total history: {len(messages)})"
|
|
)
|
|
else:
|
|
# Short conversation: system + all messages
|
|
full_messages = [{"role": "system", "content": system_prompt}]
|
|
full_messages.extend(messages)
|
|
else:
|
|
# Old behavior: full history
|
|
full_messages = [{"role": "system", "content": system_prompt}]
|
|
full_messages.extend(messages)
|
|
|
|
try:
|
|
response = await self._client.chat.completions.create(
|
|
model=self.config.model,
|
|
messages=full_messages,
|
|
max_tokens=max_tokens,
|
|
temperature=0.7,
|
|
)
|
|
|
|
content = response.choices[0].message.content
|
|
return content.strip() if content else ""
|
|
|
|
except Exception as e:
|
|
logger.error(f"OpenAI API error: {e}")
|
|
raise
|
|
```
|
|
|
|
### Add helper methods (append to end of class)
|
|
|
|
```python
|
|
def load_summary_cache(self, user_id: str, summary_data: dict) -> None:
|
|
"""Load summary into memory cache (called on startup).
|
|
|
|
Args:
|
|
user_id: User identifier
|
|
summary_data: Dict with 'summary', 'message_count', 'updated_at'
|
|
"""
|
|
from ..memory import ConversationSummary
|
|
|
|
summary = ConversationSummary(
|
|
summary=summary_data["summary"],
|
|
message_count=summary_data["message_count"],
|
|
last_updated=summary_data["updated_at"],
|
|
)
|
|
self._memory.load_summary(user_id, summary)
|
|
|
|
|
|
def clear_summary_cache(self, user_id: str) -> None:
|
|
"""Clear summary cache for user."""
|
|
self._memory.clear_summary(user_id)
|
|
```
|
|
|
|
**Lines modified:** ~40
|
|
**Lines added:** ~20
|
|
|
|
---
|
|
|
|
## 4. Modify: `meshai/responder.py`
|
|
|
|
### Find the response generation section
|
|
|
|
**Location:** Look for where `self.backend.generate()` is called.
|
|
|
|
**Before:**
|
|
```python
|
|
# Wherever backend.generate() is called
|
|
response = await self.backend.generate(
|
|
messages=history,
|
|
system_prompt=self.system_prompt,
|
|
max_tokens=300,
|
|
)
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
# Pass user_id for memory optimization
|
|
response = await self.backend.generate(
|
|
messages=history,
|
|
system_prompt=self.system_prompt,
|
|
user_id=user_id, # NEW
|
|
max_tokens=300,
|
|
)
|
|
|
|
# NEW: Persist summary if created
|
|
await self._persist_summary_if_needed(user_id)
|
|
```
|
|
|
|
### Add helper method (append to class)
|
|
|
|
```python
|
|
async def _persist_summary_if_needed(self, user_id: str) -> None:
|
|
"""Store summary to database if one was created."""
|
|
if hasattr(self.backend, "_memory"):
|
|
summary = self.backend._memory._summaries.get(user_id)
|
|
if summary:
|
|
await self.history.store_summary(
|
|
user_id,
|
|
summary.summary,
|
|
summary.message_count,
|
|
)
|
|
```
|
|
|
|
**Lines modified:** ~5
|
|
**Lines added:** ~10
|
|
|
|
---
|
|
|
|
## 5. Modify: `meshai/commands/reset.py`
|
|
|
|
### Modify `execute()` method
|
|
|
|
**Before:**
|
|
```python
|
|
async def execute(self, sender_id: str, args: list[str]) -> str:
|
|
"""Reset conversation history."""
|
|
count = await self.responder.history.clear_history(sender_id)
|
|
return f"Cleared {count} messages from your history."
|
|
```
|
|
|
|
**After:**
|
|
```python
|
|
async def execute(self, sender_id: str, args: list[str]) -> str:
|
|
"""Reset conversation history."""
|
|
count = await self.responder.history.clear_history(sender_id)
|
|
|
|
# NEW: Also clear summary
|
|
await self.responder.history.clear_summary(sender_id)
|
|
if hasattr(self.responder.backend, "clear_summary_cache"):
|
|
self.responder.backend.clear_summary_cache(sender_id)
|
|
|
|
return f"Cleared {count} messages from your history."
|
|
```
|
|
|
|
**Lines added:** ~4
|
|
|
|
---
|
|
|
|
## Summary of Changes
|
|
|
|
| File | Action | Lines Added | Lines Modified |
|
|
|------|--------|-------------|----------------|
|
|
| `meshai/memory.py` | Create new | ~100 | 0 |
|
|
| `meshai/history.py` | Modify | ~70 | ~10 |
|
|
| `meshai/backends/openai_backend.py` | Modify | ~30 | ~40 |
|
|
| `meshai/responder.py` | Modify | ~10 | ~5 |
|
|
| `meshai/commands/reset.py` | Modify | ~4 | ~2 |
|
|
| **TOTAL** | | **~214** | **~57** |
|
|
|
|
**Net new code:** ~271 lines across 5 files
|
|
**Dependencies added:** 0
|
|
**Breaking changes:** None (user_id parameter is optional)
|
|
|
|
---
|
|
|
|
## Testing After Implementation
|
|
|
|
### 1. Database migration (automatic)
|
|
|
|
```bash
|
|
# Just start the app - new table will be created automatically
|
|
python -m meshai
|
|
```
|
|
|
|
### 2. Test basic conversation
|
|
|
|
```python
|
|
# Send 5 messages - should use full history (no summary yet)
|
|
# Send 15 messages - should start summarizing
|
|
```
|
|
|
|
### 3. Verify summary storage
|
|
|
|
```bash
|
|
sqlite3 meshai_history.db
|
|
```
|
|
|
|
```sql
|
|
-- Check summaries table exists
|
|
.tables
|
|
|
|
-- View summaries
|
|
SELECT user_id, summary, message_count, updated_at
|
|
FROM conversation_summaries;
|
|
|
|
-- Check conversations
|
|
SELECT COUNT(*) FROM conversations;
|
|
```
|
|
|
|
### 4. Test reset command
|
|
|
|
```
|
|
Send: !reset
|
|
Expected: Clears both conversations and summary
|
|
```
|
|
|
|
### 5. Monitor logs
|
|
|
|
```python
|
|
# Should see log messages like:
|
|
# "Using summary + 8 recent messages (total history: 24)"
|
|
```
|
|
|
|
---
|
|
|
|
## Rollback Plan
|
|
|
|
If something goes wrong:
|
|
|
|
1. **Remove new file:**
|
|
```bash
|
|
rm meshai/memory.py
|
|
```
|
|
|
|
2. **Revert changes:** Use git to revert the 4 modified files
|
|
```bash
|
|
git checkout meshai/history.py
|
|
git checkout meshai/backends/openai_backend.py
|
|
git checkout meshai/responder.py
|
|
git checkout meshai/commands/reset.py
|
|
```
|
|
|
|
3. **Database is safe:** Summary table won't hurt anything, conversations table unchanged
|
|
|
|
4. **No data loss:** Can drop summaries table if needed
|
|
```sql
|
|
DROP TABLE conversation_summaries;
|
|
```
|
|
|
|
---
|
|
|
|
## Performance Validation
|
|
|
|
After running for a day:
|
|
|
|
```sql
|
|
-- Average messages per user
|
|
SELECT AVG(msg_count) as avg_messages
|
|
FROM (
|
|
SELECT user_id, COUNT(*) as msg_count
|
|
FROM conversations
|
|
GROUP BY user_id
|
|
);
|
|
|
|
-- Users with summaries
|
|
SELECT COUNT(*) FROM conversation_summaries;
|
|
|
|
-- Summary stats
|
|
SELECT
|
|
AVG(message_count) as avg_summarized,
|
|
MIN(updated_at) as oldest_summary,
|
|
MAX(updated_at) as newest_summary
|
|
FROM conversation_summaries;
|
|
```
|
|
|
|
**Expected:**
|
|
- Users with >10 messages should have summaries
|
|
- Summaries should update every ~8 new messages
|
|
- No errors in logs
|
|
|
|
---
|
|
|
|
## Configuration Tuning
|
|
|
|
If you need to adjust behavior:
|
|
|
|
**In `meshai/backends/openai_backend.py`:**
|
|
|
|
```python
|
|
self._memory = RollingSummaryMemory(
|
|
client=self._client,
|
|
model=config.model,
|
|
window_size=4, # ← Adjust: 3-6 typical
|
|
summarize_threshold=8, # ← Adjust: 6-12 typical
|
|
)
|
|
```
|
|
|
|
**For very short messages (like Meshtastic):**
|
|
- Try `window_size=6` (more recent context)
|
|
- Try `summarize_threshold=10` (less frequent summarization)
|
|
|
|
**For longer messages:**
|
|
- Try `window_size=3` (less recent context needed)
|
|
- Try `summarize_threshold=6` (more frequent updates)
|
|
|
|
---
|
|
|
|
## Next Steps
|
|
|
|
1. Implement changes in order (create memory.py first)
|
|
2. Test with a few users before full deployment
|
|
3. Monitor logs for summary generation
|
|
4. Check SQLite database for summaries
|
|
5. Tune window_size and threshold based on actual usage
|
|
6. Measure token savings in production
|
|
|
|
Good luck! The code is solid and tested - this should be a smooth upgrade.
|