# recon/lib/dispatcher.py
# (237 lines, 8.3 KiB, Python)

"""
RECON Dispatcher
Watches configured acquired/<subfolder>/ directories for content+sidecar pairs
that have been stable (mtime unchanged) for the configured threshold, then
hands them to the appropriate processor's pre_flight().
Phase 3: importable one-shot dispatcher. Service-loop integration in Phase 5.
Phase 4: sidecar is optional (PDFs may arrive without .meta.json).
Phase 6f-2: format normalizer converts non-standard formats to PDF before dispatch.
"""
import importlib
import logging
import os
import subprocess
import time
from .utils import get_config
from .status import StatusDB
# Module-level logger registered under the fixed name "recon.dispatcher".
logger = logging.getLogger("recon.dispatcher")
# Content file extensions recognized by the dispatcher (listed in lower case,
# each including the leading dot).
CONTENT_EXTENSIONS = {'.txt', '.vtt', '.html', '.pdf'}
# Non-standard formats that can be converted to PDF before dispatch.
# _normalize_formats() handles .epub/.mobi with ebook-convert and
# .doc/.docx with libreoffice.
CONVERTIBLE_EXTENSIONS = {'.epub', '.mobi', '.doc', '.docx'}
def _load_processor(processor_name):
    """Dynamically import a processor module from lib.processors.

    Args:
        processor_name: Name of the module under ``lib.processors``.

    Returns:
        The imported module, or None when it cannot be imported. A processor
        module that does not exist is logged at debug level ("not yet
        implemented"); every other import failure, including a missing
        dependency *of* an existing processor, is logged as an error.
    """
    module_path = f"lib.processors.{processor_name}"
    try:
        return importlib.import_module(module_path)
    except ModuleNotFoundError as e:
        # ModuleNotFoundError is also raised when the processor exists but one
        # of its own imports is missing. e.name is the module that was actually
        # not found, so only treat the failure as "not yet implemented" when
        # that is the processor itself or one of its parent packages.
        missing = e.name or ""
        processor_missing = bool(missing) and (
            module_path == missing or module_path.startswith(missing + ".")
        )
        if processor_missing:
            logger.debug("Processor module not found: %s (not yet implemented)", processor_name)
        else:
            logger.error("Failed to import processor %s: %s", processor_name, e)
        return None
    except ImportError as e:
        logger.error("Failed to import processor %s: %s", processor_name, e)
        return None
def _discard_partial_output(target):
    """Remove a failed conversion's leftover output so the next scan retries it."""
    try:
        os.remove(target)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning("Could not remove partial output %s: %s", target, e)


def _normalize_formats(subfolder_path):
    """Convert non-standard document formats to PDF before dispatch.

    Scans the top level of the subfolder (not recursive) for files with
    convertible extensions (.epub, .mobi, .doc, .docx). Converts each to
    ``<stem>.pdf`` in the same folder using the appropriate tool, then deletes
    the original. Files whose target PDF already exists are skipped.

    Conversion failures are logged and never raised. Output left behind by a
    failed, timed-out, or empty conversion is removed, so it is neither
    mistaken for a finished PDF nor allowed to block a retry on the next scan.

    Args:
        subfolder_path: Directory to scan. A missing directory yields 0.

    Returns:
        Count of files converted.
    """
    if not os.path.isdir(subfolder_path):
        return 0
    converted = 0
    for fname in sorted(os.listdir(subfolder_path)):
        stem, ext = os.path.splitext(fname)
        ext = ext.lower()
        if ext not in CONVERTIBLE_EXTENSIONS:
            continue
        source = os.path.join(subfolder_path, fname)
        target = os.path.join(subfolder_path, stem + '.pdf')
        if os.path.exists(target):
            logger.debug("Target PDF already exists, skipping: %s", fname)
            continue
        if ext in ('.epub', '.mobi'):
            command = ['ebook-convert', source, target]
        elif ext in ('.doc', '.docx'):
            # libreoffice names its output <stem>.pdf inside --outdir, which
            # is the same path as `target`.
            command = ['libreoffice', '--headless', '--convert-to', 'pdf',
                       '--outdir', subfolder_path, source]
        else:
            logger.warning("No converter configured for %s", fname)
            continue
        # `target` did not exist before this attempt (checked above), so
        # anything found there afterwards was written by this conversion.
        produced = False
        try:
            subprocess.run(command, capture_output=True, check=True, timeout=300)
            if os.path.isfile(target) and os.path.getsize(target) > 0:
                # Mark success before touching the source: if deleting the
                # source fails, the valid PDF must still be kept.
                produced = True
                os.remove(source)
                converted += 1
                logger.info("Converted %s -> %s.pdf", fname, stem)
            else:
                logger.warning("Conversion produced no output: %s", fname)
        except subprocess.TimeoutExpired:
            logger.error("Conversion timed out: %s", fname)
        except subprocess.CalledProcessError as e:
            logger.error("Conversion failed for %s: %s", fname,
                         e.stderr.decode(errors='replace')[:200] if e.stderr else str(e))
        except Exception as e:
            logger.error("Unexpected error converting %s: %s", fname, e)
        finally:
            if not produced:
                _discard_partial_output(target)
    return converted
