""" RECON ZIM Processor Batch importer for ZIM files. Opens a ZIM via python-libzim, iterates HTML articles, strips to clean text, creates processing directories, and registers each article as "extracted" for the enricher to pick up. This is NOT a dispatcher-style processor (no pre_flight). ZIMs contain thousands of articles — ingestion is triggered explicitly or by the ZIM monitor. Usage: python3 -m lib.processors.zim_processor --zim-source-id 1 python3 -m lib.processors.zim_processor --zim-source-id 1 --limit 100 --batch-size 50 """ import argparse import hashlib import json import logging import os import re import sys import time from lxml import html as lxml_html sys.path.insert(0, "/opt/recon") from lib.utils import setup_logging, get_config from lib.status import StatusDB from lib.web_scraper import chunk_text logger = logging.getLogger("recon.processors.zim") WORDS_PER_PAGE = 2000 MIN_TEXT_LENGTH = 200 # Elements to strip before text extraction STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'} def _text_hash(text): """Compute MD5 hash of text content (matching content_hash style).""" return hashlib.md5(text.encode('utf-8')).hexdigest() def _html_to_text(html_bytes): """Convert HTML bytes to clean text via lxml. Strips nav, footer, script, style elements. Decodes entities. Normalizes whitespace. """ try: doc = lxml_html.fromstring(html_bytes) except Exception: return "" # Strip unwanted elements for tag in STRIP_TAGS: for el in doc.iter(tag): el.drop_tree() # Extract text text = doc.text_content() # Normalize whitespace: collapse runs of spaces, normalize newlines text = re.sub(r'[ \t]+', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) text = text.strip() return text def ingest_zim(zim_source_id, db, config, stop_event=None, batch_size=100, batch_delay=1.0, limit=None): """Process all articles from a ZIM file registered in zim_sources. - Reads zim_path from zim_sources table - Iterates articles, creates processing dirs, registers in DB - Checkpoints progress via zim_sources.last_checkpoint - Respects stop_event for graceful shutdown - Yields after each batch to avoid monopolizing resources Args: zim_source_id: ID in zim_sources table db: StatusDB instance config: RECON config dict stop_event: threading.Event for graceful shutdown (optional) batch_size: articles per batch before sleeping batch_delay: seconds to sleep between batches limit: max articles to process (None = all) Returns: dict with counts: processed, skipped, duplicates, errors """ from libzim.reader import Archive conn = db._get_conn() # Load ZIM source record row = conn.execute( "SELECT * FROM zim_sources WHERE id = ?", (zim_source_id,) ).fetchone() if not row: logger.error("ZIM source ID %d not found", zim_source_id) return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} zim_source = dict(row) zim_path = zim_source['zim_path'] zim_filename = zim_source['zim_filename'] zim_title = zim_source.get('title') or zim_filename if not os.path.isfile(zim_path): logger.error("ZIM file not found: %s", zim_path) return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} logger.info("Opening ZIM: %s (%s)", zim_title, zim_filename) zim = Archive(zim_path) total_entries = zim.entry_count # Read checkpoint to resume from last_checkpoint = zim_source.get('last_checkpoint') start_idx = 0 if last_checkpoint: try: start_idx = int(last_checkpoint) logger.info("Resuming from checkpoint: entry %d", start_idx) except ValueError: logger.warning("Invalid checkpoint value: %s, starting from 0", last_checkpoint) # Update status to ingesting conn.execute( "UPDATE zim_sources SET status = 'ingesting', started_at = CURRENT_TIMESTAMP WHERE id = ?", (zim_source_id,) ) conn.commit() processing_root = config.get('pipeline', {}).get( 'processing_root', '/opt/recon/data/processing' ) # Get already-processed article paths for this ZIM source (dedup within ZIM) existing_paths = set() for r in conn.execute( "SELECT article_path FROM zim_articles WHERE zim_source_id = ?", (zim_source_id,) ).fetchall(): existing_paths.add(r['article_path']) stats = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} # Track what was already flushed to DB to avoid double-counting flushed = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} batch_count = 0 total_processed_this_run = 0 last_entry_idx = start_idx for entry_idx in range(start_idx, total_entries): if stop_event and stop_event.is_set(): logger.info("Stop event set, halting ZIM ingest at entry %d", entry_idx) break if limit and total_processed_this_run >= limit: logger.info("Reached limit of %d articles", limit) break last_entry_idx = entry_idx try: entry = zim._get_entry_by_id(entry_idx) except Exception: continue # Skip redirects if entry.is_redirect: continue try: item = entry.get_item() except Exception: continue # Skip non-HTML if item.mimetype != "text/html": continue article_path = entry.path article_title = entry.title # Skip if already processed in a prior run if article_path in existing_paths: continue # Extract and clean text try: html_bytes = bytes(item.content) clean_text = _html_to_text(html_bytes) except Exception as e: logger.debug("HTML extraction failed for %s: %s", article_path, e) stats['errors'] += 1 continue # Skip stubs if len(clean_text) < MIN_TEXT_LENGTH: stats['skipped'] += 1 continue # Compute content hash file_hash = _text_hash(clean_text) # Deduplicate against existing catalogue cat_row = conn.execute( "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) ).fetchone() if cat_row: # Record in zim_articles as skipped duplicate conn.execute( """INSERT OR IGNORE INTO zim_articles (zim_source_id, article_path, article_title, status, processed_at) VALUES (?, ?, ?, 'skipped', CURRENT_TIMESTAMP)""", (zim_source_id, article_path, article_title) ) stats['duplicates'] += 1 total_processed_this_run += 1 continue # Create processing directory proc_dir = os.path.join(processing_root, file_hash) try: os.makedirs(proc_dir, exist_ok=True) except Exception as e: logger.error("Cannot create processing dir %s: %s", proc_dir, e) stats['errors'] += 1 continue # Split into page files pages = chunk_text(clean_text, WORDS_PER_PAGE) for i, page_text in enumerate(pages, start=1): page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) with open(page_path, 'w', encoding='utf-8') as f: f.write(page_text) # Write meta.json meta = { 'hash': file_hash, 'filename': article_title + '.html', 'source_type': 'zim', 'zim_file': zim_filename, 'zim_source_id': zim_source_id, 'article_title': article_title, 'article_path': article_path, 'page_count': len(pages), 'text_length': len(clean_text), } with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: json.dump(meta, f, indent=2) # Register in catalogue db.add_to_catalogue( file_hash, article_title + '.html', zim_path, # source path is the ZIM file len(clean_text), # size in bytes (text) 'kiwix', # source zim_title, # category = ZIM title ) # Queue document db.queue_document(file_hash) # Set text_dir, page_count, book_title on documents row # Mark organized_at immediately (ZIM articles don't get filed to library) conn.execute( "UPDATE documents SET text_dir = ?, page_count = ?, " "book_title = ?, organized_at = CURRENT_TIMESTAMP " "WHERE hash = ?", (proc_dir, len(pages), article_title, file_hash) ) # Update status to extracted db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) # Record in zim_articles conn.execute( """INSERT OR IGNORE INTO zim_articles (zim_source_id, article_path, article_title, status, processed_at) VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)""", (zim_source_id, article_path, article_title) ) conn.commit() stats['processed'] += 1 total_processed_this_run += 1 batch_count += 1 # Progress logging total_done = zim_source['processed_count'] + stats['processed'] article_count = zim_source.get('article_count', 0) if stats['processed'] % 500 == 0 and article_count > 0: pct = total_done / article_count * 100 logger.info( "ZIM ingest [%s]: %s/%s (%.1f%%)", zim_title, f"{total_done:,}", f"{article_count:,}", pct ) # Batch checkpoint — flush only the delta since last flush if batch_count >= batch_size: delta_p = stats['processed'] - flushed['processed'] delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) delta_e = stats['errors'] - flushed['errors'] conn.execute( "UPDATE zim_sources SET processed_count = processed_count + ?, " "skipped_count = skipped_count + ?, error_count = error_count + ?, " "last_checkpoint = ? WHERE id = ?", (delta_p, delta_s, delta_e, str(entry_idx + 1), zim_source_id) ) conn.commit() flushed['processed'] = stats['processed'] flushed['skipped'] = stats['skipped'] flushed['duplicates'] = stats['duplicates'] flushed['errors'] = stats['errors'] batch_count = 0 if batch_delay > 0: time.sleep(batch_delay) # Final checkpoint — flush only the unflushed delta final_status = 'complete' if limit and total_processed_this_run >= limit: final_status = 'ingesting' # not done yet, just hit the limit delta_p = stats['processed'] - flushed['processed'] delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) delta_e = stats['errors'] - flushed['errors'] conn.execute( "UPDATE zim_sources SET processed_count = processed_count + ?, " "skipped_count = skipped_count + ?, error_count = error_count + ?, " "last_checkpoint = ?, status = ?, completed_at = CASE WHEN ? = 'complete' THEN CURRENT_TIMESTAMP ELSE completed_at END " "WHERE id = ?", (delta_p, delta_s, delta_e, str(last_entry_idx + 1), final_status, final_status, zim_source_id) ) conn.commit() logger.info( "ZIM ingest [%s] %s: %d processed, %d skipped, %d duplicates, %d errors", zim_title, final_status, stats['processed'], stats['skipped'], stats['duplicates'], stats['errors'] ) return stats def main(): """CLI entry point for standalone ZIM processing.""" parser = argparse.ArgumentParser(description="RECON ZIM Processor") parser.add_argument('--zim-source-id', type=int, required=True, help="ID from zim_sources table") parser.add_argument('--batch-size', type=int, default=100, help="Articles per batch (default: 100)") parser.add_argument('--batch-delay', type=float, default=1.0, help="Seconds between batches (default: 1.0)") parser.add_argument('--limit', type=int, default=None, help="Max articles to process (default: all)") args = parser.parse_args() setup_logging('recon.processors.zim') config = get_config() db = StatusDB(config['paths']['db']) stats = ingest_zim( zim_source_id=args.zim_source_id, db=db, config=config, batch_size=args.batch_size, batch_delay=args.batch_delay, limit=args.limit, ) print(f"\nResults: {stats['processed']} processed, {stats['skipped']} skipped, " f"{stats['duplicates']} duplicates, {stats['errors']} errors") if __name__ == "__main__": main()