# Implementation Diff - Exact Changes Needed This document shows the exact code changes needed to implement Rolling Summary memory in MeshAI. --- ## 1. Create New File: `meshai/memory.py` **Action:** Create this new file with the complete implementation. **Location:** `/home/zvx/projects/meshai/meshai/memory.py` **Content:** See `MEMORY_IMPLEMENTATION_GUIDE.md` section 1 for full code. **Lines of code:** ~100 --- ## 2. Modify: `meshai/history.py` ### Add to imports ```python # No new imports needed - already has time, Optional ``` ### Modify `initialize()` method **Before:** ```python async def initialize(self) -> None: """Initialize database and create tables.""" self._db = await aiosqlite.connect(self._db_path) await self._db.execute(""" CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp REAL NOT NULL ) """) await self._db.execute(""" CREATE INDEX IF NOT EXISTS idx_user_timestamp ON conversations (user_id, timestamp) """) await self._db.commit() logger.info(f"Conversation history initialized at {self._db_path}") ``` **After:** ```python async def initialize(self) -> None: """Initialize database and create tables.""" self._db = await aiosqlite.connect(self._db_path) await self._db.execute(""" CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp REAL NOT NULL ) """) await self._db.execute(""" CREATE INDEX IF NOT EXISTS idx_user_timestamp ON conversations (user_id, timestamp) """) # NEW: Summary table await self._db.execute(""" CREATE TABLE IF NOT EXISTS conversation_summaries ( user_id TEXT PRIMARY KEY, summary TEXT NOT NULL, message_count INTEGER NOT NULL, updated_at REAL NOT NULL ) """) await self._db.commit() logger.info(f"Conversation history initialized at {self._db_path}") ``` ### Add new methods (append to end of class) ```python async def store_summary( self, user_id: str, summary: str, message_count: int ) -> None: """Store conversation summary. Args: user_id: Node ID of user summary: Summary text message_count: Number of messages summarized """ if not self._db: raise RuntimeError("Database not initialized") async with self._lock: await self._db.execute( """ INSERT OR REPLACE INTO conversation_summaries (user_id, summary, message_count, updated_at) VALUES (?, ?, ?, ?) """, (user_id, summary, message_count, time.time()), ) await self._db.commit() async def get_summary(self, user_id: str) -> Optional[dict]: """Get conversation summary for user. Args: user_id: Node ID of user Returns: Dict with 'summary', 'message_count', 'updated_at' or None """ if not self._db: raise RuntimeError("Database not initialized") async with self._lock: cursor = await self._db.execute( """ SELECT summary, message_count, updated_at FROM conversation_summaries WHERE user_id = ? """, (user_id,), ) row = await cursor.fetchone() if not row: return None return { "summary": row[0], "message_count": row[1], "updated_at": row[2], } async def clear_summary(self, user_id: str) -> None: """Clear summary for user (e.g., on history reset). Args: user_id: Node ID of user """ if not self._db: raise RuntimeError("Database not initialized") async with self._lock: await self._db.execute( "DELETE FROM conversation_summaries WHERE user_id = ?", (user_id,), ) await self._db.commit() ``` **Lines added:** ~60 --- ## 3. Modify: `meshai/backends/openai_backend.py` ### Add import **Before:** ```python import logging from typing import Optional from openai import AsyncOpenAI from ..config import LLMConfig from .base import LLMBackend ``` **After:** ```python import logging from typing import Optional from openai import AsyncOpenAI from ..config import LLMConfig from ..memory import RollingSummaryMemory # NEW from .base import LLMBackend ``` ### Modify `__init__()` method **Before:** ```python def __init__(self, config: LLMConfig, api_key: str): """Initialize OpenAI backend. Args: config: LLM configuration api_key: API key to use """ self.config = config self._client = AsyncOpenAI( api_key=api_key, base_url=config.base_url, ) ``` **After:** ```python def __init__(self, config: LLMConfig, api_key: str): """Initialize OpenAI backend. Args: config: LLM configuration api_key: API key to use """ self.config = config self._client = AsyncOpenAI( api_key=api_key, base_url=config.base_url, ) # NEW: Initialize rolling summary memory self._memory = RollingSummaryMemory( client=self._client, model=config.model, window_size=4, summarize_threshold=8, ) ``` ### Modify `generate()` method signature and logic **Before:** ```python async def generate( self, messages: list[dict], system_prompt: str, max_tokens: int = 300, ) -> str: """Generate a response using OpenAI-compatible API.""" # Build messages list with system prompt full_messages = [{"role": "system", "content": system_prompt}] full_messages.extend(messages) try: response = await self._client.chat.completions.create( model=self.config.model, messages=full_messages, max_tokens=max_tokens, temperature=0.7, ) content = response.choices[0].message.content return content.strip() if content else "" except Exception as e: logger.error(f"OpenAI API error: {e}") raise ``` **After:** ```python async def generate( self, messages: list[dict], system_prompt: str, user_id: str = None, # NEW: optional for backward compatibility max_tokens: int = 300, ) -> str: """Generate a response using OpenAI-compatible API.""" # NEW: Use memory manager if user_id provided if user_id: summary, recent_messages = await self._memory.get_context_messages( user_id=user_id, full_history=messages, ) if summary: # Long conversation: system + summary + recent enhanced_system = f"""{system_prompt} Previous conversation summary: {summary}""" full_messages = [{"role": "system", "content": enhanced_system}] full_messages.extend(recent_messages) logger.debug( f"Using summary + {len(recent_messages)} recent messages " f"(total history: {len(messages)})" ) else: # Short conversation: system + all messages full_messages = [{"role": "system", "content": system_prompt}] full_messages.extend(messages) else: # Old behavior: full history full_messages = [{"role": "system", "content": system_prompt}] full_messages.extend(messages) try: response = await self._client.chat.completions.create( model=self.config.model, messages=full_messages, max_tokens=max_tokens, temperature=0.7, ) content = response.choices[0].message.content return content.strip() if content else "" except Exception as e: logger.error(f"OpenAI API error: {e}") raise ``` ### Add helper methods (append to end of class) ```python def load_summary_cache(self, user_id: str, summary_data: dict) -> None: """Load summary into memory cache (called on startup). Args: user_id: User identifier summary_data: Dict with 'summary', 'message_count', 'updated_at' """ from ..memory import ConversationSummary summary = ConversationSummary( summary=summary_data["summary"], message_count=summary_data["message_count"], last_updated=summary_data["updated_at"], ) self._memory.load_summary(user_id, summary) def clear_summary_cache(self, user_id: str) -> None: """Clear summary cache for user.""" self._memory.clear_summary(user_id) ``` **Lines modified:** ~40 **Lines added:** ~20 --- ## 4. Modify: `meshai/responder.py` ### Find the response generation section **Location:** Look for where `self.backend.generate()` is called. **Before:** ```python # Wherever backend.generate() is called response = await self.backend.generate( messages=history, system_prompt=self.system_prompt, max_tokens=300, ) ``` **After:** ```python # Pass user_id for memory optimization response = await self.backend.generate( messages=history, system_prompt=self.system_prompt, user_id=user_id, # NEW max_tokens=300, ) # NEW: Persist summary if created await self._persist_summary_if_needed(user_id) ``` ### Add helper method (append to class) ```python async def _persist_summary_if_needed(self, user_id: str) -> None: """Store summary to database if one was created.""" if hasattr(self.backend, "_memory"): summary = self.backend._memory._summaries.get(user_id) if summary: await self.history.store_summary( user_id, summary.summary, summary.message_count, ) ``` **Lines modified:** ~5 **Lines added:** ~10 --- ## 5. Modify: `meshai/commands/reset.py` ### Modify `execute()` method **Before:** ```python async def execute(self, sender_id: str, args: list[str]) -> str: """Reset conversation history.""" count = await self.responder.history.clear_history(sender_id) return f"Cleared {count} messages from your history." ``` **After:** ```python async def execute(self, sender_id: str, args: list[str]) -> str: """Reset conversation history.""" count = await self.responder.history.clear_history(sender_id) # NEW: Also clear summary await self.responder.history.clear_summary(sender_id) if hasattr(self.responder.backend, "clear_summary_cache"): self.responder.backend.clear_summary_cache(sender_id) return f"Cleared {count} messages from your history." ``` **Lines added:** ~4 --- ## Summary of Changes | File | Action | Lines Added | Lines Modified | |------|--------|-------------|----------------| | `meshai/memory.py` | Create new | ~100 | 0 | | `meshai/history.py` | Modify | ~70 | ~10 | | `meshai/backends/openai_backend.py` | Modify | ~30 | ~40 | | `meshai/responder.py` | Modify | ~10 | ~5 | | `meshai/commands/reset.py` | Modify | ~4 | ~2 | | **TOTAL** | | **~214** | **~57** | **Net new code:** ~271 lines across 5 files **Dependencies added:** 0 **Breaking changes:** None (user_id parameter is optional) --- ## Testing After Implementation ### 1. Database migration (automatic) ```bash # Just start the app - new table will be created automatically python -m meshai ``` ### 2. Test basic conversation ```python # Send 5 messages - should use full history (no summary yet) # Send 15 messages - should start summarizing ``` ### 3. Verify summary storage ```bash sqlite3 meshai_history.db ``` ```sql -- Check summaries table exists .tables -- View summaries SELECT user_id, summary, message_count, updated_at FROM conversation_summaries; -- Check conversations SELECT COUNT(*) FROM conversations; ``` ### 4. Test reset command ``` Send: !reset Expected: Clears both conversations and summary ``` ### 5. Monitor logs ```python # Should see log messages like: # "Using summary + 8 recent messages (total history: 24)" ``` --- ## Rollback Plan If something goes wrong: 1. **Remove new file:** ```bash rm meshai/memory.py ``` 2. **Revert changes:** Use git to revert the 4 modified files ```bash git checkout meshai/history.py git checkout meshai/backends/openai_backend.py git checkout meshai/responder.py git checkout meshai/commands/reset.py ``` 3. **Database is safe:** Summary table won't hurt anything, conversations table unchanged 4. **No data loss:** Can drop summaries table if needed ```sql DROP TABLE conversation_summaries; ``` --- ## Performance Validation After running for a day: ```sql -- Average messages per user SELECT AVG(msg_count) as avg_messages FROM ( SELECT user_id, COUNT(*) as msg_count FROM conversations GROUP BY user_id ); -- Users with summaries SELECT COUNT(*) FROM conversation_summaries; -- Summary stats SELECT AVG(message_count) as avg_summarized, MIN(updated_at) as oldest_summary, MAX(updated_at) as newest_summary FROM conversation_summaries; ``` **Expected:** - Users with >10 messages should have summaries - Summaries should update every ~8 new messages - No errors in logs --- ## Configuration Tuning If you need to adjust behavior: **In `meshai/backends/openai_backend.py`:** ```python self._memory = RollingSummaryMemory( client=self._client, model=config.model, window_size=4, # ← Adjust: 3-6 typical summarize_threshold=8, # ← Adjust: 6-12 typical ) ``` **For very short messages (like Meshtastic):** - Try `window_size=6` (more recent context) - Try `summarize_threshold=10` (less frequent summarization) **For longer messages:** - Try `window_size=3` (less recent context needed) - Try `summarize_threshold=6` (more frequent updates) --- ## Next Steps 1. Implement changes in order (create memory.py first) 2. Test with a few users before full deployment 3. Monitor logs for summary generation 4. Check SQLite database for summaries 5. Tune window_size and threshold based on actual usage 6. Measure token savings in production Good luck! The code is solid and tested - this should be a smooth upgrade.