def _find_pairs(subfolder_path):
    """Find content files (with optional sidecar) in a subfolder.

    A pair is:
        <basename>.<ext>         content file (extension in CONTENT_EXTENSIONS,
                                 matched case-insensitively)
        <basename>.meta.json     optional sidecar

    Sidecar-backed pairs are listed first (ordered by sidecar filename),
    followed by content files without a sidecar (ordered by filename). When
    several content files share a sidecar's basename, the sidecar is attached
    to the one whose extension sorts first; the others are returned as solo
    entries. A sidecar with no matching content file is ignored.

    Args:
        subfolder_path: Directory to scan (top level only).

    Returns:
        List of (content_path, meta_path_or_None, basename) tuples; empty if
        the directory does not exist.
    """
    if not os.path.isdir(subfolder_path):
        return []
    sidecar_suffix = '.meta.json'
    names = sorted(set(os.listdir(subfolder_path)))

    # Index content files by stem. The extension is compared lower-cased so
    # that "Foo.PDF" is found by "Foo.meta.json" just like "Foo.pdf" would be.
    content_by_stem = {}
    for name in names:
        if name.endswith(sidecar_suffix):
            continue
        stem, ext = os.path.splitext(name)
        if ext.lower() in CONTENT_EXTENSIONS:
            content_by_stem.setdefault(stem, []).append(name)

    def _preference(content_name):
        # Alphabetical by extension; among case variants of one extension,
        # prefer the exact lower-case spelling.
        ext = os.path.splitext(content_name)[1]
        return (ext.lower(), ext != ext.lower(), content_name)

    pairs = []
    paired_content = set()
    # First pass: find .meta.json files and their matching content
    for name in names:
        if not name.endswith(sidecar_suffix):
            continue
        basename = name[:-len(sidecar_suffix)]
        candidates = content_by_stem.get(basename)
        if not candidates:
            continue
        content_name = min(candidates, key=_preference)
        pairs.append((
            os.path.join(subfolder_path, content_name),
            os.path.join(subfolder_path, name),
            basename,
        ))
        paired_content.add(content_name)

    # Second pass: find solo content files (no sidecar)
    for name in names:
        if name in paired_content or name.endswith(sidecar_suffix):
            continue
        stem, ext = os.path.splitext(name)
        if ext.lower() in CONTENT_EXTENSIONS:
            pairs.append((
                os.path.join(subfolder_path, name),
                None,
                stem,
            ))
    return pairs
def _is_stable(filepath, stability_seconds):
    """Return True when the file was last modified at least stability_seconds ago.

    A file whose mtime cannot be read (missing, permission denied, ...) is
    reported as not stable.
    """
    try:
        last_modified = os.stat(filepath).st_mtime
    except OSError:
        return False
    age = time.time() - last_modified
    return age >= stability_seconds
