mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 06:34:40 +02:00
- lib/dispatcher.py: one-shot dispatcher that scans acquired/<type>/ for content+sidecar pairs and routes to registered processors - lib/processors/transcript_processor.py: pre_flight() for transcripts (hash, dedupe, split into pages, register in DB, set text_dir) - lib/utils.py: resolve_text_dir() helper for text_dir column fallback - lib/enricher.py: use resolve_text_dir() instead of hardcoded path - lib/embedder.py: use resolve_text_dir() instead of hardcoded path - lib/processors/__init__.py, lib/acquisition/__init__.py: package inits Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
562 lines
24 KiB
Python
562 lines
24 KiB
Python
"""
|
|
RECON Enricher
|
|
|
|
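Text to structured concepts via Gemini API. Each window's concepts are saved as JSON
to <paths.concepts>/{hash}/ (e.g. data/concepts/{hash}/) before the document's final
DB status is recorded. Window size (processing.enrich_window_size, e.g. 10 pages) and
worker count (processing.enrich_workers, e.g. 16) come from config; requests rotate
round-robin across all configured Gemini API keys.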
|
|
|
|
Resilience:
|
|
- Exponential backoff with jitter for transient errors (429, 500, 503, timeout)
|
|
- Permanent errors (JSON parse, auth) fail immediately without wasting retries
|
|
- Window failures skip that window and continue — partial enrichment beats zero
|
|
- Document marked enriched if any concepts were extracted or no window failed; marked failed when windows failed and no concepts were extracted
|
|
|
|
Dependencies: google-generativeai
|
|
Config: processing.enrich_workers, processing.enrich_window_size, gemini, paths.concepts
|
|
"""
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import time
|
|
import traceback
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import google.generativeai as genai
|
|
|
|
from .utils import get_config, setup_logging
|
|
from .status import StatusDB
|
|
from .utils import resolve_text_dir
|
|
|
|
# Module-level logger for the enrichment stage.
logger = setup_logging('recon.enricher')

# Docs stuck in "enriching" longer than this many hours get reset to "extracted"
# for retry by _recover_stale_enriching(). The age is measured from the
# document's extracted_at timestamp.
STALE_ENRICHING_HOURS = 2
|
|
|
|
# ── Classification allowlists ───────────────────────────────────────────────
# Canonical labels accepted from Gemini. validate_and_fix_concepts() re-asks the
# model for any value outside these sets and, failing that, substitutes the
# matching *_FALLBACK value defined below.
VALID_DOMAINS = {
    'Agriculture & Livestock',
    'Civil Organization',
    'Communications',
    'Food Systems',
    'Foundational Skills',
    'Logistics',
    'Medical',
    'Navigation',
    'Operations',
    'Power Systems',
    'Preservation & Storage',
    'Security',
    'Shelter & Construction',
    'Technology',
    'Tools & Equipment',
    'Vehicles',
    'Water Systems',
    'Wilderness Skills',
}

VALID_KNOWLEDGE_TYPES = {
    'foundational',
    'procedural',
    'operational',
}

VALID_COMPLEXITIES = {
    'basic',
    'intermediate',
    'advanced',
}

# Safe defaults applied when a field cannot be repaired.
DOMAIN_FALLBACK, KNOWLEDGE_TYPE_FALLBACK, COMPLEXITY_FALLBACK = (
    'Foundational Skills',
    'foundational',
    'basic',
)
|
|
|
|
|
|
# A backslash followed either by a valid JSON escape (simple escapes or \uXXXX
# with exactly four hex digits) -> group "ok", or by anything else (possibly
# nothing at end of text) -> group "bad". Matching the escape as a PAIR keeps an
# already-escaped backslash (\\) from being mistaken for the start of a new one.
_JSON_ESCAPE_RE = re.compile(
    r'\\(?:(?P<ok>u[0-9a-fA-F]{4}|["\\/bfnrt])|(?P<bad>.?))',
    re.DOTALL,
)


def _escape_repl(match):
    """Keep valid escapes; double the backslash of invalid ones."""
    if match.group('ok') is not None:
        return match.group(0)
    return '\\\\' + match.group('bad')


def _strip_code_fences(text):
    """Remove a surrounding Markdown code fence (```json ... ```), if present."""
    stripped = text.strip()
    if not stripped.startswith('```'):
        return text
    newline = stripped.find('\n')
    if newline == -1:
        return text
    # Drop the opening fence line (it may carry a language tag such as "json").
    body = stripped[newline + 1:]
    tail = body.rstrip()
    if tail.endswith('```'):
        body = tail[:-3]
    return body


def _strip_trailing_commas(text):
    """Drop commas that directly precede a closing } or ], outside of strings."""
    out = []
    in_string = False
    escaped = False
    length = len(text)
    for i, ch in enumerate(text):
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == ',':
            j = i + 1
            while j < length and text[j].isspace():
                j += 1
            if j < length and text[j] in '}]':
                continue  # trailing comma: skip it
        out.append(ch)
    return ''.join(out)


def _close_truncated_array(text):
    """Cut text after the last complete top-level object and close open arrays.

    The scan ignores braces/brackets inside string literals. The number of
    ']' appended is the array nesting depth at the point where that object
    closed. Returns None when no complete object was found.
    """
    last_complete = -1
    open_arrays_at_last = 0
    brace_depth = 0
    bracket_depth = 0
    in_string = False
    escaped = False

    for i, ch in enumerate(text):
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == '{':
            brace_depth += 1
        elif ch == '}':
            brace_depth -= 1
            if brace_depth == 0:
                last_complete = i
                open_arrays_at_last = bracket_depth
        elif ch == '[':
            bracket_depth += 1
        elif ch == ']':
            bracket_depth -= 1

    if last_complete <= 0:
        return None
    return text[:last_complete + 1] + ']' * max(open_arrays_at_last, 0)


def repair_json(text):
    """Attempt to repair common LLM JSON output issues including truncation.

    Steps, in order:
      1. strip a surrounding Markdown code fence;
      2. remove control characters other than tab, newline and carriage return;
      3. double backslashes that do not start a valid JSON escape
         (e.g. ``\\e``, ``\\p``, or ``\\u`` without four hex digits), leaving
         valid escapes such as ``\\\\`` intact;
      4. drop trailing commas before ``}`` / ``]`` (string contents untouched).

    If the result parses (``json.loads(..., strict=False)``) it is returned.
    Otherwise the text is assumed truncated: it is cut after the last complete
    top-level object and any arrays still open there are closed. If no complete
    object exists, the cleaned text is returned unchanged (and will still fail
    to parse).

    Args:
        text: raw model output.

    Returns:
        The repaired JSON text (not parsed).
    """
    text = _strip_code_fences(text)
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
    text = _JSON_ESCAPE_RE.sub(_escape_repl, text)
    text = _strip_trailing_commas(text)

    try:
        json.loads(text, strict=False)
        return text
    except json.JSONDecodeError:
        pass

    truncated = _close_truncated_array(text)
    return text if truncated is None else truncated
|
|
|
|
ENRICH_PROMPT = """Extract knowledge concepts from this document text.
|
|
|
|
A concept is a SELF-CONTAINED piece of knowledge that can stand alone.
|
|
|
|
For each concept, provide ALL fields:
|
|
|
|
Required:
|
|
- content: Full text of the concept (complete procedure, definition, etc.)
|
|
- summary: 1-2 sentence summary
|
|
- title: Brief descriptive title
|
|
- domain: must be exactly one of: Agriculture & Livestock, Civil Organization, Communications, Food Systems, Foundational Skills, Logistics, Medical, Navigation, Operations, Power Systems, Preservation & Storage, Security, Shelter & Construction, Technology, Tools & Equipment, Vehicles, Water Systems, Wilderness Skills — return ONLY this exact string, no variations, no new domains, no underscores, no synonyms
|
|
CRITICAL: Medical content (first aid, anatomy, pharmacology, herbs, veterinary, austere medicine) → Medical
|
|
CRITICAL: Food growing, farming, animal husbandry, livestock → Agriculture & Livestock
|
|
CRITICAL: Foraging, hunting, fishing, bushcraft, wilderness survival → Wilderness Skills
|
|
CRITICAL: Food preservation, storage, canning, dehydration, processing → Preservation & Storage
|
|
CRITICAL: Solar, wind, hydro, batteries, generators → Power Systems
|
|
CRITICAL: Water sourcing, filtration, sanitation, purification → Water Systems
|
|
CRITICAL: Building, carpentry, structural construction, shelter → Shelter & Construction
|
|
CRITICAL: Tactical operations, mission execution, combat maneuvers, search & rescue → Operations
|
|
CRITICAL: Governance, civil administration, community leadership → Civil Organization
|
|
CRITICAL: Electronics, IT, computing, engineering → Technology
|
|
CRITICAL: Hand tools, power tools, equipment maintenance → Tools & Equipment
|
|
CRITICAL: Motor vehicles, aircraft, watercraft, vehicle maintenance → Vehicles
|
|
CRITICAL: Radio, signals, networking, comms equipment → Communications
|
|
CRITICAL: Supply chain, transport, distribution, inventory → Logistics
|
|
CRITICAL: Physical security, OPSEC, threat assessment → Security
|
|
CRITICAL: Map reading, orienteering, GPS, celestial navigation → Navigation
|
|
CRITICAL: Cooking methods, food production, recipes, nutrition → Food Systems
|
|
- subdomain: Array of specific subcategories (up to 10)
|
|
- keywords: Array of 3-30 searchable terms
|
|
- knowledge_type: foundational | procedural | operational
|
|
foundational — concepts, definitions, theory, background knowledge, explanations of how things work
|
|
procedural — step-by-step techniques, instructions, how-to skills, methods you execute
|
|
operational — application under real conditions, decision-making, mission execution, judgment calls in context
|
|
Valid values are ONLY: foundational, procedural, operational — do not use any other values
|
|
- complexity: basic | intermediate | advanced
|
|
basic — requires little or no prior knowledge, introductory material, simple concepts
|
|
intermediate — requires some domain familiarity, assumes foundational knowledge is in place
|
|
advanced — requires significant experience or expertise, high-stakes or highly technical material
|
|
Valid values are ONLY: basic, intermediate, advanced — do not use any other values
|
|
- key_facts: Array of specific extractable claims, measurements, data points
|
|
|
|
Optional (include when present):
|
|
- scenario_applicable: Array from: tuesday_prepper, month_prepper, year_prepper, multi_year, eotwawki
|
|
- cross_domain_tags: Array from: sustainment, medical, security, communications, leadership, logistics, navigation, power_systems, water_systems, food_systems, tactical_ops, community_coordination
|
|
- chapter: Chapter name if identifiable
|
|
- page_ref: Page reference
|
|
- notes: Any additional context
|
|
|
|
EXAMPLES (knowledge_type + complexity):
|
|
- "Needle chest decompression procedure" → knowledge_type: "procedural", complexity: "advanced"
|
|
- "What is soil texture and why does it matter" → knowledge_type: "foundational", complexity: "basic"
|
|
- "Coordinating a fire team withdrawal under contact" → knowledge_type: "operational", complexity: "advanced"
|
|
|
|
Return JSON array. If no extractable concepts, return [].
|
|
|
|
Document text:
|
|
"""
|
|
|
|
|
|
class KeyRotator:
    """Round-robin dispenser of Gemini API keys, safe to share across threads.

    run_enrichment() hands a single instance to every ThreadPoolExecutor
    worker, so reading the cursor and advancing it happen under a lock.
    Without the lock, two workers can read the same index (and get the same
    key), and increments can be lost.

    Attributes:
        keys: the configured key sequence (kept as passed in).
        index: number of keys handed out so far.
    """

    def __init__(self, keys):
        import threading

        self.keys = keys
        self.index = 0
        self._lock = threading.Lock()

    def next(self):
        """Return the next key in rotation.

        Raises:
            ValueError: if no keys were configured.
        """
        if not self.keys:
            raise ValueError("No Gemini API keys configured")
        with self._lock:
            key = self.keys[self.index % len(self.keys)]
            self.index += 1
        return key
|
|
|
|
|
|
def enrich_window(text, key, config):
    """Send one window of page text to Gemini and return the parsed response.

    The request is ENRICH_PROMPT followed by ``text``, using
    config['gemini']['model'] and config['gemini']['response_mime_type'].
    The reply is parsed with ``json.loads(strict=False)``; if that fails it is
    passed through repair_json() and parsed again (a second failure raises
    json.JSONDecodeError).

    Returns:
        A list containing only the dict items when the response is a JSON
        array (nested lists from truncated output are dropped); otherwise the
        parsed value unchanged.

    NOTE(review): genai.configure() is a module-level call, so the key
    presumably applies process-wide. run_enrichment() calls this from several
    worker threads, so a concurrent call with a different key may take effect
    before this request is sent. Confirm against the google-generativeai
    client implementation.
    """
    genai.configure(api_key=key)
    gemini_cfg = config['gemini']
    model = genai.GenerativeModel(
        gemini_cfg['model'],
        generation_config={"response_mime_type": gemini_cfg['response_mime_type']},
    )
    raw_reply = model.generate_content(ENRICH_PROMPT + text).text

    try:
        parsed = json.loads(raw_reply, strict=False)
    except json.JSONDecodeError:
        parsed = json.loads(repair_json(raw_reply), strict=False)

    if not isinstance(parsed, list):
        return parsed
    return [item for item in parsed if isinstance(item, dict)]
|
|
|
|
|
|
# Signals of a retry-worthy failure, matched against the lower-cased message.
# HTTP status codes must not be part of a longer number (so "column 1500" in a
# JSON decode error is NOT read as a 500), and "rate" must start a word (so
# "generate" does not count as a rate-limit error).
_TRANSIENT_ERROR_RE = re.compile(
    r'(?<!\d)(?:429|500|502|503|504)(?!\d)'
    r'|resource_exhausted|quota|too many requests'
    r'|\brate'
    r'|unavailable|timeout|timed out'
    r'|connection|reset by peer|broken pipe'
)


def _is_transient(error_str):
    """Classify whether an error is transient (worth retrying) or permanent.

    Args:
        error_str: the exception message (``str(exc)``).

    Returns:
        True for rate-limit/quota, 5xx-gateway/server and network/timeout
        errors; False otherwise (e.g. auth failures, JSON parse errors).
    """
    return _TRANSIENT_ERROR_RE.search(error_str.lower()) is not None
|
|
|
|
|
|
def _retry_with_backoff(fn, max_retries=5, base_delay=5.0, max_delay=120.0):
    """Call ``fn()``, retrying transient failures with exponential backoff + jitter.

    ``fn`` is attempted at most ``max(1, max_retries)`` times. Before attempt
    k+1 the wait is ``min(base_delay * 2**k + uniform(0, base_delay), max_delay)``.
    With the defaults that is roughly 5-10s, 10-15s, 20-25s and 40-45s: four
    waits for five attempts, about 75-95s in total.

    Permanent errors are re-raised immediately without retrying:
      - json.JSONDecodeError (always permanent, regardless of its message, which
        may contain digits such as "char 1500");
      - any exception whose message _is_transient() does not recognise
        (e.g. auth failures).

    Returns:
        Whatever ``fn()`` returns on the first successful attempt.

    Raises:
        The last exception raised by ``fn`` once attempts are exhausted, or
        the first permanent one.
    """
    attempts = max(1, max_retries)  # max_retries <= 0 used to end in `raise None`
    for attempt in range(attempts):
        try:
            return fn()
        except json.JSONDecodeError:
            raise  # malformed model output will not fix itself on retry
        except Exception as e:
            err = str(e)
            if not _is_transient(err):
                raise  # permanent: don't waste retries
            if attempt == attempts - 1:
                logger.warning(f"  Transient error, max retries exhausted: {err[:150]}")
                raise
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, base_delay), max_delay)
            logger.info(f"  Transient error (attempt {attempt+1}/{attempts}), "
                        f"retrying in {delay:.0f}s: {err[:120]}")
            time.sleep(delay)
|
|
|
|
|
|
def _reclassify_field(field_name, allowlist, concept, key, config, max_retries=3):
    """Re-ask Gemini for a valid value of one classification field.

    The prompt carries the concept's title and the first 400 characters of its
    content (or summary, when content is absent), plus the allowed values.

    Args:
        field_name: name of the field being repaired (used in prompt and logs).
        allowlist: set of acceptable values.
        concept: concept dict; only 'content', 'summary' and 'title' are read.
        key: Gemini API key used for every attempt.
        config: loaded config; config['gemini']['model'] selects the model.
        max_retries: number of attempts.

    Returns:
        The canonical allowlist entry, matched exactly or case-insensitively
        after stripping whitespace, quotes and a trailing period. Returns None
        if no attempt produced a valid value.
    """
    content = concept.get('content', concept.get('summary', ''))
    content = (content if isinstance(content, str) else str(content))[:400]
    title = concept.get('title', '(untitled)')
    allowlist_str = ', '.join(sorted(allowlist))
    # The prompt is identical for every attempt, so build it once.
    prompt = (
        f"Your previous response for '{field_name}' was invalid. "
        f"You must return ONLY one of these exact strings: {allowlist_str}\n\n"
        f"Title: {title}\n"
        f"Content: {content}\n\n"
        f"Return ONLY the exact string, nothing else. No explanation, no punctuation, no quotes."
    )

    for attempt in range(max_retries):
        try:
            # Configure right before the call, as enrich_window() does.
            genai.configure(api_key=key)
            model = genai.GenerativeModel(
                config['gemini']['model'],
                generation_config={"response_mime_type": "text/plain"},
            )
            resp = model.generate_content(prompt)
            value = resp.text.strip().strip('"').strip("'").strip()
            # Models often append a period despite the "no punctuation" instruction.
            value = value.rstrip('.').strip()
            if value in allowlist:
                return value
            folded = value.casefold()
            for valid in allowlist:
                if valid.casefold() == folded:
                    return valid
        except Exception as e:
            # Same transient classification as the main enrichment retry loop.
            if _is_transient(str(e)):
                if attempt < max_retries - 1:  # no point sleeping after the last attempt
                    time.sleep(min(3 * (2 ** attempt) + random.uniform(0, 2), 30))
            else:
                logger.warning(f"  Reclassify retry {attempt+1} for {field_name} failed: {e}")
    return None
|
|
|
|
|
|
def _match_allowed(value, allowlist):
    """Return the allowlist entry equal to ``value`` ignoring case and surrounding
    whitespace, or None when ``value`` is not a matching string."""
    if not isinstance(value, str):
        return None
    cleaned = value.strip()
    if cleaned in allowlist:
        return cleaned
    folded = cleaned.casefold()
    for valid in allowlist:
        if valid.casefold() == folded:
            return valid
    return None


def _fix_field(concept, field_name, allowlist, fallback, candidates, key, config):
    """Set concept[field_name] to the first matching candidate, else a Gemini
    reclassification, else ``fallback`` (logged as a warning)."""
    original = concept.get(field_name)
    for candidate in candidates:
        match = _match_allowed(candidate, allowlist)
        if match is not None:
            concept[field_name] = match
            return
    new_val = _reclassify_field(field_name, allowlist, concept, key, config)
    if new_val:
        concept[field_name] = new_val
        return
    logger.warning(f"Invalid {field_name} {original!r} for "
                   f"'{concept.get('title', '?')}', using fallback")
    concept[field_name] = fallback


def validate_and_fix_concepts(concepts, key, config):
    """Validate domain, knowledge_type and complexity on each concept, in place.

    Each field is normalised to its canonical allowlist spelling (matching is
    case- and whitespace-insensitive). A domain may also arrive as a list
    (legacy format); the first valid entry wins. Missing or invalid values
    trigger up to 3 Gemini reclassification attempts (_reclassify_field), then
    the field's safe fallback. Non-dict items are left untouched.

    Args:
        concepts: list of concept dicts (mutated in place).
        key: Gemini API key used for any reclassification calls.
        config: loaded config (passed through to the Gemini calls).

    Returns:
        The same ``concepts`` list.
    """
    for concept in concepts:
        if not isinstance(concept, dict):
            continue

        domain = concept.get('domain')
        domain_candidates = domain if isinstance(domain, list) else [domain]
        _fix_field(concept, 'domain', VALID_DOMAINS, DOMAIN_FALLBACK,
                   domain_candidates, key, config)
        _fix_field(concept, 'knowledge_type', VALID_KNOWLEDGE_TYPES, KNOWLEDGE_TYPE_FALLBACK,
                   [concept.get('knowledge_type')], key, config)
        _fix_field(concept, 'complexity', VALID_COMPLEXITIES, COMPLEXITY_FALLBACK,
                   [concept.get('complexity')], key, config)

    return concepts
|
|
|
|
|
|
def _page_sort_key(filename):
    """Order page_<n>.txt names by the integer <n> when it is purely numeric.

    Plain string sorting puts page_10.txt before page_2.txt when page numbers
    are not zero-padded; for zero-padded names both orders agree. Names whose
    middle part is not numeric sort after the numeric ones, by name.
    """
    stem = filename[len('page_'):-len('.txt')]
    if stem.isdecimal():
        return (0, int(stem), filename)
    return (1, 0, filename)


def _build_windows(pages_text, window_size):
    """Group pages into windows of ``window_size`` pages.

    Returns a list of (start_index, combined_text, content_chars) tuples.
    combined_text prefixes each page with "--- Page N ---" (N is the 1-based
    position in pages_text); content_chars is the total length of the pages
    after stripping surrounding whitespace, i.e. excluding those headers.
    """
    windows = []
    for start in range(0, len(pages_text), window_size):
        chunk = pages_text[start:start + window_size]
        combined = "\n\n".join(
            f"--- Page {start + offset + 1} ---\n{page}" for offset, page in enumerate(chunk)
        )
        content_chars = sum(len(page.strip()) for page in chunk)
        windows.append((start, combined, content_chars))
    return windows


def _write_json_atomic(path, data, **dump_kwargs):
    """Write ``data`` as UTF-8 JSON via a hidden temp file and os.replace().

    A crash mid-write leaves at most a stray ".<name>.tmp" file rather than a
    truncated JSON file that would break resuming.
    """
    directory, name = os.path.split(path)
    tmp_path = os.path.join(directory, f".{name}.tmp")
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, **dump_kwargs)
    os.replace(tmp_path, path)


def _read_window_file(path):
    """Load a saved window's concept list, or None if it is unreadable/not a list."""
    try:
        with open(path, encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and Unicode decode errors
        logger.warning(f"  Unreadable window file {path}, re-enriching: {e}")
        return None
    if not isinstance(data, list):
        logger.warning(f"  Window file {path} is not a JSON list, re-enriching")
        return None
    return data


def enrich_single(file_hash, db, config, key_rotator):
    """Extract concepts for one document and record the outcome in the DB.

    Reads page_*.txt files (in page-number order) from the document's text
    directory and groups them into windows of processing.enrich_window_size
    pages. Each window is sent to Gemini with retry/backoff, its classification
    fields are validated, and its concepts are written to
    <paths.concepts>/<file_hash>/window_NNNN.json before moving on.

    Readable window files from an earlier run are reused, so an interrupted
    run resumes where it stopped; unreadable ones are re-enriched. Windows
    with fewer than 50 characters of page text are saved as [] without an
    API call. A meta.json summary is written at the end.

    Status: the document is marked 'enriched' if any concepts were extracted
    or no window failed; otherwise it is marked failed.

    NOTE(review): window files are matched by index only; reusing them after
    changing enrich_window_size would mix windows of different sizes
    (meta.json records window_size but it is not checked here).

    Args:
        file_hash: document hash (DB key and concepts sub-directory name).
        db: status DB; get_document / update_status / mark_failed are used.
        config: loaded config mapping.
        key_rotator: KeyRotator supplying API keys.

    Returns:
        True if the document was marked 'enriched'; False if it is unknown or
        was marked failed.
    """
    doc = db.get_document(file_hash)
    if not doc:
        return False

    text_dir = resolve_text_dir(file_hash, config, db)
    concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
    window_size = config['processing']['enrich_window_size']
    delay = config['processing']['rate_limit_delay']
    proc = config.get('processing', {})
    max_retries = proc.get('enrich_max_retries', proc.get('max_retries', 5))
    base_delay = proc.get('enrich_base_delay', 5.0)
    max_delay = proc.get('enrich_max_delay', 120.0)

    if not os.path.exists(text_dir):
        db.mark_failed(file_hash, f"Text directory not found: {text_dir}")
        return False

    db.update_status(file_hash, 'enriching')

    try:
        os.makedirs(concepts_dir, exist_ok=True)

        page_files = sorted(
            (f for f in os.listdir(text_dir) if f.startswith('page_') and f.endswith('.txt')),
            key=_page_sort_key,
        )
        if not page_files:
            db.mark_failed(file_hash, "No page files found")
            return False

        pages_text = []
        for pf in page_files:
            with open(os.path.join(text_dir, pf), encoding='utf-8') as f:
                pages_text.append(f.read())

        windows = _build_windows(pages_text, window_size)
        total_concepts = 0
        failed_windows = []  # (window number, truncated error message)

        for w_idx, (start_page, window_text, content_chars) in enumerate(windows):
            window_no = w_idx + 1
            window_file = os.path.join(concepts_dir, f"window_{window_no:04d}.json")

            # Resume support: reuse windows saved by an earlier (possibly crashed) run.
            if os.path.exists(window_file):
                existing = _read_window_file(window_file)
                if existing is not None:
                    total_concepts += len(existing)
                    logger.debug(f"  Window {window_no} already exists, skipping")
                    continue

            # Too little real page text to be worth an API call (headers not counted).
            if content_chars < 50:
                _write_json_atomic(window_file, [])
                continue

            # Attempt enrichment with backoff: failures skip the window, not the doc.
            try:
                key = key_rotator.next()
                concepts = _retry_with_backoff(
                    lambda k=key, t=window_text: enrich_window(t, k, config),
                    max_retries=max_retries,
                    base_delay=base_delay,
                    max_delay=max_delay,
                )
            except Exception as e:
                failed_windows.append((window_no, str(e)[:100]))
                logger.warning(f"  Window {window_no}/{len(windows)} failed: {e}")
                continue

            if not isinstance(concepts, list):
                concepts = [concepts] if isinstance(concepts, dict) else []
            concepts = [c for c in concepts if isinstance(c, dict)]

            # Validate domain, knowledge_type, complexity: retry, then fallback.
            concepts = validate_and_fix_concepts(concepts, key_rotator.next(), config)

            for concept in concepts:
                concept['_window'] = window_no
                concept['_start_page'] = start_page + 1
                concept['_doc_hash'] = file_hash

            # JSON FIRST: persist the window before anything else.
            _write_json_atomic(window_file, concepts, indent=2, ensure_ascii=False)

            total_concepts += len(concepts)
            logger.debug(f"  Window {window_no}/{len(windows)}: {len(concepts)} concepts")
            time.sleep(delay)

        meta = {
            'hash': file_hash,
            'total_windows': len(windows),
            'total_concepts': total_concepts,
            'failed_windows': len(failed_windows),
            'window_size': window_size,
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
        }
        _write_json_atomic(os.path.join(concepts_dir, 'meta.json'), meta, indent=2)

        if total_concepts > 0 or not failed_windows:
            # Some concepts extracted, or every window was empty: mark enriched.
            error_msg = None
            # page_count may be NULL in the DB; treat that as 0.
            if total_concepts == 0 and (doc.get('page_count') or 0) >= 3:
                error_msg = (f"0 concepts from {doc.get('page_count', '?')} pages — "
                             f"likely image-only PDF, may need manual review")
                logger.warning(f"  {doc['filename']}: {error_msg}")
            elif failed_windows:
                wins = ', '.join(str(w) for w, _ in failed_windows[:10])
                error_msg = (f"Partial: {len(failed_windows)}/{len(windows)} "
                             f"windows failed (windows {wins})")
                logger.warning(f"  {doc['filename']}: {error_msg}")
            db.update_status(file_hash, 'enriched', concepts_extracted=total_concepts,
                             error_message=error_msg)
            fw_note = f", {len(failed_windows)} windows failed" if failed_windows else ""
            logger.info(f"Enriched {doc['filename']}: {total_concepts} concepts "
                        f"from {len(windows)} windows{fw_note}")
            return True

        # Windows failed and nothing was extracted: document failed.
        first_err = failed_windows[0][1]
        if len(failed_windows) == len(windows):
            reason = f"All {len(windows)} windows failed: {first_err}"
        else:
            reason = (f"{len(failed_windows)}/{len(windows)} windows failed and no "
                      f"concepts were extracted: {first_err}")
        db.mark_failed(file_hash, reason)
        logger.error(f"  {doc['filename']}: {reason}")
        return False

    except Exception as e:
        logger.error(f"Enrichment failed for {file_hash}: {e}\n{traceback.format_exc()}")
        db.mark_failed(file_hash, str(e))
        return False
|
|
|
|
|
|
def _recover_stale_enriching(db, max_hours=STALE_ENRICHING_HOURS):
    """Reset docs stuck in 'enriching' back to 'extracted' so they get retried.

    This handles a previous enrichment run that crashed mid-document.
    enrich_single() reuses window files that were already written, so
    completed windows are not redone.

    A document counts as stale when it has no parseable extracted_at timestamp,
    or that timestamp is more than ``max_hours`` old. Naive timestamps are
    treated as UTC, and a trailing 'Z' is accepted on every Python version
    (datetime.fromisoformat only supports it from 3.11).

    NOTE(review): the age is measured from extracted_at, not from when
    enrichment started, so a document extracted long ago that another process
    is enriching right now also qualifies. Confirm whether the schema has an
    enrichment-start timestamp that would make this precise.

    Args:
        db: status DB; _get_conn() and get_document() are used.
        max_hours: age threshold in hours.
    """
    from datetime import datetime, timezone

    conn = db._get_conn()
    rows = conn.execute(
        "SELECT hash, filename FROM documents WHERE status = 'enriching'",
    ).fetchall()
    if not rows:
        return

    now = datetime.now(timezone.utc)
    reset = []
    for row in rows:
        doc = db.get_document(row['hash']) or {}
        extracted_at = doc.get('extracted_at') or ''
        if not extracted_at:
            reset.append(row)
            continue
        stamp = str(extracted_at)
        if stamp.endswith('Z'):
            stamp = stamp[:-1] + '+00:00'
        try:
            ts = datetime.fromisoformat(stamp)
        except ValueError:
            reset.append(row)  # unparseable timestamp: treat as stale
            continue
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        if (now - ts).total_seconds() / 3600 > max_hours:
            reset.append(row)

    for row in reset:
        # Guard on status so a document that finished or failed since the
        # SELECT above is not pushed back to 'extracted'.
        conn.execute(
            "UPDATE documents SET status = 'extracted' WHERE hash = ? AND status = 'enriching'",
            (row['hash'],),
        )
        logger.warning(f"Recovered stale enriching doc: {row['filename']} ({row['hash'][:12]}...)")
    if reset:
        conn.commit()
        logger.info(f"Reset {len(reset)} stale enriching docs back to extracted")
|
|
|
|
|
|
def run_enrichment(workers=None, limit=None):
    """Enrich every document currently in 'extracted' status.

    First requeues documents left in 'enriching' by a crashed run, then fans
    the 'extracted' documents (at most ``limit``, as passed to
    db.get_by_status) out to a thread pool. Each task receives its own StatusDB
    instance and the shared KeyRotator.

    Args:
        workers: pool size; defaults to config processing.enrich_workers.
        limit: optional cap on how many documents to pick up.

    Returns:
        Number of documents for which enrich_single() returned True
        (0 when no API keys are configured or nothing is pending).
    """
    config = get_config()
    db = StatusDB()
    workers = workers or config['processing']['enrich_workers']

    # Requeue documents orphaned in 'enriching' by an earlier crashed run.
    _recover_stale_enriching(db)

    keys = config.get('gemini_keys', [])
    if not keys:
        logger.error("No Gemini API keys configured in .env")
        return 0
    rotator = KeyRotator(keys)

    pending = db.get_by_status('extracted', limit=limit)
    if not pending:
        logger.info("No extracted documents to enrich")
        return 0

    logger.info(f"Enriching {len(pending)} documents with {workers} workers, {len(keys)} API key(s)")

    succeeded = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        doc_by_future = {}
        for doc in pending:
            fut = pool.submit(enrich_single, doc['hash'], StatusDB(), config, rotator)
            doc_by_future[fut] = doc

        for fut in as_completed(doc_by_future):
            doc = doc_by_future[fut]
            try:
                ok = fut.result()
            except Exception as e:
                logger.error(f"Worker error for {doc['hash']}: {e}")
                continue
            if ok:
                succeeded += 1

    logger.info(f"Enrichment complete: {succeeded}/{len(pending)} succeeded")
    return succeeded
|