Phase 6e: rewire dashboard PeerTube endpoint to acquisition module

Replace legacy ingest_channel/ingest_all imports with acquire_batch
from lib.acquisition.peertube. The endpoint now writes flat file pairs
to the hopper and lets the dispatcher handle processing, matching the
Phase 6d architecture. Removes channel/since/process parameters that
were tied to the old direct-ingest path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-15 03:15:41 +00:00
commit 7e42528d2f

View file

@ -629,62 +629,43 @@ _peertube_results = {}
@app.route('/api/ingest-peertube', methods=['POST'])
def api_ingest_peertube():
"""Ingest PeerTube video transcripts."""
data = request.get_json() or {}
channel = data.get('channel')
since = data.get('since')
process = data.get('process', False)
"""Acquire new PeerTube transcripts into the hopper.
from .peertube_scraper import ingest_channel, ingest_all
Uses the acquisition module to find new videos and write flat file pairs
to data/acquired/stream/. The dispatcher picks them up automatically.
"""
from .acquisition.peertube import acquire_batch
job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}"
job_id = f"pt_{int(__import__('time').time()):08x}"
def _run_ingest():
def _run_acquire():
try:
_peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting',
'channel': channel or 'all'}
if channel:
result = ingest_channel(channel, since=since)
else:
result = ingest_all(since=since)
_peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'}
db = StatusDB()
result = acquire_batch(db)
summary = result.get('summary', {})
_peertube_results[job_id] = {
'status': 'running', 'stage': 'enriching',
'channel': channel or 'all', 'ingest_summary': summary,
'status': 'complete',
'acquired': result['acquired'],
'skipped': result['skipped'],
'errors': result['errors'],
'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them."
}
if process:
logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...")
from .enricher import run_enrichment
enriched = run_enrichment()
logger.info(f"PeerTube {job_id}: enriched {enriched} documents")
_peertube_results[job_id]['stage'] = 'embedding'
from .embedder import run_embedding
embedded = run_embedding()
logger.info(f"PeerTube {job_id}: embedded {embedded} documents")
summary['enriched'] = enriched
summary['embedded'] = embedded
result['status'] = 'complete'
_peertube_results[job_id] = result
logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d",
job_id, result['acquired'], result['skipped'], result['errors'])
except Exception as e:
logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True)
logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True)
_peertube_results[job_id] = {'error': str(e), 'status': 'failed'}
_peertube_results[job_id] = {'status': 'running', 'stage': 'starting',
'channel': channel or 'all'}
t = threading.Thread(target=_run_ingest, daemon=True)
_peertube_results[job_id] = {'status': 'running', 'stage': 'starting'}
t = threading.Thread(target=_run_acquire, daemon=True)
t.start()
return jsonify({
'job_id': job_id,
'status': 'started',
'channel': channel or 'all',
'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status'
'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status'
}), 202