mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 14:44:54 +02:00
Phase 6f: text processor for .txt file ingestion
New processor: lib/processors/text_processor.py Handles plain text files (.txt) as primary source documents. Pipeline: acquired/text/ -> dispatcher -> text_processor.pre_flight() -> enrich -> embed -> filing worker -> library/Domain/Subdomain/ Metadata extraction via two-source vote: - Source A: filename parsing (title from filename) - Source B: Gemini LLM extraction (title/author/edition/year from first 3 pages of text) Page splitting reuses chunk_text() from lib/web_scraper.py. Filing behavior matches PDFs (files to library, not organized in-place like transcripts). Config: adds text: text_processor to pipeline.dispatch map. New hopper subfolder: data/acquired/text/ Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
7fe7d03583
commit
62539861f2
2 changed files with 321 additions and 0 deletions
|
|
@ -437,5 +437,6 @@ pipeline:
|
|||
pdf: pdf_processor
|
||||
stream: transcript_processor
|
||||
html: html_processor
|
||||
text: text_processor
|
||||
# mtime stability threshold for picking up files from acquired/
|
||||
mtime_stability_seconds: 10
|
||||
|
|
|
|||
320
lib/processors/text_processor.py
Normal file
320
lib/processors/text_processor.py
Normal file
|
|
@ -0,0 +1,320 @@
|
|||
"""
|
||||
RECON Text Processor
|
||||
|
||||
Handles pre_flight for plain .txt files arriving in acquired/text/.
|
||||
These are primary source documents (books, manuals, guides), not derived
|
||||
content like transcripts.
|
||||
|
||||
Metadata extraction via two-source vote:
|
||||
A. Filename parsing (title, optionally author)
|
||||
B. Gemini LLM extraction (title/author/edition/year from first 3 pages)
|
||||
|
||||
Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/
|
||||
by the filing worker (NOT organized in-place like transcripts).
|
||||
|
||||
Phase 6f: initial implementation.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
||||
from lib.web_scraper import chunk_text
|
||||
from lib.utils import content_hash, clean_filename_to_title
|
||||
from lib.processors.pdf_processor import _extract_gemini_metadata
|
||||
|
||||
logger = logging.getLogger("recon.processors.text")
|
||||
|
||||
WORDS_PER_PAGE = 2000
|
||||
|
||||
|
||||
def _extract_filename_metadata(filename):
|
||||
"""Source A: Extract metadata by parsing the filename.
|
||||
|
||||
Returns dict with keys: title, author, edition, year (any may be None).
|
||||
"""
|
||||
result = {'title': None, 'author': None, 'edition': None, 'year': None}
|
||||
|
||||
stem = os.path.splitext(filename)[0]
|
||||
|
||||
result['title'] = clean_filename_to_title(filename)
|
||||
|
||||
# Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY
|
||||
year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem)
|
||||
if year_match:
|
||||
result['year'] = year_match.group(1)
|
||||
|
||||
# Edition: Nth Edition, Edition N, etc.
|
||||
ed_match = re.search(
|
||||
r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)',
|
||||
stem, re.IGNORECASE
|
||||
)
|
||||
if ed_match:
|
||||
result['edition'] = ed_match.group(0).strip()
|
||||
|
||||
# Author: "Title - Author" or "Title by Author" patterns
|
||||
# Try " - Author" at end
|
||||
dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem)
|
||||
if dash_match:
|
||||
result['author'] = dash_match.group(1).strip()
|
||||
else:
|
||||
by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem)
|
||||
if by_match:
|
||||
result['author'] = by_match.group(1).strip()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _vote_metadata(source_a, source_b):
|
||||
"""Vote on each metadata field across two sources.
|
||||
|
||||
Priority: B (Gemini) > A (filename).
|
||||
If both agree, note agreement.
|
||||
|
||||
Returns:
|
||||
(voted_dict, provenance_record)
|
||||
"""
|
||||
sources = {'filename': source_a, 'gemini': source_b}
|
||||
priority = ['gemini', 'filename']
|
||||
voted = {}
|
||||
provenance = {}
|
||||
|
||||
for field in ('title', 'author', 'edition', 'year'):
|
||||
values = {}
|
||||
for name, src in sources.items():
|
||||
val = src.get(field)
|
||||
if val and str(val).strip().lower() != 'null':
|
||||
values[name] = val
|
||||
|
||||
if not values:
|
||||
voted[field] = None
|
||||
provenance[field] = None
|
||||
continue
|
||||
|
||||
def _norm(v):
|
||||
if field == 'year':
|
||||
return v.strip()
|
||||
return v.strip().lower()
|
||||
|
||||
norm_groups = {}
|
||||
for name, val in values.items():
|
||||
nv = _norm(val)
|
||||
norm_groups.setdefault(nv, []).append((name, val))
|
||||
|
||||
best_group = max(norm_groups.values(), key=len)
|
||||
|
||||
if len(best_group) >= 2:
|
||||
# Both sources agree
|
||||
for p in priority:
|
||||
for name, val in best_group:
|
||||
if name == p:
|
||||
voted[field] = val
|
||||
names = ','.join(n for n, _ in best_group)
|
||||
provenance[field] = "agreed({})".format(names)
|
||||
break
|
||||
if voted.get(field) is not None:
|
||||
break
|
||||
else:
|
||||
# No agreement — highest priority wins
|
||||
for p in priority:
|
||||
if p in values:
|
||||
voted[field] = values[p]
|
||||
provenance[field] = p
|
||||
break
|
||||
|
||||
provenance_record = {
|
||||
'voted': voted,
|
||||
'provenance': provenance,
|
||||
'sources': {
|
||||
'filename': source_a,
|
||||
'gemini': source_b,
|
||||
}
|
||||
}
|
||||
|
||||
return voted, provenance_record
|
||||
|
||||
|
||||
def pre_flight(content_path, meta_path, db, config):
|
||||
"""Process a .txt file dropped into acquired/text/.
|
||||
|
||||
Args:
|
||||
content_path: Path to the .txt file
|
||||
meta_path: Path to .meta.json sidecar (may be None)
|
||||
db: StatusDB instance
|
||||
config: RECON config dict
|
||||
|
||||
Returns:
|
||||
dict with keys: hash, action, source_path, error
|
||||
Actions: 'extracted', 'duplicate', 'skip_empty', 'error'
|
||||
"""
|
||||
result = {
|
||||
'hash': None,
|
||||
'action': 'error',
|
||||
'source_path': content_path,
|
||||
'error': None,
|
||||
}
|
||||
|
||||
filename = os.path.basename(content_path)
|
||||
|
||||
# ── Step 1: Hash ──────────────────────────────────────────────
|
||||
try:
|
||||
file_hash = content_hash(content_path)
|
||||
result['hash'] = file_hash
|
||||
except Exception as e:
|
||||
result['error'] = "Cannot hash text file: {}".format(e)
|
||||
return result
|
||||
|
||||
# ── Step 2: Stale state cleanup ───────────────────────────────
|
||||
processing_root = config.get('pipeline', {}).get(
|
||||
'processing_root', '/opt/recon/data/processing'
|
||||
)
|
||||
proc_dir = os.path.join(processing_root, file_hash)
|
||||
concepts_dir = os.path.join(config['paths']['concepts'], file_hash)
|
||||
if os.path.exists(proc_dir):
|
||||
try:
|
||||
shutil.rmtree(proc_dir)
|
||||
except Exception as e:
|
||||
logger.error("Stale cleanup failed for %s: %s", proc_dir, e)
|
||||
raise
|
||||
if os.path.exists(concepts_dir):
|
||||
try:
|
||||
shutil.rmtree(concepts_dir)
|
||||
except Exception as e:
|
||||
logger.error("Stale cleanup failed for %s: %s", concepts_dir, e)
|
||||
raise
|
||||
|
||||
# ── Step 3: Hash dedupe ───────────────────────────────────────
|
||||
conn = db._get_conn()
|
||||
existing = conn.execute(
|
||||
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
|
||||
).fetchone()
|
||||
if existing:
|
||||
logger.info("Duplicate hash %s, removing pair", file_hash[:8])
|
||||
try:
|
||||
os.remove(content_path)
|
||||
if meta_path:
|
||||
os.remove(meta_path)
|
||||
except OSError as e:
|
||||
logger.warning("Failed to remove duplicate pair: %s", e)
|
||||
result['action'] = 'duplicate'
|
||||
return result
|
||||
|
||||
# ── Step 4: Read text content ─────────────────────────────────
|
||||
try:
|
||||
with open(content_path, encoding='utf-8', errors='replace') as f:
|
||||
raw_text = f.read()
|
||||
except Exception as e:
|
||||
result['error'] = "Cannot read text file: {}".format(e)
|
||||
return result
|
||||
|
||||
if len(raw_text.strip()) < 50:
|
||||
logger.info("Text file too short (%d chars), skipping: %s",
|
||||
len(raw_text.strip()), filename)
|
||||
try:
|
||||
os.remove(content_path)
|
||||
if meta_path:
|
||||
os.remove(meta_path)
|
||||
except OSError:
|
||||
pass
|
||||
result['action'] = 'skip_empty'
|
||||
return result
|
||||
|
||||
# ── Step 5: Read optional sidecar ─────────────────────────────
|
||||
sidecar = None
|
||||
if meta_path:
|
||||
try:
|
||||
with open(meta_path, encoding='utf-8') as f:
|
||||
sidecar = json.load(f)
|
||||
except Exception as e:
|
||||
logger.warning("Cannot read sidecar %s: %s", meta_path, e)
|
||||
|
||||
# ── Step 6: Set up processing directory ───────────────────────
|
||||
try:
|
||||
os.makedirs(proc_dir, exist_ok=True)
|
||||
except Exception as e:
|
||||
result['error'] = "Cannot create processing dir: {}".format(e)
|
||||
return result
|
||||
|
||||
# ── Step 7: Move files to processing ──────────────────────────
|
||||
source_path = os.path.join(proc_dir, 'source.txt')
|
||||
try:
|
||||
shutil.move(content_path, source_path)
|
||||
if meta_path:
|
||||
shutil.move(meta_path, os.path.join(proc_dir, 'meta.json'))
|
||||
except Exception as e:
|
||||
result['error'] = "Cannot move files to processing: {}".format(e)
|
||||
return result
|
||||
|
||||
# ── Step 8: Split into pages ──────────────────────────────────
|
||||
pages = chunk_text(raw_text, WORDS_PER_PAGE)
|
||||
for i, page_text in enumerate(pages, start=1):
|
||||
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
|
||||
with open(page_path, 'w', encoding='utf-8') as f:
|
||||
f.write(page_text)
|
||||
|
||||
# ── Step 9: Source A — filename metadata ──────────────────────
|
||||
source_a = _extract_filename_metadata(filename)
|
||||
|
||||
# ── Step 10: Source B — Gemini metadata ───────────────────────
|
||||
first_pages_text = "\n".join(pages[:3])
|
||||
source_b = _extract_gemini_metadata(first_pages_text, config)
|
||||
|
||||
# ── Step 11: Vote ─────────────────────────────────────────────
|
||||
voted, provenance_record = _vote_metadata(source_a, source_b)
|
||||
|
||||
# ── Step 12: Write meta.json ──────────────────────────────────
|
||||
meta = {
|
||||
'hash': file_hash,
|
||||
'filename': filename,
|
||||
'source_type': 'text',
|
||||
'page_count': len(pages),
|
||||
'text_length': len(raw_text),
|
||||
'metadata': voted,
|
||||
'metadata_provenance': provenance_record,
|
||||
}
|
||||
if sidecar:
|
||||
meta['sidecar'] = sidecar
|
||||
|
||||
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
|
||||
json.dump(meta, f, indent=2)
|
||||
|
||||
# ── Step 13: Register in catalogue ────────────────────────────
|
||||
display_title = voted.get('title') or clean_filename_to_title(filename)
|
||||
size_bytes = os.path.getsize(source_path)
|
||||
|
||||
db.add_to_catalogue(
|
||||
file_hash, filename, source_path, size_bytes, 'text', 'Document'
|
||||
)
|
||||
|
||||
# ── Step 14: Queue and update documents row ───────────────────
|
||||
db.queue_document(file_hash)
|
||||
|
||||
conn = db._get_conn()
|
||||
conn.execute(
|
||||
"UPDATE documents SET "
|
||||
"text_dir = ?, page_count = ?, path = ?, "
|
||||
"book_title = ?, book_author = ?, metadata_provenance = ? "
|
||||
"WHERE hash = ?",
|
||||
(
|
||||
proc_dir,
|
||||
len(pages),
|
||||
source_path,
|
||||
voted.get('title'),
|
||||
voted.get('author'),
|
||||
json.dumps(provenance_record),
|
||||
file_hash,
|
||||
)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# ── Step 15: Status = extracted ───────────────────────────────
|
||||
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
|
||||
|
||||
logger.info(
|
||||
"Text pre_flight complete: %s (%s) -> %d pages in %s",
|
||||
file_hash[:8], display_title, len(pages), proc_dir,
|
||||
)
|
||||
|
||||
result['action'] = 'extracted'
|
||||
return result
|
||||
Loading…
Add table
Add a link
Reference in a new issue