mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 14:44:54 +02:00
Compare commits
14 commits
master
...
feature/pe
| Author | SHA1 | Date | |
|---|---|---|---|
| | 299be21f42 | | |
| | d8196e60c7 | | |
| | 043119fbaa | | |
| | 3b37d96c4d | | |
| | c04ccc5011 | | |
| | 75b2e19e94 | | |
| | a39ec56620 | | |
| | d1270be64d | | |
| | 6a17df8078 | | |
| | a273b52c7e | | |
| | 07d26a8ef0 | | |
| | 8ab1f8c82f | | |
| | 5f36c52fb1 | | |
| | 71e3dc12ed | | |
18 changed files with 1981 additions and 17 deletions
@@ -408,9 +408,14 @@ service:

 peertube:
   api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx)
+  api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer)
+  host_header: stream.echo6.co # Host header for PeerTube API requests
+  username: root # PeerTube admin username
+  password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password
   public_url: https://stream.echo6.co # Public URL for video links
   fetch_timeout: 30 # HTTP timeout for API/VTT requests
   rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
+  writer_rate_limit: 0.1 # Delay between category push API calls (seconds)
   poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)

 scraper:
14
docs/backlog.md
Normal file
|
|
@@ -0,0 +1,14 @@
|
||||||
|
# RECON Backlog — Technical Debt
|
||||||
|
|
||||||
|
## Qdrant Migration Follow-ups (2026-04-28)
|
||||||
|
|
||||||
|
From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact.
|
||||||
|
|
||||||
|
1. **domain_assigner.py — list-type domain handling**
|
||||||
|
`_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings.
|
||||||
|
|
||||||
|
2. **recon.py --tiebreaker-pass — Qdrant client threading**
|
||||||
|
The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. Functionally fine as-is.
|
||||||
|
|
||||||
|
3. **_get_qdrant_client — debug log caller identification**
|
||||||
|
The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths.
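Item 3 could be approached roughly as follows. This is a minimal sketch, not the project's code: `lazy_client_message` and `run_tiebreaker_pass_demo` are hypothetical names, and only the `inspect.stack()[1].function` lookup comes from the backlog note.

```python
import inspect


def lazy_client_message():
    """Build the debug message, naming the function that triggered construction."""
    # Frame 0 is this helper; frame 1 is whoever called it.
    caller = inspect.stack()[1].function
    return f"Creating new QdrantClient (caller did not pass one; requested by {caller})"


def run_tiebreaker_pass_demo():
    # Hypothetical stand-in for a code path that reaches the lazy constructor.
    return lazy_client_message()


print(run_tiebreaker_pass_demo())
```

`inspect.stack()` walks the whole call stack, so it is comparatively slow. That cost is likely acceptable here because the message only fires on the lazy construction path.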
|
||||||
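Item 1 in the list above (first-only normalization of list-type `domain` payloads) might look like the sketch below. It assumes payloads are plain dicts and uses a hypothetical three-entry `VALID_DOMAINS`; the helper names are illustrative and are not taken from `domain_assigner.py`.

```python
from collections import Counter

# Hypothetical subset; the real VALID_DOMAINS lives in lib/recon_domains.py.
VALID_DOMAINS = {"automotive", "medical", "firearms"}


def first_valid_domain(value):
    """Reduce a `domain` payload to one string: the first valid element only."""
    candidates = value if isinstance(value, list) else [value]
    for candidate in candidates:
        if candidate in VALID_DOMAINS:
            return candidate
    return None


def count_domains(payloads):
    """Count at most one domain per concept vector, ignoring invalid values."""
    counts = Counter()
    for payload in payloads:
        domain = first_valid_domain(payload.get("domain"))
        if domain is not None:
            counts[domain] += 1
    return counts
```

With this shape a two-element list contributes one count instead of two, which matches the embedder's `valid[0]` behaviour described in the item.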
20
docs/deploy-blast-radius.md
Normal file
|
|
@@ -0,0 +1,20 @@
|
||||||
|
# Deploy Blast Radius Reference
|
||||||
|
|
||||||
|
Quick-reference for operators during deployment of domain categorization.
|
||||||
|
|
||||||
|
| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. Rollback Time |
|------|-------------|------------------------------|-----------------|----------|-------------------|
| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min |
| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min |
| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min |
| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min |
| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min |
| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min |
| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count |
|
||||||
|
|
||||||
|
## Risk Tiers
|
||||||
|
|
||||||
|
- **Low risk (read-only):** `--dry-run` on any command, status display
|
||||||
|
- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration
|
||||||
|
- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users)
|
||||||
|
- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ Gemini work at risk)
|
||||||
117
docs/domain-assignment.md
Normal file
|
|
@@ -0,0 +1,117 @@
|
||||||
|
# Domain Assignment — Algorithm & Operations Guide
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin.
|
||||||
|
|
||||||
|
## Data Source
|
||||||
|
|
||||||
|
Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue.
|
||||||
|
|
||||||
|
Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data.
|
||||||
|
|
||||||
|
## Algorithm
|
||||||
|
|
||||||
|
### Pass 1: Concept Domain Count (inline, per-document)
|
||||||
|
|
||||||
|
Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`.
|
||||||
|
|
||||||
|
1. Query Qdrant for all points with `doc_hash` matching the document
|
||||||
|
2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only
|
||||||
|
3. If zero concept vectors → `no_concepts` (terminal)
|
||||||
|
4. If single top domain → `assigned`
|
||||||
|
5. If tied → `tied_pass_1` (deferred to tiebreaker)
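The five steps above can be sketched as a pure function over the `domain` values already fetched from Qdrant. This is an illustration under stated assumptions, not the body of `compute_assignment()`: the `VALID_DOMAINS` subset is hypothetical, and the sketch treats a document whose vectors all carry invalid domains the same as one with no vectors, a case the steps do not spell out.

```python
from collections import Counter

# Hypothetical subset; the real VALID_DOMAINS lives in lib/recon_domains.py.
VALID_DOMAINS = {"automotive", "medical", "firearms"}


def classify_pass_1(domains):
    """Return (domain, status) for one document's concept-vector domains."""
    # Step 2: count only recognised domains.
    counts = Counter(d for d in domains if d in VALID_DOMAINS)
    # Step 3: nothing to count (assumption: invalid-only is handled the same way).
    if not counts:
        return None, "no_concepts"
    top_count = max(counts.values())
    leaders = sorted(d for d, n in counts.items() if n == top_count)
    # Step 4: a single leader wins outright.
    if len(leaders) == 1:
        return leaders[0], "assigned"
    # Step 5: defer to the tiebreaker (assumption: no domain is stored yet).
    return None, "tied_pass_1"


print(classify_pass_1(["automotive", "automotive", "medical"]))
```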
|
||||||
|
|
||||||
|
### Pass 2: Channel Tiebreaker (batch)
|
||||||
|
|
||||||
|
Runs via `assign-categories --tiebreaker-pass`.
|
||||||
|
|
||||||
|
For each `tied_pass_1` document:
|
||||||
|
|
||||||
|
1. Identify the tied domains from Qdrant
|
||||||
|
2. Look up the document's channel (`catalogue.category`)
|
||||||
|
3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual`
|
||||||
|
4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter)
|
||||||
|
5. Among the tied domains only, pick the one with the highest channel-wide concept count
|
||||||
|
6. If resolved → `tied_pass_2`
|
||||||
|
7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
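A compact sketch of the decision in steps 3 to 7, assuming the tied domains and the channel-wide counts have already been read from Qdrant. The function name and signature are hypothetical. Whether a fallback domain is stored for skip-listed channels is not stated above, so the sketch returns `None` in that case.

```python
from collections import Counter

# The only entry named in the current skip list.
MEGA_CHANNEL_SKIP_LIST = {"Transcript"}


def resolve_tie(tied_domains, channel, channel_counts):
    """Return (domain, status) for one `tied_pass_1` document."""
    # Step 3: catch-all channels go straight to manual review.
    if channel in MEGA_CHANNEL_SKIP_LIST:
        return None, "tied_manual"
    # Step 5: compare only the tied domains, not every domain in the channel.
    scores = {d: channel_counts.get(d, 0) for d in tied_domains}
    best = max(scores.values())
    leaders = sorted(d for d, n in scores.items() if n == best)
    # Step 6: a single leader resolves the tie.
    if len(leaders) == 1:
        return leaders[0], "tied_pass_2"
    # Step 7: alphabetical fallback, flagged for review.
    return leaders[0], "tied_manual"


channel = Counter({"firearms": 99, "automotive": 40, "medical": 3})
print(resolve_tie(["medical", "automotive"], "1a-auto", channel))
```

In the example, `firearms` dominates the channel but is ignored because it is not one of the tied domains.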
|
||||||
|
|
||||||
|
### Channel Skip List
|
||||||
|
|
||||||
|
Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review.
|
||||||
|
|
||||||
|
Current skip list:
|
||||||
|
- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence
|
||||||
|
|
||||||
|
This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason.
|
||||||
|
|
||||||
|
## Status Values
|
||||||
|
|
||||||
|
| Status | Meaning | Terminal? | Next Action |
|--------|---------|-----------|-------------|
| `assigned` | Clear winner from pass 1 | No | Push to PeerTube |
| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` |
| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube |
| `tied_manual` | Needs human review | No | Review at `/peertube/review` |
| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None; typically non-topical content (vlogs, giveaways, announcements) |
| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` |
| `manual_assigned` | Human override from dashboard | No | Already pushed |
|
||||||
|
|
||||||
|
**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}`
|
||||||
|
|
||||||
|
## CLI Commands
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /opt/recon && source venv/bin/activate
|
||||||
|
|
||||||
|
# Show current assignment status
|
||||||
|
python3 recon.py assign-categories
|
||||||
|
|
||||||
|
# Pass 1: backfill all unassigned complete stream documents
|
||||||
|
python3 recon.py assign-categories --backfill --dry-run
|
||||||
|
python3 recon.py assign-categories --backfill
|
||||||
|
|
||||||
|
# Pass 2: resolve ties via channel analysis
|
||||||
|
python3 recon.py assign-categories --tiebreaker-pass
|
||||||
|
|
||||||
|
# Push all assigned-but-unpushed categories to PeerTube API
|
||||||
|
python3 recon.py assign-categories --push-pending
|
||||||
|
|
||||||
|
# Re-queue items with transient failures for full re-processing
|
||||||
|
python3 recon.py assign-categories --reprocess-missing
|
||||||
|
|
||||||
|
# Limit processing count
|
||||||
|
python3 recon.py assign-categories --backfill --limit 100
|
||||||
|
```
|
||||||
|
|
||||||
|
## Dashboard Review
|
||||||
|
|
||||||
|
The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. Each row displays:
|
||||||
|
- Video title and channel
|
||||||
|
- Top concept domains with counts
|
||||||
|
- Dropdown to select the correct domain
|
||||||
|
- Assign button (pushes to PeerTube immediately)
|
||||||
|
|
||||||
|
Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI.
|
||||||
|
|
||||||
|
## Pipeline Integration
|
||||||
|
|
||||||
|
New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`:
|
||||||
|
|
||||||
|
1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client
|
||||||
|
2. If clear winner: pushes category to PeerTube immediately
|
||||||
|
3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run
|
||||||
|
4. If no concepts: marks as `no_concepts` (terminal)
|
||||||
|
5. On Qdrant error: logs warning and continues — does not block the pipeline
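The control flow of the hook can be sketched as below. This is not the code in `embedder.py`: the three callables stand in for `compute_assignment()`, the status write, and the PeerTube push, and their signatures are assumptions. The list above does not say whether a Qdrant error also records `needs_reprocess`, so the sketch records nothing in that case.

```python
import logging

logger = logging.getLogger("recon.embedder")  # hypothetical logger name


def post_embed_hook(doc_hash, compute, record, push):
    """Assign a domain after embedding without ever raising into the pipeline."""
    try:
        domain, status = compute(doc_hash)
    except Exception as exc:  # e.g. a Qdrant connection error
        # Step 5: warn and carry on; the embed stage itself already succeeded.
        logger.warning("Domain assignment skipped for %s: %s", doc_hash, exc)
        return None
    # Steps 2-4: persist whatever pass 1 decided.
    record(doc_hash, domain, status)
    # Step 2: only a clear winner is pushed immediately.
    if status == "assigned":
        push(doc_hash, domain)
    return status
```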
|
||||||
|
|
||||||
|
## Source Files
|
||||||
|
|
||||||
|
| File | Purpose |
|------|---------|
| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS |
| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers |
| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` |
| `lib/embedder.py` | Post-embed hook (passes qdrant client) |
| `lib/status.py` | DB columns + helper methods |
| `lib/api.py` | Dashboard review routes |
| `recon.py` | CLI `assign-categories` command |
|
||||||
426
docs/migration-runbook.md
Normal file
|
|
@@ -0,0 +1,426 @@
|
||||||
|
# Domain Categorization Migration Runbook
|
||||||
|
|
||||||
|
Step-by-step procedure to deploy the PeerTube domain categorization feature.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
|
||||||
|
- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out)
|
||||||
|
- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170)
|
||||||
|
- PeerTube admin credentials (`root` / password in `.env`)
|
||||||
|
|
||||||
|
## Pre-Deploy Backups
|
||||||
|
|
||||||
|
These backups MUST be completed before any state-changing step.
|
||||||
|
|
||||||
|
### 1. Snapshot RECON database
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh zvx@192.168.1.130
|
||||||
|
cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak"
|
||||||
|
ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Snapshot PeerTube PostgreSQL
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql"
|
||||||
|
ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Verify off-site concept backup
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Check last rsync to Contabo
|
||||||
|
ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5'
|
||||||
|
ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5'
|
||||||
|
# Confirm timestamps match within 6 hours
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. Confirm RECON service state
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager'
|
||||||
|
# Note: do NOT restart until Step 3. If currently running, confirm no active
|
||||||
|
# enrichment/embedding workers before proceeding.
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Step 1: Deploy PeerTube Plugin to CT 110
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# From recon-vm, copy plugin to CT 110
|
||||||
|
ssh zvx@192.168.1.130
|
||||||
|
cd /opt/recon/peertube-plugin/
|
||||||
|
scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
|
||||||
|
|
||||||
|
# Or via the Proxmox host:
|
||||||
|
ssh root@192.168.1.243 # media host
|
||||||
|
pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
|
||||||
|
# Copy files into the container (scp from recon-vm or use pct push)
|
||||||
|
```
|
||||||
|
|
||||||
|
Alternative: Install via PeerTube admin UI (Admin > Plugins > Install).
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Restart PeerTube to register plugin
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube'
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Check PeerTube logs for plugin registration errors:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin
|
||||||
|
```
|
||||||
|
|
||||||
|
If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose
|
||||||
|
and fix the plugin before continuing. See Rollback: "Plugin install fails" below.
|
||||||
|
|
||||||
|
## Step 2: Verify Plugin
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# From recon-vm
|
||||||
|
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"'
|
||||||
|
```
|
||||||
|
|
||||||
|
Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed.
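The `grep` pattern above also matches IDs 118 and 119, so an explicit set comparison is a stricter check. The sketch below assumes the categories endpoint returns a JSON object keyed by category ID strings, which is what the quoted-ID pattern implies; the function name and the sample labels are hypothetical.

```python
def missing_category_ids(categories, first=100, last=117):
    """Return the expected RECON category IDs absent from the API response."""
    expected = {str(i) for i in range(first, last + 1)}
    return sorted(expected - set(categories), key=int)


# Hypothetical response with one RECON category missing.
response = {str(i): f"Domain {i}" for i in range(100, 118)}
del response["105"]
print(missing_category_ids(response))
```

An empty list means all 18 IDs are registered; anything else names the IDs to investigate before proceeding.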
|
||||||
|
|
||||||
|
Run the parity test:
|
||||||
|
```bash
|
||||||
|
cd /opt/recon && source venv/bin/activate
|
||||||
|
python3 tests/test_constants_parity.py
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 3: Apply Schema Migration
|
||||||
|
|
||||||
|
**Requires RECON restart (ask user first).**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
sudo systemctl restart recon
|
||||||
|
```
|
||||||
|
|
||||||
|
The migration runs automatically on startup via `StatusDB._init_db()`. Verify:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()]
for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']:
    assert c in cols, f'Missing: {c}'
    print(f' {c}: OK')

# Verify index exists
indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()]
assert 'idx_documents_recon_domain_status' in indexes, 'Missing index'
print(' idx_documents_recon_domain_status: OK')

# Verify no columns were dropped
expected_existing = ['hash', 'status', 'filename', 'discovered_at']
for c in expected_existing:
    assert c in cols, f'ALERT: existing column {c} is missing!'
|
||||||
|
print('Migration verified — all columns present, no existing columns dropped')
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 4: Run Backfill
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /opt/recon && source venv/bin/activate
|
||||||
|
|
||||||
|
# Dry run first
|
||||||
|
python3 recon.py assign-categories --backfill --dry-run
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks:
|
||||||
|
- ~94.8% `assigned` (clear winners)
|
||||||
|
- ~5.2% `tied_pass_1` (ties)
|
||||||
|
- ~19.5% `needs_reprocess` (missing/legacy concepts)
|
||||||
|
|
||||||
|
If the distribution deviates more than 5 percentage points from these benchmarks,
|
||||||
|
halt and investigate. Do not proceed until the deviation is explained.
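The 5-percentage-point rule can be checked mechanically once the dry-run shares are known. This is a sketch under assumptions: the runbook does not state which denominator each benchmark uses, so the caller is expected to supply shares computed on the same basis as the benchmarks. The function name is hypothetical.

```python
# Benchmarks copied from the list above, in percent.
BENCHMARKS = {"assigned": 94.8, "tied_pass_1": 5.2, "needs_reprocess": 19.5}


def deviations(observed_pct, benchmarks=BENCHMARKS, tolerance=5.0):
    """Return {status: observed - benchmark} for every status outside tolerance."""
    out = {}
    for status, expected in benchmarks.items():
        delta = observed_pct.get(status, 0.0) - expected
        if abs(delta) > tolerance:
            out[status] = round(delta, 1)
    return out


# Hypothetical dry-run shares: `assigned` is far below its benchmark.
print(deviations({"assigned": 80.0, "tied_pass_1": 5.2, "needs_reprocess": 19.5}))
```

An empty dict means every status is within tolerance; any entry is a reason to halt.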
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Execute pass 1
|
||||||
|
python3 recon.py assign-categories --backfill
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Spot-check 20 random assigned documents:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python3 -c "
from lib.status import StatusDB
db = StatusDB()
rows = db._get_conn().execute(
    \"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\"
).fetchall()
for r in rows:
    print(r['hash'][:12], r['recon_domain'])
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
For each, visually verify against concept files: `ls data/concepts/{hash}/` and
|
||||||
|
spot-check one `window_*.json` to confirm the assigned domain is plausible.
|
||||||
|
Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run tiebreaker pass
|
||||||
|
python3 recon.py assign-categories --tiebreaker-pass
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Verify tiebreaker results:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python3 -c "
|
||||||
|
from lib.status import StatusDB
|
||||||
|
db = StatusDB()
|
||||||
|
c = db.get_domain_status_counts()
|
||||||
|
print('Status breakdown:', c)
|
||||||
|
print()
|
||||||
|
print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0))
|
||||||
|
print('tied_manual (needs review):', c.get('tied_manual', 0))
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given
|
||||||
|
the channel's other content.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Check overall status
|
||||||
|
python3 recon.py assign-categories
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 5: Push to PeerTube
|
||||||
|
|
||||||
|
Push in stages. Do NOT push all at once.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Dry run: confirm count
|
||||||
|
python3 recon.py assign-categories --push-pending --dry-run
|
||||||
|
|
||||||
|
# Stage 1: push 100 items
|
||||||
|
python3 recon.py assign-categories --push-pending --limit 100
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos
|
||||||
|
now show RECON domain categories. Spot-check 5 videos.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Verify via API: pick a random pushed video
|
||||||
|
python3 -c "
from lib.status import StatusDB
db = StatusDB()
row = db._get_conn().execute(
    \"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\"
).fetchone()
if row:
    uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?'
    print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}')
    print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category')
|
||||||
|
"
|
||||||
|
```
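The inline UUID extraction in the check above can be read as the small helper below. The name `uuid_from_path` is hypothetical; `lib/peertube_writer.py` exports an `extract_uuid` function, but its behaviour is not shown here, so this sketch only mirrors the one-liner.

```python
def uuid_from_path(path):
    """Return the text after the last '/w/' in a video URL, or '?' if absent."""
    if path and "/w/" in path:
        return path.rsplit("/w/", 1)[-1]
    return "?"


# Hypothetical catalogue path in the public watch-URL form.
print(uuid_from_path("https://stream.echo6.co/w/abc123"))
```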
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Stage 2: push 1000 items
|
||||||
|
python3 recon.py assign-categories --push-pending --limit 1000
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Verify via PeerTube database:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Stage 3: push remaining
|
||||||
|
python3 recon.py assign-categories --push-pending
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 6: Verify
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Check PeerTube database directly
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
|
||||||
|
|
||||||
|
# Check uncategorized
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"'
|
||||||
|
|
||||||
|
# Check RECON status
|
||||||
|
python3 recon.py assign-categories
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION)
|
||||||
|
|
||||||
|
**WARNING:** This step deletes concept directories. It is the only destructive
|
||||||
|
operation in the entire feature. Run it separately from the initial deploy,
|
||||||
|
after all other steps are verified and stable.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Dry run first — review what would be deleted
|
||||||
|
python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy
|
||||||
|
domains only, or missing concept files). The dry-run reports file counts for
|
||||||
|
each directory that would be deleted.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Small batch
|
||||||
|
python3 recon.py assign-categories --reprocess-missing --limit 10
|
||||||
|
```
|
||||||
|
|
||||||
|
**STOP.** Verify: check that 10 items re-entered the pipeline.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python3 recon.py status # queued count should increase by ~10
|
||||||
|
```
|
||||||
|
|
||||||
|
Wait for pipeline to process them. Verify domain assignment on completion:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Check these specific items got re-enriched and assigned
|
||||||
|
python3 recon.py assign-categories
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Scale up
|
||||||
|
python3 recon.py assign-categories --reprocess-missing --limit 100
|
||||||
|
|
||||||
|
# Then unbounded
|
||||||
|
python3 recon.py assign-categories --reprocess-missing
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running
|
||||||
|
it is safe. Any documents stranded at `status='catalogued'` without being re-queued
|
||||||
|
can be recovered with `recon.py queue --source stream.echo6.co`.
|
||||||
|
|
||||||
|
## Step 8: Dashboard Review
|
||||||
|
|
||||||
|
Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items.
|
||||||
|
Each row shows the video, channel, tied domains, and concept counts. Select the
|
||||||
|
correct domain and click Assign.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Rollback Procedures
|
||||||
|
|
||||||
|
### Plugin install fails or breaks PeerTube
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Disable plugin without uninstalling
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- bash -c "
|
||||||
|
mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \
|
||||||
|
/var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled
|
||||||
|
systemctl restart peertube
|
||||||
|
"'
|
||||||
|
|
||||||
|
# Verify PeerTube is healthy
|
||||||
|
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head
|
||||||
|
|
||||||
|
# To fully remove: use PeerTube admin UI → Plugins → Uninstall
|
||||||
|
```
|
||||||
|
|
||||||
|
### Schema migration revert (drop new columns)
|
||||||
|
|
||||||
|
Only needed if the columns cause problems. The columns are nullable and have no
|
||||||
|
constraints, so they should be inert.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c "
|
||||||
|
import sqlite3
conn = sqlite3.connect(\"data/recon.db\")
# Drop the index first: SQLite refuses to drop a column that is still indexed
# (assumes the index covers recon_domain_status, as its name suggests)
conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\")
print(\"Index dropped\")
for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]:
    try:
        conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\")
        print(f\"Dropped: {col}\")
    except Exception as e:
        print(f\"Skip {col}: {e}\")
conn.commit()
|
||||||
|
"'
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12).
|
||||||
|
Ubuntu 24.04 ships 3.45.1 — this is fine.
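The version that matters is the SQLite library linked into the Python interpreter that runs the rollback, which can differ from the system `sqlite3` CLI. A minimal check, with a hypothetical helper name:

```python
import sqlite3


def supports_drop_column(version_info=None):
    """True if the given (or linked) SQLite version has ALTER TABLE DROP COLUMN."""
    info = sqlite3.sqlite_version_info if version_info is None else version_info
    return tuple(info) >= (3, 35, 0)


print(sqlite3.sqlite_version, supports_drop_column())
```

Running this inside the RECON virtualenv before the rollback confirms the `DROP COLUMN` statements will be accepted.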
|
||||||
|
|
||||||
|
### Clear wrong backfill assignments (selective or full)
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /opt/recon && source venv/bin/activate
|
||||||
|
|
||||||
|
# Clear ALL domain assignments
|
||||||
|
python3 -c "
|
||||||
|
from lib.status import StatusDB
|
||||||
|
db = StatusDB()
|
||||||
|
conn = db._get_conn()
|
||||||
|
conn.execute('''UPDATE documents SET
|
||||||
|
recon_domain = NULL, recon_domain_status = NULL,
|
||||||
|
recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''')
|
||||||
|
conn.commit()
|
||||||
|
print('Cleared all domain assignments')
|
||||||
|
"
|
||||||
|
|
||||||
|
# Clear only tiebreaker results (reset to tied_pass_1 for re-run)
|
||||||
|
python3 -c "
|
||||||
|
from lib.status import StatusDB
|
||||||
|
db = StatusDB()
|
||||||
|
conn = db._get_conn()
|
||||||
|
conn.execute('''UPDATE documents SET
|
||||||
|
recon_domain = NULL, recon_domain_status = 'tied_pass_1',
|
||||||
|
recon_domain_assigned_at = NULL
|
||||||
|
WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''')
|
||||||
|
conn.commit()
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Clear wrong PeerTube categories
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Reset ALL RECON categories (100+) to NULL in PeerTube
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
|
||||||
|
-c "UPDATE video SET category = NULL WHERE category >= 100"'
|
||||||
|
|
||||||
|
# Verify
|
||||||
|
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
|
||||||
|
-c "SELECT count(*) FROM video WHERE category >= 100"'
|
||||||
|
# Should return 0
|
||||||
|
|
||||||
|
# Also clear RECON pushed timestamps so --push-pending can retry
|
||||||
|
cd /opt/recon && source venv/bin/activate
|
||||||
|
python3 -c "
|
||||||
|
from lib.status import StatusDB
|
||||||
|
db = StatusDB()
|
||||||
|
conn = db._get_conn()
|
||||||
|
conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL')
|
||||||
|
conn.commit()
|
||||||
|
print('Cleared push timestamps')
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Restore concepts after failed --reprocess-missing
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Concept backups are on Contabo at /opt/backups/recon/concepts/
|
||||||
|
# Identify which hashes were deleted (check RECON logs)
|
||||||
|
ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20'
|
||||||
|
|
||||||
|
# Restore specific hash from Contabo
|
||||||
|
HASH=<hash_from_log>
|
||||||
|
ssh root@100.64.0.1 "tar -cf - -C /opt/backups/recon/concepts/ $HASH" | \
|
||||||
|
ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/"
|
||||||
|
|
||||||
|
# Restore ALL concepts (nuclear option)
|
||||||
|
ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Fully remove feature
|
||||||
|
|
||||||
|
1. Uninstall plugin from PeerTube admin UI
|
||||||
|
2. Restart PeerTube
|
||||||
|
3. Revert RECON code changes (`git checkout master`)
|
||||||
|
4. Restart RECON
|
||||||
|
5. Drop schema columns (see above)
|
||||||
|
6. Reset PeerTube categories (see above)
|
||||||
81
lib/api.py
@@ -88,6 +88,7 @@ KNOWLEDGE_SUBNAV = [
 PEERTUBE_SUBNAV = [
     {'href': '/peertube', 'label': 'Dashboard'},
     {'href': '/peertube/channels', 'label': 'Channels'},
+    {'href': '/peertube/review', 'label': 'Review'},
 ]


@@ -376,6 +377,89 @@ def peertube_channels():
                            domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels')
 
 
+@app.route('/peertube/review')
+def peertube_review():
+    from .recon_domains import VALID_DOMAINS
+    return render_template('peertube/review.html',
+                           domain='peertube', subnav=PEERTUBE_SUBNAV,
+                           active_page='/peertube/review',
+                           valid_domains=sorted(VALID_DOMAINS))
+
+
+@app.route('/api/peertube/review/stats')
+def api_peertube_review_stats():
+    db = StatusDB()
+    counts = db.get_domain_status_counts()
+    return jsonify(counts)
+
+
+@app.route('/api/peertube/review/items')
+def api_peertube_review_items():
+    from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client
+    db = StatusDB()
+    config = get_config()
+    items = db.get_items_by_domain_status('tied_manual', limit=200)
+
+    qdrant = _get_qdrant_client(config)
+    collection = config['vector_db']['collection']
+
+    result = []
+    for item in items:
+        file_hash = item['hash']
+        domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
+        top_domains = [{'domain': d, 'count': cnt}
+                       for d, cnt in domain_counter.most_common(5)]
+
+        result.append({
+            'hash': file_hash,
+            'filename': item.get('filename', ''),
+            'category': item.get('category', ''),
+            'recon_domain': item.get('recon_domain'),
+            'recon_domain_status': item.get('recon_domain_status'),
+            'top_domains': top_domains,
+        })
+    return jsonify(result)
+
+
+@app.route('/api/peertube/review/assign', methods=['POST'])
+def api_peertube_review_assign():
+    from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP
+    from .peertube_writer import push_category, extract_uuid
+    # silent=True: a missing or malformed JSON body becomes {} and hits the 400 below
+    data = request.get_json(silent=True) or {}
+    file_hash = data.get('hash')
+    domain = data.get('domain')
+
+    if not file_hash or not domain:
+        return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400
+    if domain not in VALID_DOMAINS:
+        return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400
+
+    db = StatusDB()
+    config = get_config()
+
+    db.set_domain_assignment(file_hash, domain, 'manual_assigned')
+
+    # Push to PeerTube
+    conn = db._get_conn()
+    cat_row = conn.execute(
+        "SELECT path FROM catalogue WHERE hash = ?", (file_hash,)
+    ).fetchone()
+    if cat_row:
+        uuid = extract_uuid(dict(cat_row)['path'])
+        if uuid:
+            cat_id = DOMAIN_CATEGORY_MAP[domain]
+            try:
+                pushed, _token = push_category(uuid, cat_id, config)
+            except Exception as e:
+                return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'})
+            if not pushed:
+                return jsonify({'ok': True, 'warning': 'Assigned but PeerTube push failed'})
+            db.set_peertube_pushed(file_hash)
+
+    return jsonify({'ok': True, 'domain': domain})
+
+
 @app.route('/settings/keys')
 def settings_keys():
     from lib.key_manager import get_key_manager
319
lib/domain_assigner.py
Normal file
@@ -0,0 +1,319 @@
"""
RECON Domain Assigner

Computes per-video domain assignments from Qdrant vector payloads.
Two functions, two execution modes:

    compute_assignment()   — pass 1, inline from post-embed hook
    run_tiebreaker_pass()  — batch, resolves ties via channel concept scan

Data source: Qdrant `domain` payload field on concept vectors.
Previously read on-disk concept JSON files; migrated to Qdrant as
single source of truth (2026-04-28).

Status values written to documents.recon_domain_status:
    assigned        — clear winner from pass 1 concept count
    tied_pass_1     — concept tie, awaiting channel tiebreaker
    tied_pass_2     — resolved by channel tiebreaker
    tied_manual     — needs human review (dashboard)
    no_concepts     — terminal, zero concept vectors in Qdrant
    needs_reprocess — transient failure (Qdrant error, etc.)
    manual_assigned — human override from dashboard
"""
from collections import Counter

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny

from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST
from .utils import setup_logging

logger = setup_logging('recon.domain_assigner')


def _get_qdrant_client(config):
    """Create a QdrantClient from RECON config.

    Callers should create one client and pass it through rather than
    calling this repeatedly.
    """
    logger.debug("Creating new QdrantClient (caller did not pass one)")
    return QdrantClient(
        host=config['vector_db']['host'],
        port=config['vector_db']['port'],
        timeout=60
    )


def _count_domains_from_qdrant(qdrant, collection, doc_hash):
    """Count valid domain occurrences for a single document from Qdrant.

    Scrolls all points matching doc_hash and counts domain values.

    Args:
        qdrant: QdrantClient instance
        collection: Qdrant collection name
        doc_hash: Document hash to query

    Returns:
        Counter of {domain_name: count} for valid domains.
        Empty Counter if no points found (never None).
    """
    domain_counter = Counter()
    offset = None

    while True:
        results, next_offset = qdrant.scroll(
            collection_name=collection,
            scroll_filter=Filter(must=[
                FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash))
            ]),
            with_payload=["domain"],
            with_vectors=False,
            limit=200,
            offset=offset,
        )

        for point in results:
            dom = point.payload.get('domain')
            if isinstance(dom, str) and dom in VALID_DOMAINS:
                domain_counter[dom] += 1
            elif isinstance(dom, list):
                for d in dom:
                    if isinstance(d, str) and d in VALID_DOMAINS:
                        domain_counter[d] += 1

        if next_offset is None:
            break
        offset = next_offset

    return domain_counter


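The payload handling in `_count_domains_from_qdrant` can be tried without a running Qdrant instance. The sketch below is an illustration only, not code from this commit: `count_domains` and `SAMPLE_VALID_DOMAINS` are hypothetical names, and the payload dictionaries are made-up stand-ins for `point.payload`.

```python
from collections import Counter

# Hypothetical subset of the taxonomy, for illustration only.
SAMPLE_VALID_DOMAINS = {"Medical", "Navigation", "Water Systems"}


def count_domains(payloads, valid_domains=SAMPLE_VALID_DOMAINS):
    """Count valid domains across payload dicts.

    A `domain` value may be a single string or a list of strings;
    anything else (None, numbers, unknown names) is ignored.
    """
    counter = Counter()
    for payload in payloads:
        dom = payload.get("domain")
        if isinstance(dom, str):
            candidates = [dom]
        elif isinstance(dom, list):
            candidates = dom
        else:
            candidates = []
        for d in candidates:
            if isinstance(d, str) and d in valid_domains:
                counter[d] += 1
    return counter


payloads = [
    {"domain": "Medical"},
    {"domain": ["Medical", "Navigation"]},
    {"domain": "Not A Domain"},
    {"domain": None},
    {},
]
counts = count_domains(payloads)
print(counts)
```

A list-valued payload contributes one count per valid entry, so a single point can vote for more than one domain.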
def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes):
    """Count valid domain occurrences across multiple documents from Qdrant.

    Single scroll with MatchAny filter, with offset pagination for large
    result sets.

    Args:
        qdrant: QdrantClient instance
        collection: Qdrant collection name
        doc_hashes: List of document hashes to query

    Returns:
        Counter of {domain_name: count} aggregated across all matching points.
    """
    if not doc_hashes:
        return Counter()

    domain_counter = Counter()
    offset = None

    while True:
        results, next_offset = qdrant.scroll(
            collection_name=collection,
            scroll_filter=Filter(must=[
                FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes))
            ]),
            with_payload=["domain"],
            with_vectors=False,
            limit=10000,
            offset=offset,
        )

        for point in results:
            dom = point.payload.get('domain')
            if isinstance(dom, str) and dom in VALID_DOMAINS:
                domain_counter[dom] += 1
            elif isinstance(dom, list):
                for d in dom:
                    if isinstance(d, str) and d in VALID_DOMAINS:
                        domain_counter[d] += 1

        if next_offset is None:
            break
        offset = next_offset

    return domain_counter


def compute_assignment(file_hash, db, config, qdrant=None):
    """Compute domain assignment for a single document (pass 1).

    Counts domain occurrences across all concept vectors in Qdrant.
    If a single domain wins, assigns it. If tied, defers to batch
    tiebreaker.

    Args:
        file_hash: Document hash
        db: StatusDB instance
        config: RECON config dict
        qdrant: Optional QdrantClient (created if not provided)

    Returns:
        (domain, status) tuple where domain is a string or None,
        and status is one of: 'assigned', 'tied_pass_1', 'no_concepts',
        'needs_reprocess'
    """
    owns_client = False
    if qdrant is None:
        qdrant = _get_qdrant_client(config)
        owns_client = True

    collection = config['vector_db']['collection']

    try:
        domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
    except Exception as e:
        logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}")
        return (None, 'needs_reprocess')

    if len(domain_counter) == 0:
        return (None, 'no_concepts')

    top = domain_counter.most_common(2)
    top_domain = top[0][0]
    top_count = top[0][1]

    if len(top) == 1 or top[1][1] < top_count:
        return (top_domain, 'assigned')

    # Tie — defer to tiebreaker pass
    return (None, 'tied_pass_1')


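The pass 1 decision in `compute_assignment` reduces to a comparison of the two highest counts. The following is a minimal sketch of that rule in isolation, assuming the counts are already in a `Counter`; `pick_winner` is a hypothetical helper name, not part of the commit.

```python
from collections import Counter


def pick_winner(domain_counter):
    """Return (domain, status) using the pass 1 rule.

    - empty counter            -> (None, 'no_concepts')
    - one strict leader        -> (leader, 'assigned')
    - top two counts are equal -> (None, 'tied_pass_1')
    """
    if not domain_counter:
        return (None, "no_concepts")
    top = domain_counter.most_common(2)
    if len(top) == 1 or top[1][1] < top[0][1]:
        return (top[0][0], "assigned")
    return (None, "tied_pass_1")


print(pick_winner(Counter({"Medical": 3, "Navigation": 1})))
print(pick_winner(Counter({"Medical": 2, "Navigation": 2})))
print(pick_winner(Counter()))
```

Only the top two entries need inspecting: if the second-highest count is strictly lower than the first, no other domain can tie with the leader.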
def _get_tied_domains(qdrant, collection, file_hash):
    """Get the set of domains tied for first place in a document's concepts."""
    domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
    if not domain_counter:
        return []

    top = domain_counter.most_common()
    if not top:
        return []

    max_count = top[0][1]
    return [dom for dom, cnt in top if cnt == max_count]


def _channel_video_hashes(db, channel_name, exclude_hash=None):
    """Get all document hashes belonging to a PeerTube channel.

    Args:
        db: StatusDB instance
        channel_name: catalogue.category (channel actor name)
        exclude_hash: Hash to exclude (the document being resolved)

    Returns:
        List of document hashes
    """
    conn = db._get_conn()
    rows = conn.execute(
        "SELECT hash FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'",
        (channel_name,)
    ).fetchall()
    hashes = [r['hash'] for r in rows]
    if exclude_hash:
        hashes = [h for h in hashes if h != exclude_hash]
    return hashes


def run_tiebreaker_pass(db, config, qdrant=None):
    """Resolve tied domain assignments using channel-level Qdrant analysis.

    Processes all documents where recon_domain_status = 'tied_pass_1'.

    For each tied document, queries Qdrant for domain counts from all
    other videos in the same channel and picks the tied domain with the
    highest channel-wide count.

    Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip
    tiebreaking and go straight to 'tied_manual' for dashboard review.

    Args:
        db: StatusDB instance
        config: RECON config dict
        qdrant: Optional QdrantClient (created if not provided)

    Returns:
        Dict with counts: resolved, manual, skipped, errors, total
    """
    owns_client = False
    if qdrant is None:
        qdrant = _get_qdrant_client(config)
        owns_client = True

    collection = config['vector_db']['collection']
    tied_items = db.get_items_by_domain_status('tied_pass_1')

    stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)}
    logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve")

    for item in tied_items:
        file_hash = item['hash']
        channel = item.get('category', '')

        try:
            tied_domains = _get_tied_domains(qdrant, collection, file_hash)
            if not tied_domains:
                db.set_domain_assignment(file_hash, None, 'no_concepts')
                stats['skipped'] += 1
                continue

            if len(tied_domains) == 1:
                # No longer tied (possibly re-enriched since pass 1)
                db.set_domain_assignment(file_hash, tied_domains[0], 'assigned')
                stats['resolved'] += 1
                continue

            # Skip-list check: known non-topical catch-all channels
            if channel in MEGA_CHANNEL_SKIP_LIST:
                fallback = sorted(tied_domains)[0]
                db.set_domain_assignment(file_hash, fallback, 'tied_manual')
                stats['manual'] += 1
                logger.debug(f" {file_hash[:12]}: skip-list channel '{channel}' → tied_manual")
                continue

            # Channel tiebreaker: count domains across all other videos in channel
            other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash)
            channel_domain_counts = _count_domains_from_qdrant_batch(
                qdrant, collection, other_hashes
            )

            # Among tied domains only, pick highest channel-wide count
            best_domain = None
            best_count = -1
            for dom in tied_domains:
                c = channel_domain_counts.get(dom, 0)
                if c > best_count:
                    best_count = c
                    best_domain = dom

            # Check if channel tiebreaker resolved it
            tied_at_channel = [d for d in tied_domains
                               if channel_domain_counts.get(d, 0) == best_count]

            if len(tied_at_channel) == 1:
                db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2')
                stats['resolved'] += 1
                logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)")
                continue

            # Still tied after channel scan — mark for manual review
            fallback = sorted(tied_domains)[0]
            db.set_domain_assignment(file_hash, fallback, 'tied_manual')
            stats['manual'] += 1
            logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual")

        except Exception as e:
            logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}")
            stats['errors'] += 1

    logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, "
                f"{stats['manual']} manual, {stats['skipped']} skipped, "
                f"{stats['errors']} errors")
    return stats
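The channel tiebreaker in `run_tiebreaker_pass` only compares the domains that were tied in pass 1; a domain that dominates the channel but was not among the tied candidates is ignored. A minimal sketch of that selection step, assuming the channel-wide counts are already available as a `Counter` (the name `break_tie` is hypothetical, and `tied_domains` is assumed to be non-empty):

```python
from collections import Counter


def break_tie(tied_domains, channel_counts):
    """Pick the tied domain with the highest channel-wide count.

    Returns (domain, status). If the channel counts are tied as well,
    the alphabetically first tied domain is returned as a placeholder
    with status 'tied_manual', matching the fallback used in the pass.
    """
    best_count = max(channel_counts.get(d, 0) for d in tied_domains)
    leaders = [d for d in tied_domains if channel_counts.get(d, 0) == best_count]
    if len(leaders) == 1:
        return (leaders[0], "tied_pass_2")
    return (sorted(tied_domains)[0], "tied_manual")


channel = Counter({"Security": 90, "Navigation": 40, "Medical": 5})
print(break_tie(["Medical", "Navigation"], channel))
print(break_tie(["Medical", "Navigation"], Counter()))
```

In the first call `Security` has the largest channel count but is not a candidate, so `Navigation` wins. In the second call the channel offers no evidence, so the item would go to manual review.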
@@ -27,13 +27,7 @@ from .utils import resolve_text_dir
 logger = setup_logging('recon.embedder')
 
 # ── Classification allowlists ───────────────────────────────────────────────
-VALID_DOMAINS = {
-    'Agriculture & Livestock', 'Civil Organization', 'Communications',
-    'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
-    'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
-    'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
-    'Vehicles', 'Water Systems', 'Wilderness Skills',
-}
+from .recon_domains import VALID_DOMAINS
 VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
 VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
 
@@ -261,15 +255,22 @@ def embed_single(file_hash, db, config):
 
         if not all_concepts:
             db.update_status(file_hash, 'complete', vectors_inserted=0)
+            # Tag stream docs with no concepts for reprocessing
+            _cat = db._get_conn().execute(
+                "SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
+            ).fetchone()
+            if _cat and dict(_cat)['source'] == 'stream.echo6.co':
+                db.set_domain_assignment(file_hash, None, 'needs_reprocess')
             logger.info(f"No concepts to embed for {doc['filename']}")
             return True
 
-        # Look up source from catalogue once per doc
+        # Look up source and path from catalogue once per doc
         cat_conn = db._get_conn()
         cat_row = cat_conn.execute(
-            "SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
+            "SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,)
         ).fetchone()
         source = dict(cat_row)['source'] if cat_row else ''
+        catalogue_path = dict(cat_row)['path'] if cat_row else ''
 
         download_url = ''
         is_web = doc.get('path', '').startswith(('http://', 'https://'))
@@ -321,6 +322,8 @@ def embed_single(file_hash, db, config):
 
         if not valid:
             db.update_status(file_hash, 'complete', vectors_inserted=0)
+            if source == 'stream.echo6.co':
+                db.set_domain_assignment(file_hash, None, 'needs_reprocess')
             logger.info(f"No valid concepts to embed for {doc['filename']}")
             return True
 
@@ -401,6 +404,28 @@ def embed_single(file_hash, db, config):
 
         db.update_status(file_hash, 'complete', vectors_inserted=embedded_count)
         logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)")
+
+        # Post-embed hook: assign domain for PeerTube videos
+        if source == 'stream.echo6.co':
+            try:
+                from .domain_assigner import compute_assignment
+                from .peertube_writer import push_category, extract_uuid
+                from .recon_domains import DOMAIN_CATEGORY_MAP
+                domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
+                db.set_domain_assignment(file_hash, domain, status)
+                if domain and status == 'assigned':
+                    cat_id = DOMAIN_CATEGORY_MAP[domain]
+                    uuid = extract_uuid(catalogue_path)
+                    if uuid:
+                        pushed, _token = push_category(uuid, cat_id, config)
+                        if pushed:
+                            db.set_peertube_pushed(file_hash)
+                            logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube")
+                        else:
+                            logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending")
+            except Exception as e:
+                logger.warning(f"Domain assignment failed for {file_hash}: {e}")
+
         return True
 
     except Exception as e:
@@ -42,13 +42,7 @@ logger = setup_logging('recon.enricher')
 STALE_ENRICHING_HOURS = 2
 
 # ── Classification allowlists ───────────────────────────────────────────────
-VALID_DOMAINS = {
-    'Agriculture & Livestock', 'Civil Organization', 'Communications',
-    'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
-    'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
-    'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
-    'Vehicles', 'Water Systems', 'Wilderness Skills',
-}
+from .recon_domains import VALID_DOMAINS
 VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
 VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
 
323
lib/peertube_writer.py
Normal file
@@ -0,0 +1,323 @@
"""
RECON PeerTube Writer

Authenticated PeerTube API client for pushing domain category assignments.
Uses OAuth2 password grant, caches tokens, refreshes on 401.

Config keys used:
    peertube.api_url           — internal PeerTube URL (http://192.168.1.170:9000)
    peertube.host_header       — Host header for API requests (stream.echo6.co)
    peertube.username          — PeerTube admin username
    peertube.password_env      — env var name holding the password
    peertube.writer_rate_limit — delay between category push API calls (seconds)
"""
import json
import os
import time

import requests as http_requests

from .recon_domains import DOMAIN_CATEGORY_MAP
from .utils import setup_logging

logger = setup_logging('recon.peertube_writer')

TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json'


def _get_peertube_config(config):
    """Extract PeerTube writer config with defaults."""
    pt = config.get('peertube', {})
    return {
        'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')),
        'host_header': pt.get('host_header', 'stream.echo6.co'),
        'username': pt.get('username', 'root'),
        'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'),
        'rate_limit_delay': pt.get('writer_rate_limit', 0.1),
    }


def _load_cached_token():
    """Load cached OAuth token from disk."""
    if os.path.exists(TOKEN_CACHE_PATH):
        try:
            with open(TOKEN_CACHE_PATH, 'r') as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            pass
    return None


def _save_token(token_data):
    """Save OAuth token to disk cache."""
    os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True)
    with open(TOKEN_CACHE_PATH, 'w') as f:
        json.dump(token_data, f)


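The cache written by `_save_token` holds an access token, a refresh token and the OAuth client secret, so file permissions may matter on a shared host. The sketch below is a possible variant, not what the commit does: `save_token` and `load_token` are hypothetical names, the path is a temporary directory rather than `TOKEN_CACHE_PATH`, and the `0o600` mode is an assumption about what an operator might want (it only takes effect when the file is first created, and only on POSIX systems).

```python
import json
import os
import tempfile


def save_token(path, token_data):
    """Write token_data as JSON, creating the file with mode 0o600."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # os.open lets the mode be set at creation time instead of after the write.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(token_data, f)


def load_token(path):
    """Return the cached token dict, or None if missing or unreadable."""
    try:
        with open(path, 'r') as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        return None


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, 'cache', 'token.json')
    missing = load_token(cache)  # file does not exist yet
    save_token(cache, {'access_token': 'example', 'refresh_token': 'example2'})
    loaded = load_token(cache)

print(missing, loaded)
```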
def _get_oauth_client(api_url, host_header):
    """Get PeerTube OAuth client credentials.

    Args:
        api_url: Base API URL
        host_header: Host header value

    Returns:
        (client_id, client_secret) tuple
    """
    resp = http_requests.get(
        f"{api_url}/api/v1/oauth-clients/local",
        headers={'Host': host_header},
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    return data['client_id'], data['client_secret']


def _get_token(api_url, host_header, username, password, client_id, client_secret):
    """Obtain OAuth2 access token via password grant.

    Args:
        api_url: Base API URL
        host_header: Host header value
        username: PeerTube username
        password: PeerTube password
        client_id: OAuth client ID
        client_secret: OAuth client secret

    Returns:
        Token data dict with access_token, refresh_token, etc.
    """
    resp = http_requests.post(
        f"{api_url}/api/v1/users/token",
        headers={'Host': host_header},
        data={
            'client_id': client_id,
            'client_secret': client_secret,
            'grant_type': 'password',
            'username': username,
            'password': password,
        },
        timeout=30,
    )
    resp.raise_for_status()
    token_data = resp.json()
    token_data['client_id'] = client_id
    token_data['client_secret'] = client_secret
    _save_token(token_data)
    return token_data


def _refresh_token(api_url, host_header, token_data):
    """Refresh an expired access token.

    Returns:
        New token data dict, or None on failure.
    """
    try:
        resp = http_requests.post(
            f"{api_url}/api/v1/users/token",
            headers={'Host': host_header},
            data={
                'client_id': token_data['client_id'],
                'client_secret': token_data['client_secret'],
                'grant_type': 'refresh_token',
                'refresh_token': token_data['refresh_token'],
            },
            timeout=30,
        )
        resp.raise_for_status()
        new_data = resp.json()
        new_data['client_id'] = token_data['client_id']
        new_data['client_secret'] = token_data['client_secret']
        _save_token(new_data)
        return new_data
    except Exception as e:
        logger.warning(f"Token refresh failed: {e}")
        return None


def _ensure_token(config):
    """Ensure we have a valid OAuth token. Returns token data dict.

    Tries cached token first, then obtains a new one.
    """
    pt = _get_peertube_config(config)
    password = os.environ.get(pt['password_env'], '')
    if not password:
        raise ValueError(f"PeerTube password not set in env var {pt['password_env']}")

    # Try cached token
    token_data = _load_cached_token()
    if token_data and 'access_token' in token_data:
        return token_data

    # Get fresh token
    client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
    return _get_token(
        pt['api_url'], pt['host_header'],
        pt['username'], password,
        client_id, client_secret,
    )


def _api_request(method, path, config, token_data, **kwargs):
    """Make an authenticated PeerTube API request with auto-refresh on 401.

    Args:
        method: HTTP method ('GET', 'PUT', etc.)
        path: API path (e.g. '/api/v1/videos/{uuid}')
        config: RECON config dict
        token_data: Current token data dict
        **kwargs: Additional requests kwargs (json, data, etc.)

    Returns:
        (response, token_data) tuple — token_data may be refreshed.
    """
    pt = _get_peertube_config(config)
    url = f"{pt['api_url']}{path}"
    headers = {
        'Host': pt['host_header'],
        'Authorization': f"Bearer {token_data['access_token']}",
    }

    resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)

    if resp.status_code == 401:
        # Try refresh
        new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data)
        if new_token:
            headers['Authorization'] = f"Bearer {new_token['access_token']}"
            resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
            return resp, new_token
        else:
            # Full re-auth
            password = os.environ.get(pt['password_env'], '')
            client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
            new_token = _get_token(
                pt['api_url'], pt['host_header'],
                pt['username'], password,
                client_id, client_secret,
            )
            headers['Authorization'] = f"Bearer {new_token['access_token']}"
            resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
            return resp, new_token

    return resp, token_data


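The retry shape used by `_api_request` (send, and on a 401 obtain a new token and send exactly once more) can be checked without a PeerTube server. The sketch below replaces the HTTP layer with stubs; `FakeResponse`, `request_with_refresh`, `fake_send` and the token strings are all hypothetical and exist only for this illustration.

```python
class FakeResponse:
    """Stand-in for a requests.Response; only status_code is modelled."""

    def __init__(self, status_code):
        self.status_code = status_code


def request_with_refresh(send, refresh, token):
    """Call send(token); on HTTP 401, refresh once and retry once.

    send:    callable taking an access token, returning a response object
    refresh: callable returning a new access token
    Returns (response, token_in_use) so the caller can keep the new token.
    """
    resp = send(token)
    if resp.status_code == 401:
        token = refresh()
        resp = send(token)
    return resp, token


# Stub transport: only the token "fresh" is accepted.
calls = []


def fake_send(token):
    calls.append(token)
    return FakeResponse(204 if token == "fresh" else 401)


resp, token = request_with_refresh(fake_send, lambda: "fresh", "stale")
print(resp.status_code, token, calls)
```

Returning the token alongside the response is what lets a batch caller such as `push_pending` reuse a refreshed token for the remaining items instead of hitting a 401 on every request.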
def push_category(video_uuid, category_id, config, token_data=None):
    """Push a category assignment to a single PeerTube video.

    Args:
        video_uuid: PeerTube video UUID
        category_id: Category ID (100-117)
        config: RECON config dict
        token_data: Optional pre-fetched token data

    Returns:
        (success: bool, token_data: dict) tuple
    """
    if token_data is None:
        token_data = _ensure_token(config)

    resp, token_data = _api_request(
        'PUT',
        f'/api/v1/videos/{video_uuid}',
        config,
        token_data,
        json={'category': category_id},
    )

    if resp.status_code in (200, 204):
        return True, token_data
    else:
        logger.warning(f"Failed to push category for {video_uuid}: "
                       f"HTTP {resp.status_code} — {resp.text[:200]}")
        return False, token_data


def extract_uuid(catalogue_path):
    """Extract PeerTube video UUID from catalogue path.

    Catalogue paths for PeerTube videos look like:
        https://stream.echo6.co/w/UUID

    Args:
        catalogue_path: catalogue.path value

    Returns:
        UUID string or None
    """
    if not catalogue_path:
        return None
    if '/w/' in catalogue_path:
        return catalogue_path.rsplit('/w/', 1)[-1]
    return None


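A few sample inputs make the behaviour of `extract_uuid` concrete. The function body below is copied from the diff so the snippet runs on its own; the identifiers `abc123` and the local file path are made up for illustration. Whether catalogue paths ever carry a query string is not stated in the source, so the second case is only there to show that everything after the last `/w/` is returned unchanged.

```python
def extract_uuid(catalogue_path):
    """Return the part of the path after the last '/w/', or None."""
    if not catalogue_path:
        return None
    if '/w/' in catalogue_path:
        return catalogue_path.rsplit('/w/', 1)[-1]
    return None


print(extract_uuid("https://stream.echo6.co/w/abc123"))           # abc123
print(extract_uuid("https://stream.echo6.co/w/abc123?start=10"))  # abc123?start=10
print(extract_uuid("/mnt/library/manual.pdf"))                    # None
print(extract_uuid(""))                                           # None
```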
def push_pending(db, config, limit=None):
    """Push all assigned-but-unpushed domain categories to PeerTube.

    Args:
        db: StatusDB instance
        config: RECON config dict
        limit: Optional max number of items to push

    Returns:
        (success_count, fail_count) tuple
    """
    items = db.get_unpushed_assignments()
    if limit:
        items = items[:limit]
    if not items:
        logger.info("No unpushed assignments to push")
        return (0, 0)

    pt = _get_peertube_config(config)
    delay = pt['rate_limit_delay']

    SYSTEMIC_FAIL_THRESHOLD = 5  # abort if first N items all fail

    logger.info(f"Pushing {len(items)} category assignments to PeerTube")

    token_data = _ensure_token(config)
    success = 0
    failed = 0

    for item in items:
        file_hash = item['hash']
        domain = item.get('recon_domain')
        catalogue_path = item.get('catalogue_path', '')

        if not domain or domain not in DOMAIN_CATEGORY_MAP:
            logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping")
            failed += 1
            continue

        uuid = extract_uuid(catalogue_path)
        if not uuid:
            logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'")
            failed += 1
            continue

        category_id = DOMAIN_CATEGORY_MAP[domain]
        ok, token_data = push_category(uuid, category_id, config, token_data)

        if ok:
            db.set_peertube_pushed(file_hash)
            success += 1
        else:
            failed += 1

        # Abort on systemic failure (e.g. plugin not installed, auth broken)
        if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD:
            logger.error(f"Aborting push: first {failed} items all failed — "
                         f"check plugin installation and PeerTube API config")
            break

        time.sleep(delay)

    logger.info(f"Push complete: {success} succeeded, {failed} failed")
    return (success, failed)
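The abort guard near the end of the loop is easier to reason about in isolation. The sketch below is a simplified stand-in, not the code in this change: `push_all`, `push_one`, and `threshold` are hypothetical names, and the real function also skips invalid items and sleeps between calls.

```python
from typing import Callable, Iterable, Tuple


def push_all(items: Iterable[str],
             push_one: Callable[[str], bool],
             threshold: int = 5) -> Tuple[int, int]:
    """Push items one by one; stop early if the first `threshold` all fail.

    `push_one` is a hypothetical callback that returns True on success.
    """
    success = 0
    failed = 0
    for item in items:
        if push_one(item):
            success += 1
        else:
            failed += 1
        # Nothing has succeeded yet and the failure count reached the
        # threshold: treat this as a systemic problem and stop.
        if success == 0 and failed >= threshold:
            break
    return success, failed
```

Because the guard requires `success == 0`, a single early success disables it for the rest of the run, so scattered later failures are counted but never abort the push.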
|
||||||
46
lib/recon_domains.py
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
"""
RECON Domain Taxonomy

Single source of truth for the 18 knowledge domains and their PeerTube
category ID mappings. IDs 100-117 are registered via the
peertube-plugin-recon-domains plugin.

Import VALID_DOMAINS from here instead of defining local sets.
"""

DOMAIN_CATEGORY_MAP = {
    'Agriculture & Livestock': 100,
    'Civil Organization': 101,
    'Communications': 102,
    'Food Systems': 103,
    'Foundational Skills': 104,
    'Logistics': 105,
    'Medical': 106,
    'Navigation': 107,
    'Operations': 108,
    'Power Systems': 109,
    'Preservation & Storage': 110,
    'Security': 111,
    'Shelter & Construction': 112,
    'Technology': 113,
    'Tools & Equipment': 114,
    'Vehicles': 115,
    'Water Systems': 116,
    'Wilderness Skills': 117,
}

VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys())

CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()}

# Channels whose tiebreaker is skipped because their content is non-topical
# (catch-alls, miscellany dumps, etc.). Items in these channels with tied
# domain counts go straight to tied_manual without channel-context tiebreaker.
#
# This is intentionally a hardcoded explicit list, not a size threshold.
# Adding a channel here requires an explicit decision — only add channels
# that are genuinely non-topical catch-alls where channel-wide concept
# aggregation would produce meaningless noise.
MEGA_CHANNEL_SKIP_LIST = {
    'Transcript',  # Legacy catch-all, ~9,200 videos, no topical coherence
}
|
||||||
100
lib/status.py
|
|
@ -124,6 +124,22 @@ class StatusDB:
|
||||||
except Exception:
|
||||||
pass # column already exists
|
||||||
|
|
||||||
|
        # Migration: domain assignment columns for PeerTube categorization
        for col, coltype in [
            ('recon_domain', 'TEXT'),
            ('recon_domain_status', 'TEXT'),
            ('recon_domain_assigned_at', 'TEXT'),
            ('peertube_category_pushed_at', 'TEXT'),
        ]:
            try:
                conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}")
            except Exception:
                pass  # column already exists
        try:
            conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)")
        except Exception:
            pass  # index already exists
|
||||||
|
|
||||||
# Stream B: file_operations + duplicate_review tables
|
||||||
conn.executescript("""
|
||||||
CREATE TABLE IF NOT EXISTS file_operations (
|
||||||
|
|
@ -448,6 +464,90 @@ class StatusDB:
|
||||||
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
    # ── Domain Assignment Helpers ─────────────────────────────────

    def get_domain_assignment(self, file_hash):
        """Get domain assignment for a document.

        Returns:
            (recon_domain, recon_domain_status) tuple, or (None, None) if not set.
        """
        conn = self._get_conn()
        row = conn.execute(
            "SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?",
            (file_hash,)
        ).fetchone()
        if row:
            return (row['recon_domain'], row['recon_domain_status'])
        return (None, None)

    def set_domain_assignment(self, file_hash, domain, status):
        """Set domain assignment and status for a document."""
        conn = self._get_conn()
        conn.execute(
            "UPDATE documents SET recon_domain = ?, recon_domain_status = ?, "
            "recon_domain_assigned_at = ? WHERE hash = ?",
            (domain, status, datetime.now(timezone.utc).isoformat(), file_hash)
        )
        conn.commit()

    def set_peertube_pushed(self, file_hash):
        """Mark a document's category as pushed to PeerTube."""
        conn = self._get_conn()
        conn.execute(
            "UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?",
            (datetime.now(timezone.utc).isoformat(), file_hash)
        )
        conn.commit()

    def get_unpushed_assignments(self):
        """Get documents with domain assignments not yet pushed to PeerTube.

        Only returns stream.echo6.co source documents.
        """
        conn = self._get_conn()
        return [dict(r) for r in conn.execute(
            """SELECT d.*, c.source, c.category, c.path as catalogue_path
               FROM documents d
               LEFT JOIN catalogue c ON d.hash = c.hash
               WHERE d.recon_domain IS NOT NULL
                 AND d.peertube_category_pushed_at IS NULL
                 AND c.source = 'stream.echo6.co'
               ORDER BY d.recon_domain_assigned_at""",
        ).fetchall()]

    def get_items_by_domain_status(self, status, limit=None):
        """Get documents by domain assignment status."""
        conn = self._get_conn()
        q = """SELECT d.*, c.source, c.category, c.path as catalogue_path
               FROM documents d
               LEFT JOIN catalogue c ON d.hash = c.hash
               WHERE d.recon_domain_status = ?
               ORDER BY d.discovered_at"""
        if limit:
            q += f" LIMIT {int(limit)}"
        return [dict(r) for r in conn.execute(q, (status,)).fetchall()]

    def get_domain_status_counts(self):
        """Get counts of documents by domain assignment status."""
        conn = self._get_conn()
        return {row['recon_domain_status']: row['cnt']
                for row in conn.execute(
                    "SELECT recon_domain_status, COUNT(*) as cnt "
                    "FROM documents WHERE recon_domain_status IS NOT NULL "
                    "GROUP BY recon_domain_status"
                ).fetchall()}

    def get_domain_distribution(self):
        """Get counts of documents per assigned domain."""
        conn = self._get_conn()
        return {row['recon_domain']: row['cnt']
                for row in conn.execute(
                    "SELECT recon_domain, COUNT(*) as cnt "
                    "FROM documents WHERE recon_domain IS NOT NULL "
                    "GROUP BY recon_domain ORDER BY cnt DESC"
                ).fetchall()}
|
||||||
|
|
||||||
# ── Scraper Job Helpers ─────────────────────────────────────
|
||||||
|
|
||||||
def get_pending_scrape_job(self):
|
||||||
|
|
|
||||||
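The column migration in `lib/status.py` relies on `ALTER TABLE` raising when a column already exists, which also hides any unrelated error. A possible alternative, assuming `StatusDB` is backed by SQLite (the `executescript` call suggests so, but the diff does not confirm it), is to check `PRAGMA table_info` first. `add_column_if_missing` is a hypothetical helper, not part of this change.

```python
import sqlite3


def add_column_if_missing(conn, table, column, coltype):
    """Add `column` to `table` unless PRAGMA table_info already lists it.

    Returns True if the column was added, False if it was already there.
    Table and column names are interpolated, so pass trusted constants only.
    """
    # Each table_info row is (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {coltype}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (hash TEXT PRIMARY KEY)")

first = add_column_if_missing(conn, "documents", "recon_domain_status", "TEXT")
second = add_column_if_missing(conn, "documents", "recon_domain_status", "TEXT")

# IF NOT EXISTS makes the index step safe to repeat as well.
for _ in range(2):
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_documents_recon_domain_status "
        "ON documents(recon_domain_status)"
    )
```

With this shape a genuine failure (locked database, misspelled table) still surfaces as an exception instead of being treated as "column already exists".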
52
peertube-plugin/peertube-plugin-recon-domains/README.md
Normal file
|
|
@ -0,0 +1,52 @@
|
||||||
|
# peertube-plugin-recon-domains

Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis.

## Category Mapping

| ID  | Domain                    |
|-----|---------------------------|
| 100 | Agriculture & Livestock   |
| 101 | Civil Organization        |
| 102 | Communications            |
| 103 | Food Systems              |
| 104 | Foundational Skills       |
| 105 | Logistics                 |
| 106 | Medical                   |
| 107 | Navigation                |
| 108 | Operations                |
| 109 | Power Systems             |
| 110 | Preservation & Storage    |
| 111 | Security                  |
| 112 | Shelter & Construction    |
| 113 | Technology                |
| 114 | Tools & Equipment         |
| 115 | Vehicles                  |
| 116 | Water Systems             |
| 117 | Wilderness Skills         |

Built-in PeerTube categories (IDs 1-18) are not modified.
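
On the RECON side the same table is kept as a Python dict with a derived reverse map. The snippet below is a small sketch of looking values up in both directions; the dict contents come from the table above, while `category_for` and `domain_for` are hypothetical helper names used only for illustration.

```python
# Forward map, copied from the category table.
DOMAIN_CATEGORY_MAP = {
    'Agriculture & Livestock': 100, 'Civil Organization': 101,
    'Communications': 102, 'Food Systems': 103,
    'Foundational Skills': 104, 'Logistics': 105,
    'Medical': 106, 'Navigation': 107,
    'Operations': 108, 'Power Systems': 109,
    'Preservation & Storage': 110, 'Security': 111,
    'Shelter & Construction': 112, 'Technology': 113,
    'Tools & Equipment': 114, 'Vehicles': 115,
    'Water Systems': 116, 'Wilderness Skills': 117,
}

# Reverse map is derived, so the two directions cannot drift apart.
CATEGORY_DOMAIN_MAP = {cat_id: name for name, cat_id in DOMAIN_CATEGORY_MAP.items()}


def category_for(domain):
    """Return the PeerTube category ID for a domain name, or None if unknown."""
    return DOMAIN_CATEGORY_MAP.get(domain)


def domain_for(category_id):
    """Return the domain name for a category ID, or None if it is not a RECON ID."""
    return CATEGORY_DOMAIN_MAP.get(category_id)
```

Returning `None` for unknown input keeps built-in PeerTube IDs (1-18) clearly separate from RECON IDs.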

## Install

```bash
# Copy plugin to PeerTube storage
cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/

# Register plugin via API or admin UI
# Admin > Plugins > Install > peertube-plugin-recon-domains

# Restart PeerTube
sudo systemctl restart peertube
```

## Uninstall

Remove the plugin via PeerTube admin UI or:

```bash
rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains
sudo systemctl restart peertube
```

Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled.
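
After an install or uninstall, one way to confirm what PeerTube currently reports is to compare its category list against the expected labels. This is a rough sketch under assumptions: the response shape (`{id_str: label}`) is taken from the parity test in this change, `EXPECTED` is shortened to three entries for brevity, and `fetch_categories` and `find_problems` are hypothetical names.

```python
import json
from urllib.request import Request, urlopen

# Shortened for illustration; a real check would list all 18 IDs.
EXPECTED = {100: 'Agriculture & Livestock', 106: 'Medical', 117: 'Wilderness Skills'}


def fetch_categories(base_url, host_header=None, timeout=10):
    """GET /api/v1/videos/categories and decode the JSON body."""
    headers = {'Host': host_header} if host_header else {}
    req = Request(base_url.rstrip('/') + '/api/v1/videos/categories', headers=headers)
    with urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


def find_problems(categories, expected):
    """Return (missing_ids, wrong_labels) for an {id_str: label} mapping."""
    missing = [cid for cid in expected if str(cid) not in categories]
    wrong = [(cid, expected[cid], categories[str(cid)])
             for cid in expected
             if str(cid) in categories and categories[str(cid)] != expected[cid]]
    return missing, wrong
```

A typical call would be `find_problems(fetch_categories('http://192.168.1.170:9000', 'stream.echo6.co'), EXPECTED)`; empty lists in both positions mean the plugin's labels are visible.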
|
||||||
32
peertube-plugin/peertube-plugin-recon-domains/main.js
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
async function register ({ videoCategoryManager }) {
  const reconDomains = {
    100: 'Agriculture & Livestock',
    101: 'Civil Organization',
    102: 'Communications',
    103: 'Food Systems',
    104: 'Foundational Skills',
    105: 'Logistics',
    106: 'Medical',
    107: 'Navigation',
    108: 'Operations',
    109: 'Power Systems',
    110: 'Preservation & Storage',
    111: 'Security',
    112: 'Shelter & Construction',
    113: 'Technology',
    114: 'Tools & Equipment',
    115: 'Vehicles',
    116: 'Water Systems',
    117: 'Wilderness Skills'
  }

  for (const [id, label] of Object.entries(reconDomains)) {
    videoCategoryManager.addConstant(parseInt(id), label)
  }
}

async function unregister () {
  return
}

module.exports = { register, unregister }
|
||||||
21
peertube-plugin/peertube-plugin-recon-domains/package.json
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
{
  "name": "peertube-plugin-recon-domains",
  "version": "1.0.0",
  "description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 100-117)",
  "engine": {
    "peertube": ">=6.0.0"
  },
  "keywords": [
    "peertube",
    "plugin"
  ],
  "homepage": "https://forge.echo6.co/matt/recon",
  "author": "Echo6",
  "license": "MIT",
  "library": "./main.js",
  "staticDirs": {},
  "css": [],
  "clientScripts": [],
  "translations": {},
  "bugs": "https://forge.echo6.co/matt/recon/issues"
}
|
||||||
170
recon.py
|
|
@ -863,6 +863,166 @@ def cmd_ingest(args):
|
||||||
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_assign_categories(args):
    """Assign RECON domains to PeerTube videos and push categories."""
    from qdrant_client import QdrantClient
    from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
    from lib.peertube_writer import push_pending, extract_uuid
    from lib.recon_domains import DOMAIN_CATEGORY_MAP

    config = get_config()
    db = StatusDB()
    dry_run = args.dry_run
    limit = args.limit

    if args.backfill:
        # Pass 1: assign domains to all complete stream docs with no assignment
        # or that previously got needs_reprocess
        conn = db._get_conn()
        q = """SELECT d.hash FROM documents d
               LEFT JOIN catalogue c ON d.hash = c.hash
               WHERE d.status = 'complete'
                 AND (d.recon_domain IS NULL
                      OR d.recon_domain_status = 'needs_reprocess')
                 AND c.source = 'stream.echo6.co'
               ORDER BY d.discovered_at"""
        if limit:
            q += f" LIMIT {int(limit)}"
        rows = conn.execute(q).fetchall()
        hashes = [r['hash'] for r in rows]

        if not hashes:
            print("No unassigned complete stream documents found")
            return 0

        print(f"Backfill: processing {len(hashes)} documents" +
              (" [DRY RUN]" if dry_run else ""))

        # Create one Qdrant client for the entire backfill
        qdrant = QdrantClient(
            host=config['vector_db']['host'],
            port=config['vector_db']['port'],
            timeout=60
        )

        stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0}
        for i, file_hash in enumerate(hashes):
            try:
                domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
                stats[status] = stats.get(status, 0) + 1
                if not dry_run:
                    db.set_domain_assignment(file_hash, domain, status)
                if (i + 1) % 1000 == 0:
                    print(f" Progress: {i + 1}/{len(hashes)}")
            except Exception as e:
                stats['errors'] += 1
                logger.warning(f" Assignment error for {file_hash[:12]}: {e}")

        print(f"\nBackfill results:")
        for k, v in sorted(stats.items()):
            print(f" {k}: {v}")
        return 0

    elif args.tiebreaker_pass:
        if dry_run:
            items = db.get_items_by_domain_status('tied_pass_1')
            print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]")
            return 0
        stats = run_tiebreaker_pass(db, config)
        print(f"\nTiebreaker results:")
        for k, v in sorted(stats.items()):
            print(f" {k}: {v}")
        return 0

    elif args.push_pending:
        if dry_run:
            items = db.get_unpushed_assignments()
            if limit:
                items = items[:limit]
            print(f"Push pending: {len(items)} items would be pushed [DRY RUN]")
            return 0
        success, failed = push_pending(db, config, limit=limit)
        print(f"\nPush results: {success} succeeded, {failed} failed")
        return 0

    elif args.reprocess_missing:
        items = db.get_items_by_domain_status('needs_reprocess', limit=limit)
        if not items:
            print("No items with needs_reprocess status")
            return 0

        print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else ""))
        requeued = 0
        for item in items:
            file_hash = item['hash']
            if dry_run:
                print(f" Would reprocess: {file_hash[:12]} — {item.get('filename', '?')}")
                requeued += 1
                continue

            # Reset document status to allow re-processing
            conn = db._get_conn()
            conn.execute(
                """UPDATE documents SET
                   status = 'catalogued',
                   concepts_extracted = 0,
                   vectors_inserted = 0,
                   recon_domain = NULL,
                   recon_domain_status = NULL,
                   recon_domain_assigned_at = NULL,
                   peertube_category_pushed_at = NULL,
                   error_message = NULL,
                   extracted_at = NULL,
                   enriched_at = NULL,
                   embedded_at = NULL
                   WHERE hash = ?""",
                (file_hash,)
            )
            conn.commit()
            # Re-queue for pipeline processing
            db.queue_document(file_hash)
            requeued += 1

        print(f"Requeued {requeued} items for reprocessing")
        return 0

    else:
        # Default: show domain assignment status
        status_counts = db.get_domain_status_counts()
        domain_dist = db.get_domain_distribution()

        conn = db._get_conn()
        total_stream = conn.execute(
            """SELECT COUNT(*) as cnt FROM documents d
               LEFT JOIN catalogue c ON d.hash = c.hash
               WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'"""
        ).fetchone()['cnt']
        unassigned = conn.execute(
            """SELECT COUNT(*) as cnt FROM documents d
               LEFT JOIN catalogue c ON d.hash = c.hash
               WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'
                 AND d.recon_domain IS NULL"""
        ).fetchone()['cnt']
        unpushed = len(db.get_unpushed_assignments())

        print("=== Domain Assignment Status ===\n")
        print(f"Total complete stream docs: {total_stream}")
        print(f"Unassigned: {unassigned}")
        print(f"Unpushed to PeerTube: {unpushed}")

        if status_counts:
            print(f"\nAssignment status breakdown:")
            for status, cnt in sorted(status_counts.items()):
                print(f" {status:<20s} {cnt:>6d}")

        if domain_dist:
            print(f"\nDomain distribution:")
            for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]):
                print(f" {domain:<35s} {cnt:>6d}")

        return 0
|
||||||
|
|
||||||
|
|
||||||
def cmd_pipeline(args):
|
||||||
"""Stream B library pipeline: status, migrate, reverse, watch, sweep."""
|
||||||
from lib.new_pipeline import (
|
||||||
|
|
@ -1158,6 +1318,16 @@ def main():
|
||||||
p.set_defaults(func=cmd_ingest)
|
||||||
|
|
||||||
|
|
||||||
|
    # assign-categories
    p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos')
    p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs')
    p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis')
    p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API')
    p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items')
    p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing')
    p.add_argument('--limit', type=int, help='Limit number of items to process')
    p.set_defaults(func=cmd_assign_categories)
|
||||||
|
|
||||||
# pipeline (Stream B)
|
||||||
p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)')
|
||||||
p.add_argument('pipeline_action', nargs='?', default='status',
|
||||||
|
|
|
||||||
148
templates/peertube/review.html
Normal file
|
|
@ -0,0 +1,148 @@
|
||||||
|
{% extends "base.html" %}
{% block content %}
<div id="pt-review">
    <div class="stat-grid" style="grid-template-columns:repeat(4, 1fr);">
        <div class="stat-card"><div class="label">Manual Review</div><div class="value" id="rv-manual">—</div></div>
        <div class="stat-card"><div class="label">Assigned</div><div class="value" id="rv-assigned">—</div></div>
        <div class="stat-card"><div class="label">Tied (Pass 1)</div><div class="value" id="rv-tied1">—</div></div>
        <div class="stat-card"><div class="label">Needs Reprocess</div><div class="value" id="rv-reprocess">—</div></div>
    </div>

    <div class="panel" style="margin-top:16px;">
        <h3 class="section-title" style="margin-bottom:12px;">Manual Review Queue</h3>
        <div id="rv-items-container">
            <table class="data-table" id="rv-table" style="display:none;">
                <thead>
                    <tr>
                        <th>Video</th>
                        <th>Channel</th>
                        <th>Current Domain</th>
                        <th>Top Domains</th>
                        <th>Assign</th>
                    </tr>
                </thead>
                <tbody id="rv-tbody"></tbody>
            </table>
            <div id="rv-empty" class="text-muted" style="padding:24px;text-align:center;">Loading...</div>
        </div>
    </div>
</div>
{% endblock %}
{% block scripts %}
<script>
const VALID_DOMAINS = {{ valid_domains | tojson }};

async function loadStats() {
    try {
        const resp = await fetch('/api/peertube/review/stats');
        const data = await resp.json();
        document.getElementById('rv-manual').textContent = data.tied_manual || 0;
        document.getElementById('rv-assigned').textContent =
            (data.assigned || 0) + (data.tied_pass_2 || 0) + (data.manual_assigned || 0);
        document.getElementById('rv-tied1').textContent = data.tied_pass_1 || 0;
        document.getElementById('rv-reprocess').textContent = data.needs_reprocess || 0;
    } catch (e) {
        console.error('Failed to load stats:', e);
    }
}

async function loadItems() {
    try {
        const resp = await fetch('/api/peertube/review/items');
        const items = await resp.json();
        const tbody = document.getElementById('rv-tbody');
        const table = document.getElementById('rv-table');
        const empty = document.getElementById('rv-empty');

        if (!items.length) {
            empty.textContent = 'No items pending manual review.';
            table.style.display = 'none';
            return;
        }

        tbody.innerHTML = '';
        table.style.display = '';
        empty.style.display = 'none';

        items.forEach(item => {
            const tr = document.createElement('tr');
            tr.id = 'row-' + item.hash;

            // Video title/filename
            const tdVideo = document.createElement('td');
            tdVideo.textContent = item.filename || item.hash.slice(0, 12);
            tdVideo.title = item.hash;
            tr.appendChild(tdVideo);

            // Channel
            const tdChannel = document.createElement('td');
            tdChannel.textContent = item.category || '—';
            tr.appendChild(tdChannel);

            // Current domain
            const tdCurrent = document.createElement('td');
            tdCurrent.textContent = item.recon_domain || '—';
            tr.appendChild(tdCurrent);

            // Top domains (from concept counts)
            const tdTop = document.createElement('td');
            if (item.top_domains) {
                tdTop.innerHTML = item.top_domains.map(d =>
                    '<span class="badge">' + d.domain + ' (' + d.count + ')</span>'
                ).join(' ');
            }
            tr.appendChild(tdTop);

            // Assign dropdown + button
            const tdAssign = document.createElement('td');
            const sel = document.createElement('select');
            sel.className = 'input-sm';
            sel.innerHTML = '<option value="">Select...</option>' +
                VALID_DOMAINS.map(d => '<option value="' + d + '">' + d + '</option>').join('');
            if (item.recon_domain) {
                sel.value = item.recon_domain;
            }
            const btn = document.createElement('button');
            btn.className = 'btn btn-sm btn-primary';
            btn.textContent = 'Assign';
            btn.onclick = () => assignDomain(item.hash, sel.value, tr);
            tdAssign.appendChild(sel);
            tdAssign.appendChild(btn);
            tr.appendChild(tdAssign);

            tbody.appendChild(tr);
        });
    } catch (e) {
        document.getElementById('rv-empty').textContent = 'Error loading items: ' + e.message;
    }
}

async function assignDomain(hash, domain, row) {
    if (!domain) {
        alert('Select a domain first');
        return;
    }
    try {
        const resp = await fetch('/api/peertube/review/assign', {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({hash: hash, domain: domain})
        });
        const result = await resp.json();
        if (result.ok) {
            row.style.opacity = '0.4';
            row.querySelector('button').disabled = true;
            row.querySelector('button').textContent = 'Done';
            loadStats();
        } else {
            alert('Assignment failed: ' + (result.error || 'unknown error'));
        }
    } catch (e) {
        alert('Error: ' + e.message);
    }
}

loadStats();
loadItems();
</script>
{% endblock %}
|
||||||
71
tests/test_constants_parity.py
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
#!/usr/bin/env python3
"""
Parity test: verify RECON domain taxonomy matches PeerTube categories.

Tests:
  1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly
  2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels

Usage:
    cd /opt/recon && source venv/bin/activate
    python3 tests/test_constants_parity.py
"""
import json
import sys
import requests

# Add parent dir to path for imports
sys.path.insert(0, '/opt/recon')
from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP


def test_local_parity():
    """Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS."""
    map_keys = set(DOMAIN_CATEGORY_MAP.keys())
    assert map_keys == VALID_DOMAINS, (
        f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, "
        f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}"
    )
    assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch"
    print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS")


def test_peertube_categories():
    """Verify PeerTube API returns all 18 RECON categories."""
    url = "http://192.168.1.170:9000/api/v1/videos/categories"
    headers = {"Host": "stream.echo6.co"}

    try:
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
    except Exception as e:
        print(f"[SKIP] PeerTube API unreachable: {e}")
        return

    categories = resp.json()  # dict of {id_str: label}

    missing = []
    wrong_label = []
    for domain, cat_id in DOMAIN_CATEGORY_MAP.items():
        cat_str = str(cat_id)
        if cat_str not in categories:
            missing.append((cat_id, domain))
        elif categories[cat_str] != domain:
            wrong_label.append((cat_id, domain, categories[cat_str]))

    if missing:
        print(f"[FAIL] Missing categories in PeerTube: {missing}")
        print(" Deploy peertube-plugin-recon-domains to CT 110 first")
        sys.exit(1)

    if wrong_label:
        print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}")
        sys.exit(1)

    print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels")


if __name__ == '__main__':
    test_local_parity()
    test_peertube_categories()
    print("\nAll parity tests passed.")
|
||||||