recon/enricher.py
Matt 563c16bb71 Initial commit: RECON codebase baseline
Current state of the pipeline code as of 2026-04-14 (Phase 1 scaffolding complete).
Config has new_pipeline.enabled=false and crawler.sites=[] per refactor plan.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-14 14:57:23 +00:00

264 lines
9 KiB
Python

import json
import os
import re
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.generativeai as genai
from .utils import get_config, setup_logging
from .status import StatusDB
logger = setup_logging('recon.enricher')
def repair_json(text):
"""Attempt to repair common LLM JSON output issues including truncation."""
# Remove control characters except newlines and tabs
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
# Remove trailing commas before } or ]
text = re.sub(r',\s*([}\]])', r'\1', text)
# Handle truncated JSON: try to find the last complete object in the array
try:
json.loads(text, strict=False)
return text
except json.JSONDecodeError:
pass
# Find the last complete }, then close the array
# Walk backward to find the last valid closing brace
last_complete = -1
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for i, ch in enumerate(text):
if escape:
escape = False
continue
if ch == '\\' and in_string:
escape = True
continue
if ch == '"' and not escape:
in_string = not in_string
continue
if in_string:
continue
if ch == '{':
depth_brace += 1
elif ch == '}':
depth_brace -= 1
if depth_brace == 0:
last_complete = i
elif ch == '[':
depth_bracket += 1
elif ch == ']':
depth_bracket -= 1
if last_complete > 0:
truncated = text[:last_complete + 1].rstrip().rstrip(',')
# Close any open arrays
open_brackets = truncated.count('[') - truncated.count(']')
truncated += ']' * open_brackets
return truncated
return text
ENRICH_PROMPT = """Extract knowledge concepts from this document text.
A concept is a SELF-CONTAINED piece of knowledge that can stand alone.
For each concept, provide ALL fields:
Required:
- content: Full text of the concept (complete procedure, definition, etc.)
- summary: 1-2 sentence summary
- title: Brief descriptive title
- domain: Array of 1-5 from: Foundational Skills, Sustainment Systems, Defense & Tactics, Off-Grid Systems, Communications, Scenario Playbooks, Reference
- subdomain: Array of specific subcategories (up to 10)
- keywords: Array of 3-30 searchable terms
- skill_level: novice | intermediate | advanced
- key_facts: Array of specific extractable claims, measurements, data points
Optional (include when present):
- scenario_applicable: Array from: tuesday_prepper, month_prepper, year_prepper, multi_year, eotwawki
- cross_domain_tags: Array from: sustainment, medical, security, communications, leadership, logistics, navigation, power_systems, water_systems, food_systems, tactical_ops, community_coordination
- chapter: Chapter name if identifiable
- page_ref: Page reference
- notes: Any additional context
Return JSON array. If no extractable concepts, return [].
Document text:
"""
class KeyRotator:
def __init__(self, keys):
self.keys = keys
self.index = 0
def next(self):
if not self.keys:
raise ValueError("No Gemini API keys configured")
key = self.keys[self.index % len(self.keys)]
self.index += 1
return key
def enrich_window(text, key, config):
genai.configure(api_key=key)
model = genai.GenerativeModel(
config['gemini']['model'],
generation_config={"response_mime_type": config['gemini']['response_mime_type']}
)
response = model.generate_content(ENRICH_PROMPT + text)
raw = response.text
try:
return json.loads(raw, strict=False)
except json.JSONDecodeError:
repaired = repair_json(raw)
return json.loads(repaired, strict=False)
def enrich_single(file_hash, db, config, key_rotator):
doc = db.get_document(file_hash)
if not doc:
return False
text_dir = os.path.join(config['paths']['text'], file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
window_size = config['processing']['enrich_window_size']
delay = config['processing']['rate_limit_delay']
max_retries = config['processing']['max_retries']
if not os.path.exists(text_dir):
db.mark_failed(file_hash, f"Text directory not found: {text_dir}")
return False
db.update_status(file_hash, 'enriching')
try:
os.makedirs(concepts_dir, exist_ok=True)
page_files = sorted([f for f in os.listdir(text_dir) if f.startswith('page_') and f.endswith('.txt')])
if not page_files:
db.mark_failed(file_hash, "No page files found")
return False
pages_text = []
for pf in page_files:
with open(os.path.join(text_dir, pf), encoding='utf-8') as f:
pages_text.append(f.read())
windows = []
for i in range(0, len(pages_text), window_size):
window_pages = pages_text[i:i + window_size]
combined = "\n\n".join(f"--- Page {i + j + 1} ---\n{t}" for j, t in enumerate(window_pages))
windows.append((i, combined))
total_concepts = 0
for w_idx, (start_page, window_text) in enumerate(windows):
window_file = os.path.join(concepts_dir, f"window_{w_idx+1:04d}.json")
if os.path.exists(window_file):
with open(window_file, encoding='utf-8') as f:
existing = json.load(f)
total_concepts += len(existing)
logger.debug(f" Window {w_idx+1} already exists, skipping")
continue
if len(window_text.strip()) < 50:
with open(window_file, 'w') as f:
json.dump([], f)
continue
concepts = None
for attempt in range(max_retries):
try:
key = key_rotator.next()
concepts = enrich_window(window_text, key, config)
break
except Exception as e:
logger.warning(f" Window {w_idx+1} attempt {attempt+1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(delay * (attempt + 1) * 2)
if concepts is None:
db.mark_failed(file_hash, f"All retries failed for window {w_idx+1}")
return False
if not isinstance(concepts, list):
concepts = [concepts] if isinstance(concepts, dict) else []
for c_idx, concept in enumerate(concepts):
concept['_window'] = w_idx + 1
concept['_start_page'] = start_page + 1
concept['_doc_hash'] = file_hash
# JSON FIRST: save before anything else
with open(window_file, 'w', encoding='utf-8') as f:
json.dump(concepts, f, indent=2, ensure_ascii=False)
total_concepts += len(concepts)
logger.debug(f" Window {w_idx+1}/{len(windows)}: {len(concepts)} concepts")
time.sleep(delay)
meta = {
'hash': file_hash,
'total_windows': len(windows),
'total_concepts': total_concepts,
'window_size': window_size,
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
}
with open(os.path.join(concepts_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
db.update_status(file_hash, 'enriched', concepts_extracted=total_concepts)
logger.info(f"Enriched {doc['filename']}: {total_concepts} concepts from {len(windows)} windows")
return True
except Exception as e:
logger.error(f"Enrichment failed for {file_hash}: {e}\n{traceback.format_exc()}")
db.mark_failed(file_hash, str(e))
return False
def run_enrichment(workers=None, limit=None):
config = get_config()
db = StatusDB()
workers = workers or config['processing']['enrich_workers']
keys = config.get('gemini_keys', [])
if not keys:
logger.error("No Gemini API keys configured in .env")
return 0
key_rotator = KeyRotator(keys)
extracted = db.get_by_status('extracted', limit=limit)
if not extracted:
logger.info("No extracted documents to enrich")
return 0
logger.info(f"Enriching {len(extracted)} documents with {workers} workers, {len(keys)} API key(s)")
success = 0
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {
pool.submit(enrich_single, doc['hash'], StatusDB(), config, key_rotator): doc
for doc in extracted
}
for future in as_completed(futures):
doc = futures[future]
try:
if future.result():
success += 1
except Exception as e:
logger.error(f"Worker error for {doc['hash']}: {e}")
logger.info(f"Enrichment complete: {success}/{len(extracted)} succeeded")
return success