From e6224cb27909b5117ba60ee795ecfc14e753ac21 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 16 Apr 2026 02:18:45 +0000 Subject: [PATCH] Migrate dashboard upload to pipeline with multi-format support Upload handler now writes files to the appropriate hopper subfolder instead of copying directly to /mnt/library/: - .pdf -> acquired/pdf/ - .txt -> acquired/text/ - .epub, .doc, .docx, .mobi -> acquired/pdf/ (dispatcher format normalizer converts to PDF before processing) The dispatcher picks up files and routes through the appropriate processor (pdf_processor or text_processor) for full metadata voting, domain classification, and canonical filing. Changes to api_upload() / _process_upload(): - Relaxed extension check: PDF, TXT, EPUB, DOC, DOCX, MOBI - Routes to correct hopper subfolder by extension - Writes meta.json sidecar with original filename and category hint - Removed: direct library copy, add_to_catalogue, queue_document - Added: hopper-level dedup check (catches rapid re-uploads) - Kept: catalogue dedup check for immediate user feedback Changes to api_upload_status(): - Added fallback: checks acquired/ and processing/ dirs if hash not yet in documents table (covers gap between upload and dispatcher pickup) Template updated: accept attribute and help text now reflect multi-format support. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 106 ++++++++++++++++++++------------ templates/knowledge/upload.html | 9 +-- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/lib/api.py b/lib/api.py index fa72c3f..757ebf4 100644 --- a/lib/api.py +++ b/lib/api.py @@ -9,6 +9,7 @@ API endpoints for all pipeline operations including crawl, ingest, and search. Dependencies: Flask, qdrant-client, requests Config: web, vector_db, embedding sections of config.yaml """ +import glob import json import threading import os @@ -77,25 +78,20 @@ def _format_source_citation(payload): return book -def _resolve_upload_path(category, config): - """Resolve the target directory for an upload given a category name.""" - upload_paths = config.get('upload_paths', {}) - library_root = config['library_root'] +ALLOWED_EXTENSIONS = {'.pdf', '.txt', '.epub', '.doc', '.docx', '.mobi'} - if category in upload_paths: - return upload_paths[category] - - default_path = upload_paths.get('default', library_root) - safe_category = secure_filename(category) if category else '' - if safe_category: - return os.path.join(default_path, safe_category) - return default_path +HOPPER_ROUTING = { + '.pdf': '/opt/recon/data/acquired/pdf/', + '.txt': '/opt/recon/data/acquired/text/', + '.epub': '/opt/recon/data/acquired/pdf/', + '.doc': '/opt/recon/data/acquired/pdf/', + '.docx': '/opt/recon/data/acquired/pdf/', + '.mobi': '/opt/recon/data/acquired/pdf/', +} -def _process_upload(filepath, original_filename, category, config, db): - """Process a single PDF upload: hash, dedup, copy to library, catalogue, queue.""" - library_root = config['library_root'] - +def _process_upload(filepath, original_filename, ext, category, config, db): + """Process an upload: hash, dedup, drop into hopper for dispatcher pickup.""" file_hash = content_hash(filepath) conn = db._get_conn() @@ -103,34 +99,39 @@ def _process_upload(filepath, original_filename, category, config, db): if existing: raise ValueError(f"Duplicate: file already catalogued as {existing['filename']}") - target_dir = _resolve_upload_path(category, config) - os.makedirs(target_dir, exist_ok=True) + # Also check if already sitting in a hopper dir awaiting dispatch + for hopper in HOPPER_ROUTING.values(): + if any(os.path.exists(os.path.join(hopper, file_hash + e)) for e in ALLOWED_EXTENSIONS): + raise ValueError("Duplicate: file already queued for processing") - safe_name = secure_filename(original_filename) - if not safe_name: - safe_name = f"{file_hash}.pdf" - target_path = os.path.join(target_dir, safe_name) + hopper_dir = HOPPER_ROUTING.get(ext, '/opt/recon/data/acquired/pdf/') + os.makedirs(hopper_dir, exist_ok=True) - if os.path.exists(target_path): - base, ext = os.path.splitext(safe_name) - target_path = os.path.join(target_dir, f"{base}_{file_hash[:8]}{ext}") + target_path = os.path.join(hopper_dir, file_hash + ext) + meta_path = os.path.join(hopper_dir, file_hash + '.meta.json') + + stem = os.path.splitext(original_filename)[0] + sidecar = { + 'title': stem, + 'source': 'dashboard_upload', + 'source_type': ext.lstrip('.'), + 'category': category, + 'original_filename': original_filename, + } + + # Write sidecar first (with .tmp safety), then content + tmp_meta = meta_path + '.tmp' + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(sidecar, f, indent=2) + os.rename(tmp_meta, meta_path) shutil.copy2(filepath, target_path) - size = os.path.getsize(target_path) - - source, derived_category = derive_source_and_category(target_path, library_root) - - db.add_to_catalogue(file_hash, safe_name, target_path, size, source, derived_category) - db.queue_document(file_hash) return { 'hash': file_hash, - 'filename': safe_name, - 'category': derived_category, - 'source': source, - 'path': target_path, - 'size_bytes': size, - 'status': 'queued' + 'filename': original_filename, + 'source_type': ext.lstrip('.'), + 'status': 'queued', } @@ -346,22 +347,23 @@ def api_upload(): if not file.filename: return jsonify({'error': 'No file selected'}), 400 - if not file.filename.lower().endswith('.pdf'): - return jsonify({'error': 'Only PDF files are accepted'}), 400 + ext = os.path.splitext(file.filename)[1].lower() + if ext not in ALLOWED_EXTENSIONS: + return jsonify({'error': f'Unsupported file type: {ext}'}), 400 category = request.form.get('category', '').strip() config = get_config() db = StatusDB() - tmp_fd, tmp_path = tempfile.mkstemp(suffix='.pdf') + tmp_fd, tmp_path = tempfile.mkstemp(suffix=ext) try: file.save(tmp_path) if os.path.getsize(tmp_path) == 0: return jsonify({'error': 'Uploaded file is empty'}), 400 - result = _process_upload(tmp_path, file.filename, category, config, db) + result = _process_upload(tmp_path, file.filename, ext, category, config, db) return jsonify(result), 201 except ValueError as e: @@ -390,6 +392,28 @@ def api_upload_status(doc_hash): 'filename': cat['filename'], 'status': cat['status'], }) + + # Check hopper dirs for files awaiting dispatcher pickup + for hopper in ('/opt/recon/data/acquired/pdf/', '/opt/recon/data/acquired/text/'): + if glob.glob(os.path.join(hopper, doc_hash + '.*')): + return jsonify({ + 'hash': doc_hash, + 'status': 'pending', + 'message': 'Waiting for dispatcher', + }) + + # Check processing dir + proc_dir = os.path.join( + config.get('pipeline', {}).get('processing_root', '/opt/recon/data/processing'), + doc_hash, + ) + if os.path.isdir(proc_dir): + return jsonify({ + 'hash': doc_hash, + 'status': 'processing', + 'message': 'Being processed', + }) + return jsonify({'error': 'Document not found'}), 404 result = { diff --git a/templates/knowledge/upload.html b/templates/knowledge/upload.html index 3b58a3f..1f65bad 100644 --- a/templates/knowledge/upload.html +++ b/templates/knowledge/upload.html @@ -1,12 +1,13 @@ {% extends "base.html" %} {% block content %} -

Upload PDF

+

Upload Document

- - Document File + + Supported: PDF, TXT, EPUB, DOC, DOCX, MOBI
@@ -67,7 +68,7 @@ document.getElementById('upload-form').addEventListener('submit', async function result.innerHTML = 'Queued for processing
' + 'Hash: ' + data.hash + '
' + 'File: ' + data.filename + '
' + - 'Category: ' + data.source + '/' + data.category + ''; + 'Type: ' + data.source_type + ''; fileInput.value = ''; } else { status.style.color = '#ff4444';