Phase 2: documents table schema for domain assignment

Adds four columns to documents table via idempotent ALTER TABLE
migrations: recon_domain, recon_domain_status, recon_domain_assigned_at,
peertube_category_pushed_at. Adds index on recon_domain_status.

Includes StatusDB helper methods: get/set_domain_assignment,
set_peertube_pushed, get_unpushed_assignments, get_items_by_domain_status,
get_domain_status_counts, get_domain_distribution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-28 00:04:29 +00:00
commit 5f36c52fb1

View file

@ -124,6 +124,22 @@ class StatusDB:
except Exception: except Exception:
pass # column already exists pass # column already exists
# Migration: domain assignment columns for PeerTube categorization
for col, coltype in [
('recon_domain', 'TEXT'),
('recon_domain_status', 'TEXT'),
('recon_domain_assigned_at', 'TEXT'),
('peertube_category_pushed_at', 'TEXT'),
]:
try:
conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}")
except Exception:
pass # column already exists
try:
conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)")
except Exception:
pass # index already exists
# Stream B: file_operations + duplicate_review tables # Stream B: file_operations + duplicate_review tables
conn.executescript(""" conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations ( CREATE TABLE IF NOT EXISTS file_operations (
@ -448,6 +464,90 @@ class StatusDB:
conn.commit() conn.commit()
# ── Domain Assignment Helpers ──────────────────<E29480><E29480>─────────────
def get_domain_assignment(self, file_hash):
"""Get domain assignment for a document.
Returns:
(recon_domain, recon_domain_status) tuple, or (None, None) if not set.
"""
conn = self._get_conn()
row = conn.execute(
"SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?",
(file_hash,)
).fetchone()
if row:
return (row['recon_domain'], row['recon_domain_status'])
return (None, None)
def set_domain_assignment(self, file_hash, domain, status):
"""Set domain assignment and status for a document."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET recon_domain = ?, recon_domain_status = ?, "
"recon_domain_assigned_at = ? WHERE hash = ?",
(domain, status, datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def set_peertube_pushed(self, file_hash):
"""Mark a document's category as pushed to PeerTube."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?",
(datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def get_unpushed_assignments(self):
"""Get documents with domain assignments not yet pushed to PeerTube.
Only returns stream.echo6.co source documents.
"""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"""SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain IS NOT NULL
AND d.peertube_category_pushed_at IS NULL
AND c.source = 'stream.echo6.co'
ORDER BY d.recon_domain_assigned_at""",
).fetchall()]
def get_items_by_domain_status(self, status, limit=None):
"""Get documents by domain assignment status."""
conn = self._get_conn()
q = """SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain_status = ?
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, (status,)).fetchall()]
def get_domain_status_counts(self):
"""Get counts of documents by domain assignment status."""
conn = self._get_conn()
return {row['recon_domain_status']: row['cnt']
for row in conn.execute(
"SELECT recon_domain_status, COUNT(*) as cnt "
"FROM documents WHERE recon_domain_status IS NOT NULL "
"GROUP BY recon_domain_status"
).fetchall()}
def get_domain_distribution(self):
"""Get counts of documents per assigned domain."""
conn = self._get_conn()
return {row['recon_domain']: row['cnt']
for row in conn.execute(
"SELECT recon_domain, COUNT(*) as cnt "
"FROM documents WHERE recon_domain IS NOT NULL "
"GROUP BY recon_domain ORDER BY cnt DESC"
).fetchall()}
# ── Scraper Job Helpers ───────────────────────────────────── # ── Scraper Job Helpers ─────────────────────────────────────
def get_pending_scrape_job(self): def get_pending_scrape_job(self):