recon/lib/processors/text_processor.py
Matt 62539861f2 Phase 6f: text processor for .txt file ingestion
New processor: lib/processors/text_processor.py
Handles plain text files (.txt) as primary source documents.

Pipeline: acquired/text/ -> dispatcher -> text_processor.pre_flight()
-> enrich -> embed -> filing worker -> library/Domain/Subdomain/

Metadata extraction via two-source vote:
- Source A: filename parsing (title from filename)
- Source B: Gemini LLM extraction (title/author/edition/year from
  first 3 pages of text)

Page splitting reuses chunk_text() from lib/web_scraper.py.
Filing behavior matches PDFs (files to library, not organized
in-place like transcripts).

Config: adds text: text_processor to pipeline.dispatch map.
New hopper subfolder: data/acquired/text/

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 22:39:31 +00:00

320 lines
11 KiB
Python

"""
RECON Text Processor
Handles pre_flight for plain .txt files arriving in acquired/text/.
These are primary source documents (books, manuals, guides), not derived
content like transcripts.
Metadata extraction via two-source vote:
A. Filename parsing (title, optionally author)
B. Gemini LLM extraction (title/author/edition/year from first 3 pages)
Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/
by the filing worker (NOT organized in-place like transcripts).
Phase 6f: initial implementation.
"""
import json
import logging
import os
import re
import shutil
from lib.web_scraper import chunk_text
from lib.utils import content_hash, clean_filename_to_title
from lib.processors.pdf_processor import _extract_gemini_metadata
logger = logging.getLogger("recon.processors.text")
WORDS_PER_PAGE = 2000
def _extract_filename_metadata(filename):
"""Source A: Extract metadata by parsing the filename.
Returns dict with keys: title, author, edition, year (any may be None).
"""
result = {'title': None, 'author': None, 'edition': None, 'year': None}
stem = os.path.splitext(filename)[0]
result['title'] = clean_filename_to_title(filename)
# Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY
year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem)
if year_match:
result['year'] = year_match.group(1)
# Edition: Nth Edition, Edition N, etc.
ed_match = re.search(
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
stem, re.IGNORECASE
)
if ed_match:
result['edition'] = ed_match.group(0).strip()
# Author: "Title - Author" or "Title by Author" patterns
# Try " - Author" at end
dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem)
if dash_match:
result['author'] = dash_match.group(1).strip()
else:
by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem)
if by_match:
result['author'] = by_match.group(1).strip()
return result
def _vote_metadata(source_a, source_b):
"""Vote on each metadata field across two sources.
Priority: B (Gemini) > A (filename).
If both agree, note agreement.
Returns:
(voted_dict, provenance_record)
"""
sources = {'filename': source_a, 'gemini': source_b}
priority = ['gemini', 'filename']
voted = {}
provenance = {}
for field in ('title', 'author', 'edition', 'year'):
values = {}
for name, src in sources.items():
val = src.get(field)
if val and str(val).strip().lower() != 'null':
values[name] = val
if not values:
voted[field] = None
provenance[field] = None
continue
def _norm(v):
if field == 'year':
return v.strip()
return v.strip().lower()
norm_groups = {}
for name, val in values.items():
nv = _norm(val)
norm_groups.setdefault(nv, []).append((name, val))
best_group = max(norm_groups.values(), key=len)
if len(best_group) >= 2:
# Both sources agree
for p in priority:
for name, val in best_group:
if name == p:
voted[field] = val
names = ','.join(n for n, _ in best_group)
provenance[field] = "agreed({})".format(names)
break
if voted.get(field) is not None:
break
else:
# No agreement — highest priority wins
for p in priority:
if p in values:
voted[field] = values[p]
provenance[field] = p
break
provenance_record = {
'voted': voted,
'provenance': provenance,
'sources': {
'filename': source_a,
'gemini': source_b,
}
}
return voted, provenance_record
def pre_flight(content_path, meta_path, db, config):
"""Process a .txt file dropped into acquired/text/.
Args:
content_path: Path to the .txt file
meta_path: Path to .meta.json sidecar (may be None)
db: StatusDB instance
config: RECON config dict
Returns:
dict with keys: hash, action, source_path, error
Actions: 'extracted', 'duplicate', 'skip_empty', 'error'
"""
result = {
'hash': None,
'action': 'error',
'source_path': content_path,
'error': None,
}
filename = os.path.basename(content_path)
# ── Step 1: Hash ──────────────────────────────────────────────
try:
file_hash = content_hash(content_path)
result['hash'] = file_hash
except Exception as e:
result['error'] = "Cannot hash text file: {}".format(e)
return result
# ── Step 2: Stale state cleanup ───────────────────────────────
processing_root = config.get('pipeline', {}).get(
'processing_root', '/opt/recon/data/processing'
)
proc_dir = os.path.join(processing_root, file_hash)
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
if os.path.exists(proc_dir):
try:
shutil.rmtree(proc_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
raise
if os.path.exists(concepts_dir):
try:
shutil.rmtree(concepts_dir)
except Exception as e:
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
raise
# ── Step 3: Hash dedupe ───────────────────────────────────────
conn = db._get_conn()
existing = conn.execute(
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if existing:
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError as e:
logger.warning("Failed to remove duplicate pair: %s", e)
result['action'] = 'duplicate'
return result
# ── Step 4: Read text content ─────────────────────────────────
try:
with open(content_path, encoding='utf-8', errors='replace') as f:
raw_text = f.read()
except Exception as e:
result['error'] = "Cannot read text file: {}".format(e)
return result
if len(raw_text.strip()) < 50:
logger.info("Text file too short (%d chars), skipping: %s",
len(raw_text.strip()), filename)
try:
os.remove(content_path)
if meta_path:
os.remove(meta_path)
except OSError:
pass
result['action'] = 'skip_empty'
return result
# ── Step 5: Read optional sidecar ─────────────────────────────
sidecar = None
if meta_path:
try:
with open(meta_path, encoding='utf-8') as f:
sidecar = json.load(f)
except Exception as e:
logger.warning("Cannot read sidecar %s: %s", meta_path, e)
# ── Step 6: Set up processing directory ───────────────────────
try:
os.makedirs(proc_dir, exist_ok=True)
except Exception as e:
result['error'] = "Cannot create processing dir: {}".format(e)
return result
# ── Step 7: Move files to processing ──────────────────────────
source_path = os.path.join(proc_dir, 'source.txt')
try:
shutil.move(content_path, source_path)
if meta_path:
shutil.move(meta_path, os.path.join(proc_dir, 'meta.json'))
except Exception as e:
result['error'] = "Cannot move files to processing: {}".format(e)
return result
# ── Step 8: Split into pages ──────────────────────────────────
pages = chunk_text(raw_text, WORDS_PER_PAGE)
for i, page_text in enumerate(pages, start=1):
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
with open(page_path, 'w', encoding='utf-8') as f:
f.write(page_text)
# ── Step 9: Source A — filename metadata ──────────────────────
source_a = _extract_filename_metadata(filename)
# ── Step 10: Source B — Gemini metadata ───────────────────────
first_pages_text = "\n".join(pages[:3])
source_b = _extract_gemini_metadata(first_pages_text, config)
# ── Step 11: Vote ─────────────────────────────────────────────
voted, provenance_record = _vote_metadata(source_a, source_b)
# ── Step 12: Write meta.json ──────────────────────────────────
meta = {
'hash': file_hash,
'filename': filename,
'source_type': 'text',
'page_count': len(pages),
'text_length': len(raw_text),
'metadata': voted,
'metadata_provenance': provenance_record,
}
if sidecar:
meta['sidecar'] = sidecar
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
json.dump(meta, f, indent=2)
# ── Step 13: Register in catalogue ────────────────────────────
display_title = voted.get('title') or clean_filename_to_title(filename)
size_bytes = os.path.getsize(source_path)
db.add_to_catalogue(
file_hash, filename, source_path, size_bytes, 'text', 'Document'
)
# ── Step 14: Queue and update documents row ───────────────────
db.queue_document(file_hash)
conn = db._get_conn()
conn.execute(
"UPDATE documents SET "
"text_dir = ?, page_count = ?, path = ?, "
"book_title = ?, book_author = ?, metadata_provenance = ? "
"WHERE hash = ?",
(
proc_dir,
len(pages),
source_path,
voted.get('title'),
voted.get('author'),
json.dumps(provenance_record),
file_hash,
)
)
conn.commit()
# ── Step 15: Status = extracted ───────────────────────────────
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
logger.info(
"Text pre_flight complete: %s (%s) -> %d pages in %s",
file_hash[:8], display_title, len(pages), proc_dir,
)
result['action'] = 'extracted'
return result