2026-04-14 14:57:23 +00:00
|
|
|
"""
|
|
|
|
|
RECON Status Tracker
|
|
|
|
|
|
|
|
|
|
SQLite operations for the catalogue and documents tables, plus the intel, metrics_snapshots, file_operations, duplicate_review, and scrape_jobs tables. WAL mode, thread-local connections.
|
|
|
|
|
Status flow: catalogued -> queued -> extracting -> extracted -> enriching -> enriched -> embedding -> complete. mark_failed() sets a document to 'failed'; increment_retry() moves it back to 'queued'.
|
|
|
|
|
|
|
|
|
|
Config: paths.db
|
|
|
|
|
"""
|
|
|
|
|
import os
|
|
|
|
|
import sqlite3
|
|
|
|
|
import threading
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
|
|
|
|
from .utils import get_config
|
|
|
|
|
|
|
|
|
|
# Module-level thread-local storage for the SQLite connection(s) opened by
# StatusDB._get_conn(), so each thread gets its own connection; Python's
# sqlite3 connections reject use from a different thread by default.
_local = threading.local()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StatusDB:
|
|
|
|
|
def __init__(self, db_path=None):
    """Open the status database, creating its directory and schema if needed.

    Args:
        db_path: Path of the SQLite database file. When None, the path is
            read from the ``paths.db`` config entry.
    """
    if db_path is None:
        db_path = get_config()['paths']['db']
    self.db_path = db_path
    # os.path.dirname() is '' for a bare filename (or ':memory:') and
    # os.makedirs('') raises FileNotFoundError, so only create a real parent.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    self._init_db()
|
|
|
|
|
|
|
|
|
|
def _get_conn(self):
    """Return this thread's SQLite connection for ``self.db_path``.

    Connections are cached in the module-level thread-local ``_local`` in a
    dict keyed by database path. Two StatusDB instances pointing at
    different files in the same thread therefore get separate connections
    instead of silently sharing the first one opened. New connections use
    ``sqlite3.Row`` rows and WAL journaling.
    """
    conns = getattr(_local, 'conns', None)
    if conns is None:
        conns = _local.conns = {}
    conn = conns.get(self.db_path)
    if conn is None:
        conn = sqlite3.connect(self.db_path, timeout=30)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # NOTE: this PRAGMA replaces the busy timeout set by connect(timeout=30),
        # so the effective wait on a locked database is 5 seconds.
        conn.execute("PRAGMA busy_timeout=5000")
        conns[self.db_path] = conn
    return conn
|
|
|
|
|
|
|
|
|
|
def _init_db(self):
    """Create all tables and indexes if absent, then add any missing columns.

    Idempotent: every CREATE uses ``IF NOT EXISTS``, and a migrated column
    is only added when ``PRAGMA table_info`` shows it is missing. All
    tables are created before migrations run. Fresh databases get the
    newer scrape_jobs columns from the CREATE statement, and older
    databases gain them through ALTER TABLE. Any real database error now
    propagates instead of being swallowed.
    """
    conn = self._get_conn()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS catalogue (
            hash TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            path TEXT NOT NULL,
            size_bytes INTEGER,
            source TEXT,
            category TEXT,
            status TEXT DEFAULT 'catalogued',
            discovered_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS documents (
            hash TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            path TEXT,
            size_bytes INTEGER,
            page_count INTEGER,
            book_title TEXT,
            book_author TEXT,
            collection TEXT DEFAULT 'survival',
            status TEXT DEFAULT 'pending',
            pages_extracted INTEGER DEFAULT 0,
            concepts_extracted INTEGER DEFAULT 0,
            vectors_inserted INTEGER DEFAULT 0,
            discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
            extracted_at TEXT,
            enriched_at TEXT,
            embedded_at TEXT,
            error_message TEXT,
            retry_count INTEGER DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS intel (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT,
            timestamp TEXT,
            region TEXT,
            category TEXT,
            content TEXT,
            summary TEXT,
            key_facts TEXT,
            credibility_score REAL,
            verification_status TEXT,
            vector_id INTEGER,
            ingested_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS metrics_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            metric_type TEXT NOT NULL,
            data TEXT NOT NULL,
            UNIQUE(timestamp, metric_type)
        );

        CREATE INDEX IF NOT EXISTS idx_catalogue_status ON catalogue(status);
        CREATE INDEX IF NOT EXISTS idx_catalogue_source ON catalogue(source);
        CREATE INDEX IF NOT EXISTS idx_documents_status ON documents(status);
    """)

    # Stream B: file_operations + duplicate_review tables, plus scrape_jobs
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS file_operations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_hash TEXT NOT NULL,
            operation TEXT NOT NULL,
            source_path TEXT NOT NULL,
            target_path TEXT NOT NULL,
            source_filename TEXT NOT NULL,
            target_filename TEXT NOT NULL,
            original_filename TEXT,
            collision_step INTEGER,
            qdrant_points_updated INTEGER DEFAULT 0,
            performed_at TEXT DEFAULT CURRENT_TIMESTAMP,
            reversed_at TEXT,
            notes TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_fileops_hash ON file_operations(doc_hash);

        CREATE TABLE IF NOT EXISTS duplicate_review (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_hash TEXT NOT NULL,
            original_filename TEXT NOT NULL,
            sanitized_filename TEXT NOT NULL,
            collision_with_hash TEXT,
            collision_path TEXT,
            duplicate_path TEXT NOT NULL,
            domain TEXT,
            subdomain TEXT,
            book_author TEXT,
            book_title TEXT,
            status TEXT DEFAULT 'pending',
            resolution TEXT,
            discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
            resolved_at TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status);

        CREATE TABLE IF NOT EXISTS scrape_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            title TEXT,
            language TEXT DEFAULT 'eng',
            category TEXT,
            status TEXT DEFAULT 'pending',
            page_count INTEGER DEFAULT 0,
            error_message TEXT,
            zim_filename TEXT,
            zim_source_id INTEGER,
            workspace_path TEXT,
            subprocess_pid INTEGER,
            additional_reject_patterns TEXT,
            skip_default_patterns INTEGER DEFAULT 0,
            crawl_mode TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            started_at TEXT,
            completed_at TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_scrape_status ON scrape_jobs(status);
    """)

    # Columns introduced after a table first shipped. ALTER TABLE appends
    # them, in this order, only to databases created before they existed.
    migrations = (
        ('catalogue', 'path_updated_at', 'TEXT'),
        ('documents', 'organized_at', 'TEXT'),
        ('scrape_jobs', 'subprocess_pid', 'INTEGER'),
        ('scrape_jobs', 'additional_reject_patterns', 'TEXT'),
        ('scrape_jobs', 'skip_default_patterns', 'INTEGER DEFAULT 0'),
        ('scrape_jobs', 'crawl_mode', 'TEXT'),
    )
    existing = {}
    for table, column, coltype in migrations:
        if table not in existing:
            # PRAGMA table_info rows are (cid, name, type, notnull, dflt, pk).
            existing[table] = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
        if column not in existing[table]:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {coltype}")
            existing[table].add(column)

    conn.commit()
|
|
|
|
|
|
|
|
|
|
def add_to_catalogue(self, file_hash, filename, path, size_bytes, source, category):
    """Insert a file into the catalogue, or refresh the entry for a known hash.

    On a hash conflict the stored path, filename, source and category are
    overwritten; size_bytes and status keep their stored values.
    path_updated_at is stamped with CURRENT_TIMESTAMP only when the path
    actually changes, which is the flag get_path_updates() looks for.
    """
    params = (file_hash, filename, path, size_bytes, source, category)
    upsert = """INSERT INTO catalogue (hash, filename, path, size_bytes, source, category)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(hash) DO UPDATE SET
            path = excluded.path,
            filename = excluded.filename,
            source = excluded.source,
            category = excluded.category,
            path_updated_at = CASE
                WHEN catalogue.path != excluded.path THEN CURRENT_TIMESTAMP
                ELSE catalogue.path_updated_at
            END"""
    conn = self._get_conn()
    conn.execute(upsert, params)
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def queue_document(self, file_hash):
    """Queue a catalogued file for processing.

    Sets the catalogue entry's status to 'queued' and inserts a matching
    documents row with status 'queued'. If the documents row already
    exists, only its path and filename are refreshed and its status is kept.

    Returns:
        bool: False when ``file_hash`` is not in the catalogue, else True.
    """
    conn = self._get_conn()
    entry = conn.execute("SELECT * FROM catalogue WHERE hash = ?", (file_hash,)).fetchone()
    if entry is None:
        return False
    doc_values = tuple(entry[key] for key in ('hash', 'filename', 'path', 'size_bytes'))
    conn.execute("UPDATE catalogue SET status = 'queued' WHERE hash = ?", (file_hash,))
    conn.execute(
        """INSERT INTO documents (hash, filename, path, size_bytes, status)
        VALUES (?, ?, ?, ?, 'queued')
        ON CONFLICT(hash) DO UPDATE SET
            path = excluded.path,
            filename = excluded.filename""",
        doc_values,
    )
    conn.commit()
    return True
|
|
|
|
|
|
|
|
|
|
def update_status(self, file_hash, status, **kwargs):
    """Set a document's status, plus any extra columns, in one UPDATE.

    Reaching 'extracted', 'enriched' or 'complete' also writes the current
    UTC time (ISO-8601, via datetime.isoformat) to extracted_at,
    enriched_at or embedded_at respectively. Each extra keyword argument
    becomes ``column = value``. Column names are interpolated into the SQL,
    so callers must pass trusted column identifiers.
    """
    conn = self._get_conn()
    stamp_columns = {
        'extracted': 'extracted_at',
        'enriched': 'enriched_at',
        'complete': 'embedded_at',
    }
    columns = ['status']
    values = [status]
    stamp_column = stamp_columns.get(status)
    if stamp_column is not None:
        columns.append(stamp_column)
        values.append(datetime.now(timezone.utc).isoformat())
    columns.extend(kwargs)
    values.extend(kwargs.values())
    set_clause = ', '.join(f"{column} = ?" for column in columns)
    conn.execute(f"UPDATE documents SET {set_clause} WHERE hash = ?", [*values, file_hash])
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def get_by_status(self, status, limit=None):
    """Return documents rows with ``status`` as dicts, oldest discovered first.

    A falsy ``limit`` (None or 0) returns every matching row.
    """
    conn = self._get_conn()
    sql = "SELECT * FROM documents WHERE status = ? ORDER BY discovered_at"
    sql += f" LIMIT {int(limit)}" if limit else ""
    rows = conn.execute(sql, (status,)).fetchall()
    return list(map(dict, rows))
|
|
|
|
|
|
|
|
|
|
def get_catalogued(self, source=None, category=None, limit=None):
    """Return catalogue rows still in status 'catalogued', oldest first.

    ``source`` and ``category`` narrow the result when truthy; a falsy
    ``limit`` returns every match.
    """
    conn = self._get_conn()
    clauses = ["status = 'catalogued'"]
    params = []
    for column, wanted in (('source', source), ('category', category)):
        if wanted:
            clauses.append(f"{column} = ?")
            params.append(wanted)
    sql = "SELECT * FROM catalogue WHERE " + " AND ".join(clauses) + " ORDER BY discovered_at"
    if limit:
        sql += f" LIMIT {int(limit)}"
    return [dict(row) for row in conn.execute(sql, params).fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_document(self, file_hash):
    """Return the documents row for ``file_hash`` as a dict, or None if absent."""
    cursor = self._get_conn().execute("SELECT * FROM documents WHERE hash = ?", (file_hash,))
    found = cursor.fetchone()
    if found is None:
        return None
    return dict(found)
|
|
|
|
|
|
|
|
|
|
def get_status_counts(self):
    """Return per-status row counts for the catalogue and documents tables.

    Returns:
        dict: ``{'catalogue': {status: count}, 'documents': {status: count}}``.
    """
    conn = self._get_conn()

    def tally(table):
        # One GROUP BY per table; each row is (status, cnt).
        query = f"SELECT status, COUNT(*) as cnt FROM {table} GROUP BY status"
        return {row['status']: row['cnt'] for row in conn.execute(query)}

    return {'catalogue': tally('catalogue'), 'documents': tally('documents')}
|
|
|
|
|
|
|
|
|
|
def get_failures(self):
    """Return every documents row with status 'failed', oldest discovered first."""
    conn = self._get_conn()
    failed_rows = conn.execute(
        "SELECT * FROM documents WHERE status = 'failed' ORDER BY discovered_at"
    ).fetchall()
    return list(map(dict, failed_rows))
|
|
|
|
|
|
|
|
|
|
def mark_failed(self, file_hash, error_msg):
    """Set a document's status to 'failed' and store the error text.

    ``error_msg`` is passed through str() and cut to its first 1000
    characters before it is written to error_message.
    """
    conn = self._get_conn()
    message = str(error_msg)[:1000]
    conn.execute(
        "UPDATE documents SET status = 'failed', error_message = ? WHERE hash = ?",
        (message, file_hash),
    )
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def increment_retry(self, file_hash):
    """Re-queue a document: bump retry_count, set status 'queued', clear the error."""
    sql = ("UPDATE documents SET retry_count = retry_count + 1, status = 'queued', "
           "error_message = NULL WHERE hash = ?")
    conn = self._get_conn()
    conn.execute(sql, (file_hash,))
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def get_sources(self):
    """Return the distinct catalogue sources in ascending order."""
    cursor = self._get_conn().execute("SELECT DISTINCT source FROM catalogue ORDER BY source")
    return [record[0] for record in cursor.fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_categories(self, source=None):
    """Return distinct catalogue categories, sorted; limited to ``source`` when truthy."""
    conn = self._get_conn()
    if not source:
        cursor = conn.execute("SELECT DISTINCT category FROM catalogue ORDER BY category")
    else:
        cursor = conn.execute(
            "SELECT DISTINCT category FROM catalogue WHERE source = ? ORDER BY category", (source,)
        )
    return [record[0] for record in cursor.fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_all_documents(self, status=None, source=None, category=None, limit=None, offset=None):
    """Return documents rows joined with their catalogue source/category.

    Args:
        status: Optional documents.status filter.
        source: Optional catalogue.source filter.
        category: Optional catalogue.category filter.
        limit: Maximum rows to return; falsy means no limit.
        offset: Rows to skip; falsy means none. Works with or without
            ``limit``.

    Returns:
        list[dict]: rows ordered by documents.discovered_at, newest first.
    """
    conn = self._get_conn()
    q = """SELECT d.*, c.source, c.category FROM documents d
        LEFT JOIN catalogue c ON d.hash = c.hash WHERE 1=1"""
    params = []
    for column, value in (('d.status', status), ('c.source', source), ('c.category', category)):
        if value:
            q += f" AND {column} = ?"
            params.append(value)
    q += " ORDER BY d.discovered_at DESC"
    if limit or offset:
        # SQLite only accepts OFFSET inside a LIMIT clause; LIMIT -1 means
        # "no upper bound", so an offset without a limit is still valid SQL.
        q += f" LIMIT {int(limit) if limit else -1}"
        if offset:
            q += f" OFFSET {int(offset)}"
    return [dict(r) for r in conn.execute(q, params).fetchall()]
|
|
|
|
|
|
|
|
|
|
def count_documents(self, source=None, category=None, status=None):
    """Count documents matching optional source/category/status filters.

    The filters mirror get_all_documents(). ``source`` and ``category`` are
    matched against the joined catalogue row, and ``status`` against the
    documents row. Falsy filter values are ignored. ``status`` is a new,
    optional trailing parameter, so existing positional calls are unaffected.

    Returns:
        int: number of matching documents rows.
    """
    conn = self._get_conn()
    q = """SELECT COUNT(*) FROM documents d
        LEFT JOIN catalogue c ON d.hash = c.hash WHERE 1=1"""
    params = []
    for column, value in (('d.status', status), ('c.source', source), ('c.category', category)):
        if value:
            q += f" AND {column} = ?"
            params.append(value)
    return conn.execute(q, params).fetchone()[0]
|
|
|
|
|
|
|
|
|
|
def catalogue_count(self):
    """Return the total number of rows in the catalogue table."""
    (total,) = self._get_conn().execute("SELECT COUNT(*) FROM catalogue").fetchone()
    return total
|
|
|
|
|
|
|
|
|
|
def source_breakdown(self):
    """Return per-source catalogue counts and byte totals, largest count first.

    Each item has the keys source, count and total_bytes.
    """
    query = ("SELECT source, COUNT(*) as count, SUM(size_bytes) as total_bytes "
             "FROM catalogue GROUP BY source ORDER BY count DESC")
    rows = self._get_conn().execute(query).fetchall()
    return list(map(dict, rows))
|
|
|
|
|
|
|
|
|
|
def category_breakdown(self, source=None):
    """Return catalogue row counts per category.

    With a truthy ``source``, items are ``{category, count}`` for that source,
    largest first. Otherwise items are ``{source, category, count}`` across
    all sources, ordered by source and then by count descending.
    """
    conn = self._get_conn()
    if not source:
        cursor = conn.execute(
            "SELECT source, category, COUNT(*) as count FROM catalogue GROUP BY source, category ORDER BY source, count DESC"
        )
    else:
        cursor = conn.execute(
            "SELECT category, COUNT(*) as count FROM catalogue WHERE source = ? GROUP BY category ORDER BY count DESC",
            (source,),
        )
    return [dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_path_updates(self):
    """Return catalogue entries whose path changed and still await a Qdrant sync.

    An entry is pending while its path_updated_at is non-NULL. Each item has
    the keys hash, filename, path, source and category.
    """
    conn = self._get_conn()
    sql = ("SELECT hash, filename, path, source, category FROM catalogue "
           "WHERE path_updated_at IS NOT NULL")
    pending = conn.execute(sql).fetchall()
    return [dict(entry) for entry in pending]
|
|
|
|
|
|
|
|
|
|
def clear_path_update(self, file_hash):
    """Reset path_updated_at to NULL for ``file_hash`` once its Qdrant sync is done."""
    conn = self._get_conn()
    conn.execute("UPDATE catalogue SET path_updated_at = NULL WHERE hash = ?", (file_hash,))
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def sync_document_path(self, file_hash, path, filename):
    """Copy a new path and filename onto the documents row for ``file_hash``."""
    params = (path, filename, file_hash)
    conn = self._get_conn()
    conn.execute("UPDATE documents SET path = ?, filename = ? WHERE hash = ?", params)
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def status_breakdown(self):
    """Return ``{status, count}`` items for the catalogue table, largest count first."""
    cursor = self._get_conn().execute(
        "SELECT status, COUNT(*) as count FROM catalogue GROUP BY status ORDER BY count DESC"
    )
    return list(map(dict, cursor.fetchall()))
|
|
|
|
|
|
|
|
|
|
def get_unorganized(self, limit=None):
    """Return completed documents not yet organized, earliest embedded first.

    Each item has only the hash, filename and path keys. A falsy ``limit``
    returns every match.
    """
    conn = self._get_conn()
    sql = ("SELECT hash, filename, path FROM documents "
           "WHERE status = 'complete' AND organized_at IS NULL ORDER BY embedded_at")
    if limit:
        sql = f"{sql} LIMIT {int(limit)}"
    return [dict(row) for row in conn.execute(sql).fetchall()]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_ingest_pending(self, ingest_dir, limit=50):
    """Return completed, not-yet-organized documents whose path starts with ``ingest_dir``.

    ``ingest_dir`` is matched as a literal prefix. Any LIKE wildcards it
    contains (``%``, ``_``) and the backslash escape character are escaped,
    so a directory such as ``_ingest/`` no longer also matches ``Xingest/``.
    As with plain LIKE, ASCII letters compare case-insensitively.

    Args:
        ingest_dir: Directory prefix to match against documents.path.
        limit: Maximum number of rows to return.

    Returns:
        list[dict]: items with hash, filename and path, earliest embedded first.
    """
    conn = self._get_conn()
    escaped = ingest_dir.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    pattern = escaped + '%'
    return [dict(r) for r in conn.execute(
        "SELECT hash, filename, path FROM documents "
        "WHERE status = 'complete' AND organized_at IS NULL "
        "AND path LIKE ? ESCAPE '\\' "
        "ORDER BY embedded_at LIMIT ?",
        (pattern, limit)
    ).fetchall()]
|
|
|
|
|
|
|
|
|
|
def mark_organized(self, file_hash):
    """Stamp organized_at with CURRENT_TIMESTAMP for the given document."""
    conn = self._get_conn()
    conn.execute("UPDATE documents SET organized_at = CURRENT_TIMESTAMP WHERE hash = ?", (file_hash,))
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def update_catalogue_path(self, file_hash, new_path, new_filename):
    """Move a catalogue entry to a new path/filename and flag it for Qdrant sync.

    path_updated_at is always set to CURRENT_TIMESTAMP, even if the path is
    unchanged.
    """
    sql = "UPDATE catalogue SET path = ?, filename = ?, path_updated_at = CURRENT_TIMESTAMP WHERE hash = ?"
    conn = self._get_conn()
    conn.execute(sql, (new_path, new_filename, file_hash))
    conn.commit()
|
|
|
|
|
|
2026-04-18 18:26:43 +00:00
|
|
|
|
|
|
|
|
# ── Scraper Job Helpers ─────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_pending_scrape_job(self):
    """Return the lowest-id scrape job whose status is 'pending', or None."""
    cursor = self._get_conn().execute(
        "SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1"
    )
    job = cursor.fetchone()
    return None if job is None else dict(job)
|
|
|
|
|
|
|
|
|
|
def update_scrape_job(self, job_id, **kwargs):
    """Write each keyword argument to the same-named column of scrape job ``job_id``.

    Does nothing when no keyword arguments are given. Column names are
    interpolated into the SQL, so they must be trusted identifiers.
    """
    if kwargs:
        conn = self._get_conn()
        assignments = ', '.join(f"{column} = ?" for column in kwargs)
        conn.execute(f"UPDATE scrape_jobs SET {assignments} WHERE id = ?", [*kwargs.values(), job_id])
        conn.commit()
|
|
|
|
|
|
|
|
|
|
def get_scrape_jobs(self, status=None):
    """Return scrape jobs newest id first, only those with ``status`` when truthy."""
    conn = self._get_conn()
    if not status:
        cursor = conn.execute("SELECT * FROM scrape_jobs ORDER BY id DESC")
    else:
        cursor = conn.execute("SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,))
    return [dict(job) for job in cursor.fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_scrape_job(self, job_id):
    """Return the scrape job with id ``job_id`` as a dict, or None if there is none."""
    job = self._get_conn().execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone()
    if job is None:
        return None
    return dict(job)
|
|
|
|
|
|
2026-04-14 14:57:23 +00:00
|
|
|
# ── Stream B: File Operations ───────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def log_file_operation(self, doc_hash, operation, source_path, target_path,
                       source_filename, target_filename, original_filename=None,
                       collision_step=None, qdrant_points_updated=0, notes=None):
    """Record a file move/rename in file_operations for auditing and rollback.

    Returns:
        int: the rowid (id) of the inserted file_operations row.
    """
    record = (doc_hash, operation, source_path, target_path,
              source_filename, target_filename, original_filename,
              collision_step, qdrant_points_updated, notes)
    conn = self._get_conn()
    cursor = conn.execute(
        """INSERT INTO file_operations
            (doc_hash, operation, source_path, target_path,
             source_filename, target_filename, original_filename,
             collision_step, qdrant_points_updated, notes)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
    conn.commit()
    # The cursor's lastrowid is the same value SELECT last_insert_rowid() gives.
    return cursor.lastrowid
|
|
|
|
|
|
|
|
|
|
def get_file_operations(self, doc_hash=None, limit=50):
    """Return file_operations rows, most recent first, at most ``limit`` of them.

    With a truthy ``doc_hash`` the full history for that document is
    returned, including reversed operations. Without it, only operations
    that have not been reversed are listed, across all documents.
    """
    conn = self._get_conn()
    if not doc_hash:
        sql = "SELECT * FROM file_operations WHERE reversed_at IS NULL ORDER BY performed_at DESC LIMIT ?"
        params = (limit,)
    else:
        sql = "SELECT * FROM file_operations WHERE doc_hash = ? ORDER BY performed_at DESC LIMIT ?"
        params = (doc_hash, limit)
    return [dict(op) for op in conn.execute(sql, params).fetchall()]
|
|
|
|
|
|
|
|
|
|
def get_file_operation(self, op_id):
    """Return the file_operations row with id ``op_id`` as a dict, or None."""
    op = self._get_conn().execute("SELECT * FROM file_operations WHERE id = ?", (op_id,)).fetchone()
    return None if op is None else dict(op)
|
|
|
|
|
|
|
|
|
|
def mark_operation_reversed(self, op_id):
    """Stamp reversed_at with CURRENT_TIMESTAMP on file operation ``op_id``."""
    conn = self._get_conn()
    conn.execute("UPDATE file_operations SET reversed_at = CURRENT_TIMESTAMP WHERE id = ?", (op_id,))
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def queue_duplicate_review(self, doc_hash, original_filename, sanitized_filename,
                           collision_with_hash=None, collision_path=None,
                           duplicate_path='', domain=None, subdomain=None,
                           book_author=None, book_title=None):
    """Add a row to duplicate_review so a human can resolve a filename collision.

    The row's status takes the table default ('pending' in the schema this
    module creates).
    """
    review = (doc_hash, original_filename, sanitized_filename,
              collision_with_hash, collision_path, duplicate_path,
              domain, subdomain, book_author, book_title)
    conn = self._get_conn()
    conn.execute(
        """INSERT INTO duplicate_review
            (doc_hash, original_filename, sanitized_filename,
             collision_with_hash, collision_path, duplicate_path,
             domain, subdomain, book_author, book_title)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        review,
    )
    conn.commit()
|
|
|
|
|
|
|
|
|
|
def get_duplicate_reviews(self, status='pending', limit=50):
    """Return up to ``limit`` duplicate_review rows with ``status``, newest discovered first."""
    sql = "SELECT * FROM duplicate_review WHERE status = ? ORDER BY discovered_at DESC LIMIT ?"
    reviews = self._get_conn().execute(sql, (status, limit)).fetchall()
    return list(map(dict, reviews))
|
|
|
|
|
|
|
|
|
|
def get_pipeline_stats(self):
    """Summarize Stream B pipeline state.

    Returns:
        dict with keys:
            'operations': {operation: count} over non-reversed file operations.
            'duplicates': {status: count} over duplicate_review rows.
            'acquired_pending' / 'ingest_pending': number of '.pdf' entries
                (case-insensitive) in the configured new_pipeline acquired_dir /
                ingest_dir. These are best-effort and stay 0 if the config
                or filesystem lookup raises.
    """
    conn = self._get_conn()
    op_rows = conn.execute(
        "SELECT operation, COUNT(*) as cnt FROM file_operations WHERE reversed_at IS NULL GROUP BY operation"
    ).fetchall()
    dupe_rows = conn.execute(
        "SELECT status, COUNT(*) as cnt FROM duplicate_review GROUP BY status"
    ).fetchall()

    acquired = ingest = 0
    try:
        pipeline_cfg = get_config().get('new_pipeline', {})
        acquired_dir = pipeline_cfg.get('acquired_dir', '')
        ingest_dir = pipeline_cfg.get('ingest_dir', '')
        if acquired_dir and os.path.isdir(acquired_dir):
            acquired = sum(1 for name in os.listdir(acquired_dir) if name.lower().endswith('.pdf'))
        if ingest_dir and os.path.isdir(ingest_dir):
            ingest = sum(1 for name in os.listdir(ingest_dir) if name.lower().endswith('.pdf'))
    except Exception:
        # Deliberately best-effort: directory counts are informational only.
        pass

    return {
        'operations': {row['operation']: row['cnt'] for row in op_rows},
        'duplicates': {row['status']: row['cnt'] for row in dupe_rows},
        'acquired_pending': acquired,
        'ingest_pending': ingest,
    }
|