From 7e42528d2fbeb5b6ad731c8c7e62f747eb949189 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 15 Apr 2026 03:15:41 +0000 Subject: [PATCH] Phase 6e: rewire dashboard PeerTube endpoint to acquisition module Replace legacy ingest_channel/ingest_all imports with acquire_batch from lib.acquisition.peertube. The endpoint now writes flat file pairs to the hopper and lets the dispatcher handle processing, matching the Phase 6d architecture. Remove the channel/since/process parameters that were tied to the old direct-ingest path. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 61 +++++++++++++++++++----------------------------------- 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/lib/api.py b/lib/api.py index fa72c3f..421c54e 100644 --- a/lib/api.py +++ b/lib/api.py @@ -629,62 +629,43 @@ _peertube_results = {} @app.route('/api/ingest-peertube', methods=['POST']) def api_ingest_peertube(): - """Ingest PeerTube video transcripts.""" - data = request.get_json() or {} - channel = data.get('channel') - since = data.get('since') - process = data.get('process', False) + """Acquire new PeerTube transcripts into the hopper. - from .peertube_scraper import ingest_channel, ingest_all + Uses the acquisition module to find new videos and write flat file pairs to data/acquired/stream/. The dispatcher picks them up automatically.
+ """ + from .acquisition.peertube import acquire_batch - job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}" + job_id = f"pt_{int(__import__('time').time()):08x}" - def _run_ingest(): + def _run_acquire(): try: - _peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting', - 'channel': channel or 'all'} - if channel: - result = ingest_channel(channel, since=since) - else: - result = ingest_all(since=since) + _peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'} + db = StatusDB() + result = acquire_batch(db) - summary = result.get('summary', {}) _peertube_results[job_id] = { - 'status': 'running', 'stage': 'enriching', - 'channel': channel or 'all', 'ingest_summary': summary, + 'status': 'complete', + 'acquired': result['acquired'], + 'skipped': result['skipped'], + 'errors': result['errors'], + 'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them." } - - if process: - logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...") - from .enricher import run_enrichment - enriched = run_enrichment() - logger.info(f"PeerTube {job_id}: enriched {enriched} documents") - - _peertube_results[job_id]['stage'] = 'embedding' - from .embedder import run_embedding - embedded = run_embedding() - logger.info(f"PeerTube {job_id}: embedded {embedded} documents") - - summary['enriched'] = enriched - summary['embedded'] = embedded - - result['status'] = 'complete' - _peertube_results[job_id] = result + logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d", + job_id, result['acquired'], result['skipped'], result['errors']) except Exception as e: - logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True) + logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True) _peertube_results[job_id] = {'error': str(e), 'status': 'failed'} - _peertube_results[job_id] = {'status': 'running', 'stage': 'starting', - 'channel': channel or 'all'} - 
t = threading.Thread(target=_run_ingest, daemon=True) + _peertube_results[job_id] = {'status': 'running', 'stage': 'starting'} + t = threading.Thread(target=_run_acquire, daemon=True) t.start() return jsonify({ 'job_id': job_id, 'status': 'started', - 'channel': channel or 'all', - 'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status' + 'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status' }), 202