mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 06:34:40 +02:00
Phase 2: ZIM processor — batch article ingestion pipeline
Adds lib/processors/zim_processor.py which opens a ZIM file via python-libzim, iterates HTML articles, strips to clean text (lxml), and feeds each article into the existing RECON enrichment pipeline. Key features: - HTML to text via lxml (strips nav/footer/script/style) - Filters redirects, non-HTML entries, stubs (<200 chars) - Content hash dedup against existing catalogue - Creates processing dirs with page files and meta.json - Registers articles as "extracted" for automatic enrichment - Checkpointing via zim_sources.last_checkpoint for resume - Configurable batch size and delay for rate control - Standalone CLI: python3 -m lib.processors.zim_processor Tested: 100 Appropedia articles processed in 3s, enricher picks them up automatically via the existing pipeline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
7c1af0f063
commit
c60aa5e80d
1 changed files with 387 additions and 0 deletions
387
lib/processors/zim_processor.py
Normal file
387
lib/processors/zim_processor.py
Normal file
|
|
@ -0,0 +1,387 @@
|
|||
"""
|
||||
RECON ZIM Processor
|
||||
|
||||
Batch importer for ZIM files. Opens a ZIM via python-libzim, iterates
|
||||
HTML articles, strips to clean text, creates processing directories,
|
||||
and registers each article as "extracted" for the enricher to pick up.
|
||||
|
||||
This is NOT a dispatcher-style processor (no pre_flight). ZIMs contain
|
||||
thousands of articles — ingestion is triggered explicitly or by the
|
||||
ZIM monitor.
|
||||
|
||||
Usage:
|
||||
python3 -m lib.processors.zim_processor --zim-source-id 1
|
||||
python3 -m lib.processors.zim_processor --zim-source-id 1 --limit 100 --batch-size 50
|
||||
"""
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
from lxml import html as lxml_html
|
||||
|
||||
sys.path.insert(0, "/opt/recon")
|
||||
|
||||
from lib.utils import setup_logging, get_config
|
||||
from lib.status import StatusDB
|
||||
from lib.web_scraper import chunk_text
|
||||
|
||||
logger = logging.getLogger("recon.processors.zim")
|
||||
|
||||
WORDS_PER_PAGE = 2000
|
||||
MIN_TEXT_LENGTH = 200
|
||||
|
||||
# Elements to strip before text extraction
|
||||
STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'}
|
||||
|
||||
|
||||
def _text_hash(text):
|
||||
"""Compute MD5 hash of text content (matching content_hash style)."""
|
||||
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def _html_to_text(html_bytes):
|
||||
"""Convert HTML bytes to clean text via lxml.
|
||||
|
||||
Strips nav, footer, script, style elements. Decodes entities.
|
||||
Normalizes whitespace.
|
||||
"""
|
||||
try:
|
||||
doc = lxml_html.fromstring(html_bytes)
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
# Strip unwanted elements
|
||||
for tag in STRIP_TAGS:
|
||||
for el in doc.iter(tag):
|
||||
el.drop_tree()
|
||||
|
||||
# Extract text
|
||||
text = doc.text_content()
|
||||
|
||||
# Normalize whitespace: collapse runs of spaces, normalize newlines
|
||||
text = re.sub(r'[ \t]+', ' ', text)
|
||||
text = re.sub(r'\n{3,}', '\n\n', text)
|
||||
text = text.strip()
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def ingest_zim(zim_source_id, db, config, stop_event=None,
|
||||
batch_size=100, batch_delay=1.0, limit=None):
|
||||
"""Process all articles from a ZIM file registered in zim_sources.
|
||||
|
||||
- Reads zim_path from zim_sources table
|
||||
- Iterates articles, creates processing dirs, registers in DB
|
||||
- Checkpoints progress via zim_sources.last_checkpoint
|
||||
- Respects stop_event for graceful shutdown
|
||||
- Yields after each batch to avoid monopolizing resources
|
||||
|
||||
Args:
|
||||
zim_source_id: ID in zim_sources table
|
||||
db: StatusDB instance
|
||||
config: RECON config dict
|
||||
stop_event: threading.Event for graceful shutdown (optional)
|
||||
batch_size: articles per batch before sleeping
|
||||
batch_delay: seconds to sleep between batches
|
||||
limit: max articles to process (None = all)
|
||||
|
||||
Returns:
|
||||
dict with counts: processed, skipped, duplicates, errors
|
||||
"""
|
||||
from libzim.reader import Archive
|
||||
|
||||
conn = db._get_conn()
|
||||
|
||||
# Load ZIM source record
|
||||
row = conn.execute(
|
||||
"SELECT * FROM zim_sources WHERE id = ?", (zim_source_id,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
logger.error("ZIM source ID %d not found", zim_source_id)
|
||||
return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}
|
||||
|
||||
zim_source = dict(row)
|
||||
zim_path = zim_source['zim_path']
|
||||
zim_filename = zim_source['zim_filename']
|
||||
zim_title = zim_source.get('title') or zim_filename
|
||||
|
||||
if not os.path.isfile(zim_path):
|
||||
logger.error("ZIM file not found: %s", zim_path)
|
||||
return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}
|
||||
|
||||
logger.info("Opening ZIM: %s (%s)", zim_title, zim_filename)
|
||||
zim = Archive(zim_path)
|
||||
total_entries = zim.entry_count
|
||||
|
||||
# Read checkpoint to resume from
|
||||
last_checkpoint = zim_source.get('last_checkpoint')
|
||||
start_idx = 0
|
||||
if last_checkpoint:
|
||||
try:
|
||||
start_idx = int(last_checkpoint)
|
||||
logger.info("Resuming from checkpoint: entry %d", start_idx)
|
||||
except ValueError:
|
||||
logger.warning("Invalid checkpoint value: %s, starting from 0", last_checkpoint)
|
||||
|
||||
# Update status to ingesting
|
||||
conn.execute(
|
||||
"UPDATE zim_sources SET status = 'ingesting', started_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||||
(zim_source_id,)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
processing_root = config.get('pipeline', {}).get(
|
||||
'processing_root', '/opt/recon/data/processing'
|
||||
)
|
||||
|
||||
# Get already-processed article paths for this ZIM source (dedup within ZIM)
|
||||
existing_paths = set()
|
||||
for r in conn.execute(
|
||||
"SELECT article_path FROM zim_articles WHERE zim_source_id = ?",
|
||||
(zim_source_id,)
|
||||
).fetchall():
|
||||
existing_paths.add(r['article_path'])
|
||||
|
||||
stats = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}
|
||||
# Track what was already flushed to DB to avoid double-counting
|
||||
flushed = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}
|
||||
batch_count = 0
|
||||
total_processed_this_run = 0
|
||||
last_entry_idx = start_idx
|
||||
|
||||
for entry_idx in range(start_idx, total_entries):
|
||||
if stop_event and stop_event.is_set():
|
||||
logger.info("Stop event set, halting ZIM ingest at entry %d", entry_idx)
|
||||
break
|
||||
|
||||
if limit and total_processed_this_run >= limit:
|
||||
logger.info("Reached limit of %d articles", limit)
|
||||
break
|
||||
|
||||
last_entry_idx = entry_idx
|
||||
|
||||
try:
|
||||
entry = zim._get_entry_by_id(entry_idx)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Skip redirects
|
||||
if entry.is_redirect:
|
||||
continue
|
||||
|
||||
try:
|
||||
item = entry.get_item()
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Skip non-HTML
|
||||
if item.mimetype != "text/html":
|
||||
continue
|
||||
|
||||
article_path = entry.path
|
||||
article_title = entry.title
|
||||
|
||||
# Skip if already processed in a prior run
|
||||
if article_path in existing_paths:
|
||||
continue
|
||||
|
||||
# Extract and clean text
|
||||
try:
|
||||
html_bytes = bytes(item.content)
|
||||
clean_text = _html_to_text(html_bytes)
|
||||
except Exception as e:
|
||||
logger.debug("HTML extraction failed for %s: %s", article_path, e)
|
||||
stats['errors'] += 1
|
||||
continue
|
||||
|
||||
# Skip stubs
|
||||
if len(clean_text) < MIN_TEXT_LENGTH:
|
||||
stats['skipped'] += 1
|
||||
continue
|
||||
|
||||
# Compute content hash
|
||||
file_hash = _text_hash(clean_text)
|
||||
|
||||
# Deduplicate against existing catalogue
|
||||
cat_row = conn.execute(
|
||||
"SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
|
||||
).fetchone()
|
||||
if cat_row:
|
||||
# Record in zim_articles as skipped duplicate
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO zim_articles
|
||||
(zim_source_id, article_path, article_title, status, processed_at)
|
||||
VALUES (?, ?, ?, 'skipped', CURRENT_TIMESTAMP)""",
|
||||
(zim_source_id, article_path, article_title)
|
||||
)
|
||||
stats['duplicates'] += 1
|
||||
total_processed_this_run += 1
|
||||
continue
|
||||
|
||||
# Create processing directory
|
||||
proc_dir = os.path.join(processing_root, file_hash)
|
||||
try:
|
||||
os.makedirs(proc_dir, exist_ok=True)
|
||||
except Exception as e:
|
||||
logger.error("Cannot create processing dir %s: %s", proc_dir, e)
|
||||
stats['errors'] += 1
|
||||
continue
|
||||
|
||||
# Split into page files
|
||||
pages = chunk_text(clean_text, WORDS_PER_PAGE)
|
||||
for i, page_text in enumerate(pages, start=1):
|
||||
page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
|
||||
with open(page_path, 'w', encoding='utf-8') as f:
|
||||
f.write(page_text)
|
||||
|
||||
# Write meta.json
|
||||
meta = {
|
||||
'hash': file_hash,
|
||||
'filename': article_title + '.html',
|
||||
'source_type': 'zim',
|
||||
'zim_file': zim_filename,
|
||||
'zim_source_id': zim_source_id,
|
||||
'article_title': article_title,
|
||||
'article_path': article_path,
|
||||
'page_count': len(pages),
|
||||
'text_length': len(clean_text),
|
||||
}
|
||||
with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
|
||||
json.dump(meta, f, indent=2)
|
||||
|
||||
# Register in catalogue
|
||||
db.add_to_catalogue(
|
||||
file_hash,
|
||||
article_title + '.html',
|
||||
zim_path, # source path is the ZIM file
|
||||
len(clean_text), # size in bytes (text)
|
||||
'kiwix', # source
|
||||
zim_title, # category = ZIM title
|
||||
)
|
||||
|
||||
# Queue document
|
||||
db.queue_document(file_hash)
|
||||
|
||||
# Set text_dir, page_count, book_title on documents row
|
||||
# Mark organized_at immediately (ZIM articles don't get filed to library)
|
||||
conn.execute(
|
||||
"UPDATE documents SET text_dir = ?, page_count = ?, "
|
||||
"book_title = ?, organized_at = CURRENT_TIMESTAMP "
|
||||
"WHERE hash = ?",
|
||||
(proc_dir, len(pages), article_title, file_hash)
|
||||
)
|
||||
|
||||
# Update status to extracted
|
||||
db.update_status(file_hash, 'extracted', pages_extracted=len(pages))
|
||||
|
||||
# Record in zim_articles
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO zim_articles
|
||||
(zim_source_id, article_path, article_title, status, processed_at)
|
||||
VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)""",
|
||||
(zim_source_id, article_path, article_title)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
stats['processed'] += 1
|
||||
total_processed_this_run += 1
|
||||
batch_count += 1
|
||||
|
||||
# Progress logging
|
||||
total_done = zim_source['processed_count'] + stats['processed']
|
||||
article_count = zim_source.get('article_count', 0)
|
||||
if stats['processed'] % 500 == 0 and article_count > 0:
|
||||
pct = total_done / article_count * 100
|
||||
logger.info(
|
||||
"ZIM ingest [%s]: %s/%s (%.1f%%)",
|
||||
zim_title, f"{total_done:,}", f"{article_count:,}", pct
|
||||
)
|
||||
|
||||
# Batch checkpoint — flush only the delta since last flush
|
||||
if batch_count >= batch_size:
|
||||
delta_p = stats['processed'] - flushed['processed']
|
||||
delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates'])
|
||||
delta_e = stats['errors'] - flushed['errors']
|
||||
conn.execute(
|
||||
"UPDATE zim_sources SET processed_count = processed_count + ?, "
|
||||
"skipped_count = skipped_count + ?, error_count = error_count + ?, "
|
||||
"last_checkpoint = ? WHERE id = ?",
|
||||
(delta_p, delta_s, delta_e, str(entry_idx + 1), zim_source_id)
|
||||
)
|
||||
conn.commit()
|
||||
flushed['processed'] = stats['processed']
|
||||
flushed['skipped'] = stats['skipped']
|
||||
flushed['duplicates'] = stats['duplicates']
|
||||
flushed['errors'] = stats['errors']
|
||||
|
||||
batch_count = 0
|
||||
|
||||
if batch_delay > 0:
|
||||
time.sleep(batch_delay)
|
||||
|
||||
# Final checkpoint — flush only the unflushed delta
|
||||
final_status = 'complete'
|
||||
if limit and total_processed_this_run >= limit:
|
||||
final_status = 'ingesting' # not done yet, just hit the limit
|
||||
|
||||
delta_p = stats['processed'] - flushed['processed']
|
||||
delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates'])
|
||||
delta_e = stats['errors'] - flushed['errors']
|
||||
|
||||
conn.execute(
|
||||
"UPDATE zim_sources SET processed_count = processed_count + ?, "
|
||||
"skipped_count = skipped_count + ?, error_count = error_count + ?, "
|
||||
"last_checkpoint = ?, status = ?, completed_at = CASE WHEN ? = 'complete' THEN CURRENT_TIMESTAMP ELSE completed_at END "
|
||||
"WHERE id = ?",
|
||||
(delta_p, delta_s, delta_e, str(last_entry_idx + 1),
|
||||
final_status, final_status, zim_source_id)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
logger.info(
|
||||
"ZIM ingest [%s] %s: %d processed, %d skipped, %d duplicates, %d errors",
|
||||
zim_title, final_status,
|
||||
stats['processed'], stats['skipped'], stats['duplicates'], stats['errors']
|
||||
)
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
def main():
|
||||
"""CLI entry point for standalone ZIM processing."""
|
||||
parser = argparse.ArgumentParser(description="RECON ZIM Processor")
|
||||
parser.add_argument('--zim-source-id', type=int, required=True,
|
||||
help="ID from zim_sources table")
|
||||
parser.add_argument('--batch-size', type=int, default=100,
|
||||
help="Articles per batch (default: 100)")
|
||||
parser.add_argument('--batch-delay', type=float, default=1.0,
|
||||
help="Seconds between batches (default: 1.0)")
|
||||
parser.add_argument('--limit', type=int, default=None,
|
||||
help="Max articles to process (default: all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
setup_logging('recon.processors.zim')
|
||||
|
||||
config = get_config()
|
||||
db = StatusDB(config['paths']['db'])
|
||||
|
||||
stats = ingest_zim(
|
||||
zim_source_id=args.zim_source_id,
|
||||
db=db,
|
||||
config=config,
|
||||
batch_size=args.batch_size,
|
||||
batch_delay=args.batch_delay,
|
||||
limit=args.limit,
|
||||
)
|
||||
|
||||
print(f"\nResults: {stats['processed']} processed, {stats['skipped']} skipped, "
|
||||
f"{stats['duplicates']} duplicates, {stats['errors']} errors")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue