Phase 5: assign-categories CLI commands

Adds assign-categories subcommand with flags:
  --backfill     Pass 1 domain assignment for all complete stream docs
  --tiebreaker-pass  Resolve ties via channel concept analysis
  --push-pending Push assigned categories to PeerTube API (staged via --limit)
  --reprocess-missing  Re-queue items with missing/legacy concepts
  --dry-run      Preview without writes (enhanced for reprocess: shows
                 concept dir existence and file counts)
  --limit N      Cap processing count

Includes pre-deletion audit logging for --reprocess-missing (logs path,
file count, and hash before each shutil.rmtree).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-28 00:05:19 +00:00
commit a273b52c7e

172
recon.py
View file

@ -863,6 +863,168 @@ def cmd_ingest(args):
return 0 return 0
def cmd_assign_categories(args):
"""Assign RECON domains to PeerTube videos and push categories."""
from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
from lib.peertube_writer import push_pending, extract_uuid
from lib.recon_domains import DOMAIN_CATEGORY_MAP
config = get_config()
db = StatusDB()
dry_run = args.dry_run
limit = args.limit
if args.backfill:
# Pass 1: assign domains to all complete stream docs with no assignment
conn = db._get_conn()
q = """SELECT d.hash FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.status = 'complete'
AND d.recon_domain IS NULL
AND c.source = 'stream.echo6.co'
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
rows = conn.execute(q).fetchall()
hashes = [r['hash'] for r in rows]
if not hashes:
print("No unassigned complete stream documents found")
return 0
print(f"Backfill: processing {len(hashes)} documents" +
(" [DRY RUN]" if dry_run else ""))
stats = {'assigned': 0, 'tied_pass_1': 0, 'needs_reprocess': 0, 'errors': 0}
for i, file_hash in enumerate(hashes):
try:
domain, status = compute_assignment(file_hash, db, config)
stats[status] = stats.get(status, 0) + 1
if not dry_run:
db.set_domain_assignment(file_hash, domain, status)
if (i + 1) % 1000 == 0:
print(f" Progress: {i + 1}/{len(hashes)}")
except Exception as e:
stats['errors'] += 1
logger.warning(f" Assignment error for {file_hash[:12]}: {e}")
print(f"\nBackfill results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.tiebreaker_pass:
if dry_run:
items = db.get_items_by_domain_status('tied_pass_1')
print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]")
return 0
stats = run_tiebreaker_pass(db, config)
print(f"\nTiebreaker results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.push_pending:
if dry_run:
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
print(f"Push pending: {len(items)} items would be pushed [DRY RUN]")
return 0
success, failed = push_pending(db, config, limit=limit)
print(f"\nPush results: {success} succeeded, {failed} failed")
return 0
elif args.reprocess_missing:
items = db.get_items_by_domain_status('needs_reprocess', limit=limit)
if not items:
print("No items with needs_reprocess status")
return 0
print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else ""))
requeued = 0
for item in items:
file_hash = item['hash']
if dry_run:
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
has_concepts = os.path.isdir(concepts_dir)
concept_count = len(os.listdir(concepts_dir)) if has_concepts else 0
detail = f"DELETE {concept_count} concept files" if has_concepts else "no concept dir"
print(f" Would reprocess: {file_hash[:12]}{item.get('filename', '?')} ({detail})")
requeued += 1
continue
# Remove stale concept files
import shutil
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if os.path.isdir(concepts_dir):
logger.info(f" Deleting concept dir: {concepts_dir} "
f"({len(os.listdir(concepts_dir))} files, hash={file_hash})")
shutil.rmtree(concepts_dir)
# Reset document status to allow re-processing
conn = db._get_conn()
conn.execute(
"""UPDATE documents SET
status = 'catalogued',
concepts_extracted = 0,
vectors_inserted = 0,
recon_domain = NULL,
recon_domain_status = NULL,
recon_domain_assigned_at = NULL,
peertube_category_pushed_at = NULL,
error_message = NULL,
extracted_at = NULL,
enriched_at = NULL,
embedded_at = NULL
WHERE hash = ?""",
(file_hash,)
)
conn.commit()
# Re-queue for pipeline processing
db.queue_document(file_hash)
requeued += 1
print(f"Requeued {requeued} items for reprocessing")
return 0
else:
# Default: show domain assignment status
status_counts = db.get_domain_status_counts()
domain_dist = db.get_domain_distribution()
conn = db._get_conn()
total_stream = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'"""
).fetchone()['cnt']
unassigned = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'
AND d.recon_domain IS NULL"""
).fetchone()['cnt']
unpushed = len(db.get_unpushed_assignments())
print("=== Domain Assignment Status ===\n")
print(f"Total complete stream docs: {total_stream}")
print(f"Unassigned: {unassigned}")
print(f"Unpushed to PeerTube: {unpushed}")
if status_counts:
print(f"\nAssignment status breakdown:")
for status, cnt in sorted(status_counts.items()):
print(f" {status:<20s} {cnt:>6d}")
if domain_dist:
print(f"\nDomain distribution:")
for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]):
print(f" {domain:<35s} {cnt:>6d}")
return 0
def cmd_pipeline(args): def cmd_pipeline(args):
"""Stream B library pipeline: status, migrate, reverse, watch, sweep.""" """Stream B library pipeline: status, migrate, reverse, watch, sweep."""
from lib.new_pipeline import ( from lib.new_pipeline import (
@ -1158,6 +1320,16 @@ def main():
p.set_defaults(func=cmd_ingest) p.set_defaults(func=cmd_ingest)
# assign-categories
p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos')
p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs')
p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis')
p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API')
p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items')
p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing')
p.add_argument('--limit', type=int, help='Limit number of items to process')
p.set_defaults(func=cmd_assign_categories)
# pipeline (Stream B) # pipeline (Stream B)
p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)') p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)')
p.add_argument('pipeline_action', nargs='?', default='status', p.add_argument('pipeline_action', nargs='?', default='status',