Phase 6d: PeerTube acquisition module + service thread

New lib/acquisition/peertube.py replaces the removed peertube_scanner_loop.
Polls PeerTube API every 30min, dedupes against catalogue (UUID + title),
writes flat file pairs to data/acquired/stream/ for the dispatcher.

- acquire_batch(): one-shot find-and-acquire with rate limiting
- acquisition_loop(): service thread wrapper (interval from config)
- list_new_videos(): dedup via _build_known_sets() against catalogue
- acquire_one(): fetch VTT, convert, write .tmp then rename atomically

cmd_service(): added peertube-acq daemon thread
cmd_ingest_peertube(): rewired to use acquire_batch(), drops --channel/
  --since/--enrich/--process (dispatcher handles full pipeline)
config.yaml: added peertube.poll_interval: 1800

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-15 03:08:51 +00:00
commit 277110d999
3 changed files with 245 additions and 32 deletions

224
lib/acquisition/peertube.py Normal file
View file

@ -0,0 +1,224 @@
"""
RECON PeerTube Acquisition Module
Polls PeerTube for new video transcripts and writes them as flat file pairs
into data/acquired/stream/ for the dispatcher to pick up.
Does NOT touch the database that's transcript_processor's job.
"""
import json
import os
import time
from lib.peertube_scraper import get_videos, get_captions, fetch_vtt, vtt_to_text, _get_pt_config
from lib.utils import content_hash, get_config, setup_logging
logger = setup_logging("recon.acquisition.peertube")
def _build_known_sets(db):
"""Build sets of known UUIDs and titles from catalogue.
Queries catalogue once per batch for dedup against both cohorts:
- URL-path rows: extract UUID from https://stream.echo6.co/w/{uuid}
- Library-path rows: extract title from filename column
"""
conn = db._get_conn()
rows = conn.execute(
"SELECT path, filename FROM catalogue WHERE source = 'stream.echo6.co'"
).fetchall()
known_uuids = set()
known_titles = set()
for row in rows:
path = row['path'] or ''
if '/w/' in path:
known_uuids.add(path.rsplit('/w/', 1)[-1])
fname = row['filename'] or ''
if fname.endswith('.txt'):
known_titles.add(fname[:-4])
else:
known_titles.add(fname)
return known_uuids, known_titles
def list_new_videos(db, config=None):
"""Find PeerTube videos with captions not yet in catalogue.
Returns list of (video_dict, caption_path) tuples for videos that have
captions and are not in the known UUID or title sets.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
rate_delay = ptc.get('rate_limit_delay', 0.5)
known_uuids, known_titles = _build_known_sets(db)
videos = get_videos(config=config)
new_videos = []
checked = 0
for video in videos:
if video['uuid'] in known_uuids:
continue
if video['name'] in known_titles:
continue
# Rate limit caption API calls
if checked > 0:
time.sleep(rate_delay)
checked += 1
try:
captions = get_captions(video['uuid'], config)
except Exception as e:
logger.warning("[peertube] Failed to get captions for %s: %s",
video['uuid'][:8], e)
continue
if not captions:
continue
# Prefer English caption
caption_path = None
for c in captions:
if c.get('language', {}).get('id') == 'en':
caption_path = c['captionPath']
break
if caption_path is None:
caption_path = captions[0]['captionPath']
new_videos.append((video, caption_path))
return new_videos
def acquire_one(video, caption_path, config=None):
"""Fetch transcript and write to hopper as flat files.
Returns hash string on success, None on skip/error.
Does NOT touch the database that's transcript_processor's job.
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
pipeline_cfg = config.get('pipeline', {})
hopper_dir = os.path.join(
pipeline_cfg.get('acquired_root', '/opt/recon/data/acquired'),
'stream'
)
os.makedirs(hopper_dir, exist_ok=True)
uuid = video['uuid']
# Fetch and convert VTT
vtt_content = fetch_vtt(caption_path, config)
text, cue_timestamps = vtt_to_text(vtt_content)
if not text or len(text.strip()) < 50:
logger.debug("[peertube] Transcript too short for %s (%s): %d chars",
video['name'], uuid, len(text) if text else 0)
return None
# Write text to temp file, hash it, then rename to final name
tmp_txt = os.path.join(hopper_dir, f'{uuid}.txt.tmp')
with open(tmp_txt, 'w', encoding='utf-8') as f:
f.write(text)
file_hash = content_hash(tmp_txt)
# Check if final file already exists (race condition guard)
final_txt = os.path.join(hopper_dir, f'{file_hash}.txt')
final_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json')
if os.path.exists(final_txt):
os.remove(tmp_txt)
logger.debug("[peertube] Hopper file already exists: %s", file_hash[:8])
return None
# Build sidecar metadata
video_url = f"{ptc['public_url']}/w/{uuid}"
meta = {
'title': video['name'],
'source_url': video_url,
'url': video_url,
'source': 'stream.echo6.co',
'source_type': 'transcript',
'category': 'Transcript',
'channel': video.get('channel_display', ''),
'duration': video.get('duration', 0),
'uuid': uuid,
'cue_timestamps': cue_timestamps,
}
# Write meta to tmp, then rename both atomically
# Meta first, then content — dispatcher only picks up when content file exists
tmp_meta = os.path.join(hopper_dir, f'{file_hash}.meta.json.tmp')
with open(tmp_meta, 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
os.rename(tmp_meta, final_meta)
os.rename(tmp_txt, final_txt)
logger.info("[peertube] Acquired: %s (%s) -> %s",
video['name'], uuid[:8], file_hash[:12])
return file_hash
def acquire_batch(db, config=None):
"""One-shot: find new videos and acquire them.
Returns dict: {'acquired': N, 'skipped': N, 'errors': N}
"""
if config is None:
config = get_config()
ptc = _get_pt_config(config)
rate_delay = ptc.get('rate_limit_delay', 0.5)
result = {'acquired': 0, 'skipped': 0, 'errors': 0}
try:
new_videos = list_new_videos(db, config)
except Exception as e:
logger.error("[peertube] Failed to list new videos: %s", e, exc_info=True)
result['errors'] = 1
return result
if not new_videos:
logger.debug("[peertube] No new videos found")
return result
logger.info("[peertube] Found %d new videos to acquire", len(new_videos))
for i, (video, caption_path) in enumerate(new_videos):
if i > 0:
time.sleep(rate_delay)
try:
file_hash = acquire_one(video, caption_path, config)
if file_hash:
result['acquired'] += 1
else:
result['skipped'] += 1
except Exception as e:
logger.error("[peertube] Error acquiring %s (%s): %s",
video['name'], video['uuid'][:8], e, exc_info=True)
result['errors'] += 1
return result
def acquisition_loop(stop_event, db, config, interval=1800):
"""Service loop: poll PeerTube for new transcripts every interval seconds."""
logger.info("[peertube] Acquisition loop started (interval: %ds)", interval)
while not stop_event.is_set():
try:
result = acquire_batch(db, config)
if result['acquired']:
logger.info("[peertube] Acquired %d new transcripts (%d skipped, %d errors)",
result['acquired'], result['skipped'], result['errors'])
else:
logger.debug("[peertube] No new transcripts")
except Exception as e:
logger.error("[peertube] Error: %s", e, exc_info=True)
stop_event.wait(interval)
logger.info("[peertube] Acquisition loop stopped")