diff --git a/tools/recon_rag_tool.py b/tools/recon_rag_tool.py
index b0bc0cf..c835864 100644
--- a/tools/recon_rag_tool.py
+++ b/tools/recon_rag_tool.py
@@ -1,1119 +1,1653 @@
-"""
-title: RECON Knowledge Base
-author: Echo6
-version: 4.3.0
-description: RAG filter that searches the RECON knowledge base and injects reference material into Aurora's context. Emits citations with PDF download links. Supports intent-based metadata filtering, FlashRank neural reranking with MMR diversity, Ollama-powered query expansion, transcript source boosting, semantic query routing with inline navigation, and address book place resolution.
-"""
-
-import logging
-import json
-import math
-import re
-import threading
-from typing import Optional, Callable, Awaitable
-from concurrent.futures import ThreadPoolExecutor, as_completed
-
-import requests
-from pydantic import BaseModel, Field
-
-log = logging.getLogger(__name__)
-
-# Module-level source store: keyed by chat_id so inlet/outlet share state
-# even if OWI instantiates separate Filter objects per call.
-_SOURCE_STORE: dict[str, list] = {}
-
-# ── Semantic Query Router (v4.3.0) ───────────────────────────────────────────
-ROUTE_EXAMPLES = {
- "nav_route": [
- "how do I get to Boise",
- "directions to Twin Falls",
- "how do I get from Buhl to Boise",
- "drive from Jerome to Sun Valley",
- "route from Boise to McCall",
- "what's the fastest way to Sun Valley",
- "how far is it to Twin Falls",
- "take me to Shoshone",
- "navigate to the airport",
- "how do I drive to Salt Lake City",
- "walking directions to the park",
- "bike route to downtown",
- ],
- "nav_reverse_geocode": [
- "what town is at 42.5, -114.7",
- "where am I right now",
- "what is at coordinates 43.6, -116.2",
- "what location is 42.574, -114.607",
- "where is this place 44.0, -114.3",
- "what city is near 42.7, -114.5",
- "reverse geocode 43.0, -115.0",
- "what's at this location 42.9, -114.8",
- ],
- "direct_answer": [
- "hello",
- "hey aurora",
- "good morning",
- "thanks",
- "thank you",
- "what's your name",
- "who are you",
- "tell me a joke",
- "how are you",
- "hi there",
- ],
- "rag_search": [
- "what does the survival manual say about water",
- "how to purify water in the field",
- "how to treat a gunshot wound",
- "what is the ranger handbook chapter on patrolling",
- "field manual water purification",
- "how to build a shelter in the wilderness",
- "tactical combat casualty care procedures",
- "what does FM 21-76 say about fire starting",
- ],
-}
-
-_ROUTE_CENTROIDS: dict | None = None
-_ROUTER_LOCK = threading.Lock()
-
-
-def _embed_batch_router(texts: list[str], tei_url: str) -> list[list[float]]:
- resp = requests.post(tei_url, json={"inputs": texts}, timeout=30)
- resp.raise_for_status()
- return resp.json()
-
-
-def _compute_centroid(vectors: list[list[float]]) -> list[float]:
- n = len(vectors)
- dim = len(vectors[0])
- centroid = [0.0] * dim
- for vec in vectors:
- for i in range(dim):
- centroid[i] += vec[i]
- for i in range(dim):
- centroid[i] /= n
- return centroid
-
-
-def _cosine_similarity(a: list[float], b: list[float]) -> float:
- dot = 0.0
- norm_a = 0.0
- norm_b = 0.0
- for i in range(len(a)):
- dot += a[i] * b[i]
- norm_a += a[i] * a[i]
- norm_b += b[i] * b[i]
- denom = math.sqrt(norm_a) * math.sqrt(norm_b)
- if denom == 0:
- return 0.0
- return dot / denom
-
-
-def _ensure_centroids(tei_url: str) -> dict[str, list[float]]:
- global _ROUTE_CENTROIDS
- if _ROUTE_CENTROIDS is not None:
- return _ROUTE_CENTROIDS
- with _ROUTER_LOCK:
- if _ROUTE_CENTROIDS is not None:
- return _ROUTE_CENTROIDS
- all_texts = []
- route_ranges: dict[str, tuple[int, int]] = {}
- offset = 0
- for route, examples in ROUTE_EXAMPLES.items():
- route_ranges[route] = (offset, offset + len(examples))
- all_texts.extend(examples)
- offset += len(examples)
- all_vectors = _embed_batch_router(all_texts, tei_url)
- centroids = {}
- for route, (start, end) in route_ranges.items():
- centroids[route] = _compute_centroid(all_vectors[start:end])
- _ROUTE_CENTROIDS = centroids
- return _ROUTE_CENTROIDS
-
-
-def _classify_query(
- query: str,
- tei_url: str,
- threshold: float = 0.45,
-) -> tuple[str, float]:
- """Classify query intent. Returns ("rag_search", 0.0) on any failure."""
- try:
- centroids = _ensure_centroids(tei_url)
- vecs = _embed_batch_router([query], tei_url)
- query_vec = vecs[0]
- best_route = "rag_search"
- best_score = 0.0
- for route, centroid in centroids.items():
- sim = _cosine_similarity(query_vec, centroid)
- if sim > best_score:
- best_score = sim
- best_route = route
- if best_score < threshold:
- return ("rag_search", best_score)
- return (best_route, best_score)
- except Exception as e:
- log.warning(f"Router classification failed: {e}")
- return ("rag_search", 0.0)
-
-
-# ── Navigation handlers (v4.3.0) ─────────────────────────────────────────────
-_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$')
-_FROM_TO_RE = re.compile(r'from\s+(.+?)\s+to\s+(.+?)(?:\s+by\s+\w+)?$', re.IGNORECASE)
-_TO_RE = re.compile(r'(?:to|towards?)\s+(?:the\s+)?(.+?)$', re.IGNORECASE)
-_COORD_IN_TEXT_RE = re.compile(r'(-?\d+\.?\d+)\s*,\s*(-?\d+\.?\d+)')
-_MODE_MAP = {
- "walk": "pedestrian", "walking": "pedestrian", "foot": "pedestrian", "pedestrian": "pedestrian",
- "bike": "bicycle", "cycling": "bicycle", "bicycle": "bicycle", "cycle": "bicycle",
- "truck": "truck", "lorry": "truck",
- "drive": "auto", "driving": "auto", "car": "auto", "auto": "auto",
-}
-
-
-def _detect_mode(query: str) -> str:
- q = query.lower()
- for keyword, mode in _MODE_MAP.items():
- if keyword in q:
- return mode
- return "auto"
-
-
-def _clean_place(text: str) -> str:
- """Clean a place string for geocoding: strip articles, punctuation, normalize 'in' to comma."""
- s = text.strip().rstrip('?.,!')
- # Strip leading articles
- s = re.sub(r'^(the|a|an)\s+', '', s, flags=re.IGNORECASE)
- # "214 North St in Filer ID" → "214 North St, Filer, ID"
- s = re.sub(r'\s+in\s+', ', ', s, count=1, flags=re.IGNORECASE)
- return s.strip()
-
-
-def _parse_nav_query(query: str) -> tuple[str, str, str] | None:
- mode = _detect_mode(query)
- m = _FROM_TO_RE.search(query)
- if m:
- return (_clean_place(m.group(1)), _clean_place(m.group(2)), mode)
- m = _TO_RE.search(query)
- if m:
- dest = _clean_place(m.group(1))
- if dest:
- return (None, dest, mode)
- return None
-
-
-def _geocode(query: str, photon_url: str, address_book_url: str = "") -> tuple[float, float, str] | tuple[None, None, None]:
- m = _COORD_RE.match(query.strip())
- if m:
- lat, lon = float(m.group(1)), float(m.group(2))
- return lat, lon, query
- # Address book lookup (before Photon)
- ab = _address_book_lookup(query, address_book_url)
- if ab:
- return ab['lat'], ab['lon'], ab.get('address') or ab['name']
- resp = requests.get(
- f"{photon_url}/api",
- params={"q": query, "limit": 1},
- timeout=10,
- )
- resp.raise_for_status()
- features = resp.json().get("features", [])
- if not features:
- return None, None, None
- props = features[0]["properties"]
- coords = features[0]["geometry"]["coordinates"]
- parts = [props.get("name", "")]
- for key in ("city", "state", "country"):
- v = props.get(key)
- if v and v != parts[-1]:
- parts.append(v)
- return coords[1], coords[0], ", ".join(p for p in parts if p)
-
-
-def _route_valhalla(
- orig: tuple[float, float],
- dest: tuple[float, float],
- mode: str,
- valhalla_url: str,
-) -> str | None:
- try:
- resp = requests.post(
- f"{valhalla_url}/route",
- json={
- "locations": [
- {"lat": orig[0], "lon": orig[1]},
- {"lat": dest[0], "lon": dest[1]},
- ],
- "costing": mode,
- "directions_options": {"units": "miles"},
- },
- timeout=30,
- )
- except requests.RequestException:
- return None
- if resp.status_code != 200:
- return None
- trip = resp.json()["trip"]
- summary = trip["summary"]
- legs = trip["legs"][0]["maneuvers"]
- miles = round(summary["length"], 1)
- minutes = round(summary["time"] / 60, 1)
- lines = [f"Distance: {miles} miles | Time: {minutes} minutes", ""]
- for i, m in enumerate(legs, 1):
- inst = m["instruction"]
- dist = m.get("length", 0)
- if dist > 0:
- lines.append(f"{i}. {inst} — {round(dist, 1)} mi")
- else:
- lines.append(f"{i}. {inst}")
- return "\n".join(lines)
-
-
-def _handle_nav_route(
- query: str,
- photon_url: str,
- valhalla_url: str,
- default_origin: str,
- address_book_url: str = "",
-) -> str | None:
- parsed = _parse_nav_query(query)
- if not parsed:
- return None
- origin_str, dest_str, mode = parsed
- if not origin_str:
- origin_str = default_origin
- orig_lat, orig_lon, orig_name = _geocode(origin_str, photon_url, address_book_url)
- if orig_lat is None:
- return None
- dest_lat, dest_lon, dest_name = _geocode(dest_str, photon_url, address_book_url)
- if dest_lat is None:
- return None
- directions = _route_valhalla(
- (orig_lat, orig_lon), (dest_lat, dest_lon), mode, valhalla_url
- )
- if not directions:
- return None
- return f"Directions from {orig_name} to {dest_name} ({mode}):\n{directions}"
-
-
-def _handle_reverse_geocode(query: str, photon_url: str) -> str | None:
- m = _COORD_IN_TEXT_RE.search(query)
- if not m:
- return None
- lat, lon = float(m.group(1)), float(m.group(2))
- try:
- resp = requests.get(
- f"{photon_url}/reverse",
- params={"lat": lat, "lon": lon, "limit": 1},
- timeout=10,
- )
- resp.raise_for_status()
- features = resp.json().get("features", [])
- if not features:
- return f"No location found near coordinates ({lat}, {lon})"
- props = features[0]["properties"]
- parts = []
- for key in ("name", "city", "state", "country"):
- v = props.get(key)
- if v and v not in parts:
- parts.append(v)
- display = ", ".join(parts) if parts else "Unknown location"
- return f"Location: {display} ({lat}, {lon})"
- except Exception:
- return None
-
-
-def _inject_nav_context(body: dict, context: str):
- messages = body.get("messages", [])
- nav_block = (
- "\n\n---NAVIGATION RESULT---\n\n"
- f"{context}\n\n"
- "---END NAVIGATION RESULT---\n\n"
- "Present these directions to the user exactly as provided. "
- "Do not summarize or omit steps. You may add brief contextual notes."
- )
- system_msg = next((m for m in messages if m.get("role") == "system"), None)
- if system_msg:
- system_msg["content"] = system_msg["content"] + nav_block
- else:
- body["messages"].insert(0, {"role": "system", "content": nav_block})
-
-
-
-def _address_book_lookup(query: str, address_book_url: str) -> dict | None:
- """Check RECON address book for exact place match. Returns dict with lat/lon or None."""
- if not address_book_url:
- return None
- try:
- resp = requests.get(
- f"{address_book_url}/api/address_book/lookup",
- params={"q": query},
- timeout=2,
- )
- if resp.status_code == 200:
- data = resp.json()
- if data.get("confidence") == "exact" and data.get("lat") and data.get("lon"):
- log.info(f"Address book hit: {query!r} → {data['name']} ({data['lat']}, {data['lon']})")
- return data
- return None
- except Exception:
- return None
-
-
-# ── End router/nav code ──────────────────────────────────────────────────────
-
-# Subdomains excluded from Medical results when tactical context detected
-_OBSTETRIC_SUBDOMAINS = [
- "Obstetrics", "Midwifery", "Pregnancy", "Pregnancy Care",
- "High-Risk Pregnancy", "Childbirth", "Postpartum Care",
- "Family Planning", "Contraception", "Breastfeeding",
- "Labor Complications", "Twin Delivery",
-]
-
-# Query intent patterns — compiled once at import time
-_PROCEDURAL_RE = re.compile(
- r"^(how\s+(do|can|should|would|to)\b|steps?\s+(to|for)\b|procedure\s+for\b|technique\s+for\b|way\s+to\b|method\s+(to|for)\b|guide\s+(to|for)\b|instructions?\s+for\b)",
- re.IGNORECASE,
-)
-_FOUNDATIONAL_RE = re.compile(
- r"^(what\s+(is|are|does|was|were)\b|explain\b|define\b|why\s+(does|do|is|are|did)\b|describe\b|meaning\s+of\b|difference\s+between\b)",
- re.IGNORECASE,
-)
-
-# Tactical keyword patterns for obstetric subdomain exclusion
-_TACTICAL_RE = re.compile(
- r"\b(tactical|combat|tccc|casevac|medevac|casualty|triage|tourniquet|hemorrhage|wound packing|chest seal|care under fire|point of injury|far forward|buddy aid|self aid|field care|9-line|march algorithm)\b",
- re.IGNORECASE,
-)
-
-
-def _rerank_by_keyword_overlap(query: str, results: list) -> list:
- """Rerank results by boosting those with query term overlap in content/summary/key_facts.
-
- Adds a boost of up to 0.15 based on the fraction of query tokens found in the result text.
- Results are re-sorted by boosted score.
- """
- q_tokens = set(re.findall(r'[a-z0-9][-a-z0-9]{2,}', query.lower()))
- if not q_tokens:
- return results
-
- reranked = []
- for r in results:
- p = r.get("payload", {})
- score = r.get("score", 0)
-
- # Build searchable text from content, summary, and key_facts
- parts = []
- content = p.get("content", "")
- if content:
- parts.append(content[:2000].lower())
- summary = p.get("summary", "")
- if summary:
- parts.append(summary.lower())
- key_facts = p.get("key_facts", [])
- if isinstance(key_facts, list):
- parts.append(" ".join(str(f) for f in key_facts).lower())
- searchable = " ".join(parts)
-
- # Count how many query tokens appear in the result
- if searchable:
- matches = sum(1 for t in q_tokens if t in searchable)
- overlap_ratio = matches / len(q_tokens)
- else:
- overlap_ratio = 0
-
- # Boost: up to 0.15 for perfect overlap
- boosted_score = score + (overlap_ratio * 0.15)
- reranked.append({**r, "score": boosted_score})
-
- reranked.sort(key=lambda x: -x["score"])
- return reranked
-
-
-class Filter:
- class Valves(BaseModel):
- tei_url: str = Field(
- default="http://100.64.0.14:8090/embed",
- description="TEI embedding endpoint",
- )
- qdrant_url: str = Field(
- default="http://100.64.0.14:6333",
- description="Qdrant REST API base URL",
- )
- collection: str = Field(
- default="recon_knowledge_hybrid",
- description="Qdrant collection name",
- )
- top_k: int = Field(
- default=8,
- description="Number of results to retrieve",
- )
- score_threshold: float = Field(
- default=0.3,
- description="Minimum similarity score to include a result",
- )
- fallback_min: int = Field(
- default=3,
- description="Minimum filtered results before falling back to unfiltered search",
- )
- candidate_limit: int = Field(
- default=50,
- description="Initial retrieval pool size for reranking",
- )
- rerank_top_n: int = Field(
- default=20,
- description="Keep top N after FlashRank reranking",
- )
- mmr_diversity: float = Field(
- default=0.3,
- description="MMR diversity 0-1 (0=pure relevance, 1=max diversity)",
- )
- enabled: bool = Field(
- default=True,
- description="Enable/disable RECON RAG augmentation",
- )
- priority: int = Field(
- default=0,
- description="Filter execution priority (lower = earlier)",
- )
- router_enabled: bool = Field(
- default=True,
- description="Enable semantic query routing",
- )
- router_threshold: float = Field(
- default=0.45,
- description="Min confidence for route classification",
- )
- photon_url: str = Field(
- default="http://100.64.0.24:2322",
- description="Photon geocoder URL",
- )
- valhalla_url: str = Field(
- default="http://100.64.0.24:8002",
- description="Valhalla routing URL",
- )
- address_book_url: str = Field(
- default="http://100.64.0.24:8420",
- description="RECON address book API base URL",
- )
-
- def __init__(self):
- self.valves = self.Valves()
- self._expansion_cache: dict[str, list[str]] = {}
- self._ranker = None
-
- def _embed_query(self, text: str) -> list:
- """Embed a query string using TEI."""
- resp = requests.post(
- self.valves.tei_url,
- json={"inputs": text},
- timeout=30,
- )
- resp.raise_for_status()
- return resp.json()[0]
-
- def _get_ranker(self):
- """Lazy-load FlashRank neural reranker."""
- if self._ranker is None:
- from flashrank import Ranker
- self._ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2", cache_dir="/tmp/flashrank")
- return self._ranker
-
- def _rerank_flashrank(self, query: str, results: list) -> list:
- """Rerank results using FlashRank neural reranker.
-
- Takes Qdrant REST API result dicts (with 'payload' and 'score' keys).
- Returns reranked list with updated scores, trimmed to rerank_top_n.
- """
- from flashrank import RerankRequest
-
- ranker = self._get_ranker()
-
- passages = []
- for i, r in enumerate(results):
- p = r.get("payload", {})
- text = p.get("content", "")
- if not text:
- text = p.get("summary", "")
- passages.append({"id": i, "text": text[:2048]})
-
- if not passages:
- return results
-
- request = RerankRequest(query=query, passages=passages)
- ranked = ranker.rerank(request)
-
- reranked = []
- for item in ranked[:self.valves.rerank_top_n]:
- idx = item["id"]
- result_copy = dict(results[idx])
- result_copy["score"] = item["score"]
- reranked.append(result_copy)
-
- return reranked
-
- def _mmr_select(self, candidates: list, final_k: int) -> list:
- """Select final_k results using Maximal Marginal Relevance.
-
- Penalizes redundancy: same book_title (0.6), same domain (0.3), same source_type (0.1).
- Works with Qdrant REST API result dicts.
- """
- if len(candidates) <= final_k:
- return candidates
-
- selected = [candidates[0]]
- remaining = list(candidates[1:])
-
- while len(selected) < final_k and remaining:
- best_score = -999
- best_idx = 0
-
- for i, candidate in enumerate(remaining):
- relevance = candidate.get("score", 0)
- cp = candidate.get("payload", {})
-
- max_overlap = 0
- for sel in selected:
- sp = sel.get("payload", {})
- overlap = 0
-
- c_title = cp.get("book_title", "")
- s_title = sp.get("book_title", "")
- if c_title and s_title and c_title == s_title:
- overlap += 0.6
-
- c_domain = cp.get("domain", "")
- s_domain = sp.get("domain", "")
- if c_domain and s_domain and c_domain == s_domain:
- overlap += 0.3
-
- c_src = cp.get("source_type", "")
- s_src = sp.get("source_type", "")
- if c_src and s_src and c_src == s_src:
- overlap += 0.1
-
- max_overlap = max(max_overlap, overlap)
-
- diversity = self.valves.mmr_diversity
- mmr_score = (1 - diversity) * relevance - diversity * max_overlap
-
- if mmr_score > best_score:
- best_score = mmr_score
- best_idx = i
-
- selected.append(remaining.pop(best_idx))
-
- return selected
-
- @staticmethod
- def _detect_intent(query: str) -> Optional[list]:
- """Detect query intent and return preferred knowledge_types, or None for unfiltered."""
- q = query.strip()
- if _PROCEDURAL_RE.search(q):
- return ["procedural", "operational"]
- if _FOUNDATIONAL_RE.search(q):
- return ["foundational"]
- return None
-
- def _search_qdrant(
- self,
- vector: list,
- limit: int,
- knowledge_types: Optional[list] = None,
- domain: Optional[str] = None,
- exclude_subdomains: Optional[list] = None,
- ) -> list:
- """Search Qdrant for similar vectors, optionally filtered by knowledge_type and/or domain."""
- url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search"
- payload = {
- "vector": vector,
- "limit": limit,
- "with_payload": True,
- "score_threshold": self.valves.score_threshold,
- }
-
- must_clauses = []
- must_not_clauses = []
- should_clauses = []
-
- if domain:
- must_clauses.append({"key": "domain", "match": {"value": domain}})
-
- if knowledge_types:
- for kt in knowledge_types:
- should_clauses.append({"key": "knowledge_type", "match": {"value": kt}})
-
- if exclude_subdomains:
- for sd in exclude_subdomains:
- must_not_clauses.append({"key": "subdomain", "match": {"value": sd}})
-
- if must_clauses or should_clauses or must_not_clauses:
- filter_obj = {}
- if must_clauses:
- filter_obj["must"] = must_clauses
- if should_clauses:
- filter_obj["should"] = should_clauses
- if must_not_clauses:
- filter_obj["must_not"] = must_not_clauses
- payload["filter"] = filter_obj
-
- resp = requests.post(url, json=payload, timeout=30)
- resp.raise_for_status()
- return resp.json().get("result", [])
-
- def _boost_transcripts(self, results: list, factor: float = 1.10) -> list:
- """Boost transcript source scores to surface video content alongside documents."""
- for r in results:
- p = r.get("payload", {})
- if p.get("source_type") == "transcript":
- r["score"] = r.get("score", 0) * factor
- return results
-
- def _fetch_guaranteed_transcripts(self, vector: list, domain: str = "Medical", limit: int = 3, exclude_subdomains: Optional[list] = None) -> list:
- """Fetch top transcript results for a domain regardless of score threshold."""
- url = f"{self.valves.qdrant_url}/collections/{self.valves.collection}/points/search"
- filter_obj = {
- "must": [
- {"key": "source_type", "match": {"value": "transcript"}},
- {"key": "domain", "match": {"value": domain}},
- ],
- }
- if exclude_subdomains:
- filter_obj["must_not"] = [
- {"key": "subdomain", "match": {"value": sd}} for sd in exclude_subdomains
- ]
- payload = {
- "vector": vector,
- "limit": limit,
- "with_payload": True,
- "filter": filter_obj,
- }
- try:
- resp = requests.post(url, json=payload, timeout=10)
- resp.raise_for_status()
- return resp.json().get("result", [])
- except Exception as e:
- log.warning(f"Guaranteed transcript fetch failed: {e}")
- return []
-
- def _expand_query_ollama(self, query: str) -> list[str]:
- """Generate alternative search terms via Ollama. Cached, 10s timeout, fail-safe."""
- if query in self._expansion_cache:
- return self._expansion_cache[query]
- try:
- resp = requests.post(
- "http://100.64.0.14:11434/api/generate",
- json={
- "model": "goekdenizguelmez/JOSIEFIED-Qwen3:8b",
- "prompt": (
- f'Given this search query for a military/survival/preparedness knowledge base: "{query}"\n'
- "Generate 3 specific technical search terms that would find TCCC, tactical medicine, "
- "or field craft content. Focus on specific procedures, equipment, or doctrine terms "
- "— not generic descriptions. Return only the terms, one per line, no numbering, no explanations."
- ),
- "stream": False,
- },
- timeout=10,
- )
- resp.raise_for_status()
- text = resp.json().get("response", "")
- terms = [
- t for t in (
- line.strip().lstrip("0123456789.-)*# ")
- for line in text.strip().split("\n")
- if line.strip()
- )
- if t and len(t) >= 3
- ][:3]
- self._expansion_cache[query] = terms
- log.info(f"Query expansion: {query!r} → {terms}")
- return terms
- except Exception as e:
- log.warning(f"Query expansion failed (proceeding without): {e}")
- self._expansion_cache[query] = []
- return []
-
- def _search_expanded_terms(
- self,
- terms: list[str],
- intent_types: Optional[list],
- limit: int,
- exclude_subdomains: Optional[list] = None,
- ) -> list:
- """Embed and search expanded query terms in parallel."""
- if not terms:
- return []
-
- def embed_and_search(term: str) -> list:
- vec = self._embed_query(term)
- return self._search_qdrant(vec, limit, knowledge_types=intent_types, exclude_subdomains=exclude_subdomains)
-
- results = []
- with ThreadPoolExecutor(max_workers=min(len(terms), 3)) as pool:
- futures = {pool.submit(embed_and_search, t): t for t in terms}
- for future in as_completed(futures):
- term = futures[future]
- try:
- results.extend(future.result())
- except Exception as e:
- log.warning(f"Expanded search for {term!r} failed: {e}")
- return results
-
- def _format_context(self, results: list) -> str:
- """Format search results into a context block for the system prompt."""
- if not results:
- return ""
-
- blocks = []
- for i, r in enumerate(results, 1):
- p = r.get("payload", {})
- score = r.get("score", 0)
-
- # Build citation line
- book = p.get("book_title") or p.get("filename", "Unknown")
- page = p.get("page_ref", "")
- if page:
- page_str = str(page)
- if not page_str.startswith("p"):
- page_str = f"p. {page_str}"
- citation = f"{book}, {page_str}"
- else:
- citation = book
-
- # Summary or truncated content
- summary = p.get("summary", "")
- if not summary:
- content = p.get("content", "")
- summary = content[:500] + "..." if len(content) > 500 else content
-
- # Key facts
- key_facts = p.get("key_facts", [])
- facts_str = ""
- if key_facts and isinstance(key_facts, list):
- facts_str = "\nKey facts: " + "; ".join(str(f) for f in key_facts[:5])
-
- # Domain
- domains = p.get("domain", [])
- subdomains = p.get("subdomain", [])
- domain_str = ""
- if domains:
- d = ", ".join(domains) if isinstance(domains, list) else str(domains)
- if subdomains:
- s = ", ".join(subdomains) if isinstance(subdomains, list) else str(subdomains)
- domain_str = f"\nDomain: {d} > {s}"
- else:
- domain_str = f"\nDomain: {d}"
-
- # Download URL
- dl = p.get("download_url", "")
- source_type = p.get("source_type", "document")
- if dl:
- if source_type == "transcript":
- dl_str = f"\nSource Video: {dl}"
- elif source_type == "web":
- dl_str = f"\nSource URL: {dl}"
- else:
- dl_str = f"\nSource PDF: {dl}"
- else:
- dl_str = ""
-
- block = f"[{i}] {citation} (relevance: {score:.2f})\n{summary}{facts_str}{domain_str}{dl_str}"
- blocks.append(block)
-
- return "\n\n".join(blocks)
-
- async def inlet(
- self,
- body: dict,
- __user__: Optional[dict] = None,
- __event_emitter__: Callable[[dict], Awaitable[None]] = None,
- ) -> dict:
- if not self.valves.enabled:
- return body
-
- # Get the latest user message
- messages = body.get("messages", [])
- user_messages = [m for m in messages if m.get("role") == "user"]
- if not user_messages:
- return body
-
- query = user_messages[-1].get("content", "")
- if not query or len(query.strip()) < 3:
- return body
-
- # ── ROUTER GATE (v4.3.0) ─────────────────────────────────────────
- if self.valves.router_enabled:
- route, confidence = _classify_query(
- query, self.valves.tei_url, self.valves.router_threshold
- )
- log.info(f"Router: {query!r} → {route} ({confidence:.3f})")
-
- if route == "direct_answer":
- if __event_emitter__:
- await __event_emitter__(
- {"type": "status", "data": {"description": "Direct response", "done": True}}
- )
- return body
-
- if route == "nav_route":
- if __event_emitter__:
- await __event_emitter__(
- {"type": "status", "data": {"description": "Getting directions...", "done": False}}
- )
- result = _handle_nav_route(
- query,
- self.valves.photon_url,
- self.valves.valhalla_url,
- "Buhl, Idaho",
- self.valves.address_book_url,
- )
- if result:
- _inject_nav_context(body, result)
- if __event_emitter__:
- await __event_emitter__(
- {"type": "status", "data": {"description": "Directions ready", "done": True}}
- )
- return body
- # Fall through to RAG if nav handling fails
-
- if route == "nav_reverse_geocode":
- if __event_emitter__:
- await __event_emitter__(
- {"type": "status", "data": {"description": "Looking up location...", "done": False}}
- )
- result = _handle_reverse_geocode(query, self.valves.photon_url)
- if result:
- _inject_nav_context(body, result)
- if __event_emitter__:
- await __event_emitter__(
- {"type": "status", "data": {"description": "Location found", "done": True}}
- )
- return body
- # Fall through to RAG if reverse geocode fails
-
- # route == "rag_search" or nav fallthrough → continue existing pipeline
-
- # ── EXISTING RAG PIPELINE ─────────────────────────────────────────
- # Emit status
- if __event_emitter__:
- await __event_emitter__(
- {
- "type": "status",
- "data": {
- "description": "Searching RECON knowledge base...",
- "done": False,
- },
- }
- )
-
- try:
- vector = self._embed_query(query)
-
- # Detect intent (knowledge_type filter)
- intent_types = self._detect_intent(query)
-
- # Exclude obstetric/midwifery content when tactical context detected
- exclude_subs = _OBSTETRIC_SUBDOMAINS if _TACTICAL_RE.search(query) else None
-
- # Start query expansion in background (runs concurrently with main search)
- expansion_executor = ThreadPoolExecutor(max_workers=1)
- expansion_future = expansion_executor.submit(self._expand_query_ollama, query)
-
- # Search Qdrant — unfiltered semantic search, optionally narrowed by knowledge_type
- pool_size = self.valves.candidate_limit
- if intent_types:
- results = self._search_qdrant(vector, pool_size, knowledge_types=intent_types,
- exclude_subdomains=exclude_subs)
- if len(results) < self.valves.fallback_min:
- results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs)
- else:
- results = self._search_qdrant(vector, pool_size, exclude_subdomains=exclude_subs)
-
- # Collect expansion results and merge with main search
- try:
- expanded_terms = expansion_future.result(timeout=12)
- except Exception:
- expanded_terms = []
- expansion_executor.shutdown(wait=False)
-
- if expanded_terms:
- expanded_results = self._search_expanded_terms(
- expanded_terms, intent_types, pool_size,
- exclude_subdomains=exclude_subs,
- )
- if expanded_results:
- combined = list(results) + expanded_results
- seen: dict[str, dict] = {}
- for r in combined:
- pid = str(r.get("id", ""))
- if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0):
- seen[pid] = r
- results = sorted(seen.values(), key=lambda x: -(x.get("score") or 0))
-
- # Guaranteed transcript inclusion for medical queries
- if _TACTICAL_RE.search(query) or any(
- kw in query.lower() for kw in ("medical", "medicine", "wound", "trauma", "tourniquet",
- "hemorrhage", "bleeding", "fracture", "burn", "cpr",
- "first aid", "triage", "casualty")
- ):
- transcript_results = self._fetch_guaranteed_transcripts(vector, domain="Medical", limit=3, exclude_subdomains=exclude_subs)
- if transcript_results:
- combined = list(results) + transcript_results
- seen: dict[str, dict] = {}
- for r in combined:
- pid = str(r.get("id", ""))
- if pid not in seen or (r.get("score") or 0) > (seen[pid].get("score") or 0):
- seen[pid] = r
- results = sorted(seen.values(), key=lambda x: -(x.get("score") or 0))
-
- # Boost transcript sources across all retrieval paths
- results = self._boost_transcripts(results)
-
- # Neural reranking via FlashRank, then MMR diversity selection
- try:
- results = self._rerank_flashrank(query, results)
- results = self._mmr_select(results, self.valves.top_k)
- except Exception as e:
- log.warning(f"FlashRank reranking failed, falling back to keyword overlap: {e}")
- results = _rerank_by_keyword_overlap(query, results)
- results = results[:self.valves.top_k]
-
- # Store results for outlet citations (module-level, keyed by chat_id)
- chat_id = body.get("chat_id", body.get("metadata", {}).get("chat_id", ""))
- if chat_id:
- _SOURCE_STORE[chat_id] = results
-
- # Build context block
- context = self._format_context(results)
-
- if context:
- rag_prompt = (
- "You have access to the RECON knowledge base — a curated library of military field manuals, "
- "survival guides, preparedness literature, and video transcripts. Answer the user's question using "
- "the reference material below. Reference sources using [1], [2], [3] etc. matching the "
- "numbered sources provided. Use these numbers inline in your response.\n\n"
- "If the reference material doesn't adequately answer the question, say so explicitly rather "
- "than filling gaps with general knowledge.\n\n"
- "---REFERENCE MATERIAL---\n\n"
- f"{context}\n\n"
- "---END REFERENCE MATERIAL---"
- )
- else:
- rag_prompt = (
- "You have access to the RECON knowledge base, but no relevant reference material was "
- "found for this query. Answer from your general knowledge and clearly flag that your "
- "response is NOT backed by the RECON reference library."
- )
-
- # Inject into system message
- system_msg = next(
- (m for m in messages if m.get("role") == "system"), None
- )
- if system_msg:
- system_msg["content"] = system_msg["content"] + "\n\n" + rag_prompt
- else:
- body["messages"].insert(
- 0, {"role": "system", "content": rag_prompt}
- )
-
- if __event_emitter__:
- status_msg = f"Found {len(results)} reference{'s' if len(results) != 1 else ''}" if results else "No matching references found"
- await __event_emitter__(
- {
- "type": "status",
- "data": {
- "description": status_msg,
- "done": True,
- },
- }
- )
-
- except Exception as e:
- log.warning(f"RECON RAG search failed: {e}")
- if __event_emitter__:
- await __event_emitter__(
- {
- "type": "status",
- "data": {
- "description": "RECON search unavailable, proceeding without references",
- "done": True,
- },
- }
- )
-
- return body
-
- async def outlet(
- self,
- body: dict,
- __user__: Optional[dict] = None,
- __event_emitter__: Callable[[dict], Awaitable[None]] = None,
- ) -> dict:
- if not self.valves.enabled or not __event_emitter__:
- return body
-
- # Retrieve sources from module-level store (survives instance recreation)
- chat_id = body.get("chat_id", "")
- sources = _SOURCE_STORE.pop(chat_id, [])
- if not sources:
- return body
-
- # Emit citations for each source used
- for r in sources:
- try:
- if not isinstance(r, dict):
- continue
- p = r.get("payload") or {}
- if not isinstance(p, dict):
- p = {}
-
- # Build citation — every field defensively None-checked
- book = p.get("book_title") or p.get("filename") or "Unknown Source"
- page = p.get("page_ref")
- if page is not None and str(page).strip():
- page_str = str(page).strip()
- if not page_str.startswith("p"):
- page_str = f"p. {page_str}"
- citation_name = f"{book}, {page_str}"
- else:
- citation_name = str(book)
-
- download_url = str(p.get("download_url") or "")
-
- # Safe summary extraction — handle None/missing without raising
- summary = str(p.get("summary") or "")
- if not summary:
- content = str(p.get("content") or "")
- summary = content[:300] if content else ""
-
- # Safe score formatting
- score = r.get("score")
- try:
- relevance = f"{float(score):.2f}"
- except (TypeError, ValueError):
- relevance = "0.00"
-
- author = str(p.get("book_author") or "")
-
- await __event_emitter__(
- {
- "type": "source",
- "data": {
- "document": [summary],
- "metadata": [
- {
- "source": citation_name,
- "url": download_url,
- "author": author,
- "relevance": relevance,
- }
- ],
- "source": {
- "name": citation_name,
- "url": download_url,
- },
- },
- }
- )
- except Exception as e:
- pid = r.get("id", "?") if isinstance(r, dict) else "?"
- log.warning(f"Failed to emit citation (id={pid}): {e}")
-
- return body
+"""
+title: RECON Knowledge Base
+author: Echo6
+version: 5.0.0
+description: RAG filter with three-tier cascade: Qdrant (domain knowledge) → Kiwix (offline wiki) → SearXNG (web search). Supports intent-based metadata filtering, FlashRank neural reranking with MMR diversity, Ollama-powered query expansion, transcript source boosting, semantic query routing with inline navigation, and address book place resolution.
+"""
+
+import logging
+import json
+import math
+import re
+import threading
+import html
+from datetime import datetime
+from html.parser import HTMLParser
+from pathlib import Path
+from typing import Optional, Callable, Awaitable
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from urllib.parse import quote, unquote
+
+import requests
+from pydantic import BaseModel, Field
+
+log = logging.getLogger(__name__)
+
+# Module-level source store: keyed by chat_id so inlet/outlet share state
+# even if OWI instantiates separate Filter objects per call.
+_SOURCE_STORE: dict[str, list] = {}
+
+# ── CASCADE CONFIGURATION (v5.0.0) ───────────────────────────────────────────
+# FlashRank score threshold for Tier 1 (Qdrant). Below this, fall through to Tier 2.
+# Based on calibration: RECON queries cluster at 0.95-1.0, misses below 0.3.
+# 0.5 is conservative - will let more through to Kiwix than strictly necessary.
+CASCADE_CONFIDENCE_THRESHOLD = 0.5
+
+# Kiwix-serve configuration
+KIWIX_BASE_URL = "http://localhost:8430"
+KIWIX_SEARCH_TIMEOUT = 5 # seconds
+KIWIX_ARTICLE_TIMEOUT = 5 # seconds
+KIWIX_MAX_RESULTS = 3
+
+# SearXNG configuration
+SEARXNG_URL = "http://192.168.1.102:8080"
+SEARXNG_TIMEOUT = 5 # seconds
+SEARXNG_MAX_RESULTS = 5
+
+# Cascade logging
+CASCADE_LOG_PATH = Path("/opt/recon/logs/cascade.jsonl")
+
+# ── Semantic Query Router (v4.3.0) ───────────────────────────────────────────
+ROUTE_EXAMPLES = {
+ "nav_route": [
+ "how do I get to Boise",
+ "directions to Twin Falls",
+ "how do I get from Buhl to Boise",
+ "drive from Jerome to Sun Valley",
+ "route from Boise to McCall",
+ "what's the fastest way to Sun Valley",
+ "how far is it to Twin Falls",
+ "take me to Shoshone",
+ "navigate to the airport",
+ "how do I drive to Salt Lake City",
+ "walking directions to the park",
+ "bike route to downtown",
+ ],
+ "nav_reverse_geocode": [
+ "what town is at 42.5, -114.7",
+ "where am I right now",
+ "what is at coordinates 43.6, -116.2",
+ "what location is 42.574, -114.607",
+ "where is this place 44.0, -114.3",
+ "what city is near 42.7, -114.5",
+ "reverse geocode 43.0, -115.0",
+ "what's at this location 42.9, -114.8",
+ ],
+ "direct_answer": [
+ "hello",
+ "hey aurora",
+ "good morning",
+ "thanks",
+ "thank you",
+ "what's your name",
+ "who are you",
+ "tell me a joke",
+ "how are you",
+ "hi there",
+ ],
+ "rag_search": [
+ "what does the survival manual say about water",
+ "how to purify water in the field",
+ "how to treat a gunshot wound",
+ "what is the ranger handbook chapter on patrolling",
+ "field manual water purification",
+ "how to build a shelter in the wilderness",
+ "tactical combat casualty care procedures",
+ "what does FM 21-76 say about fire starting",
+ ],
+}
+
+_ROUTE_CENTROIDS: dict | None = None
+_ROUTER_LOCK = threading.Lock()
+
+
+def _embed_batch_router(texts: list[str], tei_url: str) -> list[list[float]]:
+    """Embed *texts* by POSTing them to the endpoint at *tei_url*.
+
+    The request body is ``{"inputs": texts}`` and the timeout is 30 seconds.
+    The decoded JSON response is returned unchanged; HTTP error statuses are
+    raised by ``raise_for_status()``.
+    """
+    response = requests.post(tei_url, json={"inputs": texts}, timeout=30)
+    response.raise_for_status()
+    embeddings = response.json()
+    return embeddings
+
+
+def _compute_centroid(vectors: list[list[float]]) -> list[float]:
+    """Return the component-wise mean of *vectors*.
+
+    The dimensionality is taken from the first vector, so *vectors* must be
+    non-empty and every vector must have at least that many components.
+    """
+    count = len(vectors)
+    width = len(vectors[0])
+    totals = [0.0] * width
+    for vector in vectors:
+        for axis in range(width):
+            totals[axis] += vector[axis]
+    return [total / count for total in totals]
+
+
+def _cosine_similarity(a: list[float], b: list[float]) -> float:
+    """Return the cosine of the angle between *a* and *b*.
+
+    Iterates over the indices of *a*, so *b* must be at least as long.
+    Returns 0.0 when either vector has zero magnitude.
+    """
+    dot = 0.0
+    sq_a = 0.0
+    sq_b = 0.0
+    for idx, x in enumerate(a):
+        y = b[idx]
+        dot += x * y
+        sq_a += x * x
+        sq_b += y * y
+    magnitude = math.sqrt(sq_a) * math.sqrt(sq_b)
+    if magnitude == 0:
+        return 0.0
+    return dot / magnitude
+
+
+def _ensure_centroids(tei_url: str) -> dict[str, list[float]]:
+    """Return the per-route centroid vectors, computing them on first use.
+
+    The result is cached in the module-level ``_ROUTE_CENTROIDS``. The cache
+    is checked once without the lock and again after acquiring
+    ``_ROUTER_LOCK``, so concurrent first callers do not each embed the
+    examples. All ``ROUTE_EXAMPLES`` phrases are embedded in one request and
+    each route's centroid is computed over its own slice of the returned
+    vectors. If embedding raises, nothing is cached and the exception
+    propagates.
+    """
+    global _ROUTE_CENTROIDS
+    if _ROUTE_CENTROIDS is None:
+        with _ROUTER_LOCK:
+            if _ROUTE_CENTROIDS is None:
+                phrases: list[str] = []
+                spans: dict[str, tuple[int, int]] = {}
+                for route, examples in ROUTE_EXAMPLES.items():
+                    first = len(phrases)
+                    phrases.extend(examples)
+                    spans[route] = (first, len(phrases))
+                embedded = _embed_batch_router(phrases, tei_url)
+                _ROUTE_CENTROIDS = {
+                    route: _compute_centroid(embedded[lo:hi])
+                    for route, (lo, hi) in spans.items()
+                }
+    return _ROUTE_CENTROIDS
+
+
+def _classify_query(
+    query: str,
+    tei_url: str,
+    threshold: float = 0.45,
+) -> tuple[str, float]:
+    """Pick the route whose centroid is most similar to *query*.
+
+    Returns ``(route, score)``. The route is "rag_search" when the best
+    similarity is below *threshold* or when no centroid scores above 0.0.
+    Any exception is logged and reported as ``("rag_search", 0.0)``.
+    """
+    fallback = "rag_search"
+    try:
+        centroids = _ensure_centroids(tei_url)
+        query_vec = _embed_batch_router([query], tei_url)[0]
+        winner, top = fallback, 0.0
+        for name, center in centroids.items():
+            score = _cosine_similarity(query_vec, center)
+            # Strict comparison: the first route to reach a given score keeps it.
+            if score > top:
+                winner, top = name, score
+        if top < threshold:
+            return (fallback, top)
+        return (winner, top)
+    except Exception as e:
+        log.warning(f"Router classification failed: {e}")
+        return (fallback, 0.0)
+
+
+# ── Navigation handlers (v4.3.0) ─────────────────────────────────────────────
+# A bare "lat, lon" pair that makes up the whole string.
+_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$')
+# "from <origin> to <destination>", optionally followed by "by <mode>" and
+# trailing punctuation. \b stops "from" matching inside a longer word.
+_FROM_TO_RE = re.compile(r'\bfrom\s+(.+?)\s+to\s+(.+?)(?:\s+by\s+\w+)?[\s?.!]*$', re.IGNORECASE)
+# "to/into/toward(s) <destination>". \b keeps the "to" that ends words such as
+# "Sacramento" or "auto" from being read as the preposition.
+_TO_RE = re.compile(r'\b(?:to|into|towards?)\s+(?:the\s+)?(.+?)$', re.IGNORECASE)
+# A "lat, lon" pair embedded anywhere in free text (each number needs 2+ digits).
+_COORD_IN_TEXT_RE = re.compile(r'(-?\d+\.?\d+)\s*,\s*(-?\d+\.?\d+)')
+# Travel keyword → Valhalla costing model. Insertion order is the match priority.
+_MODE_MAP = {
+    "walk": "pedestrian", "walking": "pedestrian", "foot": "pedestrian", "pedestrian": "pedestrian",
+    "bike": "bicycle", "cycling": "bicycle", "bicycle": "bicycle", "cycle": "bicycle",
+    "truck": "truck", "lorry": "truck",
+    "drive": "auto", "driving": "auto", "car": "auto", "auto": "auto",
+}
+
+
+def _detect_mode(query: str) -> str:
+    """Return the costing model named by a travel keyword in *query*.
+
+    Keywords are tried in ``_MODE_MAP`` order and must appear as whole words
+    (an optional plural "s" is accepted), so place names that merely contain
+    a keyword, such as "Blackfoot" or "Walker", do not select a mode.
+    Returns "auto" when no keyword is present.
+    """
+    lowered = query.lower()
+    for keyword, mode in _MODE_MAP.items():
+        if re.search(rf"\b{re.escape(keyword)}s?\b", lowered):
+            return mode
+    return "auto"
+
+
+def _clean_place(text: str) -> str:
+    """Normalise a place phrase before it is sent to the geocoder.
+
+    Trims whitespace and trailing ``?.,!``, drops a leading article, and
+    turns the first standalone "in" into a comma, e.g.
+    "214 North St in Filer ID" → "214 North St, Filer ID".
+    """
+    place = text.strip().rstrip('?.,!')
+    # (pattern, replacement, count); a count of 0 means "replace every match".
+    rewrites = (
+        (r'^(the|a|an)\s+', '', 0),
+        (r'\s+in\s+', ', ', 1),
+    )
+    for pattern, replacement, limit in rewrites:
+        place = re.sub(pattern, replacement, place, count=limit, flags=re.IGNORECASE)
+    return place.strip()
+
+
+def _parse_nav_query(query: str) -> tuple[str | None, str, str] | None:
+    """Split a navigation request into ``(origin, destination, mode)``.
+
+    An explicit "from A to B" phrase is preferred; otherwise a bare
+    "to B" phrase yields an origin of ``None``. An origin that cleans to an
+    empty string is also reported as ``None``. Returns ``None`` when no
+    non-empty destination can be extracted.
+    """
+    mode = _detect_mode(query)
+    explicit = _FROM_TO_RE.search(query)
+    if explicit:
+        origin = _clean_place(explicit.group(1))
+        dest = _clean_place(explicit.group(2))
+        # Never hand an empty destination to the geocoder.
+        if dest:
+            return (origin or None, dest, mode)
+        return None
+    implicit = _TO_RE.search(query)
+    if implicit:
+        dest = _clean_place(implicit.group(1))
+        if dest:
+            return (None, dest, mode)
+    return None
+
+
+def _geocode(query: str, photon_url: str, address_book_url: str = "") -> tuple[float, float, str] | tuple[None, None, None]:
+    """Resolve *query* to ``(lat, lon, display_name)``.
+
+    Resolution order: a literal "lat, lon" string (returned with *query* as
+    its name), then the address book, then the Photon ``/api`` endpoint.
+    Returns ``(None, None, None)`` when Photon has no features. Photon
+    request and HTTP errors propagate to the caller.
+    """
+    literal = _COORD_RE.match(query.strip())
+    if literal:
+        return float(literal.group(1)), float(literal.group(2)), query
+
+    # The address book is consulted before Photon.
+    entry = _address_book_lookup(query, address_book_url)
+    if entry:
+        return entry['lat'], entry['lon'], entry.get('address') or entry['name']
+
+    response = requests.get(
+        f"{photon_url}/api",
+        params={"q": query, "limit": 1},
+        timeout=10,
+    )
+    response.raise_for_status()
+    features = response.json().get("features", [])
+    if not features:
+        return None, None, None
+
+    top = features[0]
+    props = top["properties"]
+    coords = top["geometry"]["coordinates"]
+    labels = [props.get("name", "")]
+    for field in ("city", "state", "country"):
+        value = props.get(field)
+        # Skip a component that merely repeats the one before it.
+        if value and value != labels[-1]:
+            labels.append(value)
+    # The result is (lat, lon), so the first two coordinates are swapped.
+    return coords[1], coords[0], ", ".join(label for label in labels if label)
+
+
+def _route_valhalla(
+    orig: tuple[float, float],
+    dest: tuple[float, float],
+    mode: str,
+    valhalla_url: str,
+) -> str | None:
+    """Request a route from Valhalla and render it as numbered text.
+
+    *orig* and *dest* are ``(lat, lon)`` pairs and *mode* is sent as the
+    ``costing`` value. Returns ``None`` when the request raises
+    ``requests.RequestException`` or the status is not 200. Otherwise returns
+    a distance/time header, a blank line, and one line per maneuver of the
+    first leg.
+    """
+    payload = {
+        "locations": [
+            {"lat": orig[0], "lon": orig[1]},
+            {"lat": dest[0], "lon": dest[1]},
+        ],
+        "costing": mode,
+        "directions_options": {"units": "miles"},
+    }
+    try:
+        response = requests.post(f"{valhalla_url}/route", json=payload, timeout=30)
+    except requests.RequestException:
+        return None
+    if response.status_code != 200:
+        return None
+
+    trip = response.json()["trip"]
+    summary = trip["summary"]
+    maneuvers = trip["legs"][0]["maneuvers"]
+    miles = round(summary["length"], 1)
+    minutes = round(summary["time"] / 60, 1)
+    output = [f"Distance: {miles} miles | Time: {minutes} minutes", ""]
+    for step, maneuver in enumerate(maneuvers, 1):
+        text = maneuver["instruction"]
+        length = maneuver.get("length", 0)
+        # Maneuvers with no length are listed without a distance suffix.
+        suffix = f" — {round(length, 1)} mi" if length > 0 else ""
+        output.append(f"{step}. {text}{suffix}")
+    return "\n".join(output)
+
+
+def _handle_nav_route(
+    query: str,
+    photon_url: str,
+    valhalla_url: str,
+    default_origin: str,
+    address_book_url: str = "",
+) -> str | None:
+    """Turn a navigation question into a block of turn-by-turn text.
+
+    Parses *query*, geocodes the origin (falling back to *default_origin*
+    when none was given) and the destination, then asks Valhalla for a route.
+    Returns ``None`` if parsing, either geocode, or routing yields nothing.
+    """
+    parsed = _parse_nav_query(query)
+    if not parsed:
+        return None
+    origin_str, dest_str, mode = parsed
+
+    start_lat, start_lon, start_name = _geocode(
+        origin_str or default_origin, photon_url, address_book_url
+    )
+    if start_lat is None:
+        return None
+
+    end_lat, end_lon, end_name = _geocode(dest_str, photon_url, address_book_url)
+    if end_lat is None:
+        return None
+
+    directions = _route_valhalla(
+        (start_lat, start_lon), (end_lat, end_lon), mode, valhalla_url
+    )
+    if not directions:
+        return None
+    return f"Directions from {start_name} to {end_name} ({mode}):\n{directions}"
+
+
+def _handle_reverse_geocode(query: str, photon_url: str) -> str | None:
+    """Describe the place at the first "lat, lon" pair found in *query*.
+
+    Returns ``None`` when *query* contains no coordinate pair or when the
+    Photon ``/reverse`` lookup fails; a failure is logged as a warning.
+    Returns a "No location found" message when Photon has no features,
+    otherwise "Location: <name, city, state, country> (lat, lon)".
+    """
+    m = _COORD_IN_TEXT_RE.search(query)
+    if not m:
+        return None
+    lat, lon = float(m.group(1)), float(m.group(2))
+    try:
+        resp = requests.get(
+            f"{photon_url}/reverse",
+            params={"lat": lat, "lon": lon, "limit": 1},
+            timeout=10,
+        )
+        resp.raise_for_status()
+        features = resp.json().get("features", [])
+        if not features:
+            return f"No location found near coordinates ({lat}, {lon})"
+        props = features[0]["properties"]
+        parts = []
+        for key in ("name", "city", "state", "country"):
+            v = props.get(key)
+            # Drop empty components and exact repeats (e.g. name == city).
+            if v and v not in parts:
+                parts.append(v)
+        display = ", ".join(parts) if parts else "Unknown location"
+        return f"Location: {display} ({lat}, {lon})"
+    except Exception as e:
+        # Best-effort lookup: report the failure, then let the caller fall back.
+        log.warning(f"Reverse geocode failed for ({lat}, {lon}): {e}")
+        return None
+
+
+def _inject_nav_context(body: dict, context: str) -> None:
+    """Add *context* to the request's system prompt as a navigation block.
+
+    The block is appended to the first system message in
+    ``body["messages"]``. When there is no system message one is inserted at
+    index 0. *body* is modified in place; a missing or ``None`` "messages"
+    entry is replaced with a new list.
+    """
+    messages = body.get("messages")
+    if messages is None:
+        messages = body["messages"] = []
+    nav_block = (
+        "\n\n---NAVIGATION RESULT---\n\n"
+        f"{context}\n\n"
+        "---END NAVIGATION RESULT---\n\n"
+        "Present these directions to the user exactly as provided. "
+        "Do not summarize or omit steps. You may add brief contextual notes."
+    )
+    system_msg = next((m for m in messages if m.get("role") == "system"), None)
+    if system_msg is not None:
+        # A system message may carry no content; treat that as empty text.
+        system_msg["content"] = (system_msg.get("content") or "") + nav_block
+    else:
+        messages.insert(0, {"role": "system", "content": nav_block})
+
+
+
+def _address_book_lookup(query: str, address_book_url: str) -> dict | None:
+    """Look *query* up in the RECON address book.
+
+    Returns the service's JSON record when it reports ``confidence ==
+    "exact"`` and carries ``name``, ``lat`` and ``lon``; otherwise ``None``.
+    Also returns ``None`` when *address_book_url* is empty, the status is not
+    200, or the request fails (the failure is logged as a warning).
+    """
+    if not address_book_url:
+        return None
+    try:
+        resp = requests.get(
+            f"{address_book_url}/api/address_book/lookup",
+            params={"q": query},
+            timeout=2,
+        )
+        if resp.status_code != 200:
+            return None
+        data = resp.json()
+    except Exception as e:
+        # Best-effort: report the failure and leave resolution to the caller.
+        log.warning(f"Address book lookup failed for {query!r}: {e}")
+        return None
+    if not isinstance(data, dict) or data.get("confidence") != "exact":
+        return None
+    # Test for absence rather than truthiness so a 0.0 coordinate is accepted.
+    if data.get("lat") in (None, "") or data.get("lon") in (None, ""):
+        return None
+    # Callers fall back to data['name'] for the display label, so require it.
+    if "name" not in data:
+        return None
+    log.info(f"Address book hit: {query!r} → {data['name']} ({data['lat']}, {data['lon']})")
+    return data
+
+
+# ── End router/nav code ──────────────────────────────────────────────────────
+
+# ── Kiwix Search Helpers (v5.0.0) ────────────────────────────────────────────
+
+class _KiwixResultParser(HTMLParser):
+    """Collect search hits from a Kiwix search-results HTML page.
+
+    After ``feed()``, ``results`` holds one dict per ``<li>`` found inside a
+    ``<div>`` whose class contains "results". Each dict has the keys
+    ``title``, ``url``, ``snippet`` and ``word_count``. List items for which
+    no link URL was captured are dropped.
+    """
+
+    def __init__(self):
+        super().__init__()
+        self.results = []
+        self._in_results = False
+        self._in_li = False
+        self._in_cite = False
+        self._in_info = False
+        self._current = {}
+        self._capture_text = False
+
+    def handle_starttag(self, tag, attrs):
+        attrs_dict = dict(attrs)
+        # Valueless attributes (``<div class>``) arrive as None; normalise so
+        # the substring tests below cannot raise.
+        css_class = attrs_dict.get("class") or ""
+        if tag == "div" and "results" in css_class:
+            self._in_results = True
+        elif self._in_results and tag == "li":
+            self._in_li = True
+            self._current = {"title": "", "url": "", "snippet": "", "word_count": ""}
+        elif self._in_li and tag == "a" and not self._current.get("url"):
+            # Only the first link with a URL is kept; its text becomes the title.
+            self._current["url"] = attrs_dict.get("href") or ""
+            self._capture_text = True
+        elif self._in_li and tag == "cite":
+            self._in_cite = True
+            self._capture_text = True
+        elif self._in_li and tag == "div" and "informations" in css_class:
+            self._in_info = True
+            self._capture_text = True
+
+    def handle_endtag(self, tag):
+        if tag == "div" and self._in_results and not self._in_li:
+            self._in_results = False
+        elif tag == "li" and self._in_li:
+            if self._current.get("url"):
+                # Snippet text arrives in fragments; collapse the joins into
+                # single spaces and trim the ends.
+                self._current["snippet"] = " ".join(self._current["snippet"].split())
+                self.results.append(self._current)
+            self._current = {}
+            self._in_li = False
+        elif tag == "a" and self._capture_text and not self._in_cite:
+            self._capture_text = False
+        elif tag == "cite":
+            self._in_cite = False
+            self._capture_text = False
+        elif tag == "div" and self._in_info:
+            self._in_info = False
+            self._capture_text = False
+
+    def handle_data(self, data):
+        if self._capture_text and self._in_li:
+            text = data.strip()
+            if self._in_cite:
+                self._current["snippet"] += text + " "
+            elif self._in_info:
+                self._current["word_count"] = text
+            elif not self._current.get("title"):
+                self._current["title"] = text
+
+
+def _strip_html_tags(html_content: str) -> str:
+    """Convert HTML to plain text using only the standard library.
+
+    ``<script>`` and ``<style>`` elements are removed together with their
+    contents. Every remaining tag is replaced by a space, character entities
+    are decoded, and runs of whitespace are collapsed to single spaces.
+    """
+    # Remove script and style elements, including everything between the tags.
+    text = re.sub(r'<script\b[^>]*>.*?</script\s*>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
+    text = re.sub(r'<style\b[^>]*>.*?</style\s*>', '', text, flags=re.DOTALL | re.IGNORECASE)
+    # Replace tags with a space so adjacent text nodes do not run together.
+    text = re.sub(r'<[^>]+>', ' ', text)
+    # Decode entities
+    text = html.unescape(text)
+    # Normalize whitespace
+    text = re.sub(r'\s+', ' ', text).strip()
+    return text
+
+
+def _fetch_kiwix_books() -> list[str]:
+    """Return the book names listed in the kiwix-serve catalog.
+
+    Names are taken from ``href="/content/<name>"`` attributes in the
+    ``/catalog/v2/entries`` response and de-duplicated in first-seen order.
+    Returns an empty list when the request fails; the failure is logged as a
+    warning.
+    """
+    try:
+        resp = requests.get(
+            f"{KIWIX_BASE_URL}/catalog/v2/entries",
+            timeout=KIWIX_SEARCH_TIMEOUT,
+        )
+        resp.raise_for_status()
+        # Extract book names from href attributes
+        books = re.findall(r'href="/content/([^"]+)"', resp.text)
+        # dict.fromkeys de-duplicates while keeping catalog order, so the
+        # result is the same on every run (a set's order is not).
+        return list(dict.fromkeys(books))
+    except Exception as e:
+        log.warning(f"Failed to fetch Kiwix book list: {e}")
+        return []
+
+
+def _search_kiwix_book(book: str, query: str, limit: int = 5) -> list[dict]:
+    """Run a full-text search against one Kiwix book.
+
+    Returns the hits parsed from the ``/search`` results page, each tagged
+    with a ``book`` key. Returns an empty list on a non-200 status or on any
+    exception (which is logged as a warning).
+    """
+    try:
+        response = requests.get(
+            f"{KIWIX_BASE_URL}/search",
+            params={"content": book, "pattern": query, "limit": limit},
+            timeout=KIWIX_SEARCH_TIMEOUT,
+        )
+        if response.status_code != 200:
+            return []
+
+        parser = _KiwixResultParser()
+        parser.feed(response.text)
+        hits = parser.results
+
+        # Record which book each hit came from.
+        for hit in hits:
+            hit["book"] = book
+        return hits
+    except Exception as e:
+        log.warning(f"Kiwix search failed for {book}: {e}")
+        return []
+
+
+def _fetch_kiwix_article(url_path: str) -> str:
+ """Fetch and extract text content from a Kiwix article."""
+ try:
+ resp = requests.get(
+ f"{KIWIX_BASE_URL}{url_path}",
+ timeout=KIWIX_ARTICLE_TIMEOUT,
+ )
+ resp.raise_for_status()
+
+ # Extract main content - try to find article body
+ content = resp.text
+
+ # Try to extract just the main content area
+ main_match = re.search(r'