diff --git a/recon.py b/recon.py index 9635a59..cad106a 100755 --- a/recon.py +++ b/recon.py @@ -863,6 +863,168 @@ def cmd_ingest(args): return 0 +def cmd_assign_categories(args): + """Assign RECON domains to PeerTube videos and push categories.""" + from lib.domain_assigner import compute_assignment, run_tiebreaker_pass + from lib.peertube_writer import push_pending, extract_uuid + from lib.recon_domains import DOMAIN_CATEGORY_MAP + + config = get_config() + db = StatusDB() + dry_run = args.dry_run + limit = args.limit + + if args.backfill: + # Pass 1: assign domains to all complete stream docs with no assignment + conn = db._get_conn() + q = """SELECT d.hash FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.status = 'complete' + AND d.recon_domain IS NULL + AND c.source = 'stream.echo6.co' + ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + rows = conn.execute(q).fetchall() + hashes = [r['hash'] for r in rows] + + if not hashes: + print("No unassigned complete stream documents found") + return 0 + + print(f"Backfill: processing {len(hashes)} documents" + + (" [DRY RUN]" if dry_run else "")) + + stats = {'assigned': 0, 'tied_pass_1': 0, 'needs_reprocess': 0, 'errors': 0} + for i, file_hash in enumerate(hashes): + try: + domain, status = compute_assignment(file_hash, db, config) + stats[status] = stats.get(status, 0) + 1 + if not dry_run: + db.set_domain_assignment(file_hash, domain, status) + if (i + 1) % 1000 == 0: + print(f" Progress: {i + 1}/{len(hashes)}") + except Exception as e: + stats['errors'] += 1 + logger.warning(f" Assignment error for {file_hash[:12]}: {e}") + + print(f"\nBackfill results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.tiebreaker_pass: + if dry_run: + items = db.get_items_by_domain_status('tied_pass_1') + print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]") + return 0 + stats = run_tiebreaker_pass(db, config) + print(f"\nTiebreaker results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.push_pending: + if dry_run: + items = db.get_unpushed_assignments() + if limit: + items = items[:limit] + print(f"Push pending: {len(items)} items would be pushed [DRY RUN]") + return 0 + success, failed = push_pending(db, config, limit=limit) + print(f"\nPush results: {success} succeeded, {failed} failed") + return 0 + + elif args.reprocess_missing: + items = db.get_items_by_domain_status('needs_reprocess', limit=limit) + if not items: + print("No items with needs_reprocess status") + return 0 + + print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else "")) + requeued = 0 + for item in items: + file_hash = item['hash'] + if dry_run: + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + has_concepts = os.path.isdir(concepts_dir) + concept_count = len(os.listdir(concepts_dir)) if has_concepts else 0 + detail = f"DELETE {concept_count} concept files" if has_concepts else "no concept dir" + print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')} ({detail})") + requeued += 1 + continue + + # Remove stale concept files + import shutil + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.isdir(concepts_dir): + logger.info(f" Deleting concept dir: {concepts_dir} " + f"({len(os.listdir(concepts_dir))} files, hash={file_hash})") + shutil.rmtree(concepts_dir) + + # Reset document status to allow re-processing + conn = db._get_conn() + conn.execute( + """UPDATE documents SET + status = 'catalogued', + concepts_extracted = 0, + vectors_inserted = 0, + recon_domain = NULL, + recon_domain_status = NULL, + recon_domain_assigned_at = NULL, + peertube_category_pushed_at = NULL, + error_message = NULL, + extracted_at = NULL, + enriched_at = NULL, + embedded_at = NULL + WHERE hash = ?""", + (file_hash,) + ) + conn.commit() + # Re-queue for pipeline processing + db.queue_document(file_hash) + requeued += 1 + + print(f"Requeued {requeued} items for reprocessing") + return 0 + + else: + # Default: show domain assignment status + status_counts = db.get_domain_status_counts() + domain_dist = db.get_domain_distribution() + + conn = db._get_conn() + total_stream = conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'""" + ).fetchone()['cnt'] + unassigned = conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete' + AND d.recon_domain IS NULL""" + ).fetchone()['cnt'] + unpushed = len(db.get_unpushed_assignments()) + + print("=== Domain Assignment Status ===\n") + print(f"Total complete stream docs: {total_stream}") + print(f"Unassigned: {unassigned}") + print(f"Unpushed to PeerTube: {unpushed}") + + if status_counts: + print(f"\nAssignment status breakdown:") + for status, cnt in sorted(status_counts.items()): + print(f" {status:<20s} {cnt:>6d}") + + if domain_dist: + print(f"\nDomain distribution:") + for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]): + print(f" {domain:<35s} {cnt:>6d}") + + return 0 + + def cmd_pipeline(args): """Stream B library pipeline: status, migrate, reverse, watch, sweep.""" from lib.new_pipeline import ( @@ -1158,6 +1320,16 @@ def main(): p.set_defaults(func=cmd_ingest) + # assign-categories + p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos') + p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs') + p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis') + p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API') + p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items') + p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing') + p.add_argument('--limit', type=int, help='Limit number of items to process') + p.set_defaults(func=cmd_assign_categories) + # pipeline (Stream B) p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)') p.add_argument('pipeline_action', nargs='?', default='status',