From 6a17df8078bd5b307eddb8de6b61120393d3f8a0 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:06:07 +0000 Subject: [PATCH] Phase 6: post-embed domain assignment hook After a stream.echo6.co video completes embedding, automatically runs compute_assignment (pass 1 only). Clear winners get pushed to PeerTube immediately; ties are marked tied_pass_1 for the batch tiebreaker. Also tags stream docs that hit early-return paths (no concepts, no valid concepts) with needs_reprocess status so they are visible to the --reprocess-missing CLI command. Error handling: domain assignment failure logs a warning but does not block the embedding pipeline. Co-Authored-By: Claude Opus 4.6 --- lib/embedder.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/lib/embedder.py b/lib/embedder.py index e80dad8..b1f59ca 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -255,15 +255,22 @@ def embed_single(file_hash, db, config): if not all_concepts: db.update_status(file_hash, 'complete', vectors_inserted=0) + # Tag stream docs with no concepts for reprocessing + _cat = db._get_conn().execute( + "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if _cat and dict(_cat)['source'] == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No concepts to embed for {doc['filename']}") return True - # Look up source from catalogue once per doc + # Look up source and path from catalogue once per doc cat_conn = db._get_conn() cat_row = cat_conn.execute( - "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + "SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,) ).fetchone() source = dict(cat_row)['source'] if cat_row else '' + catalogue_path = dict(cat_row)['path'] if cat_row else '' download_url = '' is_web = doc.get('path', '').startswith(('http://', 'https://')) @@ -315,6 +322,8 @@ def embed_single(file_hash, db, config): if not valid: db.update_status(file_hash, 'complete', vectors_inserted=0) + if source == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No valid concepts to embed for {doc['filename']}") return True @@ -395,6 +404,28 @@ def embed_single(file_hash, db, config): db.update_status(file_hash, 'complete', vectors_inserted=embedded_count) logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)") + + # Post-embed hook: assign domain for PeerTube videos + if source == 'stream.echo6.co': + try: + from .domain_assigner import compute_assignment + from .peertube_writer import push_category, extract_uuid + from .recon_domains import DOMAIN_CATEGORY_MAP + domain, status = compute_assignment(file_hash, db, config) + db.set_domain_assignment(file_hash, domain, status) + if domain and status == 'assigned': + cat_id = DOMAIN_CATEGORY_MAP[domain] + uuid = extract_uuid(catalogue_path) + if uuid: + pushed, _token = push_category(uuid, cat_id, config) + if pushed: + db.set_peertube_pushed(file_hash) + logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube") + else: + logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending") + except Exception as e: + logger.warning(f"Domain assignment failed for {file_hash}: {e}") + return True except Exception as e: