From 71e3dc12ed205d9855914b788db80e5457b52b35 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:04:13 +0000 Subject: [PATCH 01/14] Phase 1: PeerTube plugin and recon_domains module Single source of truth for the 18 RECON knowledge domains mapped to PeerTube category IDs 100-117. Replaces duplicate VALID_DOMAINS sets in enricher.py and embedder.py with imports from lib/recon_domains.py. Includes PeerTube plugin (peertube-plugin-recon-domains) that registers custom categories via videoCategoryManager.addConstant(), and a parity test to verify constants match between RECON and the PeerTube API. Co-Authored-By: Claude Opus 4.6 --- lib/embedder.py | 8 +-- lib/enricher.py | 8 +-- lib/recon_domains.py | 34 +++++++++ .../peertube-plugin-recon-domains/README.md | 52 ++++++++++++++ .../peertube-plugin-recon-domains/main.js | 28 ++++++++ .../package.json | 20 ++++++ tests/test_constants_parity.py | 71 +++++++++++++++++++ 7 files changed, 207 insertions(+), 14 deletions(-) create mode 100644 lib/recon_domains.py create mode 100644 peertube-plugin/peertube-plugin-recon-domains/README.md create mode 100644 peertube-plugin/peertube-plugin-recon-domains/main.js create mode 100644 peertube-plugin/peertube-plugin-recon-domains/package.json create mode 100644 tests/test_constants_parity.py diff --git a/lib/embedder.py b/lib/embedder.py index 8dcc45a..e80dad8 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -27,13 +27,7 @@ from .utils import resolve_text_dir logger = setup_logging('recon.embedder') # ── Classification allowlists ─────────────────────────────────────────────── -VALID_DOMAINS = { - 'Agriculture & Livestock', 'Civil Organization', 'Communications', - 'Food Systems', 'Foundational Skills', 'Logistics', 'Medical', - 'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage', - 'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment', - 'Vehicles', 'Water Systems', 'Wilderness Skills', -} +from .recon_domains import VALID_DOMAINS 
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} diff --git a/lib/enricher.py b/lib/enricher.py index e1e583c..f6d46bd 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -42,13 +42,7 @@ logger = setup_logging('recon.enricher') STALE_ENRICHING_HOURS = 2 # ── Classification allowlists ─────────────────────────────────────────────── -VALID_DOMAINS = { - 'Agriculture & Livestock', 'Civil Organization', 'Communications', - 'Food Systems', 'Foundational Skills', 'Logistics', 'Medical', - 'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage', - 'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment', - 'Vehicles', 'Water Systems', 'Wilderness Skills', -} +from .recon_domains import VALID_DOMAINS VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} diff --git a/lib/recon_domains.py b/lib/recon_domains.py new file mode 100644 index 0000000..350a05c --- /dev/null +++ b/lib/recon_domains.py @@ -0,0 +1,34 @@ +""" +RECON Domain Taxonomy + +Single source of truth for the 18 knowledge domains and their PeerTube +category ID mappings. IDs 100-117 are registered via the +peertube-plugin-recon-domains plugin. + +Import VALID_DOMAINS from here instead of defining local sets. 
+""" + +DOMAIN_CATEGORY_MAP = { + 'Agriculture & Livestock': 100, + 'Civil Organization': 101, + 'Communications': 102, + 'Food Systems': 103, + 'Foundational Skills': 104, + 'Logistics': 105, + 'Medical': 106, + 'Navigation': 107, + 'Operations': 108, + 'Power Systems': 109, + 'Preservation & Storage': 110, + 'Security': 111, + 'Shelter & Construction': 112, + 'Technology': 113, + 'Tools & Equipment': 114, + 'Vehicles': 115, + 'Water Systems': 116, + 'Wilderness Skills': 117, +} + +VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys()) + +CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()} diff --git a/peertube-plugin/peertube-plugin-recon-domains/README.md b/peertube-plugin/peertube-plugin-recon-domains/README.md new file mode 100644 index 0000000..88ba035 --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/README.md @@ -0,0 +1,52 @@ +# peertube-plugin-recon-domains + +Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis. + +## Category Mapping + +| ID | Domain | +|-----|---------------------------| +| 100 | Agriculture & Livestock | +| 101 | Civil Organization | +| 102 | Communications | +| 103 | Food Systems | +| 104 | Foundational Skills | +| 105 | Logistics | +| 106 | Medical | +| 107 | Navigation | +| 108 | Operations | +| 109 | Power Systems | +| 110 | Preservation & Storage | +| 111 | Security | +| 112 | Shelter & Construction | +| 113 | Technology | +| 114 | Tools & Equipment | +| 115 | Vehicles | +| 116 | Water Systems | +| 117 | Wilderness Skills | + +Built-in PeerTube categories (IDs 1-18) are not modified. 
+ +## Install + +```bash +# Copy plugin to PeerTube storage +cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/ + +# Register plugin via API or admin UI +# Admin > Plugins > Install > peertube-plugin-recon-domains + +# Restart PeerTube +sudo systemctl restart peertube +``` + +## Uninstall + +Remove the plugin via PeerTube admin UI or: + +```bash +rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains +sudo systemctl restart peertube +``` + +Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled. diff --git a/peertube-plugin/peertube-plugin-recon-domains/main.js b/peertube-plugin/peertube-plugin-recon-domains/main.js new file mode 100644 index 0000000..7f7a9cb --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/main.js @@ -0,0 +1,28 @@ +async function register ({ videoCategoryManager }) { + const reconDomains = { + 100: 'Agriculture & Livestock', + 101: 'Civil Organization', + 102: 'Communications', + 103: 'Food Systems', + 104: 'Foundational Skills', + 105: 'Logistics', + 106: 'Medical', + 107: 'Navigation', + 108: 'Operations', + 109: 'Power Systems', + 110: 'Preservation & Storage', + 111: 'Security', + 112: 'Shelter & Construction', + 113: 'Technology', + 114: 'Tools & Equipment', + 115: 'Vehicles', + 116: 'Water Systems', + 117: 'Wilderness Skills' + } + + for (const [id, label] of Object.entries(reconDomains)) { + videoCategoryManager.addConstant(parseInt(id), label) + } +} + +module.exports = { register } diff --git a/peertube-plugin/peertube-plugin-recon-domains/package.json b/peertube-plugin/peertube-plugin-recon-domains/package.json new file mode 100644 index 0000000..e5788a3 --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/package.json @@ -0,0 +1,20 @@ +{ + "name": "peertube-plugin-recon-domains", + "version": "1.0.0", + "description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 
100-117)", + "engine": { + "peertube": ">=6.0.0" + }, + "keywords": [ + "peertube", + "plugin" + ], + "homepage": "https://forge.echo6.co/matt/recon", + "author": "Echo6", + "license": "MIT", + "library": "./main.js", + "staticDirs": {}, + "css": [], + "clientScripts": [], + "translations": {} +} diff --git a/tests/test_constants_parity.py b/tests/test_constants_parity.py new file mode 100644 index 0000000..5be1121 --- /dev/null +++ b/tests/test_constants_parity.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +""" +Parity test: verify RECON domain taxonomy matches PeerTube categories. + +Tests: +1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly +2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels + +Usage: + cd /opt/recon && source venv/bin/activate + python3 tests/test_constants_parity.py +""" +import json +import sys +import requests + +# Add parent dir to path for imports +sys.path.insert(0, '/opt/recon') +from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP + + +def test_local_parity(): + """Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS.""" + map_keys = set(DOMAIN_CATEGORY_MAP.keys()) + assert map_keys == VALID_DOMAINS, ( + f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, " + f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}" + ) + assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch" + print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS") + + +def test_peertube_categories(): + """Verify PeerTube API returns all 18 RECON categories.""" + url = "http://192.168.1.170:9000/api/v1/videos/categories" + headers = {"Host": "stream.echo6.co"} + + try: + resp = requests.get(url, headers=headers, timeout=10) + resp.raise_for_status() + except Exception as e: + print(f"[SKIP] PeerTube API unreachable: {e}") + return + + categories = resp.json() # dict of {id_str: label} + + missing = [] + wrong_label = [] 
+ for domain, cat_id in DOMAIN_CATEGORY_MAP.items(): + cat_str = str(cat_id) + if cat_str not in categories: + missing.append((cat_id, domain)) + elif categories[cat_str] != domain: + wrong_label.append((cat_id, domain, categories[cat_str])) + + if missing: + print(f"[FAIL] Missing categories in PeerTube: {missing}") + print(" Deploy peertube-plugin-recon-domains to CT 110 first") + sys.exit(1) + + if wrong_label: + print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}") + sys.exit(1) + + print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels") + + +if __name__ == '__main__': + test_local_parity() + test_peertube_categories() + print("\nAll parity tests passed.") From 5f36c52fb156f754f0819f4a20da3f300114a911 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:04:29 +0000 Subject: [PATCH 02/14] Phase 2: documents table schema for domain assignment Adds four columns to documents table via idempotent ALTER TABLE migrations: recon_domain, recon_domain_status, recon_domain_assigned_at, peertube_category_pushed_at. Adds index on recon_domain_status. Includes StatusDB helper methods: get/set_domain_assignment, set_peertube_pushed, get_unpushed_assignments, get_items_by_domain_status, get_domain_status_counts, get_domain_distribution. 
Co-Authored-By: Claude Opus 4.6 --- lib/status.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/lib/status.py b/lib/status.py index 974cabd..c710846 100644 --- a/lib/status.py +++ b/lib/status.py @@ -124,6 +124,22 @@ class StatusDB: except Exception: pass # column already exists + # Migration: domain assignment columns for PeerTube categorization + for col, coltype in [ + ('recon_domain', 'TEXT'), + ('recon_domain_status', 'TEXT'), + ('recon_domain_assigned_at', 'TEXT'), + ('peertube_category_pushed_at', 'TEXT'), + ]: + try: + conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + try: + conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)") + except Exception: + pass # index already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -448,7 +464,91 @@ class StatusDB: conn.commit() - # ── Scraper Job Helpers ───────────────────────────────────── + # ── Domain Assignment Helpers ───────────────────────────────── + + def get_domain_assignment(self, file_hash): + """Get domain assignment for a document. + + Returns: + (recon_domain, recon_domain_status) tuple, or (None, None) if not set. + """ + conn = self._get_conn() + row = conn.execute( + "SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?", + (file_hash,) + ).fetchone() + if row: + return (row['recon_domain'], row['recon_domain_status']) + return (None, None) + + def set_domain_assignment(self, file_hash, domain, status): + """Set domain assignment and status for a document.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET recon_domain = ?, recon_domain_status = ?, " + "recon_domain_assigned_at = ? 
WHERE hash = ?", + (domain, status, datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def set_peertube_pushed(self, file_hash): + """Mark a document's category as pushed to PeerTube.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?", + (datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def get_unpushed_assignments(self): + """Get documents with domain assignments not yet pushed to PeerTube. + + Only returns stream.echo6.co source documents. + """ + conn = self._get_conn() + return [dict(r) for r in conn.execute( + """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain IS NOT NULL + AND d.peertube_category_pushed_at IS NULL + AND c.source = 'stream.echo6.co' + ORDER BY d.recon_domain_assigned_at""", + ).fetchall()] + + def get_items_by_domain_status(self, status, limit=None): + """Get documents by domain assignment status.""" + conn = self._get_conn() + q = """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain_status = ? 
+ ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + return [dict(r) for r in conn.execute(q, (status,)).fetchall()] + + def get_domain_status_counts(self): + """Get counts of documents by domain assignment status.""" + conn = self._get_conn() + return {row['recon_domain_status']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain_status, COUNT(*) as cnt " + "FROM documents WHERE recon_domain_status IS NOT NULL " + "GROUP BY recon_domain_status" + ).fetchall()} + + def get_domain_distribution(self): + """Get counts of documents per assigned domain.""" + conn = self._get_conn() + return {row['recon_domain']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain, COUNT(*) as cnt " + "FROM documents WHERE recon_domain IS NOT NULL " + "GROUP BY recon_domain ORDER BY cnt DESC" + ).fetchall()} + + # ── Scraper Job Helpers ───────────────────────────────────── def get_pending_scrape_job(self): """Fetch the oldest pending scrape job.""" From 8ab1f8c82fadd6b98f160e8c647cc4bcbc9a944e Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:04:37 +0000 Subject: [PATCH 03/14] Phase 3: compute_assignment and run_tiebreaker_pass Domain assigner module with two functions: - compute_assignment(): pass 1 concept domain count, inline per-document - run_tiebreaker_pass(): batch channel-level tiebreaker (pass 2 + pass 3 defensive re-run), mega-channel rule (>500 videos skip to tied_manual) Filters legacy domains (Sustainment Systems, Off-Grid Systems, Defense & Tactics) from concept counts automatically. 
Co-Authored-By: Claude Opus 4.6 --- lib/domain_assigner.py | 284 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 lib/domain_assigner.py diff --git a/lib/domain_assigner.py b/lib/domain_assigner.py new file mode 100644 index 0000000..c00894f --- /dev/null +++ b/lib/domain_assigner.py @@ -0,0 +1,284 @@ +""" +RECON Domain Assigner + +Computes per-video domain assignments from concept extraction results. +Two functions, two execution modes: + + compute_assignment() — pass 1, inline from post-embed hook + run_tiebreaker_pass() — batch, resolves ties via channel concept scan + +Status values written to documents.recon_domain_status: + assigned — clear winner from pass 1 concept count + tied_pass_1 — concept tie, awaiting channel tiebreaker + tied_pass_2 — resolved by channel tiebreaker + tied_manual — needs human review (dashboard) + needs_reprocess — missing concepts or only legacy domains + manual_assigned — human override from dashboard +""" +import json +import os +from collections import Counter + +from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP +from .utils import setup_logging + +logger = setup_logging('recon.domain_assigner') + +# Channels with more than this many videos skip channel tiebreaking entirely +MEGA_CHANNEL_THRESHOLD = 500 + + +def _count_concept_domains(concepts_dir, file_hash): + """Read concept files and count valid domain occurrences. + + Args: + concepts_dir: Base concepts directory (e.g. /opt/recon/data/concepts) + file_hash: Document hash + + Returns: + Counter of {domain_name: count} for valid domains only, + or None if no concept directory exists. 
+ """ + doc_concepts_dir = os.path.join(concepts_dir, file_hash) + if not os.path.isdir(doc_concepts_dir): + return None + + domain_counter = Counter() + + for fname in os.listdir(doc_concepts_dir): + if not fname.startswith('window_') or not fname.endswith('.json'): + continue + fpath = os.path.join(doc_concepts_dir, fname) + try: + with open(fpath, 'r') as f: + concepts = json.load(f) + except (json.JSONDecodeError, OSError): + continue + + if not isinstance(concepts, list): + continue + + for concept in concepts: + if not isinstance(concept, dict): + continue + dom = concept.get('domain') + if isinstance(dom, str) and dom in VALID_DOMAINS: + domain_counter[dom] += 1 + elif isinstance(dom, list): + for d in dom: + if isinstance(d, str) and d in VALID_DOMAINS: + domain_counter[d] += 1 + + return domain_counter + + +def compute_assignment(file_hash, db, config): + """Compute domain assignment for a single document (pass 1). + + Counts domain occurrences across all concepts. If a single domain + wins, assigns it. If tied, defers to batch tiebreaker. 
+ + Args: + file_hash: Document hash + db: StatusDB instance + config: RECON config dict + + Returns: + (domain, status) tuple where domain is a string or None, + and status is one of: 'assigned', 'tied_pass_1', 'needs_reprocess' + """ + concepts_dir = config['paths']['concepts'] + domain_counter = _count_concept_domains(concepts_dir, file_hash) + + if domain_counter is None or len(domain_counter) == 0: + return (None, 'needs_reprocess') + + top = domain_counter.most_common(2) + top_domain = top[0][0] + top_count = top[0][1] + + if len(top) == 1 or top[1][1] < top_count: + return (top_domain, 'assigned') + + # Tie — defer to tiebreaker pass + return (None, 'tied_pass_1') + + +def _get_tied_domains(concepts_dir, file_hash): + """Get the set of domains tied for first place in a document's concepts.""" + domain_counter = _count_concept_domains(concepts_dir, file_hash) + if not domain_counter: + return [] + + top = domain_counter.most_common() + if not top: + return [] + + max_count = top[0][1] + return [dom for dom, cnt in top if cnt == max_count] + + +def _channel_video_hashes(db, channel_name, exclude_hash=None): + """Get all document hashes belonging to a PeerTube channel. + + Args: + db: StatusDB instance + channel_name: catalogue.category (channel actor name) + exclude_hash: Hash to exclude (the document being resolved) + + Returns: + List of document hashes + """ + conn = db._get_conn() + rows = conn.execute( + "SELECT hash FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'", + (channel_name,) + ).fetchall() + hashes = [r['hash'] for r in rows] + if exclude_hash: + hashes = [h for h in hashes if h != exclude_hash] + return hashes + + +def _channel_video_count(db, channel_name): + """Count total videos in a channel.""" + conn = db._get_conn() + row = conn.execute( + "SELECT COUNT(*) as cnt FROM catalogue WHERE category = ? 
AND source = 'stream.echo6.co'", + (channel_name,) + ).fetchone() + return row['cnt'] if row else 0 + + +def run_tiebreaker_pass(db, config): + """Resolve tied domain assignments using channel-level concept analysis. + + Processes all documents where recon_domain_status = 'tied_pass_1'. + + Pass 2: For each tied document, reads concept files from all other + videos in the same channel and picks the tied domain with the highest + channel-wide count. + + Pass 3 (defensive re-run): Re-reads the same channel concept files a + second time with identical logic. This catches concept-file changes + that occurred mid-run (e.g. concurrent enrichment writing new windows). + In steady state pass 3 produces the same result as pass 2, but under + concurrent writes it can resolve a tie that pass 2 missed. + + Mega-channels (>500 videos) skip both passes and go straight to + 'tied_manual' for dashboard review. + + Args: + db: StatusDB instance + config: RECON config dict + + Returns: + Dict with counts: resolved, manual, skipped, errors + """ + concepts_dir = config['paths']['concepts'] + tied_items = db.get_items_by_domain_status('tied_pass_1') + + stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)} + logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve") + + # Cache channel sizes to avoid repeated queries + channel_size_cache = {} + + for item in tied_items: + file_hash = item['hash'] + channel = item.get('category', '') + + try: + tied_domains = _get_tied_domains(concepts_dir, file_hash) + if not tied_domains: + db.set_domain_assignment(file_hash, None, 'needs_reprocess') + stats['skipped'] += 1 + continue + + if len(tied_domains) == 1: + # No longer tied (possibly re-enriched since pass 1) + db.set_domain_assignment(file_hash, tied_domains[0], 'assigned') + stats['resolved'] += 1 + continue + + # Check mega-channel rule + if channel not in channel_size_cache: + channel_size_cache[channel] = _channel_video_count(db, channel) + 
+ if channel_size_cache[channel] > MEGA_CHANNEL_THRESHOLD: + fallback = sorted(tied_domains)[0] + db.set_domain_assignment(file_hash, fallback, 'tied_manual') + stats['manual'] += 1 + logger.debug(f" {file_hash[:12]}: mega-channel '{channel}' " + f"({channel_size_cache[channel]} videos), → tied_manual") + continue + + # Channel tiebreaker: count domains across all other videos in channel + other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash) + channel_domain_counts = Counter() + + for other_hash in other_hashes: + other_counts = _count_concept_domains(concepts_dir, other_hash) + if other_counts: + channel_domain_counts.update(other_counts) + + # Among tied domains only, pick highest channel-wide count + best_domain = None + best_count = -1 + for dom in tied_domains: + c = channel_domain_counts.get(dom, 0) + if c > best_count: + best_count = c + best_domain = dom + + # Pass 2: check if channel tiebreaker resolved it + tied_at_channel = [d for d in tied_domains + if channel_domain_counts.get(d, 0) == best_count] + + if len(tied_at_channel) == 1: + db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2') + stats['resolved'] += 1 + logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (pass 2 channel tiebreaker)") + continue + + # Pass 3: defensive re-run — re-count channel concepts to catch + # concept-file changes that occurred mid-run. Identical logic to + # pass 2; resolves races where files were written between the + # two reads. 
+ channel_domain_counts_p3 = Counter() + for other_hash in other_hashes: + other_counts = _count_concept_domains(concepts_dir, other_hash) + if other_counts: + channel_domain_counts_p3.update(other_counts) + + best_domain_p3 = None + best_count_p3 = -1 + for dom in tied_domains: + c = channel_domain_counts_p3.get(dom, 0) + if c > best_count_p3: + best_count_p3 = c + best_domain_p3 = dom + + tied_at_p3 = [d for d in tied_domains + if channel_domain_counts_p3.get(d, 0) == best_count_p3] + + if len(tied_at_p3) == 1: + db.set_domain_assignment(file_hash, best_domain_p3, 'tied_pass_2') + stats['resolved'] += 1 + logger.debug(f" {file_hash[:12]}: resolved → {best_domain_p3} (pass 3 defensive re-run)") + continue + + # Still tied after pass 3 — mark for manual review + fallback = sorted(tied_domains)[0] + db.set_domain_assignment(file_hash, fallback, 'tied_manual') + stats['manual'] += 1 + logger.debug(f" {file_hash[:12]}: still tied after pass 3, → tied_manual") + + except Exception as e: + logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}") + stats['errors'] += 1 + + logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, " + f"{stats['manual']} manual, {stats['skipped']} skipped, " + f"{stats['errors']} errors") + return stats From 07d26a8ef04ec7727f32a781b30f8e2ad82a30b7 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:04:51 +0000 Subject: [PATCH 04/14] Phase 4: authenticated PeerTube category writer OAuth2 password-grant client for PeerTube API with token caching and auto-refresh on 401. Pushes domain categories via PUT /api/v1/videos/{uuid}. Includes limit parameter on push_pending for staged rollouts, and systemic failure detection that aborts after 5 consecutive failures (catches missing plugin or broken auth before wasting API calls). Config section added to config.yaml for PeerTube API connection parameters. Real credentials remain in .env (gitignored). 
Co-Authored-By: Claude Opus 4.6 --- config.yaml | 5 + lib/peertube_writer.py | 323 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 lib/peertube_writer.py diff --git a/config.yaml b/config.yaml index a2709b0..0bed648 100644 --- a/config.yaml +++ b/config.yaml @@ -408,9 +408,14 @@ service: peertube: api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx) + api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer) + host_header: stream.echo6.co # Host header for PeerTube API requests + username: root # PeerTube admin username + password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password public_url: https://stream.echo6.co # Public URL for video links fetch_timeout: 30 # HTTP timeout for API/VTT requests rate_limit_delay: 0.5 # Delay between video ingestions (seconds) + writer_rate_limit: 0.1 # Delay between category push API calls (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) scraper: diff --git a/lib/peertube_writer.py b/lib/peertube_writer.py new file mode 100644 index 0000000..f8d463a --- /dev/null +++ b/lib/peertube_writer.py @@ -0,0 +1,323 @@ +""" +RECON PeerTube Writer + +Authenticated PeerTube API client for pushing domain category assignments. +Uses OAuth2 password grant, caches tokens, refreshes on 401. 
+ +Config keys used: + peertube.api_url — internal PeerTube URL (http://192.168.1.170:9000) + peertube.host_header — Host header for API requests (stream.echo6.co) + peertube.username — PeerTube admin username + peertube.password_env — env var name holding the password + peertube.writer_rate_limit — delay between category push API calls (seconds) +""" +import json +import os +import time + +import requests as http_requests + +from .recon_domains import DOMAIN_CATEGORY_MAP +from .utils import setup_logging + +logger = setup_logging('recon.peertube_writer') + +TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json' + + +def _get_peertube_config(config): + """Extract PeerTube writer config with defaults.""" + pt = config.get('peertube', {}) + return { + 'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')), + 'host_header': pt.get('host_header', 'stream.echo6.co'), + 'username': pt.get('username', 'root'), + 'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'), + 'rate_limit_delay': pt.get('writer_rate_limit', 0.1), + } + + +def _load_cached_token(): + """Load cached OAuth token from disk.""" + if os.path.exists(TOKEN_CACHE_PATH): + try: + with open(TOKEN_CACHE_PATH, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return None + + +def _save_token(token_data): + """Save OAuth token to disk cache.""" + os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True) + with open(TOKEN_CACHE_PATH, 'w') as f: + json.dump(token_data, f) + + +def _get_oauth_client(api_url, host_header): + """Get PeerTube OAuth client credentials. 
+ + Args: + api_url: Base API URL + host_header: Host header value + + Returns: + (client_id, client_secret) tuple + """ + resp = http_requests.get( + f"{api_url}/api/v1/oauth-clients/local", + headers={'Host': host_header}, + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + return data['client_id'], data['client_secret'] + + +def _get_token(api_url, host_header, username, password, client_id, client_secret): + """Obtain OAuth2 access token via password grant. + + Args: + api_url: Base API URL + host_header: Host header value + username: PeerTube username + password: PeerTube password + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + Token data dict with access_token, refresh_token, etc. + """ + resp = http_requests.post( + f"{api_url}/api/v1/users/token", + headers={'Host': host_header}, + data={ + 'client_id': client_id, + 'client_secret': client_secret, + 'grant_type': 'password', + 'username': username, + 'password': password, + }, + timeout=30, + ) + resp.raise_for_status() + token_data = resp.json() + token_data['client_id'] = client_id + token_data['client_secret'] = client_secret + _save_token(token_data) + return token_data + + +def _refresh_token(api_url, host_header, token_data): + """Refresh an expired access token. + + Returns: + New token data dict, or None on failure. 
+ """ + try: + resp = http_requests.post( + f"{api_url}/api/v1/users/token", + headers={'Host': host_header}, + data={ + 'client_id': token_data['client_id'], + 'client_secret': token_data['client_secret'], + 'grant_type': 'refresh_token', + 'refresh_token': token_data['refresh_token'], + }, + timeout=30, + ) + resp.raise_for_status() + new_data = resp.json() + new_data['client_id'] = token_data['client_id'] + new_data['client_secret'] = token_data['client_secret'] + _save_token(new_data) + return new_data + except Exception as e: + logger.warning(f"Token refresh failed: {e}") + return None + + +def _ensure_token(config): + """Ensure we have a valid OAuth token. Returns token data dict. + + Tries cached token first, then obtains a new one. + """ + pt = _get_peertube_config(config) + password = os.environ.get(pt['password_env'], '') + if not password: + raise ValueError(f"PeerTube password not set in env var {pt['password_env']}") + + # Try cached token + token_data = _load_cached_token() + if token_data and 'access_token' in token_data: + return token_data + + # Get fresh token + client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header']) + return _get_token( + pt['api_url'], pt['host_header'], + pt['username'], password, + client_id, client_secret, + ) + + +def _api_request(method, path, config, token_data, **kwargs): + """Make an authenticated PeerTube API request with auto-refresh on 401. + + Args: + method: HTTP method ('GET', 'PUT', etc.) + path: API path (e.g. '/api/v1/videos/{uuid}') + config: RECON config dict + token_data: Current token data dict + **kwargs: Additional requests kwargs (json, data, etc.) + + Returns: + (response, token_data) tuple — token_data may be refreshed. 
+ """ + pt = _get_peertube_config(config) + url = f"{pt['api_url']}{path}" + headers = { + 'Host': pt['host_header'], + 'Authorization': f"Bearer {token_data['access_token']}", + } + + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + + if resp.status_code == 401: + # Try refresh + new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data) + if new_token: + headers['Authorization'] = f"Bearer {new_token['access_token']}" + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + return resp, new_token + else: + # Full re-auth + password = os.environ.get(pt['password_env'], '') + client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header']) + new_token = _get_token( + pt['api_url'], pt['host_header'], + pt['username'], password, + client_id, client_secret, + ) + headers['Authorization'] = f"Bearer {new_token['access_token']}" + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + return resp, new_token + + return resp, token_data + + +def push_category(video_uuid, category_id, config, token_data=None): + """Push a category assignment to a single PeerTube video. + + Args: + video_uuid: PeerTube video UUID + category_id: Category ID (100-117) + config: RECON config dict + token_data: Optional pre-fetched token data + + Returns: + (success: bool, token_data: dict) tuple + """ + if token_data is None: + token_data = _ensure_token(config) + + resp, token_data = _api_request( + 'PUT', + f'/api/v1/videos/{video_uuid}', + config, + token_data, + json={'category': category_id}, + ) + + if resp.status_code in (200, 204): + return True, token_data + else: + logger.warning(f"Failed to push category for {video_uuid}: " + f"HTTP {resp.status_code} — {resp.text[:200]}") + return False, token_data + + +def extract_uuid(catalogue_path): + """Extract PeerTube video UUID from catalogue path. 
+ + Catalogue paths for PeerTube videos look like: + https://stream.echo6.co/w/UUID + + Args: + catalogue_path: catalogue.path value + + Returns: + UUID string or None + """ + if not catalogue_path: + return None + if '/w/' in catalogue_path: + return catalogue_path.rsplit('/w/', 1)[-1] + return None + + +def push_pending(db, config, limit=None): + """Push all assigned-but-unpushed domain categories to PeerTube. + + Args: + db: StatusDB instance + config: RECON config dict + limit: Optional max number of items to push + + Returns: + (success_count, fail_count) tuple + """ + items = db.get_unpushed_assignments() + if limit: + items = items[:limit] + if not items: + logger.info("No unpushed assignments to push") + return (0, 0) + + pt = _get_peertube_config(config) + delay = pt['rate_limit_delay'] + + SYSTEMIC_FAIL_THRESHOLD = 5 # abort if first N items all fail + + logger.info(f"Pushing {len(items)} category assignments to PeerTube") + + token_data = _ensure_token(config) + success = 0 + failed = 0 + + for item in items: + file_hash = item['hash'] + domain = item.get('recon_domain') + catalogue_path = item.get('catalogue_path', '') + + if not domain or domain not in DOMAIN_CATEGORY_MAP: + logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping") + failed += 1 + continue + + uuid = extract_uuid(catalogue_path) + if not uuid: + logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'") + failed += 1 + continue + + category_id = DOMAIN_CATEGORY_MAP[domain] + ok, token_data = push_category(uuid, category_id, config, token_data) + + if ok: + db.set_peertube_pushed(file_hash) + success += 1 + else: + failed += 1 + + # Abort on systemic failure (e.g. 
plugin not installed, auth broken) + if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD: + logger.error(f"Aborting push: first {failed} items all failed — " + f"check plugin installation and PeerTube API config") + break + + time.sleep(delay) + + logger.info(f"Push complete: {success} succeeded, {failed} failed") + return (success, failed) From a273b52c7e635e96d38a6573140610ed0162d5dc Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:05:19 +0000 Subject: [PATCH 05/14] Phase 5: assign-categories CLI commands Adds assign-categories subcommand with flags: --backfill Pass 1 domain assignment for all complete stream docs --tiebreaker-pass Resolve ties via channel concept analysis --push-pending Push assigned categories to PeerTube API (staged via --limit) --reprocess-missing Re-queue items with missing/legacy concepts --dry-run Preview without writes (enhanced for reprocess: shows concept dir existence and file counts) --limit N Cap processing count Includes pre-deletion audit logging for --reprocess-missing (logs path, file count, and hash before each shutil.rmtree). 
Co-Authored-By: Claude Opus 4.6 --- recon.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/recon.py b/recon.py index 9635a59..cad106a 100755 --- a/recon.py +++ b/recon.py @@ -863,6 +863,168 @@ def cmd_ingest(args): return 0 +def cmd_assign_categories(args): + """Assign RECON domains to PeerTube videos and push categories.""" + from lib.domain_assigner import compute_assignment, run_tiebreaker_pass + from lib.peertube_writer import push_pending, extract_uuid + from lib.recon_domains import DOMAIN_CATEGORY_MAP + + config = get_config() + db = StatusDB() + dry_run = args.dry_run + limit = args.limit + + if args.backfill: + # Pass 1: assign domains to all complete stream docs with no assignment + conn = db._get_conn() + q = """SELECT d.hash FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.status = 'complete' + AND d.recon_domain IS NULL + AND c.source = 'stream.echo6.co' + ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + rows = conn.execute(q).fetchall() + hashes = [r['hash'] for r in rows] + + if not hashes: + print("No unassigned complete stream documents found") + return 0 + + print(f"Backfill: processing {len(hashes)} documents" + + (" [DRY RUN]" if dry_run else "")) + + stats = {'assigned': 0, 'tied_pass_1': 0, 'needs_reprocess': 0, 'errors': 0} + for i, file_hash in enumerate(hashes): + try: + domain, status = compute_assignment(file_hash, db, config) + stats[status] = stats.get(status, 0) + 1 + if not dry_run: + db.set_domain_assignment(file_hash, domain, status) + if (i + 1) % 1000 == 0: + print(f" Progress: {i + 1}/{len(hashes)}") + except Exception as e: + stats['errors'] += 1 + logger.warning(f" Assignment error for {file_hash[:12]}: {e}") + + print(f"\nBackfill results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.tiebreaker_pass: + if dry_run: + items = db.get_items_by_domain_status('tied_pass_1') + 
print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]") + return 0 + stats = run_tiebreaker_pass(db, config) + print(f"\nTiebreaker results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.push_pending: + if dry_run: + items = db.get_unpushed_assignments() + if limit: + items = items[:limit] + print(f"Push pending: {len(items)} items would be pushed [DRY RUN]") + return 0 + success, failed = push_pending(db, config, limit=limit) + print(f"\nPush results: {success} succeeded, {failed} failed") + return 0 + + elif args.reprocess_missing: + items = db.get_items_by_domain_status('needs_reprocess', limit=limit) + if not items: + print("No items with needs_reprocess status") + return 0 + + print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else "")) + requeued = 0 + for item in items: + file_hash = item['hash'] + if dry_run: + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + has_concepts = os.path.isdir(concepts_dir) + concept_count = len(os.listdir(concepts_dir)) if has_concepts else 0 + detail = f"DELETE {concept_count} concept files" if has_concepts else "no concept dir" + print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')} ({detail})") + requeued += 1 + continue + + # Remove stale concept files + import shutil + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.isdir(concepts_dir): + logger.info(f" Deleting concept dir: {concepts_dir} " + f"({len(os.listdir(concepts_dir))} files, hash={file_hash})") + shutil.rmtree(concepts_dir) + + # Reset document status to allow re-processing + conn = db._get_conn() + conn.execute( + """UPDATE documents SET + status = 'catalogued', + concepts_extracted = 0, + vectors_inserted = 0, + recon_domain = NULL, + recon_domain_status = NULL, + recon_domain_assigned_at = NULL, + peertube_category_pushed_at = NULL, + error_message = NULL, + extracted_at = NULL, + enriched_at = NULL, + embedded_at = 
NULL + WHERE hash = ?""", + (file_hash,) + ) + conn.commit() + # Re-queue for pipeline processing + db.queue_document(file_hash) + requeued += 1 + + print(f"Requeued {requeued} items for reprocessing") + return 0 + + else: + # Default: show domain assignment status + status_counts = db.get_domain_status_counts() + domain_dist = db.get_domain_distribution() + + conn = db._get_conn() + total_stream = conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'""" + ).fetchone()['cnt'] + unassigned = conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete' + AND d.recon_domain IS NULL""" + ).fetchone()['cnt'] + unpushed = len(db.get_unpushed_assignments()) + + print("=== Domain Assignment Status ===\n") + print(f"Total complete stream docs: {total_stream}") + print(f"Unassigned: {unassigned}") + print(f"Unpushed to PeerTube: {unpushed}") + + if status_counts: + print(f"\nAssignment status breakdown:") + for status, cnt in sorted(status_counts.items()): + print(f" {status:<20s} {cnt:>6d}") + + if domain_dist: + print(f"\nDomain distribution:") + for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]): + print(f" {domain:<35s} {cnt:>6d}") + + return 0 + + def cmd_pipeline(args): """Stream B library pipeline: status, migrate, reverse, watch, sweep.""" from lib.new_pipeline import ( @@ -1158,6 +1320,16 @@ def main(): p.set_defaults(func=cmd_ingest) + # assign-categories + p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos') + p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs') + p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis') + p.add_argument('--push-pending', action='store_true', help='Push assigned 
categories to PeerTube API') + p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items') + p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing') + p.add_argument('--limit', type=int, help='Limit number of items to process') + p.set_defaults(func=cmd_assign_categories) + # pipeline (Stream B) p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)') p.add_argument('pipeline_action', nargs='?', default='status', From 6a17df8078bd5b307eddb8de6b61120393d3f8a0 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:06:07 +0000 Subject: [PATCH 06/14] Phase 6: post-embed domain assignment hook After a stream.echo6.co video completes embedding, automatically runs compute_assignment (pass 1 only). Clear winners get pushed to PeerTube immediately; ties are marked tied_pass_1 for the batch tiebreaker. Also tags stream docs that hit early-return paths (no concepts, no valid concepts) with needs_reprocess status so they are visible to the --reprocess-missing CLI command. Error handling: domain assignment failure logs a warning but does not block the embedding pipeline. 
Co-Authored-By: Claude Opus 4.6 --- lib/embedder.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/lib/embedder.py b/lib/embedder.py index e80dad8..b1f59ca 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -255,15 +255,22 @@ def embed_single(file_hash, db, config): if not all_concepts: db.update_status(file_hash, 'complete', vectors_inserted=0) + # Tag stream docs with no concepts for reprocessing + _cat = db._get_conn().execute( + "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if _cat and dict(_cat)['source'] == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No concepts to embed for {doc['filename']}") return True - # Look up source from catalogue once per doc + # Look up source and path from catalogue once per doc cat_conn = db._get_conn() cat_row = cat_conn.execute( - "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + "SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,) ).fetchone() source = dict(cat_row)['source'] if cat_row else '' + catalogue_path = dict(cat_row)['path'] if cat_row else '' download_url = '' is_web = doc.get('path', '').startswith(('http://', 'https://')) @@ -315,6 +322,8 @@ def embed_single(file_hash, db, config): if not valid: db.update_status(file_hash, 'complete', vectors_inserted=0) + if source == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No valid concepts to embed for {doc['filename']}") return True @@ -395,6 +404,28 @@ def embed_single(file_hash, db, config): db.update_status(file_hash, 'complete', vectors_inserted=embedded_count) logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)") + + # Post-embed hook: assign domain for PeerTube videos + if source == 'stream.echo6.co': + try: + from .domain_assigner import compute_assignment + from .peertube_writer import push_category, extract_uuid + 
from .recon_domains import DOMAIN_CATEGORY_MAP + domain, status = compute_assignment(file_hash, db, config) + db.set_domain_assignment(file_hash, domain, status) + if domain and status == 'assigned': + cat_id = DOMAIN_CATEGORY_MAP[domain] + uuid = extract_uuid(catalogue_path) + if uuid: + pushed, _token = push_category(uuid, cat_id, config) + if pushed: + db.set_peertube_pushed(file_hash) + logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube") + else: + logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending") + except Exception as e: + logger.warning(f"Domain assignment failed for {file_hash}: {e}") + return True except Exception as e: From d1270be64df750fd93995548c4b14ee699c90cc9 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:06:25 +0000 Subject: [PATCH 07/14] Phase 7: manual review dashboard for tied items Adds /peertube/review page showing only tied_manual items for human domain assignment. Each row displays video title, channel, concept domain counts, and a dropdown to select the correct domain. Routes: GET /peertube/review (page), GET /api/peertube/review/items (JSON), GET /api/peertube/review/stats (counts), POST /api/peertube/review/assign (assign + push to PeerTube). Review subnav entry added to PEERTUBE_SUBNAV. 
Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 98 ++++++++++++++++++++++ templates/peertube/review.html | 148 +++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 templates/peertube/review.html diff --git a/lib/api.py b/lib/api.py index 8a1f383..f7d9b10 100644 --- a/lib/api.py +++ b/lib/api.py @@ -88,6 +88,7 @@ KNOWLEDGE_SUBNAV = [ PEERTUBE_SUBNAV = [ {'href': '/peertube', 'label': 'Dashboard'}, {'href': '/peertube/channels', 'label': 'Channels'}, + {'href': '/peertube/review', 'label': 'Review'}, ] @@ -376,6 +377,103 @@ def peertube_channels(): domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels') +@app.route('/peertube/review') +def peertube_review(): + from .recon_domains import VALID_DOMAINS + return render_template('peertube/review.html', + domain='peertube', subnav=PEERTUBE_SUBNAV, + active_page='/peertube/review', + valid_domains=sorted(VALID_DOMAINS)) + + +@app.route('/api/peertube/review/stats') +def api_peertube_review_stats(): + db = StatusDB() + counts = db.get_domain_status_counts() + return jsonify(counts) + + +@app.route('/api/peertube/review/items') +def api_peertube_review_items(): + import json as _json + from .recon_domains import VALID_DOMAINS + db = StatusDB() + config = get_config() + items = db.get_items_by_domain_status('tied_manual', limit=200) + + result = [] + concepts_dir = config['paths']['concepts'] + for item in items: + file_hash = item['hash'] + # Count domains from concept files + top_domains = [] + doc_concepts_dir = os.path.join(concepts_dir, file_hash) + if os.path.isdir(doc_concepts_dir): + from collections import Counter + domain_counter = Counter() + for fname in os.listdir(doc_concepts_dir): + if not fname.startswith('window_') or not fname.endswith('.json'): + continue + try: + with open(os.path.join(doc_concepts_dir, fname)) as f: + concepts = _json.load(f) + for c in concepts: + if isinstance(c, dict): + dom = c.get('domain') + if isinstance(dom, str) and dom in 
VALID_DOMAINS: + domain_counter[dom] += 1 + except Exception: + continue + top_domains = [{'domain': d, 'count': cnt} + for d, cnt in domain_counter.most_common(5)] + + result.append({ + 'hash': file_hash, + 'filename': item.get('filename', ''), + 'category': item.get('category', ''), + 'recon_domain': item.get('recon_domain'), + 'recon_domain_status': item.get('recon_domain_status'), + 'top_domains': top_domains, + }) + return jsonify(result) + + +@app.route('/api/peertube/review/assign', methods=['POST']) +def api_peertube_review_assign(): + from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP + from .peertube_writer import push_category, extract_uuid + data = request.get_json() + file_hash = data.get('hash') + domain = data.get('domain') + + if not file_hash or not domain: + return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400 + if domain not in VALID_DOMAINS: + return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400 + + db = StatusDB() + config = get_config() + + db.set_domain_assignment(file_hash, domain, 'manual_assigned') + + # Push to PeerTube + conn = db._get_conn() + cat_row = conn.execute( + "SELECT path FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if cat_row: + uuid = extract_uuid(dict(cat_row)['path']) + if uuid: + cat_id = DOMAIN_CATEGORY_MAP[domain] + try: + push_category(uuid, cat_id, config) + db.set_peertube_pushed(file_hash) + except Exception as e: + return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'}) + + return jsonify({'ok': True, 'domain': domain}) + + @app.route('/settings/keys') def settings_keys(): from lib.key_manager import get_key_manager diff --git a/templates/peertube/review.html b/templates/peertube/review.html new file mode 100644 index 0000000..6793e03 --- /dev/null +++ b/templates/peertube/review.html @@ -0,0 +1,148 @@ +{% extends "base.html" %} +{% block content %} +
+
+
Manual Review
+
Assigned
+
Tied (Pass 1)
+
Needs Reprocess
+
+ +
+

Manual Review Queue

+
+ + + + + + + + + + + + +
Loading...
+
+
+
+{% endblock %} +{% block scripts %} + +{% endblock %} From a39ec566201770e10a8b440211adaf80704a57b8 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:06:49 +0000 Subject: [PATCH 08/14] Docs: domain assignment guide, migration runbook, blast radius - domain-assignment.md: algorithm walkthrough (pass 1/2/3), status values, CLI command reference, dashboard review guide - migration-runbook.md: step-by-step deploy with pre-deploy backups, 8 STOP pause points for operator verification, staged push rollout, quarantined --reprocess-missing procedure, 5 rollback procedures - deploy-blast-radius.md: per-step risk reference with worst case, detection signals, rollback procedures, and risk tiers Co-Authored-By: Claude Opus 4.6 --- docs/deploy-blast-radius.md | 20 ++ docs/domain-assignment.md | 111 ++++++++++ docs/migration-runbook.md | 426 ++++++++++++++++++++++++++++++++++++ 3 files changed, 557 insertions(+) create mode 100644 docs/deploy-blast-radius.md create mode 100644 docs/domain-assignment.md create mode 100644 docs/migration-runbook.md diff --git a/docs/deploy-blast-radius.md b/docs/deploy-blast-radius.md new file mode 100644 index 0000000..fd6bc79 --- /dev/null +++ b/docs/deploy-blast-radius.md @@ -0,0 +1,20 @@ +# Deploy Blast Radius Reference + +Quick-reference for operators during deployment of domain categorization. + +| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. 
Rollback Time | +|------|-------------|------------------------------|-----------------|----------|-------------------| +| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min | +| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min | +| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min | +| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min | +| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min | +| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min | +| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count | + +## Risk Tiers + +- **Low risk (read-only):** `--dry-run` on any command, status display +- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration +- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users) +- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ 
Gemini work at risk) diff --git a/docs/domain-assignment.md b/docs/domain-assignment.md new file mode 100644 index 0000000..394a652 --- /dev/null +++ b/docs/domain-assignment.md @@ -0,0 +1,111 @@ +# Domain Assignment — Algorithm & Operations Guide + +## Overview + +RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concepts extracted from its transcript. Assignments are pushed to PeerTube as category metadata via a custom plugin. + +## Algorithm + +### Pass 1: Concept Domain Count (inline, per-document) + +Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`. + +1. Read all `data/concepts/{hash}/window_*.json` files +2. Count domain occurrences across all concepts, filtering to `VALID_DOMAINS` only (skips legacy domains) +3. If no valid concepts → `needs_reprocess` +4. If single top domain → `assigned` +5. If tied → `tied_pass_1` (deferred to tiebreaker) + +### Pass 2: Channel Tiebreaker (batch) + +Runs via `assign-categories --tiebreaker-pass`. + +For each `tied_pass_1` document: + +1. Identify the tied domains +2. Look up the document's channel (`catalogue.category`) +3. **Mega-channel rule:** If channel has >500 videos, skip tiebreaking → `tied_manual` +4. Read concept files for all other videos in the same channel +5. Among the tied domains only, pick the one with the highest channel-wide concept count +6. If resolved → `tied_pass_2` +7. If still tied → proceed to pass 3 + +### Pass 3: Defensive Re-Run + +If pass 2 does not resolve the tie, re-read the same channel concept files and re-run identical counting logic. This catches concept-file changes that occurred mid-run (e.g. concurrent enrichment writing new windows during the batch). In steady state, pass 3 produces the same result as pass 2, but under concurrent writes it can resolve a tie that pass 2 missed. 
+ +- If resolved → `tied_pass_2` (same status — the column tracks "channel scan resolved it") +- If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review) + +### Mega-Channel Rule + +Channels with >500 videos (like the "Transcript" catch-all with ~9,200 videos) are not topically coherent. Scanning their concepts produces meaningless aggregate data. These go straight to `tied_manual` for dashboard review. + +## Status Values + +| Status | Meaning | Next Action | +|--------|---------|-------------| +| `assigned` | Clear winner from pass 1 | Push to PeerTube | +| `tied_pass_1` | Concept tie, awaiting tiebreaker | Run `--tiebreaker-pass` | +| `tied_pass_2` | Resolved by channel tiebreaker | Push to PeerTube | +| `tied_manual` | Needs human review | Review at `/peertube/review` | +| `needs_reprocess` | Missing concepts or only legacy domains | Run `--reprocess-missing` | +| `manual_assigned` | Human override from dashboard | Already pushed | + +**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}` + +## CLI Commands + +```bash +cd /opt/recon && source venv/bin/activate + +# Show current assignment status +python3 recon.py assign-categories + +# Pass 1: backfill all unassigned complete stream documents +python3 recon.py assign-categories --backfill --dry-run +python3 recon.py assign-categories --backfill + +# Pass 2: resolve ties via channel analysis +python3 recon.py assign-categories --tiebreaker-pass + +# Push all assigned-but-unpushed categories to PeerTube API +python3 recon.py assign-categories --push-pending + +# Re-queue items with missing/legacy concepts +python3 recon.py assign-categories --reprocess-missing + +# Limit processing count +python3 recon.py assign-categories --backfill --limit 100 +``` + +## Dashboard Review + +The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. 
Each row displays: +- Video title and channel +- Top concept domains with counts +- Dropdown to select the correct domain +- Assign button (pushes to PeerTube immediately) + +Items with `needs_reprocess` status do NOT appear in the review UI — they are handled exclusively via the CLI `--reprocess-missing` command. + +## Pipeline Integration + +New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`: + +1. Runs `compute_assignment()` (pass 1 only) +2. If clear winner: pushes category to PeerTube immediately +3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run +4. On error: logs warning and continues — does not block the pipeline + +## Source Files + +| File | Purpose | +|------|---------| +| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS | +| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` | +| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` | +| `lib/embedder.py` | Post-embed hook | +| `lib/status.py` | DB columns + helper methods | +| `lib/api.py` | Dashboard review routes | +| `recon.py` | CLI `assign-categories` command | diff --git a/docs/migration-runbook.md b/docs/migration-runbook.md new file mode 100644 index 0000000..c962874 --- /dev/null +++ b/docs/migration-runbook.md @@ -0,0 +1,426 @@ +# Domain Categorization Migration Runbook + +Step-by-step procedure to deploy the PeerTube domain categorization feature. + +## Prerequisites + +- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out) +- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170) +- PeerTube admin credentials (`root` / password in `.env`) + +## Pre-Deploy Backups + +These backups MUST be completed before any state-changing step. + +### 1. 
Snapshot RECON database + +```bash +ssh zvx@192.168.1.130 +cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak" +ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm +``` + +### 2. Snapshot PeerTube PostgreSQL + +```bash +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql" +ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero +``` + +### 3. Verify off-site concept backup + +```bash +# Check last rsync to Contabo +ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5' +ssh root@100.64.0.1 'ls -la /opt/recon-backup/concepts/ | tail -5' +# Confirm timestamps match within 6 hours +``` + +### 4. Confirm RECON service state + +```bash +ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager' +# Note: do NOT restart until Step 3. If currently running, confirm no active +# enrichment/embedding workers before proceeding. +``` + +--- + +## Step 1: Deploy PeerTube Plugin to CT 110 + +```bash +# From recon-vm, copy plugin to CT 110 (NOTE: the scp destination below is a shell command, not a remote path, and it uses 192.168.1.241 rather than the 192.168.1.243 host used elsewhere in this runbook; verify before running, or use the Proxmox-host route below) +ssh zvx@192.168.1.130 +cd /opt/recon/peertube-plugin/ +scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains' + +# Or via the Proxmox host: +ssh root@192.168.1.243 # media host +pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains' +# Copy files into the container (scp from recon-vm or use pct push) +``` + +Alternative: Install via PeerTube admin UI (Admin > Plugins > Install). 
+ +```bash +# Restart PeerTube to register plugin +ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube' +``` + +**STOP.** Check PeerTube logs for plugin registration errors: + +```bash +ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin +``` + +If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose +and fix the plugin before continuing. See Rollback: "Plugin install fails" below. + +## Step 2: Verify Plugin + +```bash +# From recon-vm +curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"' +``` + +Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed. + +Run the parity test: +```bash +cd /opt/recon && source venv/bin/activate +python3 tests/test_constants_parity.py +``` + +## Step 3: Apply Schema Migration + +**Requires RECON restart (ask user first).** + +```bash +sudo systemctl restart recon +``` + +The migration runs automatically on startup via `StatusDB._init_db()`. Verify: + +```bash +cd /opt/recon && source venv/bin/activate +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()] +for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']: + assert c in cols, f'Missing: {c}' + print(f' {c}: OK') + +# Verify index exists +indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()] +assert 'idx_documents_recon_domain_status' in indexes, 'Missing index' +print(' idx_documents_recon_domain_status: OK') + +# Verify no columns were dropped +expected_existing = ['hash', 'status', 'filename', 'discovered_at'] +for c in expected_existing: + assert c in cols, f'ALERT: existing column {c} is missing!' 
+print('Migration verified — all columns present, no existing columns dropped') +" +``` + +## Step 4: Run Backfill + +```bash +cd /opt/recon && source venv/bin/activate + +# Dry run first +python3 recon.py assign-categories --backfill --dry-run +``` + +**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks: +- ~94.8% `assigned` (clear winners) +- ~5.2% `tied_pass_1` (ties) +- ~19.5% `needs_reprocess` (missing/legacy concepts) + +If the distribution deviates more than 5 percentage points from these benchmarks, +halt and investigate. Do not proceed until the deviation is explained. Note that the three figures cannot share one denominator (they sum to 119.5%): `assigned` and `tied_pass_1` add up to 100% and so appear to be shares of documents with valid concepts, while `needs_reprocess` appears to be a share of all documents — confirm against the investigation notes and compare like with like. + +```bash +# Execute pass 1 +python3 recon.py assign-categories --backfill +``` + +**STOP.** Spot-check 20 random assigned documents: + +```bash +python3 -c " +from lib.status import StatusDB +db = StatusDB() +rows = db._get_conn().execute( + \"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\" +).fetchall() +for r in rows: + print(r['hash'][:12], r['recon_domain']) +" +``` + +For each, visually verify against concept files: `ls data/concepts/{hash}/` and +spot-check one `window_*.json` to confirm the assigned domain is plausible. +Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below. + +```bash +# Run tiebreaker pass +python3 recon.py assign-categories --tiebreaker-pass +``` + +**STOP.** Verify tiebreaker results: + +```bash +python3 -c " +from lib.status import StatusDB +db = StatusDB() +c = db.get_domain_status_counts() +print('Status breakdown:', c) +print() +print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0)) +print('tied_manual (needs review):', c.get('tied_manual', 0)) +" +``` + +Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given +the channel's other content. + +```bash +# Check overall status +python3 recon.py assign-categories +``` + +## Step 5: Push to PeerTube + +Push in stages. Do NOT push all at once. 
+ +```bash +# Dry run: confirm count +python3 recon.py assign-categories --push-pending --dry-run + +# Stage 1: push 100 items +python3 recon.py assign-categories --push-pending --limit 100 +``` + +**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos +now show RECON domain categories. Spot-check 5 videos. + +```bash +# Verify via API: pick a random pushed video +python3 -c " +from lib.status import StatusDB +db = StatusDB() +row = db._get_conn().execute( + \"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\" +).fetchone() +if row: + uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?' + print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}') + print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category') +" +``` + +```bash +# Stage 2: push 1000 items +python3 recon.py assign-categories --push-pending --limit 1000 +``` + +**STOP.** Verify via PeerTube database: + +```bash +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"' +``` + +```bash +# Stage 3: push remaining +python3 recon.py assign-categories --push-pending +``` + +## Step 6: Verify + +```bash +# Check PeerTube database directly +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"' + +# Check uncategorized +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"' + +# Check RECON status +python3 recon.py assign-categories +``` + +## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION) + +**WARNING:** This step 
deletes concept directories. It is the only destructive +operation in the entire feature. Run it separately from the initial deploy, +after all other steps are verified and stable. + +```bash +# Dry run first — review what would be deleted +python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10 +``` + +**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy +domains only, or missing concept files). The dry-run reports file counts for +each directory that would be deleted. + +```bash +# Small batch +python3 recon.py assign-categories --reprocess-missing --limit 10 +``` + +**STOP.** Verify: check that 10 items re-entered the pipeline. + +```bash +python3 recon.py status # queued count should increase by ~10 +``` + +Wait for pipeline to process them. Verify domain assignment on completion: + +```bash +# Check these specific items got re-enriched and assigned +python3 recon.py assign-categories +``` + +```bash +# Scale up +python3 recon.py assign-categories --reprocess-missing --limit 100 + +# Then unbounded +python3 recon.py assign-categories --reprocess-missing +``` + +**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running +it is safe. Any documents stranded at `status='catalogued'` without being re-queued +can be recovered with `recon.py queue --source stream.echo6.co`. + +## Step 8: Dashboard Review + +Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items. +Each row shows the video, channel, tied domains, and concept counts. Select the +correct domain and click Assign. 
+ +--- + +## Rollback Procedures + +### Plugin install fails or breaks PeerTube + +```bash +# Disable plugin without uninstalling +ssh root@192.168.1.243 'pct exec 110 -- bash -c " + mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \ + /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled + systemctl restart peertube +"' + +# Verify PeerTube is healthy +curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head + +# To fully remove: use PeerTube admin UI → Plugins → Uninstall +``` + +### Schema migration revert (drop new columns) + +Only needed if the columns cause problems. The columns are nullable and have no +constraints, so they should be inert. + +```bash +ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c " +import sqlite3 +conn = sqlite3.connect(\"data/recon.db\") +conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\") +print(\"Index dropped\") +for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]: + try: + conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\") + print(f\"Dropped: {col}\") + except Exception as e: + print(f\"Skip {col}: {e}\") +conn.commit() +"' +``` + +Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12). +Ubuntu 24.04 ships 3.45.1 — this is fine. The index is dropped first because DROP COLUMN fails on a column that is still indexed. 
+ +### Clear wrong backfill assignments (selective or full) + +```bash +cd /opt/recon && source venv/bin/activate + +# Clear ALL domain assignments +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('''UPDATE documents SET + recon_domain = NULL, recon_domain_status = NULL, + recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''') +conn.commit() +print('Cleared all domain assignments') +" + +# Clear only tiebreaker results (reset to tied_pass_1 for re-run) +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('''UPDATE documents SET + recon_domain = NULL, recon_domain_status = 'tied_pass_1', + recon_domain_assigned_at = NULL +WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''') +conn.commit() +" +``` + +### Clear wrong PeerTube categories + +```bash +# Reset ALL RECON categories (100+) to NULL in PeerTube +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \ + -c "UPDATE video SET category = NULL WHERE category >= 100"' + +# Verify +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \ + -c "SELECT count(*) FROM video WHERE category >= 100"' +# Should return 0 + +# Also clear RECON pushed timestamps so --push-pending can retry +cd /opt/recon && source venv/bin/activate +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL') +conn.commit() +print('Cleared push timestamps') +" +``` + +### Restore concepts after failed --reprocess-missing + +```bash +# Concept backups are on Contabo at /opt/recon-backup/concepts/ +# Identify which hashes were deleted (check RECON logs) +ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20' + +# Restore specific hash from Contabo +HASH= +ssh root@100.64.0.1 "tar -cf - -C 
/opt/recon-backup/concepts/ $HASH" | \ + ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/" + +# Restore ALL concepts (nuclear option) +ssh root@100.64.0.1 'rsync -av /opt/recon-backup/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/' +``` + +### Fully remove feature + +1. Uninstall plugin from PeerTube admin UI +2. Restart PeerTube +3. Revert RECON code changes (`git checkout master`) +4. Restart RECON +5. Drop schema columns (see above) +6. Reset PeerTube categories (see above) From 75b2e19e9444f0edf576edded21b62ed72844119 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 01:42:05 +0000 Subject: [PATCH 09/14] Fix: correct Contabo concept backup path in docs (/opt/backups/recon/concepts/) Co-Authored-By: Claude Opus 4.6 --- docs/migration-runbook.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/migration-runbook.md b/docs/migration-runbook.md index c962874..db24b00 100644 --- a/docs/migration-runbook.md +++ b/docs/migration-runbook.md @@ -32,7 +32,7 @@ ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero ```bash # Check last rsync to Contabo ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5' -ssh root@100.64.0.1 'ls -la /opt/recon-backup/concepts/ | tail -5' +ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5' # Confirm timestamps match within 6 hours ``` @@ -403,17 +403,17 @@ print('Cleared push timestamps') ### Restore concepts after failed --reprocess-missing ```bash -# Concept backups are on Contabo at /opt/recon-backup/concepts/ +# Concept backups are on Contabo at /opt/backups/recon/concepts/ # Identify which hashes were deleted (check RECON logs) ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20' # Restore specific hash from Contabo HASH= -ssh root@100.64.0.1 "tar -cf - -C /opt/recon-backup/concepts/ $HASH" | \ +ssh root@100.64.0.1 "tar -cf - -C /opt/backups/recon/concepts/ $HASH" | \ ssh zvx@192.168.1.130 "tar 
-xf - -C /opt/recon/data/concepts/" # Restore ALL concepts (nuclear option) -ssh root@100.64.0.1 'rsync -av /opt/recon-backup/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/' +ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/' ``` ### Fully remove feature From c04ccc5011c64040405cde61783fba10463157b2 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 01:55:47 +0000 Subject: [PATCH 10/14] Fix plugin for PeerTube v8.0.2 validation PeerTube v8.0.2 requires a "bugs" field in package.json and an unregister() export in main.js. Add both to pass plugin validation. Co-Authored-By: Claude Opus 4.6 --- peertube-plugin/peertube-plugin-recon-domains/main.js | 6 +++++- peertube-plugin/peertube-plugin-recon-domains/package.json | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/peertube-plugin/peertube-plugin-recon-domains/main.js b/peertube-plugin/peertube-plugin-recon-domains/main.js index 7f7a9cb..7185c34 100644 --- a/peertube-plugin/peertube-plugin-recon-domains/main.js +++ b/peertube-plugin/peertube-plugin-recon-domains/main.js @@ -25,4 +25,8 @@ async function register ({ videoCategoryManager }) { } } -module.exports = { register } +async function unregister () { + return +} + +module.exports = { register, unregister } diff --git a/peertube-plugin/peertube-plugin-recon-domains/package.json b/peertube-plugin/peertube-plugin-recon-domains/package.json index e5788a3..5c0db7a 100644 --- a/peertube-plugin/peertube-plugin-recon-domains/package.json +++ b/peertube-plugin/peertube-plugin-recon-domains/package.json @@ -16,5 +16,6 @@ "staticDirs": {}, "css": [], "clientScripts": [], - "translations": {} + "translations": {}, + "bugs": "https://forge.echo6.co/matt/recon/issues" } From 3b37d96c4dfdb2b280e4b76eaa23f527d7117b72 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 03:59:06 +0000 Subject: [PATCH 11/14] Switch domain assignment to Qdrant as source of truth Replace on-disk concept 
file reads with Qdrant payload queries for domain assignment. This unlocks assignment for ~10,120 items that had missing or legacy-only concept files on disk while Qdrant held the correct 18-domain taxonomy data. Changes: - domain_assigner.py: Replace _count_concept_domains (disk) with _count_domains_from_qdrant and _count_domains_from_qdrant_batch (Qdrant scroll queries). Add _get_qdrant_client helper. Remove pass 3 defensive re-run (Qdrant reads are consistent). Add no_concepts terminal status for zero-vector documents. - embedder.py: Post-embed hook passes existing qdrant client to compute_assignment, avoiding a second connection. - recon.py: Backfill creates one QdrantClient for the batch. SQL filter includes existing needs_reprocess items. Dry-run reports no_concepts as separate bucket. --reprocess-missing removes concept-dir deletion step (no longer reads from disk). - docs/domain-assignment.md: Algorithm references Qdrant, documents no_concepts status, removes pass 3 description. Dry-run results: 20,453 assigned, 1,392 tied, 298 no_concepts, 0 needs_reprocess, 0 errors (previously 10,416 needs_reprocess). Co-Authored-By: Claude Opus 4.6 --- docs/domain-assignment.md | 57 +++++----- lib/domain_assigner.py | 234 +++++++++++++++++++++++--------------- lib/embedder.py | 2 +- recon.py | 30 +++-- 4 files changed, 187 insertions(+), 136 deletions(-) diff --git a/docs/domain-assignment.md b/docs/domain-assignment.md index 394a652..8651419 100644 --- a/docs/domain-assignment.md +++ b/docs/domain-assignment.md @@ -2,7 +2,13 @@ ## Overview -RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concepts extracted from its transcript. Assignments are pushed to PeerTube as category metadata via a custom plugin. +RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. 
Assignments are pushed to PeerTube as category metadata via a custom plugin. + +## Data Source + +Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue. + +Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data. ## Algorithm @@ -10,9 +16,9 @@ RECON's domain assignment feature maps each PeerTube video to one of 18 knowledg Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`. -1. Read all `data/concepts/{hash}/window_*.json` files -2. Count domain occurrences across all concepts, filtering to `VALID_DOMAINS` only (skips legacy domains) -3. If no valid concepts → `needs_reprocess` +1. Query Qdrant for all points with `doc_hash` matching the document +2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only +3. If zero concept vectors → `no_concepts` (terminal) 4. If single top domain → `assigned` 5. If tied → `tied_pass_1` (deferred to tiebreaker) @@ -22,20 +28,13 @@ Runs via `assign-categories --tiebreaker-pass`. For each `tied_pass_1` document: -1. Identify the tied domains +1. Identify the tied domains from Qdrant 2. Look up the document's channel (`catalogue.category`) 3. **Mega-channel rule:** If channel has >500 videos, skip tiebreaking → `tied_manual` -4. Read concept files for all other videos in the same channel +4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter) 5. 
Among the tied domains only, pick the one with the highest channel-wide concept count 6. If resolved → `tied_pass_2` -7. If still tied → proceed to pass 3 - -### Pass 3: Defensive Re-Run - -If pass 2 does not resolve the tie, re-read the same channel concept files and re-run identical counting logic. This catches concept-file changes that occurred mid-run (e.g. concurrent enrichment writing new windows during the batch). In steady state, pass 3 produces the same result as pass 2, but under concurrent writes it can resolve a tie that pass 2 missed. - -- If resolved → `tied_pass_2` (same status — the column tracks "channel scan resolved it") -- If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review) +7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review) ### Mega-Channel Rule @@ -43,14 +42,15 @@ Channels with >500 videos (like the "Transcript" catch-all with ~9,200 videos) a ## Status Values -| Status | Meaning | Next Action | -|--------|---------|-------------| -| `assigned` | Clear winner from pass 1 | Push to PeerTube | -| `tied_pass_1` | Concept tie, awaiting tiebreaker | Run `--tiebreaker-pass` | -| `tied_pass_2` | Resolved by channel tiebreaker | Push to PeerTube | -| `tied_manual` | Needs human review | Review at `/peertube/review` | -| `needs_reprocess` | Missing concepts or only legacy domains | Run `--reprocess-missing` | -| `manual_assigned` | Human override from dashboard | Already pushed | +| Status | Meaning | Terminal? 
| Next Action | +|--------|---------|-----------|-------------| +| `assigned` | Clear winner from pass 1 | No | Push to PeerTube | +| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` | +| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube | +| `tied_manual` | Needs human review | No | Review at `/peertube/review` | +| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) | +| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` | +| `manual_assigned` | Human override from dashboard | No | Already pushed | **"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}` @@ -72,7 +72,7 @@ python3 recon.py assign-categories --tiebreaker-pass # Push all assigned-but-unpushed categories to PeerTube API python3 recon.py assign-categories --push-pending -# Re-queue items with missing/legacy concepts +# Re-queue items with transient failures for full re-processing python3 recon.py assign-categories --reprocess-missing # Limit processing count @@ -87,25 +87,26 @@ The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items - Dropdown to select the correct domain - Assign button (pushes to PeerTube immediately) -Items with `needs_reprocess` status do NOT appear in the review UI — they are handled exclusively via the CLI `--reprocess-missing` command. +Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI. ## Pipeline Integration New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`: -1. Runs `compute_assignment()` (pass 1 only) +1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client 2. If clear winner: pushes category to PeerTube immediately 3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run -4. 
On error: logs warning and continues — does not block the pipeline +4. If no concepts: marks as `no_concepts` (terminal) +5. On Qdrant error: logs warning and continues — does not block the pipeline ## Source Files | File | Purpose | |------|---------| | `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS | -| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` | +| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers | | `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` | -| `lib/embedder.py` | Post-embed hook | +| `lib/embedder.py` | Post-embed hook (passes qdrant client) | | `lib/status.py` | DB columns + helper methods | | `lib/api.py` | Dashboard review routes | | `recon.py` | CLI `assign-categories` command | diff --git a/lib/domain_assigner.py b/lib/domain_assigner.py index c00894f..a6f0f47 100644 --- a/lib/domain_assigner.py +++ b/lib/domain_assigner.py @@ -1,24 +1,30 @@ """ RECON Domain Assigner -Computes per-video domain assignments from concept extraction results. +Computes per-video domain assignments from Qdrant vector payloads. Two functions, two execution modes: compute_assignment() — pass 1, inline from post-embed hook run_tiebreaker_pass() — batch, resolves ties via channel concept scan +Data source: Qdrant `domain` payload field on concept vectors. +Previously read on-disk concept JSON files; migrated to Qdrant as +single source of truth (2026-04-28). + Status values written to documents.recon_domain_status: assigned — clear winner from pass 1 concept count tied_pass_1 — concept tie, awaiting channel tiebreaker tied_pass_2 — resolved by channel tiebreaker tied_manual — needs human review (dashboard) - needs_reprocess — missing concepts or only legacy domains + no_concepts — terminal, zero concept vectors in Qdrant + needs_reprocess — transient failure (Qdrant error, etc.) 
manual_assigned — human override from dashboard """ -import json -import os from collections import Counter +from qdrant_client import QdrantClient +from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny + from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP from .utils import setup_logging @@ -28,40 +34,51 @@ logger = setup_logging('recon.domain_assigner') MEGA_CHANNEL_THRESHOLD = 500 -def _count_concept_domains(concepts_dir, file_hash): - """Read concept files and count valid domain occurrences. +def _get_qdrant_client(config): + """Create a QdrantClient from RECON config. + + Callers should create one client and pass it through rather than + calling this repeatedly. + """ + logger.debug("Creating new QdrantClient (caller did not pass one)") + return QdrantClient( + host=config['vector_db']['host'], + port=config['vector_db']['port'], + timeout=60 + ) + + +def _count_domains_from_qdrant(qdrant, collection, doc_hash): + """Count valid domain occurrences for a single document from Qdrant. + + Scrolls all points matching doc_hash and counts domain values. Args: - concepts_dir: Base concepts directory (e.g. /opt/recon/data/concepts) - file_hash: Document hash + qdrant: QdrantClient instance + collection: Qdrant collection name + doc_hash: Document hash to query Returns: - Counter of {domain_name: count} for valid domains only, - or None if no concept directory exists. + Counter of {domain_name: count} for valid domains. + Empty Counter if no points found (never None). 
""" - doc_concepts_dir = os.path.join(concepts_dir, file_hash) - if not os.path.isdir(doc_concepts_dir): - return None - domain_counter = Counter() + offset = None - for fname in os.listdir(doc_concepts_dir): - if not fname.startswith('window_') or not fname.endswith('.json'): - continue - fpath = os.path.join(doc_concepts_dir, fname) - try: - with open(fpath, 'r') as f: - concepts = json.load(f) - except (json.JSONDecodeError, OSError): - continue + while True: + results, next_offset = qdrant.scroll( + collection_name=collection, + scroll_filter=Filter(must=[ + FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash)) + ]), + with_payload=["domain"], + with_vectors=False, + limit=200, + offset=offset, + ) - if not isinstance(concepts, list): - continue - - for concept in concepts: - if not isinstance(concept, dict): - continue - dom = concept.get('domain') + for point in results: + dom = point.payload.get('domain') if isinstance(dom, str) and dom in VALID_DOMAINS: domain_counter[dom] += 1 elif isinstance(dom, list): @@ -69,30 +86,95 @@ def _count_concept_domains(concepts_dir, file_hash): if isinstance(d, str) and d in VALID_DOMAINS: domain_counter[d] += 1 + if next_offset is None: + break + offset = next_offset + return domain_counter -def compute_assignment(file_hash, db, config): +def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes): + """Count valid domain occurrences across multiple documents from Qdrant. + + Single scroll with MatchAny filter, with offset pagination for large + result sets. + + Args: + qdrant: QdrantClient instance + collection: Qdrant collection name + doc_hashes: List of document hashes to query + + Returns: + Counter of {domain_name: count} aggregated across all matching points. 
+ """ + if not doc_hashes: + return Counter() + + domain_counter = Counter() + offset = None + + while True: + results, next_offset = qdrant.scroll( + collection_name=collection, + scroll_filter=Filter(must=[ + FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes)) + ]), + with_payload=["domain"], + with_vectors=False, + limit=10000, + offset=offset, + ) + + for point in results: + dom = point.payload.get('domain') + if isinstance(dom, str) and dom in VALID_DOMAINS: + domain_counter[dom] += 1 + elif isinstance(dom, list): + for d in dom: + if isinstance(d, str) and d in VALID_DOMAINS: + domain_counter[d] += 1 + + if next_offset is None: + break + offset = next_offset + + return domain_counter + + +def compute_assignment(file_hash, db, config, qdrant=None): """Compute domain assignment for a single document (pass 1). - Counts domain occurrences across all concepts. If a single domain - wins, assigns it. If tied, defers to batch tiebreaker. + Counts domain occurrences across all concept vectors in Qdrant. + If a single domain wins, assigns it. If tied, defers to batch + tiebreaker. 
Args: file_hash: Document hash db: StatusDB instance config: RECON config dict + qdrant: Optional QdrantClient (created if not provided) Returns: (domain, status) tuple where domain is a string or None, - and status is one of: 'assigned', 'tied_pass_1', 'needs_reprocess' + and status is one of: 'assigned', 'tied_pass_1', 'no_concepts', + 'needs_reprocess' """ - concepts_dir = config['paths']['concepts'] - domain_counter = _count_concept_domains(concepts_dir, file_hash) + owns_client = False + if qdrant is None: + qdrant = _get_qdrant_client(config) + owns_client = True - if domain_counter is None or len(domain_counter) == 0: + collection = config['vector_db']['collection'] + + try: + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) + except Exception as e: + logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}") return (None, 'needs_reprocess') + if len(domain_counter) == 0: + return (None, 'no_concepts') + top = domain_counter.most_common(2) top_domain = top[0][0] top_count = top[0][1] @@ -104,9 +186,9 @@ def compute_assignment(file_hash, db, config): return (None, 'tied_pass_1') -def _get_tied_domains(concepts_dir, file_hash): +def _get_tied_domains(qdrant, collection, file_hash): """Get the set of domains tied for first place in a document's concepts.""" - domain_counter = _count_concept_domains(concepts_dir, file_hash) + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) if not domain_counter: return [] @@ -150,32 +232,32 @@ def _channel_video_count(db, channel_name): return row['cnt'] if row else 0 -def run_tiebreaker_pass(db, config): - """Resolve tied domain assignments using channel-level concept analysis. +def run_tiebreaker_pass(db, config, qdrant=None): + """Resolve tied domain assignments using channel-level Qdrant analysis. Processes all documents where recon_domain_status = 'tied_pass_1'. 
- Pass 2: For each tied document, reads concept files from all other - videos in the same channel and picks the tied domain with the highest - channel-wide count. + For each tied document, queries Qdrant for domain counts from all + other videos in the same channel and picks the tied domain with the + highest channel-wide count. - Pass 3 (defensive re-run): Re-reads the same channel concept files a - second time with identical logic. This catches concept-file changes - that occurred mid-run (e.g. concurrent enrichment writing new windows). - In steady state pass 3 produces the same result as pass 2, but under - concurrent writes it can resolve a tie that pass 2 missed. - - Mega-channels (>500 videos) skip both passes and go straight to + Mega-channels (>500 videos) skip tiebreaking and go straight to 'tied_manual' for dashboard review. Args: db: StatusDB instance config: RECON config dict + qdrant: Optional QdrantClient (created if not provided) Returns: Dict with counts: resolved, manual, skipped, errors """ - concepts_dir = config['paths']['concepts'] + owns_client = False + if qdrant is None: + qdrant = _get_qdrant_client(config) + owns_client = True + + collection = config['vector_db']['collection'] tied_items = db.get_items_by_domain_status('tied_pass_1') stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)} @@ -189,9 +271,9 @@ def run_tiebreaker_pass(db, config): channel = item.get('category', '') try: - tied_domains = _get_tied_domains(concepts_dir, file_hash) + tied_domains = _get_tied_domains(qdrant, collection, file_hash) if not tied_domains: - db.set_domain_assignment(file_hash, None, 'needs_reprocess') + db.set_domain_assignment(file_hash, None, 'no_concepts') stats['skipped'] += 1 continue @@ -215,12 +297,9 @@ def run_tiebreaker_pass(db, config): # Channel tiebreaker: count domains across all other videos in channel other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash) - channel_domain_counts = 
Counter() - - for other_hash in other_hashes: - other_counts = _count_concept_domains(concepts_dir, other_hash) - if other_counts: - channel_domain_counts.update(other_counts) + channel_domain_counts = _count_domains_from_qdrant_batch( + qdrant, collection, other_hashes + ) # Among tied domains only, pick highest channel-wide count best_domain = None @@ -231,48 +310,21 @@ def run_tiebreaker_pass(db, config): best_count = c best_domain = dom - # Pass 2: check if channel tiebreaker resolved it + # Check if channel tiebreaker resolved it tied_at_channel = [d for d in tied_domains if channel_domain_counts.get(d, 0) == best_count] if len(tied_at_channel) == 1: db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2') stats['resolved'] += 1 - logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (pass 2 channel tiebreaker)") + logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)") continue - # Pass 3: defensive re-run — re-count channel concepts to catch - # concept-file changes that occurred mid-run. Identical logic to - # pass 2; resolves races where files were written between the - # two reads. 
- channel_domain_counts_p3 = Counter() - for other_hash in other_hashes: - other_counts = _count_concept_domains(concepts_dir, other_hash) - if other_counts: - channel_domain_counts_p3.update(other_counts) - - best_domain_p3 = None - best_count_p3 = -1 - for dom in tied_domains: - c = channel_domain_counts_p3.get(dom, 0) - if c > best_count_p3: - best_count_p3 = c - best_domain_p3 = dom - - tied_at_p3 = [d for d in tied_domains - if channel_domain_counts_p3.get(d, 0) == best_count_p3] - - if len(tied_at_p3) == 1: - db.set_domain_assignment(file_hash, best_domain_p3, 'tied_pass_2') - stats['resolved'] += 1 - logger.debug(f" {file_hash[:12]}: resolved → {best_domain_p3} (pass 3 defensive re-run)") - continue - - # Still tied after pass 3 — mark for manual review + # Still tied after channel scan — mark for manual review fallback = sorted(tied_domains)[0] db.set_domain_assignment(file_hash, fallback, 'tied_manual') stats['manual'] += 1 - logger.debug(f" {file_hash[:12]}: still tied after pass 3, → tied_manual") + logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual") except Exception as e: logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}") diff --git a/lib/embedder.py b/lib/embedder.py index b1f59ca..da6a952 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -411,7 +411,7 @@ def embed_single(file_hash, db, config): from .domain_assigner import compute_assignment from .peertube_writer import push_category, extract_uuid from .recon_domains import DOMAIN_CATEGORY_MAP - domain, status = compute_assignment(file_hash, db, config) + domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant) db.set_domain_assignment(file_hash, domain, status) if domain and status == 'assigned': cat_id = DOMAIN_CATEGORY_MAP[domain] diff --git a/recon.py b/recon.py index cad106a..81c8a81 100755 --- a/recon.py +++ b/recon.py @@ -865,6 +865,7 @@ def cmd_ingest(args): def cmd_assign_categories(args): """Assign RECON domains to PeerTube videos 
and push categories.""" + from qdrant_client import QdrantClient from lib.domain_assigner import compute_assignment, run_tiebreaker_pass from lib.peertube_writer import push_pending, extract_uuid from lib.recon_domains import DOMAIN_CATEGORY_MAP @@ -876,11 +877,13 @@ def cmd_assign_categories(args): if args.backfill: # Pass 1: assign domains to all complete stream docs with no assignment + # or that previously got needs_reprocess conn = db._get_conn() q = """SELECT d.hash FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.status = 'complete' - AND d.recon_domain IS NULL + AND (d.recon_domain IS NULL + OR d.recon_domain_status = 'needs_reprocess') AND c.source = 'stream.echo6.co' ORDER BY d.discovered_at""" if limit: @@ -895,10 +898,17 @@ def cmd_assign_categories(args): print(f"Backfill: processing {len(hashes)} documents" + (" [DRY RUN]" if dry_run else "")) - stats = {'assigned': 0, 'tied_pass_1': 0, 'needs_reprocess': 0, 'errors': 0} + # Create one Qdrant client for the entire backfill + qdrant = QdrantClient( + host=config['vector_db']['host'], + port=config['vector_db']['port'], + timeout=60 + ) + + stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0} for i, file_hash in enumerate(hashes): try: - domain, status = compute_assignment(file_hash, db, config) + domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant) stats[status] = stats.get(status, 0) + 1 if not dry_run: db.set_domain_assignment(file_hash, domain, status) @@ -946,22 +956,10 @@ def cmd_assign_categories(args): for item in items: file_hash = item['hash'] if dry_run: - concepts_dir = os.path.join(config['paths']['concepts'], file_hash) - has_concepts = os.path.isdir(concepts_dir) - concept_count = len(os.listdir(concepts_dir)) if has_concepts else 0 - detail = f"DELETE {concept_count} concept files" if has_concepts else "no concept dir" - print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')} ({detail})") + 
print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')}") requeued += 1 continue - # Remove stale concept files - import shutil - concepts_dir = os.path.join(config['paths']['concepts'], file_hash) - if os.path.isdir(concepts_dir): - logger.info(f" Deleting concept dir: {concepts_dir} " - f"({len(os.listdir(concepts_dir))} files, hash={file_hash})") - shutil.rmtree(concepts_dir) - # Reset document status to allow re-processing conn = db._get_conn() conn.execute( From 043119fbaaaf13ecb9e04a34624f91f3ea16f97c Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 03:59:13 +0000 Subject: [PATCH 12/14] Fix: review UI domain breakdown from Qdrant The review items API endpoint still read concept domain counts from on-disk JSON files, which would show empty breakdowns for items with missing concept directories. Replace disk reads with the same _count_domains_from_qdrant function used by domain assignment. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 31 +++++++------------------------ 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/lib/api.py b/lib/api.py index f7d9b10..2fecafc 100644 --- a/lib/api.py +++ b/lib/api.py @@ -395,37 +395,20 @@ def api_peertube_review_stats(): @app.route('/api/peertube/review/items') def api_peertube_review_items(): - import json as _json - from .recon_domains import VALID_DOMAINS + from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client db = StatusDB() config = get_config() items = db.get_items_by_domain_status('tied_manual', limit=200) + qdrant = _get_qdrant_client(config) + collection = config['vector_db']['collection'] + result = [] - concepts_dir = config['paths']['concepts'] for item in items: file_hash = item['hash'] - # Count domains from concept files - top_domains = [] - doc_concepts_dir = os.path.join(concepts_dir, file_hash) - if os.path.isdir(doc_concepts_dir): - from collections import Counter - domain_counter = Counter() - for fname in os.listdir(doc_concepts_dir): - if 
not fname.startswith('window_') or not fname.endswith('.json'): - continue - try: - with open(os.path.join(doc_concepts_dir, fname)) as f: - concepts = _json.load(f) - for c in concepts: - if isinstance(c, dict): - dom = c.get('domain') - if isinstance(dom, str) and dom in VALID_DOMAINS: - domain_counter[dom] += 1 - except Exception: - continue - top_domains = [{'domain': d, 'count': cnt} - for d, cnt in domain_counter.most_common(5)] + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) + top_domains = [{'domain': d, 'count': cnt} + for d, cnt in domain_counter.most_common(5)] result.append({ 'hash': file_hash, From d8196e60c7616f1da1174f7a0516e8cfdbbf3cb7 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 03:59:19 +0000 Subject: [PATCH 13/14] Track domain-assignment follow-up items Three nice-to-haves from the Qdrant migration pre-commit review: list-type domain handling, tiebreaker client threading, debug log caller identification. All zero production impact. Co-Authored-By: Claude Opus 4.6 --- docs/backlog.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 docs/backlog.md diff --git a/docs/backlog.md b/docs/backlog.md new file mode 100644 index 0000000..286e086 --- /dev/null +++ b/docs/backlog.md @@ -0,0 +1,14 @@ +# RECON Backlog — Technical Debt + +## Qdrant Migration Follow-ups (2026-04-28) + +From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact. + +1. **domain_assigner.py — list-type domain handling** + `_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings. + +2. 
**recon.py --tiebreaker-pass — Qdrant client threading** + The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. Functionally fine as-is. + +3. **_get_qdrant_client — debug log caller identification** + The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths. From 299be21f4291cc196b99b357f51acf137bf8c4fc Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 04:24:39 +0000 Subject: [PATCH 14/14] Replace mega-channel size rule with explicit skip list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The >500-video threshold was too aggressive — it skipped tiebreaking for legitimate large channels (1a-auto, forgotten-weapons, etc.) where channel context correctly resolves ties. Replace with an explicit MEGA_CHANNEL_SKIP_LIST in recon_domains.py. Only known non-topical catch-alls (currently just "Transcript") skip the tiebreaker. Removed _channel_video_count() helper and MEGA_CHANNEL_THRESHOLD constant (no longer used). Co-Authored-By: Claude Opus 4.6 --- docs/domain-assignment.md | 11 ++++++++--- lib/domain_assigner.py | 29 ++++++----------------------- lib/recon_domains.py | 12 ++++++++++++ 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/docs/domain-assignment.md b/docs/domain-assignment.md index 8651419..f1f122f 100644 --- a/docs/domain-assignment.md +++ b/docs/domain-assignment.md @@ -30,15 +30,20 @@ For each `tied_pass_1` document: 1. Identify the tied domains from Qdrant 2. 
Look up the document's channel (`catalogue.category`) -3. **Mega-channel rule:** If channel has >500 videos, skip tiebreaking → `tied_manual` +3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual` 4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter) 5. Among the tied domains only, pick the one with the highest channel-wide concept count 6. If resolved → `tied_pass_2` 7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review) -### Mega-Channel Rule +### Channel Skip List -Channels with >500 videos (like the "Transcript" catch-all with ~9,200 videos) are not topically coherent. Scanning their concepts produces meaningless aggregate data. These go straight to `tied_manual` for dashboard review. +Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review. + +Current skip list: +- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence + +This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason. 
## Status Values diff --git a/lib/domain_assigner.py b/lib/domain_assigner.py index a6f0f47..cbf69a1 100644 --- a/lib/domain_assigner.py +++ b/lib/domain_assigner.py @@ -25,13 +25,11 @@ from collections import Counter from qdrant_client import QdrantClient from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny -from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP +from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST from .utils import setup_logging logger = setup_logging('recon.domain_assigner') -# Channels with more than this many videos skip channel tiebreaking entirely -MEGA_CHANNEL_THRESHOLD = 500 def _get_qdrant_client(config): @@ -222,14 +220,6 @@ def _channel_video_hashes(db, channel_name, exclude_hash=None): return hashes -def _channel_video_count(db, channel_name): - """Count total videos in a channel.""" - conn = db._get_conn() - row = conn.execute( - "SELECT COUNT(*) as cnt FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'", - (channel_name,) - ).fetchone() - return row['cnt'] if row else 0 def run_tiebreaker_pass(db, config, qdrant=None): @@ -241,8 +231,8 @@ def run_tiebreaker_pass(db, config, qdrant=None): other videos in the same channel and picks the tied domain with the highest channel-wide count. - Mega-channels (>500 videos) skip tiebreaking and go straight to - 'tied_manual' for dashboard review. + Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip + tiebreaking and go straight to 'tied_manual' for dashboard review. 
Args: db: StatusDB instance @@ -263,9 +253,6 @@ def run_tiebreaker_pass(db, config, qdrant=None): stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)} logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve") - # Cache channel sizes to avoid repeated queries - channel_size_cache = {} - for item in tied_items: file_hash = item['hash'] channel = item.get('category', '') @@ -283,16 +270,12 @@ def run_tiebreaker_pass(db, config, qdrant=None): stats['resolved'] += 1 continue - # Check mega-channel rule - if channel not in channel_size_cache: - channel_size_cache[channel] = _channel_video_count(db, channel) - - if channel_size_cache[channel] > MEGA_CHANNEL_THRESHOLD: + # Skip-list check: known non-topical catch-all channels + if channel in MEGA_CHANNEL_SKIP_LIST: fallback = sorted(tied_domains)[0] db.set_domain_assignment(file_hash, fallback, 'tied_manual') stats['manual'] += 1 - logger.debug(f" {file_hash[:12]}: mega-channel '{channel}' " - f"({channel_size_cache[channel]} videos), → tied_manual") + logger.debug(f" {file_hash[:12]}: skip-list channel '{channel}' → tied_manual") continue # Channel tiebreaker: count domains across all other videos in channel diff --git a/lib/recon_domains.py b/lib/recon_domains.py index 350a05c..b1c8b3c 100644 --- a/lib/recon_domains.py +++ b/lib/recon_domains.py @@ -32,3 +32,15 @@ DOMAIN_CATEGORY_MAP = { VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys()) CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()} + +# Channels whose tiebreaker is skipped because their content is non-topical +# (catch-alls, miscellany dumps, etc.). Items in these channels with tied +# domain counts go straight to tied_manual without channel-context tiebreaker. +# +# This is intentionally a hardcoded explicit list, not a size threshold. 
+# Adding a channel here requires an explicit decision — only add channels +# that are genuinely non-topical catch-alls where channel-wide concept +# aggregation would produce meaningless noise. +MEGA_CHANNEL_SKIP_LIST = { + 'Transcript', # Legacy catch-all, ~9,200 videos, no topical coherence +}