Revert "Phase 6e: rewire dashboard PeerTube endpoint to acquisition module"

This reverts commit 7e42528d2f.
This commit is contained in:
Matt 2026-04-15 03:20:46 +00:00
commit 7fe7d03583

View file

@ -629,43 +629,62 @@ _peertube_results = {}
@app.route('/api/ingest-peertube', methods=['POST']) @app.route('/api/ingest-peertube', methods=['POST'])
def api_ingest_peertube(): def api_ingest_peertube():
"""Acquire new PeerTube transcripts into the hopper. """Ingest PeerTube video transcripts."""
data = request.get_json() or {}
channel = data.get('channel')
since = data.get('since')
process = data.get('process', False)
Uses the acquisition module to find new videos and write flat file pairs from .peertube_scraper import ingest_channel, ingest_all
to data/acquired/stream/. The dispatcher picks them up automatically.
"""
from .acquisition.peertube import acquire_batch
job_id = f"pt_{int(__import__('time').time()):08x}" job_id = f"pt_{hash(channel or 'all') & 0xFFFFFFFF:08x}_{int(__import__('time').time())}"
def _run_acquire(): def _run_ingest():
try: try:
_peertube_results[job_id] = {'status': 'running', 'stage': 'acquiring'} _peertube_results[job_id] = {'status': 'running', 'stage': 'ingesting',
db = StatusDB() 'channel': channel or 'all'}
result = acquire_batch(db) if channel:
result = ingest_channel(channel, since=since)
else:
result = ingest_all(since=since)
summary = result.get('summary', {})
_peertube_results[job_id] = { _peertube_results[job_id] = {
'status': 'complete', 'status': 'running', 'stage': 'enriching',
'acquired': result['acquired'], 'channel': channel or 'all', 'ingest_summary': summary,
'skipped': result['skipped'],
'errors': result['errors'],
'message': f"Acquired {result['acquired']} transcript(s). Dispatcher will process them."
} }
logger.info("PeerTube acquire %s: acquired=%d skipped=%d errors=%d",
job_id, result['acquired'], result['skipped'], result['errors']) if process:
logger.info(f"PeerTube {job_id}: ingestion done, running enrichment...")
from .enricher import run_enrichment
enriched = run_enrichment()
logger.info(f"PeerTube {job_id}: enriched {enriched} documents")
_peertube_results[job_id]['stage'] = 'embedding'
from .embedder import run_embedding
embedded = run_embedding()
logger.info(f"PeerTube {job_id}: embedded {embedded} documents")
summary['enriched'] = enriched
summary['embedded'] = embedded
result['status'] = 'complete'
_peertube_results[job_id] = result
except Exception as e: except Exception as e:
logger.error("PeerTube acquire %s failed: %s", job_id, e, exc_info=True) logger.error(f"PeerTube ingestion {job_id} failed: {e}", exc_info=True)
_peertube_results[job_id] = {'error': str(e), 'status': 'failed'} _peertube_results[job_id] = {'error': str(e), 'status': 'failed'}
_peertube_results[job_id] = {'status': 'running', 'stage': 'starting'} _peertube_results[job_id] = {'status': 'running', 'stage': 'starting',
t = threading.Thread(target=_run_acquire, daemon=True) 'channel': channel or 'all'}
t = threading.Thread(target=_run_ingest, daemon=True)
t.start() t.start()
return jsonify({ return jsonify({
'job_id': job_id, 'job_id': job_id,
'status': 'started', 'status': 'started',
'message': f'PeerTube acquisition started. Check /api/ingest-peertube/{job_id}/status' 'channel': channel or 'all',
'message': f'PeerTube ingestion started. Check /api/ingest-peertube/{job_id}/status'
}), 202 }), 202