Merge refactor branch: RECON v1.0.0

This merge integrates the complete refactor effort spanning Phases 0-6k,
bringing RECON from its initial baseline into production-grade form.

Pipeline architecture
---------------------
- Phases 0-2: foundation cleanup, removed dead code, standardized logging
- Phase 3: dispatcher rewrite — watches data/acquired/<subfolder>/ for
  {hash}.txt + {hash}.meta.json pairs, atomic .tmp rename, idempotent
- Phase 4: content processors for PDF (PyPDF2 -> pdftotext -> Tesseract ->
  Gemini Vision fallback chain), transcript, and text formats
- Phase 5: enrichment, embedding, and filing daemons split into
  independently restartable threads

PeerTube acquisition
--------------------
- Phase 6a-6c: PeerTube channel watcher, caption acquisition with rate
  limiting (429 handling), 0.5s rate_limit_delay enforced
- Phase 6d: multi-instance support
- Phase 6e: rewired then reverted dashboard PeerTube endpoint to live
  in acquisition module

Format handling & library cleanup
---------------------------------
- Phase 6f: text processor for .txt ingestion
- Phase 6f-2: format normalizer in dispatcher
- Phase 6g-6j: library reorg — ghost domain cleanup, SCL moved to
  dedicated domain folder, pi-nas fully decommissioned as a storage
  target (NFS-only now), ~73 GB reclaimed
- Phase 6k: hash-identical dedup — 2,477 duplicate PDFs removed,
  22.05 GB freed, catalogue/documents/Qdrant payloads updated
  coherently, 226 empty domain subdirs pruned
- 16,340 transcripts remain un-filed pending title-match review

Dashboard & metadata
--------------------
- Gemini "null" string bug fixed in pdf_processor metadata voting
- Dashboard upload migrated to pipeline with multi-format support

State at release
----------------
- 7 daemon threads: dispatcher, enrich, embed, filing, peertube-acq,
  progress, dashboard
- 29,201 documents in catalogue / documents tables (UNIQUE on hash PK)
- ~2.1M Qdrant vectors in recon_knowledge_hybrid (cortex:6333)
- ~67 GB library on /mnt/library (NFS from pi-nas)
- files.echo6.co serving 9,397 deduped PDFs
- recon.echo6.co dashboard + API on :8420

See cleanup-log.md for the full backlog and resolution history.
This commit is contained in:
Matt 2026-04-16 18:20:25 +00:00
commit 8d54ff165d
18 changed files with 1982 additions and 849 deletions

View file

@ -411,6 +411,7 @@ peertube:
public_url: https://stream.echo6.co # Public URL for video links
fetch_timeout: 30 # HTTP timeout for API/VTT requests
rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)
# Stream B: New Library Pipeline
new_pipeline:
@ -436,5 +437,6 @@ pipeline:
pdf: pdf_processor
stream: transcript_processor
html: html_processor
text: text_processor
# mtime stability threshold for picking up files from acquired/
mtime_stability_seconds: 10

View file

224
lib/acquisition/peertube.py Normal file
View file

@ -0,0 +1,224 @@
"""
RECON PeerTube Acquisition Module
Polls PeerTube for new video transcripts and writes them as flat file pairs
into data/acquired/stream/ for the dispatcher to pick up.
Does NOT touch the database that's transcript_processor's job.
"""
import json
import os
import time
from lib.peertube_scraper import get_videos, get_captions, fetch_vtt, vtt_to_text, _get_pt_config
from lib.utils import content_hash, get_config, setup_logging
logger = setup_logging("recon.acquisition.peertube")
def _build_known_sets(db):
"""Build sets of known UUIDs and titles from catalogue.
Queries catalogue once per batch for dedup against both cohorts:
- URL-path rows: extract UUID from https://stream.echo6.co/w/{uuid}
- Library-path rows: extract title from filename column
"""
conn = db._get_conn()
rows = conn.execute(
"SELECT path, filename FROM catalogue WHERE source = 'stream.echo6.co'"
).fetchall()
known_uuids = set()
known_titles = set()
for row in rows:
path = row['path'] or ''
if '/w/' in path:
known_uuids.add(path.rsplit('/w/', 1)[-1])
fname = row['filename'] or ''
if fname.endswith('.txt'):
known_titles.add(fname[:-4])
else:
known_titles.add(fname)
return known_uuids, known_titles
def list_new_videos(db, config=None):
"""Find PeerTube videos with captions not yet in catalogue.
Returns list of (video_dict, caption_path) tuples for videos that have
captions and are not in the known UUID or title sets.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
rate_delay = ptc.get('rate_limit_delay', 0.5)
known_uuids, known_titles = _build_known_sets(db)
videos = get_videos(config=config)
new_videos = []
checked = 0
for video in videos:
if video['uuid'] in known_uuids:
continue
if video['name'] in known_titles:
continue
# Rate limit caption API calls
if checked > 0:
time.sleep(rate_delay)
checked += 1
try:
captions = get_captions(video['uuid'], config)
except Exception as e:
logger.warning("[peertube] Failed to get captions for %s: %s",
video['uuid'][:8], e)
continue
if not captions:
continue
# Prefer English caption
caption_path = None
for c in captions:
if c.get('language', {}).get('id') == 'en':
caption_path = c['captionPath']
break
if caption_path is None:
caption_path = captions[0]['captionPath']
new_videos.append((video, caption_path))
return new_videos
def acquire_one(video, caption_path, config=None):
"""Fetch transcript and write to hopper as flat files.
Returns hash string on success, None on skip/error.
Does NOT touch the database that's transcript_processor's job.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
pipeline_cfg = config.get('pipeline', {})
hopper_dir = os.path.join(
pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired'),
'stream'
)
os.makedirs(hopper_dir, exist_ok=True)
uuid = video['uuid']
# Fetch and convert VTT
vtt_content = fetch_vtt(caption_path, config)
text, cue_timestamps = vtt_to_text(vtt_content)
if not text or len(text.strip()) < 50:
logger.debug("[peertube] Transcript too short for %s (%s): %d chars",
video['name'], uuid, len(text) if text else 0)
return None
# Write text to temp file, hash it, then rename to final name
tmp_txt = os.path.join(hopper_dir, f'{uuid}.txt.tmp')
with open(tmp_txt, 'w', encoding='utf-8') as f:
f.write(text)
file_hash = content_hash(tmp_txt)
# Check if final file already exists (race condition guard)
final_txt = os.path.join(hopper_dir, f'{file_hash}.txt')
final_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json')
if os.path.exists(final_txt):
os.remove(tmp_txt)
logger.debug("[peertube] Hopper file already exists: %s", file_hash[:8])
return None
# Build sidecar metadata
video_url = f"{ptc['public_url']}/w/{uuid}"
meta = {
'title': video['name'],
'source_url': video_url,
'url': video_url,
'source': 'stream.echo6.co',
'source_type': 'transcript',
'category': 'Transcript',
'channel': video.get('channel_display', ''),
'duration': video.get('duration', 0),
'uuid': uuid,
'cue_timestamps': cue_timestamps,
}
# Write meta to tmp, then rename both atomically
# Meta first, then content — dispatcher only picks up when content file exists
tmp_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json.tmp')
with open(tmp_meta, 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
os.rename(tmp_meta, final_meta)
os.rename(tmp_txt, final_txt)
logger.info("[peertube] Acquired: %s (%s) -> %s",
video['name'], uuid[:8], file_hash[:12])
return file_hash
def acquire_batch(db, config=None):
"""One-shot: find new videos and acquire them.
Returns dict: {'acquired': N, 'skipped': N, 'errors': N}
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
rate_delay = ptc.get('rate_limit_delay', 0.5)
result = {'acquired': 0, 'skipped': 0, 'errors': 0}
try:
new_videos = list_new_videos(db, config)
except Exception as e:
logger.error("[peertube] Failed to list new videos: %s", e, exc_info=True)
result['errors'] = 1
return result
if not new_videos:
logger.debug("[peertube] No new videos found")
return result
logger.info("[peertube] Found %d new videos to acquire", len(new_videos))
for i, (video, caption_path) in enumerate(new_videos):
if i > 0:
time.sleep(rate_delay)
try:
file_hash = acquire_one(video, caption_path, config)
if file_hash:
result['acquired'] += 1
else:
result['skipped'] += 1
except Exception as e:
logger.error("[peertube] Error acquiring %s (%s): %s",
video['name'], video['uuid'][:8], e, exc_info=True)
result['errors'] += 1
return result
def acquisition_loop(stop_event, db, config, interval=1800):
"""Service loop: poll PeerTube for new transcripts every interval seconds."""
logger.info("[peertube] Acquisition loop started (interval: %ds)", interval)
while not stop_event.is_set():
try:
result = acquire_batch(db, config)
if result['acquired']:
logger.info("[peertube] Acquired %d new transcripts (%d skipped, %d errors)",
result['acquired'], result['skipped'], result['errors'])
else:
logger.debug("[peertube] No new transcripts")
except Exception as e:
logger.error("[peertube] Error: %s", e, exc_info=True)
stop_event.wait(interval)
logger.info("[peertube] Acquisition loop stopped")

View file

@ -9,6 +9,7 @@ API endpoints for all pipeline operations including crawl, ingest, and search.
Dependencies: Flask, qdrant-client, requests
Config: web, vector_db, embedding sections of config.yaml
"""
import glob
import json
import threading
import os
@ -77,25 +78,20 @@ def _format_source_citation(payload):
return book
def _resolve_upload_path(category, config):
"""Resolve the target directory for an upload given a category name."""
upload_paths = config.get('upload_paths', {})
library_root = config['library_root']
ALLOWED_EXTENSIONS = {'.pdf', '.txt', '.epub', '.doc', '.docx', '.mobi'}
if category in upload_paths:
return upload_paths[category]
default_path = upload_paths.get('default', library_root)
safe_category = secure_filename(category) if category else ''
if safe_category:
return os.path.join(default_path, safe_category)
return default_path
HOPPER_ROUTING = {
'.pdf': '/opt/recon/data/acquired/pdf/',
'.txt': '/opt/recon/data/acquired/text/',
'.epub': '/opt/recon/data/acquired/pdf/',
'.doc': '/opt/recon/data/acquired/pdf/',
'.docx': '/opt/recon/data/acquired/pdf/',
'.mobi': '/opt/recon/data/acquired/pdf/',
}
def _process_upload(filepath, original_filename, category, config, db):
"""Process a single PDF upload: hash, dedup, copy to library, catalogue, queue."""
library_root = config['library_root']
def _process_upload(filepath, original_filename, ext, category, config, db):
"""Process an upload: hash, dedup, drop into hopper for dispatcher pickup."""
file_hash = content_hash(filepath)
conn = db._get_conn()
@ -103,34 +99,39 @@ def _process_upload(filepath, original_filename, category, config, db):
if existing:
raise ValueError(f"Duplicate: file already catalogued as {existing['filename']}")
target_dir = _resolve_upload_path(category, config)
os.makedirs(target_dir, exist_ok=True)
# Also check if already sitting in a hopper dir awaiting dispatch
for hopper in HOPPER_ROUTING.values():
if any(os.path.exists(os.path.join(hopper, file_hash + e)) for e in ALLOWED_EXTENSIONS):
raise ValueError("Duplicate: file already queued for processing")
safe_name = secure_filename(original_filename)
if not safe_name:
safe_name = f"{file_hash}.pdf"
target_path = os.path.join(target_dir, safe_name)
hopper_dir = HOPPER_ROUTING.get(ext, '/opt/recon/data/acquired/pdf/')
os.makedirs(hopper_dir, exist_ok=True)
if os.path.exists(target_path):
base, ext = os.path.splitext(safe_name)
target_path = os.path.join(target_dir, f"{base}_{file_hash[:8]}{ext}")
target_path = os.path.join(hopper_dir, file_hash + ext)
meta_path = os.path.join(hopper_dir, file_hash + '.meta.json')
stem = os.path.splitext(original_filename)[0]
sidecar = {
'title': stem,
'source': 'dashboard_upload',
'source_type': ext.lstrip('.'),
'category': category,
'original_filename': original_filename,
}
# Write sidecar first (with .tmp safety), then content
tmp_meta = meta_path + '.tmp'
with open(tmp_meta, 'w', encoding='utf-8') as f:
json.dump(sidecar, f, indent=2)
os.rename(tmp_meta, meta_path)
shutil.copy2(filepath, target_path)
size = os.path.getsize(target_path)
source, derived_category = derive_source_and_category(target_path, library_root)
db.add_to_catalogue(file_hash, safe_name, target_path, size, source, derived_category)
db.queue_document(file_hash)
return {
'hash': file_hash,
'filename': safe_name,
'category': derived_category,
'source': source,
'path': target_path,
'size_bytes': size,
'status': 'queued'
'filename': original_filename,
'source_type': ext.lstrip('.'),
'status': 'queued',
}
@ -346,22 +347,23 @@ def api_upload():
if not file.filename:
return jsonify({'error': 'No file selected'}), 400
if not file.filename.lower().endswith('.pdf'):
return jsonify({'error': 'Only PDF files are accepted'}), 400
ext = os.path.splitext(file.filename)[1].lower()
if ext not in ALLOWED_EXTENSIONS:
return jsonify({'error': f'Unsupported file type: {ext}'}), 400
category = request.form.get('category', '').strip()
config = get_config()
db = StatusDB()
tmp_fd, tmp_path = tempfile.mkstemp(suffix='.pdf')
tmp_fd, tmp_path = tempfile.mkstemp(suffix=ext)
try:
file.save(tmp_path)
if os.path.getsize(tmp_path) == 0:
return jsonify({'error': 'Uploaded file is empty'}), 400
result = _process_upload(tmp_path, file.filename, category, config, db)
result = _process_upload(tmp_path, file.filename, ext, category, config, db)
return jsonify(result), 201
except ValueError as e:
@ -390,6 +392,28 @@ def api_upload_status(doc_hash):
'filename': cat['filename'],
'status': cat['status'],
})
# Check hopper dirs for files awaiting dispatcher pickup
for hopper in ('/opt/recon/data/acquired/pdf/', '/opt/recon/data/acquired/text/'):
if glob.glob(os.path.join(hopper, doc_hash + '.*')):
return jsonify({
'hash': doc_hash,
'status': 'pending',
'message': 'Waiting for dispatcher',
})
# Check processing dir
proc_dir = os.path.join(
config.get('pipeline', {}).get('processing_root', '/opt/recon/data/processing'),
doc_hash,
)
if os.path.isdir(proc_dir):
return jsonify({
'hash': doc_hash,
'status': 'processing',
'message': 'Being processed',
})
return jsonify({'error': 'Document not found'}), 404
result = {
@ -882,7 +906,11 @@ def _build_knowledge_stats():
sources = conn.execute("""
SELECT
c.source,
CASE WHEN c.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type,
CASE
WHEN c.source = 'stream.echo6.co' THEN 'transcript'
WHEN c.path LIKE 'http%' THEN 'web'
ELSE 'pdf'
END as type,
COUNT(DISTINCT c.hash) as catalogued,
COUNT(DISTINCT CASE WHEN d.status = 'complete' THEN d.hash END) as complete,
COUNT(DISTINCT CASE WHEN d.status NOT IN ('complete', 'failed') AND d.status IS NOT NULL THEN d.hash END) as in_pipeline,
@ -935,11 +963,17 @@ def _build_knowledge_stats():
""").fetchone()[0]
recent = conn.execute("""
SELECT book_title, status, concepts_extracted, vectors_inserted,
CASE WHEN path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type
FROM documents
WHERE status = 'complete'
ORDER BY embedded_at DESC
SELECT COALESCE(d.book_title, c.filename) as title,
d.status, d.concepts_extracted, d.vectors_inserted,
CASE
WHEN c.source = 'stream.echo6.co' THEN 'transcript'
WHEN d.path LIKE 'http%' THEN 'web'
ELSE 'pdf'
END as type
FROM documents d
JOIN catalogue c ON d.hash = c.hash
WHERE d.status = 'complete'
ORDER BY d.embedded_at DESC
LIMIT 10
""").fetchall()
@ -979,7 +1013,7 @@ def _build_knowledge_stats():
'source_types': dict(sorted(source_type_counts.items(), key=lambda x: -x[1])),
'sample_size': sample_size,
'recent_complete': [{
'title': r['book_title'] or 'Untitled',
'title': r['title'] or 'Untitled',
'concepts': r['concepts_extracted'] or 0,
'vectors': r['vectors_inserted'] or 0,
'type': r['type'],

View file

@ -1,432 +0,0 @@
"""
RECON Site Crawler URL discovery for bulk web ingestion.
Two discovery strategies:
1. Sitemap-based (preferred) parses sitemap.xml for all URLs
2. Link-following (fallback) crawls from root URL following internal links
Discovered URLs are fed into web_scraper.ingest_url() for processing.
"""
import re
import time
from collections import deque
from urllib.parse import urlparse, urljoin, urldefrag
import requests
from lxml import etree
from .utils import get_config, setup_logging
logger = setup_logging('recon.crawler')
def _get_crawler_config(config=None):
"""Load crawler config with defaults."""
if config is None:
config = get_config()
crawler_cfg = config.get('crawler', {})
web_cfg = config.get('web_scraper', {})
return {
'user_agent': (
crawler_cfg.get('user_agent') or
web_cfg.get('user_agent') or
'Mozilla/5.0 (compatible; RECON/1.0)'
),
'fetch_timeout': crawler_cfg.get('fetch_timeout', 30),
'rate_limit_delay': crawler_cfg.get('rate_limit_delay', 1.0),
'max_pages': crawler_cfg.get('max_pages', 500),
'max_depth': crawler_cfg.get('max_depth', 3),
'default_exclude': crawler_cfg.get('default_exclude', [
'/search', '/404', '/login', '/signup', '/auth/', '/api/', '/assets/', '/static/'
]),
}
# ─── Sitemap Discovery ─────────────────────────────────────────────
def discover_sitemap_url(base_url, config=None):
"""
Find the sitemap URL for a site.
Checks: robots.txt Sitemap: directive, /sitemap.xml,
/sitemap_index.xml, /sitemap-0.xml.
Returns sitemap URL or None.
"""
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
parsed = urlparse(base_url)
root = f"{parsed.scheme}://{parsed.netloc}"
# Check robots.txt first
try:
resp = requests.get(
f"{root}/robots.txt",
headers=headers,
timeout=cfg['fetch_timeout']
)
if resp.status_code == 200:
for line in resp.text.splitlines():
if line.strip().lower().startswith('sitemap:'):
sitemap_url = line.split(':', 1)[1].strip()
# Handle "Sitemap: https://..." — split(':',1) keeps the URL intact
# but "Sitemap: https://..." splits into "Sitemap" and " https://..."
# Need to rejoin properly
if not sitemap_url.startswith('http'):
sitemap_url = line[line.index(':') + 1:].strip()
logger.info(f"Found sitemap in robots.txt: {sitemap_url}")
return sitemap_url
except Exception as e:
logger.debug(f"robots.txt fetch failed: {e}")
# Try common sitemap locations
candidates = [
f"{root}/sitemap.xml",
f"{root}/sitemap_index.xml",
f"{root}/sitemap-0.xml",
]
for url in candidates:
try:
resp = requests.head(
url,
headers=headers,
timeout=cfg['fetch_timeout'],
allow_redirects=True
)
if resp.status_code == 200:
logger.info(f"Found sitemap at: {url}")
return url
except Exception:
continue
logger.warning(f"No sitemap found for {base_url}")
return None
def parse_sitemap(sitemap_url, config=None):
"""
Parse a sitemap XML and return all page URLs.
Handles standard sitemaps (<urlset>) and sitemap indexes
(<sitemapindex>) with recursive sub-sitemap fetching.
"""
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
all_urls = []
def _fetch_and_parse(url, depth=0):
if depth > 3:
return
try:
resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout'])
resp.raise_for_status()
except Exception as e:
logger.error(f"Failed to fetch sitemap {url}: {e}")
return
try:
root = etree.fromstring(resp.content)
except etree.XMLSyntaxError as e:
logger.error(f"Invalid XML in sitemap {url}: {e}")
return
nsmap = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
# Check if this is a sitemap index
sitemap_locs = root.findall('.//ns:sitemap/ns:loc', nsmap)
if sitemap_locs:
logger.info(f"Sitemap index at {url}{len(sitemap_locs)} sub-sitemaps")
for loc in sitemap_locs:
if loc.text:
_fetch_and_parse(loc.text.strip(), depth + 1)
return
# Standard sitemap — extract URLs
url_locs = root.findall('.//ns:loc', nsmap)
# Fallback: try without namespace
if not url_locs:
url_locs = root.findall('.//loc')
for loc in url_locs:
if loc.text:
all_urls.append(loc.text.strip())
logger.info(f"Parsed {len(url_locs)} URLs from {url}")
_fetch_and_parse(sitemap_url)
# Deduplicate preserving order
seen = set()
unique = []
for url in all_urls:
url_clean = urldefrag(url)[0]
if url_clean not in seen:
seen.add(url_clean)
unique.append(url_clean)
logger.info(f"Total unique URLs from sitemap: {len(unique)}")
return unique
# ─── Link-Following Discovery (Fallback) ───────────────────────────
def crawl_links(base_url, max_depth=3, max_pages=500, config=None):
"""
Discover URLs by following internal links (BFS).
Fallback when no sitemap is available.
"""
from bs4 import BeautifulSoup
cfg = _get_crawler_config(config)
headers = {'User-Agent': cfg['user_agent']}
parsed_base = urlparse(base_url)
base_domain = parsed_base.netloc
discovered = []
visited = set()
queue = deque([(base_url, 0)])
skip_extensions = (
'.pdf', '.png', '.jpg', '.jpeg', '.gif', '.svg',
'.css', '.js', '.zip', '.tar', '.gz', '.mp4', '.mp3',
'.ico', '.woff', '.woff2', '.ttf', '.eot',
)
skip_paths = (
'/tag/', '/tags/', '/page/', '/feed/', '/rss/',
'/wp-json/', '/wp-admin/', '/wp-includes/',
)
while queue and len(discovered) < max_pages:
url, depth = queue.popleft()
url = urldefrag(url)[0]
if url in visited:
continue
if depth > max_depth:
continue
visited.add(url)
discovered.append(url)
if depth >= max_depth:
continue
try:
resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout'])
if resp.status_code != 200:
continue
if 'text/html' not in resp.headers.get('content-type', ''):
continue
except Exception:
continue
try:
soup = BeautifulSoup(resp.text, 'lxml')
except Exception:
continue
for a_tag in soup.find_all('a', href=True):
href = a_tag['href']
full_url = urljoin(url, href)
full_url = urldefrag(full_url)[0]
parsed = urlparse(full_url)
if parsed.netloc != base_domain:
continue
if any(parsed.path.lower().endswith(ext) for ext in skip_extensions):
continue
if any(skip in parsed.path.lower() for skip in skip_paths):
continue
if full_url not in visited:
queue.append((full_url, depth + 1))
time.sleep(cfg['rate_limit_delay'])
logger.info(f"Link crawl: {len(discovered)} URLs (visited {len(visited)}, depth {max_depth})")
return discovered
# ─── URL Filtering ──────────────────────────────────────────────────
def filter_urls(urls, include=None, exclude=None):
"""
Filter URLs by path prefix include/exclude rules.
include: URL must match at least one prefix (if provided)
exclude: URL must not match any prefix
"""
filtered = []
for url in urls:
path = urlparse(url).path
if include:
if not any(path.startswith(prefix) for prefix in include):
continue
if exclude:
if any(path.startswith(prefix) for prefix in exclude):
continue
filtered.append(url)
logger.info(f"Filtered {len(urls)} -> {len(filtered)} URLs "
f"(include={include}, exclude={exclude})")
return filtered
# ─── Main Crawl Orchestrator ────────────────────────────────────────
def crawl_site(
base_url,
category='Web',
source=None,
include=None,
exclude=None,
max_pages=None,
max_depth=None,
delay=None,
dry_run=False,
use_sitemap=True,
use_links=True,
config=None,
):
"""
Crawl a site and ingest all discovered pages.
1. Discover URLs via sitemap or link-following
2. Apply include/exclude filters
3. Feed each URL through web_scraper.ingest_url()
Returns summary dict with counts and per-URL results.
"""
if config is None:
config = get_config()
cfg = _get_crawler_config(config)
if max_pages is None:
max_pages = cfg['max_pages']
if max_depth is None:
max_depth = cfg['max_depth']
if delay is None:
delay = cfg['rate_limit_delay']
if source is None:
source = urlparse(base_url).netloc
logger.info(f"Crawling {base_url} (category={category}, max_pages={max_pages})")
# ── Phase 1: Discover URLs ──
urls = []
discovery_method = None
if use_sitemap:
sitemap_url = discover_sitemap_url(base_url, config)
if sitemap_url:
urls = parse_sitemap(sitemap_url, config)
discovery_method = 'sitemap'
if not urls and use_links:
logger.info("No sitemap URLs, falling back to link crawl...")
urls = crawl_links(base_url, max_depth=max_depth, max_pages=max_pages, config=config)
discovery_method = 'link_crawl'
if not urls:
logger.warning(f"No URLs discovered for {base_url}")
return {
'site': base_url,
'discovery_method': None,
'urls_discovered': 0,
'urls_after_filter': 0,
'results': [],
'summary': {'total': 0, 'succeeded': 0, 'duplicates': 0, 'failed': 0},
}
# ── Phase 2: Filter URLs ──
all_exclude = list(cfg['default_exclude'])
if exclude:
all_exclude.extend(exclude)
urls = filter_urls(urls, include=include, exclude=all_exclude)
if len(urls) > max_pages:
logger.info(f"Limiting to {max_pages} pages (discovered {len(urls)})")
urls = urls[:max_pages]
logger.info(f"After filtering: {len(urls)} URLs to process")
# ── Dry run ──
if dry_run:
return {
'site': base_url,
'discovery_method': discovery_method,
'dry_run': True,
'urls_discovered': len(urls),
'urls': urls,
}
# ── Phase 3: Ingest each URL ──
from .web_scraper import ingest_url
results = []
total = len(urls)
for i, url in enumerate(urls, 1):
logger.info(f"[{i}/{total}] Ingesting: {url}")
try:
result = ingest_url(url, category=category, source=source, config=config)
result['url'] = url
results.append(result)
status = result.get('status', 'unknown')
title = result.get('title', '')
if status == 'duplicate':
logger.info(f" DUPLICATE: {title}")
else:
logger.info(f" OK: {title} ({result.get('page_count', 0)} pages)")
except Exception as e:
logger.error(f" FAILED: {url} -- {e}")
results.append({
'url': url,
'status': 'failed',
'error': str(e),
})
if i < total and delay > 0:
time.sleep(delay)
# ── Summary ──
succeeded = sum(1 for r in results if r.get('status') not in ('failed', 'duplicate'))
duplicates = sum(1 for r in results if r.get('status') == 'duplicate')
failed = sum(1 for r in results if r.get('status') == 'failed')
summary = {
'total': len(results),
'succeeded': succeeded,
'duplicates': duplicates,
'failed': failed,
}
logger.info(f"Crawl complete: {succeeded} new, {duplicates} duplicates, {failed} failed out of {total}")
return {
'site': base_url,
'domain': urlparse(base_url).netloc,
'category': category,
'discovery_method': discovery_method,
'urls_discovered': total,
'results': results,
'summary': summary,
}

237
lib/dispatcher.py Normal file
View file

@ -0,0 +1,237 @@
"""
RECON Dispatcher
Watches configured acquired/<subfolder>/ directories for content+sidecar pairs
that have been stable (mtime unchanged) for the configured threshold, then
hands them to the appropriate processor's pre_flight().
Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5.
Phase 4: sidecar is optional (PDFs may arrive without .meta.json).
Phase 6f-2: format normalizer converts non-standard formats to PDF before dispatch.
"""
import importlib
import logging
import os
import subprocess
import time
from .utils import get_config
from .status import StatusDB
logger = logging.getLogger("recon.dispatcher")
# Content file extensions recognized by the dispatcher
CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'}
# Non-standard formats that can be converted to PDF before dispatch
CONVERTIBLE_EXTENSIONS = {'.epub', '.mobi', '.doc', '.docx'}
def _load_processor(processor_name):
"""Dynamically import a processor module from lib.processors."""
module_path = f"lib.processors.{processor_name}"
try:
return importlib.import_module(module_path)
except ModuleNotFoundError:
logger.debug("Processor module not found: %s (not yet implemented)", processor_name)
return None
except ImportError as e:
logger.error("Failed to import processor %s: %s", processor_name, e)
return None
def _normalize_formats(subfolder_path):
"""Convert non-standard document formats to PDF before dispatch.
Walks the subfolder for files with convertible extensions (.epub, .mobi,
.doc, .docx). Converts each to PDF using the appropriate tool, then
deletes the original.
Returns count of files converted.
"""
if not os.path.isdir(subfolder_path):
return 0
converted = 0
for fname in sorted(os.listdir(subfolder_path)):
stem, ext = os.path.splitext(fname)
if ext.lower() not in CONVERTIBLE_EXTENSIONS:
continue
source = os.path.join(subfolder_path, fname)
target = os.path.join(subfolder_path, stem + '.pdf')
if os.path.exists(target):
logger.debug("Target PDF already exists, skipping: %s", fname)
continue
try:
if ext.lower() in ('.epub', '.mobi'):
subprocess.run(
['ebook-convert', source, target],
capture_output=True, check=True, timeout=300,
)
elif ext.lower() in ('.doc', '.docx'):
subprocess.run(
['libreoffice', '--headless', '--convert-to', 'pdf',
'--outdir', subfolder_path, source],
capture_output=True, check=True, timeout=300,
)
if os.path.isfile(target) and os.path.getsize(target) > 0:
os.remove(source)
converted += 1
logger.info("Converted %s -> %s.pdf", fname, stem)
else:
logger.warning("Conversion produced no output: %s", fname)
except subprocess.TimeoutExpired:
logger.error("Conversion timed out: %s", fname)
except subprocess.CalledProcessError as e:
logger.error("Conversion failed for %s: %s", fname,
e.stderr.decode(errors='replace')[:200] if e.stderr else str(e))
except Exception as e:
logger.error("Unexpected error converting %s: %s", fname, e)
return converted
def _find_pairs(subfolder_path):
"""Find content files (with optional sidecar) in a subfolder.
A pair is:
<basename>.<ext> content file
<basename>.meta.json optional sidecar
Returns list of (content_path, meta_path_or_None, basename) tuples.
"""
if not os.path.isdir(subfolder_path):
return []
files = set(os.listdir(subfolder_path))
pairs = []
seen_basenames = set()
# First pass: find .meta.json files and their matching content
for fname in sorted(files):
if fname.endswith('.meta.json'):
basename = fname[:-len('.meta.json')]
for ext in sorted(CONTENT_EXTENSIONS):
content_name = basename + ext
if content_name in files:
pairs.append((
os.path.join(subfolder_path, content_name),
os.path.join(subfolder_path, fname),
basename,
))
seen_basenames.add(content_name)
break
# Second pass: find solo content files (no sidecar)
for fname in sorted(files):
if fname in seen_basenames:
continue
_stem, ext = os.path.splitext(fname)
if ext.lower() in CONTENT_EXTENSIONS and not fname.endswith('.meta.json'):
pairs.append((
os.path.join(subfolder_path, fname),
None,
_stem,
))
return pairs
def _is_stable(filepath, stability_seconds):
"""Check if a file's mtime is older than stability_seconds ago."""
try:
mtime = os.path.getmtime(filepath)
return (time.time() - mtime) >= stability_seconds
except OSError:
return False
def dispatch_once():
"""One-shot dispatch: scan all configured acquired/ subfolders once.
Returns list of result dicts from processor pre_flight calls.
"""
config = get_config()
pipeline_cfg = config.get('pipeline', {})
acquired_root = pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired')
dispatch_map = pipeline_cfg.get('dispatch', {})
stability_seconds = pipeline_cfg.get('mtime_stability_seconds', 10)
db = StatusDB(config['paths']['db'])
results = []
for subfolder_name, processor_name in dispatch_map.items():
subfolder_path = os.path.join(acquired_root, subfolder_name)
processor = _load_processor(processor_name)
if processor is None:
continue
if not hasattr(processor, 'pre_flight'):
logger.error("Processor %s has no pre_flight function", processor_name)
continue
# Convert non-standard formats to PDF before scanning for pairs
_normalize_formats(subfolder_path)
pairs = _find_pairs(subfolder_path)
if not pairs:
continue
for content_path, meta_path, basename in pairs:
# Content file must be stable; sidecar too if present
if not _is_stable(content_path, stability_seconds):
logger.debug("File %s not yet stable, skipping", basename)
continue
if meta_path and not _is_stable(meta_path, stability_seconds):
logger.debug("Sidecar for %s not yet stable, skipping", basename)
continue
logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name)
try:
result = processor.pre_flight(content_path, meta_path, db, config)
results.append(result)
logger.info("Result for %s: %s", basename, result.get('action', 'unknown'))
except Exception as e:
logger.error("Processor %s crashed on %s: %s", processor_name, basename, e)
results.append({
'action': 'error',
'error': str(e),
'basename': basename,
})
return results
def dispatch_loop(stop_event, db, config, interval=30):
"""Run dispatch_once() on a loop until stop_event is set.
Designed to run as a service thread. Never raises to the caller.
"""
logger.info("[dispatcher] Loop started (interval: %ds)", interval)
while not stop_event.is_set():
try:
results = dispatch_once()
if results:
actions = {}
for r in results:
a = r.get('action', 'unknown')
actions[a] = actions.get(a, 0) + 1
logger.info("[dispatcher] Dispatched %d items: %s",
len(results),
", ".join(f"{k}={v}" for k, v in sorted(actions.items())))
else:
logger.debug("[dispatcher] No items to dispatch")
except Exception as e:
logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True)
stop_event.wait(interval)
logger.info("[dispatcher] Loop stopped")

View file

@ -21,6 +21,7 @@ from qdrant_client.models import PointStruct, SparseVector
from .utils import get_config, concept_id, generate_download_url, setup_logging
from .status import StatusDB
from .utils import resolve_text_dir
logger = setup_logging('recon.embedder')
@ -274,7 +275,7 @@ def embed_single(file_hash, db, config):
source_type = 'web' if is_web else 'document'
# Check meta.json for explicit source_type (e.g. 'transcript')
text_dir = os.path.join(config['paths']['text'], file_hash)
text_dir = resolve_text_dir(file_hash, config, db)
meta_path = os.path.join(text_dir, 'meta.json')
page_timestamps = {}
if os.path.exists(meta_path):

View file

@ -25,6 +25,7 @@ import google.generativeai as genai
from .utils import get_config, setup_logging
from .status import StatusDB
from .utils import resolve_text_dir
logger = setup_logging('recon.enricher')
@ -345,7 +346,7 @@ def enrich_single(file_hash, db, config, key_rotator):
if not doc:
return False
text_dir = os.path.join(config['paths']['text'], file_hash)
text_dir = resolve_text_dir(file_hash, config, db)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
window_size = config['processing']['enrich_window_size']
delay = config['processing']['rate_limit_delay']

215
lib/filing.py Normal file
View file

@ -0,0 +1,215 @@
"""
RECON shared filing logic.
Provides file_processed_item() the shared function that any processor
can call to file a completed item from /opt/recon/data/processing/{hash}/
into /mnt/library/Domain/Subdomain/{canonical_name}.{ext}.
The function:
1. Reads dominant domain from concept JSONs (existing logic)
2. Derives canonical name (level 1, escalating to 2/3/4 only on collision)
3. Moves the source file from processing/ to library/Domain/Subdomain/
4. Updates catalogue + documents + Qdrant payloads atomically
5. Marks organized
This function does NOT extract, enrich, or embed. Those are upstream stages.
This function does NOT touch the legacy organize_document() that stays in place
until cutover (Phase 5).
Phase 2: function exists, is tested in isolation. Not yet called by anything
in the service loop.
"""
import logging
import os
import shutil
from .organizer import determine_dominant_domain, _build_target_path
from .new_pipeline import update_qdrant_payload
logger = logging.getLogger("recon.filing")
def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False):
"""File a completed item into the library.
Args:
doc_hash: Document hash
source_file_path: Current absolute path to the source file
(typically in /opt/recon/data/processing/{hash}/ or current library path)
db: StatusDB instance
config: RECON config dict
dry_run: If True, plan but don't move
Returns:
dict with keys:
hash, action, source_path, target_path,
domain, subdomain, qdrant_points_updated, error
"""
result = {
"hash": doc_hash,
"action": "skip",
"source_path": source_file_path,
"target_path": None,
"domain": None,
"subdomain": None,
"qdrant_points_updated": 0,
"error": None,
}
# Verify source file exists
if not os.path.exists(source_file_path):
result["action"] = "error"
result["error"] = f"Source file not found: {source_file_path}"
return result
# Determine domain from existing concept JSONs
data_dir = config["paths"]["data"]
domain, subdomain, confidence = determine_dominant_domain(doc_hash, data_dir)
result["domain"] = domain
result["subdomain"] = subdomain
if domain is None:
result["action"] = "skip_unclassified"
return result
# Get the original filename from catalogue
conn = db._get_conn()
row = conn.execute(
"SELECT filename FROM catalogue WHERE hash = ?", (doc_hash,)
).fetchone()
if not row:
result["action"] = "error"
result["error"] = f"Hash not in catalogue: {doc_hash}"
return result
original_filename = row["filename"]
# Build target path using existing collision-handling logic
library_root = config["library_root"]
target_path, sanitized_name = _build_target_path(
library_root, domain, subdomain, original_filename, doc_hash
)
if target_path is None:
result["action"] = "skip_unclassified"
return result
# Fix 1.1: Preserve the source file's actual extension instead of
# the default .pdf that sanitize_filename() may have applied
source_ext = os.path.splitext(source_file_path)[1].lower()
if source_ext:
target_stem, _old_ext = os.path.splitext(target_path)
target_path = target_stem + source_ext
san_stem, _old_ext = os.path.splitext(sanitized_name)
sanitized_name = san_stem + source_ext
result["target_path"] = target_path
# If already at target (idempotency), just mark organized
if os.path.abspath(source_file_path) == os.path.abspath(target_path):
result["action"] = "skip_already_filed"
if not dry_run:
db.mark_organized(doc_hash)
return result
if dry_run:
result["action"] = "would_file"
return result
# Move the file
try:
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
shutil.move(source_file_path, target_path)
except Exception as e:
result["action"] = "error"
result["error"] = f"Move failed: {e}"
logger.error("Move failed for %s: %s", doc_hash[:8], e)
return result
# Update DB and Qdrant
try:
db.update_catalogue_path(doc_hash, target_path, sanitized_name)
db.sync_document_path(doc_hash, target_path, sanitized_name)
db.mark_organized(doc_hash)
# Update Qdrant payloads (download_url, filename, original_filename)
points = update_qdrant_payload(
doc_hash, target_path, sanitized_name, original_filename, config
)
result["qdrant_points_updated"] = points
result["action"] = "filed"
logger.info(
"Filed %s -> %s [%s/%s, %d vectors]",
doc_hash[:8],
target_path,
domain,
subdomain,
points,
)
except Exception as e:
# File was moved but DB update failed — log the dangerous state
result["action"] = "error"
result["error"] = f"DB/Qdrant update failed after move: {e}"
logger.error("DB/Qdrant update failed for %s: %s", doc_hash[:8], e)
return result
def filing_worker_loop(stop_event, db, config, interval=30):
"""Run filing on items ready to be filed until stop_event is set.
Watches for documents with status='complete', organized_at IS NULL,
and path in /opt/recon/data/processing/. Files them to library.
Designed to run as a service thread. Never raises to the caller.
"""
logger.info("[filing] Worker started (interval: %ds)", interval)
while not stop_event.is_set():
try:
conn = db._get_conn()
rows = conn.execute(
"SELECT hash, path FROM documents "
"WHERE status = 'complete' "
"AND organized_at IS NULL "
"AND path LIKE '/opt/recon/data/processing/%' "
"LIMIT 50"
).fetchall()
if rows:
filed = 0
skipped = 0
errors = 0
for row in rows:
if stop_event.is_set():
break
try:
result = file_processed_item(row['hash'], row['path'], db, config)
action = result.get('action', 'unknown')
if action == 'filed':
filed += 1
elif action.startswith('skip'):
skipped += 1
elif action == 'error':
errors += 1
logger.warning("[filing] Error filing %s: %s",
row['hash'][:8], result.get('error', 'unknown'))
except Exception as e:
errors += 1
logger.error("[filing] Exception filing %s: %s",
row['hash'][:8], e, exc_info=True)
logger.info("[filing] Batch: %d filed, %d skipped, %d errors",
filed, skipped, errors)
else:
logger.debug("[filing] No items ready to file")
except Exception as e:
logger.error("[filing] Error in filing worker: %s", e, exc_info=True)
stop_event.wait(interval)
logger.info("[filing] Worker stopped")

