# ---- lib/dispatcher.py: appended after dispatch_once() ----


def _summarize_actions(results):
    """Return comma-joined ``action=count`` pairs for *results*, sorted by action."""
    # Function-scope import: the module's top-level import block is not
    # visible from this hunk, so the dependency is kept local.
    from collections import Counter

    counts = Counter(r.get('action', 'unknown') for r in results)
    return ", ".join(f"{k}={v}" for k, v in sorted(counts.items()))


def dispatch_loop(stop_event, db, config, interval=30):
    """Run dispatch_once() on a loop until stop_event is set.

    Designed to run as a service thread. Exceptions raised while
    dispatching or summarising are logged with a traceback and swallowed,
    so one bad cycle does not end the loop.

    Args:
        stop_event: Event-like object; the loop exits once ``is_set()`` is
            true, and ``wait(interval)`` is used as an interruptible sleep.
        db: Accepted but not used here -- dispatch_once() is called without
            arguments.
        config: Accepted but not used here, for the same reason as ``db``.
        interval: Seconds to wait between cycles.
    """
    logger.info("[dispatcher] Loop started (interval: %ds)", interval)

    while not stop_event.is_set():
        try:
            results = dispatch_once()
            if results:
                logger.info("[dispatcher] Dispatched %d items: %s",
                            len(results), _summarize_actions(results))
            else:
                logger.debug("[dispatcher] No items to dispatch")
        except Exception as e:
            logger.error("[dispatcher] Error in dispatch_once: %s", e, exc_info=True)

        # Event.wait() instead of time.sleep() so shutdown is not delayed
        # by up to a full interval.
        stop_event.wait(interval)

    logger.info("[dispatcher] Loop stopped")


# ---- lib/filing.py: appended after file_processed_item() ----


def _file_batch(rows, stop_event, db, config):
    """File each row via file_processed_item(); return (filed, skipped, errors).

    Stops early, leaving the remaining rows untouched, as soon as
    stop_event is set.
    """
    filed = 0
    skipped = 0
    errors = 0

    for row in rows:
        if stop_event.is_set():
            break
        try:
            result = file_processed_item(row['hash'], row['path'], db, config)
            action = result.get('action', 'unknown')
            if action == 'filed':
                filed += 1
            elif action.startswith('skip'):
                skipped += 1
            elif action == 'error':
                errors += 1
                logger.warning("[filing] Error filing %s: %s",
                               row['hash'][:8], result.get('error', 'unknown'))
            else:
                # Previously such outcomes were dropped silently, so the
                # batch summary did not account for every processed row.
                logger.warning("[filing] Unexpected action %r for %s",
                               action, row['hash'][:8])
        except Exception as e:
            errors += 1
            logger.error("[filing] Exception filing %s: %s",
                         row['hash'][:8], e, exc_info=True)

    return filed, skipped, errors


