From 2945031071016369ca9b916d7f879de5642e6e20 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 23 Feb 2026 20:14:50 +0000 Subject: [PATCH] Unify memory implementations: delete AnthropicMemory and GoogleMemory RollingSummaryMemory now accepts a generic async summarize callable instead of hardcoding AsyncOpenAI. Each backend provides its own _summarize_messages() method that uses the appropriate API client. - Removed AnthropicMemory class from anthropic_backend.py - Removed GoogleMemory class from google_backend.py - Changed RollingSummaryMemory.__init__ signature to accept summarize_fn: Callable[[list[dict]], Awaitable[str]] - All three backends (OpenAI, Anthropic, Google) now instantiate the same RollingSummaryMemory class with backend-specific callables - Extracted shared summarize prompt to module-level _SUMMARIZE_PROMPT Co-Authored-By: Claude Opus 4.6 --- meshai/backends/anthropic_backend.py | 110 +++++++-------------------- meshai/backends/google_backend.py | 110 ++++++++------------------- meshai/backends/openai_backend.py | 40 +++++++++- meshai/memory.py | 52 ++----------- 4 files changed, 101 insertions(+), 211 deletions(-) diff --git a/meshai/backends/anthropic_backend.py b/meshai/backends/anthropic_backend.py index 03ef0ca..223a1bf 100644 --- a/meshai/backends/anthropic_backend.py +++ b/meshai/backends/anthropic_backend.py @@ -1,68 +1,17 @@ """Anthropic (Claude) LLM backend with rolling summary memory.""" import logging -import time from typing import Optional from anthropic import AsyncAnthropic from ..config import LLMConfig -from ..memory import ConversationSummary +from ..memory import RollingSummaryMemory from .base import LLMBackend logger = logging.getLogger(__name__) - -class AnthropicMemory: - """Rolling summary memory for Anthropic backend.""" - - def __init__(self, client: AsyncAnthropic, model: str, window_size: int = 4, summarize_threshold: int = 8): - self._client = client - self._model = model - self._window_size = window_size - self._summarize_threshold = summarize_threshold - self._summaries: dict[str, ConversationSummary] = {} - - async def get_context_messages( - self, user_id: str, full_history: list[dict] - ) -> tuple[Optional[str], list[dict]]: - """Get optimized context: summary + recent messages.""" - if len(full_history) <= self._window_size * 2: - return None, full_history - - split_point = -(self._window_size * 2) - old_messages = full_history[:split_point] - recent_messages = full_history[split_point:] - - summary = await self._get_or_create_summary(user_id, old_messages) - return summary.summary, recent_messages - - async def _get_or_create_summary(self, user_id: str, messages: list[dict]) -> ConversationSummary: - """Get cached summary or create new one.""" - if user_id in self._summaries: - cached = self._summaries[user_id] - if abs(cached.message_count - len(messages)) < self._summarize_threshold: - return cached - - logger.debug(f"Generating summary for {user_id} ({len(messages)} messages)") - summary_text = await self._summarize(messages) - - summary = ConversationSummary( - summary=summary_text, - last_updated=time.time(), - message_count=len(messages), - ) - self._summaries[user_id] = summary - return summary - - async def _summarize(self, messages: list[dict]) -> str: - """Generate summary using Anthropic.""" - if not messages: - return "No previous conversation." - - conversation = "\n".join([f"{msg['role'].upper()}: {msg['content']}" for msg in messages]) - - prompt = f"""Summarize this conversation in 2-3 concise sentences. Focus on: +_SUMMARIZE_PROMPT = """Summarize this conversation in 2-3 concise sentences. Focus on: - Main topics discussed - Important context or user preferences - Key information to remember @@ -72,30 +21,6 @@ Conversation: Summary (2-3 sentences):""" - try: - response = await self._client.messages.create( - model=self._model, - max_tokens=150, - messages=[{"role": "user", "content": prompt}], - ) - content = response.content[0].text if response.content else "" - return content.strip() if content else f"Previous conversation: {len(messages)} messages." - except Exception as e: - logger.warning(f"Failed to generate summary: {e}") - return f"Previous conversation: {len(messages)} messages about various topics." - - def load_summary(self, user_id: str, summary: ConversationSummary) -> None: - """Load summary from database into cache.""" - self._summaries[user_id] = summary - - def clear_summary(self, user_id: str) -> None: - """Clear cached summary for user.""" - self._summaries.pop(user_id, None) - - def get_cached_summary(self, user_id: str) -> Optional[ConversationSummary]: - """Get cached summary for user.""" - return self._summaries.get(user_id) - class AnthropicBackend(LLMBackend): """Anthropic Claude backend with rolling summary memory.""" @@ -117,13 +42,36 @@ class AnthropicBackend(LLMBackend): """ self.config = config self._client = AsyncAnthropic(api_key=api_key) - self._memory = AnthropicMemory( - client=self._client, - model=config.model, + + # Initialize rolling summary memory with Anthropic summarize function + self._memory = RollingSummaryMemory( + summarize_fn=self._summarize_messages, window_size=window_size, summarize_threshold=summarize_threshold, ) + async def _summarize_messages(self, messages: list[dict]) -> str: + """Summarize messages using Anthropic API.""" + if not messages: + return "No previous conversation." + + conversation = "\n".join( + [f"{msg['role'].upper()}: {msg['content']}" for msg in messages] + ) + prompt = _SUMMARIZE_PROMPT.format(conversation=conversation) + + try: + response = await self._client.messages.create( + model=self.config.model, + max_tokens=150, + messages=[{"role": "user", "content": prompt}], + ) + content = response.content[0].text if response.content else "" + return content.strip() if content else f"Previous conversation: {len(messages)} messages." + except Exception as e: + logger.warning(f"Failed to generate summary: {e}") + return f"Previous conversation: {len(messages)} messages about various topics." + async def generate( self, messages: list[dict], @@ -181,7 +129,7 @@ class AnthropicBackend(LLMBackend): logger.error(f"Anthropic API error: {e}") raise - def get_memory(self) -> AnthropicMemory: + def get_memory(self) -> RollingSummaryMemory: """Get the memory manager instance.""" return self._memory diff --git a/meshai/backends/google_backend.py b/meshai/backends/google_backend.py index 4e325ac..400e3d3 100644 --- a/meshai/backends/google_backend.py +++ b/meshai/backends/google_backend.py @@ -1,67 +1,17 @@ """Google Gemini LLM backend with rolling summary memory.""" import logging -import time from typing import Optional import google.generativeai as genai from ..config import LLMConfig -from ..memory import ConversationSummary +from ..memory import RollingSummaryMemory from .base import LLMBackend logger = logging.getLogger(__name__) - -class GoogleMemory: - """Rolling summary memory for Google backend.""" - - def __init__(self, model: genai.GenerativeModel, window_size: int = 4, summarize_threshold: int = 8): - self._model = model - self._window_size = window_size - self._summarize_threshold = summarize_threshold - self._summaries: dict[str, ConversationSummary] = {} - - async def get_context_messages( - self, user_id: str, full_history: list[dict] - ) -> tuple[Optional[str], list[dict]]: - """Get optimized context: summary + recent messages.""" - if len(full_history) <= self._window_size * 2: - return None, full_history - - split_point = -(self._window_size * 2) - old_messages = full_history[:split_point] - recent_messages = full_history[split_point:] - - summary = await self._get_or_create_summary(user_id, old_messages) - return summary.summary, recent_messages - - async def _get_or_create_summary(self, user_id: str, messages: list[dict]) -> ConversationSummary: - """Get cached summary or create new one.""" - if user_id in self._summaries: - cached = self._summaries[user_id] - if abs(cached.message_count - len(messages)) < self._summarize_threshold: - return cached - - logger.debug(f"Generating summary for {user_id} ({len(messages)} messages)") - summary_text = await self._summarize(messages) - - summary = ConversationSummary( - summary=summary_text, - last_updated=time.time(), - message_count=len(messages), - ) - self._summaries[user_id] = summary - return summary - - async def _summarize(self, messages: list[dict]) -> str: - """Generate summary using Google Gemini.""" - if not messages: - return "No previous conversation." - - conversation = "\n".join([f"{msg['role'].upper()}: {msg['content']}" for msg in messages]) - - prompt = f"""Summarize this conversation in 2-3 concise sentences. Focus on: +_SUMMARIZE_PROMPT = """Summarize this conversation in 2-3 concise sentences. Focus on: - Main topics discussed - Important context or user preferences - Key information to remember @@ -71,31 +21,6 @@ Conversation: Summary (2-3 sentences):""" - try: - response = await self._model.generate_content_async( - prompt, - generation_config=genai.types.GenerationConfig( - max_output_tokens=150, - temperature=0.3, - ), - ) - return response.text.strip() if response.text else f"Previous conversation: {len(messages)} messages." - except Exception as e: - logger.warning(f"Failed to generate summary: {e}") - return f"Previous conversation: {len(messages)} messages about various topics." - - def load_summary(self, user_id: str, summary: ConversationSummary) -> None: - """Load summary from database into cache.""" - self._summaries[user_id] = summary - - def clear_summary(self, user_id: str) -> None: - """Clear cached summary for user.""" - self._summaries.pop(user_id, None) - - def get_cached_summary(self, user_id: str) -> Optional[ConversationSummary]: - """Get cached summary for user.""" - return self._summaries.get(user_id) - class GoogleBackend(LLMBackend): """Google Gemini backend with rolling summary memory.""" @@ -118,12 +43,37 @@ class GoogleBackend(LLMBackend): self.config = config genai.configure(api_key=api_key) self._model = genai.GenerativeModel(config.model) - self._memory = GoogleMemory( - model=self._model, + + # Initialize rolling summary memory with Gemini summarize function + self._memory = RollingSummaryMemory( + summarize_fn=self._summarize_messages, window_size=window_size, summarize_threshold=summarize_threshold, ) + async def _summarize_messages(self, messages: list[dict]) -> str: + """Summarize messages using Google Gemini API.""" + if not messages: + return "No previous conversation." + + conversation = "\n".join( + [f"{msg['role'].upper()}: {msg['content']}" for msg in messages] + ) + prompt = _SUMMARIZE_PROMPT.format(conversation=conversation) + + try: + response = await self._model.generate_content_async( + prompt, + generation_config=genai.types.GenerationConfig( + max_output_tokens=150, + temperature=0.3, + ), + ) + return response.text.strip() if response.text else f"Previous conversation: {len(messages)} messages." + except Exception as e: + logger.warning(f"Failed to generate summary: {e}") + return f"Previous conversation: {len(messages)} messages about various topics." + async def generate( self, messages: list[dict], @@ -196,7 +146,7 @@ class GoogleBackend(LLMBackend): logger.error(f"Google API error: {e}") raise - def get_memory(self) -> GoogleMemory: + def get_memory(self) -> RollingSummaryMemory: """Get the memory manager instance.""" return self._memory diff --git a/meshai/backends/openai_backend.py b/meshai/backends/openai_backend.py index 62385b3..9fc263c 100644 --- a/meshai/backends/openai_backend.py +++ b/meshai/backends/openai_backend.py @@ -6,11 +6,21 @@ from typing import Optional from openai import AsyncOpenAI from ..config import LLMConfig -from ..memory import ConversationSummary, RollingSummaryMemory +from ..memory import RollingSummaryMemory from .base import LLMBackend logger = logging.getLogger(__name__) +_SUMMARIZE_PROMPT = """Summarize this conversation in 2-3 concise sentences. Focus on: +- Main topics discussed +- Important context or user preferences +- Key information to remember + +Conversation: +{conversation} + +Summary (2-3 sentences):""" + class OpenAIBackend(LLMBackend): """OpenAI-compatible backend (works with OpenAI, LiteLLM, local models).""" @@ -36,14 +46,36 @@ class OpenAIBackend(LLMBackend): base_url=config.base_url, ) - # Initialize rolling summary memory for context optimization + # Initialize rolling summary memory with OpenAI summarize function self._memory = RollingSummaryMemory( - client=self._client, - model=config.model, + summarize_fn=self._summarize_messages, window_size=window_size, summarize_threshold=summarize_threshold, ) + async def _summarize_messages(self, messages: list[dict]) -> str: + """Summarize messages using OpenAI API.""" + if not messages: + return "No previous conversation." + + conversation = "\n".join( + [f"{msg['role'].upper()}: {msg['content']}" for msg in messages] + ) + prompt = _SUMMARIZE_PROMPT.format(conversation=conversation) + + try: + response = await self._client.chat.completions.create( + model=self.config.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=150, + temperature=0.3, + ) + content = response.choices[0].message.content + return content.strip() if content else f"Previous conversation: {len(messages)} messages." + except Exception as e: + logger.warning(f"Failed to generate summary: {e}") + return f"Previous conversation: {len(messages)} messages about various topics." + async def generate( self, messages: list[dict], diff --git a/meshai/memory.py b/meshai/memory.py index 5662017..0662bd5 100644 --- a/meshai/memory.py +++ b/meshai/memory.py @@ -3,9 +3,7 @@ import logging import time from dataclasses import dataclass -from typing import Optional - -from openai import AsyncOpenAI +from typing import Awaitable, Callable, Optional logger = logging.getLogger(__name__) @@ -39,21 +37,19 @@ class RollingSummaryMemory: def __init__( self, - client: AsyncOpenAI, - model: str, + summarize_fn: Callable[[list[dict]], Awaitable[str]], window_size: int = 4, summarize_threshold: int = 8, ): """Initialize rolling summary memory. Args: - client: AsyncOpenAI client for generating summaries - model: Model name to use for summarization + summarize_fn: Async callable that takes a list of messages and returns + a summary string. Each backend provides its own implementation. window_size: Number of recent message pairs to keep in full summarize_threshold: Messages to accumulate before re-summarizing """ - self._client = client - self._model = model + self._summarize_fn = summarize_fn self._window_size = window_size self._summarize_threshold = summarize_threshold @@ -105,7 +101,7 @@ class RollingSummaryMemory: # Generate new summary logger.debug(f"Generating summary for {user_id} ({len(messages)} messages)") - summary_text = await self._summarize(messages) + summary_text = await self._summarize_fn(messages) summary = ConversationSummary( summary=summary_text, @@ -116,42 +112,6 @@ class RollingSummaryMemory: self._summaries[user_id] = summary return summary - async def _summarize(self, messages: list[dict]) -> str: - """Generate summary using LLM.""" - if not messages: - return "No previous conversation." - - # Format conversation - conversation = "\n".join( - [f"{msg['role'].upper()}: {msg['content']}" for msg in messages] - ) - - prompt = f"""Summarize this conversation in 2-3 concise sentences. Focus on: -- Main topics discussed -- Important context or user preferences -- Key information to remember - -Conversation: -{conversation} - -Summary (2-3 sentences):""" - - try: - response = await self._client.chat.completions.create( - model=self._model, - messages=[{"role": "user", "content": prompt}], - max_tokens=150, - temperature=0.3, - ) - - content = response.choices[0].message.content - return content.strip() if content else f"Previous conversation: {len(messages)} messages." - - except Exception as e: - logger.warning(f"Failed to generate summary: {e}") - # Fallback - provide basic context - return f"Previous conversation: {len(messages)} messages about various topics." - def load_summary(self, user_id: str, summary: ConversationSummary) -> None: """Load summary from database into cache.""" self._summaries[user_id] = summary