diff --git a/config.yaml b/config.yaml index a9dcf16..3e185f8 100644 --- a/config.yaml +++ b/config.yaml @@ -437,5 +437,6 @@ pipeline: pdf: pdf_processor stream: transcript_processor html: html_processor + text: text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 diff --git a/lib/processors/text_processor.py b/lib/processors/text_processor.py new file mode 100644 index 0000000..47e68fc --- /dev/null +++ b/lib/processors/text_processor.py @@ -0,0 +1,320 @@ +""" +RECON Text Processor + +Handles pre_flight for plain .txt files arriving in acquired/text/. +These are primary source documents (books, manuals, guides), not derived +content like transcripts. + +Metadata extraction via two-source vote: + A. Filename parsing (title, optionally author) + B. Gemini LLM extraction (title/author/edition/year from first 3 pages) + +Filing behavior matches PDFs: files get organized to library/Domain/Subdomain/ +by the filing worker (NOT organized in-place like transcripts). + +Phase 6f: initial implementation. +""" +import json +import logging +import os +import re +import shutil + +from lib.web_scraper import chunk_text +from lib.utils import content_hash, clean_filename_to_title +from lib.processors.pdf_processor import _extract_gemini_metadata + +logger = logging.getLogger("recon.processors.text") + +WORDS_PER_PAGE = 2000 + + +def _extract_filename_metadata(filename): + """Source A: Extract metadata by parsing the filename. + + Returns dict with keys: title, author, edition, year (any may be None). + """ + result = {'title': None, 'author': None, 'edition': None, 'year': None} + + stem = os.path.splitext(filename)[0] + + result['title'] = clean_filename_to_title(filename) + + # Year: look for (YYYY) or [YYYY] or _YYYY_ or standalone YYYY + year_match = re.search(r'[\(\[_\s]((?:19|20)\d{2})[\)\]_\s.]', stem) + if year_match: + result['year'] = year_match.group(1) + + # Edition: Nth Edition, Edition N, etc. + ed_match = re.search( + r'(\d+)(?:st|nd|rd|th)?\s*(?:edition|ed\.?)', + stem, re.IGNORECASE + ) + if ed_match: + result['edition'] = ed_match.group(0).strip() + + # Author: "Title - Author" or "Title by Author" patterns + # Try " - Author" at end + dash_match = re.search(r'\s+-\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$', stem) + if dash_match: + result['author'] = dash_match.group(1).strip() + else: + by_match = re.search(r'\bby\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', stem) + if by_match: + result['author'] = by_match.group(1).strip() + + return result + + +def _vote_metadata(source_a, source_b): + """Vote on each metadata field across two sources. + + Priority: B (Gemini) > A (filename). + If both agree, note agreement. + + Returns: + (voted_dict, provenance_record) + """ + sources = {'filename': source_a, 'gemini': source_b} + priority = ['gemini', 'filename'] + voted = {} + provenance = {} + + for field in ('title', 'author', 'edition', 'year'): + values = {} + for name, src in sources.items(): + val = src.get(field) + if val and str(val).strip().lower() != 'null': + values[name] = val + + if not values: + voted[field] = None + provenance[field] = None + continue + + def _norm(v): + if field == 'year': + return v.strip() + return v.strip().lower() + + norm_groups = {} + for name, val in values.items(): + nv = _norm(val) + norm_groups.setdefault(nv, []).append((name, val)) + + best_group = max(norm_groups.values(), key=len) + + if len(best_group) >= 2: + # Both sources agree + for p in priority: + for name, val in best_group: + if name == p: + voted[field] = val + names = ','.join(n for n, _ in best_group) + provenance[field] = "agreed({})".format(names) + break + if voted.get(field) is not None: + break + else: + # No agreement — highest priority wins + for p in priority: + if p in values: + voted[field] = values[p] + provenance[field] = p + break + + provenance_record = { + 'voted': voted, + 'provenance': provenance, + 'sources': { + 'filename': source_a, + 'gemini': source_b, + } + } + + return voted, provenance_record + + +def pre_flight(content_path, meta_path, db, config): + """Process a .txt file dropped into acquired/text/. + + Args: + content_path: Path to the .txt file + meta_path: Path to .meta.json sidecar (may be None) + db: StatusDB instance + config: RECON config dict + + Returns: + dict with keys: hash, action, source_path, error + Actions: 'extracted', 'duplicate', 'skip_empty', 'error' + """ + result = { + 'hash': None, + 'action': 'error', + 'source_path': content_path, + 'error': None, + } + + filename = os.path.basename(content_path) + + # ── Step 1: Hash ────────────────────────────────────────────── + try: + file_hash = content_hash(content_path) + result['hash'] = file_hash + except Exception as e: + result['error'] = "Cannot hash text file: {}".format(e) + return result + + # ── Step 2: Stale state cleanup ─────────────────────────────── + processing_root = config.get('pipeline', {}).get( + 'processing_root', '/opt/recon/data/processing' + ) + proc_dir = os.path.join(processing_root, file_hash) + concepts_dir = os.path.join(config['paths']['concepts'], file_hash) + if os.path.exists(proc_dir): + try: + shutil.rmtree(proc_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", proc_dir, e) + raise + if os.path.exists(concepts_dir): + try: + shutil.rmtree(concepts_dir) + except Exception as e: + logger.error("Stale cleanup failed for %s: %s", concepts_dir, e) + raise + + # ── Step 3: Hash dedupe ─────────────────────────────────────── + conn = db._get_conn() + existing = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if existing: + logger.info("Duplicate hash %s, removing pair", file_hash[:8]) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError as e: + logger.warning("Failed to remove duplicate pair: %s", e) + result['action'] = 'duplicate' + return result + + # ── Step 4: Read text content ───────────────────────────────── + try: + with open(content_path, encoding='utf-8', errors='replace') as f: + raw_text = f.read() + except Exception as e: + result['error'] = "Cannot read text file: {}".format(e) + return result + + if len(raw_text.strip()) < 50: + logger.info("Text file too short (%d chars), skipping: %s", + len(raw_text.strip()), filename) + try: + os.remove(content_path) + if meta_path: + os.remove(meta_path) + except OSError: + pass + result['action'] = 'skip_empty' + return result + + # ── Step 5: Read optional sidecar ───────────────────────────── + sidecar = None + if meta_path: + try: + with open(meta_path, encoding='utf-8') as f: + sidecar = json.load(f) + except Exception as e: + logger.warning("Cannot read sidecar %s: %s", meta_path, e) + + # ── Step 6: Set up processing directory ─────────────────────── + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + result['error'] = "Cannot create processing dir: {}".format(e) + return result + + # ── Step 7: Move files to processing ────────────────────────── + source_path = os.path.join(proc_dir, 'source.txt') + try: + shutil.move(content_path, source_path) + if meta_path: + shutil.move(meta_path, os.path.join(proc_dir, 'meta.json')) + except Exception as e: + result['error'] = "Cannot move files to processing: {}".format(e) + return result + + # ── Step 8: Split into pages ────────────────────────────────── + pages = chunk_text(raw_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # ── Step 9: Source A — filename metadata ────────────────────── + source_a = _extract_filename_metadata(filename) + + # ── Step 10: Source B — Gemini metadata ─────────────────────── + first_pages_text = "\n".join(pages[:3]) + source_b = _extract_gemini_metadata(first_pages_text, config) + + # ── Step 11: Vote ───────────────────────────────────────────── + voted, provenance_record = _vote_metadata(source_a, source_b) + + # ── Step 12: Write meta.json ────────────────────────────────── + meta = { + 'hash': file_hash, + 'filename': filename, + 'source_type': 'text', + 'page_count': len(pages), + 'text_length': len(raw_text), + 'metadata': voted, + 'metadata_provenance': provenance_record, + } + if sidecar: + meta['sidecar'] = sidecar + + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # ── Step 13: Register in catalogue ──────────────────────────── + display_title = voted.get('title') or clean_filename_to_title(filename) + size_bytes = os.path.getsize(source_path) + + db.add_to_catalogue( + file_hash, filename, source_path, size_bytes, 'text', 'Document' + ) + + # ── Step 14: Queue and update documents row ─────────────────── + db.queue_document(file_hash) + + conn = db._get_conn() + conn.execute( + "UPDATE documents SET " + "text_dir = ?, page_count = ?, path = ?, " + "book_title = ?, book_author = ?, metadata_provenance = ? " + "WHERE hash = ?", + ( + proc_dir, + len(pages), + source_path, + voted.get('title'), + voted.get('author'), + json.dumps(provenance_record), + file_hash, + ) + ) + conn.commit() + + # ── Step 15: Status = extracted ─────────────────────────────── + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + logger.info( + "Text pre_flight complete: %s (%s) -> %d pages in %s", + file_hash[:8], display_title, len(pages), proc_dir, + ) + + result['action'] = 'extracted' + return result