diff --git a/config.yaml b/config.yaml index a2709b0..0bed648 100644 --- a/config.yaml +++ b/config.yaml @@ -408,9 +408,14 @@ service: peertube: api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx) + api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer) + host_header: stream.echo6.co # Host header for PeerTube API requests + username: root # PeerTube admin username + password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password public_url: https://stream.echo6.co # Public URL for video links fetch_timeout: 30 # HTTP timeout for API/VTT requests rate_limit_delay: 0.5 # Delay between video ingestions (seconds) + writer_rate_limit: 0.1 # Delay between category push API calls (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) scraper: diff --git a/docs/backlog.md b/docs/backlog.md new file mode 100644 index 0000000..286e086 --- /dev/null +++ b/docs/backlog.md @@ -0,0 +1,14 @@ +# RECON Backlog — Technical Debt + +## Qdrant Migration Follow-ups (2026-04-28) + +From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact. + +1. **domain_assigner.py — list-type domain handling** + `_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings. + +2. **recon.py --tiebreaker-pass — Qdrant client threading** + The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. 
Functionally fine as-is. + +3. **_get_qdrant_client — debug log caller identification** + The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths. diff --git a/docs/deploy-blast-radius.md b/docs/deploy-blast-radius.md new file mode 100644 index 0000000..fd6bc79 --- /dev/null +++ b/docs/deploy-blast-radius.md @@ -0,0 +1,20 @@ +# Deploy Blast Radius Reference + +Quick-reference for operators during deployment of domain categorization. + +| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. Rollback Time | +|------|-------------|------------------------------|-----------------|----------|-------------------| +| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min | +| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min | +| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min | +| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min | +| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min | +| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI 
shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min | +| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count | + +## Risk Tiers + +- **Low risk (read-only):** `--dry-run` on any command, status display +- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration +- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users) +- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ Gemini work at risk) diff --git a/docs/domain-assignment.md b/docs/domain-assignment.md new file mode 100644 index 0000000..f1f122f --- /dev/null +++ b/docs/domain-assignment.md @@ -0,0 +1,117 @@ +# Domain Assignment — Algorithm & Operations Guide + +## Overview + +RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin. + +## Data Source + +Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue. + +Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data. 
+ +## Algorithm + +### Pass 1: Concept Domain Count (inline, per-document) + +Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`. + +1. Query Qdrant for all points with `doc_hash` matching the document +2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only +3. If zero concept vectors → `no_concepts` (terminal) +4. If single top domain → `assigned` +5. If tied → `tied_pass_1` (deferred to tiebreaker) + +### Pass 2: Channel Tiebreaker (batch) + +Runs via `assign-categories --tiebreaker-pass`. + +For each `tied_pass_1` document: + +1. Identify the tied domains from Qdrant +2. Look up the document's channel (`catalogue.category`) +3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual` +4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter) +5. Among the tied domains only, pick the one with the highest channel-wide concept count +6. If resolved → `tied_pass_2` +7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review) + +### Channel Skip List + +Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review. + +Current skip list: +- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence + +This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason. + +## Status Values + +| Status | Meaning | Terminal? 
| Next Action | +|--------|---------|-----------|-------------| +| `assigned` | Clear winner from pass 1 | No | Push to PeerTube | +| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` | +| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube | +| `tied_manual` | Needs human review | No | Review at `/peertube/review` | +| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) | +| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` | +| `manual_assigned` | Human override from dashboard | No | Already pushed | + +**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}` + +## CLI Commands + +```bash +cd /opt/recon && source venv/bin/activate + +# Show current assignment status +python3 recon.py assign-categories + +# Pass 1: backfill all unassigned complete stream documents +python3 recon.py assign-categories --backfill --dry-run +python3 recon.py assign-categories --backfill + +# Pass 2: resolve ties via channel analysis +python3 recon.py assign-categories --tiebreaker-pass + +# Push all assigned-but-unpushed categories to PeerTube API +python3 recon.py assign-categories --push-pending + +# Re-queue items with transient failures for full re-processing +python3 recon.py assign-categories --reprocess-missing + +# Limit processing count +python3 recon.py assign-categories --backfill --limit 100 +``` + +## Dashboard Review + +The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. Each row displays: +- Video title and channel +- Top concept domains with counts +- Dropdown to select the correct domain +- Assign button (pushes to PeerTube immediately) + +Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI. 
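The Pass 1 and Pass 2 decision rules described under Algorithm can be exercised without a Qdrant instance. The sketch below is illustrative only: `decide_pass1` and `break_tie_by_channel` are hypothetical helper names, not functions in `lib/domain_assigner.py`, and they take plain `Counter` objects in place of Qdrant scroll results.

```python
from collections import Counter


def decide_pass1(domain_counts):
    """Return (domain, status) for one document's domain counts (hypothetical helper)."""
    if not domain_counts:
        return (None, 'no_concepts')
    top = domain_counts.most_common(2)
    # A single domain, or a strict lead over the runner-up, is a clear winner.
    if len(top) == 1 or top[1][1] < top[0][1]:
        return (top[0][0], 'assigned')
    return (None, 'tied_pass_1')


def break_tie_by_channel(tied_domains, channel_counts):
    """Return (domain, status) for a tied document (hypothetical helper).

    Only the tied domains compete. If the channel-wide counts are also
    tied, the alphabetically first domain is used as a fallback and the
    item is flagged for manual review.
    """
    best = max(channel_counts.get(d, 0) for d in tied_domains)
    winners = [d for d in tied_domains if channel_counts.get(d, 0) == best]
    if len(winners) == 1:
        return (winners[0], 'tied_pass_2')
    return (sorted(tied_domains)[0], 'tied_manual')


# A 2-2 tie on the document, resolved by the channel's other videos.
doc_counts = Counter({'Medical': 2, 'Navigation': 2})
channel = Counter({'Navigation': 10, 'Medical': 4, 'Logistics': 50})
print(decide_pass1(doc_counts))
print(break_tie_by_channel(['Medical', 'Navigation'], channel))
```

Note that `Logistics` dominates the channel in this example but cannot win, because it was not among the document's tied domains.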
+ +## Pipeline Integration + +New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`: + +1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client +2. If clear winner: pushes category to PeerTube immediately +3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run +4. If no concepts: marks as `no_concepts` (terminal) +5. On Qdrant error: logs warning and continues — does not block the pipeline + +## Source Files + +| File | Purpose | +|------|---------| +| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS | +| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers | +| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` | +| `lib/embedder.py` | Post-embed hook (passes qdrant client) | +| `lib/status.py` | DB columns + helper methods | +| `lib/api.py` | Dashboard review routes | +| `recon.py` | CLI `assign-categories` command | diff --git a/docs/migration-runbook.md b/docs/migration-runbook.md new file mode 100644 index 0000000..db24b00 --- /dev/null +++ b/docs/migration-runbook.md @@ -0,0 +1,426 @@ +# Domain Categorization Migration Runbook + +Step-by-step procedure to deploy the PeerTube domain categorization feature. + +## Prerequisites + +- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out) +- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170) +- PeerTube admin credentials (`root` / password in `.env`) + +## Pre-Deploy Backups + +These backups MUST be completed before any state-changing step. + +### 1. Snapshot RECON database + +```bash +ssh zvx@192.168.1.130 +cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak" +ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm +``` + +### 2. 
Snapshot PeerTube PostgreSQL + +```bash +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql" +ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero +``` + +### 3. Verify off-site concept backup + +```bash +# Check last rsync to Contabo +ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5' +ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5' +# Confirm timestamps match within 6 hours +``` + +### 4. Confirm RECON service state + +```bash +ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager' +# Note: do NOT restart until Step 3. If currently running, confirm no active +# enrichment/embedding workers before proceeding. +``` + +--- + +## Step 1: Deploy PeerTube Plugin to CT 110 + +```bash +# From recon-vm, copy plugin to CT 110 +ssh zvx@192.168.1.130 +cd /opt/recon/peertube-plugin/ +scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains' + +# Or via the Proxmox host: +ssh root@192.168.1.243 # media host +pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains' +# Copy files into the container (scp from recon-vm or use pct push) +``` + +Alternative: Install via PeerTube admin UI (Admin > Plugins > Install). + +```bash +# Restart PeerTube to register plugin +ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube' +``` + +**STOP.** Check PeerTube logs for plugin registration errors: + +```bash +ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin +``` + +If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose +and fix the plugin before continuing. See Rollback: "Plugin install fails" below. 
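Once PeerTube is back up, plugin registration can also be confirmed programmatically rather than by eye. A minimal sketch, assuming the categories endpoint returns a JSON object keyed by category ID (as strings) and that the plugin registers IDs 100 through 117 as this runbook states; `missing_recon_categories` is a hypothetical helper, not part of RECON.

```python
# IDs this runbook expects the plugin to register (100-117, 18 total).
EXPECTED_IDS = range(100, 118)


def missing_recon_categories(categories):
    """Return expected category IDs absent from a parsed categories response.

    `categories` is assumed to be a dict keyed by category ID strings,
    e.g. the result of json.load() on the endpoint's response body.
    """
    present = {int(k) for k in categories if str(k).isdigit()}
    return [i for i in EXPECTED_IDS if i not in present]


# Example with a response that is missing the last two plugin categories.
sample = {str(i): f'domain-{i}' for i in range(100, 116)}
sample['1'] = 'Music'
print(missing_recon_categories(sample))
```

An empty list means all 18 IDs are present. Unlike a pattern match on ID strings, this fails explicitly when a specific ID is absent.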
+ +## Step 2: Verify Plugin + +```bash +# From recon-vm +curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"' +``` + +Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed. + +Run the parity test: +```bash +cd /opt/recon && source venv/bin/activate +python3 tests/test_constants_parity.py +``` + +## Step 3: Apply Schema Migration + +**Requires RECON restart (ask user first).** + +```bash +sudo systemctl restart recon +``` + +The migration runs automatically on startup via `StatusDB._init_db()`. Verify: + +```bash +cd /opt/recon && source venv/bin/activate +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()] +for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']: + assert c in cols, f'Missing: {c}' + print(f' {c}: OK') + +# Verify index exists +indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()] +assert 'idx_documents_recon_domain_status' in indexes, 'Missing index' +print(' idx_documents_recon_domain_status: OK') + +# Verify no columns were dropped +expected_existing = ['hash', 'status', 'filename', 'discovered_at'] +for c in expected_existing: + assert c in cols, f'ALERT: existing column {c} is missing!' +print('Migration verified — all columns present, no existing columns dropped') +" +``` + +## Step 4: Run Backfill + +```bash +cd /opt/recon && source venv/bin/activate + +# Dry run first +python3 recon.py assign-categories --backfill --dry-run +``` + +**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks: +- ~94.8% `assigned` (clear winners) +- ~5.2% `tied_pass_1` (ties) +- ~19.5% `needs_reprocess` (missing/legacy concepts) + +If the distribution deviates more than 5 percentage points from these benchmarks, +halt and investigate. 
Do not proceed until the deviation is explained. + +```bash +# Execute pass 1 +python3 recon.py assign-categories --backfill +``` + +**STOP.** Spot-check 20 random assigned documents: + +```bash +python3 -c " +from lib.status import StatusDB +db = StatusDB() +rows = db._get_conn().execute( + \"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\" +).fetchall() +for r in rows: + print(r['hash'][:12], r['recon_domain']) +" +``` + +For each, visually verify against concept files: `ls data/concepts/{hash}/` and +spot-check one `window_*.json` to confirm the assigned domain is plausible. +Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below. + +```bash +# Run tiebreaker pass +python3 recon.py assign-categories --tiebreaker-pass +``` + +**STOP.** Verify tiebreaker results: + +```bash +python3 -c " +from lib.status import StatusDB +db = StatusDB() +c = db.get_domain_status_counts() +print('Status breakdown:', c) +print() +print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0)) +print('tied_manual (needs review):', c.get('tied_manual', 0)) +" +``` + +Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given +the channel's other content. + +```bash +# Check overall status +python3 recon.py assign-categories +``` + +## Step 5: Push to PeerTube + +Push in stages. Do NOT push all at once. + +```bash +# Dry run: confirm count +python3 recon.py assign-categories --push-pending --dry-run + +# Stage 1: push 100 items +python3 recon.py assign-categories --push-pending --limit 100 +``` + +**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos +now show RECON domain categories. Spot-check 5 videos. 
+ +```bash +# Verify via API: pick a random pushed video +python3 -c " +from lib.status import StatusDB +db = StatusDB() +row = db._get_conn().execute( + \"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\" +).fetchone() +if row: + uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?' + print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}') + print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category') +" +``` + +```bash +# Stage 2: push 1000 items +python3 recon.py assign-categories --push-pending --limit 1000 +``` + +**STOP.** Verify via PeerTube database: + +```bash +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"' +``` + +```bash +# Stage 3: push remaining +python3 recon.py assign-categories --push-pending +``` + +## Step 6: Verify + +```bash +# Check PeerTube database directly +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"' + +# Check uncategorized +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"' + +# Check RECON status +python3 recon.py assign-categories +``` + +## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION) + +**WARNING:** This step deletes concept directories. It is the only destructive +operation in the entire feature. Run it separately from the initial deploy, +after all other steps are verified and stable. 
+ +```bash +# Dry run first — review what would be deleted +python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10 +``` + +**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy +domains only, or missing concept files). The dry-run reports file counts for +each directory that would be deleted. + +```bash +# Small batch +python3 recon.py assign-categories --reprocess-missing --limit 10 +``` + +**STOP.** Verify: check that 10 items re-entered the pipeline. + +```bash +python3 recon.py status # queued count should increase by ~10 +``` + +Wait for pipeline to process them. Verify domain assignment on completion: + +```bash +# Check these specific items got re-enriched and assigned +python3 recon.py assign-categories +``` + +```bash +# Scale up +python3 recon.py assign-categories --reprocess-missing --limit 100 + +# Then unbounded +python3 recon.py assign-categories --reprocess-missing +``` + +**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running +it is safe. Any documents stranded at `status='catalogued'` without being re-queued +can be recovered with `recon.py queue --source stream.echo6.co`. + +## Step 8: Dashboard Review + +Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items. +Each row shows the video, channel, tied domains, and concept counts. Select the +correct domain and click Assign. 
+ +--- + +## Rollback Procedures + +### Plugin install fails or breaks PeerTube + +```bash +# Disable plugin without uninstalling +ssh root@192.168.1.243 'pct exec 110 -- bash -c " + mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \ + /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled + systemctl restart peertube +"' + +# Verify PeerTube is healthy +curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head + +# To fully remove: use PeerTube admin UI → Plugins → Uninstall +``` + +### Schema migration revert (drop new columns) + +Only needed if the columns cause problems. The columns are nullable and have no +constraints, so they should be inert. + +```bash +ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c " +import sqlite3 +conn = sqlite3.connect(\"data/recon.db\") +for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]: + try: + conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\") + print(f\"Dropped: {col}\") + except Exception as e: + print(f\"Skip {col}: {e}\") +conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\") +conn.commit() +print(\"Index dropped\") +"' +``` + +Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12). +Ubuntu 24.04 ships 3.45.1 — this is fine. 
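Before relying on the DROP COLUMN rollback, it may be worth confirming the SQLite version that Python itself is linked against, since it can differ from the system `sqlite3` binary. A small sketch using only the standard library; `supports_drop_column` is a hypothetical helper name.

```python
import sqlite3

# ALTER TABLE ... DROP COLUMN was added in SQLite 3.35.0.
MIN_DROP_COLUMN_VERSION = (3, 35, 0)


def supports_drop_column(version_info=sqlite3.sqlite_version_info):
    """True if the given SQLite library version supports DROP COLUMN."""
    return tuple(version_info) >= MIN_DROP_COLUMN_VERSION


# Reports the runtime SQLite library version, not the sqlite3 module version.
print(sqlite3.sqlite_version, supports_drop_column())
```

Run it inside the RECON venv so the check reflects the interpreter that would execute the rollback script.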
+ +### Clear wrong backfill assignments (selective or full) + +```bash +cd /opt/recon && source venv/bin/activate + +# Clear ALL domain assignments +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('''UPDATE documents SET + recon_domain = NULL, recon_domain_status = NULL, + recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''') +conn.commit() +print('Cleared all domain assignments') +" + +# Clear only tiebreaker results (reset to tied_pass_1 for re-run) +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('''UPDATE documents SET + recon_domain = NULL, recon_domain_status = 'tied_pass_1', + recon_domain_assigned_at = NULL +WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''') +conn.commit() +" +``` + +### Clear wrong PeerTube categories + +```bash +# Reset ALL RECON categories (100+) to NULL in PeerTube +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \ + -c "UPDATE video SET category = NULL WHERE category >= 100"' + +# Verify +ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \ + -c "SELECT count(*) FROM video WHERE category >= 100"' +# Should return 0 + +# Also clear RECON pushed timestamps so --push-pending can retry +cd /opt/recon && source venv/bin/activate +python3 -c " +from lib.status import StatusDB +db = StatusDB() +conn = db._get_conn() +conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL') +conn.commit() +print('Cleared push timestamps') +" +``` + +### Restore concepts after failed --reprocess-missing + +```bash +# Concept backups are on Contabo at /opt/backups/recon/concepts/ +# Identify which hashes were deleted (check RECON logs) +ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20' + +# Restore specific hash from Contabo +HASH= +ssh root@100.64.0.1 "tar -cf - -C 
/opt/backups/recon/concepts/ $HASH" | \ + ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/" + +# Restore ALL concepts (nuclear option) +ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/' +``` + +### Fully remove feature + +1. Uninstall plugin from PeerTube admin UI +2. Restart PeerTube +3. Revert RECON code changes (`git checkout master`) +4. Restart RECON +5. Drop schema columns (see above) +6. Reset PeerTube categories (see above) diff --git a/lib/api.py b/lib/api.py index 8a1f383..2fecafc 100644 --- a/lib/api.py +++ b/lib/api.py @@ -88,6 +88,7 @@ KNOWLEDGE_SUBNAV = [ PEERTUBE_SUBNAV = [ {'href': '/peertube', 'label': 'Dashboard'}, {'href': '/peertube/channels', 'label': 'Channels'}, + {'href': '/peertube/review', 'label': 'Review'}, ] @@ -376,6 +377,86 @@ def peertube_channels(): domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels') +@app.route('/peertube/review') +def peertube_review(): + from .recon_domains import VALID_DOMAINS + return render_template('peertube/review.html', + domain='peertube', subnav=PEERTUBE_SUBNAV, + active_page='/peertube/review', + valid_domains=sorted(VALID_DOMAINS)) + + +@app.route('/api/peertube/review/stats') +def api_peertube_review_stats(): + db = StatusDB() + counts = db.get_domain_status_counts() + return jsonify(counts) + + +@app.route('/api/peertube/review/items') +def api_peertube_review_items(): + from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client + db = StatusDB() + config = get_config() + items = db.get_items_by_domain_status('tied_manual', limit=200) + + qdrant = _get_qdrant_client(config) + collection = config['vector_db']['collection'] + + result = [] + for item in items: + file_hash = item['hash'] + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) + top_domains = [{'domain': d, 'count': cnt} + for d, cnt in domain_counter.most_common(5)] + + result.append({ + 'hash': file_hash, + 
'filename': item.get('filename', ''), + 'category': item.get('category', ''), + 'recon_domain': item.get('recon_domain'), + 'recon_domain_status': item.get('recon_domain_status'), + 'top_domains': top_domains, + }) + return jsonify(result) + + +@app.route('/api/peertube/review/assign', methods=['POST']) +def api_peertube_review_assign(): + from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP + from .peertube_writer import push_category, extract_uuid + data = request.get_json() + file_hash = data.get('hash') + domain = data.get('domain') + + if not file_hash or not domain: + return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400 + if domain not in VALID_DOMAINS: + return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400 + + db = StatusDB() + config = get_config() + + db.set_domain_assignment(file_hash, domain, 'manual_assigned') + + # Push to PeerTube + conn = db._get_conn() + cat_row = conn.execute( + "SELECT path FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if cat_row: + uuid = extract_uuid(dict(cat_row)['path']) + if uuid: + cat_id = DOMAIN_CATEGORY_MAP[domain] + try: + push_category(uuid, cat_id, config) + db.set_peertube_pushed(file_hash) + except Exception as e: + return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'}) + + return jsonify({'ok': True, 'domain': domain}) + + @app.route('/settings/keys') def settings_keys(): from lib.key_manager import get_key_manager diff --git a/lib/domain_assigner.py b/lib/domain_assigner.py new file mode 100644 index 0000000..cbf69a1 --- /dev/null +++ b/lib/domain_assigner.py @@ -0,0 +1,319 @@ +""" +RECON Domain Assigner + +Computes per-video domain assignments from Qdrant vector payloads. +Two functions, two execution modes: + + compute_assignment() — pass 1, inline from post-embed hook + run_tiebreaker_pass() — batch, resolves ties via channel concept scan + +Data source: Qdrant `domain` payload field on concept vectors. 
+Previously read on-disk concept JSON files; migrated to Qdrant as +single source of truth (2026-04-28). + +Status values written to documents.recon_domain_status: + assigned — clear winner from pass 1 concept count + tied_pass_1 — concept tie, awaiting channel tiebreaker + tied_pass_2 — resolved by channel tiebreaker + tied_manual — needs human review (dashboard) + no_concepts — terminal, zero concept vectors in Qdrant + needs_reprocess — transient failure (Qdrant error, etc.) + manual_assigned — human override from dashboard +""" +from collections import Counter + +from qdrant_client import QdrantClient +from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny + +from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST +from .utils import setup_logging + +logger = setup_logging('recon.domain_assigner') + + + +def _get_qdrant_client(config): + """Create a QdrantClient from RECON config. + + Callers should create one client and pass it through rather than + calling this repeatedly. + """ + logger.debug("Creating new QdrantClient (caller did not pass one)") + return QdrantClient( + host=config['vector_db']['host'], + port=config['vector_db']['port'], + timeout=60 + ) + + +def _count_domains_from_qdrant(qdrant, collection, doc_hash): + """Count valid domain occurrences for a single document from Qdrant. + + Scrolls all points matching doc_hash and counts domain values. + + Args: + qdrant: QdrantClient instance + collection: Qdrant collection name + doc_hash: Document hash to query + + Returns: + Counter of {domain_name: count} for valid domains. + Empty Counter if no points found (never None). 
+ """ + domain_counter = Counter() + offset = None + + while True: + results, next_offset = qdrant.scroll( + collection_name=collection, + scroll_filter=Filter(must=[ + FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash)) + ]), + with_payload=["domain"], + with_vectors=False, + limit=200, + offset=offset, + ) + + for point in results: + dom = point.payload.get('domain') + if isinstance(dom, str) and dom in VALID_DOMAINS: + domain_counter[dom] += 1 + elif isinstance(dom, list): + for d in dom: + if isinstance(d, str) and d in VALID_DOMAINS: + domain_counter[d] += 1 + + if next_offset is None: + break + offset = next_offset + + return domain_counter + + +def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes): + """Count valid domain occurrences across multiple documents from Qdrant. + + Single scroll with MatchAny filter, with offset pagination for large + result sets. + + Args: + qdrant: QdrantClient instance + collection: Qdrant collection name + doc_hashes: List of document hashes to query + + Returns: + Counter of {domain_name: count} aggregated across all matching points. + """ + if not doc_hashes: + return Counter() + + domain_counter = Counter() + offset = None + + while True: + results, next_offset = qdrant.scroll( + collection_name=collection, + scroll_filter=Filter(must=[ + FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes)) + ]), + with_payload=["domain"], + with_vectors=False, + limit=10000, + offset=offset, + ) + + for point in results: + dom = point.payload.get('domain') + if isinstance(dom, str) and dom in VALID_DOMAINS: + domain_counter[dom] += 1 + elif isinstance(dom, list): + for d in dom: + if isinstance(d, str) and d in VALID_DOMAINS: + domain_counter[d] += 1 + + if next_offset is None: + break + offset = next_offset + + return domain_counter + + +def compute_assignment(file_hash, db, config, qdrant=None): + """Compute domain assignment for a single document (pass 1). 
+ + Counts domain occurrences across all concept vectors in Qdrant. + If a single domain wins, assigns it. If tied, defers to batch + tiebreaker. + + Args: + file_hash: Document hash + db: StatusDB instance + config: RECON config dict + qdrant: Optional QdrantClient (created if not provided) + + Returns: + (domain, status) tuple where domain is a string or None, + and status is one of: 'assigned', 'tied_pass_1', 'no_concepts', + 'needs_reprocess' + """ + owns_client = False + if qdrant is None: + qdrant = _get_qdrant_client(config) + owns_client = True + + collection = config['vector_db']['collection'] + + try: + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) + except Exception as e: + logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}") + return (None, 'needs_reprocess') + + if len(domain_counter) == 0: + return (None, 'no_concepts') + + top = domain_counter.most_common(2) + top_domain = top[0][0] + top_count = top[0][1] + + if len(top) == 1 or top[1][1] < top_count: + return (top_domain, 'assigned') + + # Tie — defer to tiebreaker pass + return (None, 'tied_pass_1') + + +def _get_tied_domains(qdrant, collection, file_hash): + """Get the set of domains tied for first place in a document's concepts.""" + domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash) + if not domain_counter: + return [] + + top = domain_counter.most_common() + if not top: + return [] + + max_count = top[0][1] + return [dom for dom, cnt in top if cnt == max_count] + + +def _channel_video_hashes(db, channel_name, exclude_hash=None): + """Get all document hashes belonging to a PeerTube channel. + + Args: + db: StatusDB instance + channel_name: catalogue.category (channel actor name) + exclude_hash: Hash to exclude (the document being resolved) + + Returns: + List of document hashes + """ + conn = db._get_conn() + rows = conn.execute( + "SELECT hash FROM catalogue WHERE category = ? 
AND source = 'stream.echo6.co'", + (channel_name,) + ).fetchall() + hashes = [r['hash'] for r in rows] + if exclude_hash: + hashes = [h for h in hashes if h != exclude_hash] + return hashes + + + + +def run_tiebreaker_pass(db, config, qdrant=None): + """Resolve tied domain assignments using channel-level Qdrant analysis. + + Processes all documents where recon_domain_status = 'tied_pass_1'. + + For each tied document, queries Qdrant for domain counts from all + other videos in the same channel and picks the tied domain with the + highest channel-wide count. + + Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip + tiebreaking and go straight to 'tied_manual' for dashboard review. + + Args: + db: StatusDB instance + config: RECON config dict + qdrant: Optional QdrantClient (created if not provided) + + Returns: + Dict with counts: resolved, manual, skipped, errors + """ + owns_client = False + if qdrant is None: + qdrant = _get_qdrant_client(config) + owns_client = True + + collection = config['vector_db']['collection'] + tied_items = db.get_items_by_domain_status('tied_pass_1') + + stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)} + logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve") + + for item in tied_items: + file_hash = item['hash'] + channel = item.get('category', '') + + try: + tied_domains = _get_tied_domains(qdrant, collection, file_hash) + if not tied_domains: + db.set_domain_assignment(file_hash, None, 'no_concepts') + stats['skipped'] += 1 + continue + + if len(tied_domains) == 1: + # No longer tied (possibly re-enriched since pass 1) + db.set_domain_assignment(file_hash, tied_domains[0], 'assigned') + stats['resolved'] += 1 + continue + + # Skip-list check: known non-topical catch-all channels + if channel in MEGA_CHANNEL_SKIP_LIST: + fallback = sorted(tied_domains)[0] + db.set_domain_assignment(file_hash, fallback, 'tied_manual') + stats['manual'] += 1 + logger.debug(f" 
{file_hash[:12]}: skip-list channel '{channel}' → tied_manual") + continue + + # Channel tiebreaker: count domains across all other videos in channel + other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash) + channel_domain_counts = _count_domains_from_qdrant_batch( + qdrant, collection, other_hashes + ) + + # Among tied domains only, pick highest channel-wide count + best_domain = None + best_count = -1 + for dom in tied_domains: + c = channel_domain_counts.get(dom, 0) + if c > best_count: + best_count = c + best_domain = dom + + # Check if channel tiebreaker resolved it + tied_at_channel = [d for d in tied_domains + if channel_domain_counts.get(d, 0) == best_count] + + if len(tied_at_channel) == 1: + db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2') + stats['resolved'] += 1 + logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)") + continue + + # Still tied after channel scan — mark for manual review + fallback = sorted(tied_domains)[0] + db.set_domain_assignment(file_hash, fallback, 'tied_manual') + stats['manual'] += 1 + logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual") + + except Exception as e: + logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}") + stats['errors'] += 1 + + logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, " + f"{stats['manual']} manual, {stats['skipped']} skipped, " + f"{stats['errors']} errors") + return stats diff --git a/lib/embedder.py b/lib/embedder.py index 8dcc45a..da6a952 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -27,13 +27,7 @@ from .utils import resolve_text_dir logger = setup_logging('recon.embedder') # ── Classification allowlists ─────────────────────────────────────────────── -VALID_DOMAINS = { - 'Agriculture & Livestock', 'Civil Organization', 'Communications', - 'Food Systems', 'Foundational Skills', 'Logistics', 'Medical', - 'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage', - 
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment', - 'Vehicles', 'Water Systems', 'Wilderness Skills', -} +from .recon_domains import VALID_DOMAINS VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} @@ -261,15 +255,22 @@ def embed_single(file_hash, db, config): if not all_concepts: db.update_status(file_hash, 'complete', vectors_inserted=0) + # Tag stream docs with no concepts for reprocessing + _cat = db._get_conn().execute( + "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if _cat and dict(_cat)['source'] == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No concepts to embed for {doc['filename']}") return True - # Look up source from catalogue once per doc + # Look up source and path from catalogue once per doc cat_conn = db._get_conn() cat_row = cat_conn.execute( - "SELECT source FROM catalogue WHERE hash = ?", (file_hash,) + "SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,) ).fetchone() source = dict(cat_row)['source'] if cat_row else '' + catalogue_path = dict(cat_row)['path'] if cat_row else '' download_url = '' is_web = doc.get('path', '').startswith(('http://', 'https://')) @@ -321,6 +322,8 @@ def embed_single(file_hash, db, config): if not valid: db.update_status(file_hash, 'complete', vectors_inserted=0) + if source == 'stream.echo6.co': + db.set_domain_assignment(file_hash, None, 'needs_reprocess') logger.info(f"No valid concepts to embed for {doc['filename']}") return True @@ -401,6 +404,28 @@ def embed_single(file_hash, db, config): db.update_status(file_hash, 'complete', vectors_inserted=embedded_count) logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)") + + # Post-embed hook: assign domain for PeerTube videos + if source == 'stream.echo6.co': + try: + from .domain_assigner import compute_assignment + from .peertube_writer 
import push_category, extract_uuid + from .recon_domains import DOMAIN_CATEGORY_MAP + domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant) + db.set_domain_assignment(file_hash, domain, status) + if domain and status == 'assigned': + cat_id = DOMAIN_CATEGORY_MAP[domain] + uuid = extract_uuid(catalogue_path) + if uuid: + pushed, _token = push_category(uuid, cat_id, config) + if pushed: + db.set_peertube_pushed(file_hash) + logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube") + else: + logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending") + except Exception as e: + logger.warning(f"Domain assignment failed for {file_hash}: {e}") + return True except Exception as e: diff --git a/lib/enricher.py b/lib/enricher.py index e1e583c..f6d46bd 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -42,13 +42,7 @@ logger = setup_logging('recon.enricher') STALE_ENRICHING_HOURS = 2 # ── Classification allowlists ─────────────────────────────────────────────── -VALID_DOMAINS = { - 'Agriculture & Livestock', 'Civil Organization', 'Communications', - 'Food Systems', 'Foundational Skills', 'Logistics', 'Medical', - 'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage', - 'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment', - 'Vehicles', 'Water Systems', 'Wilderness Skills', -} +from .recon_domains import VALID_DOMAINS VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} diff --git a/lib/peertube_writer.py b/lib/peertube_writer.py new file mode 100644 index 0000000..f8d463a --- /dev/null +++ b/lib/peertube_writer.py @@ -0,0 +1,323 @@ +""" +RECON PeerTube Writer + +Authenticated PeerTube API client for pushing domain category assignments. +Uses OAuth2 password grant, caches tokens, refreshes on 401. 
+ +Config keys used: + peertube.api_url — internal PeerTube URL (http://192.168.1.170:9000) + peertube.host_header — Host header for API requests (stream.echo6.co) + peertube.username — PeerTube admin username + peertube.password_env — env var name holding the password + peertube.writer_rate_limit — delay between category push API calls (seconds) +""" +import json +import os +import time + +import requests as http_requests + +from .recon_domains import DOMAIN_CATEGORY_MAP +from .utils import setup_logging + +logger = setup_logging('recon.peertube_writer') + +TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json' + + +def _get_peertube_config(config): + """Extract PeerTube writer config with defaults.""" + pt = config.get('peertube', {}) + return { + 'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')), + 'host_header': pt.get('host_header', 'stream.echo6.co'), + 'username': pt.get('username', 'root'), + 'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'), + 'rate_limit_delay': pt.get('writer_rate_limit', 0.1), + } + + +def _load_cached_token(): + """Load cached OAuth token from disk.""" + if os.path.exists(TOKEN_CACHE_PATH): + try: + with open(TOKEN_CACHE_PATH, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return None + + +def _save_token(token_data): + """Save OAuth token to disk cache.""" + os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True) + with open(TOKEN_CACHE_PATH, 'w') as f: + json.dump(token_data, f) + + +def _get_oauth_client(api_url, host_header): + """Get PeerTube OAuth client credentials.
+ + Args: + api_url: Base API URL + host_header: Host header value + + Returns: + (client_id, client_secret) tuple + """ + resp = http_requests.get( + f"{api_url}/api/v1/oauth-clients/local", + headers={'Host': host_header}, + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + return data['client_id'], data['client_secret'] + + +def _get_token(api_url, host_header, username, password, client_id, client_secret): + """Obtain OAuth2 access token via password grant. + + Args: + api_url: Base API URL + host_header: Host header value + username: PeerTube username + password: PeerTube password + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + Token data dict with access_token, refresh_token, etc. + """ + resp = http_requests.post( + f"{api_url}/api/v1/users/token", + headers={'Host': host_header}, + data={ + 'client_id': client_id, + 'client_secret': client_secret, + 'grant_type': 'password', + 'username': username, + 'password': password, + }, + timeout=30, + ) + resp.raise_for_status() + token_data = resp.json() + token_data['client_id'] = client_id + token_data['client_secret'] = client_secret + _save_token(token_data) + return token_data + + +def _refresh_token(api_url, host_header, token_data): + """Refresh an expired access token. + + Returns: + New token data dict, or None on failure. 
+ """ + try: + resp = http_requests.post( + f"{api_url}/api/v1/users/token", + headers={'Host': host_header}, + data={ + 'client_id': token_data['client_id'], + 'client_secret': token_data['client_secret'], + 'grant_type': 'refresh_token', + 'refresh_token': token_data['refresh_token'], + }, + timeout=30, + ) + resp.raise_for_status() + new_data = resp.json() + new_data['client_id'] = token_data['client_id'] + new_data['client_secret'] = token_data['client_secret'] + _save_token(new_data) + return new_data + except Exception as e: + logger.warning(f"Token refresh failed: {e}") + return None + + +def _ensure_token(config): + """Ensure we have a valid OAuth token. Returns token data dict. + + Tries cached token first, then obtains a new one. + """ + pt = _get_peertube_config(config) + password = os.environ.get(pt['password_env'], '') + if not password: + raise ValueError(f"PeerTube password not set in env var {pt['password_env']}") + + # Try cached token + token_data = _load_cached_token() + if token_data and 'access_token' in token_data: + return token_data + + # Get fresh token + client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header']) + return _get_token( + pt['api_url'], pt['host_header'], + pt['username'], password, + client_id, client_secret, + ) + + +def _api_request(method, path, config, token_data, **kwargs): + """Make an authenticated PeerTube API request with auto-refresh on 401. + + Args: + method: HTTP method ('GET', 'PUT', etc.) + path: API path (e.g. '/api/v1/videos/{uuid}') + config: RECON config dict + token_data: Current token data dict + **kwargs: Additional requests kwargs (json, data, etc.) + + Returns: + (response, token_data) tuple — token_data may be refreshed. 
+ """ + pt = _get_peertube_config(config) + url = f"{pt['api_url']}{path}" + headers = { + 'Host': pt['host_header'], + 'Authorization': f"Bearer {token_data['access_token']}", + } + + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + + if resp.status_code == 401: + # Try refresh + new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data) + if new_token: + headers['Authorization'] = f"Bearer {new_token['access_token']}" + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + return resp, new_token + else: + # Full re-auth + password = os.environ.get(pt['password_env'], '') + client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header']) + new_token = _get_token( + pt['api_url'], pt['host_header'], + pt['username'], password, + client_id, client_secret, + ) + headers['Authorization'] = f"Bearer {new_token['access_token']}" + resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs) + return resp, new_token + + return resp, token_data + + +def push_category(video_uuid, category_id, config, token_data=None): + """Push a category assignment to a single PeerTube video. + + Args: + video_uuid: PeerTube video UUID + category_id: Category ID (100-117) + config: RECON config dict + token_data: Optional pre-fetched token data + + Returns: + (success: bool, token_data: dict) tuple + """ + if token_data is None: + token_data = _ensure_token(config) + + resp, token_data = _api_request( + 'PUT', + f'/api/v1/videos/{video_uuid}', + config, + token_data, + json={'category': category_id}, + ) + + if resp.status_code in (200, 204): + return True, token_data + else: + logger.warning(f"Failed to push category for {video_uuid}: " + f"HTTP {resp.status_code} — {resp.text[:200]}") + return False, token_data + + +def extract_uuid(catalogue_path): + """Extract PeerTube video UUID from catalogue path. 
+ + Catalogue paths for PeerTube videos look like: + https://stream.echo6.co/w/UUID + + Args: + catalogue_path: catalogue.path value + + Returns: + UUID string or None + """ + if not catalogue_path: + return None + if '/w/' in catalogue_path: + return catalogue_path.rsplit('/w/', 1)[-1] + return None + + +def push_pending(db, config, limit=None): + """Push all assigned-but-unpushed domain categories to PeerTube. + + Args: + db: StatusDB instance + config: RECON config dict + limit: Optional max number of items to push + + Returns: + (success_count, fail_count) tuple + """ + items = db.get_unpushed_assignments() + if limit: + items = items[:limit] + if not items: + logger.info("No unpushed assignments to push") + return (0, 0) + + pt = _get_peertube_config(config) + delay = pt['rate_limit_delay'] + + SYSTEMIC_FAIL_THRESHOLD = 5 # abort if first N items all fail + + logger.info(f"Pushing {len(items)} category assignments to PeerTube") + + token_data = _ensure_token(config) + success = 0 + failed = 0 + + for item in items: + file_hash = item['hash'] + domain = item.get('recon_domain') + catalogue_path = item.get('catalogue_path', '') + + if not domain or domain not in DOMAIN_CATEGORY_MAP: + logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping") + failed += 1 + continue + + uuid = extract_uuid(catalogue_path) + if not uuid: + logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'") + failed += 1 + continue + + category_id = DOMAIN_CATEGORY_MAP[domain] + ok, token_data = push_category(uuid, category_id, config, token_data) + + if ok: + db.set_peertube_pushed(file_hash) + success += 1 + else: + failed += 1 + + # Abort on systemic failure (e.g. 
plugin not installed, auth broken) + if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD: + logger.error(f"Aborting push: first {failed} items all failed — " + f"check plugin installation and PeerTube API config") + break + + time.sleep(delay) + + logger.info(f"Push complete: {success} succeeded, {failed} failed") + return (success, failed) diff --git a/lib/recon_domains.py b/lib/recon_domains.py new file mode 100644 index 0000000..b1c8b3c --- /dev/null +++ b/lib/recon_domains.py @@ -0,0 +1,46 @@ +""" +RECON Domain Taxonomy + +Single source of truth for the 18 knowledge domains and their PeerTube +category ID mappings. IDs 100-117 are registered via the +peertube-plugin-recon-domains plugin. + +Import VALID_DOMAINS from here instead of defining local sets. +""" + +DOMAIN_CATEGORY_MAP = { + 'Agriculture & Livestock': 100, + 'Civil Organization': 101, + 'Communications': 102, + 'Food Systems': 103, + 'Foundational Skills': 104, + 'Logistics': 105, + 'Medical': 106, + 'Navigation': 107, + 'Operations': 108, + 'Power Systems': 109, + 'Preservation & Storage': 110, + 'Security': 111, + 'Shelter & Construction': 112, + 'Technology': 113, + 'Tools & Equipment': 114, + 'Vehicles': 115, + 'Water Systems': 116, + 'Wilderness Skills': 117, +} + +VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys()) + +CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()} + +# Channels whose tiebreaker is skipped because their content is non-topical +# (catch-alls, miscellany dumps, etc.). Items in these channels with tied +# domain counts go straight to tied_manual without channel-context tiebreaker. +# +# This is intentionally a hardcoded explicit list, not a size threshold. +# Adding a channel here requires an explicit decision — only add channels +# that are genuinely non-topical catch-alls where channel-wide concept +# aggregation would produce meaningless noise. 
+MEGA_CHANNEL_SKIP_LIST = { + 'Transcript', # Legacy catch-all, ~9,200 videos, no topical coherence +} diff --git a/lib/status.py b/lib/status.py index 974cabd..c710846 100644 --- a/lib/status.py +++ b/lib/status.py @@ -124,6 +124,22 @@ class StatusDB: except Exception: pass # column already exists + # Migration: domain assignment columns for PeerTube categorization + for col, coltype in [ + ('recon_domain', 'TEXT'), + ('recon_domain_status', 'TEXT'), + ('recon_domain_assigned_at', 'TEXT'), + ('peertube_category_pushed_at', 'TEXT'), + ]: + try: + conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + try: + conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)") + except Exception: + pass # index already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -448,7 +464,91 @@ class StatusDB: conn.commit() - # ── Scraper Job Helpers ───────────────────────────────────── + # ── Domain Assignment Helpers ──────────────────────────────── + + def get_domain_assignment(self, file_hash): + """Get domain assignment for a document. + + Returns: + (recon_domain, recon_domain_status) tuple, or (None, None) if not set. + """ + conn = self._get_conn() + row = conn.execute( + "SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?", + (file_hash,) + ).fetchone() + if row: + return (row['recon_domain'], row['recon_domain_status']) + return (None, None) + + def set_domain_assignment(self, file_hash, domain, status): + """Set domain assignment and status for a document.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET recon_domain = ?, recon_domain_status = ?, " + "recon_domain_assigned_at = ?
WHERE hash = ?", + (domain, status, datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def set_peertube_pushed(self, file_hash): + """Mark a document's category as pushed to PeerTube.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?", + (datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def get_unpushed_assignments(self): + """Get documents with domain assignments not yet pushed to PeerTube. + + Only returns stream.echo6.co source documents. + """ + conn = self._get_conn() + return [dict(r) for r in conn.execute( + """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain IS NOT NULL + AND d.peertube_category_pushed_at IS NULL + AND c.source = 'stream.echo6.co' + ORDER BY d.recon_domain_assigned_at""", + ).fetchall()] + + def get_items_by_domain_status(self, status, limit=None): + """Get documents by domain assignment status.""" + conn = self._get_conn() + q = """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain_status = ? 
+ ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + return [dict(r) for r in conn.execute(q, (status,)).fetchall()] + + def get_domain_status_counts(self): + """Get counts of documents by domain assignment status.""" + conn = self._get_conn() + return {row['recon_domain_status']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain_status, COUNT(*) as cnt " + "FROM documents WHERE recon_domain_status IS NOT NULL " + "GROUP BY recon_domain_status" + ).fetchall()} + + def get_domain_distribution(self): + """Get counts of documents per assigned domain.""" + conn = self._get_conn() + return {row['recon_domain']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain, COUNT(*) as cnt " + "FROM documents WHERE recon_domain IS NOT NULL " + "GROUP BY recon_domain ORDER BY cnt DESC" + ).fetchall()} + + # ── Scraper Job Helpers ───────────────────────────────────── def get_pending_scrape_job(self): """Fetch the oldest pending scrape job.""" diff --git a/peertube-plugin/peertube-plugin-recon-domains/README.md b/peertube-plugin/peertube-plugin-recon-domains/README.md new file mode 100644 index 0000000..88ba035 --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/README.md @@ -0,0 +1,52 @@ +# peertube-plugin-recon-domains + +Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis. 
+ +## Category Mapping + +| ID | Domain | +|-----|---------------------------| +| 100 | Agriculture & Livestock | +| 101 | Civil Organization | +| 102 | Communications | +| 103 | Food Systems | +| 104 | Foundational Skills | +| 105 | Logistics | +| 106 | Medical | +| 107 | Navigation | +| 108 | Operations | +| 109 | Power Systems | +| 110 | Preservation & Storage | +| 111 | Security | +| 112 | Shelter & Construction | +| 113 | Technology | +| 114 | Tools & Equipment | +| 115 | Vehicles | +| 116 | Water Systems | +| 117 | Wilderness Skills | + +Built-in PeerTube categories (IDs 1-18) are not modified. + +## Install + +```bash +# Copy plugin to PeerTube storage +cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/ + +# Register plugin via API or admin UI +# Admin > Plugins > Install > peertube-plugin-recon-domains + +# Restart PeerTube +sudo systemctl restart peertube +``` + +## Uninstall + +Remove the plugin via PeerTube admin UI or: + +```bash +rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains +sudo systemctl restart peertube +``` + +Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled. 
diff --git a/peertube-plugin/peertube-plugin-recon-domains/main.js b/peertube-plugin/peertube-plugin-recon-domains/main.js new file mode 100644 index 0000000..7185c34 --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/main.js @@ -0,0 +1,32 @@ +async function register ({ videoCategoryManager }) { + const reconDomains = { + 100: 'Agriculture & Livestock', + 101: 'Civil Organization', + 102: 'Communications', + 103: 'Food Systems', + 104: 'Foundational Skills', + 105: 'Logistics', + 106: 'Medical', + 107: 'Navigation', + 108: 'Operations', + 109: 'Power Systems', + 110: 'Preservation & Storage', + 111: 'Security', + 112: 'Shelter & Construction', + 113: 'Technology', + 114: 'Tools & Equipment', + 115: 'Vehicles', + 116: 'Water Systems', + 117: 'Wilderness Skills' + } + + for (const [id, label] of Object.entries(reconDomains)) { + videoCategoryManager.addConstant(parseInt(id), label) + } +} + +async function unregister () { + return +} + +module.exports = { register, unregister } diff --git a/peertube-plugin/peertube-plugin-recon-domains/package.json b/peertube-plugin/peertube-plugin-recon-domains/package.json new file mode 100644 index 0000000..5c0db7a --- /dev/null +++ b/peertube-plugin/peertube-plugin-recon-domains/package.json @@ -0,0 +1,21 @@ +{ + "name": "peertube-plugin-recon-domains", + "version": "1.0.0", + "description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 100-117)", + "engine": { + "peertube": ">=6.0.0" + }, + "keywords": [ + "peertube", + "plugin" + ], + "homepage": "https://forge.echo6.co/matt/recon", + "author": "Echo6", + "license": "MIT", + "library": "./main.js", + "staticDirs": {}, + "css": [], + "clientScripts": [], + "translations": {}, + "bugs": "https://forge.echo6.co/matt/recon/issues" +} diff --git a/recon.py b/recon.py index 9635a59..81c8a81 100755 --- a/recon.py +++ b/recon.py @@ -863,6 +863,166 @@ def cmd_ingest(args): return 0 +def cmd_assign_categories(args): + """Assign RECON domains to 
PeerTube videos and push categories.""" + from qdrant_client import QdrantClient + from lib.domain_assigner import compute_assignment, run_tiebreaker_pass + from lib.peertube_writer import push_pending, extract_uuid + from lib.recon_domains import DOMAIN_CATEGORY_MAP + + config = get_config() + db = StatusDB() + dry_run = args.dry_run + limit = args.limit + + if args.backfill: + # Pass 1: assign domains to all complete stream docs with no assignment + # or that previously got needs_reprocess + conn = db._get_conn() + q = """SELECT d.hash FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.status = 'complete' + AND (d.recon_domain IS NULL + OR d.recon_domain_status = 'needs_reprocess') + AND c.source = 'stream.echo6.co' + ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + rows = conn.execute(q).fetchall() + hashes = [r['hash'] for r in rows] + + if not hashes: + print("No unassigned complete stream documents found") + return 0 + + print(f"Backfill: processing {len(hashes)} documents" + + (" [DRY RUN]" if dry_run else "")) + + # Create one Qdrant client for the entire backfill + qdrant = QdrantClient( + host=config['vector_db']['host'], + port=config['vector_db']['port'], + timeout=60 + ) + + stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0} + for i, file_hash in enumerate(hashes): + try: + domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant) + stats[status] = stats.get(status, 0) + 1 + if not dry_run: + db.set_domain_assignment(file_hash, domain, status) + if (i + 1) % 1000 == 0: + print(f" Progress: {i + 1}/{len(hashes)}") + except Exception as e: + stats['errors'] += 1 + logger.warning(f" Assignment error for {file_hash[:12]}: {e}") + + print(f"\nBackfill results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.tiebreaker_pass: + if dry_run: + items = db.get_items_by_domain_status('tied_pass_1') + print(f"Tiebreaker pass: 
{len(items)} items would be processed [DRY RUN]") + return 0 + stats = run_tiebreaker_pass(db, config) + print(f"\nTiebreaker results:") + for k, v in sorted(stats.items()): + print(f" {k}: {v}") + return 0 + + elif args.push_pending: + if dry_run: + items = db.get_unpushed_assignments() + if limit: + items = items[:limit] + print(f"Push pending: {len(items)} items would be pushed [DRY RUN]") + return 0 + success, failed = push_pending(db, config, limit=limit) + print(f"\nPush results: {success} succeeded, {failed} failed") + return 0 + + elif args.reprocess_missing: + items = db.get_items_by_domain_status('needs_reprocess', limit=limit) + if not items: + print("No items with needs_reprocess status") + return 0 + + print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else "")) + requeued = 0 + for item in items: + file_hash = item['hash'] + if dry_run: + print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')}") + requeued += 1 + continue + + # Reset document status to allow re-processing + conn = db._get_conn() + conn.execute( + """UPDATE documents SET + status = 'catalogued', + concepts_extracted = 0, + vectors_inserted = 0, + recon_domain = NULL, + recon_domain_status = NULL, + recon_domain_assigned_at = NULL, + peertube_category_pushed_at = NULL, + error_message = NULL, + extracted_at = NULL, + enriched_at = NULL, + embedded_at = NULL + WHERE hash = ?""", + (file_hash,) + ) + conn.commit() + # Re-queue for pipeline processing + db.queue_document(file_hash) + requeued += 1 + + print(f"Requeued {requeued} items for reprocessing") + return 0 + + else: + # Default: show domain assignment status + status_counts = db.get_domain_status_counts() + domain_dist = db.get_domain_distribution() + + conn = db._get_conn() + total_stream = conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'""" + ).fetchone()['cnt'] + unassigned = 
conn.execute( + """SELECT COUNT(*) as cnt FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'stream.echo6.co' AND d.status = 'complete' + AND d.recon_domain IS NULL""" + ).fetchone()['cnt'] + unpushed = len(db.get_unpushed_assignments()) + + print("=== Domain Assignment Status ===\n") + print(f"Total complete stream docs: {total_stream}") + print(f"Unassigned: {unassigned}") + print(f"Unpushed to PeerTube: {unpushed}") + + if status_counts: + print(f"\nAssignment status breakdown:") + for status, cnt in sorted(status_counts.items()): + print(f" {status:<20s} {cnt:>6d}") + + if domain_dist: + print(f"\nDomain distribution:") + for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]): + print(f" {domain:<35s} {cnt:>6d}") + + return 0 + + def cmd_pipeline(args): """Stream B library pipeline: status, migrate, reverse, watch, sweep.""" from lib.new_pipeline import ( @@ -1158,6 +1318,16 @@ def main(): p.set_defaults(func=cmd_ingest) + # assign-categories + p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos') + p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs') + p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis') + p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API') + p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items') + p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing') + p.add_argument('--limit', type=int, help='Limit number of items to process') + p.set_defaults(func=cmd_assign_categories) + # pipeline (Stream B) p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)') p.add_argument('pipeline_action', nargs='?', default='status', diff --git a/templates/peertube/review.html 
b/templates/peertube/review.html new file mode 100644 index 0000000..6793e03 --- /dev/null +++ b/templates/peertube/review.html @@ -0,0 +1,148 @@ +{% extends "base.html" %} +{% block content %} +
+
+
Manual Review
+
Assigned
+
Tied (Pass 1)
+
Needs Reprocess
+
+ +
+

Manual Review Queue

+
+ + + + + + + + + + + + +
Loading...
+
+
+
+{% endblock %}
+{% block scripts %}
+
+{% endblock %}
diff --git a/tests/test_constants_parity.py b/tests/test_constants_parity.py
new file mode 100644
index 0000000..5be1121
--- /dev/null
+++ b/tests/test_constants_parity.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+"""
+Parity test: verify RECON domain taxonomy matches PeerTube categories.
+
+Tests:
+1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly
+2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels
+
+Usage:
+    cd /opt/recon && source venv/bin/activate
+    python3 tests/test_constants_parity.py
+"""
+import json
+import sys
+import requests
+
+# Add parent dir to path for imports
+sys.path.insert(0, '/opt/recon')
+from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP
+
+
+def test_local_parity():
+    """Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS."""
+    map_keys = set(DOMAIN_CATEGORY_MAP.keys())
+    assert map_keys == VALID_DOMAINS, (
+        f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, "
+        f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}"
+    )
+    assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch"
+    print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS")
+
+
+def test_peertube_categories():
+    """Verify PeerTube API returns all 18 RECON categories."""
+    url = "http://192.168.1.170:9000/api/v1/videos/categories"
+    headers = {"Host": "stream.echo6.co"}
+
+    try:
+        resp = requests.get(url, headers=headers, timeout=10)
+        resp.raise_for_status()
+    except Exception as e:
+        print(f"[SKIP] PeerTube API unreachable: {e}")
+        return
+
+    categories = resp.json()  # dict of {id_str: label}
+
+    missing = []
+    wrong_label = []
+    for domain, cat_id in DOMAIN_CATEGORY_MAP.items():
+        cat_str = str(cat_id)
+        if cat_str not in categories:
+            missing.append((cat_id, domain))
+        elif categories[cat_str] != domain:
+            wrong_label.append((cat_id, domain, categories[cat_str]))
+
+    if missing:
+        print(f"[FAIL] Missing categories in PeerTube: {missing}")
+        print(" Deploy peertube-plugin-recon-domains to CT 110 first")
+        sys.exit(1)
+
+    if wrong_label:
+        print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}")
+        sys.exit(1)
+
+    print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels")
+
+
+if __name__ == '__main__':
+    test_local_parity()
+    test_peertube_categories()
+    print("\nAll parity tests passed.")