Migrate dashboard upload to pipeline with multi-format support

Upload handler now writes files to the appropriate hopper subfolder
instead of copying directly to /mnt/library/:
- .pdf -> acquired/pdf/
- .txt -> acquired/text/
- .epub, .doc, .docx, .mobi -> acquired/pdf/ (dispatcher format
  normalizer converts to PDF before processing)

The dispatcher picks up files and routes through the appropriate
processor (pdf_processor or text_processor) for full metadata
voting, domain classification, and canonical filing.

Changes to api_upload() / _process_upload():
- Relaxed extension check: PDF, TXT, EPUB, DOC, DOCX, MOBI
- Routes to correct hopper subfolder by extension
- Writes meta.json sidecar with original filename and category hint
- Removed: direct library copy, add_to_catalogue, queue_document
- Added: hopper-level dedup check (catches rapid re-uploads)
- Kept: catalogue dedup check for immediate user feedback

Changes to api_upload_status():
- Added fallback: checks acquired/ and processing/ dirs if hash
  not yet in documents table (covers gap between upload and
  dispatcher pickup)

Template updated: accept attribute and help text now reflect
multi-format support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-16 02:18:45 +00:00
commit e6224cb279
2 changed files with 69 additions and 44 deletions

View file

@ -9,6 +9,7 @@ API endpoints for all pipeline operations including crawl, ingest, and search.
Dependencies: Flask, qdrant-client, requests
Config: web, vector_db, embedding sections of config.yaml
"""
import glob
import json
import threading
import os
@ -77,25 +78,20 @@ def _format_source_citation(payload):
return book
def _resolve_upload_path(category, config):
"""Resolve the target directory for an upload given a category name."""
upload_paths = config.get('upload_paths', {})
library_root = config['library_root']
ALLOWED_EXTENSIONS = {'.pdf', '.txt', '.epub', '.doc', '.docx', '.mobi'}
if category in upload_paths:
return upload_paths[category]
default_path = upload_paths.get('default', library_root)
safe_category = secure_filename(category) if category else ''
if safe_category:
return os.path.join(default_path, safe_category)
return default_path
HOPPER_ROUTING = {
'.pdf': '/opt/recon/data/acquired/pdf/',
'.txt': '/opt/recon/data/acquired/text/',
'.epub': '/opt/recon/data/acquired/pdf/',
'.doc': '/opt/recon/data/acquired/pdf/',
'.docx': '/opt/recon/data/acquired/pdf/',
'.mobi': '/opt/recon/data/acquired/pdf/',
}
def _process_upload(filepath, original_filename, category, config, db):
"""Process a single PDF upload: hash, dedup, copy to library, catalogue, queue."""
library_root = config['library_root']
def _process_upload(filepath, original_filename, ext, category, config, db):
"""Process an upload: hash, dedup, drop into hopper for dispatcher pickup."""
file_hash = content_hash(filepath)
conn = db._get_conn()
@ -103,34 +99,39 @@ def _process_upload(filepath, original_filename, category, config, db):
if existing:
raise ValueError(f"Duplicate: file already catalogued as {existing['filename']}")
target_dir = _resolve_upload_path(category, config)
os.makedirs(target_dir, exist_ok=True)
# Also check if already sitting in a hopper dir awaiting dispatch
for hopper in HOPPER_ROUTING.values():
if any(os.path.exists(os.path.join(hopper, file_hash + e)) for e in ALLOWED_EXTENSIONS):
raise ValueError("Duplicate: file already queued for processing")
safe_name = secure_filename(original_filename)
if not safe_name:
safe_name = f"{file_hash}.pdf"
target_path = os.path.join(target_dir, safe_name)
hopper_dir = HOPPER_ROUTING.get(ext, '/opt/recon/data/acquired/pdf/')
os.makedirs(hopper_dir, exist_ok=True)
if os.path.exists(target_path):
base, ext = os.path.splitext(safe_name)
target_path = os.path.join(target_dir, f"{base}_{file_hash[:8]}{ext}")
target_path = os.path.join(hopper_dir, file_hash + ext)
meta_path = os.path.join(hopper_dir, file_hash + '.meta.json')
stem = os.path.splitext(original_filename)[0]
sidecar = {
'title': stem,
'source': 'dashboard_upload',
'source_type': ext.lstrip('.'),
'category': category,
'original_filename': original_filename,
}
# Write sidecar first (with .tmp safety), then content
tmp_meta = meta_path + '.tmp'
with open(tmp_meta, 'w', encoding='utf-8') as f:
json.dump(sidecar, f, indent=2)
os.rename(tmp_meta, meta_path)
shutil.copy2(filepath, target_path)
size = os.path.getsize(target_path)
source, derived_category = derive_source_and_category(target_path, library_root)
db.add_to_catalogue(file_hash, safe_name, target_path, size, source, derived_category)
db.queue_document(file_hash)
return {
'hash': file_hash,
'filename': safe_name,
'category': derived_category,
'source': source,
'path': target_path,
'size_bytes': size,
'status': 'queued'
'filename': original_filename,
'source_type': ext.lstrip('.'),
'status': 'queued',
}
@ -346,22 +347,23 @@ def api_upload():
if not file.filename:
return jsonify({'error': 'No file selected'}), 400
if not file.filename.lower().endswith('.pdf'):
return jsonify({'error': 'Only PDF files are accepted'}), 400
ext = os.path.splitext(file.filename)[1].lower()
if ext not in ALLOWED_EXTENSIONS:
return jsonify({'error': f'Unsupported file type: {ext}'}), 400
category = request.form.get('category', '').strip()
config = get_config()
db = StatusDB()
tmp_fd, tmp_path = tempfile.mkstemp(suffix='.pdf')
tmp_fd, tmp_path = tempfile.mkstemp(suffix=ext)
try:
file.save(tmp_path)
if os.path.getsize(tmp_path) == 0:
return jsonify({'error': 'Uploaded file is empty'}), 400
result = _process_upload(tmp_path, file.filename, category, config, db)
result = _process_upload(tmp_path, file.filename, ext, category, config, db)
return jsonify(result), 201
except ValueError as e:
@ -390,6 +392,28 @@ def api_upload_status(doc_hash):
'filename': cat['filename'],
'status': cat['status'],
})
# Check hopper dirs for files awaiting dispatcher pickup
for hopper in ('/opt/recon/data/acquired/pdf/', '/opt/recon/data/acquired/text/'):
if glob.glob(os.path.join(hopper, doc_hash + '.*')):
return jsonify({
'hash': doc_hash,
'status': 'pending',
'message': 'Waiting for dispatcher',
})
# Check processing dir
proc_dir = os.path.join(
config.get('pipeline', {}).get('processing_root', '/opt/recon/data/processing'),
doc_hash,
)
if os.path.isdir(proc_dir):
return jsonify({
'hash': doc_hash,
'status': 'processing',
'message': 'Being processed',
})
return jsonify({'error': 'Document not found'}), 404
result = {