From 277110d9992794e9239c0d4afb520781f5bfbe1f Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:08:51 +0000 Subject: [PATCH] Phase 6d: PeerTube acquisition module + service thread New lib/acquisition/peertube.py replaces the removed peertube_scanner_loop. Polls PeerTube API every 30min, dedupes against catalogue (UUID + title), writes flat file pairs to data/acquired/stream/ for the dispatcher. - acquire_batch(): one-shot find-and-acquire with rate limiting - acquisition_loop(): service thread wrapper (interval from config) - list_new_videos(): dedup via _build_known_sets() against catalogue - acquire_one(): fetch VTT, convert, write .tmp then rename atomically cmd_service(): added peertube-acq daemon thread cmd_ingest_peertube(): rewired to use acquire_batch(), drops --channel/ --since/--enrich/--process (dispatcher handles full pipeline) config.yaml: added peertube.poll_interval: 1800 Co-Authored-By: Claude Opus 4.6 --- config.yaml | 1 + lib/acquisition/peertube.py | 224 ++++++++++++++++++++++++++++++++++++ recon.py | 52 ++++----- 3 files changed, 245 insertions(+), 32 deletions(-) create mode 100644 lib/acquisition/peertube.py diff --git a/config.yaml b/config.yaml index 1f5d1b0..a9dcf16 100644 --- a/config.yaml +++ b/config.yaml @@ -411,6 +411,7 @@ peertube: public_url: https://stream.echo6.co # Public URL for video links fetch_timeout: 30 # HTTP timeout for API/VTT requests rate_limit_delay: 0.5 # Delay between video ingestions (seconds) + poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) # Stream B: New Library Pipeline new_pipeline: diff --git a/lib/acquisition/peertube.py b/lib/acquisition/peertube.py new file mode 100644 index 0000000..43b9a9a --- /dev/null +++ b/lib/acquisition/peertube.py @@ -0,0 +1,224 @@ +""" +RECON PeerTube Acquisition Module + +Polls PeerTube for new video transcripts and writes them as flat file pairs +into data/acquired/stream/ for the dispatcher to pick up. + +Does NOT touch the database — that's transcript_processor's job. +""" +import json +import os +import time + +from lib.peertube_scraper import get_videos, get_captions, fetch_vtt, vtt_to_text, _get_pt_config +from lib.utils import content_hash, get_config, setup_logging + +logger = setup_logging("recon.acquisition.peertube") + + +def _build_known_sets(db): + """Build sets of known UUIDs and titles from catalogue. + + Queries catalogue once per batch for dedup against both cohorts: + - URL-path rows: extract UUID from https://stream.echo6.co/w/{uuid} + - Library-path rows: extract title from filename column + """ + conn = db._get_conn() + rows = conn.execute( + "SELECT path, filename FROM catalogue WHERE source = 'stream.echo6.co'" + ).fetchall() + known_uuids = set() + known_titles = set() + for row in rows: + path = row['path'] or '' + if '/w/' in path: + known_uuids.add(path.rsplit('/w/', 1)[-1]) + fname = row['filename'] or '' + if fname.endswith('.txt'): + known_titles.add(fname[:-4]) + else: + known_titles.add(fname) + return known_uuids, known_titles + + +def list_new_videos(db, config=None): + """Find PeerTube videos with captions not yet in catalogue. + + Returns list of (video_dict, caption_path) tuples for videos that have + captions and are not in the known UUID or title sets. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + known_uuids, known_titles = _build_known_sets(db) + + videos = get_videos(config=config) + new_videos = [] + checked = 0 + + for video in videos: + if video['uuid'] in known_uuids: + continue + if video['name'] in known_titles: + continue + + # Rate limit caption API calls + if checked > 0: + time.sleep(rate_delay) + checked += 1 + + try: + captions = get_captions(video['uuid'], config) + except Exception as e: + logger.warning("[peertube] Failed to get captions for %s: %s", + video['uuid'][:8], e) + continue + + if not captions: + continue + + # Prefer English caption + caption_path = None + for c in captions: + if c.get('language', {}).get('id') == 'en': + caption_path = c['captionPath'] + break + if caption_path is None: + caption_path = captions[0]['captionPath'] + + new_videos.append((video, caption_path)) + + return new_videos + + +def acquire_one(video, caption_path, config=None): + """Fetch transcript and write to hopper as flat files. + + Returns hash string on success, None on skip/error. + Does NOT touch the database — that's transcript_processor's job. + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + + pipeline_cfg = config.get('pipeline', {}) + hopper_dir = os.path.join( + pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired'), + 'stream' + ) + os.makedirs(hopper_dir, exist_ok=True) + + uuid = video['uuid'] + + # Fetch and convert VTT + vtt_content = fetch_vtt(caption_path, config) + text, cue_timestamps = vtt_to_text(vtt_content) + + if not text or len(text.strip()) < 50: + logger.debug("[peertube] Transcript too short for %s (%s): %d chars", + video['name'], uuid, len(text) if text else 0) + return None + + # Write text to temp file, hash it, then rename to final name + tmp_txt = os.path.join(hopper_dir, f'{uuid}.txt.tmp') + with open(tmp_txt, 'w', encoding='utf-8') as f: + f.write(text) + + file_hash = content_hash(tmp_txt) + + # Check if final file already exists (race condition guard) + final_txt = os.path.join(hopper_dir, f'{file_hash}.txt') + final_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json') + if os.path.exists(final_txt): + os.remove(tmp_txt) + logger.debug("[peertube] Hopper file already exists: %s", file_hash[:8]) + return None + + # Build sidecar metadata + video_url = f"{ptc['public_url']}/w/{uuid}" + meta = { + 'title': video['name'], + 'source_url': video_url, + 'url': video_url, + 'source': 'stream.echo6.co', + 'source_type': 'transcript', + 'category': 'Transcript', + 'channel': video.get('channel_display', ''), + 'duration': video.get('duration', 0), + 'uuid': uuid, + 'cue_timestamps': cue_timestamps, + } + + # Write meta to tmp, then rename both atomically + # Meta first, then content — dispatcher only picks up when content file exists + tmp_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json.tmp') + with open(tmp_meta, 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + os.rename(tmp_meta, final_meta) + os.rename(tmp_txt, final_txt) + + logger.info("[peertube] Acquired: %s (%s) -> %s", + video['name'], uuid[:8], file_hash[:12]) + return file_hash + + +def acquire_batch(db, config=None): + """One-shot: find new videos and acquire them. + + Returns dict: {'acquired': N, 'skipped': N, 'errors': N} + """ + if config is None: + config = get_config() + ptc = _get_pt_config(config) + rate_delay = ptc.get('rate_limit_delay', 0.5) + + result = {'acquired': 0, 'skipped': 0, 'errors': 0} + + try: + new_videos = list_new_videos(db, config) + except Exception as e: + logger.error("[peertube] Failed to list new videos: %s", e, exc_info=True) + result['errors'] = 1 + return result + + if not new_videos: + logger.debug("[peertube] No new videos found") + return result + + logger.info("[peertube] Found %d new videos to acquire", len(new_videos)) + + for i, (video, caption_path) in enumerate(new_videos): + if i > 0: + time.sleep(rate_delay) + try: + file_hash = acquire_one(video, caption_path, config) + if file_hash: + result['acquired'] += 1 + else: + result['skipped'] += 1 + except Exception as e: + logger.error("[peertube] Error acquiring %s (%s): %s", + video['name'], video['uuid'][:8], e, exc_info=True) + result['errors'] += 1 + + return result + + +def acquisition_loop(stop_event, db, config, interval=1800): + """Service loop: poll PeerTube for new transcripts every interval seconds.""" + logger.info("[peertube] Acquisition loop started (interval: %ds)", interval) + while not stop_event.is_set(): + try: + result = acquire_batch(db, config) + if result['acquired']: + logger.info("[peertube] Acquired %d new transcripts (%d skipped, %d errors)", + result['acquired'], result['skipped'], result['errors']) + else: + logger.debug("[peertube] No new transcripts") + except Exception as e: + logger.error("[peertube] Error: %s", e, exc_info=True) + stop_event.wait(interval) + logger.info("[peertube] Acquisition loop stopped") diff --git a/recon.py b/recon.py index 0619ac0..47dda7d 100755 --- a/recon.py +++ b/recon.py @@ -609,6 +609,7 @@ def cmd_service(args): from lib.api import app, run_server as start_dashboard from lib.dispatcher import dispatch_loop from lib.filing import filing_worker_loop + from lib.acquisition.peertube import acquisition_loop config = get_config() proc = config.get('processing', {}) @@ -683,6 +684,9 @@ def cmd_service(args): args=('embed', lambda: run_embedding(workers=embed_workers))), threading.Thread(target=lambda: filing_worker_loop(stop_event, db, config, interval=filing_interval), daemon=True, name='filing'), + threading.Thread(target=lambda: acquisition_loop(stop_event, db, config, + interval=config.get("peertube", {}).get("poll_interval", 1800)), + daemon=True, name="peertube-acq"), threading.Thread(target=progress_loop, daemon=True, name='progress'), threading.Thread(target=lambda: start_dashboard(stop_event), daemon=True, name='dashboard'), @@ -692,6 +696,8 @@ def cmd_service(args): logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") + pt_interval = config.get("peertube", {}).get("poll_interval", 1800) + logger.info(f" PeerTube acquisition: every {pt_interval}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: @@ -721,7 +727,9 @@ def cmd_service(args): return 0 def cmd_ingest_peertube(args): - from lib.peertube_scraper import ingest_channel, ingest_all, get_instance_stats + from lib.peertube_scraper import get_instance_stats + from lib.acquisition.peertube import acquire_batch + from lib.status import StatusDB if args.stats: stats = get_instance_stats() @@ -734,36 +742,20 @@ def cmd_ingest_peertube(args): print(f" {status}: {count}") return 0 - since = args.since if hasattr(args, 'since') and args.since else None + db = StatusDB() - if args.channel: - print(f"Ingesting PeerTube channel: {args.channel}") - result = ingest_channel(args.channel, since=since) - else: - print("Ingesting all PeerTube videos with captions") - result = ingest_all(since=since) + print("Acquiring PeerTube transcripts to hopper...") + result = acquire_batch(db) - summary = result.get('summary', {}) print(f"\nResults:") - print(f" Checked: {summary.get('total_checked', 0)}") - print(f" Ingested: {summary.get('ingested', 0)} ({summary.get('total_pages', 0)} pages)") - print(f" No captions: {summary.get('skipped_no_captions', 0)}") - print(f" Duplicates: {summary.get('skipped_duplicate', 0)}") - print(f" Failed: {summary.get('failed', 0)}") + print(f" Acquired: {result['acquired']}") + print(f" Skipped: {result['skipped']}") + print(f" Errors: {result['errors']}") - # Optional: run enrichment - if args.enrich or args.process: - print("\nRunning enrichment on extracted content...") - from lib.enricher import run_enrichment - enriched = run_enrichment() - print(f" Enriched: {enriched}") - - # Optional: run embedding too - if args.process: - print("\nRunning embedding...") - from lib.embedder import run_embedding - embedded = run_embedding() - print(f" Embedded: {embedded}") + if result['acquired']: + print(f"\n{result['acquired']} transcript(s) staged in hopper.") + print("The dispatcher will pick them up on its next cycle.") + print("Run 'recon status' to monitor progress.") return 0 @@ -1144,12 +1136,8 @@ def main(): p.set_defaults(func=cmd_organize) # ingest-peertube - p = sub.add_parser('ingest-peertube', help='Ingest PeerTube video transcripts') - p.add_argument('--channel', help='Ingest specific channel (actor_name)') - p.add_argument('--since', help='Only ingest videos published after this date (ISO format)') + p = sub.add_parser('ingest-peertube', help='Acquire PeerTube transcripts to hopper') p.add_argument('--stats', action='store_true', help='Show PeerTube instance stats') - p.add_argument('--enrich', action='store_true', help='Run enrichment after ingestion') - p.add_argument('--process', action='store_true', help='Full pipeline: ingest + enrich + embed') p.set_defaults(func=cmd_ingest_peertube) # ingest