Compare commits

...

14 commits

Author SHA1 Message Date
299be21f42 Replace mega-channel size rule with explicit skip list
The >500-video threshold was too aggressive — it skipped tiebreaking
for legitimate large channels (1a-auto, forgotten-weapons, etc.) where
channel context correctly resolves ties. Replace with an explicit
MEGA_CHANNEL_SKIP_LIST in recon_domains.py. Only known non-topical
catch-alls (currently just "Transcript") skip the tiebreaker.

Removed _channel_video_count() helper and MEGA_CHANNEL_THRESHOLD
constant (no longer used).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 04:24:39 +00:00
d8196e60c7 Track domain-assignment follow-up items
Three nice-to-haves from the Qdrant migration pre-commit review:
list-type domain handling, tiebreaker client threading, debug log
caller identification. All zero production impact.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 03:59:19 +00:00
043119fbaa Fix: review UI domain breakdown from Qdrant
The review items API endpoint still read concept domain counts from
on-disk JSON files, which would show empty breakdowns for items with
missing concept directories. Replace disk reads with the same
_count_domains_from_qdrant function used by domain assignment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 03:59:13 +00:00
3b37d96c4d Switch domain assignment to Qdrant as source of truth
Replace on-disk concept file reads with Qdrant payload queries for
domain assignment. This unlocks assignment for ~10,120 items that had
missing or legacy-only concept files on disk while Qdrant held the
correct 18-domain taxonomy data.

Changes:
- domain_assigner.py: Replace _count_concept_domains (disk) with
  _count_domains_from_qdrant and _count_domains_from_qdrant_batch
  (Qdrant scroll queries). Add _get_qdrant_client helper. Remove
  pass 3 defensive re-run (Qdrant reads are consistent). Add
  no_concepts terminal status for zero-vector documents.
- embedder.py: Post-embed hook passes existing qdrant client to
  compute_assignment, avoiding a second connection.
- recon.py: Backfill creates one QdrantClient for the batch. SQL
  filter includes existing needs_reprocess items. Dry-run reports
  no_concepts as separate bucket. --reprocess-missing removes
  concept-dir deletion step (no longer reads from disk).
- docs/domain-assignment.md: Algorithm references Qdrant, documents
  no_concepts status, removes pass 3 description.

Dry-run results: 20,453 assigned, 1,392 tied, 298 no_concepts,
0 needs_reprocess, 0 errors (previously 10,416 needs_reprocess).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 03:59:06 +00:00
c04ccc5011 Fix plugin for PeerTube v8.0.2 validation
PeerTube v8.0.2 requires a "bugs" field in package.json and an
unregister() export in main.js. Add both to pass plugin validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 01:55:47 +00:00
75b2e19e94 Fix: correct Contabo concept backup path in docs (/opt/backups/recon/concepts/)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 01:42:05 +00:00
a39ec56620 Docs: domain assignment guide, migration runbook, blast radius
- domain-assignment.md: algorithm walkthrough (pass 1/2/3), status values,
  CLI command reference, dashboard review guide
- migration-runbook.md: step-by-step deploy with pre-deploy backups,
  8 STOP pause points for operator verification, staged push rollout,
  quarantined --reprocess-missing procedure, 5 rollback procedures
- deploy-blast-radius.md: per-step risk reference with worst case,
  detection signals, rollback procedures, and risk tiers

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:06:49 +00:00
d1270be64d Phase 7: manual review dashboard for tied items
Adds /peertube/review page showing only tied_manual items for human
domain assignment. Each row displays video title, channel, concept
domain counts, and a dropdown to select the correct domain.

Routes: GET /peertube/review (page), GET /api/peertube/review/items
(JSON), GET /api/peertube/review/stats (counts),
POST /api/peertube/review/assign (assign + push to PeerTube).

Review subnav entry added to PEERTUBE_SUBNAV.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:06:25 +00:00
6a17df8078 Phase 6: post-embed domain assignment hook
After a stream.echo6.co video completes embedding, automatically runs
compute_assignment (pass 1 only). Clear winners get pushed to PeerTube
immediately; ties are marked tied_pass_1 for the batch tiebreaker.

Also tags stream docs that hit early-return paths (no concepts, no valid
concepts) with needs_reprocess status so they are visible to the
--reprocess-missing CLI command.

Error handling: domain assignment failure logs a warning but does not
block the embedding pipeline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:06:07 +00:00
a273b52c7e Phase 5: assign-categories CLI commands
Adds assign-categories subcommand with flags:
  --backfill     Pass 1 domain assignment for all complete stream docs
  --tiebreaker-pass  Resolve ties via channel concept analysis
  --push-pending Push assigned categories to PeerTube API (staged via --limit)
  --reprocess-missing  Re-queue items with missing/legacy concepts
  --dry-run      Preview without writes (enhanced for reprocess: shows
                 concept dir existence and file counts)
  --limit N      Cap processing count

Includes pre-deletion audit logging for --reprocess-missing (logs path,
file count, and hash before each shutil.rmtree).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:05:19 +00:00
07d26a8ef0 Phase 4: authenticated PeerTube category writer
OAuth2 password-grant client for PeerTube API with token caching and
auto-refresh on 401. Pushes domain categories via PUT /api/v1/videos/{uuid}.

Includes limit parameter on push_pending for staged rollouts, and
systemic failure detection that aborts after 5 consecutive failures
(catches missing plugin or broken auth before wasting API calls).

Config section added to config.yaml for PeerTube API connection
parameters. Real credentials remain in .env (gitignored).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:04:51 +00:00
8ab1f8c82f Phase 3: compute_assignment and run_tiebreaker_pass
Domain assigner module with two functions:
- compute_assignment(): pass 1 concept domain count, inline per-document
- run_tiebreaker_pass(): batch channel-level tiebreaker (pass 2 + pass 3
  defensive re-run), mega-channel rule (>500 videos skip to tied_manual)

Filters legacy domains (Sustainment Systems, Off-Grid Systems, Defense &
Tactics) from concept counts automatically.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:04:37 +00:00
5f36c52fb1 Phase 2: documents table schema for domain assignment
Adds four columns to documents table via idempotent ALTER TABLE
migrations: recon_domain, recon_domain_status, recon_domain_assigned_at,
peertube_category_pushed_at. Adds index on recon_domain_status.

Includes StatusDB helper methods: get/set_domain_assignment,
set_peertube_pushed, get_unpushed_assignments, get_items_by_domain_status,
get_domain_status_counts, get_domain_distribution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:04:29 +00:00
71e3dc12ed Phase 1: PeerTube plugin and recon_domains module
Single source of truth for the 18 RECON knowledge domains mapped to
PeerTube category IDs 100-117. Replaces duplicate VALID_DOMAINS sets
in enricher.py and embedder.py with imports from lib/recon_domains.py.

Includes PeerTube plugin (peertube-plugin-recon-domains) that registers
custom categories via videoCategoryManager.addConstant(), and a parity
test to verify constants match between RECON and the PeerTube API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-28 00:04:13 +00:00
18 changed files with 1981 additions and 17 deletions

View file

@ -408,9 +408,14 @@ service:
peertube: peertube:
api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx) api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx)
api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer)
host_header: stream.echo6.co # Host header for PeerTube API requests
username: root # PeerTube admin username
password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password
public_url: https://stream.echo6.co # Public URL for video links public_url: https://stream.echo6.co # Public URL for video links
fetch_timeout: 30 # HTTP timeout for API/VTT requests fetch_timeout: 30 # HTTP timeout for API/VTT requests
rate_limit_delay: 0.5 # Delay between video ingestions (seconds) rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
writer_rate_limit: 0.1 # Delay between category push API calls (seconds)
poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)
scraper: scraper:

14
docs/backlog.md Normal file
View file

@ -0,0 +1,14 @@
# RECON Backlog — Technical Debt
## Qdrant Migration Follow-ups (2026-04-28)
From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact.
1. **domain_assigner.py — list-type domain handling**
`_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings.
2. **recon.py --tiebreaker-pass — Qdrant client threading**
The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. Functionally fine as-is.
3. **_get_qdrant_client — debug log caller identification**
The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths.

View file

@ -0,0 +1,20 @@
# Deploy Blast Radius Reference
Quick-reference for operators during deployment of domain categorization.
| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. Rollback Time |
|------|-------------|------------------------------|-----------------|----------|-------------------|
| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min |
| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min |
| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min |
| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min |
| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min |
| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min |
| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count |
## Risk Tiers
- **Low risk (read-only):** `--dry-run` on any command, status display
- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration
- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users)
- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ Gemini work at risk)

117
docs/domain-assignment.md Normal file
View file

@ -0,0 +1,117 @@
# Domain Assignment — Algorithm & Operations Guide
## Overview
RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin.
## Data Source
Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue.
Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data.
## Algorithm
### Pass 1: Concept Domain Count (inline, per-document)
Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`.
1. Query Qdrant for all points with `doc_hash` matching the document
2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only
3. If zero concept vectors → `no_concepts` (terminal)
4. If single top domain → `assigned`
5. If tied → `tied_pass_1` (deferred to tiebreaker)
### Pass 2: Channel Tiebreaker (batch)
Runs via `assign-categories --tiebreaker-pass`.
For each `tied_pass_1` document:
1. Identify the tied domains from Qdrant
2. Look up the document's channel (`catalogue.category`)
3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual`
4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter)
5. Among the tied domains only, pick the one with the highest channel-wide concept count
6. If resolved → `tied_pass_2`
7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
### Channel Skip List
Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review.
Current skip list:
- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence
This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason.
## Status Values
| Status | Meaning | Terminal? | Next Action |
|--------|---------|-----------|-------------|
| `assigned` | Clear winner from pass 1 | No | Push to PeerTube |
| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` |
| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube |
| `tied_manual` | Needs human review | No | Review at `/peertube/review` |
| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) |
| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` |
| `manual_assigned` | Human override from dashboard | No | Already pushed |
**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}`
## CLI Commands
```bash
cd /opt/recon && source venv/bin/activate
# Show current assignment status
python3 recon.py assign-categories
# Pass 1: backfill all unassigned complete stream documents
python3 recon.py assign-categories --backfill --dry-run
python3 recon.py assign-categories --backfill
# Pass 2: resolve ties via channel analysis
python3 recon.py assign-categories --tiebreaker-pass
# Push all assigned-but-unpushed categories to PeerTube API
python3 recon.py assign-categories --push-pending
# Re-queue items with transient failures for full re-processing
python3 recon.py assign-categories --reprocess-missing
# Limit processing count
python3 recon.py assign-categories --backfill --limit 100
```
## Dashboard Review
The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. Each row displays:
- Video title and channel
- Top concept domains with counts
- Dropdown to select the correct domain
- Assign button (pushes to PeerTube immediately)
Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI.
## Pipeline Integration
New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`:
1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client
2. If clear winner: pushes category to PeerTube immediately
3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run
4. If no concepts: marks as `no_concepts` (terminal)
5. On Qdrant error: logs warning and continues — does not block the pipeline
## Source Files
| File | Purpose |
|------|---------|
| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS |
| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers |
| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` |
| `lib/embedder.py` | Post-embed hook (passes qdrant client) |
| `lib/status.py` | DB columns + helper methods |
| `lib/api.py` | Dashboard review routes |
| `recon.py` | CLI `assign-categories` command |

426
docs/migration-runbook.md Normal file
View file

@ -0,0 +1,426 @@
# Domain Categorization Migration Runbook
Step-by-step procedure to deploy the PeerTube domain categorization feature.
## Prerequisites
- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out)
- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170)
- PeerTube admin credentials (`root` / password in `.env`)
## Pre-Deploy Backups
These backups MUST be completed before any state-changing step.
### 1. Snapshot RECON database
```bash
ssh zvx@192.168.1.130
cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak"
ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm
```
### 2. Snapshot PeerTube PostgreSQL
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql"
ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero
```
### 3. Verify off-site concept backup
```bash
# Check last rsync to Contabo
ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5'
ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5'
# Confirm timestamps match within 6 hours
```
### 4. Confirm RECON service state
```bash
ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager'
# Note: do NOT restart until Step 3. If currently running, confirm no active
# enrichment/embedding workers before proceeding.
```
---
## Step 1: Deploy PeerTube Plugin to CT 110
```bash
# From recon-vm, copy plugin to CT 110
ssh zvx@192.168.1.130
cd /opt/recon/peertube-plugin/
scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Or via the Proxmox host:
ssh root@192.168.1.243 # media host
pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Copy files into the container (scp from recon-vm or use pct push)
```
Alternative: Install via PeerTube admin UI (Admin > Plugins > Install).
```bash
# Restart PeerTube to register plugin
ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube'
```
**STOP.** Check PeerTube logs for plugin registration errors:
```bash
ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin
```
If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose
and fix the plugin before continuing. See Rollback: "Plugin install fails" below.
## Step 2: Verify Plugin
```bash
# From recon-vm
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"'
```
Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed.
Run the parity test:
```bash
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
```
## Step 3: Apply Schema Migration
**Requires RECON restart (ask user first).**
```bash
sudo systemctl restart recon
```
The migration runs automatically on startup via `StatusDB._init_db()`. Verify:
```bash
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()]
for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']:
assert c in cols, f'Missing: {c}'
print(f' {c}: OK')
# Verify index exists
indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()]
assert 'idx_documents_recon_domain_status' in indexes, 'Missing index'
print(' idx_documents_recon_domain_status: OK')
# Verify no columns were dropped
expected_existing = ['hash', 'status', 'filename', 'discovered_at']
for c in expected_existing:
assert c in cols, f'ALERT: existing column {c} is missing!'
print('Migration verified — all columns present, no existing columns dropped')
"
```
## Step 4: Run Backfill
```bash
cd /opt/recon && source venv/bin/activate
# Dry run first
python3 recon.py assign-categories --backfill --dry-run
```
**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks:
- ~94.8% `assigned` (clear winners)
- ~5.2% `tied_pass_1` (ties)
- ~19.5% `needs_reprocess` (missing/legacy concepts)
If the distribution deviates more than 5 percentage points from these benchmarks,
halt and investigate. Do not proceed until the deviation is explained.
```bash
# Execute pass 1
python3 recon.py assign-categories --backfill
```
**STOP.** Spot-check 20 random assigned documents:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
rows = db._get_conn().execute(
\"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\"
).fetchall()
for r in rows:
print(r['hash'][:12], r['recon_domain'])
"
```
For each, visually verify against concept files: `ls data/concepts/{hash}/` and
spot-check one `window_*.json` to confirm the assigned domain is plausible.
Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below.
```bash
# Run tiebreaker pass
python3 recon.py assign-categories --tiebreaker-pass
```
**STOP.** Verify tiebreaker results:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
c = db.get_domain_status_counts()
print('Status breakdown:', c)
print()
print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0))
print('tied_manual (needs review):', c.get('tied_manual', 0))
"
```
Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given
the channel's other content.
```bash
# Check overall status
python3 recon.py assign-categories
```
## Step 5: Push to PeerTube
Push in stages. Do NOT push all at once.
```bash
# Dry run: confirm count
python3 recon.py assign-categories --push-pending --dry-run
# Stage 1: push 100 items
python3 recon.py assign-categories --push-pending --limit 100
```
**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos
now show RECON domain categories. Spot-check 5 videos.
```bash
# Verify via API: pick a random pushed video
python3 -c "
from lib.status import StatusDB
db = StatusDB()
row = db._get_conn().execute(
\"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\"
).fetchone()
if row:
uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?'
print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}')
print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category')
"
```
```bash
# Stage 2: push 1000 items
python3 recon.py assign-categories --push-pending --limit 1000
```
**STOP.** Verify via PeerTube database:
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
```
```bash
# Stage 3: push remaining
python3 recon.py assign-categories --push-pending
```
## Step 6: Verify
```bash
# Check PeerTube database directly
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
# Check uncategorized
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"'
# Check RECON status
python3 recon.py assign-categories
```
## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION)
**WARNING:** This step deletes concept directories. It is the only destructive
operation in the entire feature. Run it separately from the initial deploy,
after all other steps are verified and stable.
```bash
# Dry run first — review what would be deleted
python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10
```
**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy
domains only, or missing concept files). The dry-run reports file counts for
each directory that would be deleted.
```bash
# Small batch
python3 recon.py assign-categories --reprocess-missing --limit 10
```
**STOP.** Verify: check that 10 items re-entered the pipeline.
```bash
python3 recon.py status # queued count should increase by ~10
```
Wait for pipeline to process them. Verify domain assignment on completion:
```bash
# Check these specific items got re-enriched and assigned
python3 recon.py assign-categories
```
```bash
# Scale up
python3 recon.py assign-categories --reprocess-missing --limit 100
# Then unbounded
python3 recon.py assign-categories --reprocess-missing
```
**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running
it is safe. Any documents stranded at `status='catalogued'` without being re-queued
can be recovered with `recon.py queue --source stream.echo6.co`.
## Step 8: Dashboard Review
Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items.
Each row shows the video, channel, tied domains, and concept counts. Select the
correct domain and click Assign.
---
## Rollback Procedures
### Plugin install fails or breaks PeerTube
```bash
# Disable plugin without uninstalling
ssh root@192.168.1.243 'pct exec 110 -- bash -c "
mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \
/var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled
systemctl restart peertube
"'
# Verify PeerTube is healthy
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head
# To fully remove: use PeerTube admin UI → Plugins → Uninstall
```
### Schema migration revert (drop new columns)
Only needed if the columns cause problems. The columns are nullable and have no
constraints, so they should be inert.
```bash
ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c "
import sqlite3
conn = sqlite3.connect(\"data/recon.db\")
for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]:
try:
conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\")
print(f\"Dropped: {col}\")
except Exception as e:
print(f\"Skip {col}: {e}\")
conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\")
conn.commit()
print(\"Index dropped\")
"'
```
Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12).
Ubuntu 24.04 ships 3.45.1 — this is fine.
### Clear wrong backfill assignments (selective or full)
```bash
cd /opt/recon && source venv/bin/activate
# Clear ALL domain assignments
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = NULL,
recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''')
conn.commit()
print('Cleared all domain assignments')
"
# Clear only tiebreaker results (reset to tied_pass_1 for re-run)
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = 'tied_pass_1',
recon_domain_assigned_at = NULL
WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''')
conn.commit()
"
```
### Clear wrong PeerTube categories
```bash
# Reset ALL RECON categories (100+) to NULL in PeerTube
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "UPDATE video SET category = NULL WHERE category >= 100"'
# Verify
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "SELECT count(*) FROM video WHERE category >= 100"'
# Should return 0
# Also clear RECON pushed timestamps so --push-pending can retry
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL')
conn.commit()
print('Cleared push timestamps')
"
```
### Restore concepts after failed --reprocess-missing
```bash
# Concept backups are on Contabo at /opt/backups/recon/concepts/
# Identify which hashes were deleted (check RECON logs)
ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20'
# Restore specific hash from Contabo
HASH=<hash_from_log>
ssh root@100.64.0.1 "tar -cf - -C /opt/backups/recon/concepts/ $HASH" | \
ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/"
# Restore ALL concepts (nuclear option)
ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/'
```
### Fully remove feature
1. Uninstall plugin from PeerTube admin UI
2. Restart PeerTube
3. Revert RECON code changes (`git checkout master`)
4. Restart RECON
5. Drop schema columns (see above)
6. Reset PeerTube categories (see above)

View file

