From de2c59a501e14d784ca76d9a41101ae5ae346386 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 15:03:36 +0000 Subject: [PATCH 01/17] Phase 2: add shared filing function (lib/filing.py) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New reusable file_processed_item() that future processors will call to file completed items from /opt/recon/data/processing/{hash}/ into the library. Reuses existing organizer logic for domain classification and collision handling. Not yet wired into the service loop — exists as library code for Phase 3+ to call. Phase 2 of the refactor. See https://forge.echo6.co/matt/refactored-recon Co-Authored-By: Claude Opus 4.6 --- lib/filing.py | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 lib/filing.py diff --git a/lib/filing.py b/lib/filing.py new file mode 100644 index 0000000..bb33e2c --- /dev/null +++ b/lib/filing.py @@ -0,0 +1,149 @@ +""" +RECON shared filing logic. + +Provides file_processed_item() — the shared function that any processor +can call to file a completed item from /opt/recon/data/processing/{hash}/ +into /mnt/library/Domain/Subdomain/{canonical_name}.{ext}. + +The function: + 1. Reads dominant domain from concept JSONs (existing logic) + 2. Derives canonical name (level 1, escalating to 2/3/4 only on collision) + 3. Moves the source file from processing/ to library/Domain/Subdomain/ + 4. Updates catalogue + documents + Qdrant payloads atomically + 5. Marks organized + +This function does NOT extract, enrich, or embed. Those are upstream stages. +This function does NOT touch the legacy organize_document() — that stays in place +until cutover (Phase 5). + +Phase 2: function exists, is tested in isolation. Not yet called by anything +in the service loop. +""" + +import logging +import os +import shutil + +from .organizer import determine_dominant_domain, _build_target_path +from .new_pipeline import update_qdrant_payload + +logger = logging.getLogger("recon.filing") + + +def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False): + """File a completed item into the library. + + Args: + doc_hash: Document hash + source_file_path: Current absolute path to the source file + (typically in /opt/recon/data/processing/{hash}/ or current library path) + db: StatusDB instance + config: RECON config dict + dry_run: If True, plan but don't move + + Returns: + dict with keys: + hash, action, source_path, target_path, + domain, subdomain, qdrant_points_updated, error + """ + result = { + "hash": doc_hash, + "action": "skip", + "source_path": source_file_path, + "target_path": None, + "domain": None, + "subdomain": None, + "qdrant_points_updated": 0, + "error": None, + } + + # Verify source file exists + if not os.path.exists(source_file_path): + result["action"] = "error" + result["error"] = f"Source file not found: {source_file_path}" + return result + + # Determine domain from existing concept JSONs + data_dir = config["paths"]["data"] + domain, subdomain, confidence = determine_dominant_domain(doc_hash, data_dir) + result["domain"] = domain + result["subdomain"] = subdomain + + if domain is None: + result["action"] = "skip_unclassified" + return result + + # Get the original filename from catalogue + conn = db._get_conn() + row = conn.execute( + "SELECT filename FROM catalogue WHERE hash = ?", (doc_hash,) + ).fetchone() + if not row: + result["action"] = "error" + result["error"] = f"Hash not in catalogue: {doc_hash}" + return result + + original_filename = row["filename"] + + # Build target path using existing collision-handling logic + library_root = config["library_root"] + target_path, sanitized_name = _build_target_path( + library_root, domain, subdomain, original_filename, doc_hash + ) + + if target_path is None: + result["action"] = "skip_unclassified" + return result + + result["target_path"] = target_path + + # If already at target (idempotency), just mark organized + if os.path.abspath(source_file_path) == os.path.abspath(target_path): + result["action"] = "skip_already_filed" + if not dry_run: + db.mark_organized(doc_hash) + return result + + if dry_run: + result["action"] = "would_file" + return result + + # Move the file + try: + target_dir = os.path.dirname(target_path) + os.makedirs(target_dir, exist_ok=True) + shutil.move(source_file_path, target_path) + except Exception as e: + result["action"] = "error" + result["error"] = f"Move failed: {e}" + logger.error("Move failed for %s: %s", doc_hash[:8], e) + return result + + # Update DB and Qdrant + try: + db.update_catalogue_path(doc_hash, target_path, sanitized_name) + db.sync_document_path(doc_hash, target_path, sanitized_name) + db.mark_organized(doc_hash) + + # Update Qdrant payloads (download_url, filename, original_filename) + points = update_qdrant_payload( + doc_hash, target_path, sanitized_name, original_filename, config + ) + result["qdrant_points_updated"] = points + + result["action"] = "filed" + logger.info( + "Filed %s -> %s [%s/%s, %d vectors]", + doc_hash[:8], + target_path, + domain, + subdomain, + points, + ) + except Exception as e: + # File was moved but DB update failed — log the dangerous state + result["action"] = "error" + result["error"] = f"DB/Qdrant update failed after move: {e}" + logger.error("DB/Qdrant update failed for %s: %s", doc_hash[:8], e) + + return result From 66fadb7487182d22dcfcb5e37234222e0815c5fc Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 15:39:42 +0000 Subject: [PATCH 02/17] Phase 3: dispatcher, transcript processor, text_dir resolution - lib/dispatcher.py: one-shot dispatcher that scans acquired// for content+sidecar pairs and routes to registered processors - lib/processors/transcript_processor.py: pre_flight() for transcripts (hash, dedupe, split into pages, register in DB, set text_dir) - lib/utils.py: resolve_text_dir() helper for text_dir column fallback - lib/enricher.py: use resolve_text_dir() instead of hardcoded path - lib/embedder.py: use resolve_text_dir() instead of hardcoded path - lib/processors/__init__.py, lib/acquisition/__init__.py: package inits Co-Authored-By: Claude Opus 4.6 --- lib/acquisition/__init__.py | 0 lib/dispatcher.py | 121 ++++++++++++++++++++ lib/embedder.py | 3 +- lib/enricher.py | 3 +- lib/processors/__init__.py | 0 lib/processors/transcript_processor.py | 152 +++++++++++++++++++++++++ lib/utils.py | 16 +++ 7 files changed, 293 insertions(+), 2 deletions(-) create mode 100644 lib/acquisition/__init__.py create mode 100644 lib/dispatcher.py create mode 100644 lib/processors/__init__.py create mode 100644 lib/processors/transcript_processor.py diff --git a/lib/acquisition/__init__.py b/lib/acquisition/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/dispatcher.py b/lib/dispatcher.py new file mode 100644 index 0000000..0abb314 --- /dev/null +++ b/lib/dispatcher.py @@ -0,0 +1,121 @@ +""" +RECON Dispatcher + +Watches configured acquired// directories for content+sidecar pairs +that have been stable (mtime unchanged) for the configured threshold, then +hands them to the appropriate processor's pre_flight(). + +Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5. +""" +import importlib +import logging +import os +import time + +from .utils import get_config +from .status import StatusDB + +logger = logging.getLogger("recon.dispatcher") + + +def _load_processor(processor_name): + """Dynamically import a processor module from lib.processors.""" + module_path = f"lib.processors.{processor_name}" + try: + return importlib.import_module(module_path) + except ImportError as e: + logger.error("Cannot load processor %s: %s", processor_name, e) + return None + + +def _find_pairs(subfolder_path): + """Find content+sidecar pairs in a subfolder. + + A pair is two files sharing a basename: + .txt (or other content extension) + .meta.json (sidecar) + + Returns list of (content_path, meta_path, basename) tuples. + """ + if not os.path.isdir(subfolder_path): + return [] + + files = set(os.listdir(subfolder_path)) + pairs = [] + + for fname in sorted(files): + if fname.endswith('.meta.json'): + basename = fname[:-len('.meta.json')] + # Look for matching content file (try common extensions) + for ext in ['.txt', '.vtt', '.html', '.pdf']: + content_name = basename + ext + if content_name in files: + pairs.append(( + os.path.join(subfolder_path, content_name), + os.path.join(subfolder_path, fname), + basename, + )) + break + + return pairs + + +def _is_stable(filepath, stability_seconds): + """Check if a file's mtime is older than stability_seconds ago.""" + try: + mtime = os.path.getmtime(filepath) + return (time.time() - mtime) >= stability_seconds + except OSError: + return False + + +def dispatch_once(): + """One-shot dispatch: scan all configured acquired/ subfolders once. + + Returns list of result dicts from processor pre_flight calls. + """ + config = get_config() + pipeline_cfg = config.get('pipeline', {}) + acquired_root = pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired') + dispatch_map = pipeline_cfg.get('dispatch', {}) + stability_seconds = pipeline_cfg.get('mtime_stability_seconds', 10) + + db = StatusDB(config['paths']['db']) + results = [] + + for subfolder_name, processor_name in dispatch_map.items(): + subfolder_path = os.path.join(acquired_root, subfolder_name) + + processor = _load_processor(processor_name) + if processor is None: + continue + + if not hasattr(processor, 'pre_flight'): + logger.error("Processor %s has no pre_flight function", processor_name) + continue + + pairs = _find_pairs(subfolder_path) + if not pairs: + continue + + for content_path, meta_path, basename in pairs: + # Both files must be stable + if not (_is_stable(content_path, stability_seconds) and + _is_stable(meta_path, stability_seconds)): + logger.debug("Pair %s not yet stable, skipping", basename) + continue + + logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name) + try: + result = processor.pre_flight(content_path, meta_path, db, config) + results.append(result) + logger.info("Result for %s: %s", basename, result.get('action', 'unknown')) + except Exception as e: + logger.error("Processor %s crashed on %s: %s", processor_name, basename, e) + results.append({ + 'action': 'error', + 'error': str(e), + 'basename': basename, + }) + + return results diff --git a/lib/embedder.py b/lib/embedder.py index b4ed162..35fcb58 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -21,6 +21,7 @@ from qdrant_client.models import PointStruct, SparseVector from .utils import get_config, concept_id, generate_download_url, setup_logging from .status import StatusDB +from .utils import resolve_text_dir logger = setup_logging('recon.embedder') @@ -274,7 +275,7 @@ def embed_single(file_hash, db, config): source_type = 'web' if is_web else 'document' # Check meta.json for explicit source_type (e.g. 'transcript') - text_dir = os.path.join(config['paths']['text'], file_hash) + text_dir = resolve_text_dir(file_hash, config, db) meta_path = os.path.join(text_dir, 'meta.json') page_timestamps = {} if os.path.exists(meta_path): diff --git a/lib/enricher.py b/lib/enricher.py index fe18d06..d9540aa 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -25,6 +25,7 @@ import google.generativeai as genai from .utils import get_config, setup_logging from .status import StatusDB +from .utils import resolve_text_dir logger = setup_logging('recon.enricher') @@ -345,7 +346,7 @@ def enrich_single(file_hash, db, config, key_rotator): if not doc: return False - text_dir = os.path.join(config['paths']['text'], file_hash) + text_dir = resolve_text_dir(file_hash, config, db) concepts_dir = os.path.join(config['paths']['concepts'], file_hash) window_size = config['processing']['enrich_window_size'] delay = config['processing']['rate_limit_delay'] diff --git a/lib/processors/__init__.py b/lib/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py new file mode 100644 index 0000000..76998f7 --- /dev/null +++ b/lib/processors/transcript_processor.py @@ -0,0 +1,152 @@ +""" +RECON Transcript Processor + +Handles pre_flight for transcript content arriving in acquired/stream/. +Reads a raw text file + meta.json sidecar, hashes, dedupes, splits into +page_NNNN.txt files, and registers in the database. + +Phase 3: first processor implementation. +""" +import hashlib +import json +import logging +import os +import shutil + +from lib.web_scraper import chunk_text +from lib.utils import content_hash + +logger = logging.getLogger("recon.processors.transcript") + +# Words per page for transcript chunking (matches existing pipeline) +WORDS_PER_PAGE = 2000 + + +def pre_flight(content_path, meta_path, db, config): + """Process a transcript pair from acquired/stream/. + + Args: + content_path: Path to the raw transcript .txt file + meta_path: Path to the .meta.json sidecar + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + # Read and hash the content file + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = f"Cannot hash content file: {e}" + return result + + # Hash dedupe: if hash exists in catalogue, delete the pair and return + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # Read meta.json sidecar + try: + with open(meta_path, encoding='utf-8') as f: + meta = json.load(f) + except Exception as e: + result['error'] = f"Cannot read meta.json: {e}" + return result + + # Read raw transcript text + try: + with open(content_path, encoding='utf-8') as f: + raw_text = f.read() + except Exception as e: + result['error'] = f"Cannot read content file: {e}" + return result + + if not raw_text.strip(): + result['error'] = "Empty transcript" + return result + + # Set up processing directory + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = f"Cannot create processing dir: {e}" + return result + + # Move the pair to processing/{hash}/ + try: + shutil.move(content_path, os.path.join(proc_dir, 'transcript.txt')) + shutil.move(meta_path, os.path.join(proc_dir, 'meta.json')) + except Exception as e: + result['error'] = f"Cannot move pair to processing: {e}" + return result + + # Split raw text into page_NNNN.txt files + pages = chunk_text(raw_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, f"page_{i:04d}.txt") + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # Update meta.json with processing metadata + meta['hash'] = file_hash + meta['source_type'] = meta.get('source_type', 'transcript') + meta['page_count'] = len(pages) + meta['text_length'] = len(raw_text) + meta_out_path = os.path.join(proc_dir, 'meta.json') + with open(meta_out_path, 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # Register in catalogue + title = meta.get('title', os.path.basename(content_path)) + source_url = meta.get('source_url', meta.get('url', '')) + source = 'stream.echo6.co' + category = meta.get('category', 'Transcript') + size_bytes = os.path.getsize(os.path.join(proc_dir, 'transcript.txt')) + + db.add_to_catalogue(file_hash, title, source_url, size_bytes, source, category) + + # Queue and advance to extracted + db.queue_document(file_hash) + + # Set text_dir on the documents row + conn = db._get_conn() + conn.execute( + "UPDATE documents SET text_dir = ? WHERE hash = ?", + (proc_dir, file_hash) + ) + conn.commit() + + # Update status to extracted with page count + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + logger.info( + "Transcript pre_flight complete: %s (%s) -> %d pages in %s", + file_hash[:8], title, len(pages), proc_dir, + ) + + result['action'] = 'extracted' + return result diff --git a/lib/utils.py b/lib/utils.py index a98359c..1aec793 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -388,3 +388,19 @@ def generate_download_url(filepath, library_root='/mnt/library', base_url='https parts = rel.split(os.sep) encoded = '/'.join(quote(p) for p in parts) return f"{base_url}/{encoded}" + + +def resolve_text_dir(file_hash, config, db=None): + """Resolve the text directory for a document. + + If db is provided and documents.text_dir is set for this hash, use that. + Otherwise fall back to the legacy location: config['paths']['text']/{hash}/ + """ + if db is not None: + conn = db._get_conn() + row = conn.execute( + "SELECT text_dir FROM documents WHERE hash = ?", (file_hash,) + ).fetchone() + if row and row['text_dir']: + return row['text_dir'] + return os.path.join(config['paths']['text'], file_hash) From f69c04a0e3f60584c24dd9dc5f33a6d06200c35e Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 15:43:21 +0000 Subject: [PATCH 03/17] Phase 3: fix page_count in transcript processor Set page_count on documents row during pre_flight. Without this, enricher comparison `page_count >= 3` fails with TypeError on NULL. Co-Authored-By: Claude Opus 4.6 --- lib/processors/transcript_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py index 76998f7..392db6d 100644 --- a/lib/processors/transcript_processor.py +++ b/lib/processors/transcript_processor.py @@ -132,11 +132,11 @@ def pre_flight(content_path, meta_path, db, config): # Queue and advance to extracted db.queue_document(file_hash) - # Set text_dir on the documents row + # Set text_dir and page_count on the documents row conn = db._get_conn() conn.execute( - "UPDATE documents SET text_dir = ? WHERE hash = ?", - (proc_dir, file_hash) + "UPDATE documents SET text_dir = ?, page_count = ? WHERE hash = ?", + (proc_dir, len(pages), file_hash) ) conn.commit() From 9fe6a0a782a12c2a7d7f4186725e86ccb69670e7 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 16:39:57 +0000 Subject: [PATCH 04/17] Phase 4: Phase 3 cleanup fixes Fix 1.1: filing preserves source file extension instead of defaulting to .pdf Fix 1.2: back-fixed soldering transcript from .pdf to .txt (hash 380dbc78) Fix 1.3: dispatcher logs missing processor modules at DEBUG, not ERROR Fix 1.4: transcript processor cleans stale processing/concepts dirs on entry Also: dispatcher now handles solo content files without .meta.json sidecar Co-Authored-By: Claude Opus 4.6 --- lib/dispatcher.py | 47 +++++++++++++++++++------- lib/filing.py | 9 +++++ lib/processors/transcript_processor.py | 17 +++++++--- 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/lib/dispatcher.py b/lib/dispatcher.py index 0abb314..f9a7b78 100644 --- a/lib/dispatcher.py +++ b/lib/dispatcher.py @@ -6,6 +6,7 @@ that have been stable (mtime unchanged) for the configured threshold, then hands them to the appropriate processor's pre_flight(). Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5. +Phase 4: sidecar is optional (PDFs may arrive without .meta.json). """ import importlib import logging @@ -17,37 +18,44 @@ from .status import StatusDB logger = logging.getLogger("recon.dispatcher") +# Content file extensions recognized by the dispatcher +CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'} + def _load_processor(processor_name): """Dynamically import a processor module from lib.processors.""" module_path = f"lib.processors.{processor_name}" try: return importlib.import_module(module_path) + except ModuleNotFoundError: + logger.debug("Processor module not found: %s (not yet implemented)", processor_name) + return None except ImportError as e: - logger.error("Cannot load processor %s: %s", processor_name, e) + logger.error("Failed to import processor %s: %s", processor_name, e) return None def _find_pairs(subfolder_path): - """Find content+sidecar pairs in a subfolder. + """Find content files (with optional sidecar) in a subfolder. - A pair is two files sharing a basename: - .txt (or other content extension) - .meta.json (sidecar) + A pair is: + . — content file + .meta.json — optional sidecar - Returns list of (content_path, meta_path, basename) tuples. + Returns list of (content_path, meta_path_or_None, basename) tuples. """ if not os.path.isdir(subfolder_path): return [] files = set(os.listdir(subfolder_path)) pairs = [] + seen_basenames = set() + # First pass: find .meta.json files and their matching content for fname in sorted(files): if fname.endswith('.meta.json'): basename = fname[:-len('.meta.json')] - # Look for matching content file (try common extensions) - for ext in ['.txt', '.vtt', '.html', '.pdf']: + for ext in sorted(CONTENT_EXTENSIONS): content_name = basename + ext if content_name in files: pairs.append(( @@ -55,8 +63,21 @@ def _find_pairs(subfolder_path): os.path.join(subfolder_path, fname), basename, )) + seen_basenames.add(content_name) break + # Second pass: find solo content files (no sidecar) + for fname in sorted(files): + if fname in seen_basenames: + continue + _stem, ext = os.path.splitext(fname) + if ext.lower() in CONTENT_EXTENSIONS and not fname.endswith('.meta.json'): + pairs.append(( + os.path.join(subfolder_path, fname), + None, + _stem, + )) + return pairs @@ -99,10 +120,12 @@ def dispatch_once(): continue for content_path, meta_path, basename in pairs: - # Both files must be stable - if not (_is_stable(content_path, stability_seconds) and - _is_stable(meta_path, stability_seconds)): - logger.debug("Pair %s not yet stable, skipping", basename) + # Content file must be stable; sidecar too if present + if not _is_stable(content_path, stability_seconds): + logger.debug("File %s not yet stable, skipping", basename) + continue + if meta_path and not _is_stable(meta_path, stability_seconds): + logger.debug("Sidecar for %s not yet stable, skipping", basename) continue logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name) diff --git a/lib/filing.py b/lib/filing.py index bb33e2c..4267922 100644 --- a/lib/filing.py +++ b/lib/filing.py @@ -95,6 +95,15 @@ def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False): result["action"] = "skip_unclassified" return result + # Fix 1.1: Preserve the source file's actual extension instead of + # the default .pdf that sanitize_filename() may have applied + source_ext = os.path.splitext(source_file_path)[1].lower() + if source_ext: + target_stem, _old_ext = os.path.splitext(target_path) + target_path = target_stem + source_ext + san_stem, _old_ext = os.path.splitext(sanitized_name) + sanitized_name = san_stem + source_ext + result["target_path"] = target_path # If already at target (idempotency), just mark organized diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py index 392db6d..21860cf 100644 --- a/lib/processors/transcript_processor.py +++ b/lib/processors/transcript_processor.py @@ -6,6 +6,7 @@ Reads a raw text file + meta.json sidecar, hashes, dedupes, splits into page_NNNN.txt files, and registers in the database. Phase 3: first processor implementation. +Phase 4: added stale state cleanup at start of pre_flight. """ import hashlib import json @@ -50,6 +51,15 @@ def pre_flight(content_path, meta_path, db, config): result['error'] = f"Cannot hash content file: {e}" return result + # Stale state cleanup — remove any pre-existing processing/concepts dirs + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + shutil.rmtree(proc_dir, ignore_errors=True) + shutil.rmtree(concepts_dir, ignore_errors=True) + # Hash dedupe: if hash exists in catalogue, delete the pair and return conn = db._get_conn() existing = conn.execute( @@ -59,7 +69,8 @@ def pre_flight(content_path, meta_path, db, config): logger.info("Duplicate hash %s, removing pair", file_hash[:8]) try: os.remove(content_path) - os.remove(meta_path) + if meta_path: + os.remove(meta_path) except OSError as e: logger.warning("Failed to remove duplicate pair: %s", e) result['action'] = 'duplicate' @@ -86,10 +97,6 @@ def pre_flight(content_path, meta_path, db, config): return result # Set up processing directory - processing_root = config.get('pipeline', {}).get( - 'processing_root', '/opt/recon/data/processing' - ) - proc_dir = os.path.join(processing_root, file_hash) try: os.makedirs(proc_dir, exist_ok=True) except Exception as e: From 96e1e642c4574ebf410bc28c46c84d66ba5d3f50 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 16:57:44 +0000 Subject: [PATCH 05/17] Phase 4: PDF processor with layered metadata extraction - Add lib/processors/pdf_processor.py with full pre_flight pipeline - Layered metadata: Source A (PDF dict), Source B (filename), Source C (Gemini) - Field-by-field voting with provenance tracking (metadata_provenance column) - Level-4 strict dedupe (title+author+edition+year) - Content failures route to _review/rejected_pdfs/ - Level-4 duplicates route to _review/duplicate_quarantine/ - Full text extraction using existing extract_text_from_page fallback chain - Schema: added metadata_provenance TEXT to documents table Co-Authored-By: Claude Opus 4.6 --- lib/processors/pdf_processor.py | 655 ++++++++++++++++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 lib/processors/pdf_processor.py diff --git a/lib/processors/pdf_processor.py b/lib/processors/pdf_processor.py new file mode 100644 index 0000000..2c9224e --- /dev/null +++ b/lib/processors/pdf_processor.py @@ -0,0 +1,655 @@ +""" +RECON PDF Processor + +Handles pre_flight for PDF content arriving in acquired/pdf/. +Opens the PDF, extracts layered metadata (PDF dict + filename + Gemini), +votes on fields, checks level-4 dedupe, runs full text extraction, +and registers in the database. + +Metadata sources: + A. PDF info dictionary (PyPDF2 reader.metadata) + B. Filename parsing (clean_filename_to_title + regex patterns) + C. Gemini LLM on first 3 pages of extracted text + +Voting: if 2+ sources agree on a field, use that value. +Otherwise priority: C (Gemini) > A (PDF dict) > B (filename). + +Level-4 dedupe: requires ALL FOUR fields (title, author, edition, year) +present and matching an existing document. Very conservative — only +catches near-certain duplicates. + +Failure modes: + - Transient (Gemini API): retry 3x with backoff, then continue without + - Content (unreadable PDF): move to _review/rejected_pdfs/ + - Level-4 duplicate: move to _review/duplicate_quarantine/ + +Phase 4: first implementation. +""" +import json +import logging +import os +import re +import shutil +import subprocess +import time + +import google.generativeai as genai +from PyPDF2 import PdfReader + +from lib.extractor import extract_text_from_page +from lib.utils import content_hash, clean_filename_to_title, sanitize_filename + +logger = logging.getLogger("recon.processors.pdf") + +# Maximum retries for transient (API) failures +MAX_TRANSIENT_RETRIES = 3 +TRANSIENT_BACKOFF_SECONDS = 30 + + +# ── Metadata Extraction Sources ───────────────────────────────────── + + +def _extract_pdf_dict(reader): + """Source A: Extract metadata from PDF's built-in info dictionary. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + try: + info = reader.metadata + if not info: + return result + except Exception: + return result + + # Title + title = None + try: + title = info.get('/Title') or info.title + except Exception: + pass + if title and isinstance(title, str) and len(title.strip()) > 2: + result['title'] = title.strip() + + # Author + author = None + try: + author = info.get('/Author') or info.author + except Exception: + pass + if author and isinstance(author, str) and len(author.strip()) > 1: + result['author'] = author.strip() + + # Year from CreationDate or ModDate + for date_key in ('/CreationDate', '/ModDate'): + try: + date_val = info.get(date_key) + except Exception: + continue + if date_val and isinstance(date_val, str): + match = re.search(r'(?:D:)?(\d{4})', date_val) + if match: + year = int(match.group(1)) + if 1800 <= year <= 2030: + result['year'] = str(year) + break + + # Edition from Subject or Title + for field_val in [info.get('/Subject'), title]: + if field_val and isinstance(field_val, str): + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + field_val, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + break + + return result + + +def _extract_filename_metadata(filename): + """Source B: Extract metadata by parsing the filename. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + stem = os.path.splitext(filename)[0] + + # Title from filename (using existing utility) + result['title'] = clean_filename_to_title(filename) + + # Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY near boundaries + year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem) + if year_match: + result['year'] = year_match.group(1) + + # Edition: look for Nth Edition, Edition N, etc. + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + stem, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + + # Author: look for "by Author Name" pattern + by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem) + if by_match: + result['author'] = by_match.group(1).strip() + + return result + + +def _extract_gemini_metadata(pages_text, config): + """Source C: Extract metadata using Gemini on first 3 pages text. + + Returns dict with keys: title, author, edition, year (any may be None). + Retries up to MAX_TRANSIENT_RETRIES times on transient failures. + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + keys = config.get('gemini_keys', []) + if not keys or len(pages_text.strip()) < 50: + return result + + prompt = ( + "Extract the following metadata from this book/document text.\n" + 'Return JSON: {"title": "...", "author": "...", "edition": "...", "year": "..."}\n' + "- title: The full title of the book or document\n" + "- author: The author(s) name(s)\n" + '- edition: The edition (e.g. "2nd Edition", "Revised Edition")\n' + "- year: The publication year (4-digit number as string)\n" + "If a field cannot be determined, use null.\n\n" + "Text from first pages:\n" + + pages_text[:6000] + ) + + for attempt in range(MAX_TRANSIENT_RETRIES): + try: + key = keys[attempt % len(keys)] + genai.configure(api_key=key) + model = genai.GenerativeModel( + config['gemini']['model'], + generation_config={ + "response_mime_type": config['gemini']['response_mime_type'] + } + ) + response = model.generate_content(prompt) + data = json.loads(response.text) + + for field in ('title', 'author', 'edition', 'year'): + val = data.get(field) + if val and isinstance(val, str) and val.strip(): + result[field] = val.strip() + + return result + + except Exception as e: + logger.warning( + "Gemini metadata attempt %d/%d failed: %s", + attempt + 1, MAX_TRANSIENT_RETRIES, e + ) + if attempt < MAX_TRANSIENT_RETRIES - 1: + time.sleep(TRANSIENT_BACKOFF_SECONDS) + + logger.warning( + "Gemini metadata extraction failed after %d attempts", MAX_TRANSIENT_RETRIES + ) + return result + + +# ── Voting ─────────────────────────────────────────────────────────── + + +def _vote_metadata(source_a, source_b, source_c): + """Vote on each metadata field across three sources. + + Priority: C (Gemini) > A (PDF dict) > B (filename). + If 2+ sources agree, use the agreed value. + + Returns: + (voted_dict, provenance_dict) + voted_dict: {field: value_or_None} + provenance_dict: {field: source_label_or_None} + """ + sources = {'pdf_dict': source_a, 'filename': source_b, 'gemini': source_c} + priority = ['gemini', 'pdf_dict', 'filename'] + voted = {} + provenance = {} + + for field in ('title', 'author', 'edition', 'year'): + values = {} + for name, src in sources.items(): + val = src.get(field) + if val: + values[name] = val + + if not values: + voted[field] = None + provenance[field] = None + continue + + # Normalize for comparison + def _norm(v): + if field == 'year': + return v.strip() + return v.strip().lower() + + # Group by normalized value + norm_groups = {} + for name, val in values.items(): + nv = _norm(val) + norm_groups.setdefault(nv, []).append((name, val)) + + # Find group with most agreement + best_group = max(norm_groups.values(), key=len) + + if len(best_group) >= 2: + # 2+ sources agree — use highest-priority original case + for p in priority: + for name, val in best_group: + if name == p: + voted[field] = val + names = ','.join(n for n, _ in best_group) + provenance[field] = "agreed({})".format(names) + break + if voted.get(field) is not None: + break + else: + # No agreement — highest priority wins + for p in priority: + if p in values: + voted[field] = values[p] + provenance[field] = p + break + + return voted, provenance + + +# ── Level-4 Dedupe ─────────────────────────────────────────────────── + + +def _check_level4_dedupe(voted, db): + """Level-4 strict dedupe: all four fields must be present and match. + + Returns (is_duplicate, matching_hash) or (False, None). + """ + title = voted.get('title') + author = voted.get('author') + edition = voted.get('edition') + year = voted.get('year') + + if not all([title, author, edition, year]): + return False, None + + conn = db._get_conn() + rows = conn.execute( + "SELECT hash, metadata_provenance FROM documents " + "WHERE book_title = ? AND book_author = ?", + (title, author) + ).fetchall() + + if not rows: + return False, None + + for row in rows: + prov_json = row['metadata_provenance'] + if not prov_json: + continue + try: + prov_data = json.loads(prov_json) + existing_voted = prov_data.get('voted', {}) + ex_edition = existing_voted.get('edition', '') + ex_year = existing_voted.get('year', '') + if (ex_edition and ex_year + and ex_edition.lower() == edition.lower() + and ex_year == year): + return True, row['hash'] + except (json.JSONDecodeError, AttributeError): + continue + + return False, None + + +# ── Page Count Fallback ────────────────────────────────────────────── + + +def _pdfinfo_page_count(pdf_path): + """Get page count via pdfinfo (poppler) when PdfReader fails.""" + try: + proc = subprocess.run( + ['pdfinfo', pdf_path], + capture_output=True, text=True, timeout=30 + ) + if proc.returncode == 0: + for line in proc.stdout.splitlines(): + if line.startswith('Pages:'): + return int(line.split(':', 1)[1].strip()) + except Exception: + pass + return 0 + + +# ── Pre-Flight ─────────────────────────────────────────────────────── + + +def pre_flight(content_path, meta_path, db, config): + """Process a PDF from acquired/pdf/. + + Args: + content_path: Path to the PDF file + meta_path: Path to .meta.json sidecar (may be None) + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'level4_duplicate', + 'content_failure', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + filename = os.path.basename(content_path) + + # ── Step 1: Hash ────────────────────────────────────────────── + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = "Cannot hash PDF: {}".format(e) + return result + + # ── Step 2: Stale state cleanup ─────────────────────────────── + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + shutil.rmtree(proc_dir, ignore_errors=True) + shutil.rmtree(concepts_dir, ignore_errors=True) + + # ── Step 3: Hash dedupe ─────────────────────────────────────── + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # ── Step 4: Size check ──────────────────────────────────────── + proc_cfg = config.get('processing', {}) + max_size_mb = proc_cfg.get('max_pdf_size_mb', 200) + try: + file_size_mb = os.path.getsize(content_path) / 1048576 + except OSError as e: + result['error'] = "Cannot stat PDF: {}".format(e) + return result + + if file_size_mb > max_size_mb: + result['error'] = "PDF too large: {:.0f}MB > {}MB limit".format( + file_size_mb, max_size_mb + ) + result['action'] = 'content_failure' + _move_to_rejected(content_path, meta_path, config, filename) + return result + + # ── Step 5: Open PDF ────────────────────────────────────────── + reader = None + page_count = 0 + + try: + reader = PdfReader(content_path) + page_count = len(reader.pages) + except Exception as e: + logger.warning( + "PdfReader failed for %s: %s — trying pdfinfo", filename, e + ) + page_count = _pdfinfo_page_count(content_path) + + if page_count == 0: + logger.error("Content failure: cannot read PDF %s", filename) + result['action'] = 'content_failure' + result['error'] = "Cannot read PDF (0 pages)" + _move_to_rejected(content_path, meta_path, config, filename) + return result + + # ── Step 6: Source A — PDF dict metadata ────────────────────── + source_a = _extract_pdf_dict(reader) if reader else { + 'title': None, 'author': None, 'edition': None, 'year': None + } + + # ── Step 7: Source B — Filename metadata ────────────────────── + source_b = _extract_filename_metadata(filename) + + # ── Step 8: Extract first 3 pages for Source C ──────────────── + page_timeout = proc_cfg.get('page_timeout', 30) + first_pages_text = "" + + if reader: + for i in range(min(3, page_count)): + try: + text, _method = extract_text_from_page( + reader, i, content_path, page_timeout + ) + first_pages_text += text + "\n" + except Exception as e: + logger.warning( + "Failed to extract page %d for metadata: %s", i + 1, e + ) + + # ── Step 9: Source C — Gemini metadata ──────────────────────── + source_c = _extract_gemini_metadata(first_pages_text, config) + + # ── Step 10: Vote ───────────────────────────────────────────── + voted, provenance = _vote_metadata(source_a, source_b, source_c) + + provenance_record = { + 'voted': voted, + 'provenance': provenance, + 'sources': { + 'pdf_dict': source_a, + 'filename': source_b, + 'gemini': source_c, + } + } + + # ── Step 11: Level-4 dedupe ─────────────────────────────────── + is_dup, dup_hash = _check_level4_dedupe(voted, db) + if is_dup: + logger.info( + "Level-4 duplicate: %s matches %s", file_hash[:8], dup_hash[:8] + ) + quarantine_dir = os.path.join( + config.get('library_root', '/mnt/library'), + '_review', 'duplicate_quarantine' + ) + try: + os.makedirs(quarantine_dir, exist_ok=True) + quarantine_path = os.path.join(quarantine_dir, filename) + shutil.move(content_path, quarantine_path) + if meta_path: + os.remove(meta_path) + + san_name = sanitize_filename(filename, doc_hash=file_hash) + db.queue_duplicate_review( + doc_hash=file_hash, + original_filename=filename, + sanitized_filename=san_name, + collision_with_hash=dup_hash, + duplicate_path=quarantine_path, + book_title=voted.get('title'), + book_author=voted.get('author'), + ) + except Exception as e: + logger.error("Failed to quarantine duplicate: %s", e) + + result['action'] = 'level4_duplicate' + return result + + # ── Step 12: Set up processing directory ────────────────────── + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = "Cannot create processing dir: {}".format(e) + return result + + pdf_proc_path = os.path.join(proc_dir, 'source.pdf') + try: + shutil.move(content_path, pdf_proc_path) + if meta_path: + shutil.move(meta_path, os.path.join(proc_dir, 'sidecar.meta.json')) + except Exception as e: + result['error'] = "Cannot move PDF to processing: {}".format(e) + return result + + # ── Step 13: Full text extraction ───────────────────────────── + # Re-open reader from new location + try: + reader = PdfReader(pdf_proc_path) + except Exception: + reader = None + + pages_extracted = 0 + ocr_pages = [] + ocr_methods = { + 'pypdf2': 0, 'pdftotext': 0, 'tesseract': 0, + 'gemini_vision': 0, 'none': 0, + } + + for i in range(page_count): + try: + if reader: + text, method = extract_text_from_page( + reader, i, pdf_proc_path, page_timeout + ) + else: + proc_result = subprocess.run( + ['pdftotext', '-f', str(i + 1), '-l', str(i + 1), + pdf_proc_path, '-'], + capture_output=True, text=True, timeout=page_timeout + ) + text = proc_result.stdout if proc_result.returncode == 0 else '' + method = 'pdftotext' if text.strip() else 'none' + + ocr_methods[method] += 1 + if method in ('tesseract', 'gemini_vision'): + ocr_pages.append(i + 1) + except Exception as e: + logger.warning("Page %d/%d failed: %s", i + 1, page_count, e) + text = '' + ocr_methods['none'] += 1 + + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i + 1)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(text) + + if text.strip(): + pages_extracted += 1 + + if (i + 1) % 50 == 0: + logger.info(" %s: page %d/%d", filename, i + 1, page_count) + + # ── Step 14: Write meta.json ────────────────────────────────── + meta = { + 'hash': file_hash, + 'filename': filename, + 'source_type': 'pdf', + 'page_count': page_count, + 'pages_extracted': pages_extracted, + 'ocr_pages': ocr_pages, + 'ocr_methods': ocr_methods, + 'metadata': voted, + 'metadata_provenance': provenance_record, + } + + # Merge sidecar meta if present + sidecar_path = os.path.join(proc_dir, 'sidecar.meta.json') + if os.path.exists(sidecar_path): + try: + with open(sidecar_path, encoding='utf-8') as f: + sidecar = json.load(f) + meta['sidecar'] = sidecar + except Exception: + pass + + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # ── Step 15: Register in catalogue + documents ──────────────── + display_title = voted.get('title') or clean_filename_to_title(filename) + size_bytes = os.path.getsize(pdf_proc_path) + + source = 'acquired/pdf' + category = 'PDF' + if meta.get('sidecar'): + source = meta['sidecar'].get('source', source) + category = meta['sidecar'].get('category', category) + + db.add_to_catalogue( + file_hash, filename, pdf_proc_path, size_bytes, source, category + ) + db.queue_document(file_hash) + + # ── Step 16: Update documents row ───────────────────────────── + conn = db._get_conn() + conn.execute( + "UPDATE documents SET " + "text_dir = ?, page_count = ?, book_title = ?, book_author = ?, " + "metadata_provenance = ? " + "WHERE hash = ?", + ( + proc_dir, + page_count, + voted.get('title'), + voted.get('author'), + json.dumps(provenance_record), + file_hash, + ) + ) + conn.commit() + + # ── Step 17: Status = extracted ─────────────────────────────── + db.update_status(file_hash, 'extracted', pages_extracted=pages_extracted) + + logger.info( + "PDF pre_flight complete: %s (%s) -> %d/%d pages in %s", + file_hash[:8], display_title, pages_extracted, page_count, proc_dir, + ) + + result['action'] = 'extracted' + return result + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _move_to_rejected(content_path, meta_path, config, filename): + """Move an unreadable PDF to _review/rejected_pdfs/.""" + review_dir = os.path.join( + config.get('library_root', '/mnt/library'), + '_review', 'rejected_pdfs' + ) + try: + os.makedirs(review_dir, exist_ok=True) + reject_path = os.path.join(review_dir, filename) + shutil.move(content_path, reject_path) + if meta_path: + os.remove(meta_path) + logger.info("Moved rejected PDF to %s", reject_path) + except Exception as e: + logger.error("Failed to move rejected PDF: %s", e) From d9aed35fd7231c04c79bd507128b2345932bed6b Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 18:30:58 +0000 Subject: [PATCH 06/17] Phase 5c-1: dispatcher loop, filing worker loop, service rewire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds dispatch_loop() alongside dispatch_once() for service-thread use. Adds filing_worker_loop() that watches for status=complete items in /opt/recon/data/processing/ and files them to library/Domain/Subdomain/. Rewires cmd_service() to run the new architecture: - Removed: scanner_loop, peertube_scanner_loop, crawler_scheduler_loop, organizer_loop (all replaced by dispatcher + new filing worker) - Kept: enrich and embed stage workers, progress, dashboard - Kept (vestigial): extract stage worker — will be removed in Phase 6 cleanup - Added: dispatcher loop thread, filing worker thread Phase 5c-1 of the refactor. Service not yet started — Phase 5c-2 will do that. Co-Authored-By: Claude Opus 4.6 --- lib/dispatcher.py | 28 +++++ lib/filing.py | 57 +++++++++++ recon.py | 255 +++------------------------------------------- 3 files changed, 98 insertions(+), 242 deletions(-) diff --git a/lib/dispatcher.py b/lib/dispatcher.py index f9a7b78..a26436c 100644 --- a/lib/dispatcher.py +++ b/lib/dispatcher.py @@ -142,3 +142,31 @@ def dispatch_once(): }) return results + + +def dispatch_loop(stop_event, db, config, interval=30): + """Run dispatch_once() on a loop until stop_event is set. + + Designed to run as a service thread. Never raises to the caller. + """ + logger.info("[dispatcher] Loop started (interval: %ds)", interval) + + while not stop_event.is_set(): + try: + results = dispatch_once() + if results: + actions = {} + for r in results: + a = r.get('action', 'unknown') + actions[a] = actions.get(a, 0) + 1 + logger.info("[dispatcher] Dispatched %d items: %s", + len(results), + ", ".join(f"{k}={v}" for k, v in sorted(actions.items()))) + else: + logger.debug("[dispatcher] No items to dispatch") + except Exception as e: + logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True) + + stop_event.wait(interval) + + logger.info("[dispatcher] Loop stopped") diff --git a/lib/filing.py b/lib/filing.py index 4267922..fc8ff00 100644 --- a/lib/filing.py +++ b/lib/filing.py @@ -156,3 +156,60 @@ def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False): logger.error("DB/Qdrant update failed for %s: %s", doc_hash[:8], e) return result + + +def filing_worker_loop(stop_event, db, config, interval=30): + """Run filing on items ready to be filed until stop_event is set. + + Watches for documents with status='complete', organized_at IS NULL, + and path in /opt/recon/data/processing/. Files them to library. + + Designed to run as a service thread. Never raises to the caller. + """ + logger.info("[filing] Worker started (interval: %ds)", interval) + + while not stop_event.is_set(): + try: + conn = db._get_conn() + rows = conn.execute( + "SELECT hash, path FROM documents " + "WHERE status = 'complete' " + "AND organized_at IS NULL " + "AND path LIKE '/opt/recon/data/processing/%' " + "LIMIT 50" + ).fetchall() + + if rows: + filed = 0 + skipped = 0 + errors = 0 + for row in rows: + if stop_event.is_set(): + break + try: + result = file_processed_item(row['hash'], row['path'], db, config) + action = result.get('action', 'unknown') + if action == 'filed': + filed += 1 + elif action.startswith('skip'): + skipped += 1 + elif action == 'error': + errors += 1 + logger.warning("[filing] Error filing %s: %s", + row['hash'][:8], result.get('error', 'unknown')) + except Exception as e: + errors += 1 + logger.error("[filing] Exception filing %s: %s", + row['hash'][:8], e, exc_info=True) + + logger.info("[filing] Batch: %d filed, %d skipped, %d errors", + filed, skipped, errors) + else: + logger.debug("[filing] No items ready to file") + + except Exception as e: + logger.error("[filing] Error in filing worker: %s", e, exc_info=True) + + stop_event.wait(interval) + + logger.info("[filing] Worker stopped") diff --git a/recon.py b/recon.py index 9cf5814..e00dfa2 100755 --- a/recon.py +++ b/recon.py @@ -668,14 +668,15 @@ def cmd_serve(args): def cmd_service(args): """Run RECON as a long-lived service. Called by systemd. - Bundles: Flask dashboard + pipeline stages + library scanner + PeerTube scanner + progress reporter. + Bundles: Flask dashboard + dispatcher + pipeline stages + filing worker + progress reporter. All threads are daemon threads; SIGTERM/SIGINT trigger graceful shutdown. """ from lib.extractor import run_extraction from lib.enricher import run_enrichment from lib.embedder import run_embedding from lib.api import app, run_server as start_dashboard - from lib.peertube_scraper import ingest_all as pt_ingest_all + from lib.dispatcher import dispatch_loop + from lib.filing import filing_worker_loop config = get_config() proc = config.get('processing', {}) @@ -685,13 +686,14 @@ def cmd_service(args): enrich_workers = proc.get('enrich_workers', 16) embed_workers = proc.get('embed_workers', 4) poll_interval = svc.get('stage_poll_interval', 30) - scan_interval = svc.get('scan_interval', 3600) + dispatch_interval = svc.get('dispatch_interval', 30) + filing_interval = svc.get('filing_interval', 30) progress_interval = svc.get('progress_interval', 60) web_host = config.get('web', {}).get('host', '0.0.0.0') web_port = config.get('web', {}).get('port', 8420) stop_event = threading.Event() - totals = {'extract': 0, 'enrich': 0, 'embed': 0, 'scan': 0} + totals = {'extract': 0, 'enrich': 0, 'embed': 0} def shutdown(signum, frame): sig_name = signal.Signals(signum).name @@ -716,36 +718,6 @@ def cmd_service(args): stop_event.wait(poll_interval) logger.info(f"[{name}] Stage stopped (total: {totals[name]})") - def scanner_loop(): - """Periodically scan library and queue new documents.""" - logger.info(f"[scanner] Started (interval: {scan_interval}s)") - # Run initial scan immediately - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - if new_cat or new_queued or synced: - logger.info(f"[scanner] Initial: {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - except Exception as e: - logger.error(f"[scanner] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - totals['scan'] += new_queued - if new_cat or new_queued or synced: - logger.info(f"[scanner] {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - else: - logger.debug("[scanner] No new documents") - except Exception as e: - logger.error(f"[scanner] Error: {e}", exc_info=True) - logger.info("[scanner] Stopped") - def progress_loop(): """Log pipeline status periodically.""" while not stop_event.is_set(): @@ -769,217 +741,19 @@ def cmd_service(args): except Exception: pass - def peertube_scanner_loop(): - """Periodically ingest new PeerTube video transcripts.""" - from lib.peertube_scraper import set_stop_check - set_stop_check(stop_event.is_set) - logger.info(f"[peertube] Scanner started (interval: {scan_interval}s)") - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] Initial scan: {ingested} transcripts ingested") - else: - logger.info("[peertube] Initial scan: no new transcripts") - except Exception as e: - logger.error(f"[peertube] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] {ingested} new transcripts ingested") - else: - logger.debug("[peertube] No new transcripts") - except Exception as e: - logger.error(f"[peertube] Scan error: {e}", exc_info=True) - logger.info("[peertube] Scanner stopped") - - def crawler_scheduler_loop(): - """Scheduled site crawler — crawls configured sites, one per cycle. - - - Reads sites from config.yaml crawler.sites list - - Processes sites in tier order (lower tier = higher priority) - - Tracks last-crawled timestamp per site in crawl_state.json - - Skips sites crawled within recrawl_interval_days (default 7) - - Waits inter_site_cooldown seconds between sites (default 30) - - Uses per-site delay for rate limiting during crawl - """ - import json as _json - from lib.crawler import crawl_site - - crawler_cfg = config.get('crawler', {}) - sites = crawler_cfg.get('sites', []) - if not sites: - logger.info("[crawler] No sites configured, scheduler disabled") - return - - recrawl_days = crawler_cfg.get('recrawl_interval_days', 7) - cooldown = crawler_cfg.get('inter_site_cooldown', 30) - state_file = os.path.join(config.get('paths', {}).get('data', '/opt/recon/data'), 'crawl_state.json') - - # Load persisted state - def load_state(): - try: - with open(state_file, 'r') as sf: - return _json.load(sf) - except (FileNotFoundError, _json.JSONDecodeError): - return {} - - def save_state(state): - try: - with open(state_file, 'w') as sf: - _json.dump(state, sf, indent=2) - except Exception as e: - logger.error(f"[crawler] Failed to save state: {e}") - - # Sort by tier (lower = higher priority) - sorted_sites = sorted(sites, key=lambda s: s.get('tier', 99)) - logger.info(f"[crawler] Scheduler started — {len(sorted_sites)} sites configured, " - f"recrawl every {recrawl_days}d, {cooldown}s cooldown between sites") - - # Initial delay — let the main pipeline stabilize before crawling - stop_event.wait(60) - if stop_event.is_set(): - return - - while not stop_event.is_set(): - state = load_state() - now = time.time() - crawled_this_cycle = 0 - - for site in sorted_sites: - if stop_event.is_set(): - break - - url = site.get('url', '') - if not url: - continue - - # Check recrawl interval - last_crawled = state.get(url, {}).get('last_crawled', 0) - age_days = (now - last_crawled) / 86400 - if age_days < recrawl_days: - logger.debug(f"[crawler] Skipping {url} — crawled {age_days:.1f}d ago") - continue - - category = site.get('category', 'Web') - max_depth = site.get('max_depth', crawler_cfg.get('max_depth', 3)) - max_pages = site.get('max_pages', crawler_cfg.get('max_pages', 500)) - delay = site.get('delay', crawler_cfg.get('rate_limit_delay', 1.0)) - tier = site.get('tier', 99) - notes = site.get('notes', '') - - logger.info(f"[crawler] Starting: {url} (tier {tier}, category={category}, " - f"depth={max_depth}, pages={max_pages}, delay={delay}s)") - - try: - result = crawl_site( - base_url=url, - category=category, - max_pages=max_pages, - max_depth=max_depth, - delay=delay, - config=config, - ) - - summary = result.get('summary', {}) - method = result.get('discovery_method', 'none') - logger.info(f"[crawler] Done: {url} via {method} — " - f"{summary.get('succeeded', 0)} new, " - f"{summary.get('duplicates', 0)} dupes, " - f"{summary.get('failed', 0)} failed") - - # Update state - state[url] = { - 'last_crawled': time.time(), - 'method': method, - 'succeeded': summary.get('succeeded', 0), - 'duplicates': summary.get('duplicates', 0), - 'failed': summary.get('failed', 0), - 'tier': tier, - 'category': category, - } - save_state(state) - crawled_this_cycle += 1 - - except Exception as e: - logger.error(f"[crawler] Failed: {url} — {e}", exc_info=True) - state[url] = { - 'last_crawled': time.time(), - 'error': str(e), - 'tier': tier, - 'category': category, - } - save_state(state) - - # Inter-site cooldown - if not stop_event.is_set(): - logger.debug(f"[crawler] Cooling down {cooldown}s before next site...") - stop_event.wait(cooldown) - - if crawled_this_cycle: - logger.info(f"[crawler] Cycle complete — {crawled_this_cycle} sites crawled") - else: - logger.debug("[crawler] Cycle complete — all sites within recrawl window") - - # Sleep until next cycle check (1 hour) - if not stop_event.is_set(): - stop_event.wait(3600) - - logger.info("[crawler] Scheduler stopped") - - def organizer_loop(): - """Organize completed documents into Domain/Subdomain folders.""" - from lib.organizer import organize_document - logger.info(f"[organizer] Started (interval: {poll_interval}s)") - # Initial delay — let pipeline process some docs first - stop_event.wait(60) - - while not stop_event.is_set(): - try: - org_db = StatusDB() - unorganized = org_db.get_unorganized(limit=100) - if unorganized: - moved = 0 - errors = 0 - for doc in unorganized: - if stop_event.is_set(): - break - result = organize_document(doc['hash'], org_db, config) - if result['action'] == 'moved': - moved += 1 - elif result['action'] == 'error': - errors += 1 - if moved or errors: - logger.info(f"[organizer] Organized {moved} documents ({errors} errors)") - # Sync paths to Qdrant after batch - if moved > 0: - synced = sync_qdrant_paths() - if synced: - logger.info(f"[organizer] Synced {synced} paths to Qdrant") - else: - logger.debug("[organizer] No unorganized documents") - except Exception as e: - logger.error(f"[organizer] Error: {e}", exc_info=True) - stop_event.wait(poll_interval) - logger.info("[organizer] Stopped") + db = StatusDB() threads = [ + threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval), + daemon=True, name='dispatcher'), threading.Thread(target=stage_loop, daemon=True, name='extract', args=('extract', lambda: run_extraction(workers=extract_workers))), threading.Thread(target=stage_loop, daemon=True, name='enrich', args=('enrich', lambda: run_enrichment(workers=enrich_workers))), threading.Thread(target=stage_loop, daemon=True, name='embed', args=('embed', lambda: run_embedding(workers=embed_workers))), - threading.Thread(target=scanner_loop, daemon=True, name='scanner'), - threading.Thread(target=peertube_scanner_loop, daemon=True, name='peertube'), - threading.Thread(target=crawler_scheduler_loop, daemon=True, name='crawler'), - threading.Thread(target=organizer_loop, daemon=True, name='organizer'), + threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval), + daemon=True, name='filing'), threading.Thread(target=progress_loop, daemon=True, name='progress'), threading.Thread(target=lambda: start_dashboard(stop_event), daemon=True, name='dashboard'), @@ -988,9 +762,8 @@ def cmd_service(args): logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}") - logger.info(f" Scanner: every {scan_interval}s | Progress: every {progress_interval}s") - crawler_sites = config.get('crawler', {}).get('sites', []) - logger.info(f" Crawler: {len(crawler_sites)} sites configured") + logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") + logger.info(f" Progress: every {progress_interval}s") for t in threads: t.start() @@ -1018,8 +791,6 @@ def cmd_service(args): logger.info("=== RECON Service Stopped ===") return 0 - - def cmd_ingest_peertube(args): from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats From 9fa60f9c86b313e7305ce5b66647216441480af6 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 20:15:48 +0000 Subject: [PATCH 07/17] Fix: stale cleanup in processors must fail loudly on permission errors Phase 5c-2 failed because shutil.rmtree(ignore_errors=True) silently failed to clean up root-owned legacy files in processing/{hash}/, letting the processor proceed into a half-cleaned directory and then crash on subsequent file writes. Changes: removed ignore_errors=True, wrapped in try/except that logs and re-raises, so the processor fails early and visibly if stale cleanup fails. Recovery from Phase 5c-2 failure. --- lib/processors/pdf_processor.py | 14 ++++++++++++-- lib/processors/transcript_processor.py | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/lib/processors/pdf_processor.py b/lib/processors/pdf_processor.py index 2c9224e..5d7a57f 100644 --- a/lib/processors/pdf_processor.py +++ b/lib/processors/pdf_processor.py @@ -372,8 +372,18 @@ def pre_flight(content_path, meta_path, db, config): ) proc_dir = os.path.join(processing_root, file_hash) concepts_dir = os.path.join(config['paths']['concepts'], file_hash) - shutil.rmtree(proc_dir, ignore_errors=True) - shutil.rmtree(concepts_dir, ignore_errors=True) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise # ── Step 3: Hash dedupe ─────────────────────────────────────── conn = db._get_conn() diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py index 21860cf..c5d8023 100644 --- a/lib/processors/transcript_processor.py +++ b/lib/processors/transcript_processor.py @@ -57,8 +57,18 @@ def pre_flight(content_path, meta_path, db, config): ) proc_dir = os.path.join(processing_root, file_hash) concepts_dir = os.path.join(config['paths']['concepts'], file_hash) - shutil.rmtree(proc_dir, ignore_errors=True) - shutil.rmtree(concepts_dir, ignore_errors=True) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise # Hash dedupe: if hash exists in catalogue, delete the pair and return conn = db._get_conn() From df29d598d377fcc1de81d912a743c292c4689f99 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 22:49:21 +0000 Subject: [PATCH 08/17] Phase 6a: transcripts mark organized in-place, skip filing Transcripts are derived text from PeerTube videos, not primary source files. They do not belong in library/Domain/Subdomain/ like PDFs. Change: transcript_processor.pre_flight() now sets organized_at = CURRENT_TIMESTAMP at the end of successful processing, marking the transcript as organized in place. The watch URL remains in catalogue.path and Qdrant download_url so users clicking search results go to the PeerTube video. The filing workers path LIKE filter naturally excludes transcripts since their documents.path is the watch URL, not a filesystem path. No filing worker changes needed. Back-fills 2,260 drain items from Phase 5c-2 via one-time SQL. Co-Authored-By: Claude Opus 4.6 --- lib/processors/transcript_processor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py index c5d8023..dbc3013 100644 --- a/lib/processors/transcript_processor.py +++ b/lib/processors/transcript_processor.py @@ -149,10 +149,16 @@ def pre_flight(content_path, meta_path, db, config): # Queue and advance to extracted db.queue_document(file_hash) - # Set text_dir and page_count on the documents row + # Set text_dir and page_count on the documents row. + # Transcripts are derived text from PeerTube videos, not primary sources. + # They don't get filed into library/Domain/Subdomain/ like PDFs -- instead, + # they're marked organized in-place. Their watch URL remains in catalogue.path + # and Qdrant download_url so users clicking search results go to PeerTube. + # The filing worker's path LIKE filter naturally excludes transcripts since + # their documents.path is the watch URL, not a filesystem path. conn = db._get_conn() conn.execute( - "UPDATE documents SET text_dir = ?, page_count = ? WHERE hash = ?", + "UPDATE documents SET text_dir = ?, page_count = ?, organized_at = CURRENT_TIMESTAMP WHERE hash = ?", (proc_dir, len(pages), file_hash) ) conn.commit() From 70b80cb312c45e20f65190a60ee76c410a426f29 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 23:05:29 +0000 Subject: [PATCH 09/17] Phase 6b: fix dashboard Untitled/WEB bug for transcripts Two bugs in the Recently Completed table: 1. Title showed "Untitled" for all transcripts because the dashboard read documents.book_title (populated by PDF metadata voting) which is NULL for transcripts. Fixed by COALESCE(book_title, filename) in the SQL query -- falls back to catalogue.filename which holds the real video title. 2. Type showed "WEB" for all transcripts because the type CASE expression only had web and pdf branches, with web matching any http% path -- and transcript paths are PeerTube watch URLs. Fixed by adding a transcript branch keyed on catalogue.source = stream.echo6.co, evaluated before the web branch. Also adds badge-transcript CSS (purple) and JS rendering case. Applied consistently to both the Recently Completed and Sources table queries. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 24 +++++++++++++++++------- static/css/recon.css | 1 + static/js/dashboard.js | 4 ++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/api.py b/lib/api.py index 4ceab68..fa72c3f 100644 --- a/lib/api.py +++ b/lib/api.py @@ -882,7 +882,11 @@ def _build_knowledge_stats(): sources = conn.execute(""" SELECT c.source, - CASE WHEN c.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type, + CASE + WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN c.path LIKE 'http%' THEN 'web' + ELSE 'pdf' + END as type, COUNT(DISTINCT c.hash) as catalogued, COUNT(DISTINCT CASE WHEN d.status = 'complete' THEN d.hash END) as complete, COUNT(DISTINCT CASE WHEN d.status NOT IN ('complete', 'failed') AND d.status IS NOT NULL THEN d.hash END) as in_pipeline, @@ -935,11 +939,17 @@ def _build_knowledge_stats(): """).fetchone()[0] recent = conn.execute(""" - SELECT book_title, status, concepts_extracted, vectors_inserted, - CASE WHEN path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type - FROM documents - WHERE status = 'complete' - ORDER BY embedded_at DESC + SELECT COALESCE(d.book_title, c.filename) as title, + d.status, d.concepts_extracted, d.vectors_inserted, + CASE + WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN d.path LIKE 'http%' THEN 'web' + ELSE 'pdf' + END as type + FROM documents d + JOIN catalogue c ON d.hash = c.hash + WHERE d.status = 'complete' + ORDER BY d.embedded_at DESC LIMIT 10 """).fetchall() @@ -979,7 +989,7 @@ def _build_knowledge_stats(): 'source_types': dict(sorted(source_type_counts.items(), key=lambda x: -x[1])), 'sample_size': sample_size, 'recent_complete': [{ - 'title': r['book_title'] or 'Untitled', + 'title': r['title'] or 'Untitled', 'concepts': r['concepts_extracted'] or 0, 'vectors': r['vectors_inserted'] or 0, 'type': r['type'], diff --git a/static/css/recon.css b/static/css/recon.css index d6752cf..95aed52 100644 --- a/static/css/recon.css +++ b/static/css/recon.css @@ -210,6 +210,7 @@ tr:hover { background: var(--bg-secondary); } } .badge-web { background: #1e3a5f; color: #60a5fa; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-pdf { background: #2d5a2d; color: #4ade80; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-transcript { background: #3b1f5e; color: #c084fc; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } /* ── Trend indicators ── */ .trend { font-size: 11px; margin-left: 6px; } diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 4e0b3d1..254d92a 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -88,7 +88,7 @@ var pipeCount = s.in_pipeline || 0; totalCat += catCount; totalComp += compCount; totalPipe += pipeCount; totalConcepts += s.concepts; totalVectors += s.vectors; - var badge = s.type === 'web' ? 'WEB' : 'PDF'; + var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : 'PDF'; var compPct = catCount > 0 ? (compCount / catCount * 100) : 0; var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0; var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666'; @@ -185,7 +185,7 @@ rtb.innerHTML = 'None yet'; } else { rtb.innerHTML = data.recent_complete.map(function(r) { - var badge = r.type === 'web' ? 'WEB' : 'PDF'; + var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : 'PDF'; return '' + r.title + '' + badge + '' + r.concepts + '' + r.vectors + ''; }).join(''); From efae4023f6623beb2dee541c10620f64a1940bb4 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 14 Apr 2026 23:46:00 +0000 Subject: [PATCH 10/17] Phase 6c: remove vestigial extract worker, dead crawler, .bak files recon.py: - Remove extract stage_loop thread from cmd_service(). Confirmed vestigial: 0 queued items, silent logs over 24+ hour run. The new processors do extraction inline in pre_flight(). - Remove cmd_crawl CLI subcommand and its argparse registration. - Clean up associated imports and variables. Deleted: - lib/crawler.py (432 lines) -- old web crawler subsystem, only referenced by the removed CLI subcommand. - 24 .bak files (untracked pre-edit safety backups, originals preserved in git history). Investigation found the four old loop function definitions (scanner_loop, peertube_scanner_loop, crawler_scheduler_loop, organizer_loop) were already deleted in Phase 5c-1. Modules investigated and KEPT: - lib/web_scraper.py -- exports chunk_text() used by transcript_processor - lib/new_pipeline.py -- active Stream B library management CLI tool - lib/peertube_scraper.py -- only mechanism for transcript ingestion - lib/extractor.py -- would activate for new PDFs via cmd_run CLI Service restart verified: 6 threads (dispatcher, enrich, embed, filing, progress, dashboard), no extract worker, zero errors. Co-Authored-By: Claude Opus 4.6 --- lib/crawler.py | 432 ------------------------------------------------- recon.py | 92 +---------- 2 files changed, 3 insertions(+), 521 deletions(-) delete mode 100644 lib/crawler.py diff --git a/lib/crawler.py b/lib/crawler.py deleted file mode 100644 index a37a0b9..0000000 --- a/lib/crawler.py +++ /dev/null @@ -1,432 +0,0 @@ -""" -RECON Site Crawler — URL discovery for bulk web ingestion. - -Two discovery strategies: -1. Sitemap-based (preferred) — parses sitemap.xml for all URLs -2. Link-following (fallback) — crawls from root URL following internal links - -Discovered URLs are fed into web_scraper.ingest_url() for processing. -""" - -import re -import time -from collections import deque -from urllib.parse import urlparse, urljoin, urldefrag - -import requests -from lxml import etree - -from .utils import get_config, setup_logging - -logger = setup_logging('recon.crawler') - - -def _get_crawler_config(config=None): - """Load crawler config with defaults.""" - if config is None: - config = get_config() - crawler_cfg = config.get('crawler', {}) - web_cfg = config.get('web_scraper', {}) - return { - 'user_agent': ( - crawler_cfg.get('user_agent') or - web_cfg.get('user_agent') or - 'Mozilla/5.0 (compatible; RECON/1.0)' - ), - 'fetch_timeout': crawler_cfg.get('fetch_timeout', 30), - 'rate_limit_delay': crawler_cfg.get('rate_limit_delay', 1.0), - 'max_pages': crawler_cfg.get('max_pages', 500), - 'max_depth': crawler_cfg.get('max_depth', 3), - 'default_exclude': crawler_cfg.get('default_exclude', [ - '/search', '/404', '/login', '/signup', '/auth/', '/api/', '/assets/', '/static/' - ]), - } - - -# ─── Sitemap Discovery ───────────────────────────────────────────── - -def discover_sitemap_url(base_url, config=None): - """ - Find the sitemap URL for a site. - - Checks: robots.txt Sitemap: directive, /sitemap.xml, - /sitemap_index.xml, /sitemap-0.xml. - - Returns sitemap URL or None. - """ - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - parsed = urlparse(base_url) - root = f"{parsed.scheme}://{parsed.netloc}" - - # Check robots.txt first - try: - resp = requests.get( - f"{root}/robots.txt", - headers=headers, - timeout=cfg['fetch_timeout'] - ) - if resp.status_code == 200: - for line in resp.text.splitlines(): - if line.strip().lower().startswith('sitemap:'): - sitemap_url = line.split(':', 1)[1].strip() - # Handle "Sitemap: https://..." — split(':',1) keeps the URL intact - # but "Sitemap: https://..." splits into "Sitemap" and " https://..." - # Need to rejoin properly - if not sitemap_url.startswith('http'): - sitemap_url = line[line.index(':') + 1:].strip() - logger.info(f"Found sitemap in robots.txt: {sitemap_url}") - return sitemap_url - except Exception as e: - logger.debug(f"robots.txt fetch failed: {e}") - - # Try common sitemap locations - candidates = [ - f"{root}/sitemap.xml", - f"{root}/sitemap_index.xml", - f"{root}/sitemap-0.xml", - ] - - for url in candidates: - try: - resp = requests.head( - url, - headers=headers, - timeout=cfg['fetch_timeout'], - allow_redirects=True - ) - if resp.status_code == 200: - logger.info(f"Found sitemap at: {url}") - return url - except Exception: - continue - - logger.warning(f"No sitemap found for {base_url}") - return None - - -def parse_sitemap(sitemap_url, config=None): - """ - Parse a sitemap XML and return all page URLs. - - Handles standard sitemaps () and sitemap indexes - () with recursive sub-sitemap fetching. - """ - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - all_urls = [] - - def _fetch_and_parse(url, depth=0): - if depth > 3: - return - - try: - resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout']) - resp.raise_for_status() - except Exception as e: - logger.error(f"Failed to fetch sitemap {url}: {e}") - return - - try: - root = etree.fromstring(resp.content) - except etree.XMLSyntaxError as e: - logger.error(f"Invalid XML in sitemap {url}: {e}") - return - - nsmap = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} - - # Check if this is a sitemap index - sitemap_locs = root.findall('.//ns:sitemap/ns:loc', nsmap) - if sitemap_locs: - logger.info(f"Sitemap index at {url} — {len(sitemap_locs)} sub-sitemaps") - for loc in sitemap_locs: - if loc.text: - _fetch_and_parse(loc.text.strip(), depth + 1) - return - - # Standard sitemap — extract URLs - url_locs = root.findall('.//ns:loc', nsmap) - - # Fallback: try without namespace - if not url_locs: - url_locs = root.findall('.//loc') - - for loc in url_locs: - if loc.text: - all_urls.append(loc.text.strip()) - - logger.info(f"Parsed {len(url_locs)} URLs from {url}") - - _fetch_and_parse(sitemap_url) - - # Deduplicate preserving order - seen = set() - unique = [] - for url in all_urls: - url_clean = urldefrag(url)[0] - if url_clean not in seen: - seen.add(url_clean) - unique.append(url_clean) - - logger.info(f"Total unique URLs from sitemap: {len(unique)}") - return unique - - -# ─── Link-Following Discovery (Fallback) ─────────────────────────── - -def crawl_links(base_url, max_depth=3, max_pages=500, config=None): - """ - Discover URLs by following internal links (BFS). - Fallback when no sitemap is available. - """ - from bs4 import BeautifulSoup - - cfg = _get_crawler_config(config) - headers = {'User-Agent': cfg['user_agent']} - - parsed_base = urlparse(base_url) - base_domain = parsed_base.netloc - - discovered = [] - visited = set() - queue = deque([(base_url, 0)]) - - skip_extensions = ( - '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.svg', - '.css', '.js', '.zip', '.tar', '.gz', '.mp4', '.mp3', - '.ico', '.woff', '.woff2', '.ttf', '.eot', - ) - skip_paths = ( - '/tag/', '/tags/', '/page/', '/feed/', '/rss/', - '/wp-json/', '/wp-admin/', '/wp-includes/', - ) - - while queue and len(discovered) < max_pages: - url, depth = queue.popleft() - url = urldefrag(url)[0] - - if url in visited: - continue - if depth > max_depth: - continue - - visited.add(url) - discovered.append(url) - - if depth >= max_depth: - continue - - try: - resp = requests.get(url, headers=headers, timeout=cfg['fetch_timeout']) - if resp.status_code != 200: - continue - if 'text/html' not in resp.headers.get('content-type', ''): - continue - except Exception: - continue - - try: - soup = BeautifulSoup(resp.text, 'lxml') - except Exception: - continue - - for a_tag in soup.find_all('a', href=True): - href = a_tag['href'] - full_url = urljoin(url, href) - full_url = urldefrag(full_url)[0] - - parsed = urlparse(full_url) - if parsed.netloc != base_domain: - continue - if any(parsed.path.lower().endswith(ext) for ext in skip_extensions): - continue - if any(skip in parsed.path.lower() for skip in skip_paths): - continue - - if full_url not in visited: - queue.append((full_url, depth + 1)) - - time.sleep(cfg['rate_limit_delay']) - - logger.info(f"Link crawl: {len(discovered)} URLs (visited {len(visited)}, depth {max_depth})") - return discovered - - -# ─── URL Filtering ────────────────────────────────────────────────── - -def filter_urls(urls, include=None, exclude=None): - """ - Filter URLs by path prefix include/exclude rules. - - include: URL must match at least one prefix (if provided) - exclude: URL must not match any prefix - """ - filtered = [] - - for url in urls: - path = urlparse(url).path - - if include: - if not any(path.startswith(prefix) for prefix in include): - continue - - if exclude: - if any(path.startswith(prefix) for prefix in exclude): - continue - - filtered.append(url) - - logger.info(f"Filtered {len(urls)} -> {len(filtered)} URLs " - f"(include={include}, exclude={exclude})") - return filtered - - -# ─── Main Crawl Orchestrator ──────────────────────────────────────── - -def crawl_site( - base_url, - category='Web', - source=None, - include=None, - exclude=None, - max_pages=None, - max_depth=None, - delay=None, - dry_run=False, - use_sitemap=True, - use_links=True, - config=None, -): - """ - Crawl a site and ingest all discovered pages. - - 1. Discover URLs via sitemap or link-following - 2. Apply include/exclude filters - 3. Feed each URL through web_scraper.ingest_url() - - Returns summary dict with counts and per-URL results. - """ - if config is None: - config = get_config() - cfg = _get_crawler_config(config) - - if max_pages is None: - max_pages = cfg['max_pages'] - if max_depth is None: - max_depth = cfg['max_depth'] - if delay is None: - delay = cfg['rate_limit_delay'] - if source is None: - source = urlparse(base_url).netloc - - logger.info(f"Crawling {base_url} (category={category}, max_pages={max_pages})") - - # ── Phase 1: Discover URLs ── - - urls = [] - discovery_method = None - - if use_sitemap: - sitemap_url = discover_sitemap_url(base_url, config) - if sitemap_url: - urls = parse_sitemap(sitemap_url, config) - discovery_method = 'sitemap' - - if not urls and use_links: - logger.info("No sitemap URLs, falling back to link crawl...") - urls = crawl_links(base_url, max_depth=max_depth, max_pages=max_pages, config=config) - discovery_method = 'link_crawl' - - if not urls: - logger.warning(f"No URLs discovered for {base_url}") - return { - 'site': base_url, - 'discovery_method': None, - 'urls_discovered': 0, - 'urls_after_filter': 0, - 'results': [], - 'summary': {'total': 0, 'succeeded': 0, 'duplicates': 0, 'failed': 0}, - } - - # ── Phase 2: Filter URLs ── - - all_exclude = list(cfg['default_exclude']) - if exclude: - all_exclude.extend(exclude) - - urls = filter_urls(urls, include=include, exclude=all_exclude) - - if len(urls) > max_pages: - logger.info(f"Limiting to {max_pages} pages (discovered {len(urls)})") - urls = urls[:max_pages] - - logger.info(f"After filtering: {len(urls)} URLs to process") - - # ── Dry run ── - - if dry_run: - return { - 'site': base_url, - 'discovery_method': discovery_method, - 'dry_run': True, - 'urls_discovered': len(urls), - 'urls': urls, - } - - # ── Phase 3: Ingest each URL ── - - from .web_scraper import ingest_url - - results = [] - total = len(urls) - - for i, url in enumerate(urls, 1): - logger.info(f"[{i}/{total}] Ingesting: {url}") - - try: - result = ingest_url(url, category=category, source=source, config=config) - result['url'] = url - results.append(result) - - status = result.get('status', 'unknown') - title = result.get('title', '') - if status == 'duplicate': - logger.info(f" DUPLICATE: {title}") - else: - logger.info(f" OK: {title} ({result.get('page_count', 0)} pages)") - - except Exception as e: - logger.error(f" FAILED: {url} -- {e}") - results.append({ - 'url': url, - 'status': 'failed', - 'error': str(e), - }) - - if i < total and delay > 0: - time.sleep(delay) - - # ── Summary ── - - succeeded = sum(1 for r in results if r.get('status') not in ('failed', 'duplicate')) - duplicates = sum(1 for r in results if r.get('status') == 'duplicate') - failed = sum(1 for r in results if r.get('status') == 'failed') - - summary = { - 'total': len(results), - 'succeeded': succeeded, - 'duplicates': duplicates, - 'failed': failed, - } - - logger.info(f"Crawl complete: {succeeded} new, {duplicates} duplicates, {failed} failed out of {total}") - - return { - 'site': base_url, - 'domain': urlparse(base_url).netloc, - 'category': category, - 'discovery_method': discovery_method, - 'urls_discovered': total, - 'results': results, - 'summary': summary, - } diff --git a/recon.py b/recon.py index e00dfa2..0619ac0 100755 --- a/recon.py +++ b/recon.py @@ -3,7 +3,7 @@ RECON CLI — Main entry point. Subcommands: scan, queue, extract, enrich, embed, run, search, upload, -ingest-url, crawl, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest. +ingest-url, ingest-peertube, organize, status, catalogue, failures, validate, rebuild, serve, ingest. Usage: cd /opt/recon && source venv/bin/activate && python3 recon.py """ @@ -580,73 +580,6 @@ def cmd_ingest_url(args): -def cmd_crawl(args): - from lib.crawler import crawl_site - - print(f"Crawling {args.url}...") - if args.include: - print(f" Include paths: {args.include}") - if args.exclude: - print(f" Exclude paths: {args.exclude}") - if args.dry_run: - print(f" DRY RUN — no content will be ingested") - print() - - result = crawl_site( - base_url=args.url, - category=args.category, - source=args.source, - include=args.include, - exclude=args.exclude, - max_pages=args.max_pages, - max_depth=args.max_depth, - delay=args.delay, - dry_run=args.dry_run, - use_sitemap=not args.no_sitemap, - ) - - method = result.get('discovery_method', 'none') - print(f"Discovery method: {method}") - - if args.dry_run: - urls = result.get('urls', []) - print(f"Found {len(urls)} URLs that would be ingested:\n") - for i, url in enumerate(urls, 1): - print(f" {i:4d}. {url}") - print(f"\nTotal: {len(urls)} pages") - print(f"Re-run without --dry-run to ingest.") - return 0 - - summary = result.get('summary', {}) - print(f"\nResults:") - print(f" New: {summary.get('succeeded', 0)}") - print(f" Duplicates: {summary.get('duplicates', 0)}") - print(f" Failed: {summary.get('failed', 0)}") - print(f" Total: {summary.get('total', 0)}") - - failed_results = [r for r in result.get('results', []) if r.get('status') == 'failed'] - if failed_results: - print(f"\nFailed URLs:") - for r in failed_results[:10]: - print(f" {r['url']}: {r.get('error', 'Unknown error')}") - if len(failed_results) > 10: - print(f" ... and {len(failed_results) - 10} more") - - if args.enrich or args.process: - print("\nRunning enrichment...") - from lib.enricher import run_enrichment - enriched = run_enrichment() - print(f" Enriched: {enriched}") - - if args.process: - print("\nRunning embedding...") - from lib.embedder import run_embedding - embedded = run_embedding() - print(f" Embedded: {embedded}") - - return 0 - - def cmd_validate(args): from scripts.validate import run_validation run_validation(deep=args.deep) @@ -671,7 +604,6 @@ def cmd_service(args): Bundles: Flask dashboard + dispatcher + pipeline stages + filing worker + progress reporter. All threads are daemon threads; SIGTERM/SIGINT trigger graceful shutdown. """ - from lib.extractor import run_extraction from lib.enricher import run_enrichment from lib.embedder import run_embedding from lib.api import app, run_server as start_dashboard @@ -682,7 +614,6 @@ def cmd_service(args): proc = config.get('processing', {}) svc = config.get('service', {}) - extract_workers = proc.get('extract_workers', 4) enrich_workers = proc.get('enrich_workers', 16) embed_workers = proc.get('embed_workers', 4) poll_interval = svc.get('stage_poll_interval', 30) @@ -693,7 +624,7 @@ def cmd_service(args): web_port = config.get('web', {}).get('port', 8420) stop_event = threading.Event() - totals = {'extract': 0, 'enrich': 0, 'embed': 0} + totals = {'enrich': 0, 'embed': 0} def shutdown(signum, frame): sig_name = signal.Signals(signum).name @@ -746,8 +677,6 @@ def cmd_service(args): threads = [ threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval), daemon=True, name='dispatcher'), - threading.Thread(target=stage_loop, daemon=True, name='extract', - args=('extract', lambda: run_extraction(workers=extract_workers))), threading.Thread(target=stage_loop, daemon=True, name='enrich', args=('enrich', lambda: run_enrichment(workers=enrich_workers))), threading.Thread(target=stage_loop, daemon=True, name='embed', @@ -761,7 +690,7 @@ def cmd_service(args): logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") - logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}") + logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") logger.info(f" Progress: every {progress_interval}s") @@ -1188,21 +1117,6 @@ def main(): p.set_defaults(func=cmd_ingest_url) # crawl - p = sub.add_parser('crawl', help='Crawl a site and ingest discovered pages') - p.add_argument('url', help='Base URL to crawl') - p.add_argument('--category', default='Web', help='Category for ingested content') - p.add_argument('--source', default=None, help='Source identifier (default: domain name)') - p.add_argument('--include', nargs='+', help='Only include URL paths starting with these prefixes') - p.add_argument('--exclude', nargs='+', help='Exclude URL paths starting with these prefixes') - p.add_argument('--max-pages', type=int, default=500, help='Maximum pages to ingest') - p.add_argument('--max-depth', type=int, default=3, help='Maximum link-follow depth') - p.add_argument('--delay', type=float, default=1.0, help='Delay between page fetches (seconds)') - p.add_argument('--dry-run', action='store_true', help='Discover URLs without ingesting') - p.add_argument('--no-sitemap', action='store_true', help='Skip sitemap, use link-following only') - p.add_argument('--enrich', action='store_true', help='Run enrichment after crawl') - p.add_argument('--process', action='store_true', help='Full pipeline: crawl + enrich + embed') - p.set_defaults(func=cmd_crawl) - # validate p = sub.add_parser('validate', help='Validate pipeline consistency') p.add_argument('--deep', action='store_true', help='Deep validation (check all files)') From 277110d9992794e9239c0d4afb520781f5bfbe1f Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:08:51 +0000 Subject: [PATCH 11/17] Phase 6d: PeerTube acquisition module + service thread New lib/acquisition/peertube.py replaces the removed peertube_scanner_loop. Polls PeerTube API every 30min, dedupes against catalogue (UUID + title), writes flat file pairs to data/acquired/stream/ for the dispatcher. - acquire_batch(): one-shot find-and-acquire with rate limiting - acquisition_loop(): service thread wrapper (interval from config) - list_new_videos(): dedup via _build_known_sets() against catalogue - acquire_one(): fetch VTT, convert, write .tmp then rename atomically cmd_service(): added peertube-acq daemon thread cmd_ingest_peertube(): rewired to use acquire_batch(), drops --channel/ --since/--enrich/--process (dispatcher handles full pipeline) config.yaml: added peertube.poll_interval: 1800 Co-Authored-By: Claude Opus 4.6 --- config.yaml | 1 + lib/acquisition/peertube.py | 224 ++++++++++++++++++++++++++++++++++++ recon.py | 52 ++++----- 3 files changed, 245 insertions(+), 32 deletions(-) create mode 100644 lib/acquisition/peertube.py diff --git a/config.yaml b/config.yaml index 1f5d1b0..a9dcf16 100644 --- a/config.yaml +++ b/config.yaml @@ -411,6 +411,7 @@ peertube: public_url: https://stream.echo6.co # Public URL for video links fetch_timeout: 30 # HTTP timeout for API/VTT requests rate_limit_delay: 0.5 # Delay between video ingestions (seconds) + poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) # Stream B: New Library Pipeline new_pipeline: diff --git a/lib/acquisition/peertube.py b/lib/acquisition/peertube.py new file mode 100644 index 0000000..43b9a9a --- /dev/null +++ b/lib/acquisition/peertube.py @@ -0,0 +1,224 @@ +""" +RECON PeerTube Acquisition Module + +Polls PeerTube for new video transcripts and writes them as flat file pairs +into data/acquired/stream/ for the dispatcher to pick up. + +Does NOT touch the database — that's transcript_processor's job. +""" +import json +import os +import time + +from lib.peertube_scraper import get_videos, get_captions, fetch_vtt, vtt_to_text, _get_pt_config +from lib.utils import content_hash, get_config, setup_logging + +logger = setup_logging("recon.acquisition.peertube") + + +def _build_known_sets(db): + """Build sets of known UUIDs and titles from catalogue. + + Queries catalogue once per batch for dedup against both cohorts: + - URL-path rows: extract UUID from https://stream.echo6.co/w/{uuid} + - Library-path rows: extract title from filename column + """ + conn = db._get_conn() + rows = conn.execute( + "SELECT path, filename FROM catalogue WHERE source = 'stream.echo6.co'" + ).fetchall() + known_uuids = set() + known_titles = set() + for row in rows: + path = row['path'] or '' + if '/w/' in path: + known_uuids.add(path.rsplit('/w/', 1)[-1]) + fname = row['filename'] or '' + if fname.endswith('.txt'): + known_titles.add(fname[:-4]) + else: + known_titles.add(fname) + return known_uuids, known_titles + + +def list_new_videos(db, config=None): + """Find PeerTube videos with captions not yet in catalogue. + + Returns list of (video_dict, caption_path) tuples for videos that have + captions and are not in the known UUID or title sets. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + known_uuids, known_titles = _build_known_sets(db) + + videos = get_videos(config=config) + new_videos = [] + checked = 0 + + for video in videos: + if video['uuid'] in known_uuids: + continue + if video['name'] in known_titles: + continue + + # Rate limit caption API calls + if checked > 0: + time.sleep(rate_delay) + checked += 1 + + try: + captions = get_captions(video['uuid'], config) + except Exception as e: + logger.warning("[peertube] Failed to get captions for %s: %s", + video['uuid'][:8], e) + continue + + if not captions: + continue + + # Prefer English caption + caption_path = None + for c in captions: + if c.get('language', {}).get('id') == 'en': + caption_path = c['captionPath'] + break + if caption_path is None: + caption_path = captions[0]['captionPath'] + + new_videos.append((video, caption_path)) + + return new_videos + + +def acquire_one(video, caption_path, config=None): + """Fetch transcript and write to hopper as flat files. + + Returns hash string on success, None on skip/error. + Does NOT touch the database — that's transcript_processor's job. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + + pipeline_cfg = config.get('pipeline', {}) + hopper_dir = os.path.join( + pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired'), + 'stream' + ) + os.makedirs(hopper_dir, exist_ok=True) + + uuid = video['uuid'] + + # Fetch and convert VTT + vtt_content = fetch_vtt(caption_path, config) + text, cue_timestamps = vtt_to_text(vtt_content) + + if not text or len(text.strip()) < 50: + logger.debug("[peertube] Transcript too short for %s (%s): %d chars", + video['name'], uuid, len(text) if text else 0) + return None + + # Write text to temp file, hash it, then rename to final name + tmp_txt = os.path.join(hopper_dir, f'{uuid}.txt.tmp') + with open(tmp_txt, 'w', encoding='utf-8') as f: + f.write(text) + + file_hash = content_hash(tmp_txt) + + # Check if final file already exists (race condition guard) + final_txt = os.path.join(hopper_dir, f'{file_hash}.txt') + final_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json') + if os.path.exists(final_txt): + os.remove(tmp_txt) + logger.debug("[peertube] Hopper file already exists: %s", file_hash[:8]) + return None + + # Build sidecar metadata + video_url = f"{ptc['public_url']}/w/{uuid}" + meta = { + 'title': video['name'], + 'source_url': video_url, + 'url': video_url, + 'source': 'stream.echo6.co', + 'source_type': 'transcript', + 'category': 'Transcript', + 'channel': video.get('channel_display', ''), + 'duration': video.get('duration', 0), + 'uuid': uuid, + 'cue_timestamps': cue_timestamps, + } + + # Write meta to tmp, then rename both atomically + # Meta first, then content — dispatcher only picks up when content file exists + tmp_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json.tmp') + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + os.rename(tmp_meta, final_meta) + os.rename(tmp_txt, final_txt) + + logger.info("[peertube] Acquired: %s (%s) -> %s", + video['name'], uuid[:8], file_hash[:12]) + return file_hash + + +def acquire_batch(db, config=None): + """One-shot: find new videos and acquire them. + + Returns dict: {'acquired': N, 'skipped': N, 'errors': N} + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + result = {'acquired': 0, 'skipped': 0, 'errors': 0} + + try: + new_videos = list_new_videos(db, config) + except Exception as e: + logger.error("[peertube] Failed to list new videos: %s", e, exc_info=True) + result['errors'] = 1 + return result + + if not new_videos: + logger.debug("[peertube] No new videos found") + return result + + logger.info("[peertube] Found %d new videos to acquire", len(new_videos)) + + for i, (video, caption_path) in enumerate(new_videos): + if i > 0: + time.sleep(rate_delay) + try: + file_hash = acquire_one(video, caption_path, config) + if file_hash: + result['acquired'] += 1 + else: + result['skipped'] += 1 + except Exception as e: + logger.error("[peertube] Error acquiring %s (%s): %s", + video['name'], video['uuid'][:8], e, exc_info=True) + result['errors'] += 1 + + return result + + +def acquisition_loop(stop_event, db, config, interval=1800): + """Service loop: poll PeerTube for new transcripts every interval seconds.""" + logger.info("[peertube] Acquisition loop started (interval: %ds)", interval) + while not stop_event.is_set(): + try: + result = acquire_batch(db, config) + if result['acquired']: + logger.info("[peertube] Acquired %d new transcripts (%d skipped, %d errors)", + result['acquired'], result['skipped'], result['errors']) + else: + logger.debug("[peertube] No new transcripts") + except Exception as e: + logger.error("[peertube] Error: %s", e, exc_info=True) + stop_event.wait(interval) + logger.info("[peertube] Acquisition loop stopped") diff --git a/recon.py b/recon.py index 0619ac0..47dda7d 100755 --- a/recon.py +++ b/recon.py @@ -609,6 +609,7 @@ def cmd_service(args): from lib.api import app, run_server as start_dashboard from lib.dispatcher import dispatch_loop from lib.filing import filing_worker_loop + from lib.acquisition.peertube import acquisition_loop config = get_config() proc = config.get('processing', {}) @@ -683,6 +684,9 @@ def cmd_service(args): args=('embed', lambda: run_embedding(workers=embed_workers))), threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval), daemon=True, name='filing'), + threading.Thread(target=lambda: acquisition_loop(stop_event, db, config, + interval=config.get("peertube", {}).get("poll_interval", 1800)), + daemon=True, name="peertube-acq"), threading.Thread(target=progress_loop, daemon=True, name='progress'), threading.Thread(target=lambda: start_dashboard(stop_event), daemon=True, name='dashboard'), @@ -692,6 +696,8 @@ def cmd_service(args): logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") + pt_interval = config.get("peertube", {}).get("poll_interval", 1800) + logger.info(f" PeerTube acquisition: every {pt_interval}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: @@ -721,7 +727,9 @@ def cmd_service(args): return 0 def cmd_ingest_peertube(args): - from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats + from lib.peertube_scraper import get_instance_stats + from lib.acquisition.peertube import acquire_batch + from lib.status import StatusDB if args.stats: stats = get_instance_stats() @@ -734,36 +742,20 @@ def cmd_ingest_peertube(args): print(f" {status}: {count}") return 0 - since = args.since if hasattr(args, 'since') and args.since else None + db = StatusDB() - if args.channel: - print(f"Ingesting PeerTube channel: {args.channel}") - result = ingest_channel(args.channel, since=since) - else: - print("Ingesting all PeerTube videos with captions") - result = ingest_all(since=since) + print("Acquiring PeerTube transcripts to hopper...") + result = acquire_batch(db) - summary = result.get('summary', {}) print(f"\nResults:") - print(f" Checked: {summary.get('total_checked', 0)}") - print(f" Ingested: {summary.get('ingested', 0)} ({summary.get('total_pages', 0)} pages)") - print(f" No captions: {summary.get('skipped_no_captions', 0)}") - print(f" Duplicates: {summary.get('skipped_duplicate', 0)}") - print(f" Failed: {summary.get('failed', 0)}") + print(f" Acquired: {result['acquired']}") + print(f" Skipped: {result['skipped']}") + print(f" Errors: {result['errors']}") - # Optional: run enrichment - if args.enrich or args.process: - print("\nRunning enrichment on extracted content...") - from lib.enricher import run_enrichment - enriched = run_enrichment() - print(f" Enriched: {enriched}") - - # Optional: run embedding too - if args.process: - print("\nRunning embedding...") - from lib.embedder import run_embedding - embedded = run_embedding() - print(f" Embedded: {embedded}") + if result['acquired']: + print(f"\n{result['acquired']} transcript(s) staged in hopper.") + print("The dispatcher will pick them up on its next cycle.") + print("Run 'recon status' to monitor progress.") return 0 @@ -1144,12 +1136,8 @@ def main(): p.set_defaults(func=cmd_organize) # ingest-peertube - p = sub.add_parser('ingest-peertube', help='Ingest PeerTube video transcripts') - p.add_argument('--channel', help='Ingest specific channel (actor_name)') - p.add_argument('--since', help='Only ingest videos published after this date (ISO format)') + p = sub.add_parser('ingest-peertube', help='Acquire PeerTube transcripts to hopper') p.add_argument('--stats', action='store_true', help='Show PeerTube instance stats') - p.add_argument('--enrich', action='store_true', help='Run enrichment after ingestion') - p.add_argument('--process', action='store_true', help='Full pipeline: ingest + enrich + embed') p.set_defaults(func=cmd_ingest_peertube) # ingest From 7e42528d2fbeb5b6ad731c8c7e62f747eb949189 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:15:41 +0000 Subject: [PATCH 12/17] Phase 6e: rewire dashboard PeerTube endpoint to acquisition module Replace legacy ingest_channel/ingest_all imports with acquire_batch from lib.acquisition.peertube. The endpoint now writes flat file pairs to the hopper and lets the dispatcher handle processing, matching the Phase 6d architecture. Removes channel/since/process parameters that were tied to the old direct-ingest path. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 61 +++++++++++++++++++----------------------------------- 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/lib/api.py b/lib/api.py index fa72c3f..421c54e 100644 --- a/lib/api.py +++ b/lib/api.py @@ -629,62 +629,43 @@ _peertube_results = {} @app.route('/api/ingest-peertube', methods=['POST']) def api_ingest_peertube(): - """Ingest PeerTube video transcripts.""" - data = request.get_json() or {} - channel = data.get('channel') - since = data.get('since') - process = data.get('process', False) + """Acquire new PeerTube transcripts into the hopper. - from .peertube_scraper import ingest_channel, ingest_all + Uses the acquisition module to find new videos and write flat file pairs + to data/acquired/stream/. The dispatcher picks them up automatically. + """ + from .acquisition.peertube import acquire_batch - job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}" + job_id = f"pt_{int(__import__('time').time()):08x}" - def _run_ingest(): + def _run_acquire(): try: - _peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting', - 'channel': channel or 'all'} - if channel: - result = ingest_channel(channel, since=since) - else: - result = ingest_all(since=since) + _peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'} + db = StatusDB() + result = acquire_batch(db) - summary = result.get('summary', {}) _peertube_results[job_id] = { - 'status': 'running', 'stage': 'enriching', - 'channel': channel or 'all', 'ingest_summary': summary, + 'status': 'complete', + 'acquired': result['acquired'], + 'skipped': result['skipped'], + 'errors': result['errors'], + 'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them." } - - if process: - logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...") - from .enricher import run_enrichment - enriched = run_enrichment() - logger.info(f"PeerTube {job_id}: enriched {enriched} documents") - - _peertube_results[job_id]['stage'] = 'embedding' - from .embedder import run_embedding - embedded = run_embedding() - logger.info(f"PeerTube {job_id}: embedded {embedded} documents") - - summary['enriched'] = enriched - summary['embedded'] = embedded - - result['status'] = 'complete' - _peertube_results[job_id] = result + logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d", + job_id, result['acquired'], result['skipped'], result['errors']) except Exception as e: - logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True) + logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True) _peertube_results[job_id] = {'error': str(e), 'status': 'failed'} - _peertube_results[job_id] = {'status': 'running', 'stage': 'starting', - 'channel': channel or 'all'} - t = threading.Thread(target=_run_ingest, daemon=True) + _peertube_results[job_id] = {'status': 'running', 'stage': 'starting'} + t = threading.Thread(target=_run_acquire, daemon=True) t.start() return jsonify({ 'job_id': job_id, 'status': 'started', - 'channel': channel or 'all', - 'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status' + 'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status' }), 202 From 7fe7d0358309b2802634f9d17b77186e173eb08b Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:20:46 +0000 Subject: [PATCH 13/17] Revert "Phase 6e: rewire dashboard PeerTube endpoint to acquisition module" This reverts commit 7e42528d2fbeb5b6ad731c8c7e62f747eb949189. --- lib/api.py | 61 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/lib/api.py b/lib/api.py index 421c54e..fa72c3f 100644 --- a/lib/api.py +++ b/lib/api.py @@ -629,43 +629,62 @@ _peertube_results = {} @app.route('/api/ingest-peertube', methods=['POST']) def api_ingest_peertube(): - """Acquire new PeerTube transcripts into the hopper. + """Ingest PeerTube video transcripts.""" + data = request.get_json() or {} + channel = data.get('channel') + since = data.get('since') + process = data.get('process', False) - Uses the acquisition module to find new videos and write flat file pairs - to data/acquired/stream/. The dispatcher picks them up automatically. - """ - from .acquisition.peertube import acquire_batch + from .peertube_scraper import ingest_channel, ingest_all - job_id = f"pt_{int(__import__('time').time()):08x}" + job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}" - def _run_acquire(): + def _run_ingest(): try: - _peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'} - db = StatusDB() - result = acquire_batch(db) + _peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting', + 'channel': channel or 'all'} + if channel: + result = ingest_channel(channel, since=since) + else: + result = ingest_all(since=since) + summary = result.get('summary', {}) _peertube_results[job_id] = { - 'status': 'complete', - 'acquired': result['acquired'], - 'skipped': result['skipped'], - 'errors': result['errors'], - 'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them." + 'status': 'running', 'stage': 'enriching', + 'channel': channel or 'all', 'ingest_summary': summary, } - logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d", - job_id, result['acquired'], result['skipped'], result['errors']) + + if process: + logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...") + from .enricher import run_enrichment + enriched = run_enrichment() + logger.info(f"PeerTube {job_id}: enriched {enriched} documents") + + _peertube_results[job_id]['stage'] = 'embedding' + from .embedder import run_embedding + embedded = run_embedding() + logger.info(f"PeerTube {job_id}: embedded {embedded} documents") + + summary['enriched'] = enriched + summary['embedded'] = embedded + + result['status'] = 'complete' + _peertube_results[job_id] = result except Exception as e: - logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True) + logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True) _peertube_results[job_id] = {'error': str(e), 'status': 'failed'} - _peertube_results[job_id] = {'status': 'running', 'stage': 'starting'} - t = threading.Thread(target=_run_acquire, daemon=True) + _peertube_results[job_id] = {'status': 'running', 'stage': 'starting', + 'channel': channel or 'all'} + t = threading.Thread(target=_run_ingest, daemon=True) t.start() return jsonify({ 'job_id': job_id, 'status': 'started', - 'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status' + 'channel': channel or 'all', + 'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status' }), 202 From 62539861f29e5765832ba8f17b570590808b49c4 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 22:39:31 +0000 Subject: [PATCH 14/17] Phase 6f: text processor for .txt file ingestion New processor: lib/processors/text_processor.py Handles plain text files (.txt) as primary source documents. Pipeline: acquired/text/ -> dispatcher -> text_processor.pre_flight() -> enrich -> embed -> filing worker -> library/Domain/Subdomain/ Metadata extraction via two-source vote: - Source A: filename parsing (title from filename) - Source B: Gemini LLM extraction (title/author/edition/year from first 3 pages of text) Page splitting reuses chunk_text() from lib/web_scraper.py. Filing behavior matches PDFs (files to library, not organized in-place like transcripts). Config: adds text: text_processor to pipeline.dispatch map. New hopper subfolder: data/acquired/text/ Co-Authored-By: Claude Opus 4.6 --- config.yaml | 1 + lib/processors/text_processor.py | 320 +++++++++++++++++++++++++++++++ 2 files changed, 321 insertions(+) create mode 100644 lib/processors/text_processor.py diff --git a/config.yaml b/config.yaml index a9dcf16..3e185f8 100644 --- a/config.yaml +++ b/config.yaml @@ -437,5 +437,6 @@ pipeline: pdf: pdf_processor stream: transcript_processor html: html_processor + text: text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 diff --git a/lib/processors/text_processor.py b/lib/processors/text_processor.py new file mode 100644 index 0000000..47e68fc --- /dev/null +++ b/lib/processors/text_processor.py @@ -0,0 +1,320 @@ +""" +RECON Text Processor + +Handles pre_flight for plain .txt files arriving in acquired/text/. +These are primary source documents (books, manuals, guides), not derived +content like transcripts. + +Metadata extraction via two-source vote: + A. Filename parsing (title, optionally author) + B. Gemini LLM extraction (title/author/edition/year from first 3 pages) + +Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/ +by the filing worker (NOT organized in-place like transcripts). + +Phase 6f: initial implementation. +""" +import json +import logging +import os +import re +import shutil + +from lib.web_scraper import chunk_text +from lib.utils import content_hash, clean_filename_to_title +from lib.processors.pdf_processor import _extract_gemini_metadata + +logger = logging.getLogger("recon.processors.text") + +WORDS_PER_PAGE = 2000 + + +def _extract_filename_metadata(filename): + """Source A: Extract metadata by parsing the filename. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + stem = os.path.splitext(filename)[0] + + result['title'] = clean_filename_to_title(filename) + + # Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY + year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem) + if year_match: + result['year'] = year_match.group(1) + + # Edition: Nth Edition, Edition N, etc. + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + stem, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + + # Author: "Title - Author" or "Title by Author" patterns + # Try " - Author" at end + dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem) + if dash_match: + result['author'] = dash_match.group(1).strip() + else: + by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem) + if by_match: + result['author'] = by_match.group(1).strip() + + return result + + +def _vote_metadata(source_a, source_b): + """Vote on each metadata field across two sources. + + Priority: B (Gemini) > A (filename). + If both agree, note agreement. + + Returns: + (voted_dict, provenance_record) + """ + sources = {'filename': source_a, 'gemini': source_b} + priority = ['gemini', 'filename'] + voted = {} + provenance = {} + + for field in ('title', 'author', 'edition', 'year'): + values = {} + for name, src in sources.items(): + val = src.get(field) + if val and str(val).strip().lower() != 'null': + values[name] = val + + if not values: + voted[field] = None + provenance[field] = None + continue + + def _norm(v): + if field == 'year': + return v.strip() + return v.strip().lower() + + norm_groups = {} + for name, val in values.items(): + nv = _norm(val) + norm_groups.setdefault(nv, []).append((name, val)) + + best_group = max(norm_groups.values(), key=len) + + if len(best_group) >= 2: + # Both sources agree + for p in priority: + for name, val in best_group: + if name == p: + voted[field] = val + names = ','.join(n for n, _ in best_group) + provenance[field] = "agreed({})".format(names) + break + if voted.get(field) is not None: + break + else: + # No agreement — highest priority wins + for p in priority: + if p in values: + voted[field] = values[p] + provenance[field] = p + break + + provenance_record = { + 'voted': voted, + 'provenance': provenance, + 'sources': { + 'filename': source_a, + 'gemini': source_b, + } + } + + return voted, provenance_record + + +def pre_flight(content_path, meta_path, db, config): + """Process a .txt file dropped into acquired/text/. + + Args: + content_path: Path to the .txt file + meta_path: Path to .meta.json sidecar (may be None) + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'skip_empty', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + filename = os.path.basename(content_path) + + # ── Step 1: Hash ────────────────────────────────────────────── + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = "Cannot hash text file: {}".format(e) + return result + + # ── Step 2: Stale state cleanup ─────────────────────────────── + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise + + # ── Step 3: Hash dedupe ─────────────────────────────────────── + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # ── Step 4: Read text content ───────────────────────────────── + try: + with open(content_path, encoding='utf-8', errors='replace') as f: + raw_text = f.read() + except Exception as e: + result['error'] = "Cannot read text file: {}".format(e) + return result + + if len(raw_text.strip()) < 50: + logger.info("Text file too short (%d chars), skipping: %s", + len(raw_text.strip()), filename) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError: + pass + result['action'] = 'skip_empty' + return result + + # ── Step 5: Read optional sidecar ───────────────────────────── + sidecar = None + if meta_path: + try: + with open(meta_path, encoding='utf-8') as f: + sidecar = json.load(f) + except Exception as e: + logger.warning("Cannot read sidecar %s: %s", meta_path, e) + + # ── Step 6: Set up processing directory ─────────────────────── + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = "Cannot create processing dir: {}".format(e) + return result + + # ── Step 7: Move files to processing ────────────────────────── + source_path = os.path.join(proc_dir, 'source.txt') + try: + shutil.move(content_path, source_path) + if meta_path: + shutil.move(meta_path, os.path.join(proc_dir, 'meta.json')) + except Exception as e: + result['error'] = "Cannot move files to processing: {}".format(e) + return result + + # ── Step 8: Split into pages ────────────────────────────────── + pages = chunk_text(raw_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # ── Step 9: Source A — filename metadata ────────────────────── + source_a = _extract_filename_metadata(filename) + + # ── Step 10: Source B — Gemini metadata ─────────────────────── + first_pages_text = "\n".join(pages[:3]) + source_b = _extract_gemini_metadata(first_pages_text, config) + + # ── Step 11: Vote ───────────────────────────────────────────── + voted, provenance_record = _vote_metadata(source_a, source_b) + + # ── Step 12: Write meta.json ────────────────────────────────── + meta = { + 'hash': file_hash, + 'filename': filename, + 'source_type': 'text', + 'page_count': len(pages), + 'text_length': len(raw_text), + 'metadata': voted, + 'metadata_provenance': provenance_record, + } + if sidecar: + meta['sidecar'] = sidecar + + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # ── Step 13: Register in catalogue ──────────────────────────── + display_title = voted.get('title') or clean_filename_to_title(filename) + size_bytes = os.path.getsize(source_path) + + db.add_to_catalogue( + file_hash, filename, source_path, size_bytes, 'text', 'Document' + ) + + # ── Step 14: Queue and update documents row ─────────────────── + db.queue_document(file_hash) + + conn = db._get_conn() + conn.execute( + "UPDATE documents SET " + "text_dir = ?, page_count = ?, path = ?, " + "book_title = ?, book_author = ?, metadata_provenance = ? " + "WHERE hash = ?", + ( + proc_dir, + len(pages), + source_path, + voted.get('title'), + voted.get('author'), + json.dumps(provenance_record), + file_hash, + ) + ) + conn.commit() + + # ── Step 15: Status = extracted ─────────────────────────────── + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + logger.info( + "Text pre_flight complete: %s (%s) -> %d pages in %s", + file_hash[:8], display_title, len(pages), proc_dir, + ) + + result['action'] = 'extracted' + return result From f4659d155f4d563629455a94af3abc90e5fed2a5 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 23:08:19 +0000 Subject: [PATCH 15/17] Phase 6f-2: format normalizer in dispatcher Adds _normalize_formats() to the dispatcher that converts non-standard document formats to PDF before dispatch. Supports: - .epub, .mobi -> PDF via ebook-convert (Calibre) - .doc, .docx -> PDF via LibreOffice headless Called per-subfolder before _find_pairs() so _find_pairs() only ever sees standard content files. Conversion failures are logged and skipped -- the original file stays in acquired/ for manual review. Also converts 3 staged epub files and cleans up _staging/. Co-Authored-By: Claude Opus 4.6 --- lib/dispatcher.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/lib/dispatcher.py b/lib/dispatcher.py index a26436c..7294ec4 100644 --- a/lib/dispatcher.py +++ b/lib/dispatcher.py @@ -7,10 +7,12 @@ hands them to the appropriate processor's pre_flight(). Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5. Phase 4: sidecar is optional (PDFs may arrive without .meta.json). +Phase 6f-2: format normalizer converts non-standard formats to PDF before dispatch. """ import importlib import logging import os +import subprocess import time from .utils import get_config @@ -21,6 +23,9 @@ logger = logging.getLogger("recon.dispatcher") # Content file extensions recognized by the dispatcher CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'} +# Non-standard formats that can be converted to PDF before dispatch +CONVERTIBLE_EXTENSIONS = {'.epub', '.mobi', '.doc', '.docx'} + def _load_processor(processor_name): """Dynamically import a processor module from lib.processors.""" @@ -35,6 +40,63 @@ def _load_processor(processor_name): return None +def _normalize_formats(subfolder_path): + """Convert non-standard document formats to PDF before dispatch. + + Walks the subfolder for files with convertible extensions (.epub, .mobi, + .doc, .docx). Converts each to PDF using the appropriate tool, then + deletes the original. + + Returns count of files converted. + """ + if not os.path.isdir(subfolder_path): + return 0 + + converted = 0 + + for fname in sorted(os.listdir(subfolder_path)): + stem, ext = os.path.splitext(fname) + if ext.lower() not in CONVERTIBLE_EXTENSIONS: + continue + + source = os.path.join(subfolder_path, fname) + target = os.path.join(subfolder_path, stem + '.pdf') + + if os.path.exists(target): + logger.debug("Target PDF already exists, skipping: %s", fname) + continue + + try: + if ext.lower() in ('.epub', '.mobi'): + subprocess.run( + ['ebook-convert', source, target], + capture_output=True, check=True, timeout=300, + ) + elif ext.lower() in ('.doc', '.docx'): + subprocess.run( + ['libreoffice', '--headless', '--convert-to', 'pdf', + '--outdir', subfolder_path, source], + capture_output=True, check=True, timeout=300, + ) + + if os.path.isfile(target) and os.path.getsize(target) > 0: + os.remove(source) + converted += 1 + logger.info("Converted %s -> %s.pdf", fname, stem) + else: + logger.warning("Conversion produced no output: %s", fname) + + except subprocess.TimeoutExpired: + logger.error("Conversion timed out: %s", fname) + except subprocess.CalledProcessError as e: + logger.error("Conversion failed for %s: %s", fname, + e.stderr.decode(errors='replace')[:200] if e.stderr else str(e)) + except Exception as e: + logger.error("Unexpected error converting %s: %s", fname, e) + + return converted + + def _find_pairs(subfolder_path): """Find content files (with optional sidecar) in a subfolder. @@ -115,6 +177,9 @@ def dispatch_once(): logger.error("Processor %s has no pre_flight function", processor_name) continue + # Convert non-standard formats to PDF before scanning for pairs + _normalize_formats(subfolder_path) + pairs = _find_pairs(subfolder_path) if not pairs: continue From 999cf37626142efa4b468fe8373e80ca2c077e9e Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 23:30:59 +0000 Subject: [PATCH 16/17] Fix: Gemini "null" string bug in pdf_processor metadata voting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same fix as text_processor — Gemini sometimes returns the literal string "null" instead of JSON null for empty metadata fields. The voting logic and Gemini extraction now both treat "null" strings as None. Co-Authored-By: Claude Opus 4.6 --- lib/processors/pdf_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/processors/pdf_processor.py b/lib/processors/pdf_processor.py index 5d7a57f..b5a17dd 100644 --- a/lib/processors/pdf_processor.py +++ b/lib/processors/pdf_processor.py @@ -181,7 +181,7 @@ def _extract_gemini_metadata(pages_text, config): for field in ('title', 'author', 'edition', 'year'): val = data.get(field) - if val and isinstance(val, str) and val.strip(): + if val and isinstance(val, str) and val.strip() and val.strip().lower() != "null": result[field] = val.strip() return result @@ -223,7 +223,7 @@ def _vote_metadata(source_a, source_b, source_c): values = {} for name, src in sources.items(): val = src.get(field) - if val: + if val and str(val).strip().lower() != "null": values[name] = val if not values: From e6224cb27909b5117ba60ee795ecfc14e753ac21 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 16 Apr 2026 02:18:45 +0000 Subject: [PATCH 17/17] Migrate dashboard upload to pipeline with multi-format support Upload handler now writes files to the appropriate hopper subfolder instead of copying directly to /mnt/library/: - .pdf -> acquired/pdf/ - .txt -> acquired/text/ - .epub, .doc, .docx, .mobi -> acquired/pdf/ (dispatcher format normalizer converts to PDF before processing) The dispatcher picks up files and routes through the appropriate processor (pdf_processor or text_processor) for full metadata voting, domain classification, and canonical filing. Changes to api_upload() / _process_upload(): - Relaxed extension check: PDF, TXT, EPUB, DOC, DOCX, MOBI - Routes to correct hopper subfolder by extension - Writes meta.json sidecar with original filename and category hint - Removed: direct library copy, add_to_catalogue, queue_document - Added: hopper-level dedup check (catches rapid re-uploads) - Kept: catalogue dedup check for immediate user feedback Changes to api_upload_status(): - Added fallback: checks acquired/ and processing/ dirs if hash not yet in documents table (covers gap between upload and dispatcher pickup) Template updated: accept attribute and help text now reflect multi-format support. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 106 ++++++++++++++++++++------------ templates/knowledge/upload.html | 9 +-- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/lib/api.py b/lib/api.py index fa72c3f..757ebf4 100644 --- a/lib/api.py +++ b/lib/api.py @@ -9,6 +9,7 @@ API endpoints for all pipeline operations including crawl, ingest, and search. Dependencies: Flask, qdrant-client, requests Config: web, vector_db, embedding sections of config.yaml """ +import glob import json import threading import os @@ -77,25 +78,20 @@ def _format_source_citation(payload): return book -def _resolve_upload_path(category, config): - """Resolve the target directory for an upload given a category name.""" - upload_paths = config.get('upload_paths', {}) - library_root = config['library_root'] +ALLOWED_EXTENSIONS = {'.pdf', '.txt', '.epub', '.doc', '.docx', '.mobi'} - if category in upload_paths: - return upload_paths[category] - - default_path = upload_paths.get('default', library_root) - safe_category = secure_filename(category) if category else '' - if safe_category: - return os.path.join(default_path, safe_category) - return default_path +HOPPER_ROUTING = { + '.pdf': '/opt/recon/data/acquired/pdf/', + '.txt': '/opt/recon/data/acquired/text/', + '.epub': '/opt/recon/data/acquired/pdf/', + '.doc': '/opt/recon/data/acquired/pdf/', + '.docx': '/opt/recon/data/acquired/pdf/', + '.mobi': '/opt/recon/data/acquired/pdf/', +} -def _process_upload(filepath, original_filename, category, config, db): - """Process a single PDF upload: hash, dedup, copy to library, catalogue, queue.""" - library_root = config['library_root'] - +def _process_upload(filepath, original_filename, ext, category, config, db): + """Process an upload: hash, dedup, drop into hopper for dispatcher pickup.""" file_hash = content_hash(filepath) conn = db._get_conn() @@ -103,34 +99,39 @@ def _process_upload(filepath, original_filename, category, config, db): if existing: raise ValueError(f"Duplicate: file already catalogued as {existing['filename']}") - target_dir = _resolve_upload_path(category, config) - os.makedirs(target_dir, exist_ok=True) + # Also check if already sitting in a hopper dir awaiting dispatch + for hopper in HOPPER_ROUTING.values(): + if any(os.path.exists(os.path.join(hopper, file_hash + e)) for e in ALLOWED_EXTENSIONS): + raise ValueError("Duplicate: file already queued for processing") - safe_name = secure_filename(original_filename) - if not safe_name: - safe_name = f"{file_hash}.pdf" - target_path = os.path.join(target_dir, safe_name) + hopper_dir = HOPPER_ROUTING.get(ext, '/opt/recon/data/acquired/pdf/') + os.makedirs(hopper_dir, exist_ok=True) - if os.path.exists(target_path): - base, ext = os.path.splitext(safe_name) - target_path = os.path.join(target_dir, f"{base}_{file_hash[:8]}{ext}") + target_path = os.path.join(hopper_dir, file_hash + ext) + meta_path = os.path.join(hopper_dir, file_hash + '.meta.json') + + stem = os.path.splitext(original_filename)[0] + sidecar = { + 'title': stem, + 'source': 'dashboard_upload', + 'source_type': ext.lstrip('.'), + 'category': category, + 'original_filename': original_filename, + } + + # Write sidecar first (with .tmp safety), then content + tmp_meta = meta_path + '.tmp' + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(sidecar, f, indent=2) + os.rename(tmp_meta, meta_path) shutil.copy2(filepath, target_path) - size = os.path.getsize(target_path) - - source, derived_category = derive_source_and_category(target_path, library_root) - - db.add_to_catalogue(file_hash, safe_name, target_path, size, source, derived_category) - db.queue_document(file_hash) return { 'hash': file_hash, - 'filename': safe_name, - 'category': derived_category, - 'source': source, - 'path': target_path, - 'size_bytes': size, - 'status': 'queued' + 'filename': original_filename, + 'source_type': ext.lstrip('.'), + 'status': 'queued', } @@ -346,22 +347,23 @@ def api_upload(): if not file.filename: return jsonify({'error': 'No file selected'}), 400 - if not file.filename.lower().endswith('.pdf'): - return jsonify({'error': 'Only PDF files are accepted'}), 400 + ext = os.path.splitext(file.filename)[1].lower() + if ext not in ALLOWED_EXTENSIONS: + return jsonify({'error': f'Unsupported file type: {ext}'}), 400 category = request.form.get('category', '').strip() config = get_config() db = StatusDB() - tmp_fd, tmp_path = tempfile.mkstemp(suffix='.pdf') + tmp_fd, tmp_path = tempfile.mkstemp(suffix=ext) try: file.save(tmp_path) if os.path.getsize(tmp_path) == 0: return jsonify({'error': 'Uploaded file is empty'}), 400 - result = _process_upload(tmp_path, file.filename, category, config, db) + result = _process_upload(tmp_path, file.filename, ext, category, config, db) return jsonify(result), 201 except ValueError as e: @@ -390,6 +392,28 @@ def api_upload_status(doc_hash): 'filename': cat['filename'], 'status': cat['status'], }) + + # Check hopper dirs for files awaiting dispatcher pickup + for hopper in ('/opt/recon/data/acquired/pdf/', '/opt/recon/data/acquired/text/'): + if glob.glob(os.path.join(hopper, doc_hash + '.*')): + return jsonify({ + 'hash': doc_hash, + 'status': 'pending', + 'message': 'Waiting for dispatcher', + }) + + # Check processing dir + proc_dir = os.path.join( + config.get('pipeline', {}).get('processing_root', '/opt/recon/data/processing'), + doc_hash, + ) + if os.path.isdir(proc_dir): + return jsonify({ + 'hash': doc_hash, + 'status': 'processing', + 'message': 'Being processed', + }) + return jsonify({'error': 'Document not found'}), 404 result = { diff --git a/templates/knowledge/upload.html b/templates/knowledge/upload.html index 3b58a3f..1f65bad 100644 --- a/templates/knowledge/upload.html +++ b/templates/knowledge/upload.html @@ -1,12 +1,13 @@ {% extends "base.html" %} {% block content %} -

Upload PDF

+

Upload Document

- - Document File + + Supported: PDF, TXT, EPUB, DOC, DOCX, MOBI
@@ -67,7 +68,7 @@ document.getElementById('upload-form').addEventListener('submit', async function result.innerHTML = 'Queued for processing
' + 'Hash: ' + data.hash + '
' + 'File: ' + data.filename + '
' + - 'Category: ' + data.source + '/' + data.category + ''; + 'Type: ' + data.source_type + ''; fileInput.value = ''; } else { status.style.color = '#ff4444';