From 5f36c52fb156f754f0819f4a20da3f300114a911 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 28 Apr 2026 00:04:29 +0000 Subject: [PATCH] Phase 2: documents table schema for domain assignment Adds four columns to documents table via idempotent ALTER TABLE migrations: recon_domain, recon_domain_status, recon_domain_assigned_at, peertube_category_pushed_at. Adds index on recon_domain_status. Includes StatusDB helper methods: get/set_domain_assignment, set_peertube_pushed, get_unpushed_assignments, get_items_by_domain_status, get_domain_status_counts, get_domain_distribution. Co-Authored-By: Claude Opus 4.6 --- lib/status.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/lib/status.py b/lib/status.py index 974cabd..c710846 100644 --- a/lib/status.py +++ b/lib/status.py @@ -124,6 +124,22 @@ class StatusDB: except Exception: pass # column already exists + # Migration: domain assignment columns for PeerTube categorization + for col, coltype in [ + ('recon_domain', 'TEXT'), + ('recon_domain_status', 'TEXT'), + ('recon_domain_assigned_at', 'TEXT'), + ('peertube_category_pushed_at', 'TEXT'), + ]: + try: + conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + try: + conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)") + except Exception: + pass # index already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -448,7 +464,91 @@ class StatusDB: conn.commit() - # ── Scraper Job Helpers ───────────────────────────────────── + # ── Domain Assignment Helpers ──────────────────────────────── + + def get_domain_assignment(self, file_hash): + """Get domain assignment for a document. + + Returns: + (recon_domain, recon_domain_status) tuple, or (None, None) if not set. 
+ """ + conn = self._get_conn() + row = conn.execute( + "SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?", + (file_hash,) + ).fetchone() + if row: + return (row['recon_domain'], row['recon_domain_status']) + return (None, None) + + def set_domain_assignment(self, file_hash, domain, status): + """Set domain assignment and status for a document.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET recon_domain = ?, recon_domain_status = ?, " + "recon_domain_assigned_at = ? WHERE hash = ?", + (domain, status, datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def set_peertube_pushed(self, file_hash): + """Mark a document's category as pushed to PeerTube.""" + conn = self._get_conn() + conn.execute( + "UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?", + (datetime.now(timezone.utc).isoformat(), file_hash) + ) + conn.commit() + + def get_unpushed_assignments(self): + """Get documents with domain assignments not yet pushed to PeerTube. + + Only returns stream.echo6.co source documents. + """ + conn = self._get_conn() + return [dict(r) for r in conn.execute( + """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain IS NOT NULL + AND d.peertube_category_pushed_at IS NULL + AND c.source = 'stream.echo6.co' + ORDER BY d.recon_domain_assigned_at""", + ).fetchall()] + + def get_items_by_domain_status(self, status, limit=None): + """Get documents by domain assignment status.""" + conn = self._get_conn() + q = """SELECT d.*, c.source, c.category, c.path as catalogue_path + FROM documents d + LEFT JOIN catalogue c ON d.hash = c.hash + WHERE d.recon_domain_status = ? 
+ ORDER BY d.discovered_at""" + if limit: + q += f" LIMIT {int(limit)}" + return [dict(r) for r in conn.execute(q, (status,)).fetchall()] + + def get_domain_status_counts(self): + """Get counts of documents by domain assignment status.""" + conn = self._get_conn() + return {row['recon_domain_status']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain_status, COUNT(*) as cnt " + "FROM documents WHERE recon_domain_status IS NOT NULL " + "GROUP BY recon_domain_status" + ).fetchall()} + + def get_domain_distribution(self): + """Get counts of documents per assigned domain.""" + conn = self._get_conn() + return {row['recon_domain']: row['cnt'] + for row in conn.execute( + "SELECT recon_domain, COUNT(*) as cnt " + "FROM documents WHERE recon_domain IS NOT NULL " + "GROUP BY recon_domain ORDER BY cnt DESC" + ).fetchall()} + + # ── Scraper Job Helpers ───────────────────────────────────── def get_pending_scrape_job(self): """Fetch the oldest pending scrape job."""