@ -88,6 +88,7 @@ KNOWLEDGE_SUBNAV = [
PEERTUBE_SUBNAV = [ PEERTUBE_SUBNAV = [
{'href': '/peertube', 'label': 'Dashboard'}, {'href': '/peertube', 'label': 'Dashboard'},
{'href': '/peertube/channels', 'label': 'Channels'}, {'href': '/peertube/channels', 'label': 'Channels'},
{'href': '/peertube/review', 'label': 'Review'},
] ]
@ -376,6 +377,86 @@ def peertube_channels():
domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels') domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels')
@app.route('/peertube/review')
def peertube_review():
from .recon_domains import VALID_DOMAINS
return render_template('peertube/review.html',
domain='peertube', subnav=PEERTUBE_SUBNAV,
active_page='/peertube/review',
valid_domains=sorted(VALID_DOMAINS))
@app.route('/api/peertube/review/stats')
def api_peertube_review_stats():
db = StatusDB()
counts = db.get_domain_status_counts()
return jsonify(counts)
@app.route('/api/peertube/review/items')
def api_peertube_review_items():
from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client
db = StatusDB()
config = get_config()
items = db.get_items_by_domain_status('tied_manual', limit=200)
qdrant = _get_qdrant_client(config)
collection = config['vector_db']['collection']
result = []
for item in items:
file_hash = item['hash']
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
top_domains = [{'domain': d, 'count': cnt}
for d, cnt in domain_counter.most_common(5)]
result.append({
'hash': file_hash,
'filename': item.get('filename', ''),
'category': item.get('category', ''),
'recon_domain': item.get('recon_domain'),
'recon_domain_status': item.get('recon_domain_status'),
'top_domains': top_domains,
})
return jsonify(result)
@app.route('/api/peertube/review/assign', methods=['POST'])
def api_peertube_review_assign():
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP
from .peertube_writer import push_category, extract_uuid
data = request.get_json()
file_hash = data.get('hash')
domain = data.get('domain')
if not file_hash or not domain:
return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400
if domain not in VALID_DOMAINS:
return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400
db = StatusDB()
config = get_config()
db.set_domain_assignment(file_hash, domain, 'manual_assigned')
# Push to PeerTube
conn = db._get_conn()
cat_row = conn.execute(
"SELECT path FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if cat_row:
uuid = extract_uuid(dict(cat_row)['path'])
if uuid:
cat_id = DOMAIN_CATEGORY_MAP[domain]
try:
push_category(uuid, cat_id, config)
db.set_peertube_pushed(file_hash)
except Exception as e:
return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'})
return jsonify({'ok': True, 'domain': domain})
@app.route('/settings/keys') @app.route('/settings/keys')
def settings_keys(): def settings_keys():
from lib.key_manager import get_key_manager from lib.key_manager import get_key_manager

319
lib/domain_assigner.py Normal file
View file

@ -0,0 +1,319 @@
"""
RECON Domain Assigner
Computes per-video domain assignments from Qdrant vector payloads.
Two functions, two execution modes:
compute_assignment() pass 1, inline from post-embed hook
run_tiebreaker_pass() batch, resolves ties via channel concept scan
Data source: Qdrant `domain` payload field on concept vectors.
Previously read on-disk concept JSON files; migrated to Qdrant as
single source of truth (2026-04-28).
Status values written to documents.recon_domain_status:
assigned clear winner from pass 1 concept count
tied_pass_1 concept tie, awaiting channel tiebreaker
tied_pass_2 resolved by channel tiebreaker
tied_manual needs human review (dashboard)
no_concepts terminal, zero concept vectors in Qdrant
needs_reprocess transient failure (Qdrant error, etc.)
manual_assigned human override from dashboard
"""
from collections import Counter
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST
from .utils import setup_logging
logger = setup_logging('recon.domain_assigner')
def _get_qdrant_client(config):
"""Create a QdrantClient from RECON config.
Callers should create one client and pass it through rather than
calling this repeatedly.
"""
logger.debug("Creating new QdrantClient (caller did not pass one)")
return QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
def _count_domains_from_qdrant(qdrant, collection, doc_hash):
"""Count valid domain occurrences for a single document from Qdrant.
Scrolls all points matching doc_hash and counts domain values.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hash: Document hash to query
Returns:
Counter of {domain_name: count} for valid domains.
Empty Counter if no points found (never None).
"""
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash))
]),
with_payload=["domain"],
with_vectors=False,
limit=200,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes):
"""Count valid domain occurrences across multiple documents from Qdrant.
Single scroll with MatchAny filter, with offset pagination for large
result sets.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hashes: List of document hashes to query
Returns:
Counter of {domain_name: count} aggregated across all matching points.
"""
if not doc_hashes:
return Counter()
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes))
]),
with_payload=["domain"],
with_vectors=False,
limit=10000,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def compute_assignment(file_hash, db, config, qdrant=None):
"""Compute domain assignment for a single document (pass 1).
Counts domain occurrences across all concept vectors in Qdrant.
If a single domain wins, assigns it. If tied, defers to batch
tiebreaker.
Args:
file_hash: Document hash
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
(domain, status) tuple where domain is a string or None,
and status is one of: 'assigned', 'tied_pass_1', 'no_concepts',
'needs_reprocess'
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
try:
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
except Exception as e:
logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}")
return (None, 'needs_reprocess')
if len(domain_counter) == 0:
return (None, 'no_concepts')
top = domain_counter.most_common(2)
top_domain = top[0][0]
top_count = top[0][1]
if len(top) == 1 or top[1][1] < top_count:
return (top_domain, 'assigned')
# Tie — defer to tiebreaker pass
return (None, 'tied_pass_1')
def _get_tied_domains(qdrant, collection, file_hash):
"""Get the set of domains tied for first place in a document's concepts."""
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
if not domain_counter:
return []
top = domain_counter.most_common()
if not top:
return []
max_count = top[0][1]
return [dom for dom, cnt in top if cnt == max_count]
def _channel_video_hashes(db, channel_name, exclude_hash=None):
"""Get all document hashes belonging to a PeerTube channel.
Args:
db: StatusDB instance
channel_name: catalogue.category (channel actor name)
exclude_hash: Hash to exclude (the document being resolved)
Returns:
List of document hashes
"""
conn = db._get_conn()
rows = conn.execute(
"SELECT hash FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'",
(channel_name,)
).fetchall()
hashes = [r['hash'] for r in rows]
if exclude_hash:
hashes = [h for h in hashes if h != exclude_hash]
return hashes
def run_tiebreaker_pass(db, config, qdrant=None):
"""Resolve tied domain assignments using channel-level Qdrant analysis.
Processes all documents where recon_domain_status = 'tied_pass_1'.
For each tied document, queries Qdrant for domain counts from all
other videos in the same channel and picks the tied domain with the
highest channel-wide count.
Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip
tiebreaking and go straight to 'tied_manual' for dashboard review.
Args:
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
Dict with counts: resolved, manual, skipped, errors
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
tied_items = db.get_items_by_domain_status('tied_pass_1')
stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)}
logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve")
for item in tied_items:
file_hash = item['hash']
channel = item.get('category', '')
try:
tied_domains = _get_tied_domains(qdrant, collection, file_hash)
if not tied_domains:
db.set_domain_assignment(file_hash, None, 'no_concepts')
stats['skipped'] += 1
continue
if len(tied_domains) == 1:
# No longer tied (possibly re-enriched since pass 1)
db.set_domain_assignment(file_hash, tied_domains[0], 'assigned')
stats['resolved'] += 1
continue
# Skip-list check: known non-topical catch-all channels
if channel in MEGA_CHANNEL_SKIP_LIST:
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: skip-list channel '{channel}' → tied_manual")
continue
# Channel tiebreaker: count domains across all other videos in channel
other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash)
channel_domain_counts = _count_domains_from_qdrant_batch(
qdrant, collection, other_hashes
)
# Among tied domains only, pick highest channel-wide count
best_domain = None
best_count = -1
for dom in tied_domains:
c = channel_domain_counts.get(dom, 0)
if c > best_count:
best_count = c
best_domain = dom
# Check if channel tiebreaker resolved it
tied_at_channel = [d for d in tied_domains
if channel_domain_counts.get(d, 0) == best_count]
if len(tied_at_channel) == 1:
db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2')
stats['resolved'] += 1
logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)")
continue
# Still tied after channel scan — mark for manual review
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual")
except Exception as e:
logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}")
stats['errors'] += 1
logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, "
f"{stats['manual']} manual, {stats['skipped']} skipped, "
f"{stats['errors']} errors")
return stats

View file

@ -27,13 +27,7 @@ from .utils import resolve_text_dir
logger = setup_logging('recon.embedder') logger = setup_logging('recon.embedder')
# ── Classification allowlists ─────────────────────────────────────────────── # ── Classification allowlists ───────────────────────────────────────────────
VALID_DOMAINS = { from .recon_domains import VALID_DOMAINS
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
@ -261,15 +255,22 @@ def embed_single(file_hash, db, config):
if not all_concepts: if not all_concepts:
db.update_status(file_hash, 'complete', vectors_inserted=0) db.update_status(file_hash, 'complete', vectors_inserted=0)
# Tag stream docs with no concepts for reprocessing
_cat = db._get_conn().execute(
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if _cat and dict(_cat)['source'] == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No concepts to embed for {doc['filename']}") logger.info(f"No concepts to embed for {doc['filename']}")
return True return True
# Look up source from catalogue once per doc # Look up source and path from catalogue once per doc
cat_conn = db._get_conn() cat_conn = db._get_conn()
cat_row = cat_conn.execute( cat_row = cat_conn.execute(
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,) "SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone() ).fetchone()
source = dict(cat_row)['source'] if cat_row else '' source = dict(cat_row)['source'] if cat_row else ''
catalogue_path = dict(cat_row)['path'] if cat_row else ''
download_url = '' download_url = ''
is_web = doc.get('path', '').startswith(('http://', 'https://')) is_web = doc.get('path', '').startswith(('http://', 'https://'))
@ -321,6 +322,8 @@ def embed_single(file_hash, db, config):
if not valid: if not valid:
db.update_status(file_hash, 'complete', vectors_inserted=0) db.update_status(file_hash, 'complete', vectors_inserted=0)
if source == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No valid concepts to embed for {doc['filename']}") logger.info(f"No valid concepts to embed for {doc['filename']}")
return True return True
@ -401,6 +404,28 @@ def embed_single(file_hash, db, config):
db.update_status(file_hash, 'complete', vectors_inserted=embedded_count) db.update_status(file_hash, 'complete', vectors_inserted=embedded_count)
logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)") logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)")
# Post-embed hook: assign domain for PeerTube videos
if source == 'stream.echo6.co':
try:
from .domain_assigner import compute_assignment
from .peertube_writer import push_category, extract_uuid
from .recon_domains import DOMAIN_CATEGORY_MAP
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
db.set_domain_assignment(file_hash, domain, status)
if domain and status == 'assigned':
cat_id = DOMAIN_CATEGORY_MAP[domain]
uuid = extract_uuid(catalogue_path)
if uuid:
pushed, _token = push_category(uuid, cat_id, config)
if pushed:
db.set_peertube_pushed(file_hash)
logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube")
else:
logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending")
except Exception as e:
logger.warning(f"Domain assignment failed for {file_hash}: {e}")
return True return True
except Exception as e: except Exception as e:

View file

@ -42,13 +42,7 @@ logger = setup_logging('recon.enricher')
STALE_ENRICHING_HOURS = 2 STALE_ENRICHING_HOURS = 2
# ── Classification allowlists ─────────────────────────────────────────────── # ── Classification allowlists ───────────────────────────────────────────────
VALID_DOMAINS = { from .recon_domains import VALID_DOMAINS
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'} VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'} VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}

323
lib/peertube_writer.py Normal file
View file

@ -0,0 +1,323 @@
"""
RECON PeerTube Writer
Authenticated PeerTube API client for pushing domain category assignments.
Uses OAuth2 password grant, caches tokens, refreshes on 401.
Config keys used:
peertube.api_url internal PeerTube URL (http://192.168.1.170:9000)
peertube.host_header Host header for API requests (stream.echo6.co)
peertube.username PeerTube admin username
peertube.password_env env var name holding the password
peertube.rate_limit_delay delay between API calls (seconds)
"""
import json
import os
import time
import requests as http_requests
from .recon_domains import DOMAIN_CATEGORY_MAP
from .utils import setup_logging
logger = setup_logging('recon.peertube_writer')
TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json'
def _get_peertube_config(config):
"""Extract PeerTube writer config with defaults."""
pt = config.get('peertube', {})
return {
'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')),
'host_header': pt.get('host_header', 'stream.echo6.co'),
'username': pt.get('username', 'root'),
'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'),
'rate_limit_delay': pt.get('writer_rate_limit', 0.1),
}
def _load_cached_token():
"""Load cached OAuth token from disk."""
if os.path.exists(TOKEN_CACHE_PATH):
try:
with open(TOKEN_CACHE_PATH, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return None
def _save_token(token_data):
"""Save OAuth token to disk cache."""
os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True)
with open(TOKEN_CACHE_PATH, 'w') as f:
json.dump(token_data, f)
def _get_oauth_client(api_url, host_header):
"""Get PeerTube OAuth client credentials.
Args:
api_url: Base API URL
host_header: Host header value
Returns:
(client_id, client_secret) tuple
"""
resp = http_requests.get(
f"{api_url}/api/v1/oauth-clients/local",
headers={'Host': host_header},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
return data['client_id'], data['client_secret']
def _get_token(api_url, host_header, username, password, client_id, client_secret):
"""Obtain OAuth2 access token via password grant.
Args:
api_url: Base API URL
host_header: Host header value
username: PeerTube username
password: PeerTube password
client_id: OAuth client ID
client_secret: OAuth client secret
Returns:
Token data dict with access_token, refresh_token, etc.
"""
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': client_id,
'client_secret': client_secret,
'grant_type': 'password',
'username': username,
'password': password,
},
timeout=30,
)
resp.raise_for_status()
token_data = resp.json()
token_data['client_id'] = client_id
token_data['client_secret'] = client_secret
_save_token(token_data)
return token_data
def _refresh_token(api_url, host_header, token_data):
"""Refresh an expired access token.
Returns:
New token data dict, or None on failure.
"""
try:
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': token_data['client_id'],
'client_secret': token_data['client_secret'],
'grant_type': 'refresh_token',
'refresh_token': token_data['refresh_token'],
},
timeout=30,
)
resp.raise_for_status()
new_data = resp.json()
new_data['client_id'] = token_data['client_id']
new_data['client_secret'] = token_data['client_secret']
_save_token(new_data)
return new_data
except Exception as e:
logger.warning(f"Token refresh failed: {e}")
return None
def _ensure_token(config):
"""Ensure we have a valid OAuth token. Returns token data dict.
Tries cached token first, then obtains a new one.
"""
pt = _get_peertube_config(config)
password = os.environ.get(pt['password_env'], '')
if not password:
raise ValueError(f"PeerTube password not set in env var {pt['password_env']}")
# Try cached token
token_data = _load_cached_token()
if token_data and 'access_token' in token_data:
return token_data
# Get fresh token
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
return _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
def _api_request(method, path, config, token_data, **kwargs):
"""Make an authenticated PeerTube API request with auto-refresh on 401.
Args:
method: HTTP method ('GET', 'PUT', etc.)
path: API path (e.g. '/api/v1/videos/{uuid}')
config: RECON config dict
token_data: Current token data dict
**kwargs: Additional requests kwargs (json, data, etc.)
Returns:
(response, token_data) tuple token_data may be refreshed.
"""
pt = _get_peertube_config(config)
url = f"{pt['api_url']}{path}"
headers = {
'Host': pt['host_header'],
'Authorization': f"Bearer {token_data['access_token']}",
}
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
if resp.status_code == 401:
# Try refresh
new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data)
if new_token:
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
else:
# Full re-auth
password = os.environ.get(pt['password_env'], '')
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
new_token = _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
return resp, token_data
def push_category(video_uuid, category_id, config, token_data=None):
"""Push a category assignment to a single PeerTube video.
Args:
video_uuid: PeerTube video UUID
category_id: Category ID (100-117)
config: RECON config dict
token_data: Optional pre-fetched token data
Returns:
(success: bool, token_data: dict) tuple
"""
if token_data is None:
token_data = _ensure_token(config)
resp, token_data = _api_request(
'PUT',
f'/api/v1/videos/{video_uuid}',
config,
token_data,
json={'category': category_id},
)
if resp.status_code in (200, 204):
return True, token_data
else:
logger.warning(f"Failed to push category for {video_uuid}: "
f"HTTP {resp.status_code}{resp.text[:200]}")
return False, token_data
def extract_uuid(catalogue_path):
"""Extract PeerTube video UUID from catalogue path.
Catalogue paths for PeerTube videos look like:
https://stream.echo6.co/w/UUID
Args:
catalogue_path: catalogue.path value
Returns:
UUID string or None
"""
if not catalogue_path:
return None
if '/w/' in catalogue_path:
return catalogue_path.rsplit('/w/', 1)[-1]
return None
def push_pending(db, config, limit=None):
"""Push all assigned-but-unpushed domain categories to PeerTube.
Args:
db: StatusDB instance
config: RECON config dict
limit: Optional max number of items to push
Returns:
(success_count, fail_count) tuple
"""
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
if not items:
logger.info("No unpushed assignments to push")
return (0, 0)
pt = _get_peertube_config(config)
delay = pt['rate_limit_delay']
SYSTEMIC_FAIL_THRESHOLD = 5 # abort if first N items all fail
logger.info(f"Pushing {len(items)} category assignments to PeerTube")
token_data = _ensure_token(config)
success = 0
failed = 0
for item in items:
file_hash = item['hash']
domain = item.get('recon_domain')
catalogue_path = item.get('catalogue_path', '')
if not domain or domain not in DOMAIN_CATEGORY_MAP:
logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping")
failed += 1
continue
uuid = extract_uuid(catalogue_path)
if not uuid:
logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'")
failed += 1
continue
category_id = DOMAIN_CATEGORY_MAP[domain]
ok, token_data = push_category(uuid, category_id, config, token_data)
if ok:
db.set_peertube_pushed(file_hash)
success += 1
else:
failed += 1
# Abort on systemic failure (e.g. plugin not installed, auth broken)
if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD:
logger.error(f"Aborting push: first {failed} items all failed — "
f"check plugin installation and PeerTube API config")
break
time.sleep(delay)
logger.info(f"Push complete: {success} succeeded, {failed} failed")
return (success, failed)

46
lib/recon_domains.py Normal file
View file

@ -0,0 +1,46 @@
"""
RECON Domain Taxonomy
Single source of truth for the 18 knowledge domains and their PeerTube
category ID mappings. IDs 100-117 are registered via the
peertube-plugin-recon-domains plugin.
Import VALID_DOMAINS from here instead of defining local sets.
"""
DOMAIN_CATEGORY_MAP = {
'Agriculture & Livestock': 100,
'Civil Organization': 101,
'Communications': 102,
'Food Systems': 103,
'Foundational Skills': 104,
'Logistics': 105,
'Medical': 106,
'Navigation': 107,
'Operations': 108,
'Power Systems': 109,
'Preservation & Storage': 110,
'Security': 111,
'Shelter & Construction': 112,
'Technology': 113,
'Tools & Equipment': 114,
'Vehicles': 115,
'Water Systems': 116,
'Wilderness Skills': 117,
}
VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys())
CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()}
# Channels whose tiebreaker is skipped because their content is non-topical
# (catch-alls, miscellany dumps, etc.). Items in these channels with tied
# domain counts go straight to tied_manual without channel-context tiebreaker.
#
# This is intentionally a hardcoded explicit list, not a size threshold.
# Adding a channel here requires an explicit decision — only add channels
# that are genuinely non-topical catch-alls where channel-wide concept
# aggregation would produce meaningless noise.
MEGA_CHANNEL_SKIP_LIST = {
'Transcript', # Legacy catch-all, ~9,200 videos, no topical coherence
}

View file

@ -124,6 +124,22 @@ class StatusDB:
except Exception: except Exception:
pass # column already exists pass # column already exists
# Migration: domain assignment columns for PeerTube categorization
for col, coltype in [
('recon_domain', 'TEXT'),
('recon_domain_status', 'TEXT'),
('recon_domain_assigned_at', 'TEXT'),
('peertube_category_pushed_at', 'TEXT'),
]:
try:
conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}")
except Exception:
pass # column already exists
try:
conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)")
except Exception:
pass # index already exists
# Stream B: file_operations + duplicate_review tables # Stream B: file_operations + duplicate_review tables
conn.executescript(""" conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations ( CREATE TABLE IF NOT EXISTS file_operations (
@ -448,6 +464,90 @@ class StatusDB:
conn.commit() conn.commit()
# ── Domain Assignment Helpers ──────────────────<E29480><E29480>─────────────
def get_domain_assignment(self, file_hash):
"""Get domain assignment for a document.
Returns:
(recon_domain, recon_domain_status) tuple, or (None, None) if not set.
"""
conn = self._get_conn()
row = conn.execute(
"SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?",
(file_hash,)
).fetchone()
if row:
return (row['recon_domain'], row['recon_domain_status'])
return (None, None)
def set_domain_assignment(self, file_hash, domain, status):
"""Set domain assignment and status for a document."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET recon_domain = ?, recon_domain_status = ?, "
"recon_domain_assigned_at = ? WHERE hash = ?",
(domain, status, datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def set_peertube_pushed(self, file_hash):
"""Mark a document's category as pushed to PeerTube."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?",
(datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def get_unpushed_assignments(self):
"""Get documents with domain assignments not yet pushed to PeerTube.
Only returns stream.echo6.co source documents.
"""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"""SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain IS NOT NULL
AND d.peertube_category_pushed_at IS NULL
AND c.source = 'stream.echo6.co'
ORDER BY d.recon_domain_assigned_at""",
).fetchall()]
def get_items_by_domain_status(self, status, limit=None):
"""Get documents by domain assignment status."""
conn = self._get_conn()
q = """SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain_status = ?
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, (status,)).fetchall()]
def get_domain_status_counts(self):
"""Get counts of documents by domain assignment status."""
conn = self._get_conn()
return {row['recon_domain_status']: row['cnt']
for row in conn.execute(
"SELECT recon_domain_status, COUNT(*) as cnt "
"FROM documents WHERE recon_domain_status IS NOT NULL "
"GROUP BY recon_domain_status"
).fetchall()}
def get_domain_distribution(self):
"""Get counts of documents per assigned domain."""
conn = self._get_conn()
return {row['recon_domain']: row['cnt']
for row in conn.execute(
"SELECT recon_domain, COUNT(*) as cnt "
"FROM documents WHERE recon_domain IS NOT NULL "
"GROUP BY recon_domain ORDER BY cnt DESC"
).fetchall()}
# ── Scraper Job Helpers ───────────────────────────────────── # ── Scraper Job Helpers ─────────────────────────────────────
def get_pending_scrape_job(self): def get_pending_scrape_job(self):

View file

@ -0,0 +1,52 @@
# peertube-plugin-recon-domains
Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis.
## Category Mapping
| ID | Domain |
|-----|---------------------------|
| 100 | Agriculture & Livestock |
| 101 | Civil Organization |
| 102 | Communications |
| 103 | Food Systems |
| 104 | Foundational Skills |
| 105 | Logistics |
| 106 | Medical |
| 107 | Navigation |
| 108 | Operations |
| 109 | Power Systems |
| 110 | Preservation & Storage |
| 111 | Security |
| 112 | Shelter & Construction |
| 113 | Technology |
| 114 | Tools & Equipment |
| 115 | Vehicles |
| 116 | Water Systems |
| 117 | Wilderness Skills |
Built-in PeerTube categories (IDs 1-18) are not modified.
## Install
```bash
# Copy plugin to PeerTube storage
cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/
# Register plugin via API or admin UI
# Admin > Plugins > Install > peertube-plugin-recon-domains
# Restart PeerTube
sudo systemctl restart peertube
```
## Uninstall
Remove the plugin via PeerTube admin UI or:
```bash
rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains
sudo systemctl restart peertube
```
Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled.

View file

@ -0,0 +1,32 @@
async function register ({ videoCategoryManager }) {
const reconDomains = {
100: 'Agriculture & Livestock',
101: 'Civil Organization',
102: 'Communications',
103: 'Food Systems',
104: 'Foundational Skills',
105: 'Logistics',
106: 'Medical',
107: 'Navigation',
108: 'Operations',
109: 'Power Systems',
110: 'Preservation & Storage',
111: 'Security',
112: 'Shelter & Construction',
113: 'Technology',
114: 'Tools & Equipment',
115: 'Vehicles',
116: 'Water Systems',
117: 'Wilderness Skills'
}
for (const [id, label] of Object.entries(reconDomains)) {
videoCategoryManager.addConstant(parseInt(id), label)
}
}
async function unregister () {
return
}
module.exports = { register, unregister }

View file

@ -0,0 +1,21 @@
{
"name": "peertube-plugin-recon-domains",
"version": "1.0.0",
"description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 100-117)",
"engine": {
"peertube": ">=6.0.0"
},
"keywords": [
"peertube",
"plugin"
],
"homepage": "https://forge.echo6.co/matt/recon",
"author": "Echo6",
"license": "MIT",
"library": "./main.js",
"staticDirs": {},
"css": [],
"clientScripts": [],
"translations": {},
"bugs": "https://forge.echo6.co/matt/recon/issues"
}

170
recon.py
View file

@ -863,6 +863,166 @@ def cmd_ingest(args):
return 0 return 0
def cmd_assign_categories(args):
"""Assign RECON domains to PeerTube videos and push categories."""
from qdrant_client import QdrantClient
from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
from lib.peertube_writer import push_pending, extract_uuid
from lib.recon_domains import DOMAIN_CATEGORY_MAP
config = get_config()
db = StatusDB()
dry_run = args.dry_run
limit = args.limit
if args.backfill:
# Pass 1: assign domains to all complete stream docs with no assignment
# or that previously got needs_reprocess
conn = db._get_conn()
q = """SELECT d.hash FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.status = 'complete'
AND (d.recon_domain IS NULL
OR d.recon_domain_status = 'needs_reprocess')
AND c.source = 'stream.echo6.co'
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
rows = conn.execute(q).fetchall()
hashes = [r['hash'] for r in rows]
if not hashes:
print("No unassigned complete stream documents found")
return 0
print(f"Backfill: processing {len(hashes)} documents" +
(" [DRY RUN]" if dry_run else ""))
# Create one Qdrant client for the entire backfill
qdrant = QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0}
for i, file_hash in enumerate(hashes):
try:
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
stats[status] = stats.get(status, 0) + 1
if not dry_run:
db.set_domain_assignment(file_hash, domain, status)
if (i + 1) % 1000 == 0:
print(f" Progress: {i + 1}/{len(hashes)}")
except Exception as e:
stats['errors'] += 1
logger.warning(f" Assignment error for {file_hash[:12]}: {e}")
print(f"\nBackfill results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.tiebreaker_pass:
if dry_run:
items = db.get_items_by_domain_status('tied_pass_1')
print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]")
return 0
stats = run_tiebreaker_pass(db, config)
print(f"\nTiebreaker results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.push_pending:
if dry_run:
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
print(f"Push pending: {len(items)} items would be pushed [DRY RUN]")
return 0
success, failed = push_pending(db, config, limit=limit)
print(f"\nPush results: {success} succeeded, {failed} failed")
return 0
elif args.reprocess_missing:
items = db.get_items_by_domain_status('needs_reprocess', limit=limit)
if not items:
print("No items with needs_reprocess status")
return 0
print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else ""))
requeued = 0
for item in items:
file_hash = item['hash']
if dry_run:
print(f" Would reprocess: {file_hash[:12]}{item.get('filename', '?')}")
requeued += 1
continue
# Reset document status to allow re-processing
conn = db._get_conn()
conn.execute(
"""UPDATE documents SET
status = 'catalogued',
concepts_extracted = 0,
vectors_inserted = 0,
recon_domain = NULL,
recon_domain_status = NULL,
recon_domain_assigned_at = NULL,
peertube_category_pushed_at = NULL,
error_message = NULL,
extracted_at = NULL,
enriched_at = NULL,
embedded_at = NULL
WHERE hash = ?""",
(file_hash,)
)
conn.commit()
# Re-queue for pipeline processing
db.queue_document(file_hash)
requeued += 1
print(f"Requeued {requeued} items for reprocessing")
return 0
else:
# Default: show domain assignment status
status_counts = db.get_domain_status_counts()
domain_dist = db.get_domain_distribution()
conn = db._get_conn()
total_stream = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'"""
).fetchone()['cnt']
unassigned = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'
AND d.recon_domain IS NULL"""
).fetchone()['cnt']
unpushed = len(db.get_unpushed_assignments())
print("=== Domain Assignment Status ===\n")
print(f"Total complete stream docs: {total_stream}")
print(f"Unassigned: {unassigned}")
print(f"Unpushed to PeerTube: {unpushed}")
if status_counts:
print(f"\nAssignment status breakdown:")
for status, cnt in sorted(status_counts.items()):
print(f" {status:<20s} {cnt:>6d}")
if domain_dist:
print(f"\nDomain distribution:")
for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]):
print(f" {domain:<35s} {cnt:>6d}")
return 0
def cmd_pipeline(args): def cmd_pipeline(args):
"""Stream B library pipeline: status, migrate, reverse, watch, sweep.""" """Stream B library pipeline: status, migrate, reverse, watch, sweep."""
from lib.new_pipeline import ( from lib.new_pipeline import (
@ -1158,6 +1318,16 @@ def main():
p.set_defaults(func=cmd_ingest) p.set_defaults(func=cmd_ingest)
# assign-categories
p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos')
p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs')
p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis')
p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API')
p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items')
p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing')
p.add_argument('--limit', type=int, help='Limit number of items to process')
p.set_defaults(func=cmd_assign_categories)
# pipeline (Stream B) # pipeline (Stream B)
p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)') p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)')
p.add_argument('pipeline_action', nargs='?', default='status', p.add_argument('pipeline_action', nargs='?', default='status',

View file

@ -0,0 +1,148 @@
{% extends "base.html" %}
{% block content %}
<div id="pt-review">
<div class="stat-grid" style="grid-template-columns:repeat(4, 1fr);">
<div class="stat-card"><div class="label">Manual Review</div><div class="value" id="rv-manual"></div></div>
<div class="stat-card"><div class="label">Assigned</div><div class="value" id="rv-assigned"></div></div>
<div class="stat-card"><div class="label">Tied (Pass 1)</div><div class="value" id="rv-tied1"></div></div>
<div class="stat-card"><div class="label">Needs Reprocess</div><div class="value" id="rv-reprocess"></div></div>
</div>
<div class="panel" style="margin-top:16px;">
<h3 class="section-title" style="margin-bottom:12px;">Manual Review Queue</h3>
<div id="rv-items-container">
<table class="data-table" id="rv-table" style="display:none;">
<thead>
<tr>
<th>Video</th>
<th>Channel</th>
<th>Current Domain</th>
<th>Top Domains</th>
<th>Assign</th>
</tr>
</thead>
<tbody id="rv-tbody"></tbody>
</table>
<div id="rv-empty" class="text-muted" style="padding:24px;text-align:center;">Loading...</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
const VALID_DOMAINS = {{ valid_domains | tojson }};
async function loadStats() {
try {
const resp = await fetch('/api/peertube/review/stats');
const data = await resp.json();
document.getElementById('rv-manual').textContent = data.tied_manual || 0;
document.getElementById('rv-assigned').textContent =
(data.assigned || 0) + (data.tied_pass_2 || 0) + (data.manual_assigned || 0);
document.getElementById('rv-tied1').textContent = data.tied_pass_1 || 0;
document.getElementById('rv-reprocess').textContent = data.needs_reprocess || 0;
} catch (e) {
console.error('Failed to load stats:', e);
}
}
async function loadItems() {
try {
const resp = await fetch('/api/peertube/review/items');
const items = await resp.json();
const tbody = document.getElementById('rv-tbody');
const table = document.getElementById('rv-table');
const empty = document.getElementById('rv-empty');
if (!items.length) {
empty.textContent = 'No items pending manual review.';
table.style.display = 'none';
return;
}
tbody.innerHTML = '';
table.style.display = '';
empty.style.display = 'none';
items.forEach(item => {
const tr = document.createElement('tr');
tr.id = 'row-' + item.hash;
// Video title/filename
const tdVideo = document.createElement('td');
tdVideo.textContent = item.filename || item.hash.slice(0, 12);
tdVideo.title = item.hash;
tr.appendChild(tdVideo);
// Channel
const tdChannel = document.createElement('td');
tdChannel.textContent = item.category || '—';
tr.appendChild(tdChannel);
// Current domain
const tdCurrent = document.createElement('td');
tdCurrent.textContent = item.recon_domain || '—';
tr.appendChild(tdCurrent);
// Top domains (from concept counts)
const tdTop = document.createElement('td');
if (item.top_domains) {
tdTop.innerHTML = item.top_domains.map(d =>
'<span class="badge">' + d.domain + ' (' + d.count + ')</span>'
).join(' ');
}
tr.appendChild(tdTop);
// Assign dropdown + button
const tdAssign = document.createElement('td');
const sel = document.createElement('select');
sel.className = 'input-sm';
sel.innerHTML = '<option value="">Select...</option>' +
VALID_DOMAINS.map(d => '<option value="' + d + '">' + d + '</option>').join('');
if (item.recon_domain) {
sel.value = item.recon_domain;
}
const btn = document.createElement('button');
btn.className = 'btn btn-sm btn-primary';
btn.textContent = 'Assign';
btn.onclick = () => assignDomain(item.hash, sel.value, tr);
tdAssign.appendChild(sel);
tdAssign.appendChild(btn);
tr.appendChild(tdAssign);
tbody.appendChild(tr);
});
} catch (e) {
document.getElementById('rv-empty').textContent = 'Error loading items: ' + e.message;
}
}
async function assignDomain(hash, domain, row) {
if (!domain) {
alert('Select a domain first');
return;
}
try {
const resp = await fetch('/api/peertube/review/assign', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({hash: hash, domain: domain})
});
const result = await resp.json();
if (result.ok) {
row.style.opacity = '0.4';
row.querySelector('button').disabled = true;
row.querySelector('button').textContent = 'Done';
loadStats();
} else {
alert('Assignment failed: ' + (result.error || 'unknown error'));
}
} catch (e) {
alert('Error: ' + e.message);
}
}
loadStats();
loadItems();
</script>
{% endblock %}

View file

@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
Parity test: verify RECON domain taxonomy matches PeerTube categories.
Tests:
1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly
2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels
Usage:
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
"""
import json
import sys
import requests
# Add parent dir to path for imports
sys.path.insert(0, '/opt/recon')
from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP
def test_local_parity():
"""Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS."""
map_keys = set(DOMAIN_CATEGORY_MAP.keys())
assert map_keys == VALID_DOMAINS, (
f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, "
f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}"
)
assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch"
print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS")
def test_peertube_categories():
"""Verify PeerTube API returns all 18 RECON categories."""
url = "http://192.168.1.170:9000/api/v1/videos/categories"
headers = {"Host": "stream.echo6.co"}
try:
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"[SKIP] PeerTube API unreachable: {e}")
return
categories = resp.json() # dict of {id_str: label}
missing = []
wrong_label = []
for domain, cat_id in DOMAIN_CATEGORY_MAP.items():
cat_str = str(cat_id)
if cat_str not in categories:
missing.append((cat_id, domain))
elif categories[cat_str] != domain:
wrong_label.append((cat_id, domain, categories[cat_str]))
if missing:
print(f"[FAIL] Missing categories in PeerTube: {missing}")
print(" Deploy peertube-plugin-recon-domains to CT 110 first")
sys.exit(1)
if wrong_label:
print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}")
sys.exit(1)
print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels")
if __name__ == '__main__':
test_local_parity()
test_peertube_categories()
print("\nAll parity tests passed.")