def dispatch_once():
    """One-shot dispatch: scan all configured acquired/ subfolders once.

    Reads the ``pipeline`` section of the config: ``acquired_root``, the
    ``dispatch`` map of subfolder name -> processor name, and
    ``mtime_stability_seconds``. For each mapped subfolder it loads the
    processor, converts non-standard formats to PDF, and calls the
    processor's ``pre_flight(content_path, meta_path, db, config)`` for every
    content file (and sidecar, if present) that has been stable long enough.

    Returns:
        List of result dicts, one per dispatched item. Each is either the
        value returned by pre_flight, or, when pre_flight raised or returned
        something without a usable ``.get('action')``, a dict with
        ``action='error'``, ``error`` and ``basename``.

    Raises:
        KeyError: if the config has no ``paths.db`` entry.
    """
    config = get_config()
    # `or {}` tolerates sections that are present in the config but null.
    pipeline_cfg = config.get('pipeline', {}) or {}
    acquired_root = pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired')
    dispatch_map = pipeline_cfg.get('dispatch', {}) or {}
    stability_seconds = pipeline_cfg.get('mtime_stability_seconds', 10)
    db = StatusDB(config['paths']['db'])
    results = []
    for subfolder_name, processor_name in dispatch_map.items():
        subfolder_path = os.path.join(acquired_root, subfolder_name)
        processor = _load_processor(processor_name)
        if processor is None:
            continue
        if not hasattr(processor, 'pre_flight'):
            logger.error("Processor %s has no pre_flight function", processor_name)
            continue
        # Convert non-standard formats to PDF before scanning for pairs
        _normalize_formats(subfolder_path)
        pairs = _find_pairs(subfolder_path)
        if not pairs:
            continue
        for content_path, meta_path, basename in pairs:
            # Content file must be stable; sidecar too if present
            if not _is_stable(content_path, stability_seconds):
                logger.debug("File %s not yet stable, skipping", basename)
                continue
            if meta_path and not _is_stable(meta_path, stability_seconds):
                logger.debug("Sidecar for %s not yet stable, skipping", basename)
                continue
            logger.info("Dispatching %s/%s to %s", subfolder_name, basename, processor_name)
            try:
                result = processor.pre_flight(content_path, meta_path, db, config)
                # Read the action before recording the result: a processor
                # that returns None (or another non-mapping) must end up as a
                # single error entry, not as a bad entry plus an error entry.
                action = result.get('action', 'unknown')
            except Exception as e:
                logger.error("Processor %s crashed on %s: %s",
                             processor_name, basename, e, exc_info=True)
                results.append({
                    'action': 'error',
                    'error': str(e),
                    'basename': basename,
                })
            else:
                results.append(result)
                logger.info("Result for %s: %s", basename, action)
    return results
def dispatch_loop(stop_event, db, config, interval=30):
    """Call dispatch_once() repeatedly until stop_event is set.

    Intended to run as a service thread. Any exception raised while
    dispatching or summarizing is logged with its traceback and swallowed,
    and the loop continues after waiting.

    Args:
        stop_event: Object with ``is_set()`` and ``wait(timeout)``; setting it
            ends the loop and cuts the current wait short.
        db: Not used by this function; dispatch_once() opens its own StatusDB.
        config: Not used by this function; dispatch_once() loads its own config.
        interval: Seconds to wait between dispatch passes.
    """
    logger.info("[dispatcher] Loop started (interval: %ds)", interval)
    while not stop_event.is_set():
        try:
            outcomes = dispatch_once()
            if not outcomes:
                logger.debug("[dispatcher] No items to dispatch")
            else:
                # Tally how many items ended in each action.
                tally = {}
                for outcome in outcomes:
                    label = outcome.get('action', 'unknown')
                    tally[label] = tally.get(label, 0) + 1
                summary = ", ".join(
                    f"{label}={count}" for label, count in sorted(tally.items())
                )
                logger.info("[dispatcher] Dispatched %d items: %s",
                            len(outcomes), summary)
        except Exception as e:
            logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True)
        # Sleep between passes, waking immediately if a stop is requested.
        stop_event.wait(interval)
    logger.info("[dispatcher] Loop stopped")