mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 22:54:46 +02:00
320 lines
11 KiB
Python
320 lines
11 KiB
Python
|
|
"""
|
||
|
|
RECON Text Processor
|
||
|
|
|
||
|
|
Handles pre_flight for plain .txt files arriving in acquired/text/.
|
||
|
|
These are primary source documents (books, manuals, guides), not derived
|
||
|
|
content like transcripts.
|
||
|
|
|
||
|
|
Metadata extraction via two-source vote:
|
||
|
|
A. Filename parsing (title, optionally author)
|
||
|
|
B. Gemini LLM extraction (title/author/edition/year from first 3 pages)
|
||
|
|
|
||
|
|
Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/
|
||
|
|
by the filing worker (NOT organized in-place like transcripts).
|
||
|
|
|
||
|
|
Phase 6f: initial implementation.
|
||
|
|
"""
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import shutil
|
||
|
|
|
||
|
|
from lib.web_scraper import chunk_text
|
||
|
|
from lib.utils import content_hash, clean_filename_to_title
|
||
|
|
from lib.processors.pdf_processor import _extract_gemini_metadata
|
||
|
|
|
||
|
|
logger = logging.getLogger("recon.processors.text")
|
||
|
|
|
||
|
|
WORDS_PER_PAGE = 2000
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_filename_metadata(filename):
|
||
|
|
"""Source A: Extract metadata by parsing the filename.
|
||
|
|
|
||
|
|
Returns dict with keys: title, author, edition, year (any may be None).
|
||
|
|
"""
|
||
|
|
result = {'title': None, 'author': None, 'edition': None, 'year': None}
|
||
|
|
|
||
|
|
stem = os.path.splitext(filename)[0]
|
||
|
|
|
||
|
|
result['title'] = clean_filename_to_title(filename)
|
||
|
|
|
||
|
|
# Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY
|
||
|
|
year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem)
|
||
|
|
if year_match:
|
||
|
|
result['year'] = year_match.group(1)
|
||
|
|
|
||
|
|
# Edition: Nth Edition, Edition N, etc.
|
||
|
|
ed_match = re.search(
|
||
|
|
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
|
||
|
|
stem, re.IGNORECASE
|
||
|
|
)
|
||
|
|
if ed_match:
|
||
|
|
result['edition'] = ed_match.group(0).strip()
|
||
|
|
|
||
|
|
# Author: "Title - Author" or "Title by Author" patterns
|
||
|
|
# Try " - Author" at end
|
||
|
|
dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem)
|
||
|
|
if dash_match:
|
||
|
|
result['author'] = dash_match.group(1).strip()
|
||
|
|
else:
|
||
|
|
by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem)
|
||
|
|
if by_match:
|
||
|
|
result['author'] = by_match.group(1).strip()
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def _vote_metadata(source_a, source_b):
|
||
|
|
"""Vote on each metadata field across two sources.
|
||
|
|
|
||
|
|
Priority: B (Gemini) > A (filename).
|
||
|
|
If both agree, note agreement.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
(voted_dict, provenance_record)
|
||
|
|
"""
|
||
|
|
sources = {'filename': source_a, 'gemini': source_b}
|
||
|
|
priority = ['gemini', 'filename']
|
||
|
|
voted = {}
|
||
|
|
provenance = {}
|
||
|
|
|
||
|
|
for field in ('title', 'author', 'edition', 'year'):
|
||
|
|
values = {}
|
||
|
|
for name, src in sources.items():
|
||
|
|
val = src.get(field)
|
||
|
|
if val and str(val).strip().lower() != 'null':
|
||
|
|
values[name] = val
|
||
|
|
|
||
|
|
if not values:
|
||
|
|
voted[field] = None
|
||
|
|
provenance[field] = None
|
||
|
|
continue
|
||
|
|
|
||
|
|
def _norm(v):
|
||
|
|
if field == 'year':
|
||
|
|
return v.strip()
|
||
|
|
return v.strip().lower()
|
||
|
|
|
||
|
|
norm_groups = {}
|
||
|
|
for name, val in values.items():
|
||
|
|
nv = _norm(val)
|
||
|
|
norm_groups.setdefault(nv, []).append((name, val))
|
||
|
|
|
||
|
|
best_group = max(norm_groups.values(), key=len)
|
||
|
|
|
||
|
|
if len(best_group) >= 2:
|
||
|
|
# Both sources agree
|
||
|
|
for p in priority:
|
||
|
|
for name, val in best_group:
|
||
|
|
if name == p:
|
||
|
|
voted[field] = val
|
||
|
|
names = ','.join(n for n, _ in best_group)
|
||
|
|
provenance[field] = "agreed({})".format(names)
|
||
|
|
break
|
||
|
|
if voted.get(field) is not None:
|
||
|
|
break
|
||
|
|
else:
|
||
|
|
# No agreement — highest priority wins
|
||
|
|
for p in priority:
|
||
|
|
if p in values:
|
||
|
|
voted[field] = values[p]
|
||
|
|
provenance[field] = p
|
||
|
|
break
|
||
|
|
|
||
|
|
provenance_record = {
|
||
|
|
'voted': voted,
|
||
|
|
'provenance': provenance,
|
||
|
|
'sources': {
|
||
|
|
'filename': source_a,
|
||
|
|
'gemini': source_b,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return voted, provenance_record
|
||
|
|
|
||
|
|
|
||
|
|
def pre_flight(content_path, meta_path, db, config):
|
||
|
|
"""Process a .txt file dropped into acquired/text/.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
content_path: Path to the .txt file
|
||
|
|
meta_path: Path to .meta.json sidecar (may be None)
|
||
|
|
db: StatusDB instance
|
||
|
|
config: RECON config dict
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
dict with keys: hash, action, source_path, error
|
||
|
|
Actions: 'extracted', 'duplicate', 'skip_empty', 'error'
|
||
|
|
"""
|
||
|
|
result = {
|
||
|
|
'hash': None,
|
||
|
|
'action': 'error',
|
||
|
|
'source_path': content_path,
|
||
|
|
'error': None,
|
||
|
|
}
|
||
|
|
|
||
|
|
filename = os.path.basename(content_path)
|
||
|
|
|
||
|
|
# ── Step 1: Hash ──────────────────────────────────────────────
|
||
|
|
try:
|
||
|
|
file_hash = content_hash(content_path)
|
||
|
|
result['hash'] = file_hash
|
||
|
|
except Exception as e:
|
||
|
|
result['error'] = "Cannot hash text file: {}".format(e)
|
||
|
|
return result
|
||
|
|
|
||
|
|
# ── Step 2: Stale state cleanup ───────────────────────────────
|
||
|
|
processing_root = config.get('pipeline', {}).get(
|
||
|
|
'processing_root', '/opt/recon/data/processing'
|
||
|
|
)
|
||
|
|
proc_dir = os.path.join(processing_root, file_hash)
|
||
|
|
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
|
||
|
|
if os.path.exists(proc_dir):
|
||
|
|
try:
|
||
|
|
shutil.rmtree(proc_dir)
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
|
||
|
|
raise
|
||
|
|
if os.path.exists(concepts_dir):
|
||
|
|
try:
|
||
|
|
shutil.rmtree(concepts_dir)
|
||
|
|
except Exception as e:
|
||
|
|
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
|
||
|
|
raise
|
||
|
|
|
||
|
|
# ── Step 3: Hash dedupe ───────────────────────────────────────
|
||
|
|
conn = db._get_conn()
|
||
|
|
existing = conn.execute(
|
||
|
|
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
|
||
|
|
).fetchone()
|
||
|
|
if existing:
|
||
|
|
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
|
||
|
|
try:
|
||
|
|
os.remove(content_path)
|
||
|
|
if meta_path:
|
||
|
|
os.remove(meta_path)
|
||
|
|
except OSError as e:
|
||
|
|
logger.warning("Failed to remove duplicate pair: %s", e)
|
||
|
|
result['action'] = 'duplicate'
|
||
|
|
return result
|
||
|
|
|
||
|
|
# ── Step 4: Read text content ─────────────────────────────────
|
||
|
|
try:
|
||
|
|
with open(content_path, encoding='utf-8', errors='replace') as f:
|
||
|
|
raw_text = f.read()
|
||
|
|
except Exception as e:
|
||
|
|
result['error'] = "Cannot read text file: {}".format(e)
|
||
|
|
return result
|
||
|
|
|
||
|
|
if len(raw_text.strip()) < 50:
|
||
|
|
logger.info("Text file too short (%d chars), skipping: %s",
|
||
|
|
len(raw_text.strip()), filename)
|
||
|
|
try:
|
||
|
|
os.remove(content_path)
|
||
|
|
if meta_path:
|
||
|
|
os.remove(meta_path)
|
||
|
|
except OSError:
|
||
|
|
pass
|
||
|
|
result['action'] = 'skip_empty'
|
||
|
|
return result
|
||
|
|
|
||
|
|
# ── Step 5: Read optional sidecar ─────────────────────────────
|
||
|
|
sidecar = None
|
||
|
|
if meta_path:
|
||
|
|
try:
|
||
|
|
with open(meta_path, encoding='utf-8') as f:
|
||
|
|
sidecar = json.load(f)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("Cannot read sidecar %s: %s", meta_path, e)
|
||
|
|
|
||
|
|
# ── Step 6: Set up processing directory ───────────────────────
|
||
|
|
try:
|
||
|
|
os.makedirs(proc_dir, exist_ok=True)
|
||
|
|
except Exception as e:
|
||
|
|
result['error'] = "Cannot create processing dir: {}".format(e)
|
||
|
|
return result
|
||
|
|
|
||
|
|
# ── Step 7: Move files to processing ──────────────────────────
|
||
|
|
source_path = os.path.join(proc_dir, 'source.txt')
|
||
|
|
try:
|
||
|
|
shutil.move(content_path, source_path)
|
||
|
|
if meta_path:
|
||
|
|
shutil.move(meta_path, os.path.join(proc_dir, 'meta.json'))
|
||
|
|
except Exception as e:
|
||
|
|
result['error'] = "Cannot move files to processing: {}".format(e)
|
||
|
|
return result
|
||
|
|
|
||
|
|
# ── Step 8: Split into pages ──────────────────────────────────
|
||
|
|
pages = chunk_text(raw_text, WORDS_PER_PAGE)
|
||
|
|
for i, page_text in enumerate(pages, start=1):
|
||
|
|
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
|
||
|
|
with open(page_path, 'w', encoding='utf-8') as f:
|
||
|
|
f.write(page_text)
|
||
|
|
|
||
|
|
# ── Step 9: Source A — filename metadata ──────────────────────
|
||
|
|
source_a = _extract_filename_metadata(filename)
|
||
|
|
|
||
|
|
# ── Step 10: Source B — Gemini metadata ───────────────────────
|
||
|
|
first_pages_text = "\n".join(pages[:3])
|
||
|
|
source_b = _extract_gemini_metadata(first_pages_text, config)
|
||
|
|
|
||
|
|
# ── Step 11: Vote ─────────────────────────────────────────────
|
||
|
|
voted, provenance_record = _vote_metadata(source_a, source_b)
|
||
|
|
|
||
|
|
# ── Step 12: Write meta.json ──────────────────────────────────
|
||
|
|
meta = {
|
||
|
|
'hash': file_hash,
|
||
|
|
'filename': filename,
|
||
|
|
'source_type': 'text',
|
||
|
|
'page_count': len(pages),
|
||
|
|
'text_length': len(raw_text),
|
||
|
|
'metadata': voted,
|
||
|
|
'metadata_provenance': provenance_record,
|
||
|
|
}
|
||
|
|
if sidecar:
|
||
|
|
meta['sidecar'] = sidecar
|
||
|
|
|
||
|
|
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
|
||
|
|
json.dump(meta, f, indent=2)
|
||
|
|
|
||
|
|
# ── Step 13: Register in catalogue ────────────────────────────
|
||
|
|
display_title = voted.get('title') or clean_filename_to_title(filename)
|
||
|
|
size_bytes = os.path.getsize(source_path)
|
||
|
|
|
||
|
|
db.add_to_catalogue(
|
||
|
|
file_hash, filename, source_path, size_bytes, 'text', 'Document'
|
||
|
|
)
|
||
|
|
|
||
|
|
# ── Step 14: Queue and update documents row ───────────────────
|
||
|
|
db.queue_document(file_hash)
|
||
|
|
|
||
|
|
conn = db._get_conn()
|
||
|
|
conn.execute(
|
||
|
|
"UPDATE documents SET "
|
||
|
|
"text_dir = ?, page_count = ?, path = ?, "
|
||
|
|
"book_title = ?, book_author = ?, metadata_provenance = ? "
|
||
|
|
"WHERE hash = ?",
|
||
|
|
(
|
||
|
|
proc_dir,
|
||
|
|
len(pages),
|
||
|
|
source_path,
|
||
|
|
voted.get('title'),
|
||
|
|
voted.get('author'),
|
||
|
|
json.dumps(provenance_record),
|
||
|
|
file_hash,
|
||
|
|
)
|
||
|
|
)
|
||
|
|
conn.commit()
|
||
|
|
|
||
|
|
# ── Step 15: Status = extracted ───────────────────────────────
|
||
|
|
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
|
||
|
|
|
||
|
|
logger.info(
|
||
|
|
"Text pre_flight complete: %s (%s) -> %d pages in %s",
|
||
|
|
file_hash[:8], display_title, len(pages), proc_dir,
|
||
|
|
)
|
||
|
|
|
||
|
|
result['action'] = 'extracted'
|
||
|
|
return result
|