def filing_worker_loop(stop_event, db, config, interval=30):
    """Run filing on items ready to be filed until stop_event is set.

    Each cycle selects up to 50 documents with status='complete',
    organized_at IS NULL, and a path under /opt/recon/data/processing/,
    and passes each one to file_processed_item().

    Designed to run as a service thread. Exceptions from the query or from
    filing are logged with a traceback and swallowed, so one bad cycle
    does not end the loop.

    Args:
        stop_event: Event-like object; checked between cycles and between
            rows, and ``wait(interval)`` is used as an interruptible sleep.
        db: Status database handle; must provide ``_get_conn()`` and is
            forwarded to file_processed_item().
        config: Forwarded unchanged to file_processed_item().
        interval: Seconds to wait between cycles.
    """
    logger.info("[filing] Worker started (interval: %ds)", interval)

    while not stop_event.is_set():
        try:
            # NOTE(review): _get_conn() is a private accessor and the
            # connection is not closed here -- presumably StatusDB owns its
            # lifetime; confirm. cmd_service passes the same db object to
            # the dispatcher thread as well, so confirm it is safe to share
            # across threads.
            conn = db._get_conn()
            # NOTE(review): there is no ORDER BY, and rows stay eligible
            # until organized_at is set. If file_processed_item() leaves it
            # NULL on 'skip'/'error', the same rows are re-selected every
            # cycle -- TODO confirm.
            rows = conn.execute(
                "SELECT hash, path FROM documents "
                "WHERE status = 'complete' "
                "AND organized_at IS NULL "
                "AND path LIKE '/opt/recon/data/processing/%' "
                "LIMIT 50"
            ).fetchall()

            if rows:
                filed, skipped, errors = _file_batch(rows, stop_event, db, config)
                logger.info("[filing] Batch: %d filed, %d skipped, %d errors",
                            filed, skipped, errors)
            else:
                logger.debug("[filing] No items ready to file")

        except Exception as e:
            logger.error("[filing] Error in filing worker: %s", e, exc_info=True)

        stop_event.wait(interval)

    logger.info("[filing] Worker stopped")
""" from lib.extractor import run_extraction from lib.enricher import run_enrichment from lib.embedder import run_embedding from lib.api import app, run_server as start_dashboard - from lib.peertube_scraper import ingest_all as pt_ingest_all + from lib.dispatcher import dispatch_loop + from lib.filing import filing_worker_loop config = get_config() proc = config.get('processing', {}) @@ -685,13 +686,14 @@ def cmd_service(args): enrich_workers = proc.get('enrich_workers', 16) embed_workers = proc.get('embed_workers', 4) poll_interval = svc.get('stage_poll_interval', 30) - scan_interval = svc.get('scan_interval', 3600) + dispatch_interval = svc.get('dispatch_interval', 30) + filing_interval = svc.get('filing_interval', 30) progress_interval = svc.get('progress_interval', 60) web_host = config.get('web', {}).get('host', '0.0.0.0') web_port = config.get('web', {}).get('port', 8420) stop_event = threading.Event() - totals = {'extract': 0, 'enrich': 0, 'embed': 0, 'scan': 0} + totals = {'extract': 0, 'enrich': 0, 'embed': 0} def shutdown(signum, frame): sig_name = signal.Signals(signum).name @@ -716,36 +718,6 @@ def cmd_service(args): stop_event.wait(poll_interval) logger.info(f"[{name}] Stage stopped (total: {totals[name]})") - def scanner_loop(): - """Periodically scan library and queue new documents.""" - logger.info(f"[scanner] Started (interval: {scan_interval}s)") - # Run initial scan immediately - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - if new_cat or new_queued or synced: - logger.info(f"[scanner] Initial: {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - except Exception as e: - logger.error(f"[scanner] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - new_cat = scan_library() - new_queued = queue_all() - synced = sync_qdrant_paths() - totals['scan'] += new_queued - if new_cat or new_queued or 
synced: - logger.info(f"[scanner] {new_cat} catalogued, {new_queued} queued, {synced} paths synced") - else: - logger.debug("[scanner] No new documents") - except Exception as e: - logger.error(f"[scanner] Error: {e}", exc_info=True) - logger.info("[scanner] Stopped") - def progress_loop(): """Log pipeline status periodically.""" while not stop_event.is_set(): @@ -769,217 +741,19 @@ def cmd_service(args): except Exception: pass - def peertube_scanner_loop(): - """Periodically ingest new PeerTube video transcripts.""" - from lib.peertube_scraper import set_stop_check - set_stop_check(stop_event.is_set) - logger.info(f"[peertube] Scanner started (interval: {scan_interval}s)") - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] Initial scan: {ingested} transcripts ingested") - else: - logger.info("[peertube] Initial scan: no new transcripts") - except Exception as e: - logger.error(f"[peertube] Initial scan error: {e}", exc_info=True) - - while not stop_event.is_set(): - stop_event.wait(scan_interval) - if stop_event.is_set(): - break - try: - result = pt_ingest_all(config=config) - ingested = result.get('ingested', 0) if isinstance(result, dict) else 0 - if ingested: - logger.info(f"[peertube] {ingested} new transcripts ingested") - else: - logger.debug("[peertube] No new transcripts") - except Exception as e: - logger.error(f"[peertube] Scan error: {e}", exc_info=True) - logger.info("[peertube] Scanner stopped") - - def crawler_scheduler_loop(): - """Scheduled site crawler — crawls configured sites, one per cycle. 
- - - Reads sites from config.yaml crawler.sites list - - Processes sites in tier order (lower tier = higher priority) - - Tracks last-crawled timestamp per site in crawl_state.json - - Skips sites crawled within recrawl_interval_days (default 7) - - Waits inter_site_cooldown seconds between sites (default 30) - - Uses per-site delay for rate limiting during crawl - """ - import json as _json - from lib.crawler import crawl_site - - crawler_cfg = config.get('crawler', {}) - sites = crawler_cfg.get('sites', []) - if not sites: - logger.info("[crawler] No sites configured, scheduler disabled") - return - - recrawl_days = crawler_cfg.get('recrawl_interval_days', 7) - cooldown = crawler_cfg.get('inter_site_cooldown', 30) - state_file = os.path.join(config.get('paths', {}).get('data', '/opt/recon/data'), 'crawl_state.json') - - # Load persisted state - def load_state(): - try: - with open(state_file, 'r') as sf: - return _json.load(sf) - except (FileNotFoundError, _json.JSONDecodeError): - return {} - - def save_state(state): - try: - with open(state_file, 'w') as sf: - _json.dump(state, sf, indent=2) - except Exception as e: - logger.error(f"[crawler] Failed to save state: {e}") - - # Sort by tier (lower = higher priority) - sorted_sites = sorted(sites, key=lambda s: s.get('tier', 99)) - logger.info(f"[crawler] Scheduler started — {len(sorted_sites)} sites configured, " - f"recrawl every {recrawl_days}d, {cooldown}s cooldown between sites") - - # Initial delay — let the main pipeline stabilize before crawling - stop_event.wait(60) - if stop_event.is_set(): - return - - while not stop_event.is_set(): - state = load_state() - now = time.time() - crawled_this_cycle = 0 - - for site in sorted_sites: - if stop_event.is_set(): - break - - url = site.get('url', '') - if not url: - continue - - # Check recrawl interval - last_crawled = state.get(url, {}).get('last_crawled', 0) - age_days = (now - last_crawled) / 86400 - if age_days < recrawl_days: - logger.debug(f"[crawler] 
Skipping {url} — crawled {age_days:.1f}d ago") - continue - - category = site.get('category', 'Web') - max_depth = site.get('max_depth', crawler_cfg.get('max_depth', 3)) - max_pages = site.get('max_pages', crawler_cfg.get('max_pages', 500)) - delay = site.get('delay', crawler_cfg.get('rate_limit_delay', 1.0)) - tier = site.get('tier', 99) - notes = site.get('notes', '') - - logger.info(f"[crawler] Starting: {url} (tier {tier}, category={category}, " - f"depth={max_depth}, pages={max_pages}, delay={delay}s)") - - try: - result = crawl_site( - base_url=url, - category=category, - max_pages=max_pages, - max_depth=max_depth, - delay=delay, - config=config, - ) - - summary = result.get('summary', {}) - method = result.get('discovery_method', 'none') - logger.info(f"[crawler] Done: {url} via {method} — " - f"{summary.get('succeeded', 0)} new, " - f"{summary.get('duplicates', 0)} dupes, " - f"{summary.get('failed', 0)} failed") - - # Update state - state[url] = { - 'last_crawled': time.time(), - 'method': method, - 'succeeded': summary.get('succeeded', 0), - 'duplicates': summary.get('duplicates', 0), - 'failed': summary.get('failed', 0), - 'tier': tier, - 'category': category, - } - save_state(state) - crawled_this_cycle += 1 - - except Exception as e: - logger.error(f"[crawler] Failed: {url} — {e}", exc_info=True) - state[url] = { - 'last_crawled': time.time(), - 'error': str(e), - 'tier': tier, - 'category': category, - } - save_state(state) - - # Inter-site cooldown - if not stop_event.is_set(): - logger.debug(f"[crawler] Cooling down {cooldown}s before next site...") - stop_event.wait(cooldown) - - if crawled_this_cycle: - logger.info(f"[crawler] Cycle complete — {crawled_this_cycle} sites crawled") - else: - logger.debug("[crawler] Cycle complete — all sites within recrawl window") - - # Sleep until next cycle check (1 hour) - if not stop_event.is_set(): - stop_event.wait(3600) - - logger.info("[crawler] Scheduler stopped") - - def organizer_loop(): - """Organize 
completed documents into Domain/Subdomain folders.""" - from lib.organizer import organize_document - logger.info(f"[organizer] Started (interval: {poll_interval}s)") - # Initial delay — let pipeline process some docs first - stop_event.wait(60) - - while not stop_event.is_set(): - try: - org_db = StatusDB() - unorganized = org_db.get_unorganized(limit=100) - if unorganized: - moved = 0 - errors = 0 - for doc in unorganized: - if stop_event.is_set(): - break - result = organize_document(doc['hash'], org_db, config) - if result['action'] == 'moved': - moved += 1 - elif result['action'] == 'error': - errors += 1 - if moved or errors: - logger.info(f"[organizer] Organized {moved} documents ({errors} errors)") - # Sync paths to Qdrant after batch - if moved > 0: - synced = sync_qdrant_paths() - if synced: - logger.info(f"[organizer] Synced {synced} paths to Qdrant") - else: - logger.debug("[organizer] No unorganized documents") - except Exception as e: - logger.error(f"[organizer] Error: {e}", exc_info=True) - stop_event.wait(poll_interval) - logger.info("[organizer] Stopped") + db = StatusDB() threads = [ + threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval), + daemon=True, name='dispatcher'), threading.Thread(target=stage_loop, daemon=True, name='extract', args=('extract', lambda: run_extraction(workers=extract_workers))), threading.Thread(target=stage_loop, daemon=True, name='enrich', args=('enrich', lambda: run_enrichment(workers=enrich_workers))), threading.Thread(target=stage_loop, daemon=True, name='embed', args=('embed', lambda: run_embedding(workers=embed_workers))), - threading.Thread(target=scanner_loop, daemon=True, name='scanner'), - threading.Thread(target=peertube_scanner_loop, daemon=True, name='peertube'), - threading.Thread(target=crawler_scheduler_loop, daemon=True, name='crawler'), - threading.Thread(target=organizer_loop, daemon=True, name='organizer'), + threading.Thread(target=lambda: 
filing_worker_loop(stop_event, db, config, interval=filing_interval), + daemon=True, name='filing'), threading.Thread(target=progress_loop, daemon=True, name='progress'), threading.Thread(target=lambda: start_dashboard(stop_event), daemon=True, name='dashboard'), @@ -988,9 +762,8 @@ def cmd_service(args): logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}") - logger.info(f" Scanner: every {scan_interval}s | Progress: every {progress_interval}s") - crawler_sites = config.get('crawler', {}).get('sites', []) - logger.info(f" Crawler: {len(crawler_sites)} sites configured") + logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") + logger.info(f" Progress: every {progress_interval}s") for t in threads: t.start() @@ -1018,8 +791,6 @@ def cmd_service(args): logger.info("=== RECON Service Stopped ===") return 0 - - def cmd_ingest_peertube(args): from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats