Initial commit: RECON codebase baseline

Current state of the pipeline code as of 2026-04-14 (Phase 1 scaffolding complete).
Config has new_pipeline.enabled=false and crawler.sites=[] per refactor plan.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-14 14:57:23 +00:00
commit 563c16bb71
59 changed files with 18327 additions and 0 deletions

0
lib/__init__.py Normal file
View file

1930
lib/api.py Normal file

File diff suppressed because it is too large Load diff

432
lib/crawler.py Normal file
View file

@ -0,0 +1,432 @@
"""
RECON Site Crawler URL discovery for bulk web ingestion.
Two discovery strategies:
1. Sitemap-based (preferred) parses sitemap.xml for all URLs
2. Link-following (fallback) crawls from root URL following internal links
Discovered URLs are fed into web_scraper.ingest_url() for processing.
"""
import re
import time
from collections import deque
from urllib.parse import urlparse, urljoin, urldefrag
import requests
from lxml import etree
from .utils import get_config, setup_logging
logger = setup_logging('recon.crawler')
def _get_crawler_config(config=None):
"""Load crawler config with defaults."""
if config is None:
config = get_config()
crawler_cfg = config.get('crawler', {})
web_cfg = config.get('web_scraper', {})
return {
'user_agent': (
crawler_cfg.get('user_agent') or
web_cfg.get('user_agent') or
'Mozilla/5.0 (compatible; RECON/1.0)'
),
'fetch_timeout': crawler_cfg.get('fetch_timeout', 30),
'rate_limit_delay': crawler_cfg.get('rate_limit_delay', 1.0),
'max_pages': crawler_cfg.get('max_pages', 500),
'max_depth': crawler_cfg.get('max_depth', 3),
'default_exclude': crawler_cfg.get('default_exclude', [
'/search', '/404', '/login', '/signup', '/auth/', '/api/', '/assets/', '/static/'
]),
}
# ─── Sitemap Discovery ─────────────────────────────────────────────
def discover_sitemap_url(base_url, config=None):
"""
Find the sitemap URL for a site.
Checks: robots.txt Sitemap: directive, /sitemap.xml,
/sitemap_index.xml, /sitemap-0.xml.
Returns sitemap URL or None.
"""
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
parsed = urlparse(base_url)
root = f"{parsed.scheme}://{parsed.netloc}"
# Check robots.txt first
try:
resp = requests.get(
f"{root}/robots.txt",
headers=headers,
timeout=cfg['fetch_timeout']
)
if resp.status_code == 200:
for line in resp.text.splitlines():
if line.strip().lower().startswith('sitemap:'):
sitemap_url = line.split(':', 1)[1].strip()
# Handle "Sitemap: https://..." — split(':',1) keeps the URL intact
# but "Sitemap: https://..." splits into "Sitemap" and " https://..."
# Need to rejoin properly
if not sitemap_url.startswith('http'):
sitemap_url = line[line.index(':') + 1:].strip()
logger.info(f"Found sitemap in robots.txt: {sitemap_url}")
return sitemap_url
except Exception as e:
logger.debug(f"robots.txt fetch failed: {e}")
# Try common sitemap locations
candidates = [
f"{root}/sitemap.xml",
f"{root}/sitemap_index.xml",
f"{root}/sitemap-0.xml",
]
for url in candidates:
try:
resp = requests.head(
url,
headers=headers,
timeout=cfg['fetch_timeout'],
allow_redirects=True
)
if resp.status_code == 200:
logger.info(f"Found sitemap at: {url}")
return url
except Exception:
continue
logger.warning(f"No sitemap found for {base_url}")
return None
def parse_sitemap(sitemap_url, config=None):
"""
Parse a sitemap XML and return all page URLs.
Handles standard sitemaps (<urlset>) and sitemap indexes
(<sitemapindex>) with recursive sub-sitemap fetching.
"""
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
all_urls = []
def _fetch_and_parse(url, depth=0):
if depth > 3:
return
try:
resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout'])
resp.raise_for_status()
except Exception as e:
logger.error(f"Failed to fetch sitemap {url}: {e}")
return
try:
root = etree.fromstring(resp.content)
except etree.XMLSyntaxError as e:
logger.error(f"Invalid XML in sitemap {url}: {e}")
return
nsmap = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
# Check if this is a sitemap index
sitemap_locs = root.findall('.//ns:sitemap/ns:loc', nsmap)
if sitemap_locs:
logger.info(f"Sitemap index at {url}{len(sitemap_locs)} sub-sitemaps")
for loc in sitemap_locs:
if loc.text:
_fetch_and_parse(loc.text.strip(), depth + 1)
return
# Standard sitemap — extract URLs
url_locs = root.findall('.//ns:loc', nsmap)
# Fallback: try without namespace
if not url_locs:
url_locs = root.findall('.//loc')
for loc in url_locs:
if loc.text:
all_urls.append(loc.text.strip())
logger.info(f"Parsed {len(url_locs)} URLs from {url}")
_fetch_and_parse(sitemap_url)
# Deduplicate preserving order
seen = set()
unique = []
for url in all_urls:
url_clean = urldefrag(url)[0]
if url_clean not in seen:
seen.add(url_clean)
unique.append(url_clean)
logger.info(f"Total unique URLs from sitemap: {len(unique)}")
return unique
# ─── Link-Following Discovery (Fallback) ───────────────────────────
def crawl_links(base_url, max_depth=3, max_pages=500, config=None):
"""
Discover URLs by following internal links (BFS).
Fallback when no sitemap is available.
"""
from bs4 import BeautifulSoup
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
parsed_base = urlparse(base_url)
base_domain = parsed_base.netloc
discovered = []
visited = set()
queue = deque([(base_url, 0)])
skip_extensions = (
'.pdf', '.png', '.jpg', '.jpeg', '.gif', '.svg',
'.css', '.js', '.zip', '.tar', '.gz', '.mp4', '.mp3',
'.ico', '.woff', '.woff2', '.ttf', '.eot',
)
skip_paths = (
'/tag/', '/tags/', '/page/', '/feed/', '/rss/',
'/wp-json/', '/wp-admin/', '/wp-includes/',
)
while queue and len(discovered) < max_pages:
url, depth = queue.popleft()
url = urldefrag(url)[0]
if url in visited:
continue
if depth > max_depth:
continue
visited.add(url)
discovered.append(url)
if depth >= max_depth:
continue
try:
resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout'])
if resp.status_code != 200:
continue
if 'text/html' not in resp.headers.get('content-type', ''):
continue
except Exception:
continue
try:
soup = BeautifulSoup(resp.text, 'lxml')
except Exception:
continue
for a_tag in soup.find_all('a', href=True):
href = a_tag['href']
full_url = urljoin(url, href)
full_url = urldefrag(full_url)[0]
parsed = urlparse(full_url)
if parsed.netloc != base_domain:
continue
if any(parsed.path.lower().endswith(ext) for ext in skip_extensions):
continue
if any(skip in parsed.path.lower() for skip in skip_paths):
continue
if full_url not in visited:
queue.append((full_url, depth + 1))
time.sleep(cfg['rate_limit_delay'])
logger.info(f"Link crawl: {len(discovered)} URLs (visited {len(visited)}, depth {max_depth})")
return discovered
# ─── URL Filtering ──────────────────────────────────────────────────
def filter_urls(urls, include=None, exclude=None):
"""
Filter URLs by path prefix include/exclude rules.
include: URL must match at least one prefix (if provided)
exclude: URL must not match any prefix
"""
filtered = []
for url in urls:
path = urlparse(url).path
if include:
if not any(path.startswith(prefix) for prefix in include):
continue
if exclude:
if any(path.startswith(prefix) for prefix in exclude):
continue
filtered.append(url)
logger.info(f"Filtered {len(urls)} -> {len(filtered)} URLs "
f"(include={include}, exclude={exclude})")
return filtered
# ─── Main Crawl Orchestrator ────────────────────────────────────────
def crawl_site(
base_url,
category='Web',
source=None,
include=None,
exclude=None,
max_pages=None,
max_depth=None,
delay=None,
dry_run=False,
use_sitemap=True,
use_links=True,
config=None,
):
"""
Crawl a site and ingest all discovered pages.
1. Discover URLs via sitemap or link-following
2. Apply include/exclude filters
3. Feed each URL through web_scraper.ingest_url()
Returns summary dict with counts and per-URL results.
"""
if config is None:
config = get_config()
cfg = _get_crawler_config(config)
if max_pages is None:
max_pages = cfg['max_pages']
if max_depth is None:
max_depth = cfg['max_depth']
if delay is None:
delay = cfg['rate_limit_delay']
if source is None:
source = urlparse(base_url).netloc
logger.info(f"Crawling {base_url} (category={category}, max_pages={max_pages})")
# ── Phase 1: Discover URLs ──
urls = []
discovery_method = None
if use_sitemap:
sitemap_url = discover_sitemap_url(base_url, config)
if sitemap_url:
urls = parse_sitemap(sitemap_url, config)
discovery_method = 'sitemap'
if not urls and use_links:
logger.info("No sitemap URLs, falling back to link crawl...")
urls = crawl_links(base_url, max_depth=max_depth, max_pages=max_pages, config=config)
discovery_method = 'link_crawl'
if not urls:
logger.warning(f"No URLs discovered for {base_url}")
return {
'site': base_url,
'discovery_method': None,
'urls_discovered': 0,
'urls_after_filter': 0,
'results': [],
'summary': {'total': 0, 'succeeded': 0, 'duplicates': 0, 'failed': 0},
}
# ── Phase 2: Filter URLs ──
all_exclude = list(cfg['default_exclude'])
if exclude:
all_exclude.extend(exclude)
urls = filter_urls(urls, include=include, exclude=all_exclude)
if len(urls) > max_pages:
logger.info(f"Limiting to {max_pages} pages (discovered {len(urls)})")
urls = urls[:max_pages]
logger.info(f"After filtering: {len(urls)} URLs to process")
# ── Dry run ──
if dry_run:
return {
'site': base_url,
'discovery_method': discovery_method,
'dry_run': True,
'urls_discovered': len(urls),
'urls': urls,
}
# ── Phase 3: Ingest each URL ──
from .web_scraper import ingest_url
results = []
total = len(urls)
for i, url in enumerate(urls, 1):
logger.info(f"[{i}/{total}] Ingesting: {url}")
try:
result = ingest_url(url, category=category, source=source, config=config)
result['url'] = url
results.append(result)
status = result.get('status', 'unknown')
title = result.get('title', '')
if status == 'duplicate':
logger.info(f" DUPLICATE: {title}")
else:
logger.info(f" OK: {title} ({result.get('page_count', 0)} pages)")
except Exception as e:
logger.error(f" FAILED: {url} -- {e}")
results.append({
'url': url,
'status': 'failed',
'error': str(e),
})
if i < total and delay > 0:
time.sleep(delay)
# ── Summary ──
succeeded = sum(1 for r in results if r.get('status') not in ('failed', 'duplicate'))
duplicates = sum(1 for r in results if r.get('status') == 'duplicate')
failed = sum(1 for r in results if r.get('status') == 'failed')
summary = {
'total': len(results),
'succeeded': succeeded,
'duplicates': duplicates,
'failed': failed,
}
logger.info(f"Crawl complete: {succeeded} new, {duplicates} duplicates, {failed} failed out of {total}")
return {
'site': base_url,
'domain': urlparse(base_url).netloc,
'category': category,
'discovery_method': discovery_method,
'urls_discovered': total,
'results': results,
'summary': summary,
}

430
lib/embedder.py Normal file
View file

