mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 06:34:40 +02:00
Adds _normalize_formats() to the dispatcher that converts non-standard document formats to PDF before dispatch. Supports: - .epub, .mobi -> PDF via ebook-convert (Calibre) - .doc, .docx -> PDF via LibreOffice headless Called per-subfolder before _find_pairs() so _find_pairs() only ever sees standard content files. Conversion failures are logged and skipped -- the original file stays in acquired/ for manual review. Also converts 3 staged epub files and cleans up _staging/. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
237 lines
8.3 KiB
Python
237 lines
8.3 KiB
Python
"""
|
|
RECON Dispatcher
|
|
|
|
Watches configured acquired/<subfolder>/ directories for content+sidecar pairs
|
|
that have been stable (mtime unchanged) for the configured threshold, then
|
|
hands them to the appropriate processor's pre_flight().
|
|
|
|
Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5.
|
|
Phase 4: sidecar is optional (PDFs may arrive without .meta.json).
|
|
Phase 6f-2: format normalizer converts non-standard formats to PDF before dispatch.
|
|
"""
|
|
import importlib
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import time
|
|
|
|
from .utils import get_config
|
|
from .status import StatusDB
|
|
|
|
logger = logging.getLogger("recon.dispatcher")
|
|
|
|
# Content file extensions recognized by the dispatcher
|
|
CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'}
|
|
|
|
# Non-standard formats that can be converted to PDF before dispatch
|
|
CONVERTIBLE_EXTENSIONS = {'.epub', '.mobi', '.doc', '.docx'}
|
|
|
|
|
|
def _load_processor(processor_name):
|
|
"""Dynamically import a processor module from lib.processors."""
|
|
module_path = f"lib.processors.{processor_name}"
|
|
try:
|
|
return importlib.import_module(module_path)
|
|
except ModuleNotFoundError:
|
|
logger.debug("Processor module not found: %s (not yet implemented)", processor_name)
|
|
return None
|
|
except ImportError as e:
|
|
logger.error("Failed to import processor %s: %s", processor_name, e)
|
|
return None
|
|
|
|
|
|
def _normalize_formats(subfolder_path):
|
|
"""Convert non-standard document formats to PDF before dispatch.
|
|
|
|
Walks the subfolder for files with convertible extensions (.epub, .mobi,
|
|
.doc, .docx). Converts each to PDF using the appropriate tool, then
|
|
deletes the original.
|
|
|
|
Returns count of files converted.
|
|
"""
|
|
if not os.path.isdir(subfolder_path):
|
|
return 0
|
|
|
|
converted = 0
|
|
|
|
for fname in sorted(os.listdir(subfolder_path)):
|
|
stem, ext = os.path.splitext(fname)
|
|
if ext.lower() not in CONVERTIBLE_EXTENSIONS:
|
|
continue
|
|
|
|
source = os.path.join(subfolder_path, fname)
|
|
target = os.path.join(subfolder_path, stem + '.pdf')
|
|
|
|
if os.path.exists(target):
|
|
logger.debug("Target PDF already exists, skipping: %s", fname)
|
|
continue
|
|
|
|
try:
|
|
if ext.lower() in ('.epub', '.mobi'):
|
|
subprocess.run(
|
|
['ebook-convert', source, target],
|
|
capture_output=True, check=True, timeout=300,
|
|
)
|
|
elif ext.lower() in ('.doc', '.docx'):
|
|
subprocess.run(
|
|
['libreoffice', '--headless', '--convert-to', 'pdf',
|
|
'--outdir', subfolder_path, source],
|
|
capture_output=True, check=True, timeout=300,
|
|
)
|
|
|
|
if os.path.isfile(target) and os.path.getsize(target) > 0:
|
|
os.remove(source)
|
|
converted += 1
|
|
logger.info("Converted %s -> %s.pdf", fname, stem)
|
|
else:
|
|
logger.warning("Conversion produced no output: %s", fname)
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.error("Conversion timed out: %s", fname)
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error("Conversion failed for %s: %s", fname,
|
|
e.stderr.decode(errors='replace')[:200] if e.stderr else str(e))
|
|
except Exception as e:
|
|
logger.error("Unexpected error converting %s: %s", fname, e)
|
|
|
|
return converted
|
|
|
|
|
|
def _find_pairs(subfolder_path):
|
|
"""Find content files (with optional sidecar) in a subfolder.
|
|
|
|
A pair is:
|
|
<basename>.<ext> — content file
|
|
<basename>.meta.json — optional sidecar
|
|
|
|
Returns list of (content_path, meta_path_or_None, basename) tuples.
|
|
"""
|
|
if not os.path.isdir(subfolder_path):
|
|
return []
|
|
|
|
files = set(os.listdir(subfolder_path))
|
|
pairs = []
|
|
seen_basenames = set()
|
|
|
|
# First pass: find .meta.json files and their matching content
|
|
for fname in sorted(files):
|
|
if fname.endswith('.meta.json'):
|
|
basename = fname[:-len('.meta.json')]
|
|
for ext in sorted(CONTENT_EXTENSIONS):
|
|
content_name = basename + ext
|
|
if content_name in files:
|
|
pairs.append((
|
|
os.path.join(subfolder_path, content_name),
|
|
os.path.join(subfolder_path, fname),
|
|
basename,
|
|
))
|
|
seen_basenames.add(content_name)
|
|
break
|
|
|
|
# Second pass: find solo content files (no sidecar)
|
|
for fname in sorted(files):
|
|
if fname in seen_basenames:
|
|
continue
|
|
_stem, ext = os.path.splitext(fname)
|
|
if ext.lower() in CONTENT_EXTENSIONS and not fname.endswith('.meta.json'):
|
|
pairs.append((
|
|
os.path.join(subfolder_path, fname),
|
|
None,
|
|
_stem,
|
|
))
|
|
|
|
return pairs
|
|
|
|
|
|
def _is_stable(filepath, stability_seconds):
|
|
"""Check if a file's mtime is older than stability_seconds ago."""
|
|
try:
|
|
mtime = os.path.getmtime(filepath)
|
|
return (time.time() - mtime) >= stability_seconds
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def dispatch_once():
|
|
"""One-shot dispatch: scan all configured acquired/ subfolders once.
|
|
|
|
Returns list of result dicts from processor pre_flight calls.
|
|
"""
|
|
config = get_config()
|
|
pipeline_cfg = config.get('pipeline', {})
|
|
acquired_root = pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired')
|
|
dispatch_map = pipeline_cfg.get('dispatch', {})
|
|
stability_seconds = pipeline_cfg.get('mtime_stability_seconds', 10)
|
|
|
|
db = StatusDB(config['paths']['db'])
|
|
results = []
|
|
|
|
for subfolder_name, processor_name in dispatch_map.items():
|
|
subfolder_path = os.path.join(acquired_root, subfolder_name)
|
|
|
|
processor = _load_processor(processor_name)
|
|
if processor is None:
|
|
continue
|
|
|
|
if not hasattr(processor, 'pre_flight'):
|
|
logger.error("Processor %s has no pre_flight function", processor_name)
|
|
continue
|
|
|
|
# Convert non-standard formats to PDF before scanning for pairs
|
|
_normalize_formats(subfolder_path)
|
|
|
|
pairs = _find_pairs(subfolder_path)
|
|
if not pairs:
|
|
continue
|
|
|
|
for content_path, meta_path, basename in pairs:
|
|
# Content file must be stable; sidecar too if present
|
|
if not _is_stable(content_path, stability_seconds):
|
|
logger.debug("File %s not yet stable, skipping", basename)
|
|
continue
|
|
if meta_path and not _is_stable(meta_path, stability_seconds):
|
|
logger.debug("Sidecar for %s not yet stable, skipping", basename)
|
|
continue
|
|
|
|
logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name)
|
|
try:
|
|
result = processor.pre_flight(content_path, meta_path, db, config)
|
|
results.append(result)
|
|
logger.info("Result for %s: %s", basename, result.get('action', 'unknown'))
|
|
except Exception as e:
|
|
logger.error("Processor %s crashed on %s: %s", processor_name, basename, e)
|
|
results.append({
|
|
'action': 'error',
|
|
'error': str(e),
|
|
'basename': basename,
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
def dispatch_loop(stop_event, db, config, interval=30):
|
|
"""Run dispatch_once() on a loop until stop_event is set.
|
|
|
|
Designed to run as a service thread. Never raises to the caller.
|
|
"""
|
|
logger.info("[dispatcher] Loop started (interval: %ds)", interval)
|
|
|
|
while not stop_event.is_set():
|
|
try:
|
|
results = dispatch_once()
|
|
if results:
|
|
actions = {}
|
|
for r in results:
|
|
a = r.get('action', 'unknown')
|
|
actions[a] = actions.get(a, 0) + 1
|
|
logger.info("[dispatcher] Dispatched %d items: %s",
|
|
len(results),
|
|
", ".join(f"{k}={v}" for k, v in sorted(actions.items())))
|
|
else:
|
|
logger.debug("[dispatcher] No items to dispatch")
|
|
except Exception as e:
|
|
logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True)
|
|
|
|
stop_event.wait(interval)
|
|
|
|
logger.info("[dispatcher] Loop stopped")
|