View file

View file

@ -0,0 +1,665 @@
"""
RECON PDF Processor
Handles pre_flight for PDF content arriving in acquired/pdf/.
Opens the PDF, extracts layered metadata (PDF dict + filename + Gemini),
votes on fields, checks level-4 dedupe, runs full text extraction,
and registers in the database.
Metadata sources:
A. PDF info dictionary (PyPDF2 reader.metadata)
B. Filename parsing (clean_filename_to_title + regex patterns)
C. Gemini LLM on first 3 pages of extracted text
Voting: if 2+ sources agree on a field, use that value.
Otherwise priority: C (Gemini) > A (PDF dict) > B (filename).
Level-4 dedupe: requires ALL FOUR fields (title, author, edition, year)
present and matching an existing document. Very conservative only
catches near-certain duplicates.
Failure modes:
- Transient (Gemini API): retry 3x with backoff, then continue without
- Content (unreadable PDF): move to _review/rejected_pdfs/
- Level-4 duplicate: move to _review/duplicate_quarantine/
Phase 4: first implementation.
"""
import json
import logging
import os
import re
import shutil
import subprocess
import time
import google.generativeai as genai
from PyPDF2 import PdfReader
from lib.extractor import extract_text_from_page
from lib.utils import content_hash, clean_filename_to_title, sanitize_filename
logger = logging.getLogger("recon.processors.pdf")
# Maximum retries for transient (API) failures
MAX_TRANSIENT_RETRIES = 3
TRANSIENT_BACKOFF_SECONDS = 30
# ── Metadata Extraction Sources ─────────────────────────────────────
def _extract_pdf_dict(reader):
"""Source A: Extract metadata from PDF's built-in info dictionary.
Returns dict with keys: title, author, edition, year (any may be None).
"""
result = {'title': None, 'author': None, 'edition': None, 'year': None}
try:
info = reader.metadata
if not info:
return result
except Exception:
return result
# Title
title = None
try:
title = info.get('/Title') or info.title
except Exception:
pass
if title and isinstance(title, str) and len(title.strip()) > 2:
result['title'] = title.strip()
# Author
author = None
try:
author = info.get('/Author') or info.author
except Exception:
pass
if author and isinstance(author, str) and len(author.strip()) > 1:
result['author'] = author.strip()
# Year from CreationDate or ModDate
for date_key in ('/CreationDate', '/ModDate'):
try:
date_val = info.get(date_key)
except Exception:
continue
if date_val and isinstance(date_val, str):
match = re.search(r'(?:D:)?(\d{4})', date_val)
if match:
year = int(match.group(1))
if 1800 <= year <= 2030:
result['year'] = str(year)
break
# Edition from Subject or Title
for field_val in [info.get('/Subject'), title]:
if field_val and isinstance(field_val, str):
ed_match = re.search(
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
field_val, re.IGNORECASE
)
if ed_match:
result['edition'] = ed_match.group(0).strip()
break
return result
def _extract_filename_metadata(filename):
"""Source B: Extract metadata by parsing the filename.
Returns dict with keys: title, author, edition, year (any may be None).
"""
result = {'title': None, 'author': None, 'edition': None, 'year': None}
stem = os.path.splitext(filename)[0]
# Title from filename (using existing utility)
result['title'] = clean_filename_to_title(filename)
# Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY near boundaries
year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem)
if year_match:
result['year'] = year_match.group(1)
# Edition: look for Nth Edition, Edition N, etc.
ed_match = re.search(
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
stem, re.IGNORECASE
)
if ed_match:
result['edition'] = ed_match.group(0).strip()
# Author: look for "by Author Name" pattern
by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem)
if by_match:
result['author'] = by_match.group(1).strip()
return result
def _extract_gemini_metadata(pages_text, config):
"""Source C: Extract metadata using Gemini on first 3 pages text.
Returns dict with keys: title, author, edition, year (any may be None).
Retries up to MAX_TRANSIENT_RETRIES times on transient failures.
"""
result = {'title': None, 'author': None, 'edition': None, 'year': None}
keys = config.get('gemini_keys', [])
if not keys or len(pages_text.strip()) < 50:
return result
prompt = (
"Extract the following metadata from this book/document text.\n"
'Return JSON: {"title": "...", "author": "...", "edition": "...", "year": "..."}\n'
"- title: The full title of the book or document\n"
"- author: The author(s) name(s)\n"
'- edition: The edition (e.g. "2nd Edition", "Revised Edition")\n'
"- year: The publication year (4-digit number as string)\n"
"If a field cannot be determined, use null.\n\n"
"Text from first pages:\n"
+ pages_text[:6000]
)
for attempt in range(MAX_TRANSIENT_RETRIES):
try:
key = keys[attempt % len(keys)]
genai.configure(api_key=key)
model = genai.GenerativeModel(
config['gemini']['model'],
generation_config={
"response_mime_type": config['gemini']['response_mime_type']
}
)
response = model.generate_content(prompt)
data = json.loads(response.text)
for field in ('title', 'author', 'edition', 'year'):
val = data.get(field)
if val and isinstance(val, str) and val.strip() and val.strip().lower() != "null":
result[field] = val.strip()
return result
except Exception as e:
logger.warning(
"Gemini metadata attempt %d/%d failed: %s",
attempt + 1, MAX_TRANSIENT_RETRIES, e
)
if attempt < MAX_TRANSIENT_RETRIES - 1:
time.sleep(TRANSIENT_BACKOFF_SECONDS)
logger.warning(
"Gemini metadata extraction failed after %d attempts", MAX_TRANSIENT_RETRIES
)
return result
# ── Voting ───────────────────────────────────────────────────────────
def _vote_metadata(source_a, source_b, source_c):
"""Vote on each metadata field across three sources.
Priority: C (Gemini) > A (PDF dict) > B (filename).
If 2+ sources agree, use the agreed value.
Returns:
(voted_dict, provenance_dict)
voted_dict: {field: value_or_None}
provenance_dict: {field: source_label_or_None}
"""
sources = {'pdf_dict': source_a, 'filename': source_b, 'gemini': source_c}
priority = ['gemini', 'pdf_dict', 'filename']
voted = {}
provenance = {}
for field in ('title', 'author', 'edition', 'year'):
values = {}
for name, src in sources.items():
val = src.get(field)
if val and str(val).strip().lower() != "null":
values[name] = val
if not values:
voted[field] = None
provenance[field] = None
continue
# Normalize for comparison
def _norm(v):
if field == 'year':
return v.strip()
return v.strip().lower()
# Group by normalized value
norm_groups = {}
for name, val in values.items():
nv = _norm(val)
norm_groups.setdefault(nv, []).append((name, val))
# Find group with most agreement
best_group = max(norm_groups.values(), key=len)
if len(best_group) >= 2:
# 2+ sources agree — use highest-priority original case
for p in priority:
for name, val in best_group:
if name == p:
voted[field] = val
names = ','.join(n for n, _ in best_group)
provenance[field] = "agreed({})".format(names)
break
if voted.get(field) is not None:
break
else:
# No agreement — highest priority wins
for p in priority:
if p in values:
voted[field] = values[p]
provenance[field] = p
break
return voted, provenance
# ── Level-4 Dedupe ───────────────────────────────────────────────────
def _check_level4_dedupe(voted, db):
"""Level-4 strict dedupe: all four fields must be present and match.
Returns (is_duplicate, matching_hash) or (False, None).
"""
title = voted.get('title')
author = voted.get('author')
edition = voted.get('edition')
year = voted.get('year')
if not all([title, author, edition, year]):
return False, None
conn = db._get_conn()
rows = conn.execute(
"SELECT hash, metadata_provenance FROM documents "
"WHERE book_title = ? AND book_author = ?",
(title, author)
).fetchall()
if not rows:
return False, None
for row in rows:
prov_json = row['metadata_provenance']
if not prov_json:
continue
try:
prov_data = json.loads(prov_json)
existing_voted = prov_data.get('voted', {})
ex_edition = existing_voted.get('edition', '')
ex_year = existing_voted.get('year', '')
if (ex_edition and ex_year
and ex_edition.lower() == edition.lower()
and ex_year == year):
return True, row['hash']
except (json.JSONDecodeError, AttributeError):
continue
return False, None
# ── Page Count Fallback ──────────────────────────────────────────────
def _pdfinfo_page_count(pdf_path):
"""Get page count via pdfinfo (poppler) when PdfReader fails."""
try:
proc = subprocess.run(
['pdfinfo', pdf_path],
capture_output=True, text=True, timeout=30
)
if proc.returncode == 0:
for line in proc.stdout.splitlines():
if line.startswith('Pages:'):
return int(line.split(':', 1)[1].strip())
except Exception:
pass
return 0
# ── Pre-Flight ───────────────────────────────────────────────────────
def pre_flight(content_path, meta_path, db, config):
"""Process a PDF from acquired/pdf/.
Args:
content_path: Path to the PDF file
meta_path: Path to .meta.json sidecar (may be None)
db: StatusDB instance
config: RECON config dict
Returns:
dict with keys: hash, action, source_path, error
Actions: 'extracted', 'duplicate', 'level4_duplicate',
'content_failure', 'error'
"""
result = {
'hash': None,
'action': 'error',
'source_path': content_path,
'error': None,
}
filename = os.path.basename(content_path)
# ── Step 1: Hash ──────────────────────────────────────────────
try:
file_hash = content_hash(content_path)
result['hash'] = file_hash
except Exception as e:
result['error'] = "Cannot hash PDF: {}".format(e)
return result
# ── Step 2: Stale state cleanup ───────────────────────────────
processing_root = config.get('pipeline', {}).get(
'processing_root', '/opt/recon/data/processing'
)
proc_dir = os.path.join(processing_root, file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if os.path.exists(proc_dir):
try:
shutil.rmtree(proc_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
raise
if os.path.exists(concepts_dir):
try:
shutil.rmtree(concepts_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
raise
# ── Step 3: Hash dedupe ───────────────────────────────────────
conn = db._get_conn()
existing = conn.execute(
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if existing:
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError as e:
logger.warning("Failed to remove duplicate pair: %s", e)
result['action'] = 'duplicate'
return result
# ── Step 4: Size check ────────────────────────────────────────
proc_cfg = config.get('processing', {})
max_size_mb = proc_cfg.get('max_pdf_size_mb', 200)
try:
file_size_mb = os.path.getsize(content_path) / 1048576
except OSError as e:
result['error'] = "Cannot stat PDF: {}".format(e)
return result
if file_size_mb > max_size_mb:
result['error'] = "PDF too large: {:.0f}MB > {}MB limit".format(
file_size_mb, max_size_mb
)
result['action'] = 'content_failure'
_move_to_rejected(content_path, meta_path, config, filename)
return result
# ── Step 5: Open PDF ──────────────────────────────────────────
reader = None
page_count = 0
try:
reader = PdfReader(content_path)
page_count = len(reader.pages)
except Exception as e:
logger.warning(
"PdfReader failed for %s: %s — trying pdfinfo", filename, e
)
page_count = _pdfinfo_page_count(content_path)
if page_count == 0:
logger.error("Content failure: cannot read PDF %s", filename)
result['action'] = 'content_failure'
result['error'] = "Cannot read PDF (0 pages)"
_move_to_rejected(content_path, meta_path, config, filename)
return result
# ── Step 6: Source A — PDF dict metadata ──────────────────────
source_a = _extract_pdf_dict(reader) if reader else {
'title': None, 'author': None, 'edition': None, 'year': None
}
# ── Step 7: Source B — Filename metadata ──────────────────────
source_b = _extract_filename_metadata(filename)
# ── Step 8: Extract first 3 pages for Source C ────────────────
page_timeout = proc_cfg.get('page_timeout', 30)
first_pages_text = ""
if reader:
for i in range(min(3, page_count)):
try:
text, _method = extract_text_from_page(
reader, i, content_path, page_timeout
)
first_pages_text += text + "\n"
except Exception as e:
logger.warning(
"Failed to extract page %d for metadata: %s", i + 1, e
)
# ── Step 9: Source C — Gemini metadata ────────────────────────
source_c = _extract_gemini_metadata(first_pages_text, config)
# ── Step 10: Vote ─────────────────────────────────────────────
voted, provenance = _vote_metadata(source_a, source_b, source_c)
provenance_record = {
'voted': voted,
'provenance': provenance,
'sources': {
'pdf_dict': source_a,
'filename': source_b,
'gemini': source_c,
}
}
# ── Step 11: Level-4 dedupe ───────────────────────────────────
is_dup, dup_hash = _check_level4_dedupe(voted, db)
if is_dup:
logger.info(
"Level-4 duplicate: %s matches %s", file_hash[:8], dup_hash[:8]
)
quarantine_dir = os.path.join(
config.get('library_root', '/mnt/library'),
'_review', 'duplicate_quarantine'
)
try:
os.makedirs(quarantine_dir, exist_ok=True)
quarantine_path = os.path.join(quarantine_dir, filename)
shutil.move(content_path, quarantine_path)
if meta_path:
os.remove(meta_path)
san_name = sanitize_filename(filename, doc_hash=file_hash)
db.queue_duplicate_review(
doc_hash=file_hash,
original_filename=filename,
sanitized_filename=san_name,
collision_with_hash=dup_hash,
duplicate_path=quarantine_path,
book_title=voted.get('title'),
book_author=voted.get('author'),
)
except Exception as e:
logger.error("Failed to quarantine duplicate: %s", e)
result['action'] = 'level4_duplicate'
return result
# ── Step 12: Set up processing directory ──────────────────────
try:
os.makedirs(proc_dir, exist_ok=True)
except Exception as e:
result['error'] = "Cannot create processing dir: {}".format(e)
return result
pdf_proc_path = os.path.join(proc_dir, 'source.pdf')
try:
shutil.move(content_path, pdf_proc_path)
if meta_path:
shutil.move(meta_path, os.path.join(proc_dir, 'sidecar.meta.json'))
except Exception as e:
result['error'] = "Cannot move PDF to processing: {}".format(e)
return result
# ── Step 13: Full text extraction ─────────────────────────────
# Re-open reader from new location
try:
reader = PdfReader(pdf_proc_path)
except Exception:
reader = None
pages_extracted = 0
ocr_pages = []
ocr_methods = {
'pypdf2': 0, 'pdftotext': 0, 'tesseract': 0,
'gemini_vision': 0, 'none': 0,
}
for i in range(page_count):
try:
if reader:
text, method = extract_text_from_page(
reader, i, pdf_proc_path, page_timeout
)
else:
proc_result = subprocess.run(
['pdftotext', '-f', str(i + 1), '-l', str(i + 1),
pdf_proc_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
text = proc_result.stdout if proc_result.returncode == 0 else ''
method = 'pdftotext' if text.strip() else 'none'
ocr_methods[method] += 1
if method in ('tesseract', 'gemini_vision'):
ocr_pages.append(i + 1)
except Exception as e:
logger.warning("Page %d/%d failed: %s", i + 1, page_count, e)
text = ''
ocr_methods['none'] += 1
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i + 1))
with open(page_path, 'w', encoding='utf-8') as f:
f.write(text)
if text.strip():
pages_extracted += 1
if (i + 1) % 50 == 0:
logger.info(" %s: page %d/%d", filename, i + 1, page_count)
# ── Step 14: Write meta.json ──────────────────────────────────
meta = {
'hash': file_hash,
'filename': filename,
'source_type': 'pdf',
'page_count': page_count,
'pages_extracted': pages_extracted,
'ocr_pages': ocr_pages,
'ocr_methods': ocr_methods,
'metadata': voted,
'metadata_provenance': provenance_record,
}
# Merge sidecar meta if present
sidecar_path = os.path.join(proc_dir, 'sidecar.meta.json')
if os.path.exists(sidecar_path):
try:
with open(sidecar_path, encoding='utf-8') as f:
sidecar = json.load(f)
meta['sidecar'] = sidecar
except Exception:
pass
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
# ── Step 15: Register in catalogue + documents ────────────────
display_title = voted.get('title') or clean_filename_to_title(filename)
size_bytes = os.path.getsize(pdf_proc_path)
source = 'acquired/pdf'
category = 'PDF'
if meta.get('sidecar'):
source = meta['sidecar'].get('source', source)
category = meta['sidecar'].get('category', category)
db.add_to_catalogue(
file_hash, filename, pdf_proc_path, size_bytes, source, category
)
db.queue_document(file_hash)
# ── Step 16: Update documents row ─────────────────────────────
conn = db._get_conn()
conn.execute(
"UPDATE documents SET "
"text_dir = ?, page_count = ?, book_title = ?, book_author = ?, "
"metadata_provenance = ? "
"WHERE hash = ?",
(
proc_dir,
page_count,
voted.get('title'),
voted.get('author'),
json.dumps(provenance_record),
file_hash,
)
)
conn.commit()
# ── Step 17: Status = extracted ───────────────────────────────
db.update_status(file_hash, 'extracted', pages_extracted=pages_extracted)
logger.info(
"PDF pre_flight complete: %s (%s) -> %d/%d pages in %s",
file_hash[:8], display_title, pages_extracted, page_count, proc_dir,
)
result['action'] = 'extracted'
return result
# ── Helpers ──────────────────────────────────────────────────────────
def _move_to_rejected(content_path, meta_path, config, filename):
"""Move an unreadable PDF to _review/rejected_pdfs/."""
review_dir = os.path.join(
config.get('library_root', '/mnt/library'),
'_review', 'rejected_pdfs'
)
try:
os.makedirs(review_dir, exist_ok=True)
reject_path = os.path.join(review_dir, filename)
shutil.move(content_path, reject_path)
if meta_path:
os.remove(meta_path)
logger.info("Moved rejected PDF to %s", reject_path)
except Exception as e:
logger.error("Failed to move rejected PDF: %s", e)

View file

@ -0,0 +1,320 @@
"""
RECON Text Processor
Handles pre_flight for plain .txt files arriving in acquired/text/.
These are primary source documents (books, manuals, guides), not derived
content like transcripts.
Metadata extraction via two-source vote:
A. Filename parsing (title, optionally author)
B. Gemini LLM extraction (title/author/edition/year from first 3 pages)
Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/
by the filing worker (NOT organized in-place like transcripts).
Phase 6f: initial implementation.
"""
import json
import logging
import os
import re
import shutil
from lib.web_scraper import chunk_text
from lib.utils import content_hash, clean_filename_to_title
from lib.processors.pdf_processor import _extract_gemini_metadata
logger = logging.getLogger("recon.processors.text")
WORDS_PER_PAGE = 2000
def _extract_filename_metadata(filename):
"""Source A: Extract metadata by parsing the filename.
Returns dict with keys: title, author, edition, year (any may be None).
"""
result = {'title': None, 'author': None, 'edition': None, 'year': None}
stem = os.path.splitext(filename)[0]
result['title'] = clean_filename_to_title(filename)
# Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY
year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem)
if year_match:
result['year'] = year_match.group(1)
# Edition: Nth Edition, Edition N, etc.
ed_match = re.search(
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
stem, re.IGNORECASE
)
if ed_match:
result['edition'] = ed_match.group(0).strip()
# Author: "Title - Author" or "Title by Author" patterns
# Try " - Author" at end
dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem)
if dash_match:
result['author'] = dash_match.group(1).strip()
else:
by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem)
if by_match:
result['author'] = by_match.group(1).strip()
return result
def _vote_metadata(source_a, source_b):
"""Vote on each metadata field across two sources.
Priority: B (Gemini) > A (filename).
If both agree, note agreement.
Returns:
(voted_dict, provenance_record)
"""
sources = {'filename': source_a, 'gemini': source_b}
priority = ['gemini', 'filename']
voted = {}
provenance = {}
for field in ('title', 'author', 'edition', 'year'):
values = {}
for name, src in sources.items():
val = src.get(field)
if val and str(val).strip().lower() != 'null':
values[name] = val
if not values:
voted[field] = None
provenance[field] = None
continue
def _norm(v):
if field == 'year':
return v.strip()
return v.strip().lower()
norm_groups = {}
for name, val in values.items():
nv = _norm(val)
norm_groups.setdefault(nv, []).append((name, val))
best_group = max(norm_groups.values(), key=len)
if len(best_group) >= 2:
# Both sources agree
for p in priority:
for name, val in best_group:
if name == p:
voted[field] = val
names = ','.join(n for n, _ in best_group)
provenance[field] = "agreed({})".format(names)
break
if voted.get(field) is not None:
break
else:
# No agreement — highest priority wins
for p in priority:
if p in values:
voted[field] = values[p]
provenance[field] = p
break
provenance_record = {
'voted': voted,
'provenance': provenance,
'sources': {
'filename': source_a,
'gemini': source_b,
}
}
return voted, provenance_record
def pre_flight(content_path, meta_path, db, config):
"""Process a .txt file dropped into acquired/text/.
Args:
content_path: Path to the .txt file
meta_path: Path to .meta.json sidecar (may be None)
db: StatusDB instance
config: RECON config dict
Returns:
dict with keys: hash, action, source_path, error
Actions: 'extracted', 'duplicate', 'skip_empty', 'error'
"""
result = {
'hash': None,
'action': 'error',
'source_path': content_path,
'error': None,
}
filename = os.path.basename(content_path)
# ── Step 1: Hash ──────────────────────────────────────────────
try:
file_hash = content_hash(content_path)
result['hash'] = file_hash
except Exception as e:
result['error'] = "Cannot hash text file: {}".format(e)
return result
# ── Step 2: Stale state cleanup ───────────────────────────────
processing_root = config.get('pipeline', {}).get(
'processing_root', '/opt/recon/data/processing'
)
proc_dir = os.path.join(processing_root, file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if os.path.exists(proc_dir):
try:
shutil.rmtree(proc_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
raise
if os.path.exists(concepts_dir):
try:
shutil.rmtree(concepts_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
raise
# ── Step 3: Hash dedupe ───────────────────────────────────────
conn = db._get_conn()
existing = conn.execute(
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if existing:
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError as e:
logger.warning("Failed to remove duplicate pair: %s", e)
result['action'] = 'duplicate'
return result
# ── Step 4: Read text content ─────────────────────────────────
try:
with open(content_path, encoding='utf-8', errors='replace') as f:
raw_text = f.read()
except Exception as e:
result['error'] = "Cannot read text file: {}".format(e)
return result
if len(raw_text.strip()) < 50:
logger.info("Text file too short (%d chars), skipping: %s",
len(raw_text.strip()), filename)
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError:
pass
result['action'] = 'skip_empty'
return result
# ── Step 5: Read optional sidecar ─────────────────────────────
sidecar = None
if meta_path:
try:
with open(meta_path, encoding='utf-8') as f:
sidecar = json.load(f)
except Exception as e:
logger.warning("Cannot read sidecar %s: %s", meta_path, e)
# ── Step 6: Set up processing directory ───────────────────────
try:
os.makedirs(proc_dir, exist_ok=True)
except Exception as e:
result['error'] = "Cannot create processing dir: {}".format(e)
return result
# ── Step 7: Move files to processing ──────────────────────────
source_path = os.path.join(proc_dir, 'source.txt')
try:
shutil.move(content_path, source_path)
if meta_path:
shutil.move(meta_path, os.path.join(proc_dir, 'meta.json'))
except Exception as e:
result['error'] = "Cannot move files to processing: {}".format(e)
return result
# ── Step 8: Split into pages ──────────────────────────────────
pages = chunk_text(raw_text, WORDS_PER_PAGE)
for i, page_text in enumerate(pages, start=1):
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
with open(page_path, 'w', encoding='utf-8') as f:
f.write(page_text)
# ── Step 9: Source A — filename metadata ──────────────────────
source_a = _extract_filename_metadata(filename)
# ── Step 10: Source B — Gemini metadata ───────────────────────
first_pages_text = "\n".join(pages[:3])
source_b = _extract_gemini_metadata(first_pages_text, config)
# ── Step 11: Vote ─────────────────────────────────────────────
voted, provenance_record = _vote_metadata(source_a, source_b)
# ── Step 12: Write meta.json ──────────────────────────────────
meta = {
'hash': file_hash,
'filename': filename,
'source_type': 'text',
'page_count': len(pages),
'text_length': len(raw_text),
'metadata': voted,
'metadata_provenance': provenance_record,
}
if sidecar:
meta['sidecar'] = sidecar
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
# ── Step 13: Register in catalogue ────────────────────────────
display_title = voted.get('title') or clean_filename_to_title(filename)
size_bytes = os.path.getsize(source_path)
db.add_to_catalogue(
file_hash, filename, source_path, size_bytes, 'text', 'Document'
)
# ── Step 14: Queue and update documents row ───────────────────
db.queue_document(file_hash)
conn = db._get_conn()
conn.execute(
"UPDATE documents SET "
"text_dir = ?, page_count = ?, path = ?, "
"book_title = ?, book_author = ?, metadata_provenance = ? "
"WHERE hash = ?",
(
proc_dir,
len(pages),
source_path,
voted.get('title'),
voted.get('author'),
json.dumps(provenance_record),
file_hash,
)
)
conn.commit()
# ── Step 15: Status = extracted ───────────────────────────────
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
logger.info(
"Text pre_flight complete: %s (%s) -> %d pages in %s",
file_hash[:8], display_title, len(pages), proc_dir,
)
result['action'] = 'extracted'
return result

View file

@ -0,0 +1,175 @@
"""
RECON Transcript Processor
Handles pre_flight for transcript content arriving in acquired/stream/.
Reads a raw text file + meta.json sidecar, hashes, dedupes, splits into
page_NNNN.txt files, and registers in the database.
Phase 3: first processor implementation.
Phase 4: added stale state cleanup at start of pre_flight.
"""
import hashlib
import json
import logging
import os
import shutil
from lib.web_scraper import chunk_text
from lib.utils import content_hash
logger = logging.getLogger("recon.processors.transcript")
# Words per page for transcript chunking (matches existing pipeline)
WORDS_PER_PAGE = 2000
def pre_flight(content_path, meta_path, db, config):
"""Process a transcript pair from acquired/stream/.
Args:
content_path: Path to the raw transcript .txt file
meta_path: Path to the .meta.json sidecar
db: StatusDB instance
config: RECON config dict
Returns:
dict with keys: hash, action, source_path, error
Actions: 'extracted', 'duplicate', 'error'
"""
result = {
'hash': None,
'action': 'error',
'source_path': content_path,
'error': None,
}
# Read and hash the content file
try:
file_hash = content_hash(content_path)
result['hash'] = file_hash
except Exception as e:
result['error'] = f"Cannot hash content file: {e}"
return result
# Stale state cleanup — remove any pre-existing processing/concepts dirs
processing_root = config.get('pipeline', {}).get(
'processing_root', '/opt/recon/data/processing'
)
proc_dir = os.path.join(processing_root, file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if os.path.exists(proc_dir):
try:
shutil.rmtree(proc_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
raise
if os.path.exists(concepts_dir):
try:
shutil.rmtree(concepts_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
raise
# Hash dedupe: if hash exists in catalogue, delete the pair and return
conn = db._get_conn()
existing = conn.execute(
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if existing:
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError as e:
logger.warning("Failed to remove duplicate pair: %s", e)
result['action'] = 'duplicate'
return result
# Read meta.json sidecar
try:
with open(meta_path, encoding='utf-8') as f:
meta = json.load(f)
except Exception as e:
result['error'] = f"Cannot read meta.json: {e}"
return result
# Read raw transcript text
try:
with open(content_path, encoding='utf-8') as f:
raw_text = f.read()
except Exception as e:
result['error'] = f"Cannot read content file: {e}"
return result
if not raw_text.strip():
result['error'] = "Empty transcript"
return result
# Set up processing directory
try:
os.makedirs(proc_dir, exist_ok=True)
except Exception as e:
result['error'] = f"Cannot create processing dir: {e}"
return result
# Move the pair to processing/{hash}/
try:
shutil.move(content_path, os.path.join(proc_dir, 'transcript.txt'))
shutil.move(meta_path, os.path.join(proc_dir, 'meta.json'))
except Exception as e:
result['error'] = f"Cannot move pair to processing: {e}"
return result
# Split raw text into page_NNNN.txt files
pages = chunk_text(raw_text, WORDS_PER_PAGE)
for i, page_text in enumerate(pages, start=1):
page_path = os.path.join(proc_dir, f"page_{i:04d}.txt")
with open(page_path, 'w', encoding='utf-8') as f:
f.write(page_text)
# Update meta.json with processing metadata
meta['hash'] = file_hash
meta['source_type'] = meta.get('source_type', 'transcript')
meta['page_count'] = len(pages)
meta['text_length'] = len(raw_text)
meta_out_path = os.path.join(proc_dir, 'meta.json')
with open(meta_out_path, 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
# Register in catalogue
title = meta.get('title', os.path.basename(content_path))
source_url = meta.get('source_url', meta.get('url', ''))
source = 'stream.echo6.co'
category = meta.get('category', 'Transcript')
size_bytes = os.path.getsize(os.path.join(proc_dir, 'transcript.txt'))
db.add_to_catalogue(file_hash, title, source_url, size_bytes, source, category)
# Queue and advance to extracted
db.queue_document(file_hash)
# Set text_dir and page_count on the documents row.
# Transcripts are derived text from PeerTube videos, not primary sources.
# They don't get filed into library/Domain/Subdomain/ like PDFs -- instead,
# they're marked organized in-place. Their watch URL remains in catalogue.path
# and Qdrant download_url so users clicking search results go to PeerTube.
# The filing worker's path LIKE filter naturally excludes transcripts since
# their documents.path is the watch URL, not a filesystem path.
conn = db._get_conn()
conn.execute(
"UPDATE documents SET text_dir = ?, page_count = ?, organized_at = CURRENT_TIMESTAMP WHERE hash = ?",
(proc_dir, len(pages), file_hash)
)
conn.commit()
# Update status to extracted with page count
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
logger.info(
"Transcript pre_flight complete: %s (%s) -> %d pages in %s",
file_hash[:8], title, len(pages), proc_dir,
)
result['action'] = 'extracted'
return result

View file

@ -388,3 +388,19 @@ def generate_download_url(filepath, library_root='/mnt/library', base_url='https
parts = rel.split(os.sep)
encoded = '/'.join(quote(p) for p in parts)
return f"{base_url}/{encoded}"
def resolve_text_dir(file_hash, config, db=None):
"""Resolve the text directory for a document.
If db is provided and documents.text_dir is set for this hash, use that.
Otherwise fall back to the legacy location: config['paths']['text']/{hash}/
"""
if db is not None:
conn = db._get_conn()
row = conn.execute(
"SELECT text_dir FROM documents WHERE hash = ?", (file_hash,)
).fetchone()
if row and row['text_dir']:
return row['text_dir']
return os.path.join(config['paths']['text'], file_hash)

397
recon.py
View file

@ -3,7 +3,7 @@
RECON CLI Main entry point.
Subcommands: scan, queue, extract, enrich, embed, run, search, upload,
ingest-url, crawl, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest.
ingest-url, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest.
Usage: cd /opt/recon && source venv/bin/activate && python3 recon.py <command>
"""
@ -580,73 +580,6 @@ def cmd_ingest_url(args):
def cmd_crawl(args):
from lib.crawler import crawl_site
print(f"Crawling {args.url}...")
if args.include:
print(f" Include paths: {args.include}")
if args.exclude:
print(f" Exclude paths: {args.exclude}")
if args.dry_run:
print(f" DRY RUN — no content will be ingested")
print()
result = crawl_site(
base_url=args.url,
category=args.category,
source=args.source,
include=args.include,
exclude=args.exclude,
max_pages=args.max_pages,
max_depth=args.max_depth,
delay=args.delay,
dry_run=args.dry_run,
use_sitemap=not args.no_sitemap,
)
method = result.get('discovery_method', 'none')
print(f"Discovery method: {method}")
if args.dry_run:
urls = result.get('urls', [])
print(f"Found {len(urls)} URLs that would be ingested:\n")
for i, url in enumerate(urls, 1):
print(f" {i:4d}. {url}")
print(f"\nTotal: {len(urls)} pages")
print(f"Re-run without --dry-run to ingest.")
return 0
summary = result.get('summary', {})
print(f"\nResults:")
print(f" New: {summary.get('succeeded', 0)}")
print(f" Duplicates: {summary.get('duplicates', 0)}")
print(f" Failed: {summary.get('failed', 0)}")
print(f" Total: {summary.get('total', 0)}")
failed_results = [r for r in result.get('results', []) if r.get('status') == 'failed']
if failed_results:
print(f"\nFailed URLs:")
for r in failed_results[:10]:
print(f" {r['url']}: {r.get('error', 'Unknown error')}")
if len(failed_results) > 10:
print(f" ... and {len(failed_results) - 10} more")
if args.enrich or args.process:
print("\nRunning enrichment...")
from lib.enricher import run_enrichment
enriched = run_enrichment()
print(f" Enriched: {enriched}")
if args.process:
print("\nRunning embedding...")
from lib.embedder import run_embedding
embedded = run_embedding()
print(f" Embedded: {embedded}")
return 0
def cmd_validate(args):
from scripts.validate import run_validation
run_validation(deep=args.deep)
@ -668,30 +601,31 @@ def cmd_serve(args):
def cmd_service(args):
"""Run RECON as a long-lived service. Called by systemd.
Bundles: Flask dashboard + pipeline stages + library scanner + PeerTube scanner + progress reporter.
Bundles: Flask dashboard + dispatcher + pipeline stages + filing worker + progress reporter.
All threads are daemon threads; SIGTERM/SIGINT trigger graceful shutdown.
"""
from lib.extractor import run_extraction
from lib.enricher import run_enrichment
from lib.embedder import run_embedding
from lib.api import app, run_server as start_dashboard
from lib.peertube_scraper import ingest_all as pt_ingest_all
from lib.dispatcher import dispatch_loop
from lib.filing import filing_worker_loop
from lib.acquisition.peertube import acquisition_loop
config = get_config()
proc = config.get('processing', {})
svc = config.get('service', {})
extract_workers = proc.get('extract_workers', 4)
enrich_workers = proc.get('enrich_workers', 16)
embed_workers = proc.get('embed_workers', 4)
poll_interval = svc.get('stage_poll_interval', 30)
scan_interval = svc.get('scan_interval', 3600)
dispatch_interval = svc.get('dispatch_interval', 30)
filing_interval = svc.get('filing_interval', 30)
progress_interval = svc.get('progress_interval', 60)
web_host = config.get('web', {}).get('host', '0.0.0.0')
web_port = config.get('web', {}).get('port', 8420)
stop_event = threading.Event()
totals = {'extract': 0, 'enrich': 0, 'embed': 0, 'scan': 0}
totals = {'enrich': 0, 'embed': 0}
def shutdown(signum, frame):
sig_name = signal.Signals(signum).name
@ -716,36 +650,6 @@ def cmd_service(args):
stop_event.wait(poll_interval)
logger.info(f"[{name}] Stage stopped (total: {totals[name]})")
def scanner_loop():
"""Periodically scan library and queue new documents."""
logger.info(f"[scanner] Started (interval: {scan_interval}s)")
# Run initial scan immediately
try:
new_cat = scan_library()
new_queued = queue_all()
synced = sync_qdrant_paths()
if new_cat or new_queued or synced:
logger.info(f"[scanner] Initial: {new_cat} catalogued, {new_queued} queued, {synced} paths synced")
except Exception as e:
logger.error(f"[scanner] Initial scan error: {e}", exc_info=True)
while not stop_event.is_set():
stop_event.wait(scan_interval)
if stop_event.is_set():
break
try:
new_cat = scan_library()
new_queued = queue_all()
synced = sync_qdrant_paths()
totals['scan'] += new_queued
if new_cat or new_queued or synced:
logger.info(f"[scanner] {new_cat} catalogued, {new_queued} queued, {synced} paths synced")
else:
logger.debug("[scanner] No new documents")
except Exception as e:
logger.error(f"[scanner] Error: {e}", exc_info=True)
logger.info("[scanner] Stopped")
def progress_loop():
"""Log pipeline status periodically."""
while not stop_event.is_set():
@ -769,217 +673,20 @@ def cmd_service(args):
except Exception:
pass
def peertube_scanner_loop():
"""Periodically ingest new PeerTube video transcripts."""
from lib.peertube_scraper import set_stop_check
set_stop_check(stop_event.is_set)
logger.info(f"[peertube] Scanner started (interval: {scan_interval}s)")
try:
result = pt_ingest_all(config=config)
ingested = result.get('ingested', 0) if isinstance(result, dict) else 0
if ingested:
logger.info(f"[peertube] Initial scan: {ingested} transcripts ingested")
else:
logger.info("[peertube] Initial scan: no new transcripts")
except Exception as e:
logger.error(f"[peertube] Initial scan error: {e}", exc_info=True)
while not stop_event.is_set():
stop_event.wait(scan_interval)
if stop_event.is_set():
break
try:
result = pt_ingest_all(config=config)
ingested = result.get('ingested', 0) if isinstance(result, dict) else 0
if ingested:
logger.info(f"[peertube] {ingested} new transcripts ingested")
else:
logger.debug("[peertube] No new transcripts")
except Exception as e:
logger.error(f"[peertube] Scan error: {e}", exc_info=True)
logger.info("[peertube] Scanner stopped")
def crawler_scheduler_loop():
"""Scheduled site crawler — crawls configured sites, one per cycle.
- Reads sites from config.yaml crawler.sites list
- Processes sites in tier order (lower tier = higher priority)
- Tracks last-crawled timestamp per site in crawl_state.json
- Skips sites crawled within recrawl_interval_days (default 7)
- Waits inter_site_cooldown seconds between sites (default 30)
- Uses per-site delay for rate limiting during crawl
"""
import json as _json
from lib.crawler import crawl_site
crawler_cfg = config.get('crawler', {})
sites = crawler_cfg.get('sites', [])
if not sites:
logger.info("[crawler] No sites configured, scheduler disabled")
return
recrawl_days = crawler_cfg.get('recrawl_interval_days', 7)
cooldown = crawler_cfg.get('inter_site_cooldown', 30)
state_file = os.path.join(config.get('paths', {}).get('data', '/opt/recon/data'), 'crawl_state.json')
# Load persisted state
def load_state():
try:
with open(state_file, 'r') as sf:
return _json.load(sf)
except (FileNotFoundError, _json.JSONDecodeError):
return {}
def save_state(state):
try:
with open(state_file, 'w') as sf:
_json.dump(state, sf, indent=2)
except Exception as e:
logger.error(f"[crawler] Failed to save state: {e}")
# Sort by tier (lower = higher priority)
sorted_sites = sorted(sites, key=lambda s: s.get('tier', 99))
logger.info(f"[crawler] Scheduler started — {len(sorted_sites)} sites configured, "
f"recrawl every {recrawl_days}d, {cooldown}s cooldown between sites")
# Initial delay — let the main pipeline stabilize before crawling
stop_event.wait(60)
if stop_event.is_set():
return
while not stop_event.is_set():
state = load_state()
now = time.time()
crawled_this_cycle = 0
for site in sorted_sites:
if stop_event.is_set():
break
url = site.get('url', '')
if not url:
continue
# Check recrawl interval
last_crawled = state.get(url, {}).get('last_crawled', 0)
age_days = (now - last_crawled) / 86400
if age_days < recrawl_days:
logger.debug(f"[crawler] Skipping {url} — crawled {age_days:.1f}d ago")
continue
category = site.get('category', 'Web')
max_depth = site.get('max_depth', crawler_cfg.get('max_depth', 3))
max_pages = site.get('max_pages', crawler_cfg.get('max_pages', 500))
delay = site.get('delay', crawler_cfg.get('rate_limit_delay', 1.0))
tier = site.get('tier', 99)
notes = site.get('notes', '')
logger.info(f"[crawler] Starting: {url} (tier {tier}, category={category}, "
f"depth={max_depth}, pages={max_pages}, delay={delay}s)")
try:
result = crawl_site(
base_url=url,
category=category,
max_pages=max_pages,
max_depth=max_depth,
delay=delay,
config=config,
)
summary = result.get('summary', {})
method = result.get('discovery_method', 'none')
logger.info(f"[crawler] Done: {url} via {method}"
f"{summary.get('succeeded', 0)} new, "
f"{summary.get('duplicates', 0)} dupes, "
f"{summary.get('failed', 0)} failed")
# Update state
state[url] = {
'last_crawled': time.time(),
'method': method,
'succeeded': summary.get('succeeded', 0),
'duplicates': summary.get('duplicates', 0),
'failed': summary.get('failed', 0),
'tier': tier,
'category': category,
}
save_state(state)
crawled_this_cycle += 1
except Exception as e:
logger.error(f"[crawler] Failed: {url}{e}", exc_info=True)
state[url] = {
'last_crawled': time.time(),
'error': str(e),
'tier': tier,
'category': category,
}
save_state(state)
# Inter-site cooldown
if not stop_event.is_set():
logger.debug(f"[crawler] Cooling down {cooldown}s before next site...")
stop_event.wait(cooldown)
if crawled_this_cycle:
logger.info(f"[crawler] Cycle complete — {crawled_this_cycle} sites crawled")
else:
logger.debug("[crawler] Cycle complete — all sites within recrawl window")
# Sleep until next cycle check (1 hour)
if not stop_event.is_set():
stop_event.wait(3600)
logger.info("[crawler] Scheduler stopped")
def organizer_loop():
"""Organize completed documents into Domain/Subdomain folders."""
from lib.organizer import organize_document
logger.info(f"[organizer] Started (interval: {poll_interval}s)")
# Initial delay — let pipeline process some docs first
stop_event.wait(60)
while not stop_event.is_set():
try:
org_db = StatusDB()
unorganized = org_db.get_unorganized(limit=100)
if unorganized:
moved = 0
errors = 0
for doc in unorganized:
if stop_event.is_set():
break
result = organize_document(doc['hash'], org_db, config)
if result['action'] == 'moved':
moved += 1
elif result['action'] == 'error':
errors += 1
if moved or errors:
logger.info(f"[organizer] Organized {moved} documents ({errors} errors)")
# Sync paths to Qdrant after batch
if moved > 0:
synced = sync_qdrant_paths()
if synced:
logger.info(f"[organizer] Synced {synced} paths to Qdrant")
else:
logger.debug("[organizer] No unorganized documents")
except Exception as e:
logger.error(f"[organizer] Error: {e}", exc_info=True)
stop_event.wait(poll_interval)
logger.info("[organizer] Stopped")
db = StatusDB()
threads = [
threading.Thread(target=stage_loop, daemon=True, name='extract',
args=('extract', lambda: run_extraction(workers=extract_workers))),
threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval),
daemon=True, name='dispatcher'),
threading.Thread(target=stage_loop, daemon=True, name='enrich',
args=('enrich', lambda: run_enrichment(workers=enrich_workers))),
threading.Thread(target=stage_loop, daemon=True, name='embed',
args=('embed', lambda: run_embedding(workers=embed_workers))),
threading.Thread(target=scanner_loop, daemon=True, name='scanner'),
threading.Thread(target=peertube_scanner_loop, daemon=True, name='peertube'),
threading.Thread(target=crawler_scheduler_loop, daemon=True, name='crawler'),
threading.Thread(target=organizer_loop, daemon=True, name='organizer'),
threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval),
daemon=True, name='filing'),
threading.Thread(target=lambda: acquisition_loop(stop_event, db, config,
interval=config.get("peertube", {}).get("poll_interval", 1800)),
daemon=True, name="peertube-acq"),
threading.Thread(target=progress_loop, daemon=True, name='progress'),
threading.Thread(target=lambda: start_dashboard(stop_event),
daemon=True, name='dashboard'),
@ -987,10 +694,11 @@ def cmd_service(args):
logger.info("=== RECON Service Starting ===")
logger.info(f" Dashboard: {web_host}:{web_port}")
logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}")
logger.info(f" Scanner: every {scan_interval}s | Progress: every {progress_interval}s")
crawler_sites = config.get('crawler', {}).get('sites', [])
logger.info(f" Crawler: {len(crawler_sites)} sites configured")
logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}")
logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s")
pt_interval = config.get("peertube", {}).get("poll_interval", 1800)
logger.info(f" PeerTube acquisition: every {pt_interval}s")
logger.info(f" Progress: every {progress_interval}s")
for t in threads:
t.start()
@ -1018,10 +726,10 @@ def cmd_service(args):
logger.info("=== RECON Service Stopped ===")
return 0
def cmd_ingest_peertube(args):
from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats
from lib.peertube_scraper import get_instance_stats
from lib.acquisition.peertube import acquire_batch
from lib.status import StatusDB
if args.stats:
stats = get_instance_stats()
@ -1034,36 +742,20 @@ def cmd_ingest_peertube(args):
print(f" {status}: {count}")
return 0
since = args.since if hasattr(args, 'since') and args.since else None
db = StatusDB()
if args.channel:
print(f"Ingesting PeerTube channel: {args.channel}")
result = ingest_channel(args.channel, since=since)
else:
print("Ingesting all PeerTube videos with captions")
result = ingest_all(since=since)
print("Acquiring PeerTube transcripts to hopper...")
result = acquire_batch(db)
summary = result.get('summary', {})
print(f"\nResults:")
print(f" Checked: {summary.get('total_checked', 0)}")
print(f" Ingested: {summary.get('ingested', 0)} ({summary.get('total_pages', 0)} pages)")
print(f" No captions: {summary.get('skipped_no_captions', 0)}")
print(f" Duplicates: {summary.get('skipped_duplicate', 0)}")
print(f" Failed: {summary.get('failed', 0)}")
print(f" Acquired: {result['acquired']}")
print(f" Skipped: {result['skipped']}")
print(f" Errors: {result['errors']}")
# Optional: run enrichment
if args.enrich or args.process:
print("\nRunning enrichment on extracted content...")
from lib.enricher import run_enrichment
enriched = run_enrichment()
print(f" Enriched: {enriched}")
# Optional: run embedding too
if args.process:
print("\nRunning embedding...")
from lib.embedder import run_embedding
embedded = run_embedding()
print(f" Embedded: {embedded}")
if result['acquired']:
print(f"\n{result['acquired']} transcript(s) staged in hopper.")
print("The dispatcher will pick them up on its next cycle.")
print("Run 'recon status' to monitor progress.")
return 0
@ -1417,21 +1109,6 @@ def main():
p.set_defaults(func=cmd_ingest_url)
# crawl
p = sub.add_parser('crawl', help='Crawl a site and ingest discovered pages')
p.add_argument('url', help='Base URL to crawl')
p.add_argument('--category', default='Web', help='Category for ingested content')
p.add_argument('--source', default=None, help='Source identifier (default: domain name)')
p.add_argument('--include', nargs='+', help='Only include URL paths starting with these prefixes')
p.add_argument('--exclude', nargs='+', help='Exclude URL paths starting with these prefixes')
p.add_argument('--max-pages', type=int, default=500, help='Maximum pages to ingest')
p.add_argument('--max-depth', type=int, default=3, help='Maximum link-follow depth')
p.add_argument('--delay', type=float, default=1.0, help='Delay between page fetches (seconds)')
p.add_argument('--dry-run', action='store_true', help='Discover URLs without ingesting')
p.add_argument('--no-sitemap', action='store_true', help='Skip sitemap, use link-following only')
p.add_argument('--enrich', action='store_true', help='Run enrichment after crawl')
p.add_argument('--process', action='store_true', help='Full pipeline: crawl + enrich + embed')
p.set_defaults(func=cmd_crawl)
# validate
p = sub.add_parser('validate', help='Validate pipeline consistency')
p.add_argument('--deep', action='store_true', help='Deep validation (check all files)')
@ -1459,12 +1136,8 @@ def main():
p.set_defaults(func=cmd_organize)
# ingest-peertube
p = sub.add_parser('ingest-peertube', help='Ingest PeerTube video transcripts')
p.add_argument('--channel', help='Ingest specific channel (actor_name)')
p.add_argument('--since', help='Only ingest videos published after this date (ISO format)')
p = sub.add_parser('ingest-peertube', help='Acquire PeerTube transcripts to hopper')
p.add_argument('--stats', action='store_true', help='Show PeerTube instance stats')
p.add_argument('--enrich', action='store_true', help='Run enrichment after ingestion')
p.add_argument('--process', action='store_true', help='Full pipeline: ingest + enrich + embed')
p.set_defaults(func=cmd_ingest_peertube)
# ingest

View file

@ -210,6 +210,7 @@ tr:hover { background: var(--bg-secondary); }
}
.badge-web { background: #1e3a5f; color: #60a5fa; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
.badge-pdf { background: #2d5a2d; color: #4ade80; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
.badge-transcript { background: #3b1f5e; color: #c084fc; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
/* ── Trend indicators ── */
.trend { font-size: 11px; margin-left: 6px; }

View file

@ -88,7 +88,7 @@
var pipeCount = s.in_pipeline || 0;
totalCat += catCount; totalComp += compCount; totalPipe += pipeCount;
totalConcepts += s.concepts; totalVectors += s.vectors;
var badge = s.type === 'web' ? '<span class="badge-web">WEB</span>' : '<span class="badge-pdf">PDF</span>';
var badge = s.type === 'transcript' ? '<span class="badge-transcript">TRANSCRIPT</span>' : s.type === 'web' ? '<span class="badge-web">WEB</span>' : '<span class="badge-pdf">PDF</span>';
var compPct = catCount > 0 ? (compCount / catCount * 100) : 0;
var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0;
var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666';
@ -185,7 +185,7 @@
rtb.innerHTML = '<tr><td colspan="4" class="text-dim">None yet</td></tr>';
} else {
rtb.innerHTML = data.recent_complete.map(function(r) {
var badge = r.type === 'web' ? '<span class="badge-web">WEB</span>' : '<span class="badge-pdf">PDF</span>';
var badge = r.type === 'transcript' ? '<span class="badge-transcript">TRANSCRIPT</span>' : r.type === 'web' ? '<span class="badge-web">WEB</span>' : '<span class="badge-pdf">PDF</span>';
return '<tr><td>' + r.title + '</td><td>' + badge + '</td><td>' +
r.concepts + '</td><td>' + r.vectors + '</td></tr>';
}).join('');

View file

@ -1,12 +1,13 @@
{% extends "base.html" %}
{% block content %}
<h3 class="section-title mb-16">Upload PDF</h3>
<h3 class="section-title mb-16">Upload Document</h3>
<div class="panel">
<form id="upload-form" enctype="multipart/form-data">
<div class="mb-16">
<label class="text-dim text-xs" style="text-transform:uppercase;display:block;margin-bottom:4px;">PDF File</label>
<input type="file" name="file" accept=".pdf" id="upload-file"
<label class="text-dim text-xs" style="text-transform:uppercase;display:block;margin-bottom:4px;">Document File</label>
<input type="file" name="file" accept=".pdf,.txt,.epub,.doc,.docx,.mobi" id="upload-file"
style="background:#0a0a0a;border:1px solid #333;color:#c0c0c0;padding:8px;width:100%;font-family:inherit;">
<span class="text-dim" style="font-size:11px;display:block;margin-top:4px;">Supported: PDF, TXT, EPUB, DOC, DOCX, MOBI</span>
</div>
<div class="mb-16">
<label class="text-dim text-xs" style="text-transform:uppercase;display:block;margin-bottom:4px;">Category</label>
@ -67,7 +68,7 @@ document.getElementById('upload-form').addEventListener('submit', async function
result.innerHTML = '<span style="color:#00ff41;">Queued for processing</span><br>' +
'<span class="text-dim">Hash: ' + data.hash + '</span><br>' +
'<span class="text-dim">File: ' + data.filename + '</span><br>' +
'<span class="text-dim">Category: ' + data.source + '/' + data.category + '</span>';
'<span class="text-dim">Type: ' + data.source_type + '</span>';
fileInput.value = '';
} else {
status.style.color = '#ff4444';