From 7fe7d0358309b2802634f9d17b77186e173eb08b Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:20:46 +0000 Subject: [PATCH] Revert "Phase 6e: rewire dashboard PeerTube endpoint to acquisition module" This reverts commit 7e42528d2fbeb5b6ad731c8c7e62f747eb949189. --- lib/api.py | 61 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/lib/api.py b/lib/api.py index 421c54e..fa72c3f 100644 --- a/lib/api.py +++ b/lib/api.py @@ -629,43 +629,62 @@ _peertube_results = {} @app.route('/api/ingest-peertube', methods=['POST']) def api_ingest_peertube(): - """Acquire new PeerTube transcripts into the hopper. + """Ingest PeerTube video transcripts.""" + data = request.get_json() or {} + channel = data.get('channel') + since = data.get('since') + process = data.get('process', False) - Uses the acquisition module to find new videos and write flat file pairs - to data/acquired/stream/. The dispatcher picks them up automatically. - """ - from .acquisition.peertube import acquire_batch + from .peertube_scraper import ingest_channel, ingest_all - job_id = f"pt_{int(__import__('time').time()):08x}" + job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}" - def _run_acquire(): + def _run_ingest(): try: - _peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'} - db = StatusDB() - result = acquire_batch(db) + _peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting', + 'channel': channel or 'all'} + if channel: + result = ingest_channel(channel, since=since) + else: + result = ingest_all(since=since) + summary = result.get('summary', {}) _peertube_results[job_id] = { - 'status': 'complete', - 'acquired': result['acquired'], - 'skipped': result['skipped'], - 'errors': result['errors'], - 'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them." + 'status': 'running', 'stage': 'enriching', + 'channel': channel or 'all', 'ingest_summary': summary, } - logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d", - job_id, result['acquired'], result['skipped'], result['errors']) + + if process: + logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...") + from .enricher import run_enrichment + enriched = run_enrichment() + logger.info(f"PeerTube {job_id}: enriched {enriched} documents") + + _peertube_results[job_id]['stage'] = 'embedding' + from .embedder import run_embedding + embedded = run_embedding() + logger.info(f"PeerTube {job_id}: embedded {embedded} documents") + + summary['enriched'] = enriched + summary['embedded'] = embedded + + result['status'] = 'complete' + _peertube_results[job_id] = result except Exception as e: - logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True) + logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True) _peertube_results[job_id] = {'error': str(e), 'status': 'failed'} - _peertube_results[job_id] = {'status': 'running', 'stage': 'starting'} - t = threading.Thread(target=_run_acquire, daemon=True) + _peertube_results[job_id] = {'status': 'running', 'stage': 'starting', + 'channel': channel or 'all'} + t = threading.Thread(target=_run_ingest, daemon=True) t.start() return jsonify({ 'job_id': job_id, 'status': 'started', - 'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status' + 'channel': channel or 'all', + 'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status' }), 202