mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-05-21 23:24:44 +02:00
feat(knowledge): add Qdrant backend with SQLite fallback
- Add QdrantKnowledgeSearch class for hybrid dense+sparse vector search - Query RECON's 2.8M vector database via TEI embeddings + Qdrant - Uses RRF (Reciprocal Rank Fusion) for hybrid search merging - Extended KnowledgeConfig with Qdrant/TEI settings - Auto backend tries Qdrant first, falls back to SQLite FTS5 - Graceful degradation if RECON infrastructure unreachable Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
6119f33f57
commit
b11874f016
3 changed files with 255 additions and 14 deletions
|
|
@ -154,9 +154,22 @@ class MeshMonitorConfig:
|
|||
|
||||
@dataclass
|
||||
class KnowledgeConfig:
|
||||
"""FTS5 knowledge base settings."""
|
||||
"""Knowledge base settings."""
|
||||
|
||||
enabled: bool = False
|
||||
backend: str = "auto" # "qdrant", "sqlite", or "auto" (try qdrant, fall back to sqlite)
|
||||
|
||||
# Qdrant / RECON settings
|
||||
qdrant_host: str = "" # e.g., "192.168.1.150"
|
||||
qdrant_port: int = 6333
|
||||
qdrant_collection: str = "recon_knowledge_hybrid"
|
||||
tei_host: str = "" # TEI embedding service host
|
||||
tei_port: int = 8090
|
||||
sparse_host: str = "" # Sparse embedding service host
|
||||
sparse_port: int = 8091
|
||||
use_sparse: bool = True # Enable hybrid dense+sparse search
|
||||
|
||||
# SQLite fallback settings
|
||||
db_path: str = ""
|
||||
top_k: int = 5
|
||||
|
||||
|
|
|
|||
|
|
@ -204,3 +204,210 @@ class KnowledgeSearch:
|
|||
pass
|
||||
self._conn = None
|
||||
self.available = False
|
||||
|
||||
|
||||
class QdrantKnowledgeSearch:
|
||||
"""Hybrid knowledge search via RECON's Qdrant + TEI infrastructure.
|
||||
|
||||
Uses the same embedding pipeline as RECON:
|
||||
- Dense: TEI service with bge-m3 (1024-dim)
|
||||
- Sparse: bge-m3-sparse service (optional)
|
||||
- Search: Qdrant hybrid search with dense + sparse vectors
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
qdrant_host: str,
|
||||
qdrant_port: int = 6333,
|
||||
collection: str = "recon_knowledge_hybrid",
|
||||
tei_host: str = "",
|
||||
tei_port: int = 8090,
|
||||
sparse_host: str = "",
|
||||
sparse_port: int = 8091,
|
||||
use_sparse: bool = True,
|
||||
top_k: int = 5,
|
||||
):
|
||||
self.top_k = top_k
|
||||
self.available = False
|
||||
|
||||
self._qdrant_url = f"http://{qdrant_host}:{qdrant_port}"
|
||||
self._collection = collection
|
||||
self._tei_url = f"http://{tei_host or qdrant_host}:{tei_port}"
|
||||
self._sparse_url = f"http://{sparse_host or qdrant_host}:{sparse_port}"
|
||||
self._use_sparse = use_sparse
|
||||
|
||||
# Test connectivity
|
||||
try:
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
# Test Qdrant
|
||||
req = urllib.request.Request(
|
||||
f"{self._qdrant_url}/collections/{self._collection}",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read())
|
||||
points = data.get("result", {}).get("points_count", 0)
|
||||
logger.info(f"Qdrant connected: {collection} ({points} points)")
|
||||
|
||||
# Test TEI
|
||||
req = urllib.request.Request(
|
||||
f"{self._tei_url}/embed",
|
||||
data=json.dumps({"inputs": "test"}).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
vec = json.loads(resp.read())
|
||||
if isinstance(vec, list) and vec:
|
||||
logger.info(f"TEI connected: {len(vec[0])}-dim embeddings")
|
||||
|
||||
self.available = True
|
||||
logger.info("Qdrant knowledge search ready (RECON hybrid)")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Qdrant knowledge search unavailable: {e}")
|
||||
self.available = False
|
||||
|
||||
def search(self, query: str) -> list[dict]:
|
||||
"""Search RECON's Qdrant collection. Returns same format as SQLite backend."""
|
||||
if not self.available:
|
||||
return []
|
||||
|
||||
try:
|
||||
# 1. Get dense embedding from TEI
|
||||
dense_vec = self._embed_dense(query)
|
||||
if not dense_vec:
|
||||
return []
|
||||
|
||||
# 2. Get sparse embedding (optional)
|
||||
sparse_vec = None
|
||||
if self._use_sparse:
|
||||
sparse_vec = self._embed_sparse(query)
|
||||
|
||||
# 3. Search Qdrant
|
||||
results = self._search_qdrant(dense_vec, sparse_vec)
|
||||
|
||||
# 4. Format results to match SQLite backend interface
|
||||
formatted = []
|
||||
for r in results[:self.top_k]:
|
||||
payload = r.get("payload", {})
|
||||
content = payload.get("content", payload.get("summary", ""))
|
||||
# Truncate content for prompt injection
|
||||
if len(content) > 1000:
|
||||
content = content[:1000]
|
||||
|
||||
formatted.append({
|
||||
"title": payload.get("title", ""),
|
||||
"excerpt": content,
|
||||
"source": payload.get("source", ""),
|
||||
"book_title": payload.get("book_title", ""),
|
||||
})
|
||||
|
||||
logger.debug(f"Qdrant search: '{query[:50]}' -> {len(formatted)} results")
|
||||
return formatted
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Qdrant search error: {e}")
|
||||
return []
|
||||
|
||||
def _embed_dense(self, text: str) -> list[float]:
|
||||
"""Get dense embedding from TEI service."""
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{self._tei_url}/embed",
|
||||
data=json.dumps({"inputs": text}).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
data = json.loads(resp.read())
|
||||
if isinstance(data, list) and data:
|
||||
return data[0]
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.warning(f"TEI embed error: {e}")
|
||||
return []
|
||||
|
||||
def _embed_sparse(self, text: str) -> dict:
|
||||
"""Get sparse embedding from sparse service."""
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{self._sparse_url}/embed_sparse",
|
||||
data=json.dumps({"inputs": [text]}).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
data = json.loads(resp.read())
|
||||
if isinstance(data, list) and data:
|
||||
return data[0] # {indices: [...], values: [...]}
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.debug(f"Sparse embed error (non-critical): {e}")
|
||||
return None
|
||||
|
||||
def _search_qdrant(self, dense_vec: list[float], sparse_vec: dict = None) -> list[dict]:
|
||||
"""Search Qdrant collection with dense (and optionally sparse) vectors."""
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
# Build search request
|
||||
if sparse_vec and sparse_vec.get("indices"):
|
||||
# Hybrid: use prefetch with both dense and sparse
|
||||
body = {
|
||||
"prefetch": [
|
||||
{
|
||||
"query": dense_vec,
|
||||
"using": "",
|
||||
"limit": self.top_k * 3,
|
||||
},
|
||||
{
|
||||
"query": {
|
||||
"indices": sparse_vec["indices"],
|
||||
"values": sparse_vec["values"],
|
||||
},
|
||||
"using": "bge-m3-sparse",
|
||||
"limit": self.top_k * 3,
|
||||
},
|
||||
],
|
||||
"query": {"fusion": "rrf"},
|
||||
"limit": self.top_k,
|
||||
"with_payload": ["content", "title", "summary", "domain",
|
||||
"subdomain", "book_title", "source", "book_author"],
|
||||
}
|
||||
else:
|
||||
# Dense only
|
||||
body = {
|
||||
"query": dense_vec,
|
||||
"using": "",
|
||||
"limit": self.top_k,
|
||||
"with_payload": ["content", "title", "summary", "domain",
|
||||
"subdomain", "book_title", "source", "book_author"],
|
||||
}
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{self._qdrant_url}/collections/{self._collection}/points/query",
|
||||
data=json.dumps(body).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
data = json.loads(resp.read())
|
||||
points = data.get("result", {}).get("points", [])
|
||||
return points
|
||||
except Exception as e:
|
||||
logger.warning(f"Qdrant search error: {e}")
|
||||
return []
|
||||
|
||||
def close(self):
|
||||
"""No persistent connection to close."""
|
||||
self.available = False
|
||||
|
|
|
|||
|
|
@ -284,20 +284,41 @@ class MeshAI:
|
|||
)
|
||||
logger.info(f"Alert engine initialized (critical: {mi.critical_nodes}, channel: {mi.alert_channel})")
|
||||
|
||||
# Knowledge base (optional - gracefully degrade if deps missing)
|
||||
# Knowledge base (optional - Qdrant with SQLite fallback)
|
||||
kb_cfg = self.config.knowledge
|
||||
if kb_cfg.enabled and kb_cfg.db_path:
|
||||
try:
|
||||
from .knowledge import KnowledgeSearch
|
||||
self.knowledge = KnowledgeSearch(
|
||||
db_path=kb_cfg.db_path,
|
||||
top_k=kb_cfg.top_k,
|
||||
)
|
||||
except ImportError as e:
|
||||
logger.warning(f"Knowledge base disabled - missing dependencies: {e}")
|
||||
self.knowledge = None
|
||||
else:
|
||||
self.knowledge = None
|
||||
self.knowledge = None
|
||||
if kb_cfg.enabled:
|
||||
# Try Qdrant first if configured
|
||||
if kb_cfg.backend in ("qdrant", "auto") and kb_cfg.qdrant_host:
|
||||
try:
|
||||
from .knowledge import QdrantKnowledgeSearch
|
||||
qdrant = QdrantKnowledgeSearch(
|
||||
qdrant_host=kb_cfg.qdrant_host,
|
||||
qdrant_port=kb_cfg.qdrant_port,
|
||||
collection=kb_cfg.qdrant_collection,
|
||||
tei_host=kb_cfg.tei_host,
|
||||
tei_port=kb_cfg.tei_port,
|
||||
sparse_host=kb_cfg.sparse_host,
|
||||
sparse_port=kb_cfg.sparse_port,
|
||||
use_sparse=kb_cfg.use_sparse,
|
||||
top_k=kb_cfg.top_k,
|
||||
)
|
||||
if qdrant.available:
|
||||
self.knowledge = qdrant
|
||||
logger.info("Using Qdrant knowledge backend (RECON hybrid)")
|
||||
except Exception as e:
|
||||
logger.warning(f"Qdrant knowledge unavailable: {e}")
|
||||
|
||||
# Fall back to SQLite if Qdrant failed or not configured
|
||||
if not self.knowledge and kb_cfg.backend in ("sqlite", "auto") and kb_cfg.db_path:
|
||||
try:
|
||||
from .knowledge import KnowledgeSearch
|
||||
self.knowledge = KnowledgeSearch(
|
||||
db_path=kb_cfg.db_path,
|
||||
top_k=kb_cfg.top_k,
|
||||
)
|
||||
except ImportError as e:
|
||||
logger.warning(f"SQLite knowledge disabled - missing dependencies: {e}")
|
||||
|
||||
# Command dispatcher (needs mesh_reporter for health commands)
|
||||
self.dispatcher = create_dispatcher(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue