diff --git a/docs/domain-assignment.md b/docs/domain-assignment.md
index 394a652..8651419 100644
--- a/docs/domain-assignment.md
+++ b/docs/domain-assignment.md
@@ -2,7 +2,13 @@
 
 ## Overview
 
-RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concepts extracted from its transcript. Assignments are pushed to PeerTube as category metadata via a custom plugin.
+RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin.
+
+## Data Source
+
+Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue.
+
+Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data.
 
 ## Algorithm
 
@@ -10,9 +16,9 @@ RECON's domain assignment feature maps each PeerTube video to one of 18 knowledg
 
 Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`.
 
-1. Read all `data/concepts/{hash}/window_*.json` files
-2. Count domain occurrences across all concepts, filtering to `VALID_DOMAINS` only (skips legacy domains)
-3. If no valid concepts → `needs_reprocess`
+1. Query Qdrant for all points with `doc_hash` matching the document
+2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only
+3. If zero concept vectors → `no_concepts` (terminal)
 4. If single top domain → `assigned`
 5. If tied → `tied_pass_1` (deferred to tiebreaker)
 
@@ -22,20 +28,13 @@ Runs via `assign-categories --tiebreaker-pass`.
 
 For each `tied_pass_1` document:
 
-1. Identify the tied domains
+1. Identify the tied domains from Qdrant
 2. Look up the document's channel (`catalogue.category`)
 3. **Mega-channel rule:** If channel has >500 videos, skip tiebreaking → `tied_manual`
-4. Read concept files for all other videos in the same channel
+4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter)
 5. Among the tied domains only, pick the one with the highest channel-wide concept count
 6. If resolved → `tied_pass_2`
-7. If still tied → proceed to pass 3
-
-### Pass 3: Defensive Re-Run
-
-If pass 2 does not resolve the tie, re-read the same channel concept files and re-run identical counting logic. This catches concept-file changes that occurred mid-run (e.g. concurrent enrichment writing new windows during the batch). In steady state, pass 3 produces the same result as pass 2, but under concurrent writes it can resolve a tie that pass 2 missed.
-
-- If resolved → `tied_pass_2` (same status — the column tracks "channel scan resolved it")
-- If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
+7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
 
 ### Mega-Channel Rule
 
@@ -43,14 +42,15 @@ Channels with >500 videos (like the "Transcript" catch-all with ~9,200 videos) a
 
 ## Status Values
 
-| Status | Meaning | Next Action |
-|--------|---------|-------------|
-| `assigned` | Clear winner from pass 1 | Push to PeerTube |
-| `tied_pass_1` | Concept tie, awaiting tiebreaker | Run `--tiebreaker-pass` |
-| `tied_pass_2` | Resolved by channel tiebreaker | Push to PeerTube |
-| `tied_manual` | Needs human review | Review at `/peertube/review` |
-| `needs_reprocess` | Missing concepts or only legacy domains | Run `--reprocess-missing` |
-| `manual_assigned` | Human override from dashboard | Already pushed |
+| Status | Meaning | Terminal? | Next Action |
+|--------|---------|-----------|-------------|
+| `assigned` | Clear winner from pass 1 | No | Push to PeerTube |
+| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` |
+| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube |
+| `tied_manual` | Needs human review | No | Review at `/peertube/review` |
+| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) |
+| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` |
+| `manual_assigned` | Human override from dashboard | No | Already pushed |
 
 **"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}`
 
@@ -72,7 +72,7 @@ python3 recon.py assign-categories --tiebreaker-pass
 # Push all assigned-but-unpushed categories to PeerTube API
 python3 recon.py assign-categories --push-pending
 
-# Re-queue items with missing/legacy concepts
+# Re-queue items with transient failures for full re-processing
 python3 recon.py assign-categories --reprocess-missing
 
 # Limit processing count
@@ -87,25 +87,26 @@ The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items
 - Dropdown to select the correct domain
 - Assign button (pushes to PeerTube immediately)
 
-Items with `needs_reprocess` status do NOT appear in the review UI — they are handled exclusively via the CLI `--reprocess-missing` command.
+Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI.
 
 ## Pipeline Integration
 
 New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`:
 
-1. Runs `compute_assignment()` (pass 1 only)
+1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client
 2. If clear winner: pushes category to PeerTube immediately
 3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run
-4. On error: logs warning and continues — does not block the pipeline
+4. If no concepts: marks as `no_concepts` (terminal)
+5. On Qdrant error: logs warning and continues — does not block the pipeline
 
 ## Source Files
 
 | File | Purpose |
 |------|---------|
 | `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS |
-| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` |
+| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers |
 | `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` |
-| `lib/embedder.py` | Post-embed hook |
+| `lib/embedder.py` | Post-embed hook (passes qdrant client) |
 | `lib/status.py` | DB columns + helper methods |
 | `lib/api.py` | Dashboard review routes |
 | `recon.py` | CLI `assign-categories` command |
diff --git a/lib/domain_assigner.py b/lib/domain_assigner.py
index c00894f..a6f0f47 100644
--- a/lib/domain_assigner.py
+++ b/lib/domain_assigner.py
@@ -1,24 +1,30 @@
 """
 RECON Domain Assigner
 
-Computes per-video domain assignments from concept extraction results.
+Computes per-video domain assignments from Qdrant vector payloads.
 
 Two functions, two execution modes:
     compute_assignment()   — pass 1, inline from post-embed hook
     run_tiebreaker_pass()  — batch, resolves ties via channel concept scan
 
+Data source: Qdrant `domain` payload field on concept vectors.
+Previously read on-disk concept JSON files; migrated to Qdrant as
+single source of truth (2026-04-28).
+
 Status values written to documents.recon_domain_status:
     assigned        — clear winner from pass 1 concept count
     tied_pass_1     — concept tie, awaiting channel tiebreaker
     tied_pass_2     — resolved by channel tiebreaker
     tied_manual     — needs human review (dashboard)
-    needs_reprocess — missing concepts or only legacy domains
+    no_concepts     — terminal, zero concept vectors in Qdrant
+    needs_reprocess — transient failure (Qdrant error, etc.)
     manual_assigned — human override from dashboard
 """
 
-import json
-import os
 from collections import Counter
 
+from qdrant_client import QdrantClient
+from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny
+
 from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP
 from .utils import setup_logging
@@ -28,40 +34,51 @@ logger = setup_logging('recon.domain_assigner')
 MEGA_CHANNEL_THRESHOLD = 500
 
 
-def _count_concept_domains(concepts_dir, file_hash):
-    """Read concept files and count valid domain occurrences.
+def _get_qdrant_client(config):
+    """Create a QdrantClient from RECON config.
+
+    Callers should create one client and pass it through rather than
+    calling this repeatedly.
+    """
+    logger.debug("Creating new QdrantClient (caller did not pass one)")
+    return QdrantClient(
+        host=config['vector_db']['host'],
+        port=config['vector_db']['port'],
+        timeout=60
+    )
+
+
+def _count_domains_from_qdrant(qdrant, collection, doc_hash):
+    """Count valid domain occurrences for a single document from Qdrant.
+
+    Scrolls all points matching doc_hash and counts domain values.
 
     Args:
-        concepts_dir: Base concepts directory (e.g. /opt/recon/data/concepts)
-        file_hash: Document hash
+        qdrant: QdrantClient instance
+        collection: Qdrant collection name
+        doc_hash: Document hash to query
 
     Returns:
-        Counter of {domain_name: count} for valid domains only,
-        or None if no concept directory exists.
+        Counter of {domain_name: count} for valid domains.
+        Empty Counter if no points found (never None).
     """
-    doc_concepts_dir = os.path.join(concepts_dir, file_hash)
-    if not os.path.isdir(doc_concepts_dir):
-        return None
-
     domain_counter = Counter()
+    offset = None
 
-    for fname in os.listdir(doc_concepts_dir):
-        if not fname.startswith('window_') or not fname.endswith('.json'):
-            continue
-        fpath = os.path.join(doc_concepts_dir, fname)
-        try:
-            with open(fpath, 'r') as f:
-                concepts = json.load(f)
-        except (json.JSONDecodeError, OSError):
-            continue
+    while True:
+        results, next_offset = qdrant.scroll(
+            collection_name=collection,
+            scroll_filter=Filter(must=[
+                FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash))
+            ]),
+            with_payload=["domain"],
+            with_vectors=False,
+            limit=200,
+            offset=offset,
+        )
 
-        if not isinstance(concepts, list):
-            continue
-
-        for concept in concepts:
-            if not isinstance(concept, dict):
-                continue
-            dom = concept.get('domain')
+        for point in results:
+            dom = point.payload.get('domain')
             if isinstance(dom, str) and dom in VALID_DOMAINS:
                 domain_counter[dom] += 1
             elif isinstance(dom, list):
@@ -69,30 +86,95 @@ def _count_concept_domains(concepts_dir, file_hash):
                     if isinstance(d, str) and d in VALID_DOMAINS:
                         domain_counter[d] += 1
 
+        if next_offset is None:
+            break
+        offset = next_offset
+
     return domain_counter
 
 
-def compute_assignment(file_hash, db, config):
+def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes):
+    """Count valid domain occurrences across multiple documents from Qdrant.
+
+    Single scroll with MatchAny filter, with offset pagination for large
+    result sets.
+
+    Args:
+        qdrant: QdrantClient instance
+        collection: Qdrant collection name
+        doc_hashes: List of document hashes to query
+
+    Returns:
+        Counter of {domain_name: count} aggregated across all matching points.
+    """
+    if not doc_hashes:
+        return Counter()
+
+    domain_counter = Counter()
+    offset = None
+
+    while True:
+        results, next_offset = qdrant.scroll(
+            collection_name=collection,
+            scroll_filter=Filter(must=[
+                FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes))
+            ]),
+            with_payload=["domain"],
+            with_vectors=False,
+            limit=10000,
+            offset=offset,
+        )
+
+        for point in results:
+            dom = point.payload.get('domain')
+            if isinstance(dom, str) and dom in VALID_DOMAINS:
+                domain_counter[dom] += 1
+            elif isinstance(dom, list):
+                for d in dom:
+                    if isinstance(d, str) and d in VALID_DOMAINS:
+                        domain_counter[d] += 1
+
+        if next_offset is None:
+            break
+        offset = next_offset
+
+    return domain_counter
+
+
+def compute_assignment(file_hash, db, config, qdrant=None):
     """Compute domain assignment for a single document (pass 1).
 
-    Counts domain occurrences across all concepts. If a single domain
-    wins, assigns it. If tied, defers to batch tiebreaker.
+    Counts domain occurrences across all concept vectors in Qdrant.
+    If a single domain wins, assigns it. If tied, defers to batch
+    tiebreaker.
 
     Args:
         file_hash: Document hash
         db: StatusDB instance
         config: RECON config dict
+        qdrant: Optional QdrantClient (created if not provided)
 
     Returns:
         (domain, status) tuple where domain is a string or None,
-        and status is one of: 'assigned', 'tied_pass_1', 'needs_reprocess'
+        and status is one of: 'assigned', 'tied_pass_1', 'no_concepts',
+        'needs_reprocess'
     """
-    concepts_dir = config['paths']['concepts']
-    domain_counter = _count_concept_domains(concepts_dir, file_hash)
+    owns_client = False
+    if qdrant is None:
+        qdrant = _get_qdrant_client(config)
+        owns_client = True
 
-    if domain_counter is None or len(domain_counter) == 0:
+    collection = config['vector_db']['collection']
+
+    try:
+        domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
+    except Exception as e:
+        logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}")
         return (None, 'needs_reprocess')
 
+    if len(domain_counter) == 0:
+        return (None, 'no_concepts')
+
     top = domain_counter.most_common(2)
     top_domain = top[0][0]
     top_count = top[0][1]
@@ -104,9 +186,9 @@ def compute_assignment(file_hash, db, config):
     return (None, 'tied_pass_1')
 
 
-def _get_tied_domains(concepts_dir, file_hash):
+def _get_tied_domains(qdrant, collection, file_hash):
     """Get the set of domains tied for first place in a document's concepts."""
-    domain_counter = _count_concept_domains(concepts_dir, file_hash)
+    domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
     if not domain_counter:
         return []
 
@@ -150,32 +232,32 @@ def _channel_video_count(db, channel_name):
     return row['cnt'] if row else 0
 
 
-def run_tiebreaker_pass(db, config):
-    """Resolve tied domain assignments using channel-level concept analysis.
+def run_tiebreaker_pass(db, config, qdrant=None):
+    """Resolve tied domain assignments using channel-level Qdrant analysis.
 
     Processes all documents where recon_domain_status = 'tied_pass_1'.
 
-    Pass 2: For each tied document, reads concept files from all other
-    videos in the same channel and picks the tied domain with the highest
-    channel-wide count.
+    For each tied document, queries Qdrant for domain counts from all
+    other videos in the same channel and picks the tied domain with the
+    highest channel-wide count.
 
-    Pass 3 (defensive re-run): Re-reads the same channel concept files a
-    second time with identical logic. This catches concept-file changes
-    that occurred mid-run (e.g. concurrent enrichment writing new windows).
-    In steady state pass 3 produces the same result as pass 2, but under
-    concurrent writes it can resolve a tie that pass 2 missed.
-
-    Mega-channels (>500 videos) skip both passes and go straight to
+    Mega-channels (>500 videos) skip tiebreaking and go straight to
     'tied_manual' for dashboard review.
 
     Args:
         db: StatusDB instance
         config: RECON config dict
+        qdrant: Optional QdrantClient (created if not provided)
 
     Returns:
         Dict with counts: resolved, manual, skipped, errors
     """
-    concepts_dir = config['paths']['concepts']
+    owns_client = False
+    if qdrant is None:
+        qdrant = _get_qdrant_client(config)
+        owns_client = True
+
+    collection = config['vector_db']['collection']
 
     tied_items = db.get_items_by_domain_status('tied_pass_1')
     stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)}
@@ -189,9 +271,9 @@ def run_tiebreaker_pass(db, config):
         channel = item.get('category', '')
 
         try:
-            tied_domains = _get_tied_domains(concepts_dir, file_hash)
+            tied_domains = _get_tied_domains(qdrant, collection, file_hash)
             if not tied_domains:
-                db.set_domain_assignment(file_hash, None, 'needs_reprocess')
+                db.set_domain_assignment(file_hash, None, 'no_concepts')
                 stats['skipped'] += 1
                 continue
 
@@ -215,12 +297,9 @@ def run_tiebreaker_pass(db, config):
 
             # Channel tiebreaker: count domains across all other videos in channel
             other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash)
-            channel_domain_counts = Counter()
-
-            for other_hash in other_hashes:
-                other_counts = _count_concept_domains(concepts_dir, other_hash)
-                if other_counts:
-                    channel_domain_counts.update(other_counts)
+            channel_domain_counts = _count_domains_from_qdrant_batch(
+                qdrant, collection, other_hashes
+            )
 
             # Among tied domains only, pick highest channel-wide count
             best_domain = None
@@ -231,48 +310,21 @@ def run_tiebreaker_pass(db, config):
                     best_count = c
                     best_domain = dom
 
-            # Pass 2: check if channel tiebreaker resolved it
+            # Check if channel tiebreaker resolved it
             tied_at_channel = [d for d in tied_domains
                                if channel_domain_counts.get(d, 0) == best_count]
 
             if len(tied_at_channel) == 1:
                 db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2')
                 stats['resolved'] += 1
-                logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (pass 2 channel tiebreaker)")
+                logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)")
                 continue
 
-            # Pass 3: defensive re-run — re-count channel concepts to catch
-            # concept-file changes that occurred mid-run. Identical logic to
-            # pass 2; resolves races where files were written between the
-            # two reads.
-            channel_domain_counts_p3 = Counter()
-            for other_hash in other_hashes:
-                other_counts = _count_concept_domains(concepts_dir, other_hash)
-                if other_counts:
-                    channel_domain_counts_p3.update(other_counts)
-
-            best_domain_p3 = None
-            best_count_p3 = -1
-            for dom in tied_domains:
-                c = channel_domain_counts_p3.get(dom, 0)
-                if c > best_count_p3:
-                    best_count_p3 = c
-                    best_domain_p3 = dom
-
-            tied_at_p3 = [d for d in tied_domains
-                          if channel_domain_counts_p3.get(d, 0) == best_count_p3]
-
-            if len(tied_at_p3) == 1:
-                db.set_domain_assignment(file_hash, best_domain_p3, 'tied_pass_2')
-                stats['resolved'] += 1
-                logger.debug(f" {file_hash[:12]}: resolved → {best_domain_p3} (pass 3 defensive re-run)")
-                continue
-
-            # Still tied after pass 3 — mark for manual review
+            # Still tied after channel scan — mark for manual review
             fallback = sorted(tied_domains)[0]
             db.set_domain_assignment(file_hash, fallback, 'tied_manual')
             stats['manual'] += 1
-            logger.debug(f" {file_hash[:12]}: still tied after pass 3, → tied_manual")
+            logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual")
 
         except Exception as e:
             logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}")
diff --git a/lib/embedder.py b/lib/embedder.py
index b1f59ca..da6a952 100644
--- a/lib/embedder.py
+++ b/lib/embedder.py
@@ -411,7 +411,7 @@ def embed_single(file_hash, db, config):
         from .domain_assigner import compute_assignment
         from .peertube_writer import push_category, extract_uuid
         from .recon_domains import DOMAIN_CATEGORY_MAP
-        domain, status = compute_assignment(file_hash, db, config)
+        domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
         db.set_domain_assignment(file_hash, domain, status)
         if domain and status == 'assigned':
             cat_id = DOMAIN_CATEGORY_MAP[domain]
diff --git a/recon.py b/recon.py
index cad106a..81c8a81 100755
--- a/recon.py
+++ b/recon.py
@@ -865,6 +865,7 @@ def cmd_ingest(args):
 
 def cmd_assign_categories(args):
     """Assign RECON domains to PeerTube videos and push categories."""
+    from qdrant_client import QdrantClient
     from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
     from lib.peertube_writer import push_pending, extract_uuid
     from lib.recon_domains import DOMAIN_CATEGORY_MAP
@@ -876,11 +877,13 @@ def cmd_assign_categories(args):
 
     if args.backfill:
         # Pass 1: assign domains to all complete stream docs with no assignment
+        # or that previously got needs_reprocess
         conn = db._get_conn()
         q = """SELECT d.hash FROM documents d
                LEFT JOIN catalogue c ON d.hash = c.hash
                WHERE d.status = 'complete'
-                 AND d.recon_domain IS NULL
+                 AND (d.recon_domain IS NULL
+                      OR d.recon_domain_status = 'needs_reprocess')
                  AND c.source = 'stream.echo6.co'
                ORDER BY d.discovered_at"""
         if limit:
@@ -895,10 +898,17 @@ def cmd_assign_categories(args):
 
         print(f"Backfill: processing {len(hashes)} documents" + (" [DRY RUN]" if dry_run else ""))
 
-        stats = {'assigned': 0, 'tied_pass_1': 0, 'needs_reprocess': 0, 'errors': 0}
+        # Create one Qdrant client for the entire backfill
+        qdrant = QdrantClient(
+            host=config['vector_db']['host'],
+            port=config['vector_db']['port'],
+            timeout=60
+        )
+
+        stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0}
         for i, file_hash in enumerate(hashes):
             try:
-                domain, status = compute_assignment(file_hash, db, config)
+                domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
                 stats[status] = stats.get(status, 0) + 1
                 if not dry_run:
                     db.set_domain_assignment(file_hash, domain, status)
@@ -946,22 +956,10 @@ def cmd_assign_categories(args):
         for item in items:
             file_hash = item['hash']
             if dry_run:
-                concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
-                has_concepts = os.path.isdir(concepts_dir)
-                concept_count = len(os.listdir(concepts_dir)) if has_concepts else 0
-                detail = f"DELETE {concept_count} concept files" if has_concepts else "no concept dir"
-                print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')} ({detail})")
+                print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')}")
                 requeued += 1
                 continue
 
-            # Remove stale concept files
-            import shutil
-            concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
-            if os.path.isdir(concepts_dir):
-                logger.info(f" Deleting concept dir: {concepts_dir} "
-                            f"({len(os.listdir(concepts_dir))} files, hash={file_hash})")
-                shutil.rmtree(concepts_dir)
-
             # Reset document status to allow re-processing
             conn = db._get_conn()
             conn.execute(