@ -0,0 +1,430 @@
"""
RECON Embedder
Concepts to vectors via TEI (primary, 1024-dim bge-m3, ~1,711 emb/sec)
or Ollama (fallback, ~8 emb/sec). Inserts into Qdrant on cortex:6333.
Supports hybrid dense+sparse vectors when sparse_embedding service is configured.
Dependencies: requests, qdrant-client
Config: embedding, vector_db, processing.embed_workers
"""
import json
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests as http_requests
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, SparseVector
from .utils import get_config, concept_id, generate_download_url, setup_logging
from .status import StatusDB
logger = setup_logging('recon.embedder')
# ── Classification allowlists ───────────────────────────────────────────────
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
DOMAIN_FALLBACK = 'Foundational Skills'
KNOWLEDGE_TYPE_FALLBACK = 'foundational'
COMPLEXITY_FALLBACK = 'basic'
def _validate_classification(payload):
"""Validate domain, knowledge_type, complexity before upsert.
Logs WARNING and applies safe fallback for any invalid values.
Returns the payload (modified in place if needed).
"""
title = payload.get('title', payload.get('filename', '?'))
# ── domain ──────────────────────────────────────────────────────────
domain = payload.get('domain')
if isinstance(domain, list):
valid = [d for d in domain if d in VALID_DOMAINS]
if valid:
payload['domain'] = valid[0]
else:
logger.warning(f"Invalid domain {domain} for '{title}', fallback → {DOMAIN_FALLBACK}")
payload['domain'] = DOMAIN_FALLBACK
elif isinstance(domain, str):
if domain not in VALID_DOMAINS:
logger.warning(f"Invalid domain '{domain}' for '{title}', fallback → {DOMAIN_FALLBACK}")
payload['domain'] = DOMAIN_FALLBACK
else:
payload['domain'] = DOMAIN_FALLBACK
# ── knowledge_type ──────────────────────────────────────────────────
kt = payload.get('knowledge_type', '')
if isinstance(kt, str):
kt = kt.lower().strip()
else:
kt = ''
if kt not in VALID_KNOWLEDGE_TYPES:
logger.warning(f"Invalid knowledge_type '{kt}' for '{title}', fallback → {KNOWLEDGE_TYPE_FALLBACK}")
payload['knowledge_type'] = KNOWLEDGE_TYPE_FALLBACK
else:
payload['knowledge_type'] = kt
# ── complexity ──────────────────────────────────────────────────────
cx = payload.get('complexity', '')
if isinstance(cx, str):
cx = cx.lower().strip()
else:
cx = ''
if cx not in VALID_COMPLEXITIES:
logger.warning(f"Invalid complexity '{cx}' for '{title}', fallback → {COMPLEXITY_FALLBACK}")
payload['complexity'] = COMPLEXITY_FALLBACK
else:
payload['complexity'] = cx
return payload
def get_embedding_single(text, config):
"""Get a single embedding — uses TEI or Ollama depending on config."""
backend = config['embedding'].get('backend', 'ollama')
if backend == 'tei':
url = f"http://{config['embedding']['tei_host']}:{config['embedding']['tei_port']}/embed"
resp = http_requests.post(url, json={"inputs": text}, timeout=120)
resp.raise_for_status()
return resp.json()[0]
else:
url = f"http://{config['embedding']['ollama_host']}:{config['embedding']['ollama_port']}/api/embed"
resp = http_requests.post(url, json={
"model": config['embedding']['model'],
"input": text
}, timeout=120)
resp.raise_for_status()
return resp.json()['embeddings'][0]
def get_embeddings_batch(texts, config):
"""Get embeddings for a batch of texts via TEI. Falls back to sequential on error."""
url = f"http://{config['embedding']['tei_host']}:{config['embedding']['tei_port']}/embed"
try:
resp = http_requests.post(url, json={"inputs": texts}, timeout=300)
resp.raise_for_status()
return resp.json()
except Exception as e:
if len(texts) <= 1:
raise
# Split batch in half and retry each half
mid = len(texts) // 2
logger.warning(f" Batch of {len(texts)} failed ({e}), splitting in half")
left = get_embeddings_batch(texts[:mid], config)
right = get_embeddings_batch(texts[mid:], config)
return left + right
def get_sparse_embeddings_batch(texts, config):
"""Get sparse embeddings from the sparse embedding service on cortex.
Returns a list of dicts with 'indices' and 'values' keys, or None on failure.
"""
sparse_cfg = config.get('sparse_embedding')
if not sparse_cfg or not sparse_cfg.get('enabled', False):
return None
url = f"http://{sparse_cfg['host']}:{sparse_cfg['port']}/embed_sparse"
try:
resp = http_requests.post(url, json={"inputs": texts}, timeout=300)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.warning(f" Sparse embedding failed for batch of {len(texts)}: {e}")
return None
def _validate_content(content):
"""Validate and normalize concept content for embedding. Returns clean string or None."""
if content is None:
return None
if not isinstance(content, str):
content = str(content)
content = content.strip()
if len(content) < 10:
return None
# Truncate to 8192 chars (Ollama/TEI input limit)
if len(content) > 8192:
content = content[:8192]
return content
def _build_payload(doc, concept, idx, source, download_url, source_type, page_timestamps):
"""Build and validate payload for a single concept point."""
start_page = concept.get('_start_page', 0)
payload = {
'doc_hash': doc.get('hash', ''),
'filename': doc['filename'],
'book_title': doc.get('book_title', ''),
'book_author': doc.get('book_author', ''),
'source': source,
'download_url': download_url,
'source_type': source_type,
'verification_status': 'unverified',
'credibility_score': 0.7,
'language': 'en',
}
for field in ['content', 'summary', 'title', 'domain', 'subdomain',
'keywords', 'knowledge_type', 'complexity',
'key_facts', 'scenario_applicable',
'cross_domain_tags', 'chapter', 'page_ref', 'notes',
'_window', '_start_page']:
if field in concept:
payload[field] = concept[field]
# Add video timestamp for transcript sources
if source_type == 'transcript' and page_timestamps:
page_key = f"page_{start_page:04d}"
if page_key in page_timestamps:
payload['video_timestamp'] = page_timestamps[page_key]
# Validate classification fields before returning
payload = _validate_classification(payload)
return payload
def _build_point(point_id, dense_vector, sparse_vec, payload, config):
"""Build a PointStruct with dense vector and optional sparse vector."""
sparse_cfg = config.get('sparse_embedding')
if sparse_cfg and sparse_cfg.get('enabled', False) and sparse_vec:
vector = {
"": dense_vector,
"bge-m3-sparse": SparseVector(
indices=sparse_vec['indices'],
values=sparse_vec['values'],
),
}
else:
vector = {"": dense_vector}
return PointStruct(id=point_id, vector=vector, payload=payload)
def embed_single(file_hash, db, config):
doc = db.get_document(file_hash)
if not doc:
return False
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if not os.path.exists(concepts_dir):
db.mark_failed(file_hash, f"Concepts directory not found: {concepts_dir}")
return False
db.update_status(file_hash, 'embedding')
try:
qdrant = QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
collection = config['vector_db']['collection']
qdrant_batch_size = config['processing']['embed_batch_size']
embed_batch_size = config['embedding'].get('batch_size', 128)
backend = config['embedding'].get('backend', 'ollama')
window_files = sorted([
f for f in os.listdir(concepts_dir)
if f.startswith('window_') and f.endswith('.json')
])
if not window_files:
db.mark_failed(file_hash, "No window files found")
return False
all_concepts = []
for wf in window_files:
with open(os.path.join(concepts_dir, wf), encoding='utf-8') as f:
concepts = json.load(f)
if isinstance(concepts, list):
all_concepts.extend([c for c in concepts if isinstance(c, dict)])
if not all_concepts:
db.update_status(file_hash, 'complete', vectors_inserted=0)
logger.info(f"No concepts to embed for {doc['filename']}")
return True
# Look up source from catalogue once per doc
cat_conn = db._get_conn()
cat_row = cat_conn.execute(
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
source = dict(cat_row)['source'] if cat_row else ''
download_url = ''
is_web = doc.get('path', '').startswith(('http://', 'https://'))
source_type = 'web' if is_web else 'document'
# Check meta.json for explicit source_type (e.g. 'transcript')
text_dir = os.path.join(config['paths']['text'], file_hash)
meta_path = os.path.join(text_dir, 'meta.json')
page_timestamps = {}
if os.path.exists(meta_path):
try:
with open(meta_path) as mf:
meta = json.load(mf)
if meta.get('source_type'):
source_type = meta['source_type']
if not download_url and meta.get('url'):
download_url = meta['url']
if meta.get('page_timestamps'):
page_timestamps = meta['page_timestamps']
except Exception:
pass
if doc.get('path'):
download_url = generate_download_url(
doc['path'], config.get('library_root', '/mnt/library')
)
# Build list of valid concepts with their indices
valid = []
skipped = 0
for idx, concept in enumerate(all_concepts):
content = _validate_content(concept.get('content', ''))
if content is None:
skipped += 1
continue
valid.append((idx, concept, content))
if skipped > 0:
logger.info(f" Skipped {skipped} concepts with invalid/empty content")
if not valid:
db.update_status(file_hash, 'complete', vectors_inserted=0)
logger.info(f"No valid concepts to embed for {doc['filename']}")
return True
points = []
embedded_count = 0
if backend == 'tei':
# TEI: batch embedding
for batch_start in range(0, len(valid), embed_batch_size):
batch = valid[batch_start:batch_start + embed_batch_size]
texts = [content for _, _, content in batch]
try:
vectors = get_embeddings_batch(texts, config)
except Exception as e:
logger.error(f" Batch embedding failed at offset {batch_start}: {e}")
# Skip entire batch on unrecoverable error
continue
# Get sparse embeddings for the same batch
sparse_results = get_sparse_embeddings_batch(texts, config)
for i, ((idx, concept, content), vector) in enumerate(zip(batch, vectors)):
start_page = concept.get('_start_page', 0)
point_id = concept_id(file_hash, start_page, idx)
payload = _build_payload(
doc, concept, idx, source, download_url,
source_type, page_timestamps
)
sparse_vec = sparse_results[i] if sparse_results and i < len(sparse_results) else None
points.append(_build_point(point_id, vector, sparse_vec, payload, config))
embedded_count += 1
if len(points) >= qdrant_batch_size:
qdrant.upsert(collection_name=collection, points=points)
logger.debug(f" Upserted batch of {len(points)} points")
points = []
else:
# Ollama: one-at-a-time with retry
for idx, concept, content in valid:
try:
vector = get_embedding_single(content, config)
except Exception as e:
logger.warning(f" Embedding failed for concept {idx}: {e}")
time.sleep(2)
try:
vector = get_embedding_single(content, config)
except Exception as e2:
logger.error(f" Embedding retry failed for concept {idx}: {e2}")
continue
# Get sparse embedding for single text
sparse_results = get_sparse_embeddings_batch([content], config)
sparse_vec = sparse_results[0] if sparse_results else None
start_page = concept.get('_start_page', 0)
point_id = concept_id(file_hash, start_page, idx)
payload = _build_payload(
doc, concept, idx, source, download_url,
source_type, page_timestamps
)
points.append(_build_point(point_id, vector, sparse_vec, payload, config))
embedded_count += 1
if len(points) >= qdrant_batch_size:
qdrant.upsert(collection_name=collection, points=points)
logger.debug(f" Upserted batch of {len(points)} points")
points = []
if points:
qdrant.upsert(collection_name=collection, points=points)
logger.debug(f" Upserted final batch of {len(points)} points")
db.update_status(file_hash, 'complete', vectors_inserted=embedded_count)
logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)")
return True
except Exception as e:
logger.error(f"Embedding failed for {file_hash}: {e}\n{traceback.format_exc()}")
db.mark_failed(file_hash, str(e))
return False
def run_embedding(workers=None, limit=None):
config = get_config()
db = StatusDB()
workers = workers or config['processing']['embed_workers']
enriched = db.get_by_status('enriched', limit=limit)
if not enriched:
logger.info("No enriched documents to embed")
return 0
backend = config['embedding'].get('backend', 'ollama')
sparse_cfg = config.get('sparse_embedding')
sparse_status = "enabled" if (sparse_cfg and sparse_cfg.get('enabled')) else "disabled"
logger.info(f"Embedding {len(enriched)} documents with {workers} workers (backend: {backend}, sparse: {sparse_status})")
success = 0
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {
pool.submit(embed_single, doc['hash'], StatusDB(), config): doc
for doc in enriched
}
for future in as_completed(futures):
doc = futures[future]
try:
if future.result():
success += 1
except Exception as e:
logger.error(f"Worker error for {doc['hash']}: {e}")
logger.info(f"Embedding complete: {success}/{len(enriched)} succeeded")
return success

561
lib/enricher.py Normal file
View file

@ -0,0 +1,561 @@
"""
RECON Enricher
Text to structured concepts via Gemini API. Saves JSON to data/concepts/{hash}/
BEFORE any DB operations. Uses 10-page windows, 4 API keys, 16 workers.
Resilience:
- Exponential backoff with jitter for transient errors (429, 500, 503, timeout)
- Permanent errors (JSON parse, auth) fail immediately without wasting retries
- Window failures skip that window and continue partial enrichment beats zero
- Document marked enriched if ANY windows succeeded, failed only if ALL failed
Dependencies: google-generativeai
Config: processing.enrich_workers, processing.enrich_window_size, gemini, paths.concepts
"""
import json
import os
import random
import re
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.generativeai as genai
from .utils import get_config, setup_logging
from .status import StatusDB
logger = setup_logging('recon.enricher')
# Docs stuck in "enriching" longer than this get reset to "extracted" for retry
STALE_ENRICHING_HOURS = 2
# ── Classification allowlists ───────────────────────────────────────────────
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
DOMAIN_FALLBACK = 'Foundational Skills'
KNOWLEDGE_TYPE_FALLBACK = 'foundational'
COMPLEXITY_FALLBACK = 'basic'
def repair_json(text):
"""Attempt to repair common LLM JSON output issues including truncation."""
# Remove control characters except newlines and tabs
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
# Fix invalid JSON escape sequences (e.g. \e, \p, \c from Gemini)
# Valid JSON escapes: \", \\, \/, \b, \f, \n, \r, \t, \uXXXX
text = re.sub(r'\\(?!["\\/bfnrtu])', r'\\\\', text)
# Remove trailing commas before } or ]
text = re.sub(r',\s*([}\]])', r'\1', text)
# Handle truncated JSON: try to find the last complete object in the array
try:
json.loads(text, strict=False)
return text
except json.JSONDecodeError:
pass
# Find the last complete }, then close the array
# Walk backward to find the last valid closing brace
last_complete = -1
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for i, ch in enumerate(text):
if escape:
escape = False
continue
if ch == '\\' and in_string:
escape = True
continue
if ch == '"' and not escape:
in_string = not in_string
continue
if in_string:
continue
if ch == '{':
depth_brace += 1
elif ch == '}':
depth_brace -= 1
if depth_brace == 0:
last_complete = i
elif ch == '[':
depth_bracket += 1
elif ch == ']':
depth_bracket -= 1
if last_complete > 0:
truncated = text[:last_complete + 1].rstrip().rstrip(',')
# Close any open arrays
open_brackets = truncated.count('[') - truncated.count(']')
truncated += ']' * open_brackets
return truncated
return text
ENRICH_PROMPT = """Extract knowledge concepts from this document text.
A concept is a SELF-CONTAINED piece of knowledge that can stand alone.
For each concept, provide ALL fields:
Required:
- content: Full text of the concept (complete procedure, definition, etc.)
- summary: 1-2 sentence summary
- title: Brief descriptive title
- domain: must be exactly one of: Agriculture & Livestock, Civil Organization, Communications, Food Systems, Foundational Skills, Logistics, Medical, Navigation, Operations, Power Systems, Preservation & Storage, Security, Shelter & Construction, Technology, Tools & Equipment, Vehicles, Water Systems, Wilderness Skills return ONLY this exact string, no variations, no new domains, no underscores, no synonyms
CRITICAL: Medical content (first aid, anatomy, pharmacology, herbs, veterinary, austere medicine) Medical
CRITICAL: Food growing, farming, animal husbandry, livestock Agriculture & Livestock
CRITICAL: Foraging, hunting, fishing, bushcraft, wilderness survival Wilderness Skills
CRITICAL: Food preservation, storage, canning, dehydration, processing Preservation & Storage
CRITICAL: Solar, wind, hydro, batteries, generators Power Systems
CRITICAL: Water sourcing, filtration, sanitation, purification Water Systems
CRITICAL: Building, carpentry, structural construction, shelter Shelter & Construction
CRITICAL: Tactical operations, mission execution, combat maneuvers, search & rescue Operations
CRITICAL: Governance, civil administration, community leadership Civil Organization
CRITICAL: Electronics, IT, computing, engineering Technology
CRITICAL: Hand tools, power tools, equipment maintenance Tools & Equipment
CRITICAL: Motor vehicles, aircraft, watercraft, vehicle maintenance Vehicles
CRITICAL: Radio, signals, networking, comms equipment Communications
CRITICAL: Supply chain, transport, distribution, inventory Logistics
CRITICAL: Physical security, OPSEC, threat assessment Security
CRITICAL: Map reading, orienteering, GPS, celestial navigation Navigation
CRITICAL: Cooking methods, food production, recipes, nutrition Food Systems
- subdomain: Array of specific subcategories (up to 10)
- keywords: Array of 3-30 searchable terms
- knowledge_type: foundational | procedural | operational
foundational concepts, definitions, theory, background knowledge, explanations of how things work
procedural step-by-step techniques, instructions, how-to skills, methods you execute
operational application under real conditions, decision-making, mission execution, judgment calls in context
Valid values are ONLY: foundational, procedural, operational do not use any other values
- complexity: basic | intermediate | advanced
basic requires little or no prior knowledge, introductory material, simple concepts
intermediate requires some domain familiarity, assumes foundational knowledge is in place
advanced requires significant experience or expertise, high-stakes or highly technical material
Valid values are ONLY: basic, intermediate, advanced do not use any other values
- key_facts: Array of specific extractable claims, measurements, data points
Optional (include when present):
- scenario_applicable: Array from: tuesday_prepper, month_prepper, year_prepper, multi_year, eotwawki
- cross_domain_tags: Array from: sustainment, medical, security, communications, leadership, logistics, navigation, power_systems, water_systems, food_systems, tactical_ops, community_coordination
- chapter: Chapter name if identifiable
- page_ref: Page reference
- notes: Any additional context
EXAMPLES (knowledge_type + complexity):
- "Needle chest decompression procedure" knowledge_type: "procedural", complexity: "advanced"
- "What is soil texture and why does it matter" knowledge_type: "foundational", complexity: "basic"
- "Coordinating a fire team withdrawal under contact" knowledge_type: "operational", complexity: "advanced"
Return JSON array. If no extractable concepts, return [].
Document text:
"""
class KeyRotator:
def __init__(self, keys):
self.keys = keys
self.index = 0
def next(self):
if not self.keys:
raise ValueError("No Gemini API keys configured")
key = self.keys[self.index % len(self.keys)]
self.index += 1
return key
def enrich_window(text, key, config):
genai.configure(api_key=key)
model = genai.GenerativeModel(
config['gemini']['model'],
generation_config={"response_mime_type": config['gemini']['response_mime_type']}
)
response = model.generate_content(ENRICH_PROMPT + text)
raw = response.text
try:
result = json.loads(raw, strict=False)
except json.JSONDecodeError:
repaired = repair_json(raw)
result = json.loads(repaired, strict=False)
# Filter out non-dict items (nested lists from truncated responses)
if isinstance(result, list):
result = [c for c in result if isinstance(c, dict)]
return result
def _is_transient(error_str):
"""Classify whether an error is transient (worth retrying) or permanent."""
s = error_str.lower()
transient_signals = ['429', 'resource_exhausted', 'quota', 'rate',
'500', '503', 'unavailable', 'timeout',
'connection', 'reset by peer', 'broken pipe']
return any(sig in s for sig in transient_signals)
def _retry_with_backoff(fn, max_retries=5, base_delay=5.0, max_delay=120.0):
"""Retry with exponential backoff + jitter for transient errors.
Backoff: ~5s, ~10s, ~20s, ~40s, ~80s (total ~155s before giving up).
Permanent errors (JSON parse, auth) raise immediately without retrying.
"""
last_exc = None
for attempt in range(max_retries):
try:
return fn()
except Exception as e:
last_exc = e
err = str(e)
if not _is_transient(err):
raise # permanent — don't waste retries
if attempt < max_retries - 1:
delay = min(base_delay * (2 ** attempt) + random.uniform(0, base_delay), max_delay)
logger.info(f" Transient error (attempt {attempt+1}/{max_retries}), "
f"retrying in {delay:.0f}s: {err[:120]}")
time.sleep(delay)
else:
logger.warning(f" Transient error, max retries exhausted: {err[:150]}")
raise last_exc
def _reclassify_field(field_name, allowlist, concept, key, config, max_retries=3):
"""Retry Gemini up to max_retries to get a valid value for a specific field."""
content = concept.get('content', concept.get('summary', ''))
if isinstance(content, str):
content = content[:400]
else:
content = str(content)[:400]
title = concept.get('title', '(untitled)')
allowlist_str = ', '.join(sorted(allowlist))
for attempt in range(max_retries):
try:
prompt = (
f"Your previous response for '{field_name}' was invalid. "
f"You must return ONLY one of these exact strings: {allowlist_str}\n\n"
f"Title: {title}\n"
f"Content: {content}\n\n"
f"Return ONLY the exact string, nothing else. No explanation, no punctuation, no quotes."
)
genai.configure(api_key=key)
model = genai.GenerativeModel(
config['gemini']['model'],
generation_config={"response_mime_type": "text/plain"}
)
resp = model.generate_content(prompt)
value = resp.text.strip().strip('"').strip("'").strip()
if value in allowlist:
return value
# Try case-insensitive match for knowledge_type/complexity
for valid in allowlist:
if value.lower() == valid.lower():
return valid
except Exception as e:
err = str(e).lower()
if any(s in err for s in ['429', 'quota', 'rate', '503']):
time.sleep(min(3 * (2 ** attempt) + random.uniform(0, 2), 30))
else:
logger.warning(f" Reclassify retry {attempt+1} for {field_name} failed: {e}")
return None
def validate_and_fix_concepts(concepts, key, config):
"""Validate domain, knowledge_type, complexity on each concept.
For invalid values: retry Gemini up to 3 times, then apply safe fallback.
"""
for concept in concepts:
if not isinstance(concept, dict):
continue
# ── Validate domain ─────────────────────────────────────────────
domain = concept.get('domain')
if isinstance(domain, list):
# Legacy array format — find first valid or reclassify
valid = [d for d in domain if d in VALID_DOMAINS]
if valid:
concept['domain'] = valid[0]
else:
new_val = _reclassify_field('domain', VALID_DOMAINS, concept, key, config)
if new_val:
concept['domain'] = new_val
else:
logger.warning(f"Invalid domain {domain} for '{concept.get('title', '?')}', using fallback")
concept['domain'] = DOMAIN_FALLBACK
elif isinstance(domain, str):
if domain not in VALID_DOMAINS:
new_val = _reclassify_field('domain', VALID_DOMAINS, concept, key, config)
if new_val:
concept['domain'] = new_val
else:
logger.warning(f"Invalid domain '{domain}' for '{concept.get('title', '?')}', using fallback")
concept['domain'] = DOMAIN_FALLBACK
else:
concept['domain'] = DOMAIN_FALLBACK
# ── Validate knowledge_type ─────────────────────────────────────
kt = concept.get('knowledge_type', '')
if isinstance(kt, str):
kt = kt.lower().strip()
else:
kt = ''
if kt not in VALID_KNOWLEDGE_TYPES:
new_val = _reclassify_field('knowledge_type', VALID_KNOWLEDGE_TYPES, concept, key, config)
if new_val:
concept['knowledge_type'] = new_val
else:
logger.warning(f"Invalid knowledge_type '{kt}' for '{concept.get('title', '?')}', using fallback")
concept['knowledge_type'] = KNOWLEDGE_TYPE_FALLBACK
else:
concept['knowledge_type'] = kt
# ── Validate complexity ─────────────────────────────────────────
cx = concept.get('complexity', '')
if isinstance(cx, str):
cx = cx.lower().strip()
else:
cx = ''
if cx not in VALID_COMPLEXITIES:
new_val = _reclassify_field('complexity', VALID_COMPLEXITIES, concept, key, config)
if new_val:
concept['complexity'] = new_val
else:
logger.warning(f"Invalid complexity '{cx}' for '{concept.get('title', '?')}', using fallback")
concept['complexity'] = COMPLEXITY_FALLBACK
else:
concept['complexity'] = cx
return concepts
def enrich_single(file_hash, db, config, key_rotator):
doc = db.get_document(file_hash)
if not doc:
return False
text_dir = os.path.join(config['paths']['text'], file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
window_size = config['processing']['enrich_window_size']
delay = config['processing']['rate_limit_delay']
proc = config.get('processing', {})
max_retries = proc.get('enrich_max_retries', proc.get('max_retries', 5))
base_delay = proc.get('enrich_base_delay', 5.0)
max_delay = proc.get('enrich_max_delay', 120.0)
if not os.path.exists(text_dir):
db.mark_failed(file_hash, f"Text directory not found: {text_dir}")
return False
db.update_status(file_hash, 'enriching')
try:
os.makedirs(concepts_dir, exist_ok=True)
page_files = sorted([f for f in os.listdir(text_dir) if f.startswith('page_') and f.endswith('.txt')])
if not page_files:
db.mark_failed(file_hash, "No page files found")
return False
pages_text = []
for pf in page_files:
with open(os.path.join(text_dir, pf), encoding='utf-8') as f:
pages_text.append(f.read())
windows = []
for i in range(0, len(pages_text), window_size):
window_pages = pages_text[i:i + window_size]
combined = "\n\n".join(f"--- Page {i + j + 1} ---\n{t}" for j, t in enumerate(window_pages))
windows.append((i, combined))
total_concepts = 0
failed_windows = []
for w_idx, (start_page, window_text) in enumerate(windows):
window_file = os.path.join(concepts_dir, f"window_{w_idx+1:04d}.json")
if os.path.exists(window_file):
with open(window_file, encoding='utf-8') as f:
existing = json.load(f)
total_concepts += len(existing)
logger.debug(f" Window {w_idx+1} already exists, skipping")
continue
if len(window_text.strip()) < 50:
with open(window_file, 'w') as f:
json.dump([], f)
continue
# Attempt enrichment with backoff — failures skip the window, not the doc
try:
key = key_rotator.next()
concepts = _retry_with_backoff(
lambda k=key: enrich_window(window_text, k, config),
max_retries=max_retries,
base_delay=base_delay,
max_delay=max_delay,
)
except Exception as e:
failed_windows.append((w_idx + 1, str(e)[:100]))
logger.warning(f" Window {w_idx+1}/{len(windows)} failed: {e}")
continue # skip this window, keep going
if not isinstance(concepts, list):
concepts = [concepts] if isinstance(concepts, dict) else []
concepts = [c for c in concepts if isinstance(c, dict)]
# Validate domain, knowledge_type, complexity — retry then fallback
validation_key = key_rotator.next()
concepts = validate_and_fix_concepts(concepts, validation_key, config)
for c_idx, concept in enumerate(concepts):
concept['_window'] = w_idx + 1
concept['_start_page'] = start_page + 1
concept['_doc_hash'] = file_hash
# JSON FIRST: save before anything else
with open(window_file, 'w', encoding='utf-8') as f:
json.dump(concepts, f, indent=2, ensure_ascii=False)
total_concepts += len(concepts)
logger.debug(f" Window {w_idx+1}/{len(windows)}: {len(concepts)} concepts")
time.sleep(delay)
# Decide document status based on results
meta = {
'hash': file_hash,
'total_windows': len(windows),
'total_concepts': total_concepts,
'failed_windows': len(failed_windows),
'window_size': window_size,
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
}
with open(os.path.join(concepts_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
if total_concepts > 0 or not failed_windows:
# Some concepts extracted, or all windows were empty — mark enriched
error_msg = None
if total_concepts == 0 and doc.get('page_count', 0) >= 3:
error_msg = (f"0 concepts from {doc.get('page_count', '?')} pages — "
f"likely image-only PDF, may need manual review")
logger.warning(f" {doc['filename']}: {error_msg}")
elif failed_windows:
wins = ', '.join(str(w) for w, _ in failed_windows[:10])
error_msg = (f"Partial: {len(failed_windows)}/{len(windows)} "
f"windows failed (windows {wins})")
logger.warning(f" {doc['filename']}: {error_msg}")
db.update_status(file_hash, 'enriched', concepts_extracted=total_concepts,
error_message=error_msg)
fw_note = f", {len(failed_windows)} windows failed" if failed_windows else ""
logger.info(f"Enriched {doc['filename']}: {total_concepts} concepts "
f"from {len(windows)} windows{fw_note}")
return True
else:
# Every window failed — document truly failed
first_err = failed_windows[0][1] if failed_windows else 'unknown'
db.mark_failed(file_hash,
f"All {len(windows)} windows failed: {first_err}")
logger.error(f" {doc['filename']}: all {len(windows)} windows failed")
return False
except Exception as e:
logger.error(f"Enrichment failed for {file_hash}: {e}\n{traceback.format_exc()}")
db.mark_failed(file_hash, str(e))
return False
def _recover_stale_enriching(db, max_hours=STALE_ENRICHING_HOURS):
"""Reset docs stuck in enriching back to extracted so they get retried.
This handles the case where a previous enrichment run crashed mid-document.
The enricher skips already-completed window files, so no work is lost.
"""
import sqlite3
conn = db._get_conn()
rows = conn.execute(
"SELECT hash, filename FROM documents WHERE status = 'enriching'",
).fetchall()
if not rows:
return
# Check extracted_at timestamp — if enriching started > max_hours ago, reset
now = __import__('datetime').datetime.now(__import__('datetime').timezone.utc)
reset = []
for row in rows:
doc = db.get_document(row['hash'])
extracted_at = doc.get('extracted_at', '')
if not extracted_at:
reset.append(row)
continue
try:
from datetime import datetime, timezone
ts = datetime.fromisoformat(extracted_at)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_hours = (now - ts).total_seconds() / 3600
if age_hours > max_hours:
reset.append(row)
except Exception:
reset.append(row)
for row in reset:
conn.execute(
"UPDATE documents SET status = 'extracted' WHERE hash = ?",
(row['hash'],)
)
logger.warning(f"Recovered stale enriching doc: {row['filename']} ({row['hash'][:12]}...)")
if reset:
conn.commit()
logger.info(f"Reset {len(reset)} stale enriching docs back to extracted")
def run_enrichment(workers=None, limit=None):
config = get_config()
db = StatusDB()
workers = workers or config['processing']['enrich_workers']
# Recover docs orphaned by previous crashed enrichment runs
_recover_stale_enriching(db)
keys = config.get('gemini_keys', [])
if not keys:
logger.error("No Gemini API keys configured in .env")
return 0
key_rotator = KeyRotator(keys)
extracted = db.get_by_status('extracted', limit=limit)
if not extracted:
logger.info("No extracted documents to enrich")
return 0
logger.info(f"Enriching {len(extracted)} documents with {workers} workers, {len(keys)} API key(s)")
success = 0
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {
pool.submit(enrich_single, doc['hash'], StatusDB(), config, key_rotator): doc
for doc in extracted
}
for future in as_completed(futures):
doc = futures[future]
try:
if future.result():
success += 1
except Exception as e:
logger.error(f"Worker error for {doc['hash']}: {e}")
logger.info(f"Enrichment complete: {success}/{len(extracted)} succeeded")
return success

601
lib/extractor.py Normal file
View file

@ -0,0 +1,601 @@
"""
RECON Text Extractor
PDF to text via PyPDF2 -> pdftotext -> Tesseract -> Gemini Vision fallback chain.
Saves to data/text/{hash}/page_NNNN.txt (4-digit zero-padded, 1-indexed).
Safety guards:
- Layer 1: Pre-flight size check (max_pdf_size_mb, default 200)
- Layer 2: Per-document timeout (extract_timeout, default 300s)
- Layer 3: Per-page timeout (page_timeout, default 30s)
- Partial extractions saved as 'extracted' with error_message noting incompleteness
Fallback chain per page:
1. PyPDF2 (fast, free, text-based PDFs)
2. pdftotext/poppler (handles some PDFs PyPDF2 misses)
3. Tesseract OCR (renders page local OCR)
4. Gemini Vision (renders page cloud vision API, last resort for scanned docs)
Dependencies: PyPDF2, pdftotext (poppler-utils), pytesseract, google-generativeai
Config: processing.extract_workers, processing.max_pdf_size_mb,
processing.extract_timeout, processing.page_timeout
"""
import base64
import json
import os
import random
import subprocess
import tempfile
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FuturesTimeoutError
from pathlib import Path
import google.generativeai as genai
from PyPDF2 import PdfReader
from .utils import get_config, content_hash, clean_filename_to_title, setup_logging
from .status import StatusDB
logger = setup_logging('recon.extractor')
# ── Gemini Vision singleton (lazy, thread-safe) ──
_vision_keys = None
_vision_key_index = 0
_vision_lock = threading.Lock()
def _get_vision_keys():
"""Load Gemini API keys once from .env (same keys the enricher uses)."""
global _vision_keys
if _vision_keys is not None:
return _vision_keys
with _vision_lock:
if _vision_keys is not None:
return _vision_keys
keys = []
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#') or '=' not in line:
continue
key_name, val = line.split('=', 1)
val = val.strip().strip('"').strip("'")
if key_name.strip().startswith('GEMINI_KEY_') and val != 'PASTE_KEY_HERE':
keys.append(val)
_vision_keys = keys
if keys:
logger.info(f"Gemini vision OCR: {len(keys)} API key(s) available")
else:
logger.warning("No Gemini API keys found — vision OCR fallback disabled")
return keys
def _next_vision_key():
"""Round-robin through available Gemini keys."""
global _vision_key_index
keys = _get_vision_keys()
if not keys:
return None
with _vision_lock:
key = keys[_vision_key_index % len(keys)]
_vision_key_index += 1
return key
def _is_transient(error_str):
"""Classify whether an error is transient (worth retrying)."""
s = error_str.lower()
transient_signals = ['429', 'resource_exhausted', 'quota', 'rate',
'500', '503', 'unavailable', 'timeout',
'connection', 'reset by peer', 'broken pipe']
return any(sig in s for sig in transient_signals)
def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30):
"""Render a single PDF page to PNG bytes using pdftoppm.
Args:
pdf_path: Path to PDF file
page_num_1indexed: 1-indexed page number
dpi: Resolution (200 = readable text, reasonable file size)
timeout: Subprocess timeout in seconds
Returns:
bytes or None: PNG image data, or None if render fails/blank
"""
with tempfile.TemporaryDirectory() as tmpdir:
prefix = os.path.join(tmpdir, 'page')
try:
subprocess.run(
['pdftoppm', '-f', str(page_num_1indexed), '-l', str(page_num_1indexed),
'-png', '-r', str(dpi), pdf_path, prefix],
capture_output=True, timeout=timeout, check=True
)
png_files = list(Path(tmpdir).glob('*.png'))
if not png_files:
return None
img_data = png_files[0].read_bytes()
# Skip blank pages (tiny image = solid white/blank page)
if len(img_data) < 5000:
return None
return img_data
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError):
return None
def _try_gemini_vision(pdf_path, page_num_1indexed, page_timeout=60):
"""Last-resort OCR: render page to image, send to Gemini vision.
Only called when PyPDF2, pdftotext, AND Tesseract all failed.
Args:
pdf_path: Path to PDF file
page_num_1indexed: 1-indexed page number
page_timeout: Max time for the render + API call
Returns:
str: Extracted text, or empty string if vision fails
"""
api_key = _next_vision_key()
if api_key is None:
return ''
# Render page to PNG
img_data = _render_page_to_png(pdf_path, page_num_1indexed, timeout=min(page_timeout, 30))
if img_data is None:
return ''
# Call Gemini vision with retry for transient errors
last_exc = None
for attempt in range(3):
try:
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-2.0-flash')
response = model.generate_content([
{
'mime_type': 'image/png',
'data': base64.b64encode(img_data).decode('utf-8')
},
"Extract ALL text from this scanned document page exactly as written. "
"Preserve headings, lists, numbered items, tables, and paragraph structure. "
"Return ONLY the extracted text, no commentary or markdown formatting."
])
if response and response.text:
text = response.text.strip()
if len(text) > 10:
return text
return ''
except Exception as e:
last_exc = e
if not _is_transient(str(e)):
break # permanent error — don't retry
if attempt < 2:
delay = 5.0 * (2 ** attempt) + random.uniform(0, 3)
time.sleep(delay)
# Rotate to next key on rate limit
api_key = _next_vision_key() or api_key
if last_exc:
logger.debug(f" Vision OCR failed page {page_num_1indexed}: {last_exc}")
return ''
def _get_page_count(pdf_path):
"""Get page count using pdfinfo (poppler) as fallback when PdfReader fails."""
try:
result = subprocess.run(
['pdfinfo', pdf_path],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
for line in result.stdout.splitlines():
if line.startswith('Pages:'):
return int(line.split(':', 1)[1].strip())
except Exception:
pass
return 0
def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
"""Extract text from a single page WITHOUT PyPDF2 reader.
Used when PdfReader() fails entirely (corrupt/encrypted PDFs).
Runs the pdftotext -> Tesseract -> Gemini Vision fallback chain.
Returns:
tuple: (text, ocr_method)
"""
text = ''
# Method 1: pdftotext (poppler)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0:
text = result.stdout
except Exception:
pass
if len(text.strip()) >= 50:
return text, 'pdftotext'
# Method 2: pdftoppm + Tesseract OCR
try:
from PIL import Image
import pytesseract
result = subprocess.run(
['pdftoppm', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1),
'-png', '-singlefile', pdf_path, '-'],
capture_output=True, timeout=page_timeout * 2
)
if result.returncode == 0 and result.stdout:
with tempfile.NamedTemporaryFile(suffix='.png', delete=True) as tmp:
tmp.write(result.stdout)
tmp.flush()
img = Image.open(tmp.name)
ocr_text = pytesseract.image_to_string(img)
if len(ocr_text.strip()) > len(text.strip()):
text = ocr_text
except Exception:
pass
if len(text.strip()) >= 50:
return text, 'tesseract'
# Method 3: Gemini Vision (last resort)
vision_text = _try_gemini_vision(pdf_path, page_num_0indexed + 1,
page_timeout=page_timeout * 2)
if len(vision_text.strip()) > len(text.strip()):
text = vision_text
if len(text.strip()) >= 10:
return text, 'gemini_vision'
return text, 'none'
# ── Core extraction functions ──
def _pypdf2_extract(reader, page_num):
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout."""
return reader.pages[page_num].extract_text() or ''
def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
"""Extract text from a single page with fallback chain.
Returns:
tuple: (text, ocr_method) where ocr_method is one of:
'pypdf2', 'pdftotext', 'tesseract', 'gemini_vision', 'none'
"""
# Method 1: PyPDF2 (wrapped in thread for timeout — extract_text() can hang)
text = ''
try:
ex = ThreadPoolExecutor(1)
future = ex.submit(_pypdf2_extract, reader, page_num)
try:
text = future.result(timeout=page_timeout)
except FuturesTimeoutError:
logger.warning(f" PyPDF2 timeout on page {page_num + 1}")
text = ''
finally:
ex.shutdown(wait=False, cancel_futures=True)
except Exception:
text = ''
if len(text.strip()) >= 50:
return text, 'pypdf2'
# Method 2: pdftotext via subprocess (inherently timeout-safe)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()):
text = result.stdout
except Exception:
pass
if len(text.strip()) >= 50:
return text, 'pdftotext'
# Method 3: pdftoppm + Tesseract OCR
try:
from PIL import Image
import pytesseract
result = subprocess.run(
['pdftoppm', '-f', str(page_num + 1), '-l', str(page_num + 1),
'-png', '-singlefile', pdf_path, '-'],
capture_output=True, timeout=page_timeout * 2
)
if result.returncode == 0 and result.stdout:
with tempfile.NamedTemporaryFile(suffix='.png', delete=True) as tmp:
tmp.write(result.stdout)
tmp.flush()
img = Image.open(tmp.name)
ocr_text = pytesseract.image_to_string(img)
if len(ocr_text.strip()) > len(text.strip()):
text = ocr_text
except Exception:
pass
if len(text.strip()) >= 50:
return text, 'tesseract'
# Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs)
vision_text = _try_gemini_vision(pdf_path, page_num + 1, page_timeout=page_timeout * 2)
if len(vision_text.strip()) > len(text.strip()):
text = vision_text
if len(text.strip()) >= 10:
return text, 'gemini_vision'
return text, 'none'
def extract_book_metadata(first_page_text, config):
keys = config.get('gemini_keys', [])
if not keys or len(first_page_text.strip()) < 20:
return None, None
try:
genai.configure(api_key=keys[0])
model = genai.GenerativeModel(
config['gemini']['model'],
generation_config={"response_mime_type": config['gemini']['response_mime_type']}
)
prompt = f"""Extract the book title and author from this first page text.
Return JSON: {{"title": "...", "author": "..."}}
If unknown, use null for that field.
Text:
{first_page_text[:3000]}"""
response = model.generate_content(prompt)
data = json.loads(response.text)
return data.get('title'), data.get('author')
except Exception as e:
logger.warning(f"Metadata extraction failed: {e}")
return None, None
def extract_single(file_hash, db, config):
doc = db.get_document(file_hash)
if not doc:
return False
pdf_path = doc['path']
filename = doc['filename']
text_dir = os.path.join(config['paths']['text'], file_hash)
if not os.path.exists(pdf_path):
db.mark_failed(file_hash, f"File not found: {pdf_path}")
return False
# Layer 1: Pre-flight size check
proc = config.get('processing', {})
max_size_mb = proc.get('max_pdf_size_mb', 200)
try:
file_size_mb = os.path.getsize(pdf_path) / 1048576
except OSError as e:
db.mark_failed(file_hash, f"Cannot stat file: {e}")
return False
if file_size_mb > max_size_mb:
msg = f"Skipped: {file_size_mb:.0f}MB exceeds {max_size_mb}MB limit"
logger.warning(f"SIZE SKIP: {filename}{msg}")
db.mark_failed(file_hash, msg)
return False
db.update_status(file_hash, 'extracting')
# Layer 2/3 setup
max_doc_seconds = proc.get('extract_timeout', 300)
page_timeout = proc.get('page_timeout', 30)
start_time = time.time()
page_count = 0
pages_extracted = 0
skipped_pages = 0
ocr_pages = []
ocr_methods = {'pypdf2': 0, 'pdftotext': 0, 'tesseract': 0, 'gemini_vision': 0, 'none': 0}
try:
os.makedirs(text_dir, exist_ok=True)
# Try PyPDF2 first; fall back to poppler-only extraction if it fails
reader = None
use_reader = True
try:
reader = PdfReader(pdf_path)
page_count = len(reader.pages)
except Exception as pdf_err:
logger.warning(f"PdfReader failed for {filename}: {pdf_err} — using poppler fallback")
use_reader = False
page_count = _get_page_count(pdf_path)
if page_count == 0:
db.mark_failed(file_hash, f"PdfReader failed and pdfinfo returned 0 pages: {str(pdf_err)[:200]}")
return False
for i in range(page_count):
# Layer 2: Check total document time budget
elapsed = time.time() - start_time
if elapsed > max_doc_seconds:
msg = f"Timed out after {elapsed:.0f}s at page {i}/{page_count}"
logger.warning(f"TIMEOUT: {filename}{msg}")
if pages_extracted > 0:
_save_partial(file_hash, db, doc, config, text_dir,
page_count, pages_extracted, ocr_pages,
f"Partial: {pages_extracted}/{page_count} pages "
f"(timed out after {elapsed:.0f}s)",
ocr_methods=ocr_methods)
return True
else:
db.mark_failed(file_hash, msg)
return False
# Layer 3: Per-page extraction with fallback chain
try:
if use_reader:
text, method = extract_text_from_page(reader, i, pdf_path, page_timeout)
else:
text, method = _extract_page_without_reader(pdf_path, i, page_timeout)
ocr_methods[method] += 1
if method in ('tesseract', 'gemini_vision'):
ocr_pages.append(i + 1)
except Exception as e:
logger.warning(f" Page {i+1}/{page_count} failed: {e} — skipping")
text = ''
skipped_pages += 1
ocr_methods['none'] += 1
page_file = os.path.join(text_dir, f"page_{i+1:04d}.txt")
with open(page_file, 'w', encoding='utf-8') as f:
f.write(text)
if text.strip():
pages_extracted += 1
# Progress logging every 50 pages (more frequent since vision is slower)
if (i + 1) % 50 == 0:
el = time.time() - start_time
rate = (i + 1) / el if el > 0 else 0
vision_n = ocr_methods['gemini_vision']
vision_note = f", {vision_n} vision" if vision_n else ""
logger.info(f" {filename}: page {i+1}/{page_count} "
f"({rate:.1f} pages/sec, {skipped_pages} skipped{vision_note})")
# Full extraction complete — save metadata
first_page_text = ''
first_page_file = os.path.join(text_dir, 'page_0001.txt')
if os.path.exists(first_page_file):
with open(first_page_file, encoding='utf-8') as f:
first_page_text = f.read()
book_title, book_author = extract_book_metadata(first_page_text, config)
if not book_title:
book_title = clean_filename_to_title(filename)
meta = {
'hash': file_hash,
'filename': filename,
'page_count': page_count,
'ocr_pages': ocr_pages,
'skipped_pages': skipped_pages,
'ocr_methods': ocr_methods,
}
with open(os.path.join(text_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
kwargs = {
'page_count': page_count,
'pages_extracted': pages_extracted,
'book_title': book_title,
}
if book_author:
kwargs['book_author'] = book_author
if skipped_pages > 0:
kwargs['error_message'] = (f"Partial: {pages_extracted}/{page_count} pages "
f"({skipped_pages} pages timed out)")
elapsed = time.time() - start_time
db.update_status(file_hash, 'extracted', **kwargs)
ocr_note = f", {len(ocr_pages)} OCR" if ocr_pages else ""
skip_note = f", {skipped_pages} skipped" if skipped_pages > 0 else ""
vision_note = f", {ocr_methods['gemini_vision']} vision" if ocr_methods['gemini_vision'] else ""
logger.info(f"Extracted {filename}: {pages_extracted}/{page_count} pages "
f"({elapsed:.1f}s{ocr_note}{vision_note}{skip_note})")
return True
except Exception as e:
logger.error(f"Extraction failed for {file_hash}: {e}\n{traceback.format_exc()}")
if pages_extracted > 0:
_save_partial(file_hash, db, doc, config, text_dir,
page_count, pages_extracted, ocr_pages,
f"Partial: {pages_extracted}/{page_count} pages "
f"({str(e)[:150]})",
ocr_methods=ocr_methods)
return True
db.mark_failed(file_hash, str(e)[:500])
return False
def _save_partial(file_hash, db, doc, config, text_dir, page_count,
pages_extracted, ocr_pages, error_msg, ocr_methods=None):
"""Save metadata and mark a partial extraction as 'extracted'."""
book_title = clean_filename_to_title(doc['filename'])
first_page_file = os.path.join(text_dir, 'page_0001.txt')
if os.path.exists(first_page_file):
with open(first_page_file, encoding='utf-8') as f:
first_text = f.read()
if len(first_text.strip()) > 20:
title, _ = extract_book_metadata(first_text, config)
if title:
book_title = title
meta = {
'hash': file_hash,
'filename': doc['filename'],
'page_count': page_count,
'ocr_pages': ocr_pages,
'partial': True,
}
if ocr_methods:
meta['ocr_methods'] = ocr_methods
with open(os.path.join(text_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
db.update_status(file_hash, 'extracted',
page_count=page_count,
pages_extracted=pages_extracted,
book_title=book_title,
error_message=error_msg)
logger.info(f" Saved partial extraction: {pages_extracted}/{page_count} pages")
def run_extraction(workers=None):
config = get_config()
db = StatusDB()
workers = workers or config['processing']['extract_workers']
queued = db.get_by_status('queued')
if not queued:
logger.info("No queued documents to extract")
return 0
logger.info(f"Extracting {len(queued)} documents with {workers} workers")
success = 0
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {pool.submit(extract_single, doc['hash'], StatusDB(), config): doc for doc in queued}
for future in as_completed(futures):
doc = futures[future]
try:
if future.result():
success += 1
except Exception as e:
logger.error(f"Worker error for {doc['hash']}: {e}")
logger.info(f"Extraction complete: {success}/{len(queued)} succeeded")
return success

159
lib/ingester.py Normal file
View file

@ -0,0 +1,159 @@
"""
RECON Intel Ingester
ARGUS intelligence feed intake. Embeds intel JSON and inserts into Qdrant
with source_type='intel_feed'.
Dependencies: requests, qdrant-client
Config: embedding, vector_db
"""
import json
import os
import time
import traceback
import requests as http_requests
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from .utils import get_config, setup_logging
from .status import StatusDB
logger = setup_logging('recon.ingester')
def ingest_intel(intel_data, config=None):
if config is None:
config = get_config()
db = StatusDB()
required = ['source', 'category', 'content']
for field in required:
if field not in intel_data:
logger.error(f"Missing required field: {field}")
return None
try:
conn = db._get_conn()
cursor = conn.execute(
"""INSERT INTO intel (source, timestamp, region, category, content,
summary, key_facts, credibility_score, verification_status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
intel_data.get('source', 'unknown'),
intel_data.get('timestamp', time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())),
intel_data.get('region', 'unknown'),
intel_data['category'],
intel_data['content'],
intel_data.get('summary', ''),
json.dumps(intel_data.get('key_facts', [])),
intel_data.get('credibility_score', 0.5),
intel_data.get('verification_status', 'unverified'),
)
)
intel_id = cursor.lastrowid
conn.commit()
url = f"http://{config['embedding']['host']}:{config['embedding']['port']}/api/embed"
resp = http_requests.post(url, json={
"model": config['embedding']['model'],
"input": intel_data['content']
}, timeout=120)
resp.raise_for_status()
vector = resp.json()['embeddings'][0]
qdrant = QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
point_id = intel_id + 2**60
payload = {
'source_type': 'intel_feed',
'intel_id': intel_id,
'source': intel_data.get('source', 'unknown'),
'region': intel_data.get('region', 'unknown'),
'category': intel_data['category'],
'content': intel_data['content'],
'summary': intel_data.get('summary', ''),
'key_facts': intel_data.get('key_facts', []),
'credibility_score': intel_data.get('credibility_score', 0.5),
'verification_status': intel_data.get('verification_status', 'unverified'),
'timestamp': intel_data.get('timestamp', ''),
'language': 'en',
}
qdrant.upsert(
collection_name=config['vector_db']['collection'],
points=[PointStruct(id=point_id, vector=vector, payload=payload)]
)
conn.execute("UPDATE intel SET vector_id = ? WHERE id = ?", (point_id, intel_id))
conn.commit()
logger.info(f"Ingested intel #{intel_id} from {intel_data.get('source', 'unknown')}")
return intel_id
except Exception as e:
logger.error(f"Intel ingestion failed: {e}\n{traceback.format_exc()}")
return None
def ingest_file(filepath, config=None):
if config is None:
config = get_config()
try:
with open(filepath, encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
results = []
for item in data:
result = ingest_intel(item, config)
results.append(result)
success = sum(1 for r in results if r is not None)
logger.info(f"Ingested {success}/{len(data)} items from {filepath}")
return results
else:
return [ingest_intel(data, config)]
except Exception as e:
logger.error(f"Failed to ingest file {filepath}: {e}")
return []
def run_ingestion(directory=None):
config = get_config()
intel_dir = directory or config['paths']['intel']
if not os.path.exists(intel_dir):
logger.info(f"Intel directory does not exist: {intel_dir}")
return 0
json_files = sorted([
f for f in os.listdir(intel_dir)
if f.endswith('.json') and not f.startswith('.')
])
if not json_files:
logger.info("No intel files to ingest")
return 0
total = 0
for jf in json_files:
filepath = os.path.join(intel_dir, jf)
results = ingest_file(filepath, config)
ingested = sum(1 for r in results if r is not None)
total += ingested
if ingested > 0:
done_dir = os.path.join(intel_dir, 'processed')
os.makedirs(done_dir, exist_ok=True)
os.rename(filepath, os.path.join(done_dir, jf))
logger.info(f"Intel ingestion complete: {total} items ingested")
return total

270
lib/key_manager.py Normal file
View file

@ -0,0 +1,270 @@
"""
RECON Key Manager - Thread-safe API key management with hot-reload.
Provides a singleton KeyManager that workers (enricher, extractor) read from
instead of loading .env directly. Dashboard can update keys at runtime without
restarting the service.
Dependencies: None beyond stdlib + requests (already in requirements.txt)
Config: Reads/writes /opt/recon/.env
"""
import os
import re
import time
import logging
import threading
import requests
logger = logging.getLogger('recon.key_manager')
class KeyManager:
"""Thread-safe API key store with hot-reload and validation."""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._keys_lock = threading.RLock()
self._gemini_keys = []
self._env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
self._last_loaded = None
self._key_stats = {} # key_index -> {calls, errors, last_used}
self._load_from_env()
self._initialized = True
logger.info(f"KeyManager initialized with {len(self._gemini_keys)} Gemini key(s)")
# ── Read Operations ──
def get_gemini_keys(self):
"""Return a copy of current Gemini keys. Thread-safe."""
with self._keys_lock:
return list(self._gemini_keys)
def get_gemini_key(self, index=0):
"""Get a single Gemini key by index. Returns None if out of range."""
with self._keys_lock:
if 0 <= index < len(self._gemini_keys):
return self._gemini_keys[index]
return None
def get_gemini_key_count(self):
"""Return number of loaded Gemini keys."""
with self._keys_lock:
return len(self._gemini_keys)
def get_masked_keys(self):
"""Return keys masked for display: first 8 + ... + last 4 chars."""
with self._keys_lock:
result = []
for i, key in enumerate(self._gemini_keys):
if len(key) > 16:
masked = key[:8] + '...' + key[-4:]
elif len(key) > 8:
masked = key[:4] + '...' + key[-2:]
else:
masked = '****'
stats = self._key_stats.get(i, {})
result.append({
'index': i,
'masked': masked,
'length': len(key),
'calls': stats.get('calls', 0),
'errors': stats.get('errors', 0),
'last_used': stats.get('last_used', None),
'valid': stats.get('valid', None),
'last_validated': stats.get('last_validated', None),
})
return result
# ── Write Operations (all persist to .env) ──
def set_gemini_keys(self, keys):
"""Replace all Gemini keys. Persists to .env. Returns success bool."""
# Filter empty strings
keys = [k.strip() for k in keys if k.strip()]
with self._keys_lock:
self._gemini_keys = keys
self._key_stats = {} # Reset stats on full replace
self._persist_to_env()
logger.info(f"Gemini keys replaced: {len(keys)} key(s) loaded")
return True
def add_gemini_key(self, key):
"""Add a single Gemini key. Persists to .env. Returns new index."""
key = key.strip()
if not key:
raise ValueError("Key cannot be empty")
with self._keys_lock:
# Check for duplicates
if key in self._gemini_keys:
raise ValueError("Key already exists")
self._gemini_keys.append(key)
idx = len(self._gemini_keys) - 1
self._persist_to_env()
logger.info(f"Gemini key added at index {idx}")
return idx
def remove_gemini_key(self, index):
"""Remove a Gemini key by index. Persists to .env. Returns removed key (masked)."""
with self._keys_lock:
if index < 0 or index >= len(self._gemini_keys):
raise IndexError(f"Key index {index} out of range (have {len(self._gemini_keys)} keys)")
if len(self._gemini_keys) <= 1:
raise ValueError("Cannot remove last key — pipeline needs at least 1 Gemini key")
key = self._gemini_keys.pop(index)
# Rebuild stats with shifted indices
new_stats = {}
for i, stats in self._key_stats.items():
if i < index:
new_stats[i] = stats
elif i > index:
new_stats[i - 1] = stats
self._key_stats = new_stats
self._persist_to_env()
masked = key[:8] + '...' + key[-4:] if len(key) > 16 else '****'
logger.info(f"Gemini key removed at index {index}: {masked}")
return masked
def replace_gemini_key(self, index, new_key):
"""Replace a single Gemini key at index. Persists to .env."""
new_key = new_key.strip()
if not new_key:
raise ValueError("Key cannot be empty")
with self._keys_lock:
if index < 0 or index >= len(self._gemini_keys):
raise IndexError(f"Key index {index} out of range")
# Check duplicate (but allow replacing with same key)
if new_key in self._gemini_keys and self._gemini_keys[index] != new_key:
raise ValueError("Key already exists at another index")
self._gemini_keys[index] = new_key
if index in self._key_stats:
self._key_stats[index] = {} # Reset stats for replaced key
self._persist_to_env()
logger.info(f"Gemini key replaced at index {index}")
# ── Validation ──
def validate_key(self, key):
"""
Test a Gemini API key by listing models.
Returns (valid: bool, message: str).
"""
try:
resp = requests.get(
f"https://generativelanguage.googleapis.com/v1beta/models?key={key}",
timeout=10
)
if resp.status_code == 200 and 'models' in resp.text:
return True, "Valid — API responded"
elif resp.status_code == 400:
return False, f"Invalid key (HTTP {resp.status_code})"
elif resp.status_code == 403:
return False, "Key disabled or quota exhausted"
elif resp.status_code == 429:
return True, "Valid — but currently rate-limited"
else:
return False, f"Unexpected response (HTTP {resp.status_code})"
except requests.Timeout:
return False, "Timeout — could not reach Gemini API"
except requests.ConnectionError:
return False, "Connection error — check network"
except Exception as e:
return False, f"Error: {str(e)}"
def validate_all(self):
"""Validate all loaded Gemini keys. Returns list of results."""
results = []
with self._keys_lock:
keys_copy = list(enumerate(self._gemini_keys))
for i, key in keys_copy:
valid, message = self.validate_key(key)
with self._keys_lock:
if i not in self._key_stats:
self._key_stats[i] = {}
self._key_stats[i]['valid'] = valid
self._key_stats[i]['last_validated'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
results.append({'index': i, 'valid': valid, 'message': message})
time.sleep(0.2) # Don't hammer the API
return results
# ── Stats tracking (called by enricher/extractor) ──
def record_usage(self, key_index, success=True):
"""Record a key usage event. Called by workers after each Gemini call."""
with self._keys_lock:
if key_index not in self._key_stats:
self._key_stats[key_index] = {'calls': 0, 'errors': 0}
self._key_stats[key_index]['calls'] = self._key_stats[key_index].get('calls', 0) + 1
if not success:
self._key_stats[key_index]['errors'] = self._key_stats[key_index].get('errors', 0) + 1
self._key_stats[key_index]['last_used'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
# ── Internal ──
def _load_from_env(self):
"""Load Gemini keys from .env file."""
keys = []
if os.path.exists(self._env_path):
with open(self._env_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
match = re.match(r'^GEMINI_KEY(?:_\d+)?=(.+)$', line)
if match:
val = match.group(1).strip().strip('"').strip("'")
if val:
keys.append(val)
self._gemini_keys = keys
self._last_loaded = time.time()
def _persist_to_env(self):
"""Write current keys back to .env file, preserving non-Gemini lines."""
other_lines = []
if os.path.exists(self._env_path):
with open(self._env_path, 'r') as f:
for line in f:
stripped = line.strip()
if stripped and not re.match(r'^GEMINI_KEY', stripped):
other_lines.append(line.rstrip('\n'))
with open(self._env_path, 'w') as f:
# Write non-Gemini lines first
for line in other_lines:
f.write(line + '\n')
# Write Gemini keys
for i, key in enumerate(self._gemini_keys, 1):
f.write(f'GEMINI_KEY_{i}={key}\n')
self._last_loaded = time.time()
logger.info(f"Persisted {len(self._gemini_keys)} Gemini key(s) to {self._env_path}")
def reload_from_env(self):
"""Force reload from .env (e.g., if edited externally)."""
with self._keys_lock:
self._load_from_env()
logger.info(f"Reloaded {len(self._gemini_keys)} Gemini key(s) from .env")
return len(self._gemini_keys)
# Module-level convenience — import and use anywhere
_manager = None
def get_key_manager():
"""Get the singleton KeyManager instance."""
global _manager
if _manager is None:
_manager = KeyManager()
return _manager

1637
lib/new_pipeline.py Normal file

File diff suppressed because it is too large Load diff

374
lib/organizer.py Normal file
View file

@ -0,0 +1,374 @@
"""
RECON Library Organizer
After a document completes the pipeline (extract -> enrich -> embed),
this module classifies it by dominant domain and moves it into the
correct Domain/Subdomain/ folder with a sanitized filename.
Two modes:
1. Per-document: determine_dominant_domain() from on-disk concept JSONs
2. Bulk manifest: organize_from_manifest() using pre-built manifest JSON
Path updates trigger the existing catalogue.path_updated_at mechanism,
which sync_qdrant_paths() propagates to Qdrant payloads.
"""
import json
import logging
import os
import shutil
from collections import Counter
from .utils import sanitize_filename
logger = logging.getLogger('recon.organizer')
# ── Domain folder mapping (canonical) ───────────────────────────────────
# Keys = exact domain strings from Gemini enrichment
# Values = filesystem-safe folder names
DOMAIN_FOLDERS = {
'Agriculture & Livestock': 'Agriculture-and-Livestock',
'Civil Organization': 'Civil-Organization',
'Communications': 'Communications',
'Food Systems': 'Food-Systems',
'Foundational Skills': 'Foundational-Skills',
'Logistics': 'Logistics',
'Medical': 'Medical',
'Navigation': 'Navigation',
'Operations': 'Operations',
'Power Systems': 'Power-Systems',
'Preservation & Storage': 'Preservation-and-Storage',
'Security': 'Security',
'Shelter & Construction': 'Shelter-and-Construction',
'Technology': 'Technology',
'Tools & Equipment': 'Tools-and-Equipment',
'Vehicles': 'Vehicles',
'Water Systems': 'Water-Systems',
'Wilderness Skills': 'Wilderness-Skills',
}
def normalize_folder_name(name):
"""Normalize a domain/subdomain name to a folder-safe string.
Examples:
'Edible Plants & Foraging' -> 'Edible-Plants-and-Foraging'
'emergency medicine' -> 'Emergency-Medicine'
"""
if not name:
return 'Uncategorized'
name = name.strip()
name = name.replace('&', 'and')
words = name.split()
titled = []
for w in words:
if w.lower() in ('and', 'of', 'the', 'to', 'for', 'in', 'on', 'at'):
titled.append(w.lower())
else:
titled.append(w.capitalize())
return '-'.join(titled)
def determine_dominant_domain(doc_hash, data_dir):
"""Determine a document's dominant domain from on-disk concept JSONs.
Reads all /data/concepts/{hash}/window_*.json files, counts domain
occurrences across all concepts, returns the top domain.
Args:
doc_hash: Document hash
data_dir: Path to /opt/recon/data
Returns:
(domain, subdomain, confidence) tuple.
domain/subdomain are strings or None.
confidence is float 0-1 (top domain count / total concepts).
"""
concepts_dir = os.path.join(data_dir, 'concepts', doc_hash)
if not os.path.isdir(concepts_dir):
return (None, None, 0.0)
domain_counter = Counter()
subdomain_counter = Counter()
total_concepts = 0
for fname in os.listdir(concepts_dir):
if not fname.startswith('window_') or not fname.endswith('.json'):
continue
fpath = os.path.join(concepts_dir, fname)
try:
with open(fpath, 'r') as f:
concepts = json.load(f)
except (json.JSONDecodeError, OSError):
continue
if not isinstance(concepts, list):
continue
for concept in concepts:
total_concepts += 1
# domain is usually a list with one element
dom = concept.get('domain')
if isinstance(dom, list):
for d in dom:
if isinstance(d, str):
domain_counter[d] += 1
elif isinstance(dom, str):
domain_counter[dom] += 1
sub = concept.get('subdomain')
if isinstance(sub, list):
for s in sub:
if isinstance(s, str):
subdomain_counter[s] += 1
elif isinstance(sub, str):
subdomain_counter[sub] += 1
if total_concepts == 0 or not domain_counter:
return (None, None, 0.0)
top_domains = domain_counter.most_common(2)
dom_name = top_domains[0][0]
dom_count = top_domains[0][1]
confidence = dom_count / total_concepts
# Check ambiguity
is_ambiguous = False
if len(top_domains) >= 2:
dom2_count = top_domains[1][1]
if dom2_count >= dom_count * 0.8:
is_ambiguous = True
if confidence < 0.4:
is_ambiguous = True
if is_ambiguous:
return (None, None, confidence)
top_sub = subdomain_counter.most_common(1)
sub_name = top_sub[0][0] if top_sub else None
return (dom_name, sub_name, confidence)
def _build_target_path(library_root, domain, subdomain, filename, doc_hash):
"""Build the target path for a document, handling domain mapping and collisions.
Returns:
(target_path, sanitized_filename) tuple
"""
san_name = sanitize_filename(filename, doc_hash=doc_hash)
if domain is None:
# Unclassified — leave in place (don't move to Review folder for pipeline)
return (None, san_name)
domain_folder = DOMAIN_FOLDERS.get(domain)
if not domain_folder:
domain_folder = normalize_folder_name(domain)
if subdomain:
sub_folder = normalize_folder_name(subdomain)
else:
sub_folder = 'General'
target_dir = os.path.join(library_root, domain_folder, sub_folder)
target_path = os.path.join(target_dir, san_name)
# Handle collision at target
if os.path.exists(target_path):
stem, ext = os.path.splitext(san_name)
h6 = doc_hash[:6]
new_name = '{} [{}]{}'.format(stem, h6, ext)
if len(new_name) > 120:
max_stem = 120 - len(ext) - 9
stem = stem[:max_stem].rstrip('. -,')
new_name = '{} [{}]{}'.format(stem, h6, ext)
san_name = new_name
target_path = os.path.join(target_dir, san_name)
return (target_path, san_name)
def organize_document(doc_hash, db, config, dry_run=False):
"""Organize a single document: classify, rename, and move.
Args:
doc_hash: Document hash
db: StatusDB instance
config: RECON config dict
dry_run: If True, don't actually move files
Returns:
dict with keys: hash, action, before_path, after_path, domain, subdomain, error
"""
library_root = config['library_root']
data_dir = config['paths']['data']
result = {
'hash': doc_hash,
'action': 'skip',
'before_path': None,
'after_path': None,
'domain': None,
'subdomain': None,
'error': None,
}
# Look up current path from catalogue
conn = db._get_conn()
row = conn.execute(
"SELECT path, filename FROM catalogue WHERE hash = ?", (doc_hash,)
).fetchone()
if not row:
result['error'] = 'Not in catalogue'
return result
current_path = row['path']
current_filename = row['filename']
result['before_path'] = current_path
# Verify file exists on disk
if not dry_run and not os.path.exists(current_path):
result['error'] = 'File not found on disk'
return result
# Determine domain from concept JSONs
domain, subdomain, confidence = determine_dominant_domain(doc_hash, data_dir)
result['domain'] = domain
result['subdomain'] = subdomain
if domain is None:
result['action'] = 'skip_unclassified'
return result
# Build target path
target_path, san_name = _build_target_path(
library_root, domain, subdomain, current_filename, doc_hash
)
if target_path is None:
result['action'] = 'skip_unclassified'
return result
result['after_path'] = target_path
# Already at target?
if os.path.abspath(current_path) == os.path.abspath(target_path):
result['action'] = 'already_organized'
# Still mark as organized
if not dry_run:
db.mark_organized(doc_hash)
return result
if dry_run:
result['action'] = 'would_move'
return result
# Move the file
try:
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
shutil.move(current_path, target_path)
# Update catalogue (triggers path_updated_at for Qdrant sync)
db.update_catalogue_path(doc_hash, target_path, san_name)
db.mark_organized(doc_hash)
result['action'] = 'moved'
logger.info("Organized %s -> %s [%s/%s]",
doc_hash[:8], target_path, domain, subdomain)
except Exception as e:
result['action'] = 'error'
result['error'] = str(e)
logger.error("Failed to organize %s: %s", doc_hash[:8], e)
return result
def organize_from_manifest(manifest_path, db, config, dry_run=False):
"""Bulk migration using a pre-built manifest JSON.
The manifest is produced by recon_manifest_builder.py and contains
entries with current_path, sanitized_path, sanitized_filename, hash, etc.
Args:
manifest_path: Path to manifest JSON file
db: StatusDB instance
config: RECON config dict
dry_run: If True, don't actually move files
Returns:
dict with summary stats: moved, skipped, errors, already_organized, total
"""
with open(manifest_path, 'r') as f:
entries = json.load(f)
stats = {
'total': len(entries),
'moved': 0,
'skipped': 0,
'already_organized': 0,
'errors': 0,
'not_found': 0,
}
for i, entry in enumerate(entries):
doc_hash = entry['hash']
current_path = entry['current_path']
target_path = entry.get('sanitized_path', entry.get('proposed_path'))
san_name = entry.get('sanitized_filename', entry.get('filename'))
if not target_path or not san_name:
stats['skipped'] += 1
continue
# Skip ambiguous entries
if entry.get('ambiguous'):
stats['skipped'] += 1
continue
# Already at target?
if os.path.abspath(current_path) == os.path.abspath(target_path):
stats['already_organized'] += 1
if not dry_run:
db.mark_organized(doc_hash)
continue
if dry_run:
stats['moved'] += 1
continue
# Verify source exists
if not os.path.exists(current_path):
stats['not_found'] += 1
logger.warning("Manifest: file not found: %s [%s]", current_path, doc_hash[:8])
continue
try:
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
# Check for collision at target (different file already there)
if os.path.exists(target_path):
stem, ext = os.path.splitext(san_name)
h6 = doc_hash[:6]
san_name = '{} [{}]{}'.format(stem, h6, ext)
target_path = os.path.join(target_dir, san_name)
shutil.move(current_path, target_path)
# Update catalogue + mark organized
db.update_catalogue_path(doc_hash, target_path, san_name)
db.mark_organized(doc_hash)
stats['moved'] += 1
except Exception as e:
stats['errors'] += 1
logger.error("Manifest: failed to move %s: %s", doc_hash[:8], e)
# Progress reporting
if (i + 1) % 1000 == 0:
logger.info("Manifest progress: %d / %d (moved=%d, errors=%d)",
i + 1, stats['total'], stats['moved'], stats['errors'])
return stats

137
lib/peertube_collector.py Normal file
View file

@ -0,0 +1,137 @@
"""
RECON Metrics Collector
Background daemon thread that snapshots pipeline metrics every 5 minutes
to the metrics_snapshots SQLite table. Used for time-series charts.
"""
import json
import time
import threading
import logging
logger = logging.getLogger('recon.collector')
def start_collector(stop_event=None):
"""Start the metrics collector in a daemon thread."""
def _run():
from .status import StatusDB
from .utils import get_config
import requests as req
interval = 120 # 2 minutes
logger.info(f"Metrics collector started (interval: {interval}s)")
while True:
if stop_event and stop_event.is_set():
break
try:
_snapshot(StatusDB(), get_config(), req)
except Exception as e:
logger.error(f"Metrics snapshot failed: {e}")
# Wait with stop check
if stop_event:
stop_event.wait(interval)
if stop_event.is_set():
break
else:
time.sleep(interval)
logger.info("Metrics collector stopped")
t = threading.Thread(target=_run, daemon=True, name='metrics-collector')
t.start()
return t
def _snapshot(db, config, req):
"""Take a single metrics snapshot."""
from datetime import datetime, timezone, timedelta
conn = db._get_conn()
ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:00Z') # Round to minute
# Knowledge pipeline stats
try:
totals = conn.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'complete' THEN 1 ELSE 0 END) as complete,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN status NOT IN ('complete', 'failed') THEN 1 ELSE 0 END) as in_pipeline,
SUM(COALESCE(concepts_extracted, 0)) as concepts,
SUM(COALESCE(vectors_inserted, 0)) as vectors
FROM documents
""").fetchone()
knowledge_data = {
'total': totals['total'],
'complete': totals['complete'],
'failed': totals['failed'],
'in_pipeline': totals['in_pipeline'],
'concepts': totals['concepts'],
'vectors': totals['vectors'],
}
conn.execute(
"INSERT OR REPLACE INTO metrics_snapshots (timestamp, metric_type, data) VALUES (?, ?, ?)",
(ts, 'knowledge', json.dumps(knowledge_data))
)
conn.commit()
except Exception as e:
logger.debug(f"Knowledge snapshot failed: {e}")
# PeerTube pipeline stats (via SSH)
try:
import subprocess
result = subprocess.run(
['ssh', '-o', 'BatchMode=yes', '-o', 'ConnectTimeout=5',
'zvx@192.168.1.170',
'sudo -u peertube psql peertube_prod -t -A -c "SELECT state, COUNT(*) FROM video GROUP BY state;" 2>/dev/null; '
'echo "---"; '
'for d in staging completed transcoded failed; do '
' dir="/opt/bulk-import/$d"; '
' files=$(find -L "$dir" -type f 2>/dev/null | wc -l); '
' echo "$d|$files"; '
'done'],
capture_output=True, text=True, timeout=20
)
if result.returncode == 0 or result.stdout.strip():
sections = result.stdout.split('---')
video_states = {}
if len(sections) > 0:
for line in sections[0].strip().split('\n'):
if '|' in line:
parts = line.split('|')
if len(parts) == 2 and parts[1].isdigit():
video_states[parts[0]] = int(parts[1])
pipeline_files = {}
if len(sections) > 1:
for line in sections[1].strip().split('\n'):
if '|' in line:
parts = line.split('|')
if len(parts) == 2:
pipeline_files[parts[0]] = int(parts[1]) if parts[1].isdigit() else 0
pt_data = {
'video_states': video_states,
'pipeline_files': pipeline_files,
'published': video_states.get('1', 0),
'backlog': sum(pipeline_files.values()),
}
conn.execute(
"INSERT OR REPLACE INTO metrics_snapshots (timestamp, metric_type, data) VALUES (?, ?, ?)",
(ts, 'peertube', json.dumps(pt_data))
)
conn.commit()
except Exception as e:
logger.debug(f"PeerTube snapshot failed: {e}")
# Prune old snapshots (> 7 days)
try:
cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
conn.execute("DELETE FROM metrics_snapshots WHERE timestamp < ?", (cutoff,))
conn.commit()
except Exception:
pass

580
lib/peertube_scraper.py Normal file
View file

@ -0,0 +1,580 @@
"""
RECON PeerTube Scraper Video transcript ingestion.
Fetches WebVTT captions from a PeerTube instance, converts to plain text,
chunks into pages, and feeds into the standard RECON enrichment pipeline.
Output format matches lib/web_scraper.py so the enricher and embedder
process transcript content identically to web content.
"""
import hashlib
import io
import json
import os
import bisect
import re
import time
from datetime import datetime, timezone
from urllib.parse import quote
import requests
import webvtt
from .utils import get_config, setup_logging
from .status import StatusDB
from .web_scraper import chunk_text
logger = setup_logging('recon.peertube_scraper')
# Module-level stop flag — set by service thread for graceful shutdown
_stop_check = None
def set_stop_check(fn):
"""Register a callable that returns True when shutdown is requested."""
global _stop_check
_stop_check = fn
# Defaults (overridden by config.yaml peertube section)
DEFAULT_API_BASE = 'http://192.168.1.170'
DEFAULT_PUBLIC_URL = 'https://stream.echo6.co'
DEFAULT_FETCH_TIMEOUT = 30
DEFAULT_RATE_LIMIT_DELAY = 0.5
def _get_pt_config(config=None):
"""Get PeerTube settings from config, with defaults."""
if config is None:
config = get_config()
pt = config.get('peertube', {})
return {
'api_base': pt.get('api_base', DEFAULT_API_BASE),
'public_url': pt.get('public_url', DEFAULT_PUBLIC_URL),
'fetch_timeout': pt.get('fetch_timeout', DEFAULT_FETCH_TIMEOUT),
'rate_limit_delay': pt.get('rate_limit_delay', DEFAULT_RATE_LIMIT_DELAY),
}
def _api_get(path, config=None, params=None):
"""Make a GET request to the PeerTube API."""
ptc = _get_pt_config(config)
url = f"{ptc['api_base']}{path}"
resp = requests.get(url, params=params, timeout=ptc['fetch_timeout'])
resp.raise_for_status()
return resp.json()
def get_videos(channel=None, since=None, config=None):
"""
Paginate through all published videos on the PeerTube instance.
Args:
channel: Filter to this channel actor_name (e.g., 'mental-outlaw')
since: ISO date string only return videos published after this date
config: RECON config dict
Returns list of video dicts with: uuid, name, duration,
channel.name, channel.displayName, publishedAt, description.
"""
ptc = _get_pt_config(config)
videos = []
start = 0
count = 100 # PeerTube supports up to 100 per page
while True:
if channel:
path = f"/api/v1/video-channels/{channel}/videos"
else:
path = "/api/v1/videos"
data = _api_get(path, config, params={
'count': count,
'start': start,
'sort': '-publishedAt',
})
total = data.get('total', 0)
batch = data.get('data', [])
if not batch:
break
for v in batch:
published = v.get('publishedAt', '')
# Filter by since date
if since and published < since:
# Videos are sorted by publishedAt desc, so once we pass
# the since threshold, all remaining are older — stop
return videos
videos.append({
'uuid': v['uuid'],
'name': v['name'],
'duration': v.get('duration', 0),
'channel_name': v.get('channel', {}).get('name', ''),
'channel_display': v.get('channel', {}).get('displayName', ''),
'publishedAt': published,
'description': (v.get('description') or '')[:500],
})
start += count
if start >= total:
break
# Check for shutdown during pagination
if _stop_check and _stop_check():
logger.info(f"Shutdown requested during video listing — returning {len(videos)} collected so far")
return videos
# Rate limit pagination requests
time.sleep(ptc['rate_limit_delay'])
return videos
def get_captions(uuid, config=None):
"""Get caption list for a video. Returns list of caption dicts."""
data = _api_get(f"/api/v1/videos/{uuid}/captions", config)
return data.get('data', [])
def fetch_vtt(caption_path, config=None):
"""Fetch raw VTT file content from PeerTube."""
ptc = _get_pt_config(config)
url = f"{ptc['api_base']}{caption_path}"
resp = requests.get(url, timeout=ptc['fetch_timeout'])
resp.raise_for_status()
return resp.text
def _parse_vtt_time(time_str):
"""Parse VTT timestamp string (HH:MM:SS.mmm or MM:SS.mmm) to seconds."""
parts = time_str.split(':')
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
elif len(parts) == 2:
m, s = parts
return int(m) * 60 + float(s)
return 0.0
def vtt_to_text(vtt_content):
"""
Convert WebVTT content to clean plain text with timestamp tracking.
Strips timestamps, de-duplicates consecutive identical cues (common with
Whisper output), removes HTML tags, and joins cues with spaces (not
newlines Whisper cues break mid-sentence).
Returns (text, cue_timestamps) where:
- text: clean prose string
- cue_timestamps: list of (start_seconds, char_offset) tuples tracking
where each VTT cue begins in the output text
"""
buf = io.StringIO(vtt_content)
try:
captions = webvtt.read_buffer(buf)
except Exception:
# Fallback: manual regex parse if webvtt-py fails
return _vtt_to_text_fallback(vtt_content)
prev_text = None
segments = []
raw_timestamps = [] # (start_seconds, segment_index)
for caption in captions:
text = caption.text.strip()
if not text:
continue
# Strip HTML tags
text = re.sub(r'<[^>]+>', '', text)
# De-duplicate consecutive identical cues
if text == prev_text:
continue
prev_text = text
start_seconds = _parse_vtt_time(caption.start)
raw_timestamps.append((start_seconds, len(segments)))
segments.append(text)
# Join with spaces — VTT cues break mid-sentence
raw = ' '.join(segments)
# Clean up double spaces and whitespace
raw = re.sub(r'\s+', ' ', raw).strip()
# Compute char offsets for each tracked segment
seg_offsets = []
pos = 0
for i, seg in enumerate(segments):
seg_offsets.append(pos)
pos += len(seg) + 1 # +1 for space separator
cue_timestamps = []
for start_secs, seg_idx in raw_timestamps:
if seg_idx < len(seg_offsets):
cue_timestamps.append((start_secs, seg_offsets[seg_idx]))
return raw, cue_timestamps
def _vtt_to_text_fallback(vtt_content):
"""Regex-based VTT parser as fallback. Returns (text, cue_timestamps)."""
lines = vtt_content.split('\n')
prev_text = None
segments = []
raw_timestamps = []
last_time = 0.0
for line in lines:
line = line.strip()
if not line or line == 'WEBVTT':
continue
if '-->' in line:
# Parse start time from "00:01:23.456 --> 00:01:25.789"
time_part = line.split('-->')[0].strip()
last_time = _parse_vtt_time(time_part)
continue
if line.isdigit():
continue
text = re.sub(r'<[^>]+>', '', line)
if text == prev_text:
continue
prev_text = text
raw_timestamps.append((last_time, len(segments)))
segments.append(text)
raw = ' '.join(segments)
raw = re.sub(r'\s+', ' ', raw).strip()
# Compute char offsets
seg_offsets = []
pos = 0
for seg in segments:
seg_offsets.append(pos)
pos += len(seg) + 1
cue_timestamps = []
for start_secs, seg_idx in raw_timestamps:
if seg_idx < len(seg_offsets):
cue_timestamps.append((start_secs, seg_offsets[seg_idx]))
return raw, cue_timestamps
def _map_page_timestamps(pages, full_text, cue_timestamps):
"""
Map page numbers to video timestamps.
For each page, finds its approximate start position in the full text,
then looks up the nearest VTT cue timestamp via binary search.
Returns dict: {"page_0001": 0.0, "page_0002": 312.5, ...}
"""
if not cue_timestamps:
return {}
offsets = [ct[1] for ct in cue_timestamps]
times = [ct[0] for ct in cue_timestamps]
page_ts = {}
search_start = 0
for i, page_text in enumerate(pages):
page_name = f"page_{i+1:04d}"
# Find where this page starts in the full text
snippet = page_text[:200].strip()
pos = full_text.find(snippet, search_start)
if pos < 0:
pos = search_start # fallback
# Binary search for nearest cue at or before this position
idx = bisect.bisect_right(offsets, pos) - 1
if idx < 0:
idx = 0
page_ts[page_name] = round(times[idx], 1)
search_start = pos + len(snippet)
return page_ts
def _content_hash(text):
"""MD5 hash of text content — same as web_scraper."""
return hashlib.md5(text.encode('utf-8')).hexdigest()
def ingest_video(uuid, video_meta, config=None):
"""
Ingest a single PeerTube video transcript.
Fetches captions, converts VTT to text, chunks into pages,
saves to data/text/{hash}/, and sets status to 'extracted'.
Args:
uuid: Video UUID
video_meta: Dict with name, duration, channel_name, channel_display,
publishedAt, description
config: RECON config dict
Returns dict with hash, status, title, page_count or None if no captions.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
db = StatusDB()
# Get captions
captions = get_captions(uuid, config)
if not captions:
return None
# Prefer English caption
caption = None
for c in captions:
if c.get('language', {}).get('id') == 'en':
caption = c
break
if caption is None:
caption = captions[0]
# Fetch VTT
vtt_content = fetch_vtt(caption['captionPath'], config)
# Convert to plain text with timestamp tracking
text, cue_timestamps = vtt_to_text(vtt_content)
if not text or len(text) < 50:
logger.warning(f"Transcript too short for {video_meta['name']} ({uuid}): {len(text)} chars")
return None
# Hash the text content
doc_hash = _content_hash(text)
# Check for duplicate
conn = db._get_conn()
existing = conn.execute("SELECT * FROM catalogue WHERE hash = ?", (doc_hash,)).fetchone()
if existing:
doc = db.get_document(doc_hash)
existing_status = doc['status'] if doc else existing['status']
logger.debug(f"Duplicate transcript (hash {doc_hash[:12]}...) — {video_meta['name']}")
return {
'hash': doc_hash,
'status': 'duplicate',
'title': video_meta['name'],
'existing_status': existing_status,
}
# Chunk into pages
words_per_page = config.get('web_scraper', {}).get('words_per_page', 2000)
pages = chunk_text(text, words_per_page)
# Compute page-to-timestamp mapping
page_timestamps = _map_page_timestamps(pages, text, cue_timestamps)
# Save text files
text_dir = os.path.join(config['paths']['text'], doc_hash)
os.makedirs(text_dir, exist_ok=True)
for i, page_text in enumerate(pages, 1):
page_file = os.path.join(text_dir, f"page_{i:04d}.txt")
with open(page_file, 'w', encoding='utf-8') as f:
f.write(page_text)
# Save meta.json
video_url = f"{ptc['public_url']}/w/{uuid}"
meta = {
'hash': doc_hash,
'source_type': 'transcript',
'url': video_url,
'title': video_meta['name'],
'author': video_meta.get('channel_display', ''),
'channel': video_meta.get('channel_name', ''),
'duration': video_meta.get('duration', 0),
'date': video_meta.get('publishedAt', ''),
'description': video_meta.get('description', ''),
'sitename': 'stream.echo6.co',
'page_count': len(pages),
'text_length': len(text),
'page_timestamps': page_timestamps,
'fetched_at': datetime.now(timezone.utc).isoformat(),
}
with open(os.path.join(text_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
# Display filename for catalogue
display_name = re.sub(r'[^\w\s._-]', '', video_meta['name'])[:200].strip()
if not display_name:
display_name = uuid
# Add to catalogue
db.add_to_catalogue(
doc_hash, display_name, video_url,
len(text), 'stream.echo6.co', video_meta.get('channel_name', 'unknown')
)
# Queue + advance to extracted
db.queue_document(doc_hash)
db.update_status(doc_hash, 'extracted',
page_count=len(pages),
pages_extracted=len(pages),
book_title=video_meta['name'],
book_author=video_meta.get('channel_display', ''))
logger.info(
f"Ingested transcript: {video_meta['name']} ({uuid[:8]}...) "
f"-> {doc_hash[:12]}... ({len(pages)} pages, {len(text)} chars)"
)
return {
'hash': doc_hash,
'status': 'extracted',
'title': video_meta['name'],
'page_count': len(pages),
'text_length': len(text),
'page_timestamps': page_timestamps,
'channel': video_meta.get('channel_name', ''),
'duration': video_meta.get('duration', 0),
'url': video_url,
}
def ingest_channel(channel_name, config=None, since=None):
"""
Ingest all captioned videos from a specific channel.
Returns summary dict.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
logger.info(f"Ingesting channel: {channel_name}")
videos = get_videos(channel=channel_name, since=since, config=config)
return _ingest_video_list(videos, config, ptc)
def ingest_all(config=None, since=None):
"""
Ingest all captioned videos from the entire PeerTube instance.
Returns summary dict.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
logger.info("Ingesting all PeerTube videos with captions")
videos = get_videos(since=since, config=config)
return _ingest_video_list(videos, config, ptc)
def _ingest_video_list(videos, config, ptc):
"""Process a list of videos — shared logic for ingest_channel and ingest_all."""
results = []
skipped_no_captions = 0
skipped_duplicate = 0
failed = 0
ingested = 0
total_pages = 0
total = len(videos)
logger.info(f"Found {total} videos to check for captions")
for i, video in enumerate(videos, 1):
if _stop_check and _stop_check():
logger.info(f"Shutdown requested — stopping after {i-1}/{total} videos")
break
uuid = video['uuid']
try:
result = ingest_video(uuid, video, config)
if result is None:
skipped_no_captions += 1
elif result['status'] == 'duplicate':
skipped_duplicate += 1
else:
ingested += 1
total_pages += result.get('page_count', 0)
results.append(result)
except Exception as e:
logger.error(f"[{i}/{total}] Failed: {video['name']} ({uuid}) — {e}")
failed += 1
# Check for shutdown
if _stop_check and _stop_check():
logger.info(f"Shutdown requested — stopping after {i}/{total} videos")
break
# Rate limit
if i < total:
time.sleep(ptc['rate_limit_delay'])
# Progress logging every 50 videos
if i % 50 == 0:
logger.info(
f"Progress: {i}/{total} checked — "
f"{ingested} ingested, {skipped_no_captions} no captions, "
f"{skipped_duplicate} dupes, {failed} failed"
)
logger.info(
f"PeerTube ingestion complete: {ingested} ingested ({total_pages} pages), "
f"{skipped_no_captions} no captions, {skipped_duplicate} duplicates, "
f"{failed} failed out of {total} videos"
)
return {
'results': results,
'summary': {
'total_checked': total,
'ingested': ingested,
'skipped_no_captions': skipped_no_captions,
'skipped_duplicate': skipped_duplicate,
'failed': failed,
'total_pages': total_pages,
}
}
def get_instance_stats(config=None):
"""Get PeerTube instance statistics for the dashboard."""
if config is None:
config = get_config()
db = StatusDB()
# Total videos on instance
try:
data = _api_get("/api/v1/videos", config, params={'count': 1})
total_videos = data.get('total', 0)
except Exception:
total_videos = 0
# Videos ingested into RECON (from catalogue)
conn = db._get_conn()
ingested = conn.execute(
"SELECT count(*) FROM catalogue WHERE source = 'stream.echo6.co'"
).fetchone()[0]
# Status breakdown
status_rows = conn.execute(
"SELECT d.status, count(*) as cnt FROM documents d "
"JOIN catalogue c ON d.hash = c.hash "
"WHERE c.source = 'stream.echo6.co' "
"GROUP BY d.status"
).fetchall()
status_breakdown = {row['status']: row['cnt'] for row in status_rows}
return {
'total_videos': total_videos,
'ingested': ingested,
'status_breakdown': status_breakdown,
}

508
lib/status.py Normal file
View file

@ -0,0 +1,508 @@
"""
RECON Status Tracker
SQLite operations for catalogue and documents tables. WAL mode, thread-local connections.
Status flow: catalogued -> queued -> extracting -> extracted -> enriching -> enriched -> embedding -> complete.
Config: paths.db
"""
import os
import sqlite3
import threading
from datetime import datetime, timezone
from .utils import get_config
_local = threading.local()
class StatusDB:
def __init__(self, db_path=None):
if db_path is None:
db_path = get_config()['paths']['db']
self.db_path = db_path
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
def _get_conn(self):
if not hasattr(_local, 'conn') or _local.conn is None:
_local.conn = sqlite3.connect(self.db_path, timeout=30)
_local.conn.row_factory = sqlite3.Row
_local.conn.execute("PRAGMA journal_mode=WAL")
_local.conn.execute("PRAGMA busy_timeout=5000")
return _local.conn
def _init_db(self):
conn = self._get_conn()
conn.executescript("""
CREATE TABLE IF NOT EXISTS catalogue (
hash TEXT PRIMARY KEY,
filename TEXT NOT NULL,
path TEXT NOT NULL,
size_bytes INTEGER,
source TEXT,
category TEXT,
status TEXT DEFAULT 'catalogued',
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS documents (
hash TEXT PRIMARY KEY,
filename TEXT NOT NULL,
path TEXT,
size_bytes INTEGER,
page_count INTEGER,
book_title TEXT,
book_author TEXT,
collection TEXT DEFAULT 'survival',
status TEXT DEFAULT 'pending',
pages_extracted INTEGER DEFAULT 0,
concepts_extracted INTEGER DEFAULT 0,
vectors_inserted INTEGER DEFAULT 0,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
extracted_at TEXT,
enriched_at TEXT,
embedded_at TEXT,
error_message TEXT,
retry_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS intel (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT,
timestamp TEXT,
region TEXT,
category TEXT,
content TEXT,
summary TEXT,
key_facts TEXT,
credibility_score REAL,
verification_status TEXT,
vector_id INTEGER,
ingested_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS metrics_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
metric_type TEXT NOT NULL,
data TEXT NOT NULL,
UNIQUE(timestamp, metric_type)
);
CREATE INDEX IF NOT EXISTS idx_catalogue_status ON catalogue(status);
CREATE INDEX IF NOT EXISTS idx_catalogue_source ON catalogue(source);
CREATE INDEX IF NOT EXISTS idx_documents_status ON documents(status);
""")
# Migration: add path_updated_at column if missing
try:
conn.execute("ALTER TABLE catalogue ADD COLUMN path_updated_at TEXT")
except Exception:
pass # column already exists
# Migration: add organized_at column to documents if missing
try:
conn.execute("ALTER TABLE documents ADD COLUMN organized_at TEXT")
except Exception:
pass # column already exists
# Stream B: file_operations + duplicate_review tables
conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
doc_hash TEXT NOT NULL,
operation TEXT NOT NULL,
source_path TEXT NOT NULL,
target_path TEXT NOT NULL,
source_filename TEXT NOT NULL,
target_filename TEXT NOT NULL,
original_filename TEXT,
collision_step INTEGER,
qdrant_points_updated INTEGER DEFAULT 0,
performed_at TEXT DEFAULT CURRENT_TIMESTAMP,
reversed_at TEXT,
notes TEXT
);
CREATE INDEX IF NOT EXISTS idx_fileops_hash ON file_operations(doc_hash);
CREATE TABLE IF NOT EXISTS duplicate_review (
id INTEGER PRIMARY KEY AUTOINCREMENT,
doc_hash TEXT NOT NULL,
original_filename TEXT NOT NULL,
sanitized_filename TEXT NOT NULL,
collision_with_hash TEXT,
collision_path TEXT,
duplicate_path TEXT NOT NULL,
domain TEXT,
subdomain TEXT,
book_author TEXT,
book_title TEXT,
status TEXT DEFAULT 'pending',
resolution TEXT,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
resolved_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status);
""")
conn.commit()
def add_to_catalogue(self, file_hash, filename, path, size_bytes, source, category):
conn = self._get_conn()
conn.execute(
"""INSERT INTO catalogue (hash, filename, path, size_bytes, source, category)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(hash) DO UPDATE SET
path = excluded.path,
filename = excluded.filename,
source = excluded.source,
category = excluded.category,
path_updated_at = CASE
WHEN catalogue.path != excluded.path THEN CURRENT_TIMESTAMP
ELSE catalogue.path_updated_at
END""",
(file_hash, filename, path, size_bytes, source, category)
)
conn.commit()
def queue_document(self, file_hash):
conn = self._get_conn()
row = conn.execute("SELECT * FROM catalogue WHERE hash = ?", (file_hash,)).fetchone()
if not row:
return False
conn.execute("UPDATE catalogue SET status = 'queued' WHERE hash = ?", (file_hash,))
conn.execute(
"""INSERT INTO documents (hash, filename, path, size_bytes, status)
VALUES (?, ?, ?, ?, 'queued')
ON CONFLICT(hash) DO UPDATE SET
path = excluded.path,
filename = excluded.filename""",
(row['hash'], row['filename'], row['path'], row['size_bytes'])
)
conn.commit()
return True
def update_status(self, file_hash, status, **kwargs):
conn = self._get_conn()
sets = ["status = ?"]
vals = [status]
ts_field = {
'extracted': 'extracted_at',
'enriched': 'enriched_at',
'complete': 'embedded_at',
}.get(status)
if ts_field:
sets.append(f"{ts_field} = ?")
vals.append(datetime.now(timezone.utc).isoformat())
for k, v in kwargs.items():
sets.append(f"{k} = ?")
vals.append(v)
vals.append(file_hash)
conn.execute(f"UPDATE documents SET {', '.join(sets)} WHERE hash = ?", vals)
conn.commit()
def get_by_status(self, status, limit=None):
conn = self._get_conn()
q = "SELECT * FROM documents WHERE status = ? ORDER BY discovered_at"
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, (status,)).fetchall()]
def get_catalogued(self, source=None, category=None, limit=None):
conn = self._get_conn()
q = "SELECT * FROM catalogue WHERE status = 'catalogued'"
params = []
if source:
q += " AND source = ?"
params.append(source)
if category:
q += " AND category = ?"
params.append(category)
q += " ORDER BY discovered_at"
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, params).fetchall()]
def get_document(self, file_hash):
conn = self._get_conn()
row = conn.execute("SELECT * FROM documents WHERE hash = ?", (file_hash,)).fetchone()
return dict(row) if row else None
def get_status_counts(self):
conn = self._get_conn()
cat_counts = {}
for row in conn.execute("SELECT status, COUNT(*) as cnt FROM catalogue GROUP BY status"):
cat_counts[row['status']] = row['cnt']
doc_counts = {}
for row in conn.execute("SELECT status, COUNT(*) as cnt FROM documents GROUP BY status"):
doc_counts[row['status']] = row['cnt']
return {'catalogue': cat_counts, 'documents': doc_counts}
def get_failures(self):
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"SELECT * FROM documents WHERE status = 'failed' ORDER BY discovered_at"
).fetchall()]
def mark_failed(self, file_hash, error_msg):
conn = self._get_conn()
conn.execute(
"UPDATE documents SET status = 'failed', error_message = ? WHERE hash = ?",
(str(error_msg)[:1000], file_hash)
)
conn.commit()
def increment_retry(self, file_hash):
conn = self._get_conn()
conn.execute(
"UPDATE documents SET retry_count = retry_count + 1, status = 'queued', error_message = NULL WHERE hash = ?",
(file_hash,)
)
conn.commit()
def get_sources(self):
conn = self._get_conn()
return [r[0] for r in conn.execute(
"SELECT DISTINCT source FROM catalogue ORDER BY source"
).fetchall()]
def get_categories(self, source=None):
conn = self._get_conn()
if source:
return [r[0] for r in conn.execute(
"SELECT DISTINCT category FROM catalogue WHERE source = ? ORDER BY category", (source,)
).fetchall()]
return [r[0] for r in conn.execute(
"SELECT DISTINCT category FROM catalogue ORDER BY category"
).fetchall()]
def get_all_documents(self, status=None, source=None, category=None, limit=None, offset=None):
conn = self._get_conn()
q = """SELECT d.*, c.source, c.category FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash WHERE 1=1"""
params = []
if status:
q += " AND d.status = ?"
params.append(status)
if source:
q += " AND c.source = ?"
params.append(source)
if category:
q += " AND c.category = ?"
params.append(category)
q += " ORDER BY d.discovered_at DESC"
if limit:
q += f" LIMIT {int(limit)}"
if offset:
q += f" OFFSET {int(offset)}"
return [dict(r) for r in conn.execute(q, params).fetchall()]
def count_documents(self, source=None, category=None):
"""Count documents matching optional source/category filters."""
conn = self._get_conn()
q = """SELECT COUNT(*) FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash WHERE 1=1"""
params = []
if source:
q += " AND c.source = ?"
params.append(source)
if category:
q += " AND c.category = ?"
params.append(category)
return conn.execute(q, params).fetchone()[0]
def catalogue_count(self):
conn = self._get_conn()
return conn.execute("SELECT COUNT(*) FROM catalogue").fetchone()[0]
def source_breakdown(self):
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"SELECT source, COUNT(*) as count, SUM(size_bytes) as total_bytes FROM catalogue GROUP BY source ORDER BY count DESC"
).fetchall()]
def category_breakdown(self, source=None):
conn = self._get_conn()
if source:
return [dict(r) for r in conn.execute(
"SELECT category, COUNT(*) as count FROM catalogue WHERE source = ? GROUP BY category ORDER BY count DESC",
(source,)
).fetchall()]
return [dict(r) for r in conn.execute(
"SELECT source, category, COUNT(*) as count FROM catalogue GROUP BY source, category ORDER BY source, count DESC"
).fetchall()]
def get_path_updates(self):
"""Get catalogue entries where path was updated since last sync."""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"SELECT hash, filename, path, source, category FROM catalogue "
"WHERE path_updated_at IS NOT NULL"
).fetchall()]
def clear_path_update(self, file_hash):
"""Clear path_updated_at flag after Qdrant sync."""
conn = self._get_conn()
conn.execute(
"UPDATE catalogue SET path_updated_at = NULL WHERE hash = ?",
(file_hash,)
)
conn.commit()
def sync_document_path(self, file_hash, path, filename):
"""Update path and filename in documents table."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET path = ?, filename = ? WHERE hash = ?",
(path, filename, file_hash)
)
conn.commit()
def status_breakdown(self):
conn = self._get_conn()
rows = conn.execute(
"SELECT status, COUNT(*) as count FROM catalogue GROUP BY status ORDER BY count DESC"
).fetchall()
return [dict(r) for r in rows]
def get_unorganized(self, limit=None):
"""Get completed documents that haven't been organized yet."""
conn = self._get_conn()
q = "SELECT hash, filename, path FROM documents WHERE status = 'complete' AND organized_at IS NULL ORDER BY embedded_at"
if limit:
q += " LIMIT {}".format(int(limit))
return [dict(r) for r in conn.execute(q).fetchall()]
def get_ingest_pending(self, ingest_dir, limit=50):
"""Get completed docs in _ingest/ that haven't been organized."""
conn = self._get_conn()
pattern = ingest_dir + '%'
return [dict(r) for r in conn.execute(
"SELECT hash, filename, path FROM documents "
"WHERE status = 'complete' AND organized_at IS NULL AND path LIKE ? "
"ORDER BY embedded_at LIMIT ?",
(pattern, limit)
).fetchall()]
def mark_organized(self, file_hash):
"""Mark a document as organized (sets organized_at timestamp)."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET organized_at = CURRENT_TIMESTAMP WHERE hash = ?",
(file_hash,)
)
conn.commit()
def update_catalogue_path(self, file_hash, new_path, new_filename):
"""Update catalogue path/filename and flag for Qdrant sync."""
conn = self._get_conn()
conn.execute(
"UPDATE catalogue SET path = ?, filename = ?, path_updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
(new_path, new_filename, file_hash)
)
conn.commit()
# ── Stream B: File Operations ───────────────────────────────────
def log_file_operation(self, doc_hash, operation, source_path, target_path,
source_filename, target_filename, original_filename=None,
collision_step=None, qdrant_points_updated=0, notes=None):
"""Log a file move/rename operation for audit trail and rollback."""
conn = self._get_conn()
conn.execute(
"""INSERT INTO file_operations
(doc_hash, operation, source_path, target_path,
source_filename, target_filename, original_filename,
collision_step, qdrant_points_updated, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(doc_hash, operation, source_path, target_path,
source_filename, target_filename, original_filename,
collision_step, qdrant_points_updated, notes)
)
conn.commit()
return conn.execute("SELECT last_insert_rowid()").fetchone()[0]
def get_file_operations(self, doc_hash=None, limit=50):
"""Get file operations, optionally filtered by doc_hash."""
conn = self._get_conn()
if doc_hash:
return [dict(r) for r in conn.execute(
"SELECT * FROM file_operations WHERE doc_hash = ? ORDER BY performed_at DESC LIMIT ?",
(doc_hash, limit)
).fetchall()]
return [dict(r) for r in conn.execute(
"SELECT * FROM file_operations WHERE reversed_at IS NULL ORDER BY performed_at DESC LIMIT ?",
(limit,)
).fetchall()]
def get_file_operation(self, op_id):
"""Get a single file operation by ID."""
conn = self._get_conn()
row = conn.execute("SELECT * FROM file_operations WHERE id = ?", (op_id,)).fetchone()
return dict(row) if row else None
def mark_operation_reversed(self, op_id):
"""Mark a file operation as reversed."""
conn = self._get_conn()
conn.execute(
"UPDATE file_operations SET reversed_at = CURRENT_TIMESTAMP WHERE id = ?",
(op_id,)
)
conn.commit()
def queue_duplicate_review(self, doc_hash, original_filename, sanitized_filename,
collision_with_hash=None, collision_path=None,
duplicate_path='', domain=None, subdomain=None,
book_author=None, book_title=None):
"""Queue a file for human duplicate review."""
conn = self._get_conn()
conn.execute(
"""INSERT INTO duplicate_review
(doc_hash, original_filename, sanitized_filename,
collision_with_hash, collision_path, duplicate_path,
domain, subdomain, book_author, book_title)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(doc_hash, original_filename, sanitized_filename,
collision_with_hash, collision_path, duplicate_path,
domain, subdomain, book_author, book_title)
)
conn.commit()
def get_duplicate_reviews(self, status='pending', limit=50):
"""Get duplicate review queue."""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"SELECT * FROM duplicate_review WHERE status = ? ORDER BY discovered_at DESC LIMIT ?",
(status, limit)
).fetchall()]
def get_pipeline_stats(self):
"""Get Stream B pipeline statistics."""
conn = self._get_conn()
ops = conn.execute(
"SELECT operation, COUNT(*) as cnt FROM file_operations WHERE reversed_at IS NULL GROUP BY operation"
).fetchall()
dupes = conn.execute(
"SELECT status, COUNT(*) as cnt FROM duplicate_review GROUP BY status"
).fetchall()
acquired = 0
ingest = 0
try:
acquired_dir = get_config().get('new_pipeline', {}).get('acquired_dir', '')
ingest_dir = get_config().get('new_pipeline', {}).get('ingest_dir', '')
if acquired_dir and os.path.isdir(acquired_dir):
acquired = len([f for f in os.listdir(acquired_dir) if f.lower().endswith('.pdf')])
if ingest_dir and os.path.isdir(ingest_dir):
ingest = len([f for f in os.listdir(ingest_dir) if f.lower().endswith('.pdf')])
except Exception:
pass
return {
'operations': {dict(r)['operation']: dict(r)['cnt'] for r in ops},
'duplicates': {dict(r)['status']: dict(r)['cnt'] for r in dupes},
'acquired_pending': acquired,
'ingest_pending': ingest,
}

390
lib/utils.py Normal file
View file

@ -0,0 +1,390 @@
"""
RECON Utilities
Content hashing (MD5), config loading (YAML), download URL generation,
source/category derivation, logging setup, filename sanitization.
Config: Loads and caches config.yaml
"""
import hashlib
import logging
import os
import re
import unicodedata
from urllib.parse import quote
import yaml
from logging.handlers import RotatingFileHandler
_config = None
def get_config():
global _config
if _config is not None:
return _config
config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config.yaml')
with open(config_path) as f:
_config = yaml.safe_load(f)
# Load Gemini keys from .env
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
_config['gemini_keys'] = []
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
if key.startswith('GEMINI_KEY_') and val != 'PASTE_KEY_HERE':
_config['gemini_keys'].append(val)
return _config
def content_hash(filepath):
h = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
h.update(chunk)
return h.hexdigest()
def concept_id(doc_hash, page_num, concept_index):
raw = f"{doc_hash}:{page_num}:{concept_index}"
h = hashlib.md5(raw.encode()).hexdigest()[:15]
return int(h, 16)
def setup_logging(name='recon'):
config = get_config()
log_dir = config['paths']['logs']
os.makedirs(log_dir, exist_ok=True)
os.makedirs(os.path.join(log_dir, 'errors'), exist_ok=True)
logger = logging.getLogger(name)
if logger.handlers:
return logger
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
fh = RotatingFileHandler(os.path.join(log_dir, 'recon.log'), maxBytes=10*1024*1024, backupCount=5)
fh.setLevel(logging.DEBUG)
fh.setFormatter(fmt)
logger.addHandler(fh)
eh = RotatingFileHandler(os.path.join(log_dir, 'errors', 'errors.log'), maxBytes=5*1024*1024, backupCount=3)
eh.setLevel(logging.ERROR)
eh.setFormatter(fmt)
logger.addHandler(eh)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(fmt)
logger.addHandler(ch)
return logger
def derive_source_and_category(filepath, library_root):
rel = os.path.relpath(filepath, library_root)
parts = rel.split(os.sep)
source = parts[0] if parts else 'unknown'
category = parts[1] if len(parts) > 2 else source
return source, category
def clean_filename_to_title(filename):
"""Convert a PDF filename into a human-readable title."""
# Strip extension
name = os.path.splitext(filename)[0]
# Remove common PDF download suffixes (with or without parens)
name = re.sub(r'[\s_]*\(?\s*PDFDrive\s*\)?\s*_?', '', name, flags=re.IGNORECASE)
name = re.sub(r'[\s_]*\(?\s*z-lib\.org\s*\)?\s*_?', '', name, flags=re.IGNORECASE)
# Handle military manual prefixes: FM_23_10 -> FM 23-10, ATP_3_21 -> ATP 3-21
name = re.sub(
r'\b(FM|ATP|TC|TM|AR|STP|GTA|ATTP|FMFRP|ADP|ADRP)[-_](\d+)[-_](\d+)',
lambda m: f"{m.group(1)} {m.group(2)}-{m.group(3)}",
name
)
# Fix common abbreviations: U_S -> U.S., etc.
name = re.sub(r'(?<![A-Za-z])U[_\s]S(?=[_\s]|$)', 'U.S.', name)
# Replace underscores and hyphens with spaces (but not in manual numbers like FM 23-10)
name = re.sub(r'(?<!\d)[-_](?!\d)', ' ', name)
name = name.replace('_', ' ')
# Remove bracketed years like [1990]
year_match = re.search(r'\[(\d{4})\]', name)
year_suffix = f" ({year_match.group(1)})" if year_match else ''
name = re.sub(r'\s*\[\d{4}\]\s*', ' ', name)
# Collapse multiple spaces
name = re.sub(r'\s+', ' ', name).strip()
# Title-case, but preserve uppercase military abbreviations
words = name.split()
titled = []
for w in words:
if w.isupper() and len(w) >= 2:
titled.append(w)
elif re.match(r'^\d', w):
titled.append(w)
else:
titled.append(w.capitalize() if w.islower() else w)
name = ' '.join(titled) + year_suffix
name = name.strip()
if len(name) < 3:
return os.path.splitext(filename)[0]
return name
# ── Mojibake fix table ──────────────────────────────────────────────
_MOJIBAKE = {
'\u00e2\u0080\u0099': "'", # ’ → ' (right single quote)
'\u00e2\u0080\u0098': "'", # ‘ → ' (left single quote)
'\u00e2\u0080\u009c': '"', # “ → " (left double quote)
'\u00e2\u0080\u009d': '"', # †→ " (right double quote)
'\u00e2\u0080\u0093': '-', # â€" → - (en dash)
'\u00e2\u0080\u0094': '-', # â€" → - (em dash)
'\u00e2\u0080\u00a6': '...', # … → ... (ellipsis)
'\u00c3\u00a9': 'e', # é → e (e-acute)
'\u00c3\u00a8': 'e', # è → e (e-grave)
'\u00c3\u00b6': 'o', # ö → o (o-umlaut)
'\u00c3\u00bc': 'u', # ü → u (u-umlaut)
'\u00c3\u00a4': 'a', # ä → a (a-umlaut)
'\u00c3\u00b1': 'n', # ñ → n (n-tilde)
'\u00c3\u00ad': 'i', # í → i (i-acute)
'\u00c3\u00a1': 'a', # á → a (a-acute)
'\u00c3\u00ba': 'u', # ú → u (u-acute)
'\u00c3\u00b3': 'o', # ó → o (o-acute)
'\u00c2\u00ae': '', # ® → (registered)
'\u00c2\u00a9': '', # © → (copyright)
'\u00c2\u00ab': '"', # « → " (guillemet left)
'\u00c2\u00bb': '"', # » → " (guillemet right)
}
# Pre-compile: replace longer sequences first to avoid partial matches
_MOJIBAKE_PATTERN = re.compile(
'|'.join(re.escape(k) for k in sorted(_MOJIBAKE.keys(), key=len, reverse=True))
)
def sanitize_filename(filename, doc_hash=None):
"""Sanitize a PDF filename for cross-platform filesystem safety.
Six-phase pipeline:
1. Strip source-site metadata (Anna's Archive, PDFDrive, z-lib, torrent tags)
2. Strip embedded identifiers (ISBN, MD5 hash, z-lib hex suffix)
3. Fix character encoding (mojibake, NFKD normalization)
4. Normalize structure (military prefixes, period-separated words, underscores)
5. Clean characters (Windows-illegal, control chars, collapse whitespace)
6. Validate and truncate (120 char max, word-boundary break)
Args:
filename: Original filename (with extension)
doc_hash: Optional doc_hash to verify z-lib suffix matches
Returns:
Sanitized filename (with extension preserved)
"""
stem, ext = os.path.splitext(filename)
ext = ext.lower()
if not ext:
ext = '.pdf'
# ── Phase 1: Strip source-site metadata ─────────────────────────
# Anna's Archive pattern: Title -- Authors -- Edition -- ISBN -- Hash -- Source
segments = stem.split(' -- ')
if len(segments) >= 3:
stem = segments[0]
elif len(segments) == 2:
second = segments[1]
if re.search(r'97[89]\d{10}|[0-9a-f]{32}|(?:19|20)\d{2}|[Aa]nna', second):
stem = segments[0]
# PDFDrive tags
stem = re.sub(r'\s*\(\s*PDFDrive\s*\)\s*', ' ', stem, flags=re.IGNORECASE)
stem = re.sub(r'\s*_PDFDrive_\s*', ' ', stem, flags=re.IGNORECASE)
# z-lib tags
stem = re.sub(r'\s*\(\s*z-lib\.org\s*\)\s*', ' ', stem, flags=re.IGNORECASE)
stem = re.sub(r'\s*_z-lib\.org_\s*', ' ', stem, flags=re.IGNORECASE)
# Torrent tags in curly braces
stem = re.sub(r'\s*\{[A-Za-z0-9]+\}\s*', ' ', stem)
# ── Phase 2: Strip embedded identifiers ─────────────────────────
# ISBN-13 (with optional dashes/spaces)
stem = re.sub(r'\s*97[89][\s-]?\d[\s-]?\d{2}[\s-]?\d{5,6}[\s-]?\d\s*', ' ', stem)
# ISBN-10 with dashes
stem = re.sub(r'\s*\d[\s-]\d{2}[\s-]\d{5,6}[\s-][\dXx]\s*', ' ', stem)
# MD5 hashes (32 hex chars, standalone)
stem = re.sub(r'\s*\b[0-9a-f]{32}\b\s*', ' ', stem)
# z-lib 8-char hex suffix like _4d969c3c
if doc_hash:
# Only strip if it matches the doc_hash prefix
match = re.search(r'_([0-9a-f]{8})$', stem)
if match and doc_hash.startswith(match.group(1)):
stem = stem[:match.start()]
else:
# Strip any trailing 8-char hex suffix after underscore
stem = re.sub(r'_[0-9a-f]{8}$', '', stem)
# ── Phase 3: Fix character encoding ─────────────────────────────
# Fix known mojibake sequences
stem = _MOJIBAKE_PATTERN.sub(lambda m: _MOJIBAKE[m.group()], stem)
# Common single-char mojibake that slip through
stem = stem.replace('\u00e2\u0080', '-') # partial em/en dash mojibake
stem = stem.replace('H_', 'H. ') # Anna's Archive initial abbreviation pattern
# NFKD normalize: decompose accented chars, strip combining marks
nfkd = unicodedata.normalize('NFKD', stem)
cleaned = []
for ch in nfkd:
cat = unicodedata.category(ch)
if cat.startswith('M'): # combining mark — skip
continue
if cat.startswith('C') and ch not in (' ', '\t'): # control char — skip
continue
# Keep ASCII + common punctuation; drop CJK/Cyrillic/etc if not transliteratable
cp = ord(ch)
if cp < 128:
cleaned.append(ch)
elif cat.startswith('L') or cat.startswith('N'):
# Letter or number outside ASCII — try to keep if Latin-ish
if cp < 0x0250: # Latin Extended range
cleaned.append(ch)
# else: drop CJK, Cyrillic, etc.
elif cat.startswith('P') or cat.startswith('S'):
# Punctuation/symbol — map to ASCII equivalent
if ch in ('\u2018', '\u2019', '\u201a', '\u0060'):
cleaned.append("'")
elif ch in ('\u201c', '\u201d', '\u201e'):
cleaned.append('"')
elif ch in ('\u2013', '\u2014', '\u2012'):
cleaned.append('-')
elif ch == '\u2026':
cleaned.append('...')
elif ch in ('\u00ab', '\u00bb'):
cleaned.append('"')
else:
cleaned.append(' ')
elif cat.startswith('Z'):
cleaned.append(' ')
stem = ''.join(cleaned)
# ── Phase 4: Normalize structure ────────────────────────────────
# Detect URL-derived filenames — skip aggressive normalization
is_url_derived = bool(re.match(r'[a-z0-9-]+\.[a-z]{2,}[_/]', stem))
if not is_url_derived:
# Military manual prefixes: FM_23_10 -> FM 23-10
stem = re.sub(
r'\b(FM|ATP|TC|TM|AR|STP|GTA|ATTP|FMFRP|ADP|ADRP)[-_](\d+)[-_](\d+)',
lambda m: '{} {}-{}'.format(m.group(1), m.group(2), m.group(3)),
stem
)
# Period-separated words (4+ segments = likely word-separated, not abbreviations like U.S.)
if stem.count('.') >= 4:
stem = re.sub(r'\.(?=[A-Za-z])', ' ', stem)
# Underscores to spaces (always)
stem = stem.replace('_', ' ')
# ── Phase 5: Clean characters ───────────────────────────────────
# Remove Windows-illegal chars and control chars
stem = re.sub(r'[<>:"|?*\\\/]', '', stem)
stem = re.sub(r'[\x00-\x1f\x7f]', '', stem)
# Collapse multiple spaces, hyphens, underscores
stem = re.sub(r' {2,}', ' ', stem)
stem = re.sub(r'-{2,}', '-', stem)
# Strip leading/trailing dots, spaces, dashes
stem = stem.strip('. -')
# ── Phase 6: Validate and truncate ──────────────────────────────
stem = stem.strip()
if not stem or len(stem) < 2:
stem = 'untitled'
max_stem = 120 - len(ext)
if len(stem) > max_stem:
# Break at word boundary
truncated = stem[:max_stem]
last_space = truncated.rfind(' ')
if last_space > max_stem * 0.6:
truncated = truncated[:last_space]
stem = truncated.rstrip('. -,')
return stem + ext
def filename_needs_sanitization(filename, doc_hash=None):
"""Return True if sanitize_filename() would change the filename."""
return sanitize_filename(filename, doc_hash) != filename
def resolve_collisions(entries):
"""Resolve filename collisions after sanitization.
Args:
entries: list of dicts, each with 'sanitized_filename', 'proposed_dir', 'hash'
Returns:
Updated entries with collision suffixes applied where needed.
Each entry gets 'collision' key (True/False) and possibly updated 'sanitized_filename'.
"""
from collections import defaultdict
# Group by (dir, lowercase filename) to find collisions
groups = defaultdict(list)
for i, e in enumerate(entries):
key = (e['proposed_dir'], e['sanitized_filename'].lower())
groups[key].append(i)
collision_count = 0
for key, indices in groups.items():
if len(indices) <= 1:
for i in indices:
entries[i]['collision'] = False
continue
# Collision — add hash suffix to all but the first
collision_count += len(indices) - 1
entries[indices[0]]['collision'] = False
for i in indices[1:]:
e = entries[i]
h6 = e['hash'][:6]
stem, ext = os.path.splitext(e['sanitized_filename'])
new_name = '{} [{}]{}'.format(stem, h6, ext)
# Re-check length
if len(new_name) > 120:
max_stem = 120 - len(ext) - 9 # 9 = len(' [XXXXXX]')
stem = stem[:max_stem].rstrip('. -,')
new_name = '{} [{}]{}'.format(stem, h6, ext)
e['sanitized_filename'] = new_name
e['collision'] = True
return entries, collision_count
def generate_download_url(filepath, library_root='/mnt/library', base_url='https://files.echo6.co'):
"""Generate a download/source URL from a document path.
For web URLs (http/https): returns the URL directly -- it's already a link.
For file paths: converts to files.echo6.co URL.
"""
if not filepath:
return ''
# Web content -- path IS the source URL
if filepath.startswith(('http://', 'https://')):
return filepath
# File content -- convert to files.echo6.co URL
rel = os.path.relpath(filepath, library_root)
parts = rel.split(os.sep)
encoded = '/'.join(quote(p) for p in parts)
return f"{base_url}/{encoded}"

324
lib/web_scraper.py Normal file
View file

@ -0,0 +1,324 @@
"""
RECON Web Scraper URL-based content ingestion.
Fetches web pages, extracts clean text, chunks into pages,
and feeds into the standard RECON enrichment pipeline.
Output format matches lib/extractor.py so the enricher
processes web content identically to PDF content.
"""
import hashlib
import json
import os
import re
import time
from datetime import datetime, timezone
from urllib.parse import urlparse, unquote
import requests
import trafilatura
from .utils import get_config, setup_logging
from .status import StatusDB
logger = setup_logging('recon.web_scraper')
# Defaults (overridden by config.yaml web_scraper section)
DEFAULT_WORDS_PER_PAGE = 2000
DEFAULT_FETCH_TIMEOUT = 30
DEFAULT_USER_AGENT = 'RECON/1.0 (Knowledge Extraction Pipeline)'
DEFAULT_RATE_LIMIT_DELAY = 1.0
def _get_scraper_config(config=None):
"""Get web scraper settings from config, with defaults."""
if config is None:
config = get_config()
ws = config.get('web_scraper', {})
return {
'words_per_page': ws.get('words_per_page', DEFAULT_WORDS_PER_PAGE),
'fetch_timeout': ws.get('fetch_timeout', DEFAULT_FETCH_TIMEOUT),
'user_agent': ws.get('user_agent', DEFAULT_USER_AGENT),
'rate_limit_delay': ws.get('rate_limit_delay', DEFAULT_RATE_LIMIT_DELAY),
'max_batch_size': ws.get('max_batch_size', 50),
}
def fetch_url(url, config=None):
"""
Fetch a URL and extract clean text + metadata using trafilatura.
Returns dict with: text, title, author, date, description, url,
sitename, raw_length, text_length.
Raises ValueError if fetch or extraction fails.
"""
sc = _get_scraper_config(config)
logger.info(f"Fetching URL: {url}")
try:
response = requests.get(
url,
headers={'User-Agent': sc['user_agent']},
timeout=sc['fetch_timeout'],
allow_redirects=True
)
response.raise_for_status()
except requests.RequestException as e:
raise ValueError(f"Failed to fetch {url}: {e}")
raw_html = response.text
if not raw_html or len(raw_html) < 100:
raise ValueError(f"Empty or too-short response from {url}")
text = trafilatura.extract(
raw_html,
include_comments=False,
include_tables=True,
include_links=False,
include_images=False,
favor_precision=False,
deduplicate=True
)
if not text or len(text.strip()) < 50:
raise ValueError(f"No meaningful text extracted from {url}")
metadata = trafilatura.extract_metadata(raw_html)
result = {
'text': text.strip(),
'title': '',
'author': '',
'date': '',
'description': '',
'url': url,
'sitename': '',
'raw_length': len(raw_html),
'text_length': len(text),
}
if metadata:
result['title'] = metadata.title or ''
result['author'] = metadata.author or ''
result['date'] = metadata.date or ''
result['description'] = metadata.description or ''
result['sitename'] = metadata.sitename or ''
if not result['title']:
result['title'] = _title_from_url(url)
logger.info(f"Extracted {result['text_length']} chars from {url}\"{result['title']}\"")
return result
def _title_from_url(url):
"""Generate a readable title from a URL as fallback."""
parsed = urlparse(url)
path = unquote(parsed.path).strip('/')
if path:
segment = path.split('/')[-1]
segment = re.sub(r'[-_]', ' ', segment)
segment = re.sub(r'\.\w+$', '', segment)
return segment.title() if segment else parsed.netloc
return parsed.netloc
def chunk_text(text, words_per_page=DEFAULT_WORDS_PER_PAGE):
"""
Split text into page-sized chunks for enrichment windows.
Breaks at paragraph boundaries. Each chunk is ~words_per_page words.
Returns list of strings (each is one "page").
"""
paragraphs = text.split('\n\n')
pages = []
current_page = []
current_words = 0
for para in paragraphs:
para = para.strip()
if not para:
continue
para_words = len(para.split())
if para_words > words_per_page * 1.5:
if current_page:
pages.append('\n\n'.join(current_page))
current_page = []
current_words = 0
sentences = re.split(r'(?<=[.!?])\s+', para)
for sentence in sentences:
sentence_words = len(sentence.split())
if current_words + sentence_words > words_per_page and current_page:
pages.append('\n\n'.join(current_page))
current_page = [sentence]
current_words = sentence_words
else:
current_page.append(sentence)
current_words += sentence_words
elif current_words + para_words > words_per_page and current_page:
pages.append('\n\n'.join(current_page))
current_page = [para]
current_words = para_words
else:
current_page.append(para)
current_words += para_words
if current_page:
pages.append('\n\n'.join(current_page))
if not pages:
pages = [text]
return pages
def _content_hash(text):
"""MD5 hash of text content — same hash type as PDF pipeline."""
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _display_filename(url):
"""Create a display filename from a URL."""
parsed = urlparse(url)
name = f"{parsed.netloc}_{parsed.path.strip('/').replace('/', '_')}"
name = re.sub(r'[^\w._-]', '_', name)[:200]
if not name.endswith('.html'):
name += '.html'
return name
def ingest_url(url, category='Web', source='web', config=None):
"""
Full URL ingestion: fetch -> extract -> chunk -> save -> catalogue -> queue as extracted.
Returns dict with hash, title, page_count, status.
Raises ValueError on failure.
"""
if config is None:
config = get_config()
sc = _get_scraper_config(config)
db = StatusDB()
# Fetch and extract
extracted = fetch_url(url, config)
# Hash the extracted text content
doc_hash = _content_hash(extracted['text'])
# Check for duplicate in catalogue
conn = db._get_conn()
existing = conn.execute("SELECT * FROM catalogue WHERE hash = ?", (doc_hash,)).fetchone()
if existing:
# Also check documents table for status
doc = db.get_document(doc_hash)
existing_status = doc['status'] if doc else existing['status']
logger.info(f"Duplicate content (hash {doc_hash[:12]}...) — already exists as '{existing['filename']}'")
return {
'hash': doc_hash,
'status': 'duplicate',
'title': doc.get('book_title', '') if doc else existing['filename'],
'existing_status': existing_status,
}
# Chunk into pages
pages = chunk_text(extracted['text'], sc['words_per_page'])
# Save text files in extractor-compatible format:
# data/text/{hash}/page_0001.txt, page_0002.txt, ... + meta.json
text_dir = os.path.join(config['paths']['text'], doc_hash)
os.makedirs(text_dir, exist_ok=True)
for i, page_text in enumerate(pages, 1):
page_file = os.path.join(text_dir, f"page_{i:04d}.txt")
with open(page_file, 'w', encoding='utf-8') as f:
f.write(page_text)
meta = {
'hash': doc_hash,
'source_type': 'web',
'url': url,
'title': extracted['title'],
'author': extracted['author'],
'date': extracted['date'],
'description': extracted['description'],
'sitename': extracted['sitename'],
'page_count': len(pages),
'text_length': extracted['text_length'],
'fetched_at': datetime.now(timezone.utc).isoformat(),
}
with open(os.path.join(text_dir, 'meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
display_name = _display_filename(url)
# Add to catalogue
db.add_to_catalogue(doc_hash, display_name, url, extracted['text_length'], source, category)
# Queue (creates documents entry as 'queued')
db.queue_document(doc_hash)
# Advance directly to 'extracted' — text is already saved, skip PDF extraction
db.update_status(doc_hash, 'extracted',
page_count=len(pages),
pages_extracted=len(pages),
book_title=extracted['title'],
book_author=extracted['author'] or None)
logger.info(f"Ingested URL: {url} -> {doc_hash[:12]}... ({len(pages)} pages, \"{extracted['title']}\")")
return {
'hash': doc_hash,
'status': 'extracted',
'title': extracted['title'],
'author': extracted['author'],
'page_count': len(pages),
'url': url,
}
def ingest_urls(urls, category='Web', source='web', delay=None, config=None):
"""
Batch URL ingestion with rate limiting.
Returns list of result dicts (one per URL).
"""
if config is None:
config = get_config()
if delay is None:
delay = _get_scraper_config(config)['rate_limit_delay']
results = []
total = len(urls)
for i, url in enumerate(urls, 1):
url = url.strip()
if not url or url.startswith('#'):
continue
logger.info(f"[{i}/{total}] Processing: {url}")
try:
result = ingest_url(url, category=category, source=source, config=config)
result['url'] = url
results.append(result)
except Exception as e:
logger.error(f"[{i}/{total}] Failed: {url}{e}")
results.append({
'url': url,
'status': 'failed',
'error': str(e),
})
if i < total and delay > 0:
time.sleep(delay)
succeeded = sum(1 for r in results if r.get('status') not in ('failed', 'duplicate'))
failed = sum(1 for r in results if r.get('status') == 'failed')
dupes = sum(1 for r in results if r.get('status') == 'duplicate')
logger.info(f"Batch complete: {succeeded} new, {dupes} duplicates, {failed} failed out of {total}")
return results