diff --git a/lib/dispatcher.py b/lib/dispatcher.py index 0abb314..f9a7b78 100644 --- a/lib/dispatcher.py +++ b/lib/dispatcher.py @@ -6,6 +6,7 @@ that have been stable (mtime unchanged) for the configured threshold, then hands them to the appropriate processor's pre_flight(). Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5. +Phase 4: sidecar is optional (PDFs may arrive without .meta.json). """ import importlib import logging @@ -17,37 +18,44 @@ from .status import StatusDB logger = logging.getLogger("recon.dispatcher") +# Content file extensions recognized by the dispatcher +CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'} + def _load_processor(processor_name): """Dynamically import a processor module from lib.processors.""" module_path = f"lib.processors.{processor_name}" try: return importlib.import_module(module_path) + except ModuleNotFoundError: + logger.debug("Processor module not found: %s (not yet implemented)", processor_name) + return None except ImportError as e: - logger.error("Cannot load processor %s: %s", processor_name, e) + logger.error("Failed to import processor %s: %s", processor_name, e) return None def _find_pairs(subfolder_path): - """Find content+sidecar pairs in a subfolder. + """Find content files (with optional sidecar) in a subfolder. - A pair is two files sharing a basename: - .txt (or other content extension) - .meta.json (sidecar) + A pair is: + . — content file + .meta.json — optional sidecar - Returns list of (content_path, meta_path, basename) tuples. + Returns list of (content_path, meta_path_or_None, basename) tuples. """ if not os.path.isdir(subfolder_path): return [] files = set(os.listdir(subfolder_path)) pairs = [] + seen_basenames = set() + # First pass: find .meta.json files and their matching content for fname in sorted(files): if fname.endswith('.meta.json'): basename = fname[:-len('.meta.json')] - # Look for matching content file (try common extensions) - for ext in ['.txt', '.vtt', '.html', '.pdf']: + for ext in sorted(CONTENT_EXTENSIONS): content_name = basename + ext if content_name in files: pairs.append(( @@ -55,8 +63,21 @@ def _find_pairs(subfolder_path): os.path.join(subfolder_path, fname), basename, )) + seen_basenames.add(content_name) break + # Second pass: find solo content files (no sidecar) + for fname in sorted(files): + if fname in seen_basenames: + continue + _stem, ext = os.path.splitext(fname) + if ext.lower() in CONTENT_EXTENSIONS and not fname.endswith('.meta.json'): + pairs.append(( + os.path.join(subfolder_path, fname), + None, + _stem, + )) + return pairs @@ -99,10 +120,12 @@ def dispatch_once(): continue for content_path, meta_path, basename in pairs: - # Both files must be stable - if not (_is_stable(content_path, stability_seconds) and - _is_stable(meta_path, stability_seconds)): - logger.debug("Pair %s not yet stable, skipping", basename) + # Content file must be stable; sidecar too if present + if not _is_stable(content_path, stability_seconds): + logger.debug("File %s not yet stable, skipping", basename) + continue + if meta_path and not _is_stable(meta_path, stability_seconds): + logger.debug("Sidecar for %s not yet stable, skipping", basename) continue logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name) diff --git a/lib/filing.py b/lib/filing.py index bb33e2c..4267922 100644 --- a/lib/filing.py +++ b/lib/filing.py @@ -95,6 +95,15 @@ def file_processed_item(doc_hash, source_file_path, db, config, dry_run=False): result["action"] = "skip_unclassified" return result + # Fix 1.1: Preserve the source file's actual extension instead of + # the default .pdf that sanitize_filename() may have applied + source_ext = os.path.splitext(source_file_path)[1].lower() + if source_ext: + target_stem, _old_ext = os.path.splitext(target_path) + target_path = target_stem + source_ext + san_stem, _old_ext = os.path.splitext(sanitized_name) + sanitized_name = san_stem + source_ext + result["target_path"] = target_path # If already at target (idempotency), just mark organized diff --git a/lib/processors/transcript_processor.py b/lib/processors/transcript_processor.py index 392db6d..21860cf 100644 --- a/lib/processors/transcript_processor.py +++ b/lib/processors/transcript_processor.py @@ -6,6 +6,7 @@ Reads a raw text file + meta.json sidecar, hashes, dedupes, splits into page_NNNN.txt files, and registers in the database. Phase 3: first processor implementation. +Phase 4: added stale state cleanup at start of pre_flight. """ import hashlib import json @@ -50,6 +51,15 @@ def pre_flight(content_path, meta_path, db, config): result['error'] = f"Cannot hash content file: {e}" return result + # Stale state cleanup — remove any pre-existing processing/concepts dirs + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + shutil.rmtree(proc_dir, ignore_errors=True) + shutil.rmtree(concepts_dir, ignore_errors=True) + # Hash dedupe: if hash exists in catalogue, delete the pair and return conn = db._get_conn() existing = conn.execute( @@ -59,7 +69,8 @@ def pre_flight(content_path, meta_path, db, config): logger.info("Duplicate hash %s, removing pair", file_hash[:8]) try: os.remove(content_path) - os.remove(meta_path) + if meta_path: + os.remove(meta_path) except OSError as e: logger.warning("Failed to remove duplicate pair: %s", e) result['action'] = 'duplicate' @@ -86,10 +97,6 @@ def pre_flight(content_path, meta_path, db, config): return result # Set up processing directory - processing_root = config.get('pipeline', {}).get( - 'processing_root', '/opt/recon/data/processing' - ) - proc_dir = os.path.join(processing_root, file_hash) try: os.makedirs(proc_dir, exist_ok=True) except Exception as e: