Phase 5c-1: dispatcher loop, filing worker loop, service rewire

Adds dispatch_loop() alongside dispatch_once() for service-thread use.
Adds filing_worker_loop() that watches for status=complete items in
/opt/recon/data/processing/ and files them to library/Domain/Subdomain/.

Rewires cmd_service() to run the new architecture:
- Removed: scanner_loop, peertube_scanner_loop, crawler_scheduler_loop,
  organizer_loop (all replaced by dispatcher + new filing worker)
- Kept: enrich and embed stage workers, progress, dashboard
- Kept (vestigial): extract stage worker — will be removed in Phase 6 cleanup
- Added: dispatcher loop thread, filing worker thread

Phase 5c-1 of the refactor. Service not yet started — Phase 5c-2 will do that.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-14 18:30:58 +00:00
commit d9aed35fd7
3 changed files with 98 additions and 242 deletions

255
recon.py
View file

@ -668,14 +668,15 @@ def cmd_serve(args):
def cmd_service(args):
"""Run RECON as a long-lived service. Called by systemd.
Bundles: Flask dashboard + pipeline stages + library scanner + PeerTube scanner + progress reporter.
Bundles: Flask dashboard + dispatcher + pipeline stages + filing worker + progress reporter.
All threads are daemon threads; SIGTERM/SIGINT trigger graceful shutdown.
"""
from lib.extractor import run_extraction
from lib.enricher import run_enrichment
from lib.embedder import run_embedding
from lib.api import app, run_server as start_dashboard
from lib.peertube_scraper import ingest_all as pt_ingest_all
from lib.dispatcher import dispatch_loop
from lib.filing import filing_worker_loop
config = get_config()
proc = config.get('processing', {})
@ -685,13 +686,14 @@ def cmd_service(args):
enrich_workers = proc.get('enrich_workers', 16)
embed_workers = proc.get('embed_workers', 4)
poll_interval = svc.get('stage_poll_interval', 30)
scan_interval = svc.get('scan_interval', 3600)
dispatch_interval = svc.get('dispatch_interval', 30)
filing_interval = svc.get('filing_interval', 30)
progress_interval = svc.get('progress_interval', 60)
web_host = config.get('web', {}).get('host', '0.0.0.0')
web_port = config.get('web', {}).get('port', 8420)
stop_event = threading.Event()
totals = {'extract': 0, 'enrich': 0, 'embed': 0, 'scan': 0}
totals = {'extract': 0, 'enrich': 0, 'embed': 0}
def shutdown(signum, frame):
sig_name = signal.Signals(signum).name
@ -716,36 +718,6 @@ def cmd_service(args):
stop_event.wait(poll_interval)
logger.info(f"[{name}] Stage stopped (total: {totals[name]})")
def scanner_loop():
"""Periodically scan library and queue new documents."""
logger.info(f"[scanner] Started (interval: {scan_interval}s)")
# Run initial scan immediately
try:
new_cat = scan_library()
new_queued = queue_all()
synced = sync_qdrant_paths()
if new_cat or new_queued or synced:
logger.info(f"[scanner] Initial: {new_cat} catalogued, {new_queued} queued, {synced} paths synced")
except Exception as e:
logger.error(f"[scanner] Initial scan error: {e}", exc_info=True)
while not stop_event.is_set():
stop_event.wait(scan_interval)
if stop_event.is_set():
break
try:
new_cat = scan_library()
new_queued = queue_all()
synced = sync_qdrant_paths()
totals['scan'] += new_queued
if new_cat or new_queued or synced:
logger.info(f"[scanner] {new_cat} catalogued, {new_queued} queued, {synced} paths synced")
else:
logger.debug("[scanner] No new documents")
except Exception as e:
logger.error(f"[scanner] Error: {e}", exc_info=True)
logger.info("[scanner] Stopped")
def progress_loop():
"""Log pipeline status periodically."""
while not stop_event.is_set():
@ -769,217 +741,19 @@ def cmd_service(args):
except Exception:
pass
def peertube_scanner_loop():
"""Periodically ingest new PeerTube video transcripts."""
from lib.peertube_scraper import set_stop_check
set_stop_check(stop_event.is_set)
logger.info(f"[peertube] Scanner started (interval: {scan_interval}s)")
try:
result = pt_ingest_all(config=config)
ingested = result.get('ingested', 0) if isinstance(result, dict) else 0
if ingested:
logger.info(f"[peertube] Initial scan: {ingested} transcripts ingested")
else:
logger.info("[peertube] Initial scan: no new transcripts")
except Exception as e:
logger.error(f"[peertube] Initial scan error: {e}", exc_info=True)
while not stop_event.is_set():
stop_event.wait(scan_interval)
if stop_event.is_set():
break
try:
result = pt_ingest_all(config=config)
ingested = result.get('ingested', 0) if isinstance(result, dict) else 0
if ingested:
logger.info(f"[peertube] {ingested} new transcripts ingested")
else:
logger.debug("[peertube] No new transcripts")
except Exception as e:
logger.error(f"[peertube] Scan error: {e}", exc_info=True)
logger.info("[peertube] Scanner stopped")
def crawler_scheduler_loop():
"""Scheduled site crawler — crawls configured sites, one per cycle.
- Reads sites from config.yaml crawler.sites list
- Processes sites in tier order (lower tier = higher priority)
- Tracks last-crawled timestamp per site in crawl_state.json
- Skips sites crawled within recrawl_interval_days (default 7)
- Waits inter_site_cooldown seconds between sites (default 30)
- Uses per-site delay for rate limiting during crawl
"""
import json as _json
from lib.crawler import crawl_site
crawler_cfg = config.get('crawler', {})
sites = crawler_cfg.get('sites', [])
if not sites:
logger.info("[crawler] No sites configured, scheduler disabled")
return
recrawl_days = crawler_cfg.get('recrawl_interval_days', 7)
cooldown = crawler_cfg.get('inter_site_cooldown', 30)
state_file = os.path.join(config.get('paths', {}).get('data', '/opt/recon/data'), 'crawl_state.json')
# Load persisted state
def load_state():
try:
with open(state_file, 'r') as sf:
return _json.load(sf)
except (FileNotFoundError, _json.JSONDecodeError):
return {}
def save_state(state):
try:
with open(state_file, 'w') as sf:
_json.dump(state, sf, indent=2)
except Exception as e:
logger.error(f"[crawler] Failed to save state: {e}")
# Sort by tier (lower = higher priority)
sorted_sites = sorted(sites, key=lambda s: s.get('tier', 99))
logger.info(f"[crawler] Scheduler started — {len(sorted_sites)} sites configured, "
f"recrawl every {recrawl_days}d, {cooldown}s cooldown between sites")
# Initial delay — let the main pipeline stabilize before crawling
stop_event.wait(60)
if stop_event.is_set():
return
while not stop_event.is_set():
state = load_state()
now = time.time()
crawled_this_cycle = 0
for site in sorted_sites:
if stop_event.is_set():
break
url = site.get('url', '')
if not url:
continue
# Check recrawl interval
last_crawled = state.get(url, {}).get('last_crawled', 0)
age_days = (now - last_crawled) / 86400
if age_days < recrawl_days:
logger.debug(f"[crawler] Skipping {url} — crawled {age_days:.1f}d ago")
continue
category = site.get('category', 'Web')
max_depth = site.get('max_depth', crawler_cfg.get('max_depth', 3))
max_pages = site.get('max_pages', crawler_cfg.get('max_pages', 500))
delay = site.get('delay', crawler_cfg.get('rate_limit_delay', 1.0))
tier = site.get('tier', 99)
notes = site.get('notes', '')
logger.info(f"[crawler] Starting: {url} (tier {tier}, category={category}, "
f"depth={max_depth}, pages={max_pages}, delay={delay}s)")
try:
result = crawl_site(
base_url=url,
category=category,
max_pages=max_pages,
max_depth=max_depth,
delay=delay,
config=config,
)
summary = result.get('summary', {})
method = result.get('discovery_method', 'none')
logger.info(f"[crawler] Done: {url} via {method}"
f"{summary.get('succeeded', 0)} new, "
f"{summary.get('duplicates', 0)} dupes, "
f"{summary.get('failed', 0)} failed")
# Update state
state[url] = {
'last_crawled': time.time(),
'method': method,
'succeeded': summary.get('succeeded', 0),
'duplicates': summary.get('duplicates', 0),
'failed': summary.get('failed', 0),
'tier': tier,
'category': category,
}
save_state(state)
crawled_this_cycle += 1
except Exception as e:
logger.error(f"[crawler] Failed: {url}{e}", exc_info=True)
state[url] = {
'last_crawled': time.time(),
'error': str(e),
'tier': tier,
'category': category,
}
save_state(state)
# Inter-site cooldown
if not stop_event.is_set():
logger.debug(f"[crawler] Cooling down {cooldown}s before next site...")
stop_event.wait(cooldown)
if crawled_this_cycle:
logger.info(f"[crawler] Cycle complete — {crawled_this_cycle} sites crawled")
else:
logger.debug("[crawler] Cycle complete — all sites within recrawl window")
# Sleep until next cycle check (1 hour)
if not stop_event.is_set():
stop_event.wait(3600)
logger.info("[crawler] Scheduler stopped")
def organizer_loop():
"""Organize completed documents into Domain/Subdomain folders."""
from lib.organizer import organize_document
logger.info(f"[organizer] Started (interval: {poll_interval}s)")
# Initial delay — let pipeline process some docs first
stop_event.wait(60)
while not stop_event.is_set():
try:
org_db = StatusDB()
unorganized = org_db.get_unorganized(limit=100)
if unorganized:
moved = 0
errors = 0
for doc in unorganized:
if stop_event.is_set():
break
result = organize_document(doc['hash'], org_db, config)
if result['action'] == 'moved':
moved += 1
elif result['action'] == 'error':
errors += 1
if moved or errors:
logger.info(f"[organizer] Organized {moved} documents ({errors} errors)")
# Sync paths to Qdrant after batch
if moved > 0:
synced = sync_qdrant_paths()
if synced:
logger.info(f"[organizer] Synced {synced} paths to Qdrant")
else:
logger.debug("[organizer] No unorganized documents")
except Exception as e:
logger.error(f"[organizer] Error: {e}", exc_info=True)
stop_event.wait(poll_interval)
logger.info("[organizer] Stopped")
db = StatusDB()
threads = [
threading.Thread(target=lambda: dispatch_loop(stop_event, db, config, interval=dispatch_interval),
daemon=True, name='dispatcher'),
threading.Thread(target=stage_loop, daemon=True, name='extract',
args=('extract', lambda: run_extraction(workers=extract_workers))),
threading.Thread(target=stage_loop, daemon=True, name='enrich',
args=('enrich', lambda: run_enrichment(workers=enrich_workers))),
threading.Thread(target=stage_loop, daemon=True, name='embed',
args=('embed', lambda: run_embedding(workers=embed_workers))),
threading.Thread(target=scanner_loop, daemon=True, name='scanner'),
threading.Thread(target=peertube_scanner_loop, daemon=True, name='peertube'),
threading.Thread(target=crawler_scheduler_loop, daemon=True, name='crawler'),
threading.Thread(target=organizer_loop, daemon=True, name='organizer'),
threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval),
daemon=True, name='filing'),
threading.Thread(target=progress_loop, daemon=True, name='progress'),
threading.Thread(target=lambda: start_dashboard(stop_event),
daemon=True, name='dashboard'),
@ -988,9 +762,8 @@ def cmd_service(args):
logger.info("=== RECON Service Starting ===")
logger.info(f" Dashboard: {web_host}:{web_port}")
logger.info(f" Workers: extract={extract_workers}, enrich={enrich_workers}, embed={embed_workers}")
logger.info(f" Scanner: every {scan_interval}s | Progress: every {progress_interval}s")
crawler_sites = config.get('crawler', {}).get('sites', [])
logger.info(f" Crawler: {len(crawler_sites)} sites configured")
logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s")
logger.info(f" Progress: every {progress_interval}s")
for t in threads:
t.start()
@ -1018,8 +791,6 @@ def cmd_service(args):
logger.info("=== RECON Service Stopped ===")
return 0
def cmd_ingest_peertube(args):
from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats