From b11874f01639c37026d8044bf3ef121cc9cf01c4 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Wed, 6 May 2026 15:54:43 +0000 Subject: [PATCH] feat(knowledge): add Qdrant backend with SQLite fallback - Add QdrantKnowledgeSearch class for hybrid dense+sparse vector search - Query RECON's 2.8M vector database via TEI embeddings + Qdrant - Uses RRF (Reciprocal Rank Fusion) for hybrid search merging - Extended KnowledgeConfig with Qdrant/TEI settings - Auto backend tries Qdrant first, falls back to SQLite FTS5 - Graceful degradation if RECON infrastructure unreachable Co-Authored-By: Claude Opus 4.5 --- meshai/config.py | 15 +++- meshai/knowledge.py | 207 ++++++++++++++++++++++++++++++++++++++++++++ meshai/main.py | 47 +++++++--- 3 files changed, 255 insertions(+), 14 deletions(-) diff --git a/meshai/config.py b/meshai/config.py index ca028ee..1e04f75 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -154,9 +154,22 @@ class MeshMonitorConfig: @dataclass class KnowledgeConfig: - """FTS5 knowledge base settings.""" + """Knowledge base settings.""" enabled: bool = False + backend: str = "auto" # "qdrant", "sqlite", or "auto" (try qdrant, fall back to sqlite) + + # Qdrant / RECON settings + qdrant_host: str = "" # e.g., "192.168.1.150" + qdrant_port: int = 6333 + qdrant_collection: str = "recon_knowledge_hybrid" + tei_host: str = "" # TEI embedding service host + tei_port: int = 8090 + sparse_host: str = "" # Sparse embedding service host + sparse_port: int = 8091 + use_sparse: bool = True # Enable hybrid dense+sparse search + + # SQLite fallback settings db_path: str = "" top_k: int = 5 diff --git a/meshai/knowledge.py b/meshai/knowledge.py index 3d5fda5..66f31d1 100644 --- a/meshai/knowledge.py +++ b/meshai/knowledge.py @@ -204,3 +204,210 @@ class KnowledgeSearch: pass self._conn = None self.available = False + + +class QdrantKnowledgeSearch: + """Hybrid knowledge search via RECON's Qdrant + TEI infrastructure. + + Uses the same embedding pipeline as RECON: + - Dense: TEI service with bge-m3 (1024-dim) + - Sparse: bge-m3-sparse service (optional) + - Search: Qdrant hybrid search with dense + sparse vectors + """ + + def __init__( + self, + qdrant_host: str, + qdrant_port: int = 6333, + collection: str = "recon_knowledge_hybrid", + tei_host: str = "", + tei_port: int = 8090, + sparse_host: str = "", + sparse_port: int = 8091, + use_sparse: bool = True, + top_k: int = 5, + ): + self.top_k = top_k + self.available = False + + self._qdrant_url = f"http://{qdrant_host}:{qdrant_port}" + self._collection = collection + self._tei_url = f"http://{tei_host or qdrant_host}:{tei_port}" + self._sparse_url = f"http://{sparse_host or qdrant_host}:{sparse_port}" + self._use_sparse = use_sparse + + # Test connectivity + try: + import urllib.request + import json + + # Test Qdrant + req = urllib.request.Request( + f"{self._qdrant_url}/collections/{self._collection}", + headers={"Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read()) + points = data.get("result", {}).get("points_count", 0) + logger.info(f"Qdrant connected: {collection} ({points} points)") + + # Test TEI + req = urllib.request.Request( + f"{self._tei_url}/embed", + data=json.dumps({"inputs": "test"}).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=10) as resp: + vec = json.loads(resp.read()) + if isinstance(vec, list) and vec: + logger.info(f"TEI connected: {len(vec[0])}-dim embeddings") + + self.available = True + logger.info("Qdrant knowledge search ready (RECON hybrid)") + + except Exception as e: + logger.warning(f"Qdrant knowledge search unavailable: {e}") + self.available = False + + def search(self, query: str) -> list[dict]: + """Search RECON's Qdrant collection. Returns same format as SQLite backend.""" + if not self.available: + return [] + + try: + # 1. Get dense embedding from TEI + dense_vec = self._embed_dense(query) + if not dense_vec: + return [] + + # 2. Get sparse embedding (optional) + sparse_vec = None + if self._use_sparse: + sparse_vec = self._embed_sparse(query) + + # 3. Search Qdrant + results = self._search_qdrant(dense_vec, sparse_vec) + + # 4. Format results to match SQLite backend interface + formatted = [] + for r in results[:self.top_k]: + payload = r.get("payload", {}) + content = payload.get("content", payload.get("summary", "")) + # Truncate content for prompt injection + if len(content) > 1000: + content = content[:1000] + + formatted.append({ + "title": payload.get("title", ""), + "excerpt": content, + "source": payload.get("source", ""), + "book_title": payload.get("book_title", ""), + }) + + logger.debug(f"Qdrant search: '{query[:50]}' -> {len(formatted)} results") + return formatted + + except Exception as e: + logger.warning(f"Qdrant search error: {e}") + return [] + + def _embed_dense(self, text: str) -> list[float]: + """Get dense embedding from TEI service.""" + import urllib.request + import json + + try: + req = urllib.request.Request( + f"{self._tei_url}/embed", + data=json.dumps({"inputs": text}).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read()) + if isinstance(data, list) and data: + return data[0] + return [] + except Exception as e: + logger.warning(f"TEI embed error: {e}") + return [] + + def _embed_sparse(self, text: str) -> dict: + """Get sparse embedding from sparse service.""" + import urllib.request + import json + + try: + req = urllib.request.Request( + f"{self._sparse_url}/embed_sparse", + data=json.dumps({"inputs": [text]}).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read()) + if isinstance(data, list) and data: + return data[0] # {indices: [...], values: [...]} + return None + except Exception as e: + logger.debug(f"Sparse embed error (non-critical): {e}") + return None + + def _search_qdrant(self, dense_vec: list[float], sparse_vec: dict = None) -> list[dict]: + """Search Qdrant collection with dense (and optionally sparse) vectors.""" + import urllib.request + import json + + # Build search request + if sparse_vec and sparse_vec.get("indices"): + # Hybrid: use prefetch with both dense and sparse + body = { + "prefetch": [ + { + "query": dense_vec, + "using": "", + "limit": self.top_k * 3, + }, + { + "query": { + "indices": sparse_vec["indices"], + "values": sparse_vec["values"], + }, + "using": "bge-m3-sparse", + "limit": self.top_k * 3, + }, + ], + "query": {"fusion": "rrf"}, + "limit": self.top_k, + "with_payload": ["content", "title", "summary", "domain", + "subdomain", "book_title", "source", "book_author"], + } + else: + # Dense only + body = { + "query": dense_vec, + "using": "", + "limit": self.top_k, + "with_payload": ["content", "title", "summary", "domain", + "subdomain", "book_title", "source", "book_author"], + } + + try: + req = urllib.request.Request( + f"{self._qdrant_url}/collections/{self._collection}/points/query", + data=json.dumps(body).encode(), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read()) + points = data.get("result", {}).get("points", []) + return points + except Exception as e: + logger.warning(f"Qdrant search error: {e}") + return [] + + def close(self): + """No persistent connection to close.""" + self.available = False diff --git a/meshai/main.py b/meshai/main.py index 16fc5cf..d130f22 100644 --- a/meshai/main.py +++ b/meshai/main.py @@ -284,20 +284,41 @@ class MeshAI: ) logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})") - # Knowledge base (optional - gracefully degrade if deps missing) + # Knowledge base (optional - Qdrant with SQLite fallback) kb_cfg = self.config.knowledge - if kb_cfg.enabled and kb_cfg.db_path: - try: - from .knowledge import KnowledgeSearch - self.knowledge = KnowledgeSearch( - db_path=kb_cfg.db_path, - top_k=kb_cfg.top_k, - ) - except ImportError as e: - logger.warning(f"Knowledge base disabled - missing dependencies: {e}") - self.knowledge = None - else: - self.knowledge = None + self.knowledge = None + if kb_cfg.enabled: + # Try Qdrant first if configured + if kb_cfg.backend in ("qdrant", "auto") and kb_cfg.qdrant_host: + try: + from .knowledge import QdrantKnowledgeSearch + qdrant = QdrantKnowledgeSearch( + qdrant_host=kb_cfg.qdrant_host, + qdrant_port=kb_cfg.qdrant_port, + collection=kb_cfg.qdrant_collection, + tei_host=kb_cfg.tei_host, + tei_port=kb_cfg.tei_port, + sparse_host=kb_cfg.sparse_host, + sparse_port=kb_cfg.sparse_port, + use_sparse=kb_cfg.use_sparse, + top_k=kb_cfg.top_k, + ) + if qdrant.available: + self.knowledge = qdrant + logger.info("Using Qdrant knowledge backend (RECON hybrid)") + except Exception as e: + logger.warning(f"Qdrant knowledge unavailable: {e}") + + # Fall back to SQLite if Qdrant failed or not configured + if not self.knowledge and kb_cfg.backend in ("sqlite", "auto") and kb_cfg.db_path: + try: + from .knowledge import KnowledgeSearch + self.knowledge = KnowledgeSearch( + db_path=kb_cfg.db_path, + top_k=kb_cfg.top_k, + ) + except ImportError as e: + logger.warning(f"SQLite knowledge disabled - missing dependencies: {e}") # Command dispatcher (needs mesh_reporter for health commands) self.dispatcher = create_dispatcher(