mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 06:34:40 +02:00
Current state of the pipeline code as of 2026-04-14 (Phase 1 scaffolding complete). Config has new_pipeline.enabled=false and crawler.sites=[] per refactor plan. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
580 lines
17 KiB
Python
580 lines
17 KiB
Python
"""
|
|
RECON PeerTube Scraper — Video transcript ingestion.

Fetches WebVTT captions from a PeerTube instance, converts to plain text,
chunks into pages, and feeds into the standard RECON enrichment pipeline.

Output format matches lib/web_scraper.py so the enricher and embedder
process transcript content identically to web content.
|
|
"""
|
|
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import os
|
|
import bisect
|
|
import re
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import quote
|
|
|
|
import requests
|
|
import webvtt
|
|
|
|
from .utils import get_config, setup_logging
|
|
from .status import StatusDB
|
|
from .web_scraper import chunk_text
|
|
|
|
# Module-level logger for this scraper, created through the project's
# setup_logging helper under the name 'recon.peertube_scraper'.
logger = setup_logging('recon.peertube_scraper')
|
|
|
|
# Shutdown probe for graceful stops. Stays None until a callable is installed
# via set_stop_check(); the ingestion loops poll it between units of work.
_stop_check = None
|
|
|
|
def set_stop_check(fn):
    """Register the shutdown probe polled by the long-running loops here.

    Args:
        fn: Zero-argument callable returning True once shutdown has been
            requested, or None to clear a previously registered probe.

    Raises:
        TypeError: If ``fn`` is neither None nor callable. Rejecting it here
            avoids a confusing failure later, in the middle of an ingestion
            loop, when the probe is first invoked.
    """
    global _stop_check
    if fn is not None and not callable(fn):
        raise TypeError(
            f"stop check must be callable or None, got {type(fn).__name__}"
        )
    _stop_check = fn
|
|
|
|
# Fallback settings, used when the `peertube` section of config.yaml does not
# provide the corresponding key.
DEFAULT_API_BASE = 'http://192.168.1.170'  # prefix for API and caption paths
DEFAULT_PUBLIC_URL = 'https://stream.echo6.co'  # base of the stored video links
DEFAULT_FETCH_TIMEOUT = 30  # passed as `timeout` to requests.get
DEFAULT_RATE_LIMIT_DELAY = 0.5  # passed to time.sleep between requests
|
|
|
|
|
|
def _get_pt_config(config=None):
    """Resolve the PeerTube settings, filling gaps with the module defaults.

    Args:
        config: RECON config dict; loaded with get_config() when None.

    Returns:
        Dict with the keys 'api_base', 'public_url', 'fetch_timeout' and
        'rate_limit_delay'.
    """
    if config is None:
        config = get_config()
    # `or {}` also covers a `peertube:` key that is present but empty (None),
    # which `.get('peertube', {})` alone would pass through and crash on.
    pt = config.get('peertube') or {}
    return {
        'api_base': pt.get('api_base', DEFAULT_API_BASE),
        'public_url': pt.get('public_url', DEFAULT_PUBLIC_URL),
        'fetch_timeout': pt.get('fetch_timeout', DEFAULT_FETCH_TIMEOUT),
        'rate_limit_delay': pt.get('rate_limit_delay', DEFAULT_RATE_LIMIT_DELAY),
    }
|
|
|
|
|
|
def _api_get(path, config=None, params=None):
    """Issue a GET against the PeerTube API and return the decoded JSON body.

    `path` is appended to the configured 'api_base'; `params` is forwarded as
    the query string. Non-2xx responses raise through raise_for_status().
    """
    settings = _get_pt_config(config)
    endpoint = "{}{}".format(settings['api_base'], path)
    response = requests.get(
        endpoint,
        params=params,
        timeout=settings['fetch_timeout'],
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def get_videos(channel=None, since=None, config=None):
    """
    Paginate through the published videos on the PeerTube instance.

    Args:
        channel: Filter to this channel actor_name (e.g., 'mental-outlaw')
        since: ISO date string — only return videos published after this date
        config: RECON config dict

    Returns a list of video dicts with the keys: uuid, name, duration,
    channel_name, channel_display, publishedAt, description (description is
    truncated to 500 characters). If shutdown is requested between pages,
    the videos collected so far are returned.
    """
    ptc = _get_pt_config(config)
    videos = []
    start = 0
    count = 100  # PeerTube supports up to 100 per page

    # The endpoint does not change between pages, so build it once. The
    # channel handle is percent-encoded so unusual characters cannot alter
    # the request path; '@' is kept literal for name@host handles.
    if channel:
        path = f"/api/v1/video-channels/{quote(channel, safe='@')}/videos"
    else:
        path = "/api/v1/videos"

    while True:
        data = _api_get(path, config, params={
            'count': count,
            'start': start,
            'sort': '-publishedAt',
        })

        total = data.get('total', 0)
        batch = data.get('data', [])

        if not batch:
            break

        for v in batch:
            published = v.get('publishedAt') or ''

            # Filter by since date (plain string comparison of ISO values)
            if since and published < since:
                # Videos are sorted by publishedAt desc, so once we pass
                # the since threshold, all remaining are older — stop
                return videos

            # Tolerate an explicit null channel in the payload
            channel_info = v.get('channel') or {}
            videos.append({
                'uuid': v['uuid'],
                'name': v['name'],
                'duration': v.get('duration', 0),
                'channel_name': channel_info.get('name', ''),
                'channel_display': channel_info.get('displayName', ''),
                'publishedAt': published,
                'description': (v.get('description') or '')[:500],
            })

        start += count
        if start >= total:
            break

        # Check for shutdown during pagination
        if _stop_check and _stop_check():
            logger.info(f"Shutdown requested during video listing — returning {len(videos)} collected so far")
            return videos

        # Rate limit pagination requests
        time.sleep(ptc['rate_limit_delay'])

    return videos
|
|
|
|
|
|
def get_captions(uuid, config=None):
    """Return the caption entries the API lists for video `uuid`.

    The result is the 'data' list of the captions endpoint response, or an
    empty list when that key is absent.
    """
    payload = _api_get("/api/v1/videos/{}/captions".format(uuid), config)
    return payload.get('data', [])
|
|
|
|
|
|
def fetch_vtt(caption_path, config=None):
    """Fetch raw VTT file content from PeerTube.

    Args:
        caption_path: Server-relative caption path; appended to the
            configured 'api_base'.
        config: RECON config dict.

    Returns:
        The caption file decoded as UTF-8 text.

    Raises:
        requests.HTTPError: On a non-2xx response (via raise_for_status()).
    """
    ptc = _get_pt_config(config)
    url = f"{ptc['api_base']}{caption_path}"
    resp = requests.get(url, timeout=ptc['fetch_timeout'])
    resp.raise_for_status()
    # WebVTT files are UTF-8 by specification. Without an explicit charset on
    # a text/* response, requests falls back to ISO-8859-1, which would turn
    # every non-ASCII caption character into mojibake — so force UTF-8.
    resp.encoding = 'utf-8'
    return resp.text
|
|
|
|
|
|
|
|
def _parse_vtt_time(time_str):
|
|
"""Parse VTT timestamp string (HH:MM:SS.mmm or MM:SS.mmm) to seconds."""
|
|
parts = time_str.split(':')
|
|
if len(parts) == 3:
|
|
h, m, s = parts
|
|
return int(h) * 3600 + int(m) * 60 + float(s)
|
|
elif len(parts) == 2:
|
|
m, s = parts
|
|
return int(m) * 60 + float(s)
|
|
return 0.0
|
|
|
|
|
|
def vtt_to_text(vtt_content):
    """
    Convert WebVTT content to clean plain text with timestamp tracking.

    Strips timestamps, de-duplicates consecutive identical cues (common with
    Whisper output), removes HTML tags, and joins cues with spaces (not
    newlines — Whisper cues break mid-sentence).

    Each cue is whitespace-normalised *before* its offset is recorded, so the
    offsets index the returned text exactly. Cues that are empty once tags
    are removed are skipped.

    Returns (text, cue_timestamps) where:
    - text: clean prose string
    - cue_timestamps: list of (start_seconds, char_offset) tuples tracking
      where each VTT cue begins in the output text
    """
    buf = io.StringIO(vtt_content)
    try:
        captions = webvtt.read_buffer(buf)
    except Exception:
        # Fallback: manual regex parse if webvtt-py fails
        return _vtt_to_text_fallback(vtt_content)

    prev_text = None
    segments = []
    cue_timestamps = []
    pos = 0  # offset in the joined text where the next segment will start

    for caption in captions:
        # Strip HTML tags, then collapse whitespace (multi-line cues contain
        # newlines) so len(text) is the segment's final length in the output.
        text = re.sub(r'<[^>]+>', '', caption.text)
        text = re.sub(r'\s+', ' ', text).strip()
        if not text:
            continue

        # De-duplicate consecutive identical cues
        if text == prev_text:
            continue
        prev_text = text

        cue_timestamps.append((_parse_vtt_time(caption.start), pos))
        segments.append(text)
        pos += len(text) + 1  # +1 for space separator

    # Join with spaces — VTT cues break mid-sentence
    return ' '.join(segments), cue_timestamps
|
|
|
|
|
|
def _vtt_to_text_fallback(vtt_content):
|
|
"""Regex-based VTT parser as fallback. Returns (text, cue_timestamps)."""
|
|
lines = vtt_content.split('\n')
|
|
prev_text = None
|
|
segments = []
|
|
raw_timestamps = []
|
|
last_time = 0.0
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if not line or line == 'WEBVTT':
|
|
continue
|
|
if '-->' in line:
|
|
# Parse start time from "00:01:23.456 --> 00:01:25.789"
|
|
time_part = line.split('-->')[0].strip()
|
|
last_time = _parse_vtt_time(time_part)
|
|
continue
|
|
if line.isdigit():
|
|
continue
|
|
|
|
text = re.sub(r'<[^>]+>', '', line)
|
|
if text == prev_text:
|
|
continue
|
|
prev_text = text
|
|
raw_timestamps.append((last_time, len(segments)))
|
|
segments.append(text)
|
|
|
|
raw = ' '.join(segments)
|
|
raw = re.sub(r'\s+', ' ', raw).strip()
|
|
|
|
# Compute char offsets
|
|
seg_offsets = []
|
|
pos = 0
|
|
for seg in segments:
|
|
seg_offsets.append(pos)
|
|
pos += len(seg) + 1
|
|
|
|
cue_timestamps = []
|
|
for start_secs, seg_idx in raw_timestamps:
|
|
if seg_idx < len(seg_offsets):
|
|
cue_timestamps.append((start_secs, seg_offsets[seg_idx]))
|
|
|
|
return raw, cue_timestamps
|
|
|
|
|
|
|
|
def _map_page_timestamps(pages, full_text, cue_timestamps):
|
|
"""
|
|
Map page numbers to video timestamps.
|
|
|
|
For each page, finds its approximate start position in the full text,
|
|
then looks up the nearest VTT cue timestamp via binary search.
|
|
|
|
Returns dict: {"page_0001": 0.0, "page_0002": 312.5, ...}
|
|
"""
|
|
if not cue_timestamps:
|
|
return {}
|
|
|
|
offsets = [ct[1] for ct in cue_timestamps]
|
|
times = [ct[0] for ct in cue_timestamps]
|
|
|
|
page_ts = {}
|
|
search_start = 0
|
|
|
|
for i, page_text in enumerate(pages):
|
|
page_name = f"page_{i+1:04d}"
|
|
|
|
# Find where this page starts in the full text
|
|
snippet = page_text[:200].strip()
|
|
pos = full_text.find(snippet, search_start)
|
|
if pos < 0:
|
|
pos = search_start # fallback
|
|
|
|
# Binary search for nearest cue at or before this position
|
|
idx = bisect.bisect_right(offsets, pos) - 1
|
|
if idx < 0:
|
|
idx = 0
|
|
|
|
page_ts[page_name] = round(times[idx], 1)
|
|
search_start = pos + len(snippet)
|
|
|
|
return page_ts
|
|
|
|
def _content_hash(text):
|
|
"""MD5 hash of text content — same as web_scraper."""
|
|
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
|
|
|
|
|
def ingest_video(uuid, video_meta, config=None):
    """
    Ingest a single PeerTube video transcript.

    Fetches captions, converts VTT to text, chunks into pages,
    saves to data/text/{hash}/, and sets status to 'extracted'.

    Args:
        uuid: Video UUID
        video_meta: Dict with name, duration, channel_name, channel_display,
            publishedAt, description
        config: RECON config dict

    Returns:
        - None if the video has no captions or the transcript is shorter
          than 50 characters.
        - A dict with status 'duplicate' (hash, title, existing_status) when
          a catalogue entry with the same content hash already exists.
        - Otherwise a dict with status 'extracted' plus hash, title,
          page_count, text_length, page_timestamps, channel, duration, url.
    """
    if config is None:
        config = get_config()
    ptc = _get_pt_config(config)
    db = StatusDB()

    # Get captions
    captions = get_captions(uuid, config)
    if not captions:
        return None

    # Prefer English caption
    caption = _select_caption(captions)

    # Fetch VTT
    vtt_content = fetch_vtt(caption['captionPath'], config)

    # Convert to plain text with timestamp tracking
    text, cue_timestamps = vtt_to_text(vtt_content)
    if not text or len(text) < 50:
        logger.warning(f"Transcript too short for {video_meta['name']} ({uuid}): {len(text)} chars")
        return None

    # Hash the text content
    doc_hash = _content_hash(text)

    # Check for duplicate
    conn = db._get_conn()
    existing = conn.execute("SELECT * FROM catalogue WHERE hash = ?", (doc_hash,)).fetchone()
    if existing:
        doc = db.get_document(doc_hash)
        existing_status = doc['status'] if doc else existing['status']
        logger.debug(f"Duplicate transcript (hash {doc_hash[:12]}...) — {video_meta['name']}")
        return {
            'hash': doc_hash,
            'status': 'duplicate',
            'title': video_meta['name'],
            'existing_status': existing_status,
        }

    # Chunk into pages (shares the web scraper's page-size setting)
    words_per_page = config.get('web_scraper', {}).get('words_per_page', 2000)
    pages = chunk_text(text, words_per_page)

    # Compute page-to-timestamp mapping
    page_timestamps = _map_page_timestamps(pages, text, cue_timestamps)

    # Save text files
    text_dir = os.path.join(config['paths']['text'], doc_hash)
    os.makedirs(text_dir, exist_ok=True)

    for i, page_text in enumerate(pages, 1):
        page_file = os.path.join(text_dir, f"page_{i:04d}.txt")
        with open(page_file, 'w', encoding='utf-8') as f:
            f.write(page_text)

    # Save meta.json
    video_url = f"{ptc['public_url']}/w/{uuid}"
    meta = {
        'hash': doc_hash,
        'source_type': 'transcript',
        'url': video_url,
        'title': video_meta['name'],
        'author': video_meta.get('channel_display', ''),
        'channel': video_meta.get('channel_name', ''),
        'duration': video_meta.get('duration', 0),
        'date': video_meta.get('publishedAt', ''),
        'description': video_meta.get('description', ''),
        'sitename': 'stream.echo6.co',
        'page_count': len(pages),
        'text_length': len(text),
        'page_timestamps': page_timestamps,
        'fetched_at': datetime.now(timezone.utc).isoformat(),
    }
    # Explicit encoding, consistent with the page files written above
    with open(os.path.join(text_dir, 'meta.json'), 'w', encoding='utf-8') as f:
        json.dump(meta, f, indent=2)

    # Display filename for catalogue
    display_name = re.sub(r'[^\w\s._-]', '', video_meta['name'])[:200].strip()
    if not display_name:
        display_name = uuid

    # Add to catalogue
    db.add_to_catalogue(
        doc_hash, display_name, video_url,
        len(text), 'stream.echo6.co', video_meta.get('channel_name', 'unknown')
    )

    # Queue + advance to extracted
    db.queue_document(doc_hash)
    db.update_status(doc_hash, 'extracted',
                     page_count=len(pages),
                     pages_extracted=len(pages),
                     book_title=video_meta['name'],
                     book_author=video_meta.get('channel_display', ''))

    logger.info(
        f"Ingested transcript: {video_meta['name']} ({uuid[:8]}...) "
        f"-> {doc_hash[:12]}... ({len(pages)} pages, {len(text)} chars)"
    )

    return {
        'hash': doc_hash,
        'status': 'extracted',
        'title': video_meta['name'],
        'page_count': len(pages),
        'text_length': len(text),
        'page_timestamps': page_timestamps,
        'channel': video_meta.get('channel_name', ''),
        'duration': video_meta.get('duration', 0),
        'url': video_url,
    }


def _select_caption(captions):
    """Return the first caption whose language id is 'en', else the first caption.

    `captions` must be non-empty. A caption whose 'language' field is missing
    or null is treated as non-English rather than raising.
    """
    for candidate in captions:
        if (candidate.get('language') or {}).get('id') == 'en':
            return candidate
    return captions[0]
|
|
|
|
|
|
def ingest_channel(channel_name, config=None, since=None):
    """
    Ingest the captioned videos of one channel.

    Lists the channel's videos (optionally limited by `since`) and hands
    them to the shared ingestion loop.

    Returns summary dict.
    """
    cfg = get_config() if config is None else config
    pt_settings = _get_pt_config(cfg)

    logger.info(f"Ingesting channel: {channel_name}")
    channel_videos = get_videos(channel=channel_name, since=since, config=cfg)
    return _ingest_video_list(channel_videos, cfg, pt_settings)
|
|
|
|
|
|
def ingest_all(config=None, since=None):
    """
    Ingest the captioned videos of the whole PeerTube instance.

    Lists every video (optionally limited by `since`) and hands the list to
    the shared ingestion loop.

    Returns summary dict.
    """
    cfg = get_config() if config is None else config
    pt_settings = _get_pt_config(cfg)

    logger.info("Ingesting all PeerTube videos with captions")
    instance_videos = get_videos(since=since, config=cfg)
    return _ingest_video_list(instance_videos, cfg, pt_settings)
|
|
|
|
|
|
def _ingest_video_list(videos, config, ptc):
    """Process a list of videos — shared logic for ingest_channel and ingest_all.

    Args:
        videos: Video dicts as produced by get_videos().
        config: RECON config dict, forwarded to ingest_video().
        ptc: PeerTube settings dict; 'rate_limit_delay' paces the loop.

    Returns:
        Dict with 'results' (the ingest_video() results of newly ingested
        videos) and 'summary' (counters). 'total_checked' is the number of
        videos actually processed, which is lower than len(videos) when a
        shutdown request ends the loop early.
    """
    results = []
    skipped_no_captions = 0
    skipped_duplicate = 0
    failed = 0
    ingested = 0
    total_pages = 0
    checked = 0

    total = len(videos)
    logger.info(f"Found {total} videos to check for captions")

    for i, video in enumerate(videos, 1):
        # Catches a shutdown requested during the previous rate-limit sleep
        if _stop_check and _stop_check():
            logger.info(f"Shutdown requested — stopping after {i-1}/{total} videos")
            break
        uuid = video['uuid']

        try:
            result = ingest_video(uuid, video, config)

            if result is None:
                skipped_no_captions += 1
            elif result['status'] == 'duplicate':
                skipped_duplicate += 1
            else:
                ingested += 1
                total_pages += result.get('page_count', 0)
                results.append(result)

        except Exception as e:
            # .get(): a video dict without 'name' must not raise again from
            # inside the handler and abort the whole batch.
            logger.error(f"[{i}/{total}] Failed: {video.get('name', uuid)} ({uuid}) — {e}")
            failed += 1

        checked = i

        # Check for shutdown
        if _stop_check and _stop_check():
            logger.info(f"Shutdown requested — stopping after {i}/{total} videos")
            break

        # Rate limit
        if i < total:
            time.sleep(ptc['rate_limit_delay'])

        # Progress logging every 50 videos
        if i % 50 == 0:
            logger.info(
                f"Progress: {i}/{total} checked — "
                f"{ingested} ingested, {skipped_no_captions} no captions, "
                f"{skipped_duplicate} dupes, {failed} failed"
            )

    logger.info(
        f"PeerTube ingestion complete: {ingested} ingested ({total_pages} pages), "
        f"{skipped_no_captions} no captions, {skipped_duplicate} duplicates, "
        f"{failed} failed out of {total} videos"
    )

    return {
        'results': results,
        'summary': {
            'total_checked': checked,
            'ingested': ingested,
            'skipped_no_captions': skipped_no_captions,
            'skipped_duplicate': skipped_duplicate,
            'failed': failed,
            'total_pages': total_pages,
        }
    }
|
|
|
|
|
|
def get_instance_stats(config=None):
    """Get PeerTube instance statistics for the dashboard.

    Returns:
        Dict with:
        - 'total_videos': the 'total' reported by the videos endpoint, or 0
          when the API call fails (the failure is logged, not raised).
        - 'ingested': number of catalogue rows whose source is
          'stream.echo6.co'.
        - 'status_breakdown': {status: count} over the documents joined to
          those catalogue rows.
    """
    if config is None:
        config = get_config()
    db = StatusDB()

    # Total videos on instance
    try:
        data = _api_get("/api/v1/videos", config, params={'count': 1})
        total_videos = data.get('total', 0)
    except Exception as exc:
        # Best effort: the dashboard should still render with the instance
        # unreachable, but the cause should not vanish silently.
        logger.warning(f"Could not fetch PeerTube video total: {exc}")
        total_videos = 0

    # Videos ingested into RECON (from catalogue)
    conn = db._get_conn()
    ingested = conn.execute(
        "SELECT count(*) FROM catalogue WHERE source = 'stream.echo6.co'"
    ).fetchone()[0]

    # Status breakdown
    status_rows = conn.execute(
        "SELECT d.status, count(*) as cnt FROM documents d "
        "JOIN catalogue c ON d.hash = c.hash "
        "WHERE c.source = 'stream.echo6.co' "
        "GROUP BY d.status"
    ).fetchall()
    status_breakdown = {row['status']: row['cnt'] for row in status_rows}

    return {
        'total_videos': total_videos,
        'ingested': ingested,
        'status_breakdown': status_breakdown,
    }
|