From 81611110eb3deb87eb59de875b44cc3f8142a469 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 7 May 2026 22:33:14 +0000 Subject: [PATCH] =?UTF-8?q?implement=20three-tier=20cascade:=20Qdrant=20?= =?UTF-8?q?=E2=86=92=20Kiwix=20=E2=86=92=20SearXNG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Kiwix integration with HTML parser for offline Wikipedia search - Add SearXNG integration for web search fallback - Cascade triggered when FlashRank top-1 score < 0.5 threshold - Context tagging: [DOMAIN_KNOWLEDGE], [OFFLINE_WIKI], [WEB_SEARCH] - Cascade decision logging to /opt/recon/logs/cascade.jsonl - Graceful degradation: skip unavailable tiers - Version bumped to 5.0.0 Co-Authored-By: Claude Opus 4.5 --- tools/recon_rag_tool.py | 2772 +++++++++++++++++++++++---------------- 1 file changed, 1653 insertions(+), 1119 deletions(-) diff --git a/tools/recon_rag_tool.py b/tools/recon_rag_tool.py index b0bc0cf..c835864 100644 --- a/tools/recon_rag_tool.py +++ b/tools/recon_rag_tool.py @@ -1,1119 +1,1653 @@ -""" -title: RECON Knowledge Base -author: Echo6 -version: 4.3.0 -description: RAG filter that searches the RECON knowledge base and injects reference material into Aurora's context. Emits citations with PDF download links. Supports intent-based metadata filtering, FlashRank neural reranking with MMR diversity, Ollama-powered query expansion, transcript source boosting, semantic query routing with inline navigation, and address book place resolution. -""" - -import logging -import json -import math -import re -import threading -from typing import Optional, Callable, Awaitable -from concurrent.futures import ThreadPoolExecutor, as_completed - -import requests -from pydantic import BaseModel, Field - -log = logging.getLogger(__name__) - -# Module-level source store: keyed by chat_id so inlet/outlet share state -# even if OWI instantiates separate Filter objects per call. 
-_SOURCE_STORE: dict[str, list] = {} - -# ── Semantic Query Router (v4.3.0) ─────────────────────────────────────────── -ROUTE_EXAMPLES = { - "nav_route": [ - "how do I get to Boise", - "directions to Twin Falls", - "how do I get from Buhl to Boise", - "drive from Jerome to Sun Valley", - "route from Boise to McCall", - "what's the fastest way to Sun Valley", - "how far is it to Twin Falls", - "take me to Shoshone", - "navigate to the airport", - "how do I drive to Salt Lake City", - "walking directions to the park", - "bike route to downtown", - ], - "nav_reverse_geocode": [ - "what town is at 42.5, -114.7", - "where am I right now", - "what is at coordinates 43.6, -116.2", - "what location is 42.574, -114.607", - "where is this place 44.0, -114.3", - "what city is near 42.7, -114.5", - "reverse geocode 43.0, -115.0", - "what's at this location 42.9, -114.8", - ], - "direct_answer": [ - "hello", - "hey aurora", - "good morning", - "thanks", - "thank you", - "what's your name", - "who are you", - "tell me a joke", - "how are you", - "hi there", - ], - "rag_search": [ - "what does the survival manual say about water", - "how to purify water in the field", - "how to treat a gunshot wound", - "what is the ranger handbook chapter on patrolling", - "field manual water purification", - "how to build a shelter in the wilderness", - "tactical combat casualty care procedures", - "what does FM 21-76 say about fire starting", - ], -} - -_ROUTE_CENTROIDS: dict | None = None -_ROUTER_LOCK = threading.Lock() - - -def _embed_batch_router(texts: list[str], tei_url: str) -> list[list[float]]: - resp = requests.post(tei_url, json={"inputs": texts}, timeout=30) - resp.raise_for_status() - return resp.json() - - -def _compute_centroid(vectors: list[list[float]]) -> list[float]: - n = len(vectors) - dim = len(vectors[0]) - centroid = [0.0] * dim - for vec in vectors: - for i in range(dim): - centroid[i] += vec[i] - for i in range(dim): - centroid[i] /= n - return centroid - - -def 
_cosine_similarity(a: list[float], b: list[float]) -> float: - dot = 0.0 - norm_a = 0.0 - norm_b = 0.0 - for i in range(len(a)): - dot += a[i] * b[i] - norm_a += a[i] * a[i] - norm_b += b[i] * b[i] - denom = math.sqrt(norm_a) * math.sqrt(norm_b) - if denom == 0: - return 0.0 - return dot / denom - - -def _ensure_centroids(tei_url: str) -> dict[str, list[float]]: - global _ROUTE_CENTROIDS - if _ROUTE_CENTROIDS is not None: - return _ROUTE_CENTROIDS - with _ROUTER_LOCK: - if _ROUTE_CENTROIDS is not None: - return _ROUTE_CENTROIDS - all_texts = [] - route_ranges: dict[str, tuple[int, int]] = {} - offset = 0 - for route, examples in ROUTE_EXAMPLES.items(): - route_ranges[route] = (offset, offset + len(examples)) - all_texts.extend(examples) - offset += len(examples) - all_vectors = _embed_batch_router(all_texts, tei_url) - centroids = {} - for route, (start, end) in route_ranges.items(): - centroids[route] = _compute_centroid(all_vectors[start:end]) - _ROUTE_CENTROIDS = centroids - return _ROUTE_CENTROIDS - - -def _classify_query( - query: str, - tei_url: str, - threshold: float = 0.45, -) -> tuple[str, float]: - """Classify query intent. 
Returns ("rag_search", 0.0) on any failure.""" - try: - centroids = _ensure_centroids(tei_url) - vecs = _embed_batch_router([query], tei_url) - query_vec = vecs[0] - best_route = "rag_search" - best_score = 0.0 - for route, centroid in centroids.items(): - sim = _cosine_similarity(query_vec, centroid) - if sim > best_score: - best_score = sim - best_route = route - if best_score < threshold: - return ("rag_search", best_score) - return (best_route, best_score) - except Exception as e: - log.warning(f"Router classification failed: {e}") - return ("rag_search", 0.0) - - -# ── Navigation handlers (v4.3.0) ───────────────────────────────────────────── -_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') -_FROM_TO_RE = re.compile(r'from\s+(.+?)\s+to\s+(.+?)(?:\s+by\s+\w+)?$', re.IGNORECASE) -_TO_RE = re.compile(r'(?:to|towards?)\s+(?:the\s+)?(.+?)$', re.IGNORECASE) -_COORD_IN_TEXT_RE = re.compile(r'(-?\d+\.?\d+)\s*,\s*(-?\d+\.?\d+)') -_MODE_MAP = { - "walk": "pedestrian", "walking": "pedestrian", "foot": "pedestrian", "pedestrian": "pedestrian", - "bike": "bicycle", "cycling": "bicycle", "bicycle": "bicycle", "cycle": "bicycle", - "truck": "truck", "lorry": "truck", - "drive": "auto", "driving": "auto", "car": "auto", "auto": "auto", -} - - -def _detect_mode(query: str) -> str: - q = query.lower() - for keyword, mode in _MODE_MAP.items(): - if keyword in q: - return mode - return "auto" - - -def _clean_place(text: str) -> str: - """Clean a place string for geocoding: strip articles, punctuation, normalize 'in' to comma.""" - s = text.strip().rstrip('?.,!') - # Strip leading articles - s = re.sub(r'^(the|a|an)\s+', '', s, flags=re.IGNORECASE) - # "214 North St in Filer ID" → "214 North St, Filer, ID" - s = re.sub(r'\s+in\s+', ', ', s, count=1, flags=re.IGNORECASE) - return s.strip() - - -def _parse_nav_query(query: str) -> tuple[str, str, str] | None: - mode = _detect_mode(query) - m = _FROM_TO_RE.search(query) - if m: - return (_clean_place(m.group(1)), 
_clean_place(m.group(2)), mode) - m = _TO_RE.search(query) - if m: - dest = _clean_place(m.group(1)) - if dest: - return (None, dest, mode) - return None - - -def _geocode(query: str, photon_url: str, address_book_url: str = "") -> tuple[float, float, str] | tuple[None, None, None]: - m = _COORD_RE.match(query.strip()) - if m: - lat, lon = float(m.group(1)), float(m.group(2)) - return lat, lon, query - # Address book lookup (before Photon) - ab = _address_book_lookup(query, address_book_url) - if ab: - return ab['lat'], ab['lon'], ab.get('address') or ab['name'] - resp = requests.get( - f"{photon_url}/api", - params={"q": query, "limit": 1}, - timeout=10, - ) - resp.raise_for_status() - features = resp.json().get("features", []) - if not features: - return None, None, None - props = features[0]["properties"] - coords = features[0]["geometry"]["coordinates"] - parts = [props.get("name", "")] - for key in ("city", "state", "country"): - v = props.get(key) - if v and v != parts[-1]: - parts.append(v) - return coords[1], coords[0], ", ".join(p for p in parts if p) - - -def _route_valhalla( - orig: tuple[float, float], - dest: tuple[float, float], - mode: str, - valhalla_url: str, -) -> str | None: - try: - resp = requests.post( - f"{valhalla_url}/route", - json={ - "locations": [ - {"lat": orig[0], "lon": orig[1]}, - {"lat": dest[0], "lon": dest[1]}, - ], - "costing": mode, - "directions_options": {"units": "miles"}, - }, - timeout=30, - ) - except requests.RequestException: - return None - if resp.status_code != 200: - return None - trip = resp.json()["trip"] - summary = trip["summary"] - legs = trip["legs"][0]["maneuvers"] - miles = round(summary["length"], 1) - minutes = round(summary["time"] / 60, 1) - lines = [f"Distance: {miles} miles | Time: {minutes} minutes", ""] - for i, m in enumerate(legs, 1): - inst = m["instruction"] - dist = m.get("length", 0) - if dist > 0: - lines.append(f"{i}. {inst} — {round(dist, 1)} mi") - else: - lines.append(f"{i}. 
{inst}") - return "\n".join(lines) - - -def _handle_nav_route( - query: str, - photon_url: str, - valhalla_url: str, - default_origin: str, - address_book_url: str = "", -) -> str | None: - parsed = _parse_nav_query(query) - if not parsed: - return None - origin_str, dest_str, mode = parsed - if not origin_str: - origin_str = default_origin - orig_lat, orig_lon, orig_name = _geocode(origin_str, photon_url, address_book_url) - if orig_lat is None: - return None - dest_lat, dest_lon, dest_name = _geocode(dest_str, photon_url, address_book_url) - if dest_lat is None: - return None - directions = _route_valhalla( - (orig_lat, orig_lon), (dest_lat, dest_lon), mode, valhalla_url - ) - if not directions: - return None - return f"Directions from {orig_name} to {dest_name} ({mode}):\n{directions}" - - -def _handle_reverse_geocode(query: str, photon_url: str) -> str | None: - m = _COORD_IN_TEXT_RE.search(query) - if not m: - return None - lat, lon = float(m.group(1)), float(m.group(2)) - try: - resp = requests.get( - f"{photon_url}/reverse", - params={"lat": lat, "lon": lon, "limit": 1}, - timeout=10, - ) - resp.raise_for_status() - features = resp.json().get("features", []) - if not features: - return f"No location found near coordinates ({lat}, {lon})" - props = features[0]["properties"] - parts = [] - for key in ("name", "city", "state", "country"): - v = props.get(key) - if v and v not in parts: - parts.append(v) - display = ", ".join(parts) if parts else "Unknown location" - return f"Location: {display} ({lat}, {lon})" - except Exception: - return None - - -def _inject_nav_context(body: dict, context: str): - messages = body.get("messages", []) - nav_block = ( - "\n\n---NAVIGATION RESULT---\n\n" - f"{context}\n\n" - "---END NAVIGATION RESULT---\n\n" - "Present these directions to the user exactly as provided. " - "Do not summarize or omit steps. You may add brief contextual notes." 
- ) - system_msg = next((m for m in messages if m.get("role") == "system"), None) - if system_msg: - system_msg["content"] = system_msg["content"] + nav_block - else: - body["messages"].insert(0, {"role": "system", "content": nav_block}) - - - -def _address_book_lookup(query: str, address_book_url: str) -> dict | None: - """Check RECON address book for exact place match. Returns dict with lat/lon or None.""" - if not address_book_url: - return None - try: - resp = requests.get( - f"{address_book_url}/api/address_book/lookup", - params={"q": query}, - timeout=2, - ) - if resp.status_code == 200: - data = resp.json() - if data.get("confidence") == "exact" and data.get("lat") and data.get("lon"): - log.info(f"Address book hit: {query!r} → {data['name']} ({data['lat']}, {data['lon']})") - return data - return None - except Exception: - return None - - -# ── End router/nav code ────────────────────────────────────────────────────── - -# Subdomains excluded from Medical results when tactical context detected -_OBSTETRIC_SUBDOMAINS = [ - "Obstetrics", "Midwifery", "Pregnancy", "Pregnancy Care", - "High-Risk Pregnancy", "Childbirth", "Postpartum Care", - "Family Planning", "Contraception", "Breastfeeding", - "Labor Complications", "Twin Delivery", -] - -# Query intent patterns — compiled once at import time -_PROCEDURAL_RE = re.compile( - r"^(how\s+(do|can|should|would|to)\b|steps?\s+(to|for)\b|procedure\s+for\b|technique\s+for\b|way\s+to\b|method\s+(to|for)\b|guide\s+(to|for)\b|instructions?\s+for\b)", - re.IGNORECASE, -) -_FOUNDATIONAL_RE = re.compile( - r"^(what\s+(is|are|does|was|were)\b|explain\b|define\b|why\s+(does|do|is|are|did)\b|describe\b|meaning\s+of\b|difference\s+between\b)", - re.IGNORECASE, -) - -# Tactical keyword patterns for obstetric subdomain exclusion -_TACTICAL_RE = re.compile( - r"\b(tactical|combat|tccc|casevac|medevac|casualty|triage|tourniquet|hemorrhage|wound packing|chest seal|care under fire|point of injury|far forward|buddy aid|self aid|field 
care|9-line|march algorithm)\b", - re.IGNORECASE, -) - - -def _rerank_by_keyword_overlap(query: str, results: list) -> list: - """Rerank results by boosting those with query term overlap in content/summary/key_facts. - - Adds a boost of up to 0.15 based on the fraction of query tokens found in the result text. - Results are re-sorted by boosted score. - """ - q_tokens = set(re.findall(r'[a-z0-9][-a-z0-9]{2,}', query.lower())) - if not q_tokens: - return results - - reranked = [] - for r in results: - p = r.get("payload", {}) - score = r.get("score", 0) - - # Build searchable text from content, summary, and key_facts - parts = [] - content = p.get("content", "") - if content: - parts.append(content[:2000].lower()) - summary = p.get("summary", "") - if summary: - parts.append(summary.lower()) - key_facts = p.get("key_facts", []) - if isinstance(key_facts, list): - parts.append(" ".join(str(f) for f in key_facts).lower()) - searchable = " ".join(parts) - - # Count how many query tokens appear in the result - if searchable: - matches = sum(1 for t in q_tokens if t in searchable) - overlap_ratio = matches / len(q_tokens) - else: - overlap_ratio = 0 - - # Boost: up to 0.15 for perfect overlap - boosted_score = score + (overlap_ratio * 0.15) - reranked.append({**r, "score": boosted_score}) - - reranked.sort(key=lambda x: -x["score"]) - return reranked - - -class Filter: - class Valves(BaseModel): - tei_url: str = Field( - default="http://100.64.0.14:8090/embed", - description="TEI embedding endpoint", - ) - qdrant_url: str = Field( - default="http://100.64.0.14:6333", - description="Qdrant REST API base URL", - ) - collection: str = Field( - default="recon_knowledge_hybrid", - description="Qdrant collection name", - ) - top_k: int = Field( - default=8, - description="Number of results to retrieve", - ) - score_threshold: float = Field( - default=0.3, - description="Minimum similarity score to include a result", - ) - fallback_min: int = Field( - default=3, - 
description="Minimum filtered results before falling back to unfiltered search", - ) - candidate_limit: int = Field( - default=50, - description="Initial retrieval pool size for reranking", - ) - rerank_top_n: int = Field( - default=20, - description="Keep top N after FlashRank reranking", - ) - mmr_diversity: float = Field( - default=0.3, - description="MMR diversity 0-1 (0=pure relevance, 1=max diversity)", - ) - enabled: bool = Field( - default=True, - description="Enable/disable RECON RAG augmentation", - ) - priority: int = Field( - default=0, - description="Filter execution priority (lower = earlier)", - ) - router_enabled: bool = Field( - default=True, - description="Enable semantic query routing", - ) - router_threshold: float = Field( - default=0.45, - description="Min confidence for route classification", - ) - photon_url: str = Field( - default="http://100.64.0.24:2322", - description="Photon geocoder URL", - ) - valhalla_url: str = Field( - default="http://100.64.0.24:8002", - description="Valhalla routing URL", - ) - address_book_url: str = Field( - default="http://100.64.0.24:8420", - description="RECON address book API base URL", - ) - - def __init__(self): - self.valves = self.Valves() - self._expansion_cache: dict[str, list[str]] = {} - self._ranker = None - - def _embed_query(self, text: str) -> list: - """Embed a query string using TEI.""" - resp = requests.post( - self.valves.tei_url, - json={"inputs": text}, - timeout=30, - ) - resp.raise_for_status() - return resp.json()[0] - - def _get_ranker(self): - """Lazy-load FlashRank neural reranker.""" - if self._ranker is None: - from flashrank import Ranker - self._ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2", cache_dir="/tmp/flashrank") - return self._ranker - - def _rerank_flashrank(self, query: str, results: list) -> list: - """Rerank results using FlashRank neural reranker. - - Takes Qdrant REST API result dicts (with 'payload' and 'score' keys). 
- Returns reranked list with updated scores, trimmed to rerank_top_n. - """ - from flashrank import RerankRequest - - ranker = self._get_ranker() - - passages = [] - for i, r in enumerate(results): - p = r.get("payload", {}) - text = p.get("content", "") - if not text: - text = p.get("summary", "") - passages.append({"id": i, "text": text[:2048]}) - - if not passages: - return results - - request = RerankRequest(query=query, passages=passages) - ranked = ranker.rerank(request) - - reranked = [] - for item in ranked[:self.valves.rerank_top_n]: - idx = item["id"] - result_copy = dict(results[idx]) - result_copy["score"] = item["score"] - reranked.append(result_copy) - - return reranked - - def _mmr_select(self, candidates: list, final_k: int) -> list: - """Select final_k results using Maximal Marginal Relevance. - - Penalizes redundancy: same book_title (0.6), same domain (0.3), same source_type (0.1). - Works with Qdrant REST API result dicts. - """ - if len(candidates) <= final_k: - return candidates - - selected = [candidates[0]] - remaining = list(candidates[1:]) - - while len(selected) < final_k and remaining: - best_score = -999 - best_idx = 0 - - for i, candidate in enumerate(remaining): - relevance = candidate.get("score", 0) - cp = candidate.get("payload", {}) - - max_overlap = 0 - for sel in selected: - sp = sel.get("payload", {}) - overlap = 0 - - c_title = cp.get("book_title", "") - s_title = sp.get("book_title", "") - if c_title and s_title and c_title == s_title: - overlap += 0.6 - - c_domain = cp.get("domain", "") - s_domain = sp.get("domain", "") - if c_domain and s_domain and c_domain == s_domain: - overlap += 0.3 - - c_src = cp.get("source_type", "") - s_src = sp.get("source_type", "") - if c_src and s_src and c_src == s_src: - overlap += 0.1 - - max_overlap = max(max_overlap, overlap) - - diversity = self.valves.mmr_diversity - mmr_score = (1 - diversity) * relevance - diversity * max_overlap - - if mmr_score > best_score: - best_score = mmr_score 
- best_idx = i - - selected.append(remaining.pop(best_idx)) - - return selected - - @staticmethod - def _detect_intent(query: str) -> Optional[list]: - """Detect query intent and return preferred knowledge_types, or None for unfiltered.""" - q = query.strip() - if _PROCEDURAL_RE.search(q): - return ["procedural", "operational"] - if _FOUNDATIONAL_RE.search(q): - return ["foundational"] - return None - - def _search_qdrant( - self, - vector: list, - limit: int, - knowledge_types: Optional[list] = None, - domain: Optional[str] = None, - exclude_subdomains: Optional[list] = None, - ) -> list: - """Search Qdrant for similar vectors, optionally filtered by knowledge_type and/or domain.""" - url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search" - payload = { - "vector": vector, - "limit": limit, - "with_payload": True, - "score_threshold": self.valves.score_threshold, - } - - must_clauses = [] - must_not_clauses = [] - should_clauses = [] - - if domain: - must_clauses.append({"key": "domain", "match": {"value": domain}}) - - if knowledge_types: - for kt in knowledge_types: - should_clauses.append({"key": "knowledge_type", "match": {"value": kt}}) - - if exclude_subdomains: - for sd in exclude_subdomains: - must_not_clauses.append({"key": "subdomain", "match": {"value": sd}}) - - if must_clauses or should_clauses or must_not_clauses: - filter_obj = {} - if must_clauses: - filter_obj["must"] = must_clauses - if should_clauses: - filter_obj["should"] = should_clauses - if must_not_clauses: - filter_obj["must_not"] = must_not_clauses - payload["filter"] = filter_obj - - resp = requests.post(url, json=payload, timeout=30) - resp.raise_for_status() - return resp.json().get("result", []) - - def _boost_transcripts(self, results: list, factor: float = 1.10) -> list: - """Boost transcript source scores to surface video content alongside documents.""" - for r in results: - p = r.get("payload", {}) - if p.get("source_type") == "transcript": - 
r["score"] = r.get("score", 0) * factor - return results - - def _fetch_guaranteed_transcripts(self, vector: list, domain: str = "Medical", limit: int = 3, exclude_subdomains: Optional[list] = None) -> list: - """Fetch top transcript results for a domain regardless of score threshold.""" - url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search" - filter_obj = { - "must": [ - {"key": "source_type", "match": {"value": "transcript"}}, - {"key": "domain", "match": {"value": domain}}, - ], - } - if exclude_subdomains: - filter_obj["must_not"] = [ - {"key": "subdomain", "match": {"value": sd}} for sd in exclude_subdomains - ] - payload = { - "vector": vector, - "limit": limit, - "with_payload": True, - "filter": filter_obj, - } - try: - resp = requests.post(url, json=payload, timeout=10) - resp.raise_for_status() - return resp.json().get("result", []) - except Exception as e: - log.warning(f"Guaranteed transcript fetch failed: {e}") - return [] - - def _expand_query_ollama(self, query: str) -> list[str]: - """Generate alternative search terms via Ollama. Cached, 10s timeout, fail-safe.""" - if query in self._expansion_cache: - return self._expansion_cache[query] - try: - resp = requests.post( - "http://100.64.0.14:11434/api/generate", - json={ - "model": "goekdenizguelmez/JOSIEFIED-Qwen3:8b", - "prompt": ( - f'Given this search query for a military/survival/preparedness knowledge base: "{query}"\n' - "Generate 3 specific technical search terms that would find TCCC, tactical medicine, " - "or field craft content. Focus on specific procedures, equipment, or doctrine terms " - "— not generic descriptions. Return only the terms, one per line, no numbering, no explanations." 
- ), - "stream": False, - }, - timeout=10, - ) - resp.raise_for_status() - text = resp.json().get("response", "") - terms = [ - t for t in ( - line.strip().lstrip("0123456789.-)*# ") - for line in text.strip().split("\n") - if line.strip() - ) - if t and len(t) >= 3 - ][:3] - self._expansion_cache[query] = terms - log.info(f"Query expansion: {query!r} → {terms}") - return terms - except Exception as e: - log.warning(f"Query expansion failed (proceeding without): {e}") - self._expansion_cache[query] = [] - return [] - - def _search_expanded_terms( - self, - terms: list[str], - intent_types: Optional[list], - limit: int, - exclude_subdomains: Optional[list] = None, - ) -> list: - """Embed and search expanded query terms in parallel.""" - if not terms: - return [] - - def embed_and_search(term: str) -> list: - vec = self._embed_query(term) - return self._search_qdrant(vec, limit, knowledge_types=intent_types, exclude_subdomains=exclude_subdomains) - - results = [] - with ThreadPoolExecutor(max_workers=min(len(terms), 3)) as pool: - futures = {pool.submit(embed_and_search, t): t for t in terms} - for future in as_completed(futures): - term = futures[future] - try: - results.extend(future.result()) - except Exception as e: - log.warning(f"Expanded search for {term!r} failed: {e}") - return results - - def _format_context(self, results: list) -> str: - """Format search results into a context block for the system prompt.""" - if not results: - return "" - - blocks = [] - for i, r in enumerate(results, 1): - p = r.get("payload", {}) - score = r.get("score", 0) - - # Build citation line - book = p.get("book_title") or p.get("filename", "Unknown") - page = p.get("page_ref", "") - if page: - page_str = str(page) - if not page_str.startswith("p"): - page_str = f"p. 
{page_str}" - citation = f"{book}, {page_str}" - else: - citation = book - - # Summary or truncated content - summary = p.get("summary", "") - if not summary: - content = p.get("content", "") - summary = content[:500] + "..." if len(content) > 500 else content - - # Key facts - key_facts = p.get("key_facts", []) - facts_str = "" - if key_facts and isinstance(key_facts, list): - facts_str = "\nKey facts: " + "; ".join(str(f) for f in key_facts[:5]) - - # Domain - domains = p.get("domain", []) - subdomains = p.get("subdomain", []) - domain_str = "" - if domains: - d = ", ".join(domains) if isinstance(domains, list) else str(domains) - if subdomains: - s = ", ".join(subdomains) if isinstance(subdomains, list) else str(subdomains) - domain_str = f"\nDomain: {d} > {s}" - else: - domain_str = f"\nDomain: {d}" - - # Download URL - dl = p.get("download_url", "") - source_type = p.get("source_type", "document") - if dl: - if source_type == "transcript": - dl_str = f"\nSource Video: {dl}" - elif source_type == "web": - dl_str = f"\nSource URL: {dl}" - else: - dl_str = f"\nSource PDF: {dl}" - else: - dl_str = "" - - block = f"[{i}] {citation} (relevance: {score:.2f})\n{summary}{facts_str}{domain_str}{dl_str}" - blocks.append(block) - - return "\n\n".join(blocks) - - async def inlet( - self, - body: dict, - __user__: Optional[dict] = None, - __event_emitter__: Callable[[dict], Awaitable[None]] = None, - ) -> dict: - if not self.valves.enabled: - return body - - # Get the latest user message - messages = body.get("messages", []) - user_messages = [m for m in messages if m.get("role") == "user"] - if not user_messages: - return body - - query = user_messages[-1].get("content", "") - if not query or len(query.strip()) < 3: - return body - - # ── ROUTER GATE (v4.3.0) ───────────────────────────────────────── - if self.valves.router_enabled: - route, confidence = _classify_query( - query, self.valves.tei_url, self.valves.router_threshold - ) - log.info(f"Router: {query!r} → {route} 
({confidence:.3f})") - - if route == "direct_answer": - if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"description": "Direct response", "done": True}} - ) - return body - - if route == "nav_route": - if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"description": "Getting directions...", "done": False}} - ) - result = _handle_nav_route( - query, - self.valves.photon_url, - self.valves.valhalla_url, - "Buhl, Idaho", - self.valves.address_book_url, - ) - if result: - _inject_nav_context(body, result) - if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"description": "Directions ready", "done": True}} - ) - return body - # Fall through to RAG if nav handling fails - - if route == "nav_reverse_geocode": - if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"description": "Looking up location...", "done": False}} - ) - result = _handle_reverse_geocode(query, self.valves.photon_url) - if result: - _inject_nav_context(body, result) - if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"description": "Location found", "done": True}} - ) - return body - # Fall through to RAG if reverse geocode fails - - # route == "rag_search" or nav fallthrough → continue existing pipeline - - # ── EXISTING RAG PIPELINE ───────────────────────────────────────── - # Emit status - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Searching RECON knowledge base...", - "done": False, - }, - } - ) - - try: - vector = self._embed_query(query) - - # Detect intent (knowledge_type filter) - intent_types = self._detect_intent(query) - - # Exclude obstetric/midwifery content when tactical context detected - exclude_subs = _OBSTETRIC_SUBDOMAINS if _TACTICAL_RE.search(query) else None - - # Start query expansion in background (runs concurrently with main search) - expansion_executor = 
ThreadPoolExecutor(max_workers=1) - expansion_future = expansion_executor.submit(self._expand_query_ollama, query) - - # Search Qdrant — unfiltered semantic search, optionally narrowed by knowledge_type - pool_size = self.valves.candidate_limit - if intent_types: - results = self._search_qdrant(vector, pool_size, knowledge_types=intent_types, - exclude_subdomains=exclude_subs) - if len(results) < self.valves.fallback_min: - results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs) - else: - results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs) - - # Collect expansion results and merge with main search - try: - expanded_terms = expansion_future.result(timeout=12) - except Exception: - expanded_terms = [] - expansion_executor.shutdown(wait=False) - - if expanded_terms: - expanded_results = self._search_expanded_terms( - expanded_terms, intent_types, pool_size, - exclude_subdomains=exclude_subs, - ) - if expanded_results: - combined = list(results) + expanded_results - seen: dict[str, dict] = {} - for r in combined: - pid = str(r.get("id", "")) - if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0): - seen[pid] = r - results = sorted(seen.values(), key=lambda x: -(x.get("score") or 0)) - - # Guaranteed transcript inclusion for medical queries - if _TACTICAL_RE.search(query) or any( - kw in query.lower() for kw in ("medical", "medicine", "wound", "trauma", "tourniquet", - "hemorrhage", "bleeding", "fracture", "burn", "cpr", - "first aid", "triage", "casualty") - ): - transcript_results = self._fetch_guaranteed_transcripts(vector, domain="Medical", limit=3, exclude_subdomains=exclude_subs) - if transcript_results: - combined = list(results) + transcript_results - seen: dict[str, dict] = {} - for r in combined: - pid = str(r.get("id", "")) - if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0): - seen[pid] = r - results = sorted(seen.values(), key=lambda x: -(x.get("score") 
or 0)) - - # Boost transcript sources across all retrieval paths - results = self._boost_transcripts(results) - - # Neural reranking via FlashRank, then MMR diversity selection - try: - results = self._rerank_flashrank(query, results) - results = self._mmr_select(results, self.valves.top_k) - except Exception as e: - log.warning(f"FlashRank reranking failed, falling back to keyword overlap: {e}") - results = _rerank_by_keyword_overlap(query, results) - results = results[:self.valves.top_k] - - # Store results for outlet citations (module-level, keyed by chat_id) - chat_id = body.get("chat_id", body.get("metadata", {}).get("chat_id", "")) - if chat_id: - _SOURCE_STORE[chat_id] = results - - # Build context block - context = self._format_context(results) - - if context: - rag_prompt = ( - "You have access to the RECON knowledge base — a curated library of military field manuals, " - "survival guides, preparedness literature, and video transcripts. Answer the user's question using " - "the reference material below. Reference sources using [1], [2], [3] etc. matching the " - "numbered sources provided. Use these numbers inline in your response.\n\n" - "If the reference material doesn't adequately answer the question, say so explicitly rather " - "than filling gaps with general knowledge.\n\n" - "---REFERENCE MATERIAL---\n\n" - f"{context}\n\n" - "---END REFERENCE MATERIAL---" - ) - else: - rag_prompt = ( - "You have access to the RECON knowledge base, but no relevant reference material was " - "found for this query. Answer from your general knowledge and clearly flag that your " - "response is NOT backed by the RECON reference library." 
- ) - - # Inject into system message - system_msg = next( - (m for m in messages if m.get("role") == "system"), None - ) - if system_msg: - system_msg["content"] = system_msg["content"] + "\n\n" + rag_prompt - else: - body["messages"].insert( - 0, {"role": "system", "content": rag_prompt} - ) - - if __event_emitter__: - status_msg = f"Found {len(results)} reference{'s' if len(results) != 1 else ''}" if results else "No matching references found" - await __event_emitter__( - { - "type": "status", - "data": { - "description": status_msg, - "done": True, - }, - } - ) - - except Exception as e: - log.warning(f"RECON RAG search failed: {e}") - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "RECON search unavailable, proceeding without references", - "done": True, - }, - } - ) - - return body - - async def outlet( - self, - body: dict, - __user__: Optional[dict] = None, - __event_emitter__: Callable[[dict], Awaitable[None]] = None, - ) -> dict: - if not self.valves.enabled or not __event_emitter__: - return body - - # Retrieve sources from module-level store (survives instance recreation) - chat_id = body.get("chat_id", "") - sources = _SOURCE_STORE.pop(chat_id, []) - if not sources: - return body - - # Emit citations for each source used - for r in sources: - try: - if not isinstance(r, dict): - continue - p = r.get("payload") or {} - if not isinstance(p, dict): - p = {} - - # Build citation — every field defensively None-checked - book = p.get("book_title") or p.get("filename") or "Unknown Source" - page = p.get("page_ref") - if page is not None and str(page).strip(): - page_str = str(page).strip() - if not page_str.startswith("p"): - page_str = f"p. 
{page_str}" - citation_name = f"{book}, {page_str}" - else: - citation_name = str(book) - - download_url = str(p.get("download_url") or "") - - # Safe summary extraction — handle None/missing without raising - summary = str(p.get("summary") or "") - if not summary: - content = str(p.get("content") or "") - summary = content[:300] if content else "" - - # Safe score formatting - score = r.get("score") - try: - relevance = f"{float(score):.2f}" - except (TypeError, ValueError): - relevance = "0.00" - - author = str(p.get("book_author") or "") - - await __event_emitter__( - { - "type": "source", - "data": { - "document": [summary], - "metadata": [ - { - "source": citation_name, - "url": download_url, - "author": author, - "relevance": relevance, - } - ], - "source": { - "name": citation_name, - "url": download_url, - }, - }, - } - ) - except Exception as e: - pid = r.get("id", "?") if isinstance(r, dict) else "?" - log.warning(f"Failed to emit citation (id={pid}): {e}") - - return body +""" +title: RECON Knowledge Base +author: Echo6 +version: 5.0.0 +description: RAG filter with three-tier cascade: Qdrant (domain knowledge) → Kiwix (offline wiki) → SearXNG (web search). Supports intent-based metadata filtering, FlashRank neural reranking with MMR diversity, Ollama-powered query expansion, transcript source boosting, semantic query routing with inline navigation, and address book place resolution. +""" + +import logging +import json +import math +import re +import threading +import html +from datetime import datetime +from html.parser import HTMLParser +from pathlib import Path +from typing import Optional, Callable, Awaitable +from concurrent.futures import ThreadPoolExecutor, as_completed +from urllib.parse import quote, unquote + +import requests +from pydantic import BaseModel, Field + +log = logging.getLogger(__name__) + +# Module-level source store: keyed by chat_id so inlet/outlet share state +# even if OWI instantiates separate Filter objects per call. 
+_SOURCE_STORE: dict[str, list] = {} + +# ── CASCADE CONFIGURATION (v5.0.0) ─────────────────────────────────────────── +# FlashRank score threshold for Tier 1 (Qdrant). Below this, fall through to Tier 2. +# Based on calibration: RECON queries cluster at 0.95-1.0, misses below 0.3. +# 0.5 is conservative - will let more through to Kiwix than strictly necessary. +CASCADE_CONFIDENCE_THRESHOLD = 0.5 + +# Kiwix-serve configuration +KIWIX_BASE_URL = "http://localhost:8430" +KIWIX_SEARCH_TIMEOUT = 5 # seconds +KIWIX_ARTICLE_TIMEOUT = 5 # seconds +KIWIX_MAX_RESULTS = 3 + +# SearXNG configuration +SEARXNG_URL = "http://192.168.1.102:8080" +SEARXNG_TIMEOUT = 5 # seconds +SEARXNG_MAX_RESULTS = 5 + +# Cascade logging +CASCADE_LOG_PATH = Path("/opt/recon/logs/cascade.jsonl") + +# ── Semantic Query Router (v4.3.0) ─────────────────────────────────────────── +ROUTE_EXAMPLES = { + "nav_route": [ + "how do I get to Boise", + "directions to Twin Falls", + "how do I get from Buhl to Boise", + "drive from Jerome to Sun Valley", + "route from Boise to McCall", + "what's the fastest way to Sun Valley", + "how far is it to Twin Falls", + "take me to Shoshone", + "navigate to the airport", + "how do I drive to Salt Lake City", + "walking directions to the park", + "bike route to downtown", + ], + "nav_reverse_geocode": [ + "what town is at 42.5, -114.7", + "where am I right now", + "what is at coordinates 43.6, -116.2", + "what location is 42.574, -114.607", + "where is this place 44.0, -114.3", + "what city is near 42.7, -114.5", + "reverse geocode 43.0, -115.0", + "what's at this location 42.9, -114.8", + ], + "direct_answer": [ + "hello", + "hey aurora", + "good morning", + "thanks", + "thank you", + "what's your name", + "who are you", + "tell me a joke", + "how are you", + "hi there", + ], + "rag_search": [ + "what does the survival manual say about water", + "how to purify water in the field", + "how to treat a gunshot wound", + "what is the ranger handbook chapter on patrolling", 
+ "field manual water purification", + "how to build a shelter in the wilderness", + "tactical combat casualty care procedures", + "what does FM 21-76 say about fire starting", + ], +} + +_ROUTE_CENTROIDS: dict | None = None +_ROUTER_LOCK = threading.Lock() + + +def _embed_batch_router(texts: list[str], tei_url: str) -> list[list[float]]: + resp = requests.post(tei_url, json={"inputs": texts}, timeout=30) + resp.raise_for_status() + return resp.json() + + +def _compute_centroid(vectors: list[list[float]]) -> list[float]: + n = len(vectors) + dim = len(vectors[0]) + centroid = [0.0] * dim + for vec in vectors: + for i in range(dim): + centroid[i] += vec[i] + for i in range(dim): + centroid[i] /= n + return centroid + + +def _cosine_similarity(a: list[float], b: list[float]) -> float: + dot = 0.0 + norm_a = 0.0 + norm_b = 0.0 + for i in range(len(a)): + dot += a[i] * b[i] + norm_a += a[i] * a[i] + norm_b += b[i] * b[i] + denom = math.sqrt(norm_a) * math.sqrt(norm_b) + if denom == 0: + return 0.0 + return dot / denom + + +def _ensure_centroids(tei_url: str) -> dict[str, list[float]]: + global _ROUTE_CENTROIDS + if _ROUTE_CENTROIDS is not None: + return _ROUTE_CENTROIDS + with _ROUTER_LOCK: + if _ROUTE_CENTROIDS is not None: + return _ROUTE_CENTROIDS + all_texts = [] + route_ranges: dict[str, tuple[int, int]] = {} + offset = 0 + for route, examples in ROUTE_EXAMPLES.items(): + route_ranges[route] = (offset, offset + len(examples)) + all_texts.extend(examples) + offset += len(examples) + all_vectors = _embed_batch_router(all_texts, tei_url) + centroids = {} + for route, (start, end) in route_ranges.items(): + centroids[route] = _compute_centroid(all_vectors[start:end]) + _ROUTE_CENTROIDS = centroids + return _ROUTE_CENTROIDS + + +def _classify_query( + query: str, + tei_url: str, + threshold: float = 0.45, +) -> tuple[str, float]: + """Classify query intent. 
Returns ("rag_search", 0.0) on any failure.""" + try: + centroids = _ensure_centroids(tei_url) + vecs = _embed_batch_router([query], tei_url) + query_vec = vecs[0] + best_route = "rag_search" + best_score = 0.0 + for route, centroid in centroids.items(): + sim = _cosine_similarity(query_vec, centroid) + if sim > best_score: + best_score = sim + best_route = route + if best_score < threshold: + return ("rag_search", best_score) + return (best_route, best_score) + except Exception as e: + log.warning(f"Router classification failed: {e}") + return ("rag_search", 0.0) + + +# ── Navigation handlers (v4.3.0) ───────────────────────────────────────────── +_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') +_FROM_TO_RE = re.compile(r'from\s+(.+?)\s+to\s+(.+?)(?:\s+by\s+\w+)?$', re.IGNORECASE) +_TO_RE = re.compile(r'(?:to|towards?)\s+(?:the\s+)?(.+?)$', re.IGNORECASE) +_COORD_IN_TEXT_RE = re.compile(r'(-?\d+\.?\d+)\s*,\s*(-?\d+\.?\d+)') +_MODE_MAP = { + "walk": "pedestrian", "walking": "pedestrian", "foot": "pedestrian", "pedestrian": "pedestrian", + "bike": "bicycle", "cycling": "bicycle", "bicycle": "bicycle", "cycle": "bicycle", + "truck": "truck", "lorry": "truck", + "drive": "auto", "driving": "auto", "car": "auto", "auto": "auto", +} + + +def _detect_mode(query: str) -> str: + q = query.lower() + for keyword, mode in _MODE_MAP.items(): + if keyword in q: + return mode + return "auto" + + +def _clean_place(text: str) -> str: + """Clean a place string for geocoding: strip articles, punctuation, normalize 'in' to comma.""" + s = text.strip().rstrip('?.,!') + # Strip leading articles + s = re.sub(r'^(the|a|an)\s+', '', s, flags=re.IGNORECASE) + # "214 North St in Filer ID" → "214 North St, Filer, ID" + s = re.sub(r'\s+in\s+', ', ', s, count=1, flags=re.IGNORECASE) + return s.strip() + + +def _parse_nav_query(query: str) -> tuple[str, str, str] | None: + mode = _detect_mode(query) + m = _FROM_TO_RE.search(query) + if m: + return (_clean_place(m.group(1)), 
_clean_place(m.group(2)), mode) + m = _TO_RE.search(query) + if m: + dest = _clean_place(m.group(1)) + if dest: + return (None, dest, mode) + return None + + +def _geocode(query: str, photon_url: str, address_book_url: str = "") -> tuple[float, float, str] | tuple[None, None, None]: + m = _COORD_RE.match(query.strip()) + if m: + lat, lon = float(m.group(1)), float(m.group(2)) + return lat, lon, query + # Address book lookup (before Photon) + ab = _address_book_lookup(query, address_book_url) + if ab: + return ab['lat'], ab['lon'], ab.get('address') or ab['name'] + resp = requests.get( + f"{photon_url}/api", + params={"q": query, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + features = resp.json().get("features", []) + if not features: + return None, None, None + props = features[0]["properties"] + coords = features[0]["geometry"]["coordinates"] + parts = [props.get("name", "")] + for key in ("city", "state", "country"): + v = props.get(key) + if v and v != parts[-1]: + parts.append(v) + return coords[1], coords[0], ", ".join(p for p in parts if p) + + +def _route_valhalla( + orig: tuple[float, float], + dest: tuple[float, float], + mode: str, + valhalla_url: str, +) -> str | None: + try: + resp = requests.post( + f"{valhalla_url}/route", + json={ + "locations": [ + {"lat": orig[0], "lon": orig[1]}, + {"lat": dest[0], "lon": dest[1]}, + ], + "costing": mode, + "directions_options": {"units": "miles"}, + }, + timeout=30, + ) + except requests.RequestException: + return None + if resp.status_code != 200: + return None + trip = resp.json()["trip"] + summary = trip["summary"] + legs = trip["legs"][0]["maneuvers"] + miles = round(summary["length"], 1) + minutes = round(summary["time"] / 60, 1) + lines = [f"Distance: {miles} miles | Time: {minutes} minutes", ""] + for i, m in enumerate(legs, 1): + inst = m["instruction"] + dist = m.get("length", 0) + if dist > 0: + lines.append(f"{i}. {inst} — {round(dist, 1)} mi") + else: + lines.append(f"{i}. 
{inst}") + return "\n".join(lines) + + +def _handle_nav_route( + query: str, + photon_url: str, + valhalla_url: str, + default_origin: str, + address_book_url: str = "", +) -> str | None: + parsed = _parse_nav_query(query) + if not parsed: + return None + origin_str, dest_str, mode = parsed + if not origin_str: + origin_str = default_origin + orig_lat, orig_lon, orig_name = _geocode(origin_str, photon_url, address_book_url) + if orig_lat is None: + return None + dest_lat, dest_lon, dest_name = _geocode(dest_str, photon_url, address_book_url) + if dest_lat is None: + return None + directions = _route_valhalla( + (orig_lat, orig_lon), (dest_lat, dest_lon), mode, valhalla_url + ) + if not directions: + return None + return f"Directions from {orig_name} to {dest_name} ({mode}):\n{directions}" + + +def _handle_reverse_geocode(query: str, photon_url: str) -> str | None: + m = _COORD_IN_TEXT_RE.search(query) + if not m: + return None + lat, lon = float(m.group(1)), float(m.group(2)) + try: + resp = requests.get( + f"{photon_url}/reverse", + params={"lat": lat, "lon": lon, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + features = resp.json().get("features", []) + if not features: + return f"No location found near coordinates ({lat}, {lon})" + props = features[0]["properties"] + parts = [] + for key in ("name", "city", "state", "country"): + v = props.get(key) + if v and v not in parts: + parts.append(v) + display = ", ".join(parts) if parts else "Unknown location" + return f"Location: {display} ({lat}, {lon})" + except Exception: + return None + + +def _inject_nav_context(body: dict, context: str): + messages = body.get("messages", []) + nav_block = ( + "\n\n---NAVIGATION RESULT---\n\n" + f"{context}\n\n" + "---END NAVIGATION RESULT---\n\n" + "Present these directions to the user exactly as provided. " + "Do not summarize or omit steps. You may add brief contextual notes." 
+ ) + system_msg = next((m for m in messages if m.get("role") == "system"), None) + if system_msg: + system_msg["content"] = system_msg["content"] + nav_block + else: + body["messages"].insert(0, {"role": "system", "content": nav_block}) + + + +def _address_book_lookup(query: str, address_book_url: str) -> dict | None: + """Check RECON address book for exact place match. Returns dict with lat/lon or None.""" + if not address_book_url: + return None + try: + resp = requests.get( + f"{address_book_url}/api/address_book/lookup", + params={"q": query}, + timeout=2, + ) + if resp.status_code == 200: + data = resp.json() + if data.get("confidence") == "exact" and data.get("lat") and data.get("lon"): + log.info(f"Address book hit: {query!r} → {data['name']} ({data['lat']}, {data['lon']})") + return data + return None + except Exception: + return None + + +# ── End router/nav code ────────────────────────────────────────────────────── + +# ── Kiwix Search Helpers (v5.0.0) ──────────────────────────────────────────── + +class _KiwixResultParser(HTMLParser): + """Parse Kiwix search results HTML to extract articles.""" + def __init__(self): + super().__init__() + self.results = [] + self._in_results = False + self._in_li = False + self._in_cite = False + self._in_info = False + self._current = {} + self._capture_text = False + + def handle_starttag(self, tag, attrs): + attrs_dict = dict(attrs) + if tag == "div" and "results" in attrs_dict.get("class", ""): + self._in_results = True + elif self._in_results and tag == "li": + self._in_li = True + self._current = {"title": "", "url": "", "snippet": "", "word_count": ""} + elif self._in_li and tag == "a" and not self._current.get("url"): + self._current["url"] = attrs_dict.get("href", "") + self._capture_text = True + elif self._in_li and tag == "cite": + self._in_cite = True + self._capture_text = True + elif self._in_li and tag == "div" and "informations" in attrs_dict.get("class", ""): + self._in_info = True + 
self._capture_text = True + + def handle_endtag(self, tag): + if tag == "div" and self._in_results and not self._in_li: + self._in_results = False + elif tag == "li" and self._in_li: + if self._current.get("url"): + self.results.append(self._current) + self._current = {} + self._in_li = False + elif tag == "a" and self._capture_text and not self._in_cite: + self._capture_text = False + elif tag == "cite": + self._in_cite = False + self._capture_text = False + elif tag == "div" and self._in_info: + self._in_info = False + self._capture_text = False + + def handle_data(self, data): + if self._capture_text and self._in_li: + text = data.strip() + if self._in_cite: + self._current["snippet"] += text + " " + elif self._in_info: + self._current["word_count"] = text + elif not self._current.get("title"): + self._current["title"] = text + + +def _strip_html_tags(html_content: str) -> str: + """Simple HTML to plain text conversion using stdlib.""" + # Remove script and style elements + text = re.sub(r']*>.*?', '', html_content, flags=re.DOTALL | re.IGNORECASE) + text = re.sub(r']*>.*?', '', text, flags=re.DOTALL | re.IGNORECASE) + # Remove tags + text = re.sub(r'<[^>]+>', ' ', text) + # Decode entities + text = html.unescape(text) + # Normalize whitespace + text = re.sub(r'\s+', ' ', text).strip() + return text + + +def _fetch_kiwix_books() -> list[str]: + """Fetch list of available books from kiwix-serve catalog.""" + try: + resp = requests.get( + f"{KIWIX_BASE_URL}/catalog/v2/entries", + timeout=KIWIX_SEARCH_TIMEOUT, + ) + resp.raise_for_status() + # Extract book names from href attributes + books = re.findall(r'href="/content/([^"]+)"', resp.text) + return list(set(books)) # dedupe + except Exception as e: + log.warning(f"Failed to fetch Kiwix book list: {e}") + return [] + + +def _search_kiwix_book(book: str, query: str, limit: int = 5) -> list[dict]: + """Search a single Kiwix book and return results.""" + try: + resp = requests.get( + f"{KIWIX_BASE_URL}/search", + 
params={"content": book, "pattern": query, "limit": limit}, + timeout=KIWIX_SEARCH_TIMEOUT, + ) + if resp.status_code != 200: + return [] + + parser = _KiwixResultParser() + parser.feed(resp.text) + + # Add book name to results + for r in parser.results: + r["book"] = book + + return parser.results + except Exception as e: + log.warning(f"Kiwix search failed for {book}: {e}") + return [] + + +def _fetch_kiwix_article(url_path: str) -> str: + """Fetch and extract text content from a Kiwix article.""" + try: + resp = requests.get( + f"{KIWIX_BASE_URL}{url_path}", + timeout=KIWIX_ARTICLE_TIMEOUT, + ) + resp.raise_for_status() + + # Extract main content - try to find article body + content = resp.text + + # Try to extract just the main content area + main_match = re.search(r']*>(.*?)', content, re.DOTALL | re.IGNORECASE) + if main_match: + content = main_match.group(1) + else: + # Try article tag + article_match = re.search(r']*>(.*?)', content, re.DOTALL | re.IGNORECASE) + if article_match: + content = article_match.group(1) + else: + # Try body content div + body_match = re.search(r']*class="[^"]*content[^"]*"[^>]*>(.*?)', content, re.DOTALL | re.IGNORECASE) + if body_match: + content = body_match.group(1) + + return _strip_html_tags(content)[:4000] # Limit to 4000 chars + except Exception as e: + log.warning(f"Failed to fetch Kiwix article {url_path}: {e}") + return "" + + +def _search_kiwix(query: str, books: list[str]) -> list[dict]: + """Search Kiwix across specified books and return merged results.""" + all_results = [] + + # Prioritize English Wikipedia and other English content + priority_books = [] + other_books = [] + for book in books: + if "wikipedia_en" in book or "_en_" in book or "_eng_" in book: + priority_books.append(book) + elif not any(lang in book for lang in ["_af_", "_de_", "_fr_", "_es_"]): + other_books.append(book) + + # Search priority books first + for book in priority_books[:3]: # Limit to top 3 priority books + results = 
_search_kiwix_book(book, query, limit=5) + all_results.extend(results) + + # If not enough results, try other books + if len(all_results) < KIWIX_MAX_RESULTS: + for book in other_books[:2]: + results = _search_kiwix_book(book, query, limit=3) + all_results.extend(results) + + return all_results[:KIWIX_MAX_RESULTS * 2] # Return up to 6 for further filtering + + +# ── SearXNG Search Helpers (v5.0.0) ────────────────────────────────────────── + +def _search_searxng(query: str) -> list[dict]: + """Search SearXNG and return results. Returns empty list on failure.""" + try: + resp = requests.get( + f"{SEARXNG_URL}/search", + params={"q": query, "format": "json"}, + timeout=SEARXNG_TIMEOUT, + ) + if resp.status_code != 200: + log.warning(f"SearXNG returned status {resp.status_code}") + return [] + + data = resp.json() + results = data.get("results", []) + + # Format results + formatted = [] + for r in results[:SEARXNG_MAX_RESULTS]: + formatted.append({ + "title": r.get("title", ""), + "url": r.get("url", ""), + "snippet": r.get("content", ""), + "engines": r.get("engines", []), + "score": r.get("score", 0), + }) + + return formatted + except requests.Timeout: + log.warning("SearXNG request timed out (offline or slow)") + return [] + except requests.ConnectionError: + log.warning("SearXNG connection failed (offline)") + return [] + except Exception as e: + log.warning(f"SearXNG search failed: {e}") + return [] + + +# ── Cascade Logging (v5.0.0) ───────────────────────────────────────────────── + +def _log_cascade_decision( + query: str, + router_intent: str, + top_1_score: float, + tier_used: int, + num_results: int, +): + """Log cascade decision to JSONL file for threshold tuning.""" + try: + CASCADE_LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + entry = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "query": query, + "router_intent": router_intent, + "top_1_score": round(top_1_score, 4), + "tier_used": tier_used, + "num_results": num_results, + } + with 
open(CASCADE_LOG_PATH, "a") as f: + f.write(json.dumps(entry) + "\n") + except Exception as e: + log.warning(f"Failed to log cascade decision: {e}") + + +# ── End cascade helpers ────────────────────────────────────────────────────── + +# Subdomains excluded from Medical results when tactical context detected +_OBSTETRIC_SUBDOMAINS = [ + "Obstetrics", "Midwifery", "Pregnancy", "Pregnancy Care", + "High-Risk Pregnancy", "Childbirth", "Postpartum Care", + "Family Planning", "Contraception", "Breastfeeding", + "Labor Complications", "Twin Delivery", +] + +# Query intent patterns — compiled once at import time +_PROCEDURAL_RE = re.compile( + r"^(how\s+(do|can|should|would|to)\b|steps?\s+(to|for)\b|procedure\s+for\b|technique\s+for\b|way\s+to\b|method\s+(to|for)\b|guide\s+(to|for)\b|instructions?\s+for\b)", + re.IGNORECASE, +) +_FOUNDATIONAL_RE = re.compile( + r"^(what\s+(is|are|does|was|were)\b|explain\b|define\b|why\s+(does|do|is|are|did)\b|describe\b|meaning\s+of\b|difference\s+between\b)", + re.IGNORECASE, +) + +# Tactical keyword patterns for obstetric subdomain exclusion +_TACTICAL_RE = re.compile( + r"\b(tactical|combat|tccc|casevac|medevac|casualty|triage|tourniquet|hemorrhage|wound packing|chest seal|care under fire|point of injury|far forward|buddy aid|self aid|field care|9-line|march algorithm)\b", + re.IGNORECASE, +) + + +def _rerank_by_keyword_overlap(query: str, results: list) -> list: + """Rerank results by boosting those with query term overlap in content/summary/key_facts. + + Adds a boost of up to 0.15 based on the fraction of query tokens found in the result text. + Results are re-sorted by boosted score. 
+ """ + q_tokens = set(re.findall(r'[a-z0-9][-a-z0-9]{2,}', query.lower())) + if not q_tokens: + return results + + reranked = [] + for r in results: + p = r.get("payload", {}) + score = r.get("score", 0) + + # Build searchable text from content, summary, and key_facts + parts = [] + content = p.get("content", "") + if content: + parts.append(content[:2000].lower()) + summary = p.get("summary", "") + if summary: + parts.append(summary.lower()) + key_facts = p.get("key_facts", []) + if isinstance(key_facts, list): + parts.append(" ".join(str(f) for f in key_facts).lower()) + searchable = " ".join(parts) + + # Count how many query tokens appear in the result + if searchable: + matches = sum(1 for t in q_tokens if t in searchable) + overlap_ratio = matches / len(q_tokens) + else: + overlap_ratio = 0 + + # Boost: up to 0.15 for perfect overlap + boosted_score = score + (overlap_ratio * 0.15) + reranked.append({**r, "score": boosted_score}) + + reranked.sort(key=lambda x: -x["score"]) + return reranked + + +class Filter: + class Valves(BaseModel): + tei_url: str = Field( + default="http://100.64.0.14:8090/embed", + description="TEI embedding endpoint", + ) + qdrant_url: str = Field( + default="http://100.64.0.14:6333", + description="Qdrant REST API base URL", + ) + collection: str = Field( + default="recon_knowledge_hybrid", + description="Qdrant collection name", + ) + top_k: int = Field( + default=8, + description="Number of results to retrieve", + ) + score_threshold: float = Field( + default=0.3, + description="Minimum similarity score to include a result", + ) + fallback_min: int = Field( + default=3, + description="Minimum filtered results before falling back to unfiltered search", + ) + candidate_limit: int = Field( + default=50, + description="Initial retrieval pool size for reranking", + ) + rerank_top_n: int = Field( + default=20, + description="Keep top N after FlashRank reranking", + ) + mmr_diversity: float = Field( + default=0.3, + description="MMR 
diversity 0-1 (0=pure relevance, 1=max diversity)", + ) + enabled: bool = Field( + default=True, + description="Enable/disable RECON RAG augmentation", + ) + priority: int = Field( + default=0, + description="Filter execution priority (lower = earlier)", + ) + router_enabled: bool = Field( + default=True, + description="Enable semantic query routing", + ) + router_threshold: float = Field( + default=0.45, + description="Min confidence for route classification", + ) + photon_url: str = Field( + default="http://100.64.0.24:2322", + description="Photon geocoder URL", + ) + valhalla_url: str = Field( + default="http://100.64.0.24:8002", + description="Valhalla routing URL", + ) + address_book_url: str = Field( + default="http://100.64.0.24:8420", + description="RECON address book API base URL", + ) + cascade_enabled: bool = Field( + default=True, + description="Enable three-tier cascade (Qdrant → Kiwix → SearXNG)", + ) + cascade_threshold: float = Field( + default=0.5, + description="FlashRank score threshold for cascade fallthrough", + ) + + def __init__(self): + self.valves = self.Valves() + self._expansion_cache: dict[str, list[str]] = {} + self._ranker = None + self._kiwix_books: list[str] | None = None + + def _get_kiwix_books(self) -> list[str]: + """Get cached list of Kiwix books, fetching on first use.""" + if self._kiwix_books is None: + self._kiwix_books = _fetch_kiwix_books() + log.info(f"Loaded {len(self._kiwix_books)} Kiwix books") + return self._kiwix_books + + def _embed_query(self, text: str) -> list: + """Embed a query string using TEI.""" + resp = requests.post( + self.valves.tei_url, + json={"inputs": text}, + timeout=30, + ) + resp.raise_for_status() + return resp.json()[0] + + def _get_ranker(self): + """Lazy-load FlashRank neural reranker.""" + if self._ranker is None: + from flashrank import Ranker + self._ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2", cache_dir="/tmp/flashrank") + return self._ranker + + def _rerank_flashrank(self, query: 
str, results: list) -> list: + """Rerank results using FlashRank neural reranker. + + Takes Qdrant REST API result dicts (with 'payload' and 'score' keys). + Returns reranked list with updated scores, trimmed to rerank_top_n. + """ + from flashrank import RerankRequest + + ranker = self._get_ranker() + + passages = [] + for i, r in enumerate(results): + p = r.get("payload", {}) + text = p.get("content", "") + if not text: + text = p.get("summary", "") + passages.append({"id": i, "text": text[:2048]}) + + if not passages: + return results + + request = RerankRequest(query=query, passages=passages) + ranked = ranker.rerank(request) + + reranked = [] + for item in ranked[:self.valves.rerank_top_n]: + idx = item["id"] + result_copy = dict(results[idx]) + result_copy["score"] = float(item["score"]) + reranked.append(result_copy) + + return reranked + + def _mmr_select(self, candidates: list, final_k: int) -> list: + """Select final_k results using Maximal Marginal Relevance. + + Penalizes redundancy: same book_title (0.6), same domain (0.3), same source_type (0.1). + Works with Qdrant REST API result dicts. 
+ """ + if len(candidates) <= final_k: + return candidates + + selected = [candidates[0]] + remaining = list(candidates[1:]) + + while len(selected) < final_k and remaining: + best_score = -999 + best_idx = 0 + + for i, candidate in enumerate(remaining): + relevance = candidate.get("score", 0) + cp = candidate.get("payload", {}) + + max_overlap = 0 + for sel in selected: + sp = sel.get("payload", {}) + overlap = 0 + + c_title = cp.get("book_title", "") + s_title = sp.get("book_title", "") + if c_title and s_title and c_title == s_title: + overlap += 0.6 + + c_domain = cp.get("domain", "") + s_domain = sp.get("domain", "") + if c_domain and s_domain and c_domain == s_domain: + overlap += 0.3 + + c_src = cp.get("source_type", "") + s_src = sp.get("source_type", "") + if c_src and s_src and c_src == s_src: + overlap += 0.1 + + max_overlap = max(max_overlap, overlap) + + diversity = self.valves.mmr_diversity + mmr_score = (1 - diversity) * relevance - diversity * max_overlap + + if mmr_score > best_score: + best_score = mmr_score + best_idx = i + + selected.append(remaining.pop(best_idx)) + + return selected + + @staticmethod + def _detect_intent(query: str) -> Optional[list]: + """Detect query intent and return preferred knowledge_types, or None for unfiltered.""" + q = query.strip() + if _PROCEDURAL_RE.search(q): + return ["procedural", "operational"] + if _FOUNDATIONAL_RE.search(q): + return ["foundational"] + return None + + def _search_qdrant( + self, + vector: list, + limit: int, + knowledge_types: Optional[list] = None, + domain: Optional[str] = None, + exclude_subdomains: Optional[list] = None, + ) -> list: + """Search Qdrant for similar vectors, optionally filtered by knowledge_type and/or domain.""" + url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search" + payload = { + "vector": vector, + "limit": limit, + "with_payload": True, + "score_threshold": self.valves.score_threshold, + } + + must_clauses = [] + must_not_clauses = [] + 
should_clauses = [] + + if domain: + must_clauses.append({"key": "domain", "match": {"value": domain}}) + + if knowledge_types: + for kt in knowledge_types: + should_clauses.append({"key": "knowledge_type", "match": {"value": kt}}) + + if exclude_subdomains: + for sd in exclude_subdomains: + must_not_clauses.append({"key": "subdomain", "match": {"value": sd}}) + + if must_clauses or should_clauses or must_not_clauses: + filter_obj = {} + if must_clauses: + filter_obj["must"] = must_clauses + if should_clauses: + filter_obj["should"] = should_clauses + if must_not_clauses: + filter_obj["must_not"] = must_not_clauses + payload["filter"] = filter_obj + + resp = requests.post(url, json=payload, timeout=30) + resp.raise_for_status() + return resp.json().get("result", []) + + def _boost_transcripts(self, results: list, factor: float = 1.10) -> list: + """Boost transcript source scores to surface video content alongside documents.""" + for r in results: + p = r.get("payload", {}) + if p.get("source_type") == "transcript": + r["score"] = r.get("score", 0) * factor + return results + + def _fetch_guaranteed_transcripts(self, vector: list, domain: str = "Medical", limit: int = 3, exclude_subdomains: Optional[list] = None) -> list: + """Fetch top transcript results for a domain regardless of score threshold.""" + url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search" + filter_obj = { + "must": [ + {"key": "source_type", "match": {"value": "transcript"}}, + {"key": "domain", "match": {"value": domain}}, + ], + } + if exclude_subdomains: + filter_obj["must_not"] = [ + {"key": "subdomain", "match": {"value": sd}} for sd in exclude_subdomains + ] + payload = { + "vector": vector, + "limit": limit, + "with_payload": True, + "filter": filter_obj, + } + try: + resp = requests.post(url, json=payload, timeout=10) + resp.raise_for_status() + return resp.json().get("result", []) + except Exception as e: + log.warning(f"Guaranteed transcript fetch failed: 
{e}") + return [] + + def _expand_query_ollama(self, query: str) -> list[str]: + """Generate alternative search terms via Ollama. Cached, 10s timeout, fail-safe.""" + if query in self._expansion_cache: + return self._expansion_cache[query] + try: + resp = requests.post( + "http://100.64.0.14:11434/api/generate", + json={ + "model": "goekdenizguelmez/JOSIEFIED-Qwen3:8b", + "prompt": ( + f'Given this search query for a military/survival/preparedness knowledge base: "{query}"\n' + "Generate 3 specific technical search terms that would find TCCC, tactical medicine, " + "or field craft content. Focus on specific procedures, equipment, or doctrine terms " + "— not generic descriptions. Return only the terms, one per line, no numbering, no explanations." + ), + "stream": False, + }, + timeout=10, + ) + resp.raise_for_status() + text = resp.json().get("response", "") + terms = [ + t for t in ( + line.strip().lstrip("0123456789.-)*# ") + for line in text.strip().split("\n") + if line.strip() + ) + if t and len(t) >= 3 + ][:3] + self._expansion_cache[query] = terms + log.info(f"Query expansion: {query!r} → {terms}") + return terms + except Exception as e: + log.warning(f"Query expansion failed (proceeding without): {e}") + self._expansion_cache[query] = [] + return [] + + def _search_expanded_terms( + self, + terms: list[str], + intent_types: Optional[list], + limit: int, + exclude_subdomains: Optional[list] = None, + ) -> list: + """Embed and search expanded query terms in parallel.""" + if not terms: + return [] + + def embed_and_search(term: str) -> list: + vec = self._embed_query(term) + return self._search_qdrant(vec, limit, knowledge_types=intent_types, exclude_subdomains=exclude_subdomains) + + results = [] + with ThreadPoolExecutor(max_workers=min(len(terms), 3)) as pool: + futures = {pool.submit(embed_and_search, t): t for t in terms} + for future in as_completed(futures): + term = futures[future] + try: + results.extend(future.result()) + except Exception as e: + 
log.warning(f"Expanded search for {term!r} failed: {e}") + return results + + def _format_context(self, results: list, tier_tag: str = "DOMAIN_KNOWLEDGE") -> str: + """Format search results into a context block for the system prompt.""" + if not results: + return "" + + blocks = [] + for i, r in enumerate(results, 1): + p = r.get("payload", {}) + score = r.get("score", 0) + + # Build citation line + book = p.get("book_title") or p.get("filename", "Unknown") + page = p.get("page_ref", "") + if page: + page_str = str(page) + if not page_str.startswith("p"): + page_str = f"p. {page_str}" + citation = f"{book}, {page_str}" + else: + citation = book + + # Summary or truncated content + summary = p.get("summary", "") + if not summary: + content = p.get("content", "") + summary = content[:500] + "..." if len(content) > 500 else content + + # Key facts + key_facts = p.get("key_facts", []) + facts_str = "" + if key_facts and isinstance(key_facts, list): + facts_str = "\nKey facts: " + "; ".join(str(f) for f in key_facts[:5]) + + # Domain + domains = p.get("domain", []) + subdomains = p.get("subdomain", []) + domain_str = "" + if domains: + d = ", ".join(domains) if isinstance(domains, list) else str(domains) + if subdomains: + s = ", ".join(subdomains) if isinstance(subdomains, list) else str(subdomains) + domain_str = f"\nDomain: {d} > {s}" + else: + domain_str = f"\nDomain: {d}" + + # Download URL + dl = p.get("download_url", "") + source_type = p.get("source_type", "document") + if dl: + if source_type == "transcript": + dl_str = f"\nSource Video: {dl}" + elif source_type == "web": + dl_str = f"\nSource URL: {dl}" + else: + dl_str = f"\nSource PDF: {dl}" + else: + dl_str = "" + + block = f"[{tier_tag}:{i}] {citation} (relevance: {score:.2f})\n{summary}{facts_str}{domain_str}{dl_str}" + blocks.append(block) + + return "\n\n".join(blocks) + + def _format_kiwix_context(self, results: list[dict]) -> str: + """Format Kiwix search results into a context block.""" + if not 
results: + return "" + + blocks = [] + for i, r in enumerate(results, 1): + title = r.get("title", "Unknown") + snippet = r.get("snippet", "").strip() + book = r.get("book", "") + url_path = r.get("url", "") + + # Build wiki URL + if url_path: + # Extract article path from /content/book/path + path_match = re.search(r'/content/[^/]+/(.+)$', url_path) + if path_match: + article_path = path_match.group(1) + wiki_url = f"https://wiki.echo6.co/viewer#{book}/{article_path}" + else: + wiki_url = f"https://wiki.echo6.co/viewer#{book}" + else: + wiki_url = "" + + # Fetch article content if available + content = "" + if url_path: + content = _fetch_kiwix_article(url_path) + if content: + content = content[:1500] # Limit per article + + if not content: + content = snippet + + block = f"[OFFLINE_WIKI:{i}] {title}\n{content}" + if wiki_url: + block += f"\nSource: {wiki_url}" + blocks.append(block) + + return "\n\n".join(blocks) + + def _format_searxng_context(self, results: list[dict]) -> str: + """Format SearXNG search results into a context block.""" + if not results: + return "" + + blocks = [] + for i, r in enumerate(results, 1): + title = r.get("title", "Unknown") + snippet = r.get("snippet", "") + url = r.get("url", "") + engines = r.get("engines", []) + + engine_str = f" (via {', '.join(engines[:2])})" if engines else "" + + block = f"[WEB_SEARCH:{i}] {title}{engine_str}\n{snippet}" + if url: + block += f"\nSource: {url}" + blocks.append(block) + + return "\n\n".join(blocks) + + async def inlet( + self, + body: dict, + __user__: Optional[dict] = None, + __event_emitter__: Callable[[dict], Awaitable[None]] = None, + ) -> dict: + if not self.valves.enabled: + return body + + # Get the latest user message + messages = body.get("messages", []) + user_messages = [m for m in messages if m.get("role") == "user"] + if not user_messages: + return body + + query = user_messages[-1].get("content", "") + if not query or len(query.strip()) < 3: + return body + + router_intent = 
"rag_search" + + # ── ROUTER GATE (v4.3.0) ───────────────────────────────────────── + if self.valves.router_enabled: + route, confidence = _classify_query( + query, self.valves.tei_url, self.valves.router_threshold + ) + router_intent = route + log.info(f"Router: {query!r} → {route} ({confidence:.3f})") + + if route == "direct_answer": + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Direct response", "done": True}} + ) + return body + + if route == "nav_route": + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Getting directions...", "done": False}} + ) + result = _handle_nav_route( + query, + self.valves.photon_url, + self.valves.valhalla_url, + "Buhl, Idaho", + self.valves.address_book_url, + ) + if result: + _inject_nav_context(body, result) + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Directions ready", "done": True}} + ) + return body + # Fall through to RAG if nav handling fails + + if route == "nav_reverse_geocode": + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Looking up location...", "done": False}} + ) + result = _handle_reverse_geocode(query, self.valves.photon_url) + if result: + _inject_nav_context(body, result) + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Location found", "done": True}} + ) + return body + # Fall through to RAG if reverse geocode fails + + # route == "rag_search" or nav fallthrough → continue existing pipeline + + # ── EXISTING RAG PIPELINE ───────────────────────────────────────── + # Emit status + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Searching RECON knowledge base...", + "done": False, + }, + } + ) + + tier_used = 1 + top_1_score = 0.0 + final_context = "" + final_results = [] + + try: + vector = self._embed_query(query) 
+ + # Detect intent (knowledge_type filter) + intent_types = self._detect_intent(query) + + # Exclude obstetric/midwifery content when tactical context detected + exclude_subs = _OBSTETRIC_SUBDOMAINS if _TACTICAL_RE.search(query) else None + + # Start query expansion in background (runs concurrently with main search) + expansion_executor = ThreadPoolExecutor(max_workers=1) + expansion_future = expansion_executor.submit(self._expand_query_ollama, query) + + # Search Qdrant — unfiltered semantic search, optionally narrowed by knowledge_type + pool_size = self.valves.candidate_limit + if intent_types: + results = self._search_qdrant(vector, pool_size, knowledge_types=intent_types, + exclude_subdomains=exclude_subs) + if len(results) < self.valves.fallback_min: + results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs) + else: + results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs) + + # Collect expansion results and merge with main search + try: + expanded_terms = expansion_future.result(timeout=12) + except Exception: + expanded_terms = [] + expansion_executor.shutdown(wait=False) + + if expanded_terms: + expanded_results = self._search_expanded_terms( + expanded_terms, intent_types, pool_size, + exclude_subdomains=exclude_subs, + ) + if expanded_results: + combined = list(results) + expanded_results + seen: dict[str, dict] = {} + for r in combined: + pid = str(r.get("id", "")) + if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0): + seen[pid] = r + results = sorted(seen.values(), key=lambda x: -(x.get("score") or 0)) + + # Guaranteed transcript inclusion for medical queries + if _TACTICAL_RE.search(query) or any( + kw in query.lower() for kw in ("medical", "medicine", "wound", "trauma", "tourniquet", + "hemorrhage", "bleeding", "fracture", "burn", "cpr", + "first aid", "triage", "casualty") + ): + transcript_results = self._fetch_guaranteed_transcripts(vector, domain="Medical", limit=3, 
exclude_subdomains=exclude_subs) + if transcript_results: + combined = list(results) + transcript_results + seen: dict[str, dict] = {} + for r in combined: + pid = str(r.get("id", "")) + if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0): + seen[pid] = r + results = sorted(seen.values(), key=lambda x: -(x.get("score") or 0)) + + # Boost transcript sources across all retrieval paths + results = self._boost_transcripts(results) + + # Neural reranking via FlashRank, then MMR diversity selection + try: + results = self._rerank_flashrank(query, results) + results = self._mmr_select(results, self.valves.top_k) + except Exception as e: + log.warning(f"FlashRank reranking failed, falling back to keyword overlap: {e}") + results = _rerank_by_keyword_overlap(query, results) + results = results[:self.valves.top_k] + + # Get top-1 score for cascade decision + top_1_score = results[0]["score"] if results else 0.0 + + # ── CASCADE DECISION POINT (v5.0.0) ────────────────────────────── + if self.valves.cascade_enabled and top_1_score < self.valves.cascade_threshold: + # Tier 1 score too low, try Tier 2 (Kiwix) + log.info(f"Cascade: Tier 1 score {top_1_score:.3f} < {self.valves.cascade_threshold}, trying Kiwix") + + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Searching offline encyclopedia...", "done": False}} + ) + + kiwix_results = _search_kiwix(query, self._get_kiwix_books()) + + if kiwix_results: + tier_used = 2 + final_context = self._format_kiwix_context(kiwix_results[:KIWIX_MAX_RESULTS]) + log.info(f"Cascade: Tier 2 (Kiwix) returned {len(kiwix_results)} results") + else: + # Tier 2 failed, try Tier 3 (SearXNG) + log.info("Cascade: Tier 2 empty, trying SearXNG") + + if __event_emitter__: + await __event_emitter__( + {"type": "status", "data": {"description": "Searching the web...", "done": False}} + ) + + searxng_results = _search_searxng(query) + + if searxng_results: + tier_used = 3 + final_context 
= self._format_searxng_context(searxng_results) + log.info(f"Cascade: Tier 3 (SearXNG) returned {len(searxng_results)} results") + else: + # All tiers exhausted, fall back to whatever Tier 1 had + log.info("Cascade: All tiers exhausted, using Tier 1 results") + tier_used = 1 + final_context = self._format_context(results, "DOMAIN_KNOWLEDGE") + final_results = results + else: + # Tier 1 score good enough, use Qdrant results + tier_used = 1 + final_context = self._format_context(results, "DOMAIN_KNOWLEDGE") + final_results = results + + # Store results for outlet citations (only for Tier 1) + if tier_used == 1: + chat_id = body.get("chat_id", body.get("metadata", {}).get("chat_id", "")) + if chat_id: + _SOURCE_STORE[chat_id] = final_results + + # Log cascade decision + _log_cascade_decision( + query=query, + router_intent=router_intent, + top_1_score=top_1_score, + tier_used=tier_used, + num_results=len(results) if tier_used == 1 else (len(kiwix_results) if tier_used == 2 else len(searxng_results) if tier_used == 3 else 0), + ) + + # Build the RAG prompt with tier-appropriate instructions + if final_context: + if tier_used == 1: + rag_prompt = ( + "You have access to the RECON knowledge base — a curated library of military field manuals, " + "survival guides, preparedness literature, and video transcripts. Answer the user's question using " + "the reference material below. Reference sources using [DOMAIN_KNOWLEDGE:1], [DOMAIN_KNOWLEDGE:2], etc.\n\n" + "If the reference material doesn't adequately answer the question, say so explicitly rather " + "than filling gaps with general knowledge.\n\n" + "---REFERENCE MATERIAL---\n\n" + f"{final_context}\n\n" + "---END REFERENCE MATERIAL---" + ) + elif tier_used == 2: + rag_prompt = ( + "The RECON domain knowledge base did not have high-confidence results for this query. " + "The following information comes from offline Wikipedia/encyclopedia sources (Kiwix). 
" + "Reference sources using [OFFLINE_WIKI:1], [OFFLINE_WIKI:2], etc.\n\n" + "Note: This is general encyclopedia content, not domain-specific preparedness material.\n\n" + "---OFFLINE WIKI CONTENT---\n\n" + f"{final_context}\n\n" + "---END OFFLINE WIKI CONTENT---" + ) + else: # tier_used == 3 + rag_prompt = ( + "Neither the RECON knowledge base nor offline encyclopedias had relevant content. " + "The following information comes from a live web search. Reference sources using [WEB_SEARCH:1], etc.\n\n" + "Note: Web search results may be less reliable than curated sources. Verify important information.\n\n" + "---WEB SEARCH RESULTS---\n\n" + f"{final_context}\n\n" + "---END WEB SEARCH RESULTS---" + ) + else: + rag_prompt = ( + "You have access to the RECON knowledge base, but no relevant reference material was " + "found for this query in any tier (domain knowledge, offline wiki, or web search). " + "Answer from your general knowledge and clearly flag that your response is NOT backed by references." + ) + + # Add source priority instruction + rag_prompt += ( + "\n\nSource priority: When sources overlap, prefer DOMAIN_KNOWLEDGE over OFFLINE_WIKI over WEB_SEARCH. " + "Always cite which tier your information came from." 
async def outlet(
    self,
    body: dict,
    __user__: Optional[dict] = None,
    __event_emitter__: Callable[[dict], Awaitable[None]] = None,
) -> dict:
    """Emit one ``source`` citation event per result stored for this chat.

    Sources are read from the module-level ``_SOURCE_STORE`` and removed
    from it in the same step, so each stored list is emitted at most once.
    The chat id is taken from ``body["chat_id"]`` and, failing that, from
    ``body["metadata"]["chat_id"]`` — the same lookup ``inlet`` uses when
    it stores the results, so both sides agree on the key.

    Args:
        body: The response body; returned unchanged.
        __user__: Unused.
        __event_emitter__: Async callback that receives each citation event.

    Returns:
        ``body``, untouched.
    """
    if not self.valves.enabled:
        return body

    # Retrieve sources from module-level store (survives instance recreation)
    metadata = body.get("metadata")
    fallback_id = metadata.get("chat_id") if isinstance(metadata, dict) else None
    chat_id = body.get("chat_id") or fallback_id or ""
    # Pop before checking the emitter so the entry is released even when
    # there is nobody to send citations to.
    sources = _SOURCE_STORE.pop(chat_id, None) or []
    if not __event_emitter__ or not sources:
        return body

    # Emit citations for each source used
    for r in sources:
        try:
            if not isinstance(r, dict):
                continue
            p = r.get("payload")
            if not isinstance(p, dict):
                p = {}

            # Build citation — every field defensively None-checked
            book = p.get("book_title") or p.get("filename") or "Unknown Source"
            page = p.get("page_ref")
            if page is not None and str(page).strip():
                page_str = str(page).strip()
                if not page_str.startswith("p"):
                    page_str = f"p. {page_str}"
                citation_name = f"{book}, {page_str}"
            else:
                citation_name = str(book)

            download_url = str(p.get("download_url") or "")

            # Prefer the summary; otherwise the first 300 chars of content
            summary = str(p.get("summary") or "")
            if not summary:
                content = str(p.get("content") or "")
                summary = content[:300] if content else ""

            # Safe score formatting
            try:
                relevance = f"{float(r.get('score')):.2f}"
            except (TypeError, ValueError):
                relevance = "0.00"

            author = str(p.get("book_author") or "")

            await __event_emitter__(
                {
                    "type": "source",
                    "data": {
                        "document": [summary],
                        "metadata": [
                            {
                                "source": citation_name,
                                "url": download_url,
                                "author": author,
                                "relevance": relevance,
                            }
                        ],
                        "source": {
                            "name": citation_name,
                            "url": download_url,
                        },
                    },
                }
            )
        except Exception as e:
            # One bad record must not stop the remaining citations.
            pid = r.get("id", "?") if isinstance(r, dict) else "?"
            log.warning(f"Failed to emit citation (id={pid}): {e}")

    return body
try: + # Run through inlet + result_body = await f.inlet(body) + + # Extract what was injected + system_msg = next( + (m for m in result_body.get("messages", []) if m.get("role") == "system"), + None + ) + + if system_msg: + content = system_msg.get("content", "") + + # Determine tier used + if "[DOMAIN_KNOWLEDGE:" in content: + tier = 1 + elif "[OFFLINE_WIKI:" in content: + tier = 2 + elif "[WEB_SEARCH:" in content: + tier = 3 + else: + tier = 0 + + print(f"Tier Used: {tier}") + + # Get first 200 chars of context + context_start = content.find("---") + if context_start > 0: + context_preview = content[context_start:context_start+300] + print(f"Context Preview: {context_preview[:200]}...") + + results.append({ + "query": query, + "expected": expected, + "tier": tier, + }) + else: + print("No system message injected") + results.append({ + "query": query, + "expected": expected, + "tier": None, + }) + + except Exception as e: + print(f"ERROR: {e}") + results.append({ + "query": query, + "expected": expected, + "tier": None, + "error": str(e), + }) + + print("\n" + "=" * 70) + print("SUMMARY") + print("=" * 70) + for r in results: + tier_str = f"Tier {r['tier']}" if r.get('tier') else "ERROR" + print(f" {r['query'][:40]:<40} → {tier_str}") + + return results + + asyncio.run(run_tests())