diff --git a/config.yaml b/config.yaml index 1f5d1b0..3e185f8 100644 --- a/config.yaml +++ b/config.yaml @@ -411,6 +411,7 @@ peertube: public_url: https://stream.echo6.co # Public URL for video links fetch_timeout: 30 # HTTP timeout for API/VTT requests rate_limit_delay: 0.5 # Delay between video ingestions (seconds) + poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) # Stream B: New Library Pipeline new_pipeline: @@ -436,5 +437,6 @@ pipeline: pdf: pdf_processor stream: transcript_processor html: html_processor + text: text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 diff --git a/lib/acquisition/__init__.py b/lib/acquisition/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/acquisition/peertube.py b/lib/acquisition/peertube.py new file mode 100644 index 0000000..43b9a9a --- /dev/null +++ b/lib/acquisition/peertube.py @@ -0,0 +1,224 @@ +""" +RECON PeerTube Acquisition Module + +Polls PeerTube for new video transcripts and writes them as flat file pairs +into data/acquired/stream/ for the dispatcher to pick up. + +Does NOT touch the database — that's transcript_processor's job. +""" +import json +import os +import time + +from lib.peertube_scraper import get_videos, get_captions, fetch_vtt, vtt_to_text, _get_pt_config +from lib.utils import content_hash, get_config, setup_logging + +logger = setup_logging("recon.acquisition.peertube") + + +def _build_known_sets(db): + """Build sets of known UUIDs and titles from catalogue. + + Queries catalogue once per batch for dedup against both cohorts: + - URL-path rows: extract UUID from https://stream.echo6.co/w/{uuid} + - Library-path rows: extract title from filename column + """ + conn = db._get_conn() + rows = conn.execute( + "SELECT path, filename FROM catalogue WHERE source = 'stream.echo6.co'" + ).fetchall() + known_uuids = set() + known_titles = set() + for row in rows: + path = row['path'] or '' + if '/w/' in path: + known_uuids.add(path.rsplit('/w/', 1)[-1]) + fname = row['filename'] or '' + if fname.endswith('.txt'): + known_titles.add(fname[:-4]) + else: + known_titles.add(fname) + return known_uuids, known_titles + + +def list_new_videos(db, config=None): + """Find PeerTube videos with captions not yet in catalogue. + + Returns list of (video_dict, caption_path) tuples for videos that have + captions and are not in the known UUID or title sets. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + known_uuids, known_titles = _build_known_sets(db) + + videos = get_videos(config=config) + new_videos = [] + checked = 0 + + for video in videos: + if video['uuid'] in known_uuids: + continue + if video['name'] in known_titles: + continue + + # Rate limit caption API calls + if checked > 0: + time.sleep(rate_delay) + checked += 1 + + try: + captions = get_captions(video['uuid'], config) + except Exception as e: + logger.warning("[peertube] Failed to get captions for %s: %s", + video['uuid'][:8], e) + continue + + if not captions: + continue + + # Prefer English caption + caption_path = None + for c in captions: + if c.get('language', {}).get('id') == 'en': + caption_path = c['captionPath'] + break + if caption_path is None: + caption_path = captions[0]['captionPath'] + + new_videos.append((video, caption_path)) + + return new_videos + + +def acquire_one(video, caption_path, config=None): + """Fetch transcript and write to hopper as flat files. + + Returns hash string on success, None on skip/error. + Does NOT touch the database — that's transcript_processor's job. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + + pipeline_cfg = config.get('pipeline', {}) + hopper_dir = os.path.join( + pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired'), + 'stream' + ) + os.makedirs(hopper_dir, exist_ok=True) + + uuid = video['uuid'] + + # Fetch and convert VTT + vtt_content = fetch_vtt(caption_path, config) + text, cue_timestamps = vtt_to_text(vtt_content) + + if not text or len(text.strip()) < 50: + logger.debug("[peertube] Transcript too short for %s (%s): %d chars", + video['name'], uuid, len(text) if text else 0) + return None + + # Write text to temp file, hash it, then rename to final name + tmp_txt = os.path.join(hopper_dir, f'{uuid}.txt.tmp') + with open(tmp_txt, 'w', encoding='utf-8') as f: + f.write(text) + + file_hash = content_hash(tmp_txt) + + # Check if final file already exists (race condition guard) + final_txt = os.path.join(hopper_dir, f'{file_hash}.txt') + final_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json') + if os.path.exists(final_txt): + os.remove(tmp_txt) + logger.debug("[peertube] Hopper file already exists: %s", file_hash[:8]) + return None + + # Build sidecar metadata + video_url = f"{ptc['public_url']}/w/{uuid}" + meta = { + 'title': video['name'], + 'source_url': video_url, + 'url': video_url, + 'source': 'stream.echo6.co', + 'source_type': 'transcript', + 'category': 'Transcript', + 'channel': video.get('channel_display', ''), + 'duration': video.get('duration', 0), + 'uuid': uuid, + 'cue_timestamps': cue_timestamps, + } + + # Write meta to tmp, then rename both atomically + # Meta first, then content — dispatcher only picks up when content file exists + tmp_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json.tmp') + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + os.rename(tmp_meta, final_meta) + os.rename(tmp_txt, final_txt) + + logger.info("[peertube] Acquired: %s (%s) -> %s", + video['name'], uuid[:8], file_hash[:12]) + return file_hash + + +def acquire_batch(db, config=None): + """One-shot: find new videos and acquire them. + + Returns dict: {'acquired': N, 'skipped': N, 'errors': N} + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + result = {'acquired': 0, 'skipped': 0, 'errors': 0} + + try: + new_videos = list_new_videos(db, config) + except Exception as e: + logger.error("[peertube] Failed to list new videos: %s", e, exc_info=True) + result['errors'] = 1 + return result + + if not new_videos: + logger.debug("[peertube] No new videos found") + return result + + logger.info("[peertube] Found %d new videos to acquire", len(new_videos)) + + for i, (video, caption_path) in enumerate(new_videos): + if i > 0: + time.sleep(rate_delay) + try: + file_hash = acquire_one(video, caption_path, config) + if file_hash: + result['acquired'] += 1 + else: + result['skipped'] += 1 + except Exception as e: + logger.error("[peertube] Error acquiring %s (%s): %s", + video['name'], video['uuid'][:8], e, exc_info=True) + result['errors'] += 1 + + return result + + +def acquisition_loop(stop_event, db, config, interval=1800): + """Service loop: poll PeerTube for new transcripts every interval seconds.""" + logger.info("[peertube] Acquisition loop started (interval: %ds)", interval) + while not stop_event.is_set(): + try: + result = acquire_batch(db, config) + if result['acquired']: + logger.info("[peertube] Acquired %d new transcripts (%d skipped, %d errors)", + result['acquired'], result['skipped'], result['errors']) + else: + logger.debug("[peertube] No new transcripts") + except Exception as e: + logger.error("[peertube] Error: %s", e, exc_info=True) + stop_event.wait(interval) + logger.info("[peertube] Acquisition loop stopped") diff --git a/lib/api.py b/lib/api.py index 4ceab68..757ebf4 100644 --- a/lib/api.py +++ b/lib/api.py @@ -9,6 +9,7 @@ API endpoints for all pipeline operations including crawl, ingest, and search. Dependencies: Flask, qdrant-client, requests Config: web, vector_db, embedding sections of config.yaml """ +import glob import json import threading import os @@ -77,25 +78,20 @@ def _format_source_citation(payload): return book -def _resolve_upload_path(category, config): - """Resolve the target directory for an upload given a category name.""" - upload_paths = config.get('upload_paths', {}) - library_root = config['library_root'] +ALLOWED_EXTENSIONS = {'.pdf', '.txt', '.epub', '.doc', '.docx', '.mobi'} - if category in upload_paths: - return upload_paths[category] - - default_path = upload_paths.get('default', library_root) - safe_category = secure_filename(category) if category else '' - if safe_category: - return os.path.join(default_path, safe_category) - return default_path +HOPPER_ROUTING = { + '.pdf': '/opt/recon/data/acquired/pdf/', + '.txt': '/opt/recon/data/acquired/text/', + '.epub': '/opt/recon/data/acquired/pdf/', + '.doc': '/opt/recon/data/acquired/pdf/', + '.docx': '/opt/recon/data/acquired/pdf/', + '.mobi': '/opt/recon/data/acquired/pdf/', +} -def _process_upload(filepath, original_filename, category, config, db): - """Process a single PDF upload: hash, dedup, copy to library, catalogue, queue.""" - library_root = config['library_root'] - +def _process_upload(filepath, original_filename, ext, category, config, db): + """Process an upload: hash, dedup, drop into hopper for dispatcher pickup.""" file_hash = content_hash(filepath) conn = db._get_conn() @@ -103,34 +99,39 @@ def _process_upload(filepath, original_filename, category, config, db): if existing: raise ValueError(f"Duplicate: file already catalogued as {existing['filename']}") - target_dir = _resolve_upload_path(category, config) - os.makedirs(target_dir, exist_ok=True) + # Also check if already sitting in a hopper dir awaiting dispatch + for hopper in HOPPER_ROUTING.values(): + if any(os.path.exists(os.path.join(hopper, file_hash + e)) for e in ALLOWED_EXTENSIONS): + raise ValueError("Duplicate: file already queued for processing") - safe_name = secure_filename(original_filename) - if not safe_name: - safe_name = f"{file_hash}.pdf" - target_path = os.path.join(target_dir, safe_name) + hopper_dir = HOPPER_ROUTING.get(ext, '/opt/recon/data/acquired/pdf/') + os.makedirs(hopper_dir, exist_ok=True) - if os.path.exists(target_path): - base, ext = os.path.splitext(safe_name) - target_path = os.path.join(target_dir, f"{base}_{file_hash[:8]}{ext}") + target_path = os.path.join(hopper_dir, file_hash + ext) + meta_path = os.path.join(hopper_dir, file_hash + '.meta.json') + + stem = os.path.splitext(original_filename)[0] + sidecar = { + 'title': stem, + 'source': 'dashboard_upload', + 'source_type': ext.lstrip('.'), + 'category': category, + 'original_filename': original_filename, + } + + # Write sidecar first (with .tmp safety), then content + tmp_meta = meta_path + '.tmp' + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(sidecar, f, indent=2) + os.rename(tmp_meta, meta_path) shutil.copy2(filepath, target_path) - size = os.path.getsize(target_path) - - source, derived_category = derive_source_and_category(target_path, library_root) - - db.add_to_catalogue(file_hash, safe_name, target_path, size, source, derived_category) - db.queue_document(file_hash) return { 'hash': file_hash, - 'filename': safe_name, - 'category': derived_category, - 'source': source, - 'path': target_path, - 'size_bytes': size, - 'status': 'queued' + 'filename': original_filename, + 'source_type': ext.lstrip('.'), + 'status': 'queued', } @@ -346,22 +347,23 @@ def api_upload(): if not file.filename: return jsonify({'error': 'No file selected'}), 400 - if not file.filename.lower().endswith('.pdf'): - return jsonify({'error': 'Only PDF files are accepted'}), 400 + ext = os.path.splitext(file.filename)[1].lower() + if ext not in ALLOWED_EXTENSIONS: + return jsonify({'error': f'Unsupported file type: {ext}'}), 400 category = request.form.get('category', '').strip() config = get_config() db = StatusDB() - tmp_fd, tmp_path = tempfile.mkstemp(suffix='.pdf') + tmp_fd, tmp_path = tempfile.mkstemp(suffix=ext) try: file.save(tmp_path) if os.path.getsize(tmp_path) == 0: return jsonify({'error': 'Uploaded file is empty'}), 400 - result = _process_upload(tmp_path, file.filename, category, config, db) + result = _process_upload(tmp_path, file.filename, ext, category, config, db) return jsonify(result), 201 except ValueError as e: @@ -390,6 +392,28 @@ def api_upload_status(doc_hash): 'filename': cat['filename'], 'status': cat['status'], }) + + # Check hopper dirs for files awaiting dispatcher pickup + for hopper in ('/opt/recon/data/acquired/pdf/', '/opt/recon/data/acquired/text/'): + if glob.glob(os.path.join(hopper, doc_hash + '.*')): + return jsonify({ + 'hash': doc_hash, + 'status': 'pending', + 'message': 'Waiting for dispatcher', + }) + + # Check processing dir + proc_dir = os.path.join( + config.get('pipeline', {}).get('processing_root', '/opt/recon/data/processing'), + doc_hash, + ) + if os.path.isdir(proc_dir): + return jsonify({ + 'hash': doc_hash, + 'status': 'processing', + 'message': 'Being processed', + }) + return jsonify({'error': 'Document not found'}), 404 result = { @@ -882,7 +906,11 @@ def _build_knowledge_stats(): sources = conn.execute(""" SELECT c.source, - CASE WHEN c.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type, + CASE + WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN c.path LIKE 'http%' THEN 'web' + ELSE 'pdf' + END as type, COUNT(DISTINCT c.hash) as catalogued, COUNT(DISTINCT CASE WHEN d.status = 'complete' THEN d.hash END) as complete, COUNT(DISTINCT CASE WHEN d.status NOT IN ('complete', 'failed') AND d.status IS NOT NULL THEN d.hash END) as in_pipeline, @@ -935,11 +963,17 @@ def _build_knowledge_stats(): """).fetchone()[0] recent = conn.execute(""" - SELECT book_title, status, concepts_extracted, vectors_inserted, - CASE WHEN path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type - FROM documents - WHERE status = 'complete' - ORDER BY embedded_at DESC + SELECT COALESCE(d.book_title, c.filename) as title, + d.status, d.concepts_extracted, d.vectors_inserted, + CASE + WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN d.path LIKE 'http%' THEN 'web' + ELSE 'pdf' + END as type + FROM documents d + JOIN catalogue c ON d.hash = c.hash + WHERE d.status = 'complete' + ORDER BY d.embedded_at DESC LIMIT 10 """).fetchall() @@ -979,7 +1013,7 @@ def _build_knowledge_stats(): 'source_types': dict(sorted(source_type_counts.items(), key=lambda x: -x[1])), 'sample_size': sample_size, 'recent_complete': [{ - 'title': r['book_title'] or 'Untitled', + 'title': r['title'] or 'Untitled', 'concepts': r['concepts_extracted'] or 0, 'vectors': r['vectors_inserted'] or 0, 'type': r['type'], diff --git a/lib/crawler.py b/lib/crawler.py deleted file mode 100644 index a37a0b9..0000000 --- a/lib/crawler.py +++ /dev/null @@ -1,432 +0,0 @@ -""" -RECON Site Crawler — URL discovery for bulk web ingestion. - -Two discovery strategies: -1. Sitemap-based (preferred) — parses sitemap.xml for all URLs -2. Link-following (fallback) — crawls from root URL following internal links - -Discovered URLs are fed into web_scraper.ingest_url() for processing. -""" - -import re -import time -from collections import deque -from urllib.parse import urlparse, urljoin, urldefrag - -import requests -from lxml import etree - -from .utils import get_config, setup_logging - -logger = setup_logging('recon.crawler') - - -def _get_crawler_config(config=None): - """Load crawler config with defaults.""" - if config is None: - config = get_config() - crawler_cfg = config.get('crawler', {}) - web_cfg = config.get('web_scraper', {}) - return { - 'user_agent': ( - crawler_cfg.get('user_agent') or - web_cfg.get('user_agent') or - 'Mozilla/5.0 (compatible; RECON/1.0)' - ), - 'fetch_timeout': crawler_cfg.get('fetch_timeout', 30), - 'rate_limit_delay': crawler_cfg.get('rate_limit_delay', 1.0), - 'max_pages': crawler_cfg.get('max_pages', 500), - 'max_depth': crawler_cfg.get('max_depth', 3), - 'default_exclude': crawler_cfg.get('default_exclude', [ - '/search', '/404', '/login', '/signup', '/auth/', '/api/', '/assets/', '/static/' - ]), - } - - -# ─── Sitemap Discovery ───────────────────────────────────────────── - -def discover_sitemap_url(base_url, config=None): - """ - Find the sitemap URL for a site. - - Checks: robots.txt Sitemap: directive, /sitemap.xml, - /sitemap_index.xml, /sitemap-0.xml. - - Returns sitemap URL or None. - """ - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - parsed = urlparse(base_url) - root = f"{parsed.scheme}://{parsed.netloc}" - - # Check robots.txt first - try: - resp = requests.get( - f"{root}/robots.txt", - headers=headers, - timeout=cfg['fetch_timeout'] - ) - if resp.status_code == 200: - for line in resp.text.splitlines(): - if line.strip().lower().startswith('sitemap:'): - sitemap_url = line.split(':', 1)[1].strip() - # Handle "Sitemap: https://..." — split(':',1) keeps the URL intact - # but "Sitemap: https://..." splits into "Sitemap" and " https://..." - # Need to rejoin properly - if not sitemap_url.startswith('http'): - sitemap_url = line[line.index(':') + 1:].strip() - logger.info(f"Found sitemap in robots.txt: {sitemap_url}") - return sitemap_url - except Exception as e: - logger.debug(f"robots.txt fetch failed: {e}") - - # Try common sitemap locations - candidates = [ - f"{root}/sitemap.xml", - f"{root}/sitemap_index.xml", - f"{root}/sitemap-0.xml", - ] - - for url in candidates: - try: - resp = requests.head( - url, - headers=headers, - timeout=cfg['fetch_timeout'], - allow_redirects=True - ) - if resp.status_code == 200: - logger.info(f"Found sitemap at: {url}") - return url - except Exception: - continue - - logger.warning(f"No sitemap found for {base_url}") - return None - - -def parse_sitemap(sitemap_url, config=None): - """ - Parse a sitemap XML and return all page URLs. - - Handles standard sitemaps () and sitemap indexes - () with recursive sub-sitemap fetching. - """ - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - all_urls = [] - - def _fetch_and_parse(url, depth=0): - if depth > 3: - return - - try: - resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout']) - resp.raise_for_status() - except Exception as e: - logger.error(f"Failed to fetch sitemap {url}: {e}") - return - - try: - root = etree.fromstring(resp.content) - except etree.XMLSyntaxError as e: - logger.error(f"Invalid XML in sitemap {url}: {e}") - return - - nsmap = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} - - # Check if this is a sitemap index - sitemap_locs = root.findall('.//ns:sitemap/ns:loc', nsmap) - if sitemap_locs: - logger.info(f"Sitemap index at {url} — {len(sitemap_locs)} sub-sitemaps") - for loc in sitemap_locs: - if loc.text: - _fetch_and_parse(loc.text.strip(), depth + 1) - return - - # Standard sitemap — extract URLs - url_locs = root.findall('.//ns:loc', nsmap) - - # Fallback: try without namespace - if not url_locs: - url_locs = root.findall('.//loc') - - for loc in url_locs: - if loc.text: - all_urls.append(loc.text.strip()) - - logger.info(f"Parsed {len(url_locs)} URLs from {url}") - - _fetch_and_parse(sitemap_url) - - # Deduplicate preserving order - seen = set() - unique = [] - for url in all_urls: - url_clean = urldefrag(url)[0] - if url_clean not in seen: - seen.add(url_clean) - unique.append(url_clean) - - logger.info(f"Total unique URLs from sitemap: {len(unique)}") - return unique - - -# ─── Link-Following Discovery (Fallback) ─────────────────────────── - -def crawl_links(base_url, max_depth=3, max_pages=500, config=None): - """ - Discover URLs by following internal links (BFS). - Fallback when no sitemap is available. - """ - from bs4 import BeautifulSoup - - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - - parsed_base = urlparse(base_url) - base_domain = parsed_base.netloc - - discovered = [] - visited = set() - queue = deque([(base_url, 0)]) - - skip_extensions = ( - '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.svg', - '.css', '.js', '.zip', '.tar', '.gz', '.mp4', '.mp3', - '.ico', '.woff', '.woff2', '.ttf', '.eot', - ) - skip_paths = ( - '/tag/', '/tags/', '/page/', '/feed/', '/rss/', - '/wp-json/', '/wp-admin/', '/wp-includes/', - ) - - while queue and len(discovered) < max_pages: - url, depth = queue.popleft() - url = urldefrag(url)[0] - - if url in visited: - continue - if depth > max_depth: - continue - - visited.add(url) - discovered.append(url) - - if depth >= max_depth: - continue - - try: - resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout']) - if resp.status_code != 200: - continue - if 'text/html' not in resp.headers.get('content-type', ''): - continue - except Exception: - continue - - try: - soup = BeautifulSoup(resp.text, 'lxml') - except Exception: - continue - - for a_tag in soup.find_all('a', href=True): - href = a_tag['href'] - full_url = urljoin(url, href) - full_url = urldefrag(full_url)[0] - - parsed = urlparse(full_url) - if parsed.netloc != base_domain: - continue - if any(parsed.path.lower().endswith(ext) for ext in skip_extensions): - continue - if any(skip in parsed.path.lower() for skip in skip_paths): - continue - - if full_url not in visited: - queue.append((full_url, depth + 1)) - - time.sleep(cfg['rate_limit_delay']) - - logger.info(f"Link crawl: {len(discovered)} URLs (visited {len(visited)}, depth {max_depth})") - return discovered - - -# ─── URL Filtering ────────────────────────────────────────────────── - -def filter_urls(urls, include=None, exclude=None): - """ - Filter URLs by path prefix include/exclude rules. - - include: URL must match at least one prefix (if provided) - exclude: URL must not match any prefix - """ - filtered = [] - - for url in urls: - path = urlparse(url).path - - if include: - if not any(path.startswith(prefix) for prefix in include): - continue - - if exclude: - if any(path.startswith(prefix) for prefix in exclude): - continue - - filtered.append(url) - - logger.info(f"Filtered {len(urls)} -> {len(filtered)} URLs " - f"(include={include}, exclude={exclude})") - return filtered - - -# ─── Main Crawl Orchestrator ──────────────────────────────────────── - -def crawl_site( - base_url, - category='Web', - source=None, - include=None, - exclude=None, - max_pages=None, - max_depth=None, - delay=None, - dry_run=False, - use_sitemap=True, - use_links=True, - config=None, -): - """ - Crawl a site and ingest all discovered pages. - - 1. Discover URLs via sitemap or link-following - 2. Apply include/exclude filters - 3. Feed each URL through web_scraper.ingest_url() - - Returns summary dict with counts and per-URL results. - """ - if config is None: - config = get_config() - cfg = _get_crawler_config(config) - - if max_pages is None: - max_pages = cfg['max_pages'] - if max_depth is None: - max_depth = cfg['max_depth'] - if delay is None: - delay = cfg['rate_limit_delay'] - if source is None: - source = urlparse(base_url).netloc - - logger.info(f"Crawling {base_url} (category={category}, max_pages={max_pages})") - - # ── Phase 1: Discover URLs ── - - urls = [] - discovery_method = None - - if use_sitemap: - sitemap_url = discover_sitemap_url(base_url, config) - if sitemap_url: - urls = parse_sitemap(sitemap_url, config) - discovery_method = 'sitemap' - - if not urls and use_links: - logger.info("No sitemap URLs, falling back to link crawl...") - urls = crawl_links(base_url, max_depth=max_depth, max_pages=max_pages, config=config) - discovery_method = 'link_crawl' - - if not urls: - logger.warning(f"No URLs discovered for {base_url}") - return { - 'site': base_url, - 'discovery_method': None, - 'urls_discovered': 0, - 'urls_after_filter': 0, - 'results': [], - 'summary': {'total': 0, 'succeeded': 0, 'duplicates': 0, 'failed': 0}, - } - - # ── Phase 2: Filter URLs ── - - all_exclude = list(cfg['default_exclude']) - if exclude: - all_exclude.extend(exclude) - - urls = filter_urls(urls, include=include, exclude=all_exclude) - - if len(urls) > max_pages: - logger.info(f"Limiting to {max_pages} pages (discovered {len(urls)})") - urls = urls[:max_pages] - - logger.info(f"After filtering: {len(urls)} URLs to process") - - # ── Dry run ── - - if dry_run: - return { - 'site': base_url, - 'discovery_method': discovery_method, - 'dry_run': True, - 'urls_discovered': len(urls), - 'urls': urls, - } - - # ── Phase 3: Ingest each URL ── - - from .web_scraper import ingest_url - - results = [] - total = len(urls) - - for i, url in enumerate(urls, 1): - logger.info(f"[{i}/{total}] Ingesting: {url}") - - try: - result = ingest_url(url, category=category, source=source, config=config) - result['url'] = url - results.append(result) - - status = result.get('status', 'unknown') - title = result.get('title', '') - if status == 'duplicate': - logger.info(f" DUPLICATE: {title}") - else: - logger.info(f" OK: {title} ({result.get('page_count', 0)} pages)") - - except Exception as e: - logger.error(f" FAILED: {url} -- {e}") - results.append({ - 'url': url, - 'status': 'failed', - 'error': str(e), - }) - - if i < total and delay > 0: - time.sleep(delay) - - # ── Summary ── - - succeeded = sum(1 for r in results if r.get('status') not in ('failed', 'duplicate')) - duplicates = sum(1 for r in results if r.get('status') == 'duplicate') - failed = sum(1 for r in results if r.get('status') == 'failed') - - summary = { - 'total': len(results), - 'succeeded': succeeded, - 'duplicates': duplicates, - 'failed': failed, - } - - logger.info(f"Crawl complete: {succeeded} new, {duplicates} duplicates, {failed} failed out of {total}") - - return { - 'site': base_url, - 'domain': urlparse(base_url).netloc, - 'category': category, - 'discovery_method': discovery_method, - 'urls_discovered': total, - 'results': results, - 'summary': summary, - } diff --git a/lib/dispatcher.py b/lib/dispatcher.py new file mode 100644 index 0000000..7294ec4 --- /dev/null +++ b/lib/dispatcher.py @@ -0,0 +1,237 @@ +""" +RECON Dispatcher + +Watches configured acquired// directories for content+sidecar pairs +that have been stable (mtime unchanged) for the configured threshold, then +hands them to the appropriate processor's pre_flight(). + +Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5. +Phase 4: sidecar is optional (PDFs may arrive without .meta.json). +Phase 6f-2: format normalizer converts non-standard formats to PDF before dispatch. +""" +import importlib +import logging +import os +import subprocess +import time + +from .utils import get_config +from .status import StatusDB + +logger = logging.getLogger("recon.dispatcher") + +# Content file extensions recognized by the dispatcher +CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'} + +# Non-standard formats that can be converted to PDF before dispatch +CONVERTIBLE_EXTENSIONS = {'.epub', '.mobi', '.doc', '.docx'} + + +def _load_processor(processor_name): + """Dynamically import a processor module from lib.processors.""" + module_path = f"lib.processors.{processor_name}" + try: + return importlib.import_module(module_path) + except ModuleNotFoundError: + logger.debug("Processor module not found: %s (not yet implemented)", processor_name) + return None + except ImportError as e: + logger.error("Failed to import processor %s: %s", processor_name, e) + return None + + +def _normalize_formats(subfolder_path): + """Convert non-standard document formats to PDF before dispatch. + + Walks the subfolder for files with convertible extensions (.epub, .mobi, + .doc, .docx). Converts each to PDF using the appropriate tool, then + deletes the original. + + Returns count of files converted. + """ + if not os.path.isdir(subfolder_path): + return 0 + + converted = 0 + + for fname in sorted(os.listdir(subfolder_path)): + stem, ext = os.path.splitext(fname) + if ext.lower() not in CONVERTIBLE_EXTENSIONS: + continue + + source = os.path.join(subfolder_path, fname) + target = os.path.join(subfolder_path, stem + '.pdf') + + if os.path.exists(target): + logger.debug("Target PDF already exists, skipping: %s", fname) + continue + + try: + if ext.lower() in ('.epub', '.mobi'): + subprocess.run( + ['ebook-convert', source, target], + capture_output=True, check=True, timeout=300, + ) + elif ext.lower() in ('.doc', '.docx'): + subprocess.run( + ['libreoffice', '--headless', '--convert-to', 'pdf', + '--outdir', subfolder_path, source], + capture_output=True, check=True, timeout=300, + ) + + if os.path.isfile(target) and os.path.getsize(target) > 0: + os.remove(source) + converted += 1 + logger.info("Converted %s -> %s.pdf", fname, stem) + else: + logger.warning("Conversion produced no output: %s", fname) + + except subprocess.TimeoutExpired: + logger.error("Conversion timed out: %s", fname) + except subprocess.CalledProcessError as e: + logger.error("Conversion failed for %s: %s", fname, + e.stderr.decode(errors='replace')[:200] if e.stderr else str(e)) + except Exception as e: + logger.error("Unexpected error converting %s: %s", fname, e) + + return converted + + +def _find_pairs(subfolder_path): + """Find content files (with optional sidecar) in a subfolder. + + A pair is: + . — content file + .meta.json — optional sidecar + + Returns list of (content_path, meta_path_or_None, basename) tuples. + """ + if not os.path.isdir(subfolder_path): + return [] + + files = set(os.listdir(subfolder_path)) + pairs = [] + seen_basenames = set() + + # First pass: find .meta.json files and their matching content + for fname in sorted(files): + if fname.endswith('.meta.json'): + basename = fname[:-len('.meta.json')] + for ext in sorted(CONTENT_EXTENSIONS): + content_name = basename + ext + if content_name in files: + pairs.append(( + os.path.join(subfolder_path, content_name), + os.path.join(subfolder_path, fname), + basename, + )) + seen_basenames.add(content_name) + break + + # Second pass: find solo content files (no sidecar) + for fname in sorted(files): + if fname in seen_basenames: + continue + _stem, ext = os.path.splitext(fname) + if ext.lower() in CONTENT_EXTENSIONS and not fname.endswith('.meta.json'): + pairs.append(( + os.path.join(subfolder_path, fname), + None, + _stem, + )) + + return pairs + + +def _is_stable(filepath, stability_seconds): + """Check if a file's mtime is older than stability_seconds ago.""" + try: + mtime = os.path.getmtime(filepath) + return (time.time() - mtime) >= stability_seconds + except OSError: + return False + + +def dispatch_once(): + """One-shot dispatch: scan all configured acquired/ subfolders once. + + Returns list of result dicts from processor pre_flight calls. + """ + config = get_config() + pipeline_cfg = config.get('pipeline', {}) + acquired_root = pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired') + dispatch_map = pipeline_cfg.get('dispatch', {}) + stability_seconds = pipeline_cfg.get('mtime_stability_seconds', 10) + + db = StatusDB(config['paths']['db']) + results = [] + + for subfolder_name, processor_name in dispatch_map.items(): + subfolder_path = os.path.join(acquired_root, subfolder_name) + + processor = _load_processor(processor_name) + if processor is None: + continue + + if not hasattr(processor, 'pre_flight'): + logger.error("Processor %s has no pre_flight function", processor_name) + continue + + # Convert non-standard formats to PDF before scanning for pairs + _normalize_formats(subfolder_path) + + pairs = _find_pairs(subfolder_path) + if not pairs: + continue + + for content_path, meta_path, basename in pairs: + # Content file must be stable; sidecar too if present + if not _is_stable(content_path, stability_seconds): + logger.debug("File %s not yet stable, skipping", basename) + continue + if meta_path and not _is_stable(meta_path, stability_seconds): + logger.debug("Sidecar for %s not yet stable, skipping", basename) + continue + + logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name) + try: + result = processor.pre_flight(content_path, meta_path, db, config) + results.append(result) + logger.info("Result for %s: %s", basename, result.get('action', 'unknown')) + except Exception as e: + logger.error("Processor %s crashed on %s: %s", processor_name, basename, e) + results.append({ + 'action': 'error', + 'error': str(e), + 'basename': basename, + }) + + return results + + +def dispatch_loop(stop_event, db, config, interval=30): + """Run dispatch_once() on a loop until stop_event is set. + + Designed to run as a service thread. Never raises to the caller. + """ + logger.info("[dispatcher] Loop started (interval: %ds)", interval) + + while not stop_event.is_set(): + try: + results = dispatch_once() + if results: + actions = {} + for r in results: + a = r.get('action', 'unknown') + actions[a] = actions.get(a, 0) + 1 + logger.info("[dispatcher] Dispatched %d items: %s", + len(results), + ", ".join(f"{k}={v}" for k, v in sorted(actions.items()))) + else: + logger.debug("[dispatcher] No items to dispatch") + except Exception as e: + logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True) + + stop_event.wait(interval) + + logger.info("[dispatcher] Loop stopped") diff --git a/lib/embedder.py b/lib/embedder.py index b4ed162..35fcb58 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -21,6 +21,7 @@ from qdrant_client.models import PointStruct, SparseVector from .utils import get_config, concept_id, generate_download_url, setup_logging from .status import StatusDB +from .utils import resolve_text_dir logger = setup_logging('recon.embedder') @@ -274,7 +275,7 @@ def embed_single(file_hash, db, config): source_type = 'web' if is_web else 'document' # Check meta.json for explicit source_type (e.g. 'transcript') - text_dir = os.path.join(config['paths']['text'], file_hash) + text_dir = resolve_text_dir(file_hash, config, db) meta_path = os.path.join(text_dir, 'meta.json') page_timestamps = {} if os.path.exists(meta_path): diff --git a/lib/enricher.py b/lib/enricher.py index fe18d06..d9540aa 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -25,6 +25,7 @@ import google.generativeai as genai from .utils import get_config, setup_logging from .status import StatusDB +from .utils import resolve_text_dir logger = setup_logging('recon.enricher') @@ -345,7 +346,7 @@ def enrich_single(file_hash, db, config, key_rotator): if not doc: return False - text_dir = os.path.join(config['paths']['text'], file_hash) + text_dir = resolve_text_dir(file_hash, config, db) concepts_dir = os.path.join(config['paths']['concepts'], file_hash) window_size = config['processing']['enrich_window_size'] delay = config['processing']['rate_limit_delay'] diff --git a/lib/filing.py b/lib/filing.py new file mode 100644 index 0000000..fc8ff00 --- /dev/null +++ b/lib/filing.py @@ -0,0 +1,215 @@ +""" +RECON shared filing logic. + +Provides file_processed_item() — the shared function that any processor +can call to file a completed item from /opt/recon/data/processing/{hash}/ +into /mnt/library/Domain/Subdomain/{canonical_name}.{ext}. + +The function: + 1. Reads dominant domain from concept JSONs (existing logic) + 2. Derives canonical name (level 1, escalating to 2/3/4 only on collision) + 3. Moves the source file from processing/ to library/Domain/Subdomain/ + 4. Updates catalogue + documents + Qdrant payloads atomically + 5. Marks organized + +This function does NOT extract, enrich, or embed. Those are upstream stages. +This function does NOT touch the legacy organize_document() — that stays in place +until cutover (Phase 5). + +Phase 2: function exists, is tested in isolation. Not yet called by anything +in the service loop. +""" + +import logging +import os +import shutil + +from .organizer import determine_dominant_domain, _build_target_path +from .new_pipeline import update_qdrant_payload + +logger = logging.getLogger("recon.filing") + + +def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False): + """File a completed item into the library. + + Args: + doc_hash: Document hash + source_file_path: Current absolute path to the source file + (typically in /opt/recon/data/processing/{hash}/ or current library path) + db: StatusDB instance + config: RECON config dict + dry_run: If True, plan but don't move + + Returns: + dict with keys: + hash, action, source_path, target_path, + domain, subdomain, qdrant_points_updated, error + """ + result = { + "hash": doc_hash, + "action": "skip", + "source_path": source_file_path, + "target_path": None, + "domain": None, + "subdomain": None, + "qdrant_points_updated": 0, + "error": None, + } + + # Verify source file exists + if not os.path.exists(source_file_path): + result["action"] = "error" + result["error"] = f"Source file not found: {source_file_path}" + return result + + # Determine domain from existing concept JSONs + data_dir = config["paths"]["data"] + domain, subdomain, confidence = determine_dominant_domain(doc_hash, data_dir) + result["domain"] = domain + result["subdomain"] = subdomain + + if domain is None: + result["action"] = "skip_unclassified" + return result + + # Get the original filename from catalogue + conn = db._get_conn() + row = conn.execute( + "SELECT filename FROM catalogue WHERE hash = ?", (doc_hash,) + ).fetchone() + if not row: + result["action"] = "error" + result["error"] = f"Hash not in catalogue: {doc_hash}" + return result + + original_filename = row["filename"] + + # Build target path using existing collision-handling logic + library_root = config["library_root"] + target_path, sanitized_name = _build_target_path( + library_root, domain, subdomain, original_filename, doc_hash + ) + + if target_path is None: + result["action"] = "skip_unclassified" + return result + + # Fix 1.1: Preserve the source file's actual extension instead of + # the default .pdf that sanitize_filename() may have applied + source_ext = os.path.splitext(source_file_path)[1].lower() + if source_ext: + target_stem, _old_ext = os.path.splitext(target_path) + target_path = target_stem + source_ext + san_stem, _old_ext = os.path.splitext(sanitized_name) + sanitized_name = san_stem + source_ext + + result["target_path"] = target_path + + # If already at target (idempotency), just mark organized + if os.path.abspath(source_file_path) == os.path.abspath(target_path): + result["action"] = "skip_already_filed" + if not dry_run: + db.mark_organized(doc_hash) + return result + + if dry_run: + result["action"] = "would_file" + return result + + # Move the file + try: + target_dir = os.path.dirname(target_path) + os.makedirs(target_dir, exist_ok=True) + shutil.move(source_file_path, target_path) + except Exception as e: + result["action"] = "error" + result["error"] = f"Move failed: {e}" + logger.error("Move failed for %s: %s", doc_hash[:8], e) + return result + + # Update DB and Qdrant + try: + db.update_catalogue_path(doc_hash, target_path, sanitized_name) + db.sync_document_path(doc_hash, target_path, sanitized_name) + db.mark_organized(doc_hash) + + # Update Qdrant payloads (download_url, filename, original_filename) + points = update_qdrant_payload( + doc_hash, target_path, sanitized_name, original_filename, config + ) + result["qdrant_points_updated"] = points + + result["action"] = "filed" + logger.info( + "Filed %s -> %s [%s/%s, %d vectors]", + doc_hash[:8], + target_path, + domain, + subdomain, + points, + ) + except Exception as e: + # File was moved but DB update failed — log the dangerous state + result["action"] = "error" + result["error"] = f"DB/Qdrant update failed after move: {e}" + logger.error("DB/Qdrant update failed for %s: %s", doc_hash[:8], e) + + return result + + +def filing_worker_loop(stop_event, db, config, interval=30): + """Run filing on items ready to be filed until stop_event is set. + + Watches for documents with status='complete', organized_at IS NULL, + and path in /opt/recon/data/processing/. Files them to library. + + Designed to run as a service thread. Never raises to the caller. + """ + logger.info("[filing] Worker started (interval: %ds)", interval) + + while not stop_event.is_set(): + try: + conn = db._get_conn() + rows = conn.execute( + "SELECT hash, path FROM documents " + "WHERE status = 'complete' " + "AND organized_at IS NULL " + "AND path LIKE '/opt/recon/data/processing/%' " + "LIMIT 50" + ).fetchall() + + if rows: + filed = 0 + skipped = 0 + errors = 0 + for row in rows: + if stop_event.is_set(): + break + try: + result = file_processed_item(row['hash'], row['path'], db, config) + action = result.get('action', 'unknown') + if action == 'filed': + filed += 1 + elif action.startswith('skip'): + skipped += 1 + elif action == 'error': + errors += 1 + logger.warning("[filing] Error filing %s: %s", + row['hash'][:8], result.get('error', 'unknown')) + except Exception as e: + errors += 1 + logger.error("[filing] Exception filing %s: %s", + row['hash'][:8], e, exc_info=True) + + logger.info("[filing] Batch: %d filed, %d skipped, %d errors", + filed, skipped, errors) + else: + logger.debug("[filing] No items ready to file") + + except Exception as e: + logger.error("[filing] Error in filing worker: %s", e, exc_info=True) + + stop_event.wait(interval) + + logger.info("[filing] Worker stopped") diff --git a/lib/processors/__init__.py b/lib/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/processors/pdf_processor.py b/lib/processors/pdf_processor.py new file mode 100644 index 0000000..b5a17dd --- /dev/null +++ b/lib/processors/pdf_processor.py @@ -0,0 +1,665 @@ +""" +RECON PDF Processor + +Handles pre_flight for PDF content arriving in acquired/pdf/. +Opens the PDF, extracts layered metadata (PDF dict + filename + Gemini), +votes on fields, checks level-4 dedupe, runs full text extraction, +and registers in the database. + +Metadata sources: + A. PDF info dictionary (PyPDF2 reader.metadata) + B. Filename parsing (clean_filename_to_title + regex patterns) + C. Gemini LLM on first 3 pages of extracted text + +Voting: if 2+ sources agree on a field, use that value. +Otherwise priority: C (Gemini) > A (PDF dict) > B (filename). + +Level-4 dedupe: requires ALL FOUR fields (title, author, edition, year) +present and matching an existing document. Very conservative — only +catches near-certain duplicates. + +Failure modes: + - Transient (Gemini API): retry 3x with backoff, then continue without + - Content (unreadable PDF): move to _review/rejected_pdfs/ + - Level-4 duplicate: move to _review/duplicate_quarantine/ + +Phase 4: first implementation. +""" +import json +import logging +import os +import re +import shutil +import subprocess +import time + +import google.generativeai as genai +from PyPDF2 import PdfReader + +from lib.extractor import extract_text_from_page +from lib.utils import content_hash, clean_filename_to_title, sanitize_filename + +logger = logging.getLogger("recon.processors.pdf") + +# Maximum retries for transient (API) failures +MAX_TRANSIENT_RETRIES = 3 +TRANSIENT_BACKOFF_SECONDS = 30 + + +# ── Metadata Extraction Sources ───────────────────────────────────── + + +def _extract_pdf_dict(reader): + """Source A: Extract metadata from PDF's built-in info dictionary. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + try: + info = reader.metadata + if not info: + return result + except Exception: + return result + + # Title + title = None + try: + title = info.get('/Title') or info.title + except Exception: + pass + if title and isinstance(title, str) and len(title.strip()) > 2: + result['title'] = title.strip() + + # Author + author = None + try: + author = info.get('/Author') or info.author + except Exception: + pass + if author and isinstance(author, str) and len(author.strip()) > 1: + result['author'] = author.strip() + + # Year from CreationDate or ModDate + for date_key in ('/CreationDate', '/ModDate'): + try: + date_val = info.get(date_key) + except Exception: + continue + if date_val and isinstance(date_val, str): + match = re.search(r'(?:D:)?(\d{4})', date_val) + if match: + year = int(match.group(1)) + if 1800 <= year <= 2030: + result['year'] = str(year) + break + + # Edition from Subject or Title + for field_val in [info.get('/Subject'), title]: + if field_val and isinstance(field_val, str): + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + field_val, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + break + + return result + + +def _extract_filename_metadata(filename): + """Source B: Extract metadata by parsing the filename. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + stem = os.path.splitext(filename)[0] + + # Title from filename (using existing utility) + result['title'] = clean_filename_to_title(filename) + + # Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY near boundaries + year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem) + if year_match: + result['year'] = year_match.group(1) + + # Edition: look for Nth Edition, Edition N, etc. + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + stem, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + + # Author: look for "by Author Name" pattern + by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem) + if by_match: + result['author'] = by_match.group(1).strip() + + return result + + +def _extract_gemini_metadata(pages_text, config): + """Source C: Extract metadata using Gemini on first 3 pages text. + + Returns dict with keys: title, author, edition, year (any may be None). + Retries up to MAX_TRANSIENT_RETRIES times on transient failures. + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + keys = config.get('gemini_keys', []) + if not keys or len(pages_text.strip()) < 50: + return result + + prompt = ( + "Extract the following metadata from this book/document text.\n" + 'Return JSON: {"title": "...", "author": "...", "edition": "...", "year": "..."}\n' + "- title: The full title of the book or document\n" + "- author: The author(s) name(s)\n" + '- edition: The edition (e.g. "2nd Edition", "Revised Edition")\n' + "- year: The publication year (4-digit number as string)\n" + "If a field cannot be determined, use null.\n\n" + "Text from first pages:\n" + + pages_text[:6000] + ) + + for attempt in range(MAX_TRANSIENT_RETRIES): + try: + key = keys[attempt % len(keys)] + genai.configure(api_key=key) + model = genai.GenerativeModel( + config['gemini']['model'], + generation_config={ + "response_mime_type": config['gemini']['response_mime_type'] + } + ) + response = model.generate_content(prompt) + data = json.loads(response.text) + + for field in ('title', 'author', 'edition', 'year'): + val = data.get(field) + if val and isinstance(val, str) and val.strip() and val.strip().lower() != "null": + result[field] = val.strip() + + return result + + except Exception as e: + logger.warning( + "Gemini metadata attempt %d/%d failed: %s", + attempt + 1, MAX_TRANSIENT_RETRIES, e + ) + if attempt < MAX_TRANSIENT_RETRIES - 1: + time.sleep(TRANSIENT_BACKOFF_SECONDS) + + logger.warning( + "Gemini metadata extraction failed after %d attempts", MAX_TRANSIENT_RETRIES + ) + return result + + +# ── Voting ─────────────────────────────────────────────────────────── + + +def _vote_metadata(source_a, source_b, source_c): + """Vote on each metadata field across three sources. + + Priority: C (Gemini) > A (PDF dict) > B (filename). + If 2+ sources agree, use the agreed value. + + Returns: + (voted_dict, provenance_dict) + voted_dict: {field: value_or_None} + provenance_dict: {field: source_label_or_None} + """ + sources = {'pdf_dict': source_a, 'filename': source_b, 'gemini': source_c} + priority = ['gemini', 'pdf_dict', 'filename'] + voted = {} + provenance = {} + + for field in ('title', 'author', 'edition', 'year'): + values = {} + for name, src in sources.items(): + val = src.get(field) + if val and str(val).strip().lower() != "null": + values[name] = val + + if not values: + voted[field] = None + provenance[field] = None + continue + + # Normalize for comparison + def _norm(v): + if field == 'year': + return v.strip() + return v.strip().lower() + + # Group by normalized value + norm_groups = {} + for name, val in values.items(): + nv = _norm(val) + norm_groups.setdefault(nv, []).append((name, val)) + + # Find group with most agreement + best_group = max(norm_groups.values(), key=len) + + if len(best_group) >= 2: + # 2+ sources agree — use highest-priority original case + for p in priority: + for name, val in best_group: + if name == p: + voted[field] = val + names = ','.join(n for n, _ in best_group) + provenance[field] = "agreed({})".format(names) + break + if voted.get(field) is not None: + break + else: + # No agreement — highest priority wins + for p in priority: + if p in values: + voted[field] = values[p] + provenance[field] = p + break + + return voted, provenance + + +# ── Level-4 Dedupe ─────────────────────────────────────────────────── + + +def _check_level4_dedupe(voted, db): + """Level-4 strict dedupe: all four fields must be present and match. + + Returns (is_duplicate, matching_hash) or (False, None). + """ + title = voted.get('title') + author = voted.get('author') + edition = voted.get('edition') + year = voted.get('year') + + if not all([title, author, edition, year]): + return False, None + + conn = db._get_conn() + rows = conn.execute( + "SELECT hash, metadata_provenance FROM documents " + "WHERE book_title = ? AND book_author = ?", + (title, author) + ).fetchall() + + if not rows: + return False, None + + for row in rows: + prov_json = row['metadata_provenance'] + if not prov_json: + continue + try: + prov_data = json.loads(prov_json) + existing_voted = prov_data.get('voted', {}) + ex_edition = existing_voted.get('edition', '') + ex_year = existing_voted.get('year', '') + if (ex_edition and ex_year + and ex_edition.lower() == edition.lower() + and ex_year == year): + return True, row['hash'] + except (json.JSONDecodeError, AttributeError): + continue + + return False, None + + +# ── Page Count Fallback ────────────────────────────────────────────── + + +def _pdfinfo_page_count(pdf_path): + """Get page count via pdfinfo (poppler) when PdfReader fails.""" + try: + proc = subprocess.run( + ['pdfinfo', pdf_path], + capture_output=True, text=True, timeout=30 + ) + if proc.returncode == 0: + for line in proc.stdout.splitlines(): + if line.startswith('Pages:'): + return int(line.split(':', 1)[1].strip()) + except Exception: + pass + return 0 + + +# ── Pre-Flight ─────────────────────────────────────────────────────── + + +def pre_flight(content_path, meta_path, db, config): + """Process a PDF from acquired/pdf/. + + Args: + content_path: Path to the PDF file + meta_path: Path to .meta.json sidecar (may be None) + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'level4_duplicate', + 'content_failure', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + filename = os.path.basename(content_path) + + # ── Step 1: Hash ────────────────────────────────────────────── + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = "Cannot hash PDF: {}".format(e) + return result + + # ── Step 2: Stale state cleanup ─────────────────────────────── + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise + + # ── Step 3: Hash dedupe ─────────────────────────────────────── + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # ── Step 4: Size check ──────────────────────────────────────── + proc_cfg = config.get('processing', {}) + max_size_mb = proc_cfg.get('max_pdf_size_mb', 200) + try: + file_size_mb = os.path.getsize(content_path) / 1048576 + except OSError as e: + result['error'] = "Cannot stat PDF: {}".format(e) + return result + + if file_size_mb > max_size_mb: + result['error'] = "PDF too large: {:.0f}MB > {}MB limit".format( + file_size_mb, max_size_mb + ) + result['action'] = 'content_failure' + _move_to_rejected(content_path, meta_path, config, filename) + return result + + # ── Step 5: Open PDF ────────────────────────────────────────── + reader = None + page_count = 0 + + try: + reader = PdfReader(content_path) + page_count = len(reader.pages) + except Exception as e: + logger.warning( + "PdfReader failed for %s: %s — trying pdfinfo", filename, e + ) + page_count = _pdfinfo_page_count(content_path) + + if page_count == 0: + logger.error("Content failure: cannot read PDF %s", filename) + result['action'] = 'content_failure' + result['error'] = "Cannot read PDF (0 pages)" + _move_to_rejected(content_path, meta_path, config, filename) + return result + + # ── Step 6: Source A — PDF dict metadata ────────────────────── + source_a = _extract_pdf_dict(reader) if reader else { + 'title': None, 'author': None, 'edition': None, 'year': None + } + + # ── Step 7: Source B — Filename metadata ────────────────────── + source_b = _extract_filename_metadata(filename) + + # ── Step 8: Extract first 3 pages for Source C ──────────────── + page_timeout = proc_cfg.get('page_timeout', 30) + first_pages_text = "" + + if reader: + for i in range(min(3, page_count)): + try: + text, _method = extract_text_from_page( + reader, i, content_path, page_timeout + ) + first_pages_text += text + "\n" + except Exception as e: + logger.warning( + "Failed to extract page %d for metadata: %s", i + 1, e + ) + + # ── Step 9: Source C — Gemini metadata ──────────────────────── + source_c = _extract_gemini_metadata(first_pages_text, config) + + # ── Step 10: Vote ───────────────────────────────────────────── + voted, provenance = _vote_metadata(source_a, source_b, source_c) + + provenance_record = { + 'voted': voted, + 'provenance': provenance, + 'sources': { + 'pdf_dict': source_a, + 'filename': source_b, + 'gemini': source_c, + } + } + + # ── Step 11: Level-4 dedupe ─────────────────────────────────── + is_dup, dup_hash = _check_level4_dedupe(voted, db) + if is_dup: + logger.info( + "Level-4 duplicate: %s matches %s", file_hash[:8], dup_hash[:8] + ) + quarantine_dir = os.path.join( + config.get('library_root', '/mnt/library'), + '_review', 'duplicate_quarantine' + ) + try: + os.makedirs(quarantine_dir, exist_ok=True) + quarantine_path = os.path.join(quarantine_dir, filename) + shutil.move(content_path, quarantine_path) + if meta_path: + os.remove(meta_path) + + san_name = sanitize_filename(filename, doc_hash=file_hash) + db.queue_duplicate_review( + doc_hash=file_hash, + original_filename=filename, + sanitized_filename=san_name, + collision_with_hash=dup_hash, + duplicate_path=quarantine_path, + book_title=voted.get('title'), + book_author=voted.get('author'), + ) + except Exception as e: + logger.error("Failed to quarantine duplicate: %s", e) + + result['action'] = 'level4_duplicate' + return result + + # ── Step 12: Set up processing directory ────────────────────── + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = "Cannot create processing dir: {}".format(e) + return result + + pdf_proc_path = os.path.join(proc_dir, 'source.pdf') + try: + shutil.move(content_path, pdf_proc_path) + if meta_path: + shutil.move(meta_path, os.path.join(proc_dir, 'sidecar.meta.json')) + except Exception as e: + result['error'] = "Cannot move PDF to processing: {}".format(e) + return result + + # ── Step 13: Full text extraction ───────────────────────────── + # Re-open reader from new location + try: + reader = PdfReader(pdf_proc_path) + except Exception: + reader = None + + pages_extracted = 0 + ocr_pages = [] + ocr_methods = { + 'pypdf2': 0, 'pdftotext': 0, 'tesseract': 0, + 'gemini_vision': 0, 'none': 0, + } + + for i in range(page_count): + try: + if reader: + text, method = extract_text_from_page( + reader, i, pdf_proc_path, page_timeout + ) + else: + proc_result = subprocess.run( + ['pdftotext', '-f', str(i + 1), '-l', str(i + 1), + pdf_proc_path, '-'], + capture_output=True, text=True, timeout=page_timeout + ) + text = proc_result.stdout if proc_result.returncode == 0 else '' + method = 'pdftotext' if text.strip() else 'none' + + ocr_methods[method] += 1 + if method in ('tesseract', 'gemini_vision'): + ocr_pages.append(i + 1) + except Exception as e: + logger.warning("Page %d/%d failed: %s", i + 1, page_count, e) + text = '' + ocr_methods['none'] += 1 + + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i + 1)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(text) + + if text.strip(): + pages_extracted += 1 + + if (i + 1) % 50 == 0: + logger.info(" %s: page %d/%d", filename, i + 1, page_count) + + # ── Step 14: Write meta.json ────────────────────────────────── + meta = { + 'hash': file_hash, + 'filename': filename, + 'source_type': 'pdf', + 'page_count': page_count, + 'pages_extracted': pages_extracted, + 'ocr_pages': ocr_pages, + 'ocr_methods': ocr_methods, + 'metadata': voted, + 'metadata_provenance': provenance_record, + } + + # Merge sidecar meta if present + sidecar_path = os.path.join(proc_dir, 'sidecar.meta.json') + if os.path.exists(sidecar_path): + try: + with open(sidecar_path, encoding='utf-8') as f: + sidecar = json.load(f) + meta['sidecar'] = sidecar + except Exception: + pass + + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # ── Step 15: Register in catalogue + documents ──────────────── + display_title = voted.get('title') or clean_filename_to_title(filename) + size_bytes = os.path.getsize(pdf_proc_path) + + source = 'acquired/pdf' + category = 'PDF' + if meta.get('sidecar'): + source = meta['sidecar'].get('source', source) + category = meta['sidecar'].get('category', category) + + db.add_to_catalogue( + file_hash, filename, pdf_proc_path, size_bytes, source, category + ) + db.queue_document(file_hash) + + # ── Step 16: Update documents row ───────────────────────────── + conn = db._get_conn() + conn.execute( + "UPDATE documents SET " + "text_dir = ?, page_count = ?, book_title = ?, book_author = ?, " + "metadata_provenance = ? " + "WHERE hash = ?", + ( + proc_dir, + page_count, + voted.get('title'), + voted.get('author'), + json.dumps(provenance_record), + file_hash, + ) + ) + conn.commit() + + # ── Step 17: Status = extracted ─────────────────────────────── + db.update_status(file_hash, 'extracted', pages_extracted=pages_extracted) + + logger.info( + "PDF pre_flight complete: %s (%s) -> %d/%d pages in %s", + file_hash[:8], display_title, pages_extracted, page_count, proc_dir, + ) + + result['action'] = 'extracted' + return result + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _move_to_rejected(content_path, meta_path, config, filename): + """Move an unreadable PDF to _review/rejected_pdfs/.""" + review_dir = os.path.join( + config.get('library_root', '/mnt/library'), + '_review', 'rejected_pdfs' + ) + try: + os.makedirs(review_dir, exist_ok=True) + reject_path = os.path.join(review_dir, filename) + shutil.move(content_path, reject_path) + if meta_path: + os.remove(meta_path) + logger.info("Moved rejected PDF to %s", reject_path) + except Exception as e: + logger.error("Failed to move rejected PDF: %s", e) diff --git a/lib/processors/text_processor.py b/lib/processors/text_processor.py new file mode 100644 index 0000000..47e68fc --- /dev/null +++ b/lib/processors/text_processor.py @@ -0,0 +1,320 @@ +""" +RECON Text Processor + +Handles pre_flight for plain .txt files arriving in acquired/text/. +These are primary source documents (books, manuals, guides), not derived +content like transcripts. + +Metadata extraction via two-source vote: + A. Filename parsing (title, optionally author) + B. Gemini LLM extraction (title/author/edition/year from first 3 pages) + +Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/ +by the filing worker (NOT organized in-place like transcripts). + +Phase 6f: initial implementation. +""" +import json +import logging +import os +import re +import shutil + +from lib.web_scraper import chunk_text +from lib.utils import content_hash, clean_filename_to_title +from lib.processors.pdf_processor import _extract_gemini_metadata + +logger = logging.getLogger("recon.processors.text") + +WORDS_PER_PAGE = 2000 + + +def _extract_filename_metadata(filename): + """Source A: Extract metadata by parsing the filename. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + stem = os.path.splitext(filename)[0] + + result['title'] = clean_filename_to_title(filename) + + # Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY + year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem) + if year_match: + result['year'] = year_match.group(1) + + # Edition: Nth Edition, Edition N, etc. + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + stem, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + + # Author: "Title - Author" or "Title by Author" patterns + # Try " - Author" at end + dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem) + if dash_match: + result['author'] = dash_match.group(1).strip() + else: + by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem) + if by_match: + result['author'] = by_match.group(1).strip() + + return result + + +def _vote_metadata(source_a, source_b): + """Vote on each metadata field across two sources. + + Priority: B (Gemini) > A (filename). + If both agree, note agreement. + + Returns: + (voted_dict, provenance_record) + """ + sources = {'filename': source_a, 'gemini': source_b} + priority = ['gemini', 'filename'] + voted = {} + provenance = {} + + for field in ('title', 'author', 'edition', 'year'): + values = {} + for name, src in sources.items(): + val = src.get(field) + if val and str(val).strip().lower() != 'null': + values[name] = val + + if not values: + voted[field] = None + provenance[field] = None + continue + + def _norm(v): + if field == 'year': + return v.strip() + return v.strip().lower() + + norm_groups = {} + for name, val in values.items(): + nv = _norm(val) + norm_groups.setdefault(nv, []).append((name, val)) + + best_group = max(norm_groups.values(), key=len) + + if len(best_group) >= 2: + # Both sources agree + for p in priority: + for name, val in best_group: + if name == p: + voted[field] = val + names = ','.join(n for n, _ in best_group) + provenance[field] = "agreed({})".format(names) + break + if voted.get(field) is not None: + break + else: + # No agreement — highest priority wins + for p in priority: + if p in values: + voted[field] = values[p] + provenance[field] = p + break + + provenance_record = { + 'voted': voted, + 'provenance': provenance, + 'sources': { + 'filename': source_a, + 'gemini': source_b, + } + } + + return voted, provenance_record + + +def pre_flight(content_path, meta_path, db, config): + """Process a .txt file dropped into acquired/text/. + + Args: + content_path: Path to the .txt file + meta_path: Path to .meta.json sidecar (may be None) + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'skip_empty', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + filename = os.path.basename(content_path) + + # ── Step 1: Hash ────────────────────────────────────────────── + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = "Cannot hash text file: {}".format(e) + return result + + # ── Step 2: Stale state cleanup ─────────────────────────────── + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise + + # ── Step 3: Hash dedupe ─────────────────────────────────────── + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # ── Step 4: Read text content ───────────────────────────────── + try: + with open(content_path, encoding='utf-8', errors='replace') as f: + raw_text = f.read() + except Exception as e: + result['error'] = "Cannot read text file: {}".format(e) + return result + + if len(raw_text.strip()) < 50: + logger.info("Text file too short (%d chars), skipping: %s", + len(raw_text.strip()), filename) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError: + pass + result['action'] = 'skip_empty' + return result + + # ── Step 5: Read optional sidecar ───────────────────────────── + sidecar = None + if meta_path: + try: + with open(meta_path, encoding='utf-8') as f: + sidecar = json.load(f) + except Exception as e: + logger.warning("Cannot read sidecar %s: %s", meta_path, e) + + # ── Step 6: Set up processing directory ─────────────────────── + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = "Cannot create processing dir: {}".format(e) + return result + + # ── Step 7: Move files to processing ────────────────────────── + source_path = os.path.join(proc_dir, 'source.txt') + try: + shutil.move(content_path, source_path) + if meta_path: + shutil.move(meta_path, os.path.join(proc_dir, 'meta.json')) + except Exception as e: + result['error'] = "Cannot move files to processing: {}".format(e) + return result + + # ── Step 8: Split into pages ────────────────────────────────── + pages = chunk_text(raw_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # ── Step 9: Source A — filename metadata ────────────────────── + source_a = _extract_filename_metadata(filename) + + # ── Step 10: Source B — Gemini metadata ─────────────────────── + first_pages_text = "\n".join(pages[:3]) + source_b = _extract_gemini_metadata(first_pages_text, config) + + # ── Step 11: Vote ───────────────────────────────────────────── + voted, provenance_record = _vote_metadata(source_a, source_b) + + # ── Step 12: Write meta.json ────────────────────────────────── + meta = { + 'hash': file_hash, + 'filename': filename, + 'source_type': 'text', + 'page_count': len(pages), + 'text_length': len(raw_text), + 'metadata': voted, + 'metadata_provenance': provenance_record, + } + if sidecar: + meta['sidecar'] = sidecar + + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # ── Step 13: Register in catalogue ──────────────────────────── + display_title = voted.get('title') or clean_filename_to_title(filename) + size_bytes = os.path.getsize(source_path) + + db.add_to_catalogue( + file_hash, filename, source_path, size_bytes, 'text', 'Document' + ) + + # ── Step 14: Queue and update documents row ─────────────────── + db.queue_document(file_hash) + + conn = db._get_conn() + conn.execute( + "UPDATE documents SET " + "text_dir = ?, page_count = ?, path = ?, " + "book_title = ?, book_author = ?, metadata_provenance = ? " + "WHERE hash = ?", + ( + proc_dir, + len(pages), + source_path, + voted.get('title'), + voted.get('author'), + json.dumps(provenance_record), + file_hash, + ) + ) + conn.commit() + + # ── Step 15: Status = extracted ─────────────────────────────── + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + logger.info( + "Text pre_flight complete: %s (%s) -> %d pages in %s", + file_hash[:8], display_title, len(pages), proc_dir, + ) + + result['action'] = 'extracted' + return result diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py new file mode 100644 index 0000000..dbc3013 --- /dev/null +++ b/lib/processors/transcript_processor.py @@ -0,0 +1,175 @@ +""" +RECON Transcript Processor + +Handles pre_flight for transcript content arriving in acquired/stream/. +Reads a raw text file + meta.json sidecar, hashes, dedupes, splits into +page_NNNN.txt files, and registers in the database. + +Phase 3: first processor implementation. +Phase 4: added stale state cleanup at start of pre_flight. +""" +import hashlib +import json +import logging +import os +import shutil + +from lib.web_scraper import chunk_text +from lib.utils import content_hash + +logger = logging.getLogger("recon.processors.transcript") + +# Words per page for transcript chunking (matches existing pipeline) +WORDS_PER_PAGE = 2000 + + +def pre_flight(content_path, meta_path, db, config): + """Process a transcript pair from acquired/stream/. + + Args: + content_path: Path to the raw transcript .txt file + meta_path: Path to the .meta.json sidecar + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + # Read and hash the content file + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = f"Cannot hash content file: {e}" + return result + + # Stale state cleanup — remove any pre-existing processing/concepts dirs + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise + + # Hash dedupe: if hash exists in catalogue, delete the pair and return + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # Read meta.json sidecar + try: + with open(meta_path, encoding='utf-8') as f: + meta = json.load(f) + except Exception as e: + result['error'] = f"Cannot read meta.json: {e}" + return result + + # Read raw transcript text + try: + with open(content_path, encoding='utf-8') as f: + raw_text = f.read() + except Exception as e: + result['error'] = f"Cannot read content file: {e}" + return result + + if not raw_text.strip(): + result['error'] = "Empty transcript" + return result + + # Set up processing directory + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = f"Cannot create processing dir: {e}" + return result + + # Move the pair to processing/{hash}/ + try: + shutil.move(content_path, os.path.join(proc_dir, 'transcript.txt')) + shutil.move(meta_path, os.path.join(proc_dir, 'meta.json')) + except Exception as e: + result['error'] = f"Cannot move pair to processing: {e}" + return result + + # Split raw text into page_NNNN.txt files + pages = chunk_text(raw_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, f"page_{i:04d}.txt") + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # Update meta.json with processing metadata + meta['hash'] = file_hash + meta['source_type'] = meta.get('source_type', 'transcript') + meta['page_count'] = len(pages) + meta['text_length'] = len(raw_text) + meta_out_path = os.path.join(proc_dir, 'meta.json') + with open(meta_out_path, 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # Register in catalogue + title = meta.get('title', os.path.basename(content_path)) + source_url = meta.get('source_url', meta.get('url', '')) + source = 'stream.echo6.co' + category = meta.get('category', 'Transcript') + size_bytes = os.path.getsize(os.path.join(proc_dir, 'transcript.txt')) + + db.add_to_catalogue(file_hash, title, source_url, size_bytes, source, category) + + # Queue and advance to extracted + db.queue_document(file_hash) + + # Set text_dir and page_count on the documents row. + # Transcripts are derived text from PeerTube videos, not primary sources. + # They don't get filed into library/Domain/Subdomain/ like PDFs -- instead, + # they're marked organized in-place. Their watch URL remains in catalogue.path + # and Qdrant download_url so users clicking search results go to PeerTube. + # The filing worker's path LIKE filter naturally excludes transcripts since + # their documents.path is the watch URL, not a filesystem path. + conn = db._get_conn() + conn.execute( + "UPDATE documents SET text_dir = ?, page_count = ?, organized_at = CURRENT_TIMESTAMP WHERE hash = ?", + (proc_dir, len(pages), file_hash) + ) + conn.commit() + + # Update status to extracted with page count + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + logger.info( + "Transcript pre_flight complete: %s (%s) -> %d pages in %s", + file_hash[:8], title, len(pages), proc_dir, + ) + + result['action'] = 'extracted' + return result diff --git a/lib/utils.py b/lib/utils.py index a98359c..1aec793 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -388,3 +388,19 @@ def generate_download_url(filepath, library_root='/mnt/library', base_url='https parts = rel.split(os.sep) encoded = '/'.join(quote(p) for p in parts) return f"{base_url}/{encoded}" + + +def resolve_text_dir(file_hash, config, db=None): + """Resolve the text directory for a document. + + If db is provided and documents.text_dir is set for this hash, use that. + Otherwise fall back to the legacy location: config['paths']['text']/{hash}/ + """ + if db is not None: + conn = db._get_conn() + row = conn.execute( + "SELECT text_dir FROM documents WHERE hash = ?", (file_hash,) + ).fetchone() + if row and row['text_dir']: + return row['text_dir'] + return os.path.join(config['paths']['text'], file_hash) diff --git a/recon.py b/recon.py index 9cf5814..47dda7d 100755 --- a/recon.py +++ b/recon.py @@ -3,7 +3,7 @@ RECON CLI — Main entry point. Subcommands: scan, queue, extract, enrich, embed, run, search, upload, -ingest-url, crawl, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest. +ingest-url, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest. Usage: cd /opt/recon && source venv/bin/activate && python3 recon.py """ @@ -580,73 +580,6 @@ def cmd_ingest_url(args): -def cmd_crawl(args): - from lib.crawler import crawl_site - - print(f"Crawling {args.url}...") - if args.include: - print(f" Include paths: {args.include}") - if args.exclude: - print(f" Exclude paths: {args.exclude}") - if args.dry_run: - print(f" DRY RUN — no content will be ingested") - print() - - result = crawl_site( - base_url=args.url, - category=args.category, - source=args.source, - include=args.include, - exclude=args.exclude, - max_pages=args.max_pages, - max_depth=args.max_depth, - delay=args.delay, - dry_run=args.dry_run, - use_sitemap=not args.no_sitemap, - ) - - method = result.get('discovery_method', 'none') - print(f"Discovery method: {method}") - - if args.dry_run: - urls = result.get('urls', []) - print(f"Found {len(urls)} URLs that would be ingested:\n") - for i, url in enumerate(urls, 1): - print(f" {i:4d}. {url}") - print(f"\nTotal: {len(urls)} pages") - print(f"Re-run without --dry-run to ingest.") - return 0 - - summary = result.get('summary', {}) - print(f"\nResults:") - print(f" New: {summary.get('succeeded', 0)}") - print(f" Duplicates: {summary.get('duplicates', 0)}") - print(f" Failed: {summary.get('failed', 0)}") - print(f" Total: {summary.get('total', 0)}") - - failed_results = [r for r in result.get('results', []) if r.get('status') == 'failed'] - if failed_results: - print(f"\nFailed URLs:") - for r in failed_results[:10]: - print(f" {r['url']}: {r.get('error', 'Unknown error')}") - if len(failed_results) > 10: - print(f" ... and {len(failed_results) - 10} more") - - if args.enrich or args.process: - print("\nRunning enrichment...") - from lib.enricher import run_enrichment - enriched = run_enrichment() - print(f" Enriched: {enriched}") - - if args.process: - print("\nRunning embedding...") - from lib.embedder import run_embedding - embedded = run_embedding() - print(f" Embedded: {embedded}") - - return 0 - - def cmd_validate(args): from scripts.validate import run_validation run_validation(deep=args.deep) @@ -668,30 +601,31 @@ def cmd_serve(args): def cmd_service(args): """Run RECON as a long-lived service. Called by systemd. - Bundles: Flask dashboard + pipeline stages + library scanner + PeerTube scanner + progress reporter. + Bundles: Flask dashboard + dispatcher + pipeline stages + filing worker + progress reporter. All threads are daemon threads; SIGTERM/SIGINT trigger graceful shutdown. """ - from lib.extractor import run_extraction from lib.enricher import run_enrichment from lib.embedder import run_embedding from lib.api import app, run_server as start_dashboard - from lib.peertube_scraper import ingest_all as pt_ingest_all + from lib.dispatcher import dispatch_loop + from lib.filing import filing_worker_loop + from lib.acquisition.peertube import acquisition_loop config = get_config() proc = config.get('processing', {}) svc = config.get('service', {}) - extract_workers = proc.get('extract_workers', 4) enrich_workers = proc.get('enrich_workers', 16) embed_workers = proc.get('embed_workers', 4) poll_interval = svc.get('stage_poll_interval', 30) - scan_interval = svc.get('scan_interval', 3600) + dispatch_interval = svc.get('dispatch_interval', 30) + filing_interval = svc.get('filing_interval', 30) progress_interval = svc.get('progress_interval', 60) web_host = config.get('web', {}).get('host', '0.0.0.0') web_port = config.get('web', {}).get('port', 8420) stop_event = threading.Event() - totals = {'extract': 0, 'enrich': 0, 'embed': 0, 'scan': 0} + totals = {'enrich': 0, 'embed': 0} def shutdown(signum, frame): sig_name = signal.Signals(signum).name @@ -716,36 +650,6 @@ def cmd_service(args): stop_event.wait(poll_interval) logger.info(f"[{name}] Stage stopped (total: {totals[name]})") - def scanner_loop(): - """Periodically scan library and queue new documents.""" - logger.info(f"[scanner] Started (interval: {scan_interval}s)") - # Run initial scan immediately - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - if new_cat or new_queued or synced: - logger.info(f"[scanner] Initial: {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - except Exception as e: - logger.error(f"[scanner] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - totals['scan'] += new_queued - if new_cat or new_queued or synced: - logger.info(f"[scanner] {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - else: - logger.debug("[scanner] No new documents") - except Exception as e: - logger.error(f"[scanner] Error: {e}", exc_info=True) - logger.info("[scanner] Stopped") - def progress_loop(): """Log pipeline status periodically.""" while not stop_event.is_set(): @@ -769,217 +673,20 @@ def cmd_service(args): except Exception: pass - def peertube_scanner_loop(): - """Periodically ingest new PeerTube video transcripts.""" - from lib.peertube_scraper import set_stop_check - set_stop_check(stop_event.is_set) - logger.info(f"[peertube] Scanner started (interval: {scan_interval}s)") - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] Initial scan: {ingested} transcripts ingested") - else: - logger.info("[peertube] Initial scan: no new transcripts") - except Exception as e: - logger.error(f"[peertube] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] {ingested} new transcripts ingested") - else: - logger.debug("[peertube] No new transcripts") - except Exception as e: - logger.error(f"[peertube] Scan error: {e}", exc_info=True) - logger.info("[peertube] Scanner stopped") - - def crawler_scheduler_loop(): - """Scheduled site crawler — crawls configured sites, one per cycle. - - - Reads sites from config.yaml crawler.sites list - - Processes sites in tier order (lower tier = higher priority) - - Tracks last-crawled timestamp per site in crawl_state.json - - Skips sites crawled within recrawl_interval_days (default 7) - - Waits inter_site_cooldown seconds between sites (default 30) - - Uses per-site delay for rate limiting during crawl - """ - import json as _json - from lib.crawler import crawl_site - - crawler_cfg = config.get('crawler', {}) - sites = crawler_cfg.get('sites', []) - if not sites: - logger.info("[crawler] No sites configured, scheduler disabled") - return - - recrawl_days = crawler_cfg.get('recrawl_interval_days', 7) - cooldown = crawler_cfg.get('inter_site_cooldown', 30) - state_file = os.path.join(config.get('paths', {}).get('data', '/opt/recon/data'), 'crawl_state.json') - - # Load persisted state - def load_state(): - try: - with open(state_file, 'r') as sf: - return _json.load(sf) - except (FileNotFoundError, _json.JSONDecodeError): - return {} - - def save_state(state): - try: - with open(state_file, 'w') as sf: - _json.dump(state, sf, indent=2) - except Exception as e: - logger.error(f"[crawler] Failed to save state: {e}") - - # Sort by tier (lower = higher priority) - sorted_sites = sorted(sites, key=lambda s: s.get('tier', 99)) - logger.info(f"[crawler] Scheduler started — {len(sorted_sites)} sites configured, " - f"recrawl every {recrawl_days}d, {cooldown}s cooldown between sites") - - # Initial delay — let the main pipeline stabilize before crawling - stop_event.wait(60) - if stop_event.is_set(): - return - - while not stop_event.is_set(): - state = load_state() - now = time.time() - crawled_this_cycle = 0 - - for site in sorted_sites: - if stop_event.is_set(): - break - - url = site.get('url', '') - if not url: - continue - - # Check recrawl interval - last_crawled = state.get(url, {}).get('last_crawled', 0) - age_days = (now - last_crawled) / 86400 - if age_days < recrawl_days: - logger.debug(f"[crawler] Skipping {url} — crawled {age_days:.1f}d ago") - continue - - category = site.get('category', 'Web') - max_depth = site.get('max_depth', crawler_cfg.get('max_depth', 3)) - max_pages = site.get('max_pages', crawler_cfg.get('max_pages', 500)) - delay = site.get('delay', crawler_cfg.get('rate_limit_delay', 1.0)) - tier = site.get('tier', 99) - notes = site.get('notes', '') - - logger.info(f"[crawler] Starting: {url} (tier {tier}, category={category}, " - f"depth={max_depth}, pages={max_pages}, delay={delay}s)") - - try: - result = crawl_site( - base_url=url, - category=category, - max_pages=max_pages, - max_depth=max_depth, - delay=delay, - config=config, - ) - - summary = result.get('summary', {}) - method = result.get('discovery_method', 'none') - logger.info(f"[crawler] Done: {url} via {method} — " - f"{summary.get('succeeded', 0)} new, " - f"{summary.get('duplicates', 0)} dupes, " - f"{summary.get('failed', 0)} failed") - - # Update state - state[url] = { - 'last_crawled': time.time(), - 'method': method, - 'succeeded': summary.get('succeeded', 0), - 'duplicates': summary.get('duplicates', 0), - 'failed': summary.get('failed', 0), - 'tier': tier, - 'category': category, - } - save_state(state) - crawled_this_cycle += 1 - - except Exception as e: - logger.error(f"[crawler] Failed: {url} — {e}", exc_info=True) - state[url] = { - 'last_crawled': time.time(), - 'error': str(e), - 'tier': tier, - 'category': category, - } - save_state(state) - - # Inter-site cooldown - if not stop_event.is_set(): - logger.debug(f"[crawler] Cooling down {cooldown}s before next site...") - stop_event.wait(cooldown) - - if crawled_this_cycle: - logger.info(f"[crawler] Cycle complete — {crawled_this_cycle} sites crawled") - else: - logger.debug("[crawler] Cycle complete — all sites within recrawl window") - - # Sleep until next cycle check (1 hour) - if not stop_event.is_set(): - stop_event.wait(3600) - - logger.info("[crawler] Scheduler stopped") - - def organizer_loop(): - """Organize completed documents into Domain/Subdomain folders.""" - from lib.organizer import organize_document - logger.info(f"[organizer] Started (interval: {poll_interval}s)") - # Initial delay — let pipeline process some docs first - stop_event.wait(60) - - while not stop_event.is_set(): - try: - org_db = StatusDB() - unorganized = org_db.get_unorganized(limit=100) - if unorganized: - moved = 0 - errors = 0 - for doc in unorganized: - if stop_event.is_set(): - break - result = organize_document(doc['hash'], org_db, config) - if result['action'] == 'moved': - moved += 1 - elif result['action'] == 'error': - errors += 1 - if moved or errors: - logger.info(f"[organizer] Organized {moved} documents ({errors} errors)") - # Sync paths to Qdrant after batch - if moved > 0: - synced = sync_qdrant_paths() - if synced: - logger.info(f"[organizer] Synced {synced} paths to Qdrant") - else: - logger.debug("[organizer] No unorganized documents") - except Exception as e: - logger.error(f"[organizer] Error: {e}", exc_info=True) - stop_event.wait(poll_interval) - logger.info("[organizer] Stopped") + db = StatusDB() threads = [ - threading.Thread(target=stage_loop, daemon=True, name='extract', - args=('extract', lambda: run_extraction(workers=extract_workers))), + threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval), + daemon=True, name='dispatcher'), threading.Thread(target=stage_loop, daemon=True, name='enrich', args=('enrich', lambda: run_enrichment(workers=enrich_workers))), threading.Thread(target=stage_loop, daemon=True, name='embed', args=('embed', lambda: run_embedding(workers=embed_workers))), - threading.Thread(target=scanner_loop, daemon=True, name='scanner'), - threading.Thread(target=peertube_scanner_loop, daemon=True, name='peertube'), - threading.Thread(target=crawler_scheduler_loop, daemon=True, name='crawler'), - threading.Thread(target=organizer_loop, daemon=True, name='organizer'), + threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval), + daemon=True, name='filing'), + threading.Thread(target=lambda: acquisition_loop(stop_event, db, config, + interval=config.get("peertube", {}).get("poll_interval", 1800)), + daemon=True, name="peertube-acq"), threading.Thread(target=progress_loop, daemon=True, name='progress'), threading.Thread(target=lambda: start_dashboard(stop_event), daemon=True, name='dashboard'), @@ -987,10 +694,11 @@ def cmd_service(args): logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") - logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}") - logger.info(f" Scanner: every {scan_interval}s | Progress: every {progress_interval}s") - crawler_sites = config.get('crawler', {}).get('sites', []) - logger.info(f" Crawler: {len(crawler_sites)} sites configured") + logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") + logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") + pt_interval = config.get("peertube", {}).get("poll_interval", 1800) + logger.info(f" PeerTube acquisition: every {pt_interval}s") + logger.info(f" Progress: every {progress_interval}s") for t in threads: t.start() @@ -1018,10 +726,10 @@ def cmd_service(args): logger.info("=== RECON Service Stopped ===") return 0 - - def cmd_ingest_peertube(args): - from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats + from lib.peertube_scraper import get_instance_stats + from lib.acquisition.peertube import acquire_batch + from lib.status import StatusDB if args.stats: stats = get_instance_stats() @@ -1034,36 +742,20 @@ def cmd_ingest_peertube(args): print(f" {status}: {count}") return 0 - since = args.since if hasattr(args, 'since') and args.since else None + db = StatusDB() - if args.channel: - print(f"Ingesting PeerTube channel: {args.channel}") - result = ingest_channel(args.channel, since=since) - else: - print("Ingesting all PeerTube videos with captions") - result = ingest_all(since=since) + print("Acquiring PeerTube transcripts to hopper...") + result = acquire_batch(db) - summary = result.get('summary', {}) print(f"\nResults:") - print(f" Checked: {summary.get('total_checked', 0)}") - print(f" Ingested: {summary.get('ingested', 0)} ({summary.get('total_pages', 0)} pages)") - print(f" No captions: {summary.get('skipped_no_captions', 0)}") - print(f" Duplicates: {summary.get('skipped_duplicate', 0)}") - print(f" Failed: {summary.get('failed', 0)}") + print(f" Acquired: {result['acquired']}") + print(f" Skipped: {result['skipped']}") + print(f" Errors: {result['errors']}") - # Optional: run enrichment - if args.enrich or args.process: - print("\nRunning enrichment on extracted content...") - from lib.enricher import run_enrichment - enriched = run_enrichment() - print(f" Enriched: {enriched}") - - # Optional: run embedding too - if args.process: - print("\nRunning embedding...") - from lib.embedder import run_embedding - embedded = run_embedding() - print(f" Embedded: {embedded}") + if result['acquired']: + print(f"\n{result['acquired']} transcript(s) staged in hopper.") + print("The dispatcher will pick them up on its next cycle.") + print("Run 'recon status' to monitor progress.") return 0 @@ -1417,21 +1109,6 @@ def main(): p.set_defaults(func=cmd_ingest_url) # crawl - p = sub.add_parser('crawl', help='Crawl a site and ingest discovered pages') - p.add_argument('url', help='Base URL to crawl') - p.add_argument('--category', default='Web', help='Category for ingested content') - p.add_argument('--source', default=None, help='Source identifier (default: domain name)') - p.add_argument('--include', nargs='+', help='Only include URL paths starting with these prefixes') - p.add_argument('--exclude', nargs='+', help='Exclude URL paths starting with these prefixes') - p.add_argument('--max-pages', type=int, default=500, help='Maximum pages to ingest') - p.add_argument('--max-depth', type=int, default=3, help='Maximum link-follow depth') - p.add_argument('--delay', type=float, default=1.0, help='Delay between page fetches (seconds)') - p.add_argument('--dry-run', action='store_true', help='Discover URLs without ingesting') - p.add_argument('--no-sitemap', action='store_true', help='Skip sitemap, use link-following only') - p.add_argument('--enrich', action='store_true', help='Run enrichment after crawl') - p.add_argument('--process', action='store_true', help='Full pipeline: crawl + enrich + embed') - p.set_defaults(func=cmd_crawl) - # validate p = sub.add_parser('validate', help='Validate pipeline consistency') p.add_argument('--deep', action='store_true', help='Deep validation (check all files)') @@ -1459,12 +1136,8 @@ def main(): p.set_defaults(func=cmd_organize) # ingest-peertube - p = sub.add_parser('ingest-peertube', help='Ingest PeerTube video transcripts') - p.add_argument('--channel', help='Ingest specific channel (actor_name)') - p.add_argument('--since', help='Only ingest videos published after this date (ISO format)') + p = sub.add_parser('ingest-peertube', help='Acquire PeerTube transcripts to hopper') p.add_argument('--stats', action='store_true', help='Show PeerTube instance stats') - p.add_argument('--enrich', action='store_true', help='Run enrichment after ingestion') - p.add_argument('--process', action='store_true', help='Full pipeline: ingest + enrich + embed') p.set_defaults(func=cmd_ingest_peertube) # ingest diff --git a/static/css/recon.css b/static/css/recon.css index d6752cf..95aed52 100644 --- a/static/css/recon.css +++ b/static/css/recon.css @@ -210,6 +210,7 @@ tr:hover { background: var(--bg-secondary); } } .badge-web { background: #1e3a5f; color: #60a5fa; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-pdf { background: #2d5a2d; color: #4ade80; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-transcript { background: #3b1f5e; color: #c084fc; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } /* ── Trend indicators ── */ .trend { font-size: 11px; margin-left: 6px; } diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 4e0b3d1..254d92a 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -88,7 +88,7 @@ var pipeCount = s.in_pipeline || 0; totalCat += catCount; totalComp += compCount; totalPipe += pipeCount; totalConcepts += s.concepts; totalVectors += s.vectors; - var badge = s.type === 'web' ? 'WEB' : 'PDF'; + var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : 'PDF'; var compPct = catCount > 0 ? (compCount / catCount * 100) : 0; var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0; var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666'; @@ -185,7 +185,7 @@ rtb.innerHTML = 'None yet'; } else { rtb.innerHTML = data.recent_complete.map(function(r) { - var badge = r.type === 'web' ? 'WEB' : 'PDF'; + var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : 'PDF'; return '' + r.title + '' + badge + '' + r.concepts + '' + r.vectors + ''; }).join(''); diff --git a/templates/knowledge/upload.html b/templates/knowledge/upload.html index 3b58a3f..1f65bad 100644 --- a/templates/knowledge/upload.html +++ b/templates/knowledge/upload.html @@ -1,12 +1,13 @@ {% extends "base.html" %} {% block content %} -

Upload PDF

+

Upload Document

- - Document File + + Supported: PDF, TXT, EPUB, DOC, DOCX, MOBI
@@ -67,7 +68,7 @@ document.getElementById('upload-form').addEventListener('submit', async function result.innerHTML = 'Queued for processing
' + 'Hash: ' + data.hash + '
' + 'File: ' + data.filename + '
' + - 'Category: ' + data.source + '/' + data.category + ''; + 'Type: ' + data.source_type + ''; fileInput.value = ''; } else { status.style.color = '#ff4444';