From 7c1af0f06317b46571f61ebe2e95d196e6a0ea51 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 16 Apr 2026 23:39:34 +0000 Subject: [PATCH 1/8] =?UTF-8?q?Phase=201:=20Kiwix=20foundation=20=E2=80=94?= =?UTF-8?q?=20ZIM=20monitor=20and=20kiwix-serve=20setup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add lib/zim_monitor.py: polls kiwix-serve OPDS v2 catalog, detects new ZIMs, reads accurate article count from python-libzim Counter metadata (not inflated OPDS count), inserts into zim_sources table. Idempotent on re-run, marks removed ZIMs. - DB schema: zim_sources, zim_samples, zim_articles tables (created via sqlite3, not in migrations — matches existing RECON pattern) - kiwix-tools 3.7.0 installed from binary tarball at /opt/recon/bin/ (Ubuntu 24.04 apt ships 3.5.0 which lacks OPDS v2) - kiwix.service systemd unit on port 8430 - python-libzim 3.9.0 installed - Test ZIM: Appropedia EN maxi (496 MB, 19,445 articles) - Add bin/ to .gitignore (binary tarball, not source) --- .gitignore | 3 + lib/zim_monitor.py | 217 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 lib/zim_monitor.py diff --git a/.gitignore b/.gitignore index 238cabb..3fb01ef 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,6 @@ recon.db # OS .DS_Store + +# Kiwix binary tools (installed from tarball) +bin/ diff --git a/lib/zim_monitor.py b/lib/zim_monitor.py new file mode 100644 index 0000000..248fc0f --- /dev/null +++ b/lib/zim_monitor.py @@ -0,0 +1,217 @@ +""" +ZIM Monitor — detects ZIMs loaded in kiwix-serve and tracks them in recon.db. + +Polls the kiwix-serve OPDS v2 catalog, compares against the zim_sources table, +and for new ZIMs reads accurate metadata via python-libzim's Counter field. 
try:
    from libzim.reader import Archive
    HAVE_LIBZIM = True
except ImportError:
    HAVE_LIBZIM = False

OPDS_URL = "http://localhost:8430/catalog/v2/entries?count=-1"
ZIM_DIR = "/mnt/kiwix"
DB_PATH = "/opt/recon/data/recon.db"

ATOM_NS = "http://www.w3.org/2005/Atom"

logger = logging.getLogger("recon.zim_monitor")


def _text(element, tag, ns=ATOM_NS):
    """Return the stripped text of the first ``{ns}tag`` child, or None.

    None is returned when the child is missing or has no text at all.
    """
    child = element.find(f"{{{ns}}}{tag}")
    if child is None or not child.text:
        return None
    return child.text.strip()


def parse_counter(counter_str):
    """Parse ZIM Counter metadata into ``{mimetype: count}``.

    The input is a ``;``-separated list of ``mimetype=count`` pairs. Pairs
    without ``=`` or with a non-integer count are ignored. An empty or
    missing string yields an empty dict.
    """
    result = {}
    if not counter_str:
        return result
    for pair in counter_str.split(";"):
        if "=" not in pair:
            continue
        mime, count = pair.split("=", 1)
        try:
            result[mime.strip()] = int(count.strip())
        except ValueError:
            # Malformed count: skip this pair, keep the rest.
            continue
    return result


def parse_opds(data):
    """Parse an OPDS (Atom) catalog document into a list of entry dicts.

    Args:
        data: raw XML (bytes or str) as returned by kiwix-serve.

    Returns:
        One dict per ``<entry>`` with keys uuid, title, name, flavour,
        language, category, summary, article_count_opds and zim_filename.
        An unparseable document yields an empty list.
    """
    try:
        root = ET.fromstring(data)
    except ET.ParseError as e:
        logger.error("Failed to parse OPDS catalog: %s", e)
        return []

    entries = []
    for entry in root.findall(f"{{{ATOM_NS}}}entry"):
        uuid_raw = _text(entry, "id")
        uuid = uuid_raw.replace("urn:uuid:", "") if uuid_raw else None

        # Derive ZIM filename from the content link href
        zim_filename = None
        for link in entry.findall(f"{{{ATOM_NS}}}link"):
            if link.get("type") != "text/html":
                continue
            href = link.get("href", "")
            # href looks like /content/appropedia_en_all_maxi_2025-11
            name = href.rsplit("/", 1)[-1] if "/" in href else href
            if name:
                zim_filename = name + ".zim"
            break  # only the first text/html link is considered

        # The OPDS count is informational only; never let it break the scan.
        try:
            article_count_opds = int(_text(entry, "articleCount") or 0)
        except ValueError:
            article_count_opds = 0

        entries.append({
            "uuid": uuid,
            "title": _text(entry, "title"),
            "name": _text(entry, "name"),
            "flavour": _text(entry, "flavour"),
            "language": _text(entry, "language"),
            "category": _text(entry, "category") or None,
            "summary": _text(entry, "summary"),
            "article_count_opds": article_count_opds,
            "zim_filename": zim_filename,
        })
    return entries


def fetch_opds():
    """Fetch OPDS v2 catalog from kiwix-serve. Returns list of dicts.

    Any fetch or parse failure is logged and reported as an empty list.
    """
    try:
        with urllib.request.urlopen(OPDS_URL, timeout=10) as resp:
            data = resp.read()
    except Exception as e:
        logger.error("Failed to fetch OPDS catalog: %s", e)
        return []
    return parse_opds(data)


def get_libzim_metadata(zim_path):
    """Open a ZIM file and read accurate metadata via python-libzim.

    Returns:
        Dict with title, description, language, tags, article_count (the
        ``text/html`` entry of the Counter metadata, 0 when absent) and
        counter_raw. Returns an empty dict when python-libzim is not
        installed.
    """
    if not HAVE_LIBZIM:
        logger.warning("python-libzim not available, skipping metadata read")
        return {}

    zim = Archive(zim_path)

    def _get_meta(key):
        try:
            return zim.get_metadata(key).decode("utf-8", errors="replace")
        except (RuntimeError, KeyError):
            # NOTE(review): KeyError is caught in addition to RuntimeError in
            # case the installed libzim signals a missing key that way.
            return None

    meta = {
        "title": _get_meta("Title"),
        "description": _get_meta("Description"),
        "language": _get_meta("Language"),
        "tags": _get_meta("Tags"),
    }

    counter_str = _get_meta("Counter")
    if counter_str:
        meta["article_count"] = parse_counter(counter_str).get("text/html", 0)
        meta["counter_raw"] = counter_str
    else:
        meta["article_count"] = 0
        meta["counter_raw"] = None

    return meta


def scan_zims():
    """Compare OPDS catalog against zim_sources table. Insert/update as needed.

    New catalog entries whose file exists under ZIM_DIR are inserted with
    status 'detected'. Rows whose filename is no longer in the catalog are
    marked 'removed'. An empty catalog (or failed fetch) changes nothing, so
    a kiwix-serve outage never marks every source as removed.
    """
    logger.info("Scanning kiwix-serve OPDS catalog...")
    opds_entries = fetch_opds()
    if not opds_entries:
        logger.info("No entries in OPDS catalog (or fetch failed)")
        return

    logger.info("OPDS returned %d entries", len(opds_entries))

    new_count = 0
    removed_count = 0

    con = sqlite3.connect(DB_PATH)
    try:
        con.row_factory = sqlite3.Row

        # Get existing zim_sources keyed by filename
        existing = {
            row["zim_filename"]: dict(row)
            for row in con.execute("SELECT id, zim_filename, status FROM zim_sources")
        }

        opds_filenames = set()

        for entry in opds_entries:
            filename = entry["zim_filename"]
            if not filename:
                logger.warning("Skipping OPDS entry with no derivable filename: %s", entry)
                continue

            if filename in opds_filenames:
                # Same filename listed twice in one catalog: insert it once.
                logger.debug("Duplicate catalog entry ignored: %s", filename)
                continue
            opds_filenames.add(filename)

            if filename in existing:
                logger.debug("Already tracked: %s (status=%s)", filename, existing[filename]["status"])
                continue

            # New ZIM — read accurate metadata via python-libzim
            zim_path = os.path.join(ZIM_DIR, filename)
            if not os.path.isfile(zim_path):
                logger.warning("ZIM file not found on disk: %s", zim_path)
                continue

            logger.info("New ZIM detected: %s — reading metadata via libzim", filename)
            try:
                meta = get_libzim_metadata(zim_path)
            except Exception as e:
                # One unreadable file must not abort the scan. Nothing is
                # inserted, so the file is retried on the next scan.
                logger.error("Cannot read ZIM metadata for %s: %s", zim_path, e)
                continue

            title = meta.get("title") or entry["title"]
            article_count = meta.get("article_count", 0)

            con.execute(
                """INSERT INTO zim_sources
                   (zim_filename, zim_path, zim_uuid, title, description,
                    language, category, article_count, status)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'detected')""",
                (
                    filename,
                    zim_path,
                    entry["uuid"],
                    title,
                    meta.get("description") or entry["summary"],
                    meta.get("language") or entry["language"],
                    entry["category"],
                    article_count,
                ),
            )
            new_count += 1
            logger.info(
                "  Inserted: %s — title=%r, articles=%s (OPDS said %s)",
                filename,
                title,
                article_count,
                entry["article_count_opds"],
            )

        # Detect removed ZIMs (in DB but not in OPDS, and not already marked removed)
        for filename, row in existing.items():
            if filename not in opds_filenames and row["status"] != "removed":
                con.execute(
                    "UPDATE zim_sources SET status = 'removed' WHERE id = ?",
                    (row["id"],),
                )
                removed_count += 1
                logger.info("Marked removed: %s", filename)

        con.commit()
    finally:
        # Always release the handle; uncommitted work is rolled back on error.
        con.close()

    logger.info(
        "Scan complete: %d new, %d removed, %d total in catalog",
        new_count, removed_count, len(opds_entries),
    )


if __name__ == "__main__":
    setup_logging("recon.zim_monitor")
    scan_zims()
logger = logging.getLogger("recon.processors.zim")

WORDS_PER_PAGE = 2000
MIN_TEXT_LENGTH = 200

# Elements to strip before text extraction
STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'}


def _text_hash(text):
    """Compute MD5 hash of text content (matching content_hash style)."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()


def _html_to_text(html_bytes):
    """Convert HTML bytes to clean text via lxml.

    Strips the elements named in STRIP_TAGS, extracts the text content and
    normalizes whitespace. Returns "" when the input cannot be parsed or
    when the document root is itself one of the stripped elements.
    """
    try:
        doc = lxml_html.fromstring(html_bytes)
    except Exception:
        return ""

    # A root element has no parent to be dropped from; a document that is
    # nothing but e.g. a <script> has no article text.
    if doc.tag in STRIP_TAGS:
        return ""

    # Materialize the matches first: removing nodes while the iterator is
    # live can skip elements.
    for el in list(doc.iter(*STRIP_TAGS)):
        if el.getparent() is not None:
            el.drop_tree()

    text = doc.text_content()

    # Normalize whitespace: collapse runs of spaces, normalize newlines
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()


def _parse_checkpoint(value):
    """Turn a stored ``last_checkpoint`` value into a start index (>= 0).

    Missing, non-numeric or negative values restart from entry 0.
    """
    if not value:
        return 0
    try:
        idx = int(value)
    except (TypeError, ValueError):
        logger.warning("Invalid checkpoint value: %s, starting from 0", value)
        return 0
    if idx < 0:
        logger.warning("Invalid checkpoint value: %s, starting from 0", value)
        return 0
    return idx


def ingest_zim(zim_source_id, db, config, stop_event=None,
               batch_size=100, batch_delay=1.0, limit=None):
    """Process all articles from a ZIM file registered in zim_sources.

    - Reads zim_path from zim_sources table
    - Iterates articles, creates processing dirs, registers in DB
    - Checkpoints progress via zim_sources.last_checkpoint
    - Respects stop_event for graceful shutdown
    - Sleeps after each batch to avoid monopolizing resources

    The source is marked 'complete' only when every entry index has been
    visited. A run cut short by ``limit`` or ``stop_event`` leaves it as
    'ingesting', with the checkpoint on the first unvisited entry.

    Args:
        zim_source_id: ID in zim_sources table
        db: StatusDB instance
        config: RECON config dict
        stop_event: threading.Event for graceful shutdown (optional)
        batch_size: articles per batch before sleeping
        batch_delay: seconds to sleep between batches
        limit: max articles to process (None or 0 = all)

    Returns:
        dict with counts: processed, skipped, duplicates, errors
    """
    from libzim.reader import Archive

    conn = db._get_conn()

    # Load ZIM source record
    row = conn.execute(
        "SELECT * FROM zim_sources WHERE id = ?", (zim_source_id,)
    ).fetchone()
    if not row:
        logger.error("ZIM source ID %d not found", zim_source_id)
        return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}

    zim_source = dict(row)
    zim_path = zim_source['zim_path']
    zim_filename = zim_source['zim_filename']
    zim_title = zim_source.get('title') or zim_filename

    if not os.path.isfile(zim_path):
        logger.error("ZIM file not found: %s", zim_path)
        return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}

    logger.info("Opening ZIM: %s (%s)", zim_title, zim_filename)
    zim = Archive(zim_path)
    total_entries = zim.entry_count

    # Read checkpoint to resume from
    start_idx = _parse_checkpoint(zim_source.get('last_checkpoint'))
    if start_idx:
        logger.info("Resuming from checkpoint: entry %d", start_idx)

    # Update status to ingesting
    conn.execute(
        "UPDATE zim_sources SET status = 'ingesting', started_at = CURRENT_TIMESTAMP WHERE id = ?",
        (zim_source_id,)
    )
    conn.commit()

    processing_root = config.get('pipeline', {}).get(
        'processing_root', '/opt/recon/data/processing'
    )

    # Get already-processed article paths for this ZIM source (dedup within ZIM)
    existing_paths = {
        r['article_path']
        for r in conn.execute(
            "SELECT article_path FROM zim_articles WHERE zim_source_id = ?",
            (zim_source_id,)
        ).fetchall()
    }

    stats = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0}
    # Track what was already flushed to DB to avoid double-counting
    flushed = dict(stats)
    batch_count = 0
    total_processed_this_run = 0
    next_idx = start_idx  # first entry index not yet visited
    finished = False

    # NULL-safe baselines for progress logging
    previously_processed = zim_source.get('processed_count') or 0
    article_count = zim_source.get('article_count') or 0

    def _flush(checkpoint_idx, status=None):
        """Write the counter deltas since the last flush plus the checkpoint."""
        delta_p = stats['processed'] - flushed['processed']
        delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates'])
        delta_e = stats['errors'] - flushed['errors']
        sql = (
            "UPDATE zim_sources SET processed_count = COALESCE(processed_count, 0) + ?, "
            "skipped_count = COALESCE(skipped_count, 0) + ?, "
            "error_count = COALESCE(error_count, 0) + ?, "
            "last_checkpoint = ?"
        )
        params = [delta_p, delta_s, delta_e, str(checkpoint_idx)]
        if status is not None:
            sql += (
                ", status = ?, completed_at = CASE WHEN ? = 'complete' "
                "THEN CURRENT_TIMESTAMP ELSE completed_at END"
            )
            params += [status, status]
        sql += " WHERE id = ?"
        params.append(zim_source_id)
        conn.execute(sql, params)
        conn.commit()
        flushed.update(stats)

    for entry_idx in range(start_idx, total_entries):
        if stop_event and stop_event.is_set():
            logger.info("Stop event set, halting ZIM ingest at entry %d", entry_idx)
            break

        if limit and total_processed_this_run >= limit:
            logger.info("Reached limit of %d articles", limit)
            break

        # From here on this entry counts as visited, whatever happens to it.
        next_idx = entry_idx + 1

        try:
            entry = zim._get_entry_by_id(entry_idx)
        except Exception:
            continue

        # Skip redirects
        if entry.is_redirect:
            continue

        try:
            item = entry.get_item()
        except Exception:
            continue

        # Skip non-HTML
        if item.mimetype != "text/html":
            continue

        article_path = entry.path
        article_title = entry.title

        # Skip if already processed in a prior run
        if article_path in existing_paths:
            continue

        # Extract and clean text
        try:
            html_bytes = bytes(item.content)
            clean_text = _html_to_text(html_bytes)
        except Exception as e:
            logger.debug("HTML extraction failed for %s: %s", article_path, e)
            stats['errors'] += 1
            continue

        # Skip stubs
        if len(clean_text) < MIN_TEXT_LENGTH:
            stats['skipped'] += 1
            continue

        # Compute content hash
        file_hash = _text_hash(clean_text)

        # Deduplicate against existing catalogue
        cat_row = conn.execute(
            "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,)
        ).fetchone()
        if cat_row:
            # Record in zim_articles as skipped duplicate
            conn.execute(
                """INSERT OR IGNORE INTO zim_articles
                   (zim_source_id, article_path, article_title, status, processed_at)
                   VALUES (?, ?, ?, 'skipped', CURRENT_TIMESTAMP)""",
                (zim_source_id, article_path, article_title)
            )
            stats['duplicates'] += 1
            total_processed_this_run += 1
            continue

        # Split into page files and write them with meta.json
        proc_dir = os.path.join(processing_root, file_hash)
        pages = chunk_text(clean_text, WORDS_PER_PAGE)
        meta = {
            'hash': file_hash,
            'filename': article_title + '.html',
            'source_type': 'zim',
            'zim_file': zim_filename,
            'zim_source_id': zim_source_id,
            'article_title': article_title,
            'article_path': article_path,
            'page_count': len(pages),
            'text_length': len(clean_text),
        }
        try:
            os.makedirs(proc_dir, exist_ok=True)
            for i, page_text in enumerate(pages, start=1):
                page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i))
                with open(page_path, 'w', encoding='utf-8') as f:
                    f.write(page_text)
            with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f:
                json.dump(meta, f, indent=2)
        except OSError as e:
            # A disk problem on one article is counted and skipped instead
            # of aborting the whole ingest. Nothing has been registered in
            # the DB yet for this article.
            logger.error("Cannot write processing dir %s: %s", proc_dir, e)
            stats['errors'] += 1
            continue

        # Register in catalogue
        db.add_to_catalogue(
            file_hash,
            article_title + '.html',
            zim_path,            # source path is the ZIM file
            len(clean_text),     # size (length of the cleaned text)
            'kiwix',             # source
            zim_title,           # category = ZIM title
        )

        # Queue document
        db.queue_document(file_hash)

        # Set text_dir, page_count, book_title on documents row
        # Mark organized_at immediately (ZIM articles don't get filed to library)
        conn.execute(
            "UPDATE documents SET text_dir = ?, page_count = ?, "
            "book_title = ?, organized_at = CURRENT_TIMESTAMP "
            "WHERE hash = ?",
            (proc_dir, len(pages), article_title, file_hash)
        )

        # Update status to extracted
        db.update_status(file_hash, 'extracted', pages_extracted=len(pages))

        # Record in zim_articles
        conn.execute(
            """INSERT OR IGNORE INTO zim_articles
               (zim_source_id, article_path, article_title, status, processed_at)
               VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)""",
            (zim_source_id, article_path, article_title)
        )
        conn.commit()

        stats['processed'] += 1
        total_processed_this_run += 1
        batch_count += 1

        # Progress logging
        if stats['processed'] % 500 == 0 and article_count > 0:
            total_done = previously_processed + stats['processed']
            pct = total_done / article_count * 100
            logger.info(
                "ZIM ingest [%s]: %s/%s (%.1f%%)",
                zim_title, f"{total_done:,}", f"{article_count:,}", pct
            )

        # Batch checkpoint — flush only the delta since last flush
        if batch_count >= batch_size:
            _flush(entry_idx + 1)
            batch_count = 0

            if batch_delay > 0:
                time.sleep(batch_delay)
    else:
        # The loop ran to the end without hitting stop_event or limit.
        finished = True

    # Final checkpoint — flush only the unflushed delta
    final_status = 'complete' if finished else 'ingesting'
    _flush(next_idx, status=final_status)

    logger.info(
        "ZIM ingest [%s] %s: %d processed, %d skipped, %d duplicates, %d errors",
        zim_title, final_status,
        stats['processed'], stats['skipped'], stats['duplicates'], stats['errors']
    )

    return stats


def main():
    """CLI entry point for standalone ZIM processing."""
    parser = argparse.ArgumentParser(description="RECON ZIM Processor")
    parser.add_argument('--zim-source-id', type=int, required=True,
                        help="ID from zim_sources table")
    parser.add_argument('--batch-size', type=int, default=100,
                        help="Articles per batch (default: 100)")
    parser.add_argument('--batch-delay', type=float, default=1.0,
                        help="Seconds between batches (default: 1.0)")
    parser.add_argument('--limit', type=int, default=None,
                        help="Max articles to process (default: all)")
    args = parser.parse_args()

    setup_logging('recon.processors.zim')

    config = get_config()
    db = StatusDB(config['paths']['db'])

    stats = ingest_zim(
        zim_source_id=args.zim_source_id,
        db=db,
        config=config,
        batch_size=args.batch_size,
        batch_delay=args.batch_delay,
        limit=args.limit,
    )

    print(f"\nResults: {stats['processed']} processed, {stats['skipped']} skipped, "
          f"{stats['duplicates']} duplicates, {stats['errors']} errors")


if __name__ == "__main__":
    main()
# ── Kiwix Dashboard ──

@app.route('/kiwix')
def kiwix_dashboard():
    """Render the single-page Kiwix dashboard."""
    return render_template('kiwix/dashboard.html',
                           domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix')


@app.route('/api/kiwix/sources')
def api_kiwix_sources():
    """Serve pre-cached Kiwix sources data (never blocks)."""
    if _cache['kiwix_sources'] is None:
        return jsonify({'error': 'Warming up, try again in a few seconds'}), 503
    return jsonify(_cache['kiwix_sources'])


# NOTE(review): the URL variable was missing from the rule although the view
# takes ``source_id``; ``<int:source_id>`` is the assumed original — confirm.
@app.route('/api/kiwix/toggle-ingest/<int:source_id>', methods=['POST'])
def api_kiwix_toggle_ingest(source_id):
    """Toggle ingest_enabled on a ZIM source.

    The JSON body may carry ``{"enabled": bool}``; without it the current
    value is flipped. Switching on a source that is still 'detected' starts
    a background ingest.
    """
    db = StatusDB()
    conn = db._get_conn()
    row = conn.execute(
        "SELECT id, status, ingest_enabled FROM zim_sources WHERE id = ?", (source_id,)
    ).fetchone()
    if not row:
        return jsonify({'error': 'Source not found'}), 404

    data = request.get_json(silent=True) or {}
    new_val = 1 if data.get('enabled', not row['ingest_enabled']) else 0
    conn.execute("UPDATE zim_sources SET ingest_enabled = ? WHERE id = ?", (new_val, source_id))
    conn.commit()

    # If toggling ON and source is eligible, spawn ingest in background
    if new_val == 1 and row['status'] == 'detected':
        _spawn_zim_ingest(source_id)

    return jsonify({'ok': True, 'ingest_enabled': new_val})


@app.route('/api/kiwix/trigger-ingest/<int:source_id>', methods=['POST'])
def api_kiwix_trigger_ingest(source_id):
    """Explicit one-shot ingest trigger."""
    db = StatusDB()
    conn = db._get_conn()
    row = conn.execute("SELECT id FROM zim_sources WHERE id = ?", (source_id,)).fetchone()
    if not row:
        return jsonify({'error': 'Source not found'}), 404

    _spawn_zim_ingest(source_id)
    return jsonify({'ok': True})


@app.route('/api/kiwix/upload', methods=['POST'])
def api_kiwix_upload():
    """Accept ZIM file upload, register with kiwix-serve, scan.

    The upload is written to a ``.tmp`` sibling and renamed into place, so a
    half-written file is never visible under its final name. Registration,
    rescan and cache refresh are best-effort: their failures are logged and
    the request still succeeds.
    """
    import subprocess
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    f = request.files['file']
    if not f.filename or not f.filename.endswith('.zim'):
        return jsonify({'error': 'File must be a .zim file'}), 400

    filename = secure_filename(f.filename)
    # Sanitizing can change the name (e.g. ".zim" loses its leading dot), so
    # validate the name that will actually be written.
    if not filename or not filename.endswith('.zim'):
        return jsonify({'error': 'File must be a .zim file'}), 400

    dest = os.path.join('/mnt/kiwix', filename)
    tmp_dest = dest + '.tmp'

    try:
        f.save(tmp_dest)
        os.replace(tmp_dest, dest)
    except Exception as e:
        try:
            os.remove(tmp_dest)
        except OSError:
            pass  # nothing was written, or it is already gone
        return jsonify({'error': f'Save failed: {e}'}), 500

    # Register with kiwix-serve library
    try:
        proc = subprocess.run(
            ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'add', dest],
            capture_output=True, text=True, timeout=30
        )
        if proc.returncode != 0:
            logger.warning(
                f"kiwix-manage add exited {proc.returncode}: {(proc.stderr or '').strip()}"
            )
    except Exception as e:
        logger.warning(f"kiwix-manage add failed: {e}")

    # Scan for new entry
    try:
        from .zim_monitor import scan_zims
        scan_zims()
    except Exception as e:
        logger.warning(f"scan_zims after upload failed: {e}")

    # Refresh cache
    try:
        _cache['kiwix_sources'] = _build_kiwix_sources()
    except Exception as e:
        logger.warning(f"Kiwix cache refresh after upload failed: {e}")

    return jsonify({'ok': True, 'filename': filename})


@app.route('/api/kiwix/remove/<int:source_id>', methods=['POST'])
def api_kiwix_remove(source_id):
    """Remove a ZIM source: delete vectors, DB records, library entry, and file.

    Returns ``{'ok': True, 'results': {...}}``. ``vectors_deleted`` is the
    number of document hashes whose delete request Qdrant accepted, not a
    count of individual vectors.
    """
    import shutil
    import subprocess
    import requests as req

    db = StatusDB()
    conn = db._get_conn()
    row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone()
    if not row:
        return jsonify({'error': 'Source not found'}), 404

    zim_source = dict(row)
    zim_filename = zim_source['zim_filename']
    zim_path = zim_source['zim_path']
    # Ingest files documents under "title or filename". A NULL title must
    # fall back the same way or no document would match.
    zim_title = zim_source.get('title') or zim_filename
    results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False}

    # Step 1: Find all document hashes for this ZIM source
    # NOTE(review): matching on category alone also hits documents of another
    # ZIM source with the same title — confirm whether a narrower key exists.
    doc_hashes = [r['hash'] for r in conn.execute(
        "SELECT c.hash FROM catalogue c WHERE c.source = 'kiwix' AND c.category = ?",
        (zim_title,)
    ).fetchall()]

    # Step 2: Delete vectors from Qdrant
    if doc_hashes:
        config = get_config()
        vector_cfg = config.get('vector_db', {})
        qdrant_host = vector_cfg.get('host', '100.64.0.14')
        qdrant_port = vector_cfg.get('port', 6333)
        collection = vector_cfg.get('collection', 'recon_knowledge')
        delete_url = f"http://{qdrant_host}:{qdrant_port}/collections/{collection}/points/delete"

        # Delete in batches of 100 hashes
        for i in range(0, len(doc_hashes), 100):
            batch = doc_hashes[i:i+100]
            try:
                resp = req.post(
                    delete_url,
                    json={
                        "filter": {
                            "must": [{
                                "key": "doc_hash",
                                "match": {"any": batch}
                            }]
                        }
                    },
                    timeout=30
                )
                if resp.status_code == 200:
                    results['vectors_deleted'] += len(batch)
                else:
                    logger.warning(f"Qdrant delete batch returned HTTP {resp.status_code}")
            except Exception as e:
                logger.warning(f"Qdrant delete batch failed: {e}")

    # Step 3: Delete DB records
    for h in doc_hashes:
        # Delete processing directory if it exists
        text_dir_row = conn.execute("SELECT text_dir FROM documents WHERE hash = ?", (h,)).fetchone()
        if text_dir_row and text_dir_row['text_dir']:
            shutil.rmtree(text_dir_row['text_dir'], ignore_errors=True)
        conn.execute("DELETE FROM documents WHERE hash = ?", (h,))
        conn.execute("DELETE FROM catalogue WHERE hash = ?", (h,))
    results['docs_deleted'] = len(doc_hashes)

    # Delete zim_articles records
    conn.execute("DELETE FROM zim_articles WHERE zim_source_id = ?", (source_id,))

    # Delete zim_sources record
    conn.execute("DELETE FROM zim_sources WHERE id = ?", (source_id,))
    conn.commit()

    # Step 4: Remove from kiwix-serve library
    # Prefer the stored book id (zim_uuid, recorded from the OPDS entry id).
    # The filename stem is only a fallback for rows without one.
    book_id = zim_source.get('zim_uuid') or os.path.splitext(zim_filename)[0]
    try:
        proc = subprocess.run(
            ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', book_id],
            capture_output=True, text=True, timeout=10
        )
        if proc.returncode != 0:
            logger.warning(
                f"kiwix-manage remove exited {proc.returncode}: {(proc.stderr or '').strip()}"
            )
    except Exception as e:
        logger.warning(f"kiwix-manage remove failed: {e}")

    # Step 5: Delete the ZIM file
    if os.path.isfile(zim_path):
        try:
            os.remove(zim_path)
            results['file_deleted'] = True
        except Exception as e:
            logger.warning(f"ZIM file delete failed: {e}")
            results['file_deleted'] = False

    # Refresh cache
    try:
        _cache['kiwix_sources'] = _build_kiwix_sources()
    except Exception as e:
        logger.warning(f"Kiwix cache refresh after remove failed: {e}")

    logger.info(f"Removed ZIM source '{zim_title}': {results}")
    return jsonify({'ok': True, 'results': results})


def _spawn_zim_ingest(source_id):
    """Start ZIM ingestion in a background thread.

    A second request for a source that is already being ingested is
    ignored, because two concurrent runs would both add to the same
    counters. The check is best-effort (thread-name based), not a lock.
    """
    thread_name = f'zim-ingest-{source_id}'
    if any(t.name == thread_name and t.is_alive() for t in threading.enumerate()):
        logger.info(f"ZIM ingest for source {source_id} already running, not starting another")
        return

    def _run():
        try:
            from .processors.zim_processor import ingest_zim
            config = get_config()
            db = StatusDB()
            logger.info(f"Starting ZIM ingest for source {source_id}")
            result = ingest_zim(source_id, db, config)
            logger.info(f"ZIM ingest complete for source {source_id}: {result}")
            # Refresh cache after completion
            try:
                _cache['kiwix_sources'] = _build_kiwix_sources()
            except Exception as e:
                logger.warning(f"Kiwix cache refresh after ingest failed: {e}")
        except Exception as e:
            logger.error(f"ZIM ingest failed for source {source_id}: {e}")

    t = threading.Thread(target=_run, daemon=True, name=thread_name)
    t.start()


def _build_kiwix_sources():
    """Build Kiwix sources data for the dashboard cache.

    Returns a dict with ``sources`` (one dict per zim_sources row, each with
    its own ``pipeline`` status histogram), ``kiwix_serve`` health and
    ``totals``.
    """
    import urllib.request

    db = StatusDB()
    conn = db._get_conn()

    # Get all ZIM sources
    rows = conn.execute("""
        SELECT id, zim_filename, title, description, language, category,
               article_count, status, processed_count, skipped_count, error_count,
               ingest_enabled, detected_at, started_at, completed_at
        FROM zim_sources
        ORDER BY detected_at DESC
    """).fetchall()

    # One grouped query for all sources, keyed by catalogue category, which
    # ingest sets to the ZIM title (or filename when there is no title).
    pipeline_by_category = {}
    try:
        pipe_rows = conn.execute("""
            SELECT c.category AS category, d.status AS status, COUNT(*) AS cnt
            FROM documents d
            JOIN catalogue c ON d.hash = c.hash
            WHERE c.source = 'kiwix'
            GROUP BY c.category, d.status
        """).fetchall()
        for pr in pipe_rows:
            pipeline_by_category.setdefault(pr['category'], {})[pr['status']] = pr['cnt']
    except Exception as e:
        logger.warning(f"Kiwix pipeline stats query failed: {e}")

    sources = []
    total_articles = 0
    total_processed = 0
    total_in_pipeline = 0

    for r in rows:
        source = dict(r)
        total_articles += r['article_count'] or 0
        total_processed += r['processed_count'] or 0

        # Pipeline stats for this source's documents only
        pipeline = dict(pipeline_by_category.get(r['title'] or r['zim_filename'], {}))

        in_pipe = sum(v for k, v in pipeline.items() if k not in ('complete', 'failed'))
        total_in_pipeline += in_pipe
        source['pipeline'] = pipeline
        sources.append(source)

    # Check kiwix-serve health
    kiwix_status = 'inactive'
    try:
        with urllib.request.urlopen("http://localhost:8430", timeout=3) as resp:
            if resp.status == 200:
                kiwix_status = 'active'
    except Exception:
        pass  # unreachable or erroring server is reported as 'inactive'

    return {
        'sources': sources,
        'kiwix_serve': {'status': kiwix_status, 'url': 'https://wiki.echo6.co'},
        'totals': {
            'sources': len(sources),
            'articles': total_articles,
            'processed': total_processed,
            'in_pipeline': total_in_pipeline,
        }
    }
font-size: 11px; margin-left: 6px; } @@ -315,3 +316,16 @@ tr:hover { background: var(--bg-secondary); } .errors-panel.has-errors { display: block; } .errors-panel summary { color: var(--red); cursor: pointer; font-size: 13px; margin-bottom: 8px; } .errors-panel .error-line { color: var(--text-muted); font-size: 11px; padding: 2px 0; border-bottom: 1px solid var(--border); } + +/* ── Toggle switch ── */ +.toggle-switch { position: relative; display: inline-block; width: 40px; height: 20px; } +.toggle-switch input { opacity: 0; width: 0; height: 0; } +.toggle-slider { position: absolute; cursor: pointer; inset: 0; background: #333; border-radius: 20px; transition: 0.3s; } +.toggle-slider:before { content: ''; position: absolute; height: 16px; width: 16px; left: 2px; bottom: 2px; background: #888; border-radius: 50%; transition: 0.3s; } +.toggle-switch input:checked + .toggle-slider { background: #1a4a2e; } +.toggle-switch input:checked + .toggle-slider:before { transform: translateX(20px); background: #00ff41; } + +/* ── Kiwix status badges ── */ +.badge-complete { background: #1a4a2e; color: #00ff41; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-ingesting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 254d92a..0bd0b39 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -88,7 +88,7 @@ var pipeCount = s.in_pipeline || 0; totalCat += catCount; totalComp += compCount; totalPipe += pipeCount; totalConcepts += s.concepts; totalVectors += s.vectors; - var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : 'PDF'; + var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : s.type === 'wiki' ? 'WIKI' : 'PDF'; var compPct = catCount > 0 ? 
(compCount / catCount * 100) : 0; var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0; var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666'; @@ -185,7 +185,7 @@ rtb.innerHTML = 'None yet'; } else { rtb.innerHTML = data.recent_complete.map(function(r) { - var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : 'PDF'; + var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : r.type === 'wiki' ? 'WIKI' : 'PDF'; return '' + r.title + '' + badge + '' + r.concepts + '' + r.vectors + ''; }).join(''); diff --git a/static/js/kiwix.js b/static/js/kiwix.js new file mode 100644 index 0000000..aab8552 --- /dev/null +++ b/static/js/kiwix.js @@ -0,0 +1,136 @@ +/* RECON Kiwix Dashboard JS */ +(function() { + 'use strict'; + + function loadKiwixDashboard() { + return RECON.fetchJSON('/api/kiwix/sources').then(function(data) { + // Update stat cards + var t = data.totals || {}; + RECON.set('kx-sources', RECON.fmt(t.sources)); + RECON.set('kx-articles', RECON.fmt(t.articles)); + RECON.set('kx-processed', RECON.fmt(t.processed)); + RECON.set('kx-pipeline', RECON.fmt(t.in_pipeline)); + + // Kiwix-serve status dot + var ks = data.kiwix_serve || {}; + var dot = document.getElementById('svc-kiwix-serve'); + dot.className = 'svc-dot ' + (ks.status === 'active' ? 'active' : 'inactive'); + + // ZIM table + var sources = data.sources || []; + var html = ''; + sources.forEach(function(s) { + var pctDone = s.article_count > 0 ? (s.processed_count / s.article_count * 100).toFixed(1) : 0; + var statusBadge = s.status === 'complete' ? 'COMPLETE' : + s.status === 'ingesting' ? 'INGESTING' : + 'DETECTED'; + // Derive browse URL from zim_filename + var zimName = s.zim_filename.replace(/_(?:maxi|mini|nopic)_[\d-]+\.zim$/, ''); + var browseUrl = 'https://wiki.echo6.co/' + zimName + '/'; + // Toggle switch + var checked = s.ingest_enabled ? 
' checked' : ''; + var toggle = ''; + + html += '' + + '' + (s.title || s.zim_filename) + '' + + '
' + s.zim_filename + '
' + + '' + (s.language || '\u2014') + '' + + '' + RECON.fmt(s.article_count) + '' + + '' + RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + + ' (' + pctDone + '%)' + + '' + statusBadge + '' + + '' + toggle + '' + + 'Browse' + + '' + + ''; + }); + if (!html) html = 'No ZIM sources detected'; + RECON.setHTML('kx-table-body', html); + }).catch(function(err) { + console.error('Kiwix dashboard error:', err); + }); + } + + function toggleIngest(id, enabled) { + RECON.postJSON('/api/kiwix/toggle-ingest/' + id, {enabled: enabled}).then(function(data) { + if (data.ok) loadKiwixDashboard(); + }); + } + + function removeSource(id, title) { + if (!confirm('Remove "' + title + '"?\n\nThis will delete the ZIM file, all ingested documents, and associated vectors from Qdrant. This cannot be undone.')) return; + RECON.postJSON('/api/kiwix/remove/' + id).then(function(data) { + if (data.ok) { + var r = data.results || {}; + alert('Removed: ' + r.docs_deleted + ' docs, ~' + r.vectors_deleted + ' vector batches deleted, file ' + (r.file_deleted ? 
'deleted' : 'not found')); + loadKiwixDashboard(); + } + }); + } + + function triggerIngest(id) { + RECON.postJSON('/api/kiwix/trigger-ingest/' + id).then(function(data) { + if (data.ok) loadKiwixDashboard(); + }); + } + + function uploadZim() { + var input = document.getElementById('kx-file-input'); + var file = input.files[0]; + if (!file) return; + + var statusEl = document.getElementById('kx-upload-status'); + var progressDiv = document.getElementById('kx-upload-progress'); + var progressBar = document.getElementById('kx-progress-bar'); + var progressText = document.getElementById('kx-progress-text'); + + statusEl.textContent = 'Uploading ' + file.name + '...'; + progressDiv.style.display = 'block'; + + var formData = new FormData(); + formData.append('file', file); + + var xhr = new XMLHttpRequest(); + xhr.open('POST', '/api/kiwix/upload', true); + + xhr.upload.onprogress = function(e) { + if (e.lengthComputable) { + var pct = (e.loaded / e.total * 100).toFixed(1); + progressBar.style.width = pct + '%'; + progressText.textContent = RECON.fmtBytes(e.loaded) + ' / ' + RECON.fmtBytes(e.total) + ' (' + pct + '%)'; + } + }; + + xhr.onload = function() { + if (xhr.status === 200) { + var resp = JSON.parse(xhr.responseText); + statusEl.textContent = resp.ok ? 'Upload complete: ' + resp.filename : 'Error: ' + (resp.error || 'Unknown'); + progressBar.style.width = '100%'; + progressBar.style.background = resp.ok ? 
'#16a34a' : '#dc2626'; + if (resp.ok) loadKiwixDashboard(); + } else { + statusEl.textContent = 'Upload failed (HTTP ' + xhr.status + ')'; + progressBar.style.background = '#dc2626'; + } + input.value = ''; + }; + + xhr.onerror = function() { + statusEl.textContent = 'Upload failed (network error)'; + progressBar.style.background = '#dc2626'; + input.value = ''; + }; + + xhr.send(formData); + } + + // Expose for inline onclick + window.KIWIX = { toggleIngest: toggleIngest, triggerIngest: triggerIngest, remove: removeSource }; + + document.addEventListener('DOMContentLoaded', function() { + RECON.startRefresh(loadKiwixDashboard, 30000); + document.getElementById('kx-file-input').addEventListener('change', uploadZim); + }); +})(); diff --git a/templates/base.html b/templates/base.html index 09db6d8..49b1a21 100644 --- a/templates/base.html +++ b/templates/base.html @@ -19,6 +19,7 @@ diff --git a/templates/kiwix/dashboard.html b/templates/kiwix/dashboard.html new file mode 100644 index 0000000..72bbed4 --- /dev/null +++ b/templates/kiwix/dashboard.html @@ -0,0 +1,48 @@ +{% extends "base.html" %} +{% block content %} +
+ +
+
ZIM Sources
+
Total Articles
+
Processed
+
In Pipeline
+
+ + +
+
Kiwix-Serve
+ +
+ + +
+

ZIM Library

+ + + + + + + +
TitleLanguageArticlesProgressStatusIngestBrowse
Loading...
+
+ + +
+

Upload ZIM File

+
+ + + +
+ +
+
+{% endblock %} +{% block scripts %} + +{% endblock %} From 501004ecf158f7815036cfd8fb4908625ba59137 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 17 Apr 2026 07:30:30 +0000 Subject: [PATCH 4/8] Filter non-English articles from ZIM ingestion Skip articles with MediaWiki translation suffixes (/es, /fr, /pl, etc.) before text extraction to avoid wasting Gemini enrichment on translations. Uses path-based regex matching against ISO 639 language codes. ~5,276 non-English articles already ingested from Appropedia (top: es=837, zh=765, ru=475, fr=433, ko=407). Purge decision deferred. --- lib/processors/zim_processor.py | 40 +++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lib/processors/zim_processor.py b/lib/processors/zim_processor.py index ba29952..b258408 100644 --- a/lib/processors/zim_processor.py +++ b/lib/processors/zim_processor.py @@ -38,6 +38,39 @@ MIN_TEXT_LENGTH = 200 # Elements to strip before text extraction STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'} +# Non-English article path suffix pattern (MediaWiki ZIMs use /XX or /XXX suffixes) +# Matches paths ending in /xx where xx is a 2-3 letter lowercase language code +_LANG_SUFFIX_RE = re.compile(r'/[a-z]{2,3}$') +# Common ISO 639-1/2 language codes to filter (excludes 'en') +_NON_EN_LANGS = { + 'aa','ab','af','ak','am','an','ar','as','av','ay','az', + 'ba','be','bg','bh','bi','bm','bn','bo','br','bs', + 'ca','ce','ch','co','cr','cs','cu','cv','cy', + 'da','de','dv','dz', + 'ee','el','eo','es','et','eu', + 'fa','ff','fi','fj','fo','fr','fy', + 'ga','gd','gl','gn','gu','gv', + 'ha','he','hi','ho','hr','ht','hu','hy','hz', + 'ia','id','ie','ig','ii','ik','io','is','it','iu', + 'ja','jv', + 'ka','kg','ki','kj','kk','kl','km','kn','ko','kr','ks','ku','kv','kw','ky', + 'la','lb','lg','li','ln','lo','lt','lu','lv', + 'mg','mh','mi','mk','ml','mn','mo','mr','ms','mt','my', + 'na','nb','nd','ne','ng','nl','nn','no','nr','nv','ny', + 'oc','oj','om','or','os', + 
'pa','pi','pl','ps','pt', + 'qu', + 'rm','rn','ro','ru','rw', + 'sa','sc','sd','se','sg','sh','si','sk','sl','sm','sn','so','sq','sr','ss','st','su','sv','sw', + 'ta','te','tg','th','ti','tk','tl','tn','to','tr','ts','tt','tw','ty', + 'ug','uk','ur','uz', + 've','vi','vo', + 'wa','wo', + 'xh', + 'yi','yo', + 'za','zh','zu', +} + def _text_hash(text): """Compute MD5 hash of text content (matching content_hash style).""" @@ -190,6 +223,13 @@ def ingest_zim(zim_source_id, db, config, stop_event=None, if article_path in existing_paths: continue + # Skip non-English articles (MediaWiki translation suffix pattern) + lang_match = _LANG_SUFFIX_RE.search(article_path) + if lang_match and lang_match.group(0)[1:] in _NON_EN_LANGS: + stats['skipped'] += 1 + total_processed_this_run += 1 + continue + # Extract and clean text try: html_bytes = bytes(item.content) From 6f2a1d206ee5254442af793ded40af9fb918b414 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 17 Apr 2026 14:37:13 +0000 Subject: [PATCH 5/8] Add langdetect language filter to enricher + purge non-English ZIM articles - Install langdetect package for content-level language detection - Add _check_language() to enricher.py: reads first 1500 chars of first page, detects language via langdetect, skips if not in allowed list - Configurable via config.yaml pipeline.language_filter and pipeline.allowed_languages (default: en only) - Catches non-English content from ANY source (PDF, web, ZIM, PeerTube) before burning Gemini API quota on enrichment - Add scan_zims retry logic (3 attempts, 2s delay) for upload handler - Purged 6,483 stale non-English zim_articles rows from DB Co-Authored-By: Claude Opus 4.6 --- config.yaml | 4 ++++ lib/api.py | 16 +++++++++------ lib/enricher.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 6 deletions(-) diff --git a/config.yaml b/config.yaml index 3e185f8..4b147fd 100644 --- a/config.yaml +++ b/config.yaml @@ -440,3 +440,7 @@ pipeline: text: 
text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 + # Language filter: skip non-English content before Gemini enrichment + language_filter: true # Enable langdetect-based filtering + allowed_languages: # ISO 639-1 codes allowed through enrichment + - en diff --git a/lib/api.py b/lib/api.py index 980578b..653c80f 100644 --- a/lib/api.py +++ b/lib/api.py @@ -2030,12 +2030,16 @@ def api_kiwix_upload(): except Exception as e: logger.warning(f"kiwix-manage add failed: {e}") - # Scan for new entry - try: - from .zim_monitor import scan_zims - scan_zims() - except Exception as e: - logger.warning(f"scan_zims after upload failed: {e}") + # Scan for new entry (retry — monitorLibrary may need a moment to reload) + import time as _time + from .zim_monitor import scan_zims + for attempt in range(3): + try: + scan_zims() + break + except Exception as e: + logger.warning(f"scan_zims attempt {attempt+1} failed: {e}") + _time.sleep(2) # Refresh cache try: diff --git a/lib/enricher.py b/lib/enricher.py index d9540aa..e1e583c 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -27,6 +27,15 @@ from .utils import get_config, setup_logging from .status import StatusDB from .utils import resolve_text_dir +try: + from langdetect import detect as _detect_lang + from langdetect.lang_detect_exception import LangDetectException + _HAS_LANGDETECT = True +except ImportError: + _HAS_LANGDETECT = False + +ALLOWED_LANGUAGES = {'en'} # Default: English only + logger = setup_logging('recon.enricher') # Docs stuck in "enriching" longer than this get reset to "extracted" for retry @@ -341,6 +350,42 @@ def validate_and_fix_concepts(concepts, key, config): return concepts +def _check_language(text_dir, config): + """Check language of document text. Returns (is_allowed, detected_lang). + + Reads first 1500 chars from first page file and uses langdetect. + Returns (True, lang) if language is allowed, (False, lang) if not.
+ Falls back to (True, reason) when detection is skipped or fails (benefit of the doubt); reason is one of 'unknown' (langdetect not installed), 'disabled', 'no_pages', 'too_short', 'detection_failed' or 'error'. + """ + if not _HAS_LANGDETECT: + return True, 'unknown' + + # Check if language filter is enabled in config + pipeline_cfg = config.get('pipeline', {}) + if not pipeline_cfg.get('language_filter', True): + return True, 'disabled' + + allowed = set(pipeline_cfg.get('allowed_languages', ['en'])) + + # Read first page for detection + page_files = sorted([f for f in os.listdir(text_dir) + if f.startswith('page_') and f.endswith('.txt')]) + if not page_files: + return True, 'no_pages' + + try: + with open(os.path.join(text_dir, page_files[0]), encoding='utf-8') as f: + sample = f.read(1500) + if len(sample.strip()) < 50: + return True, 'too_short' + lang = _detect_lang(sample) + return (lang in allowed), lang + except LangDetectException: + return True, 'detection_failed' + except Exception: + return True, 'error' + + def enrich_single(file_hash, db, config, key_rotator): doc = db.get_document(file_hash) if not doc: @@ -359,6 +404,14 @@ def enrich_single(file_hash, db, config, key_rotator): db.mark_failed(file_hash, f"Text directory not found: {text_dir}") return False + # Language gate: skip non-English documents before burning Gemini quota + lang_ok, detected_lang = _check_language(text_dir, config) + if not lang_ok: + logger.info(f"Skipping {file_hash[:12]}... detected language '{detected_lang}' " + f"(allowed: {config.get('pipeline', {}).get('allowed_languages', ['en'])})") + db.mark_failed(file_hash, f"Language filter: detected '{detected_lang}', not in allowed list") + return False + db.update_status(file_hash, 'enriching') try: From fed02186faef263210d8e6cf25d9571a257076d0 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 17 Apr 2026 15:22:44 +0000 Subject: [PATCH 6/8] Fix Kiwix status badges to reflect full pipeline state Status was showing COMPLETE after ZIM extraction finished, even when documents were still queued for enrichment/embedding.
Now computes effective_status by checking actual pipeline state per-source: - DETECTED: ingest not enabled (gray) - EXTRACTING: ZIM processor running (blue) - PROCESSING: extracted but docs still in enricher/embedder queue (amber) - COMPLETE: all docs fully enriched and embedded in Qdrant (green) Also fixed _build_kiwix_sources pipeline query to filter by category per-source instead of returning global kiwix stats for every source. Progress column now shows "X / Y in Qdrant" when processing, or "X / Y extracted" otherwise. Co-Authored-By: Claude Opus 4.6 --- lib/api.py | 20 +++++++++++++++++--- static/css/recon.css | 2 ++ static/js/kiwix.js | 17 ++++++++++++----- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/lib/api.py b/lib/api.py index 653c80f..a739ec0 100644 --- a/lib/api.py +++ b/lib/api.py @@ -2198,19 +2198,20 @@ def _build_kiwix_sources(): for r in rows: source = dict(r) + zim_title = r['title'] or r['zim_filename'] total_articles += r['article_count'] or 0 total_processed += r['processed_count'] or 0 - # Get pipeline stats for this source's documents + # Get pipeline stats for THIS source's documents (filtered by category) pipeline = {} try: pipe_rows = conn.execute(""" SELECT d.status, COUNT(*) as cnt FROM documents d JOIN catalogue c ON d.hash = c.hash - WHERE c.source = 'kiwix' + WHERE c.source = 'kiwix' AND c.category = ? 
GROUP BY d.status - """).fetchall() + """, (zim_title,)).fetchall() for pr in pipe_rows: pipeline[pr['status']] = pr['cnt'] except Exception: @@ -2219,6 +2220,19 @@ def _build_kiwix_sources(): in_pipe = sum(v for k, v in pipeline.items() if k not in ('complete', 'failed')) total_in_pipeline += in_pipe source['pipeline'] = pipeline + + # Compute effective status reflecting full pipeline state + db_status = r['status'] + if db_status == 'complete' and pipeline: + if in_pipe > 0: + source['effective_status'] = 'processing' + else: + source['effective_status'] = 'complete' + elif db_status == 'ingesting': + source['effective_status'] = 'extracting' + else: + source['effective_status'] = db_status # 'detected' + sources.append(source) # Check kiwix-serve health diff --git a/static/css/recon.css b/static/css/recon.css index 9289f93..31d6306 100644 --- a/static/css/recon.css +++ b/static/css/recon.css @@ -329,3 +329,5 @@ tr:hover { background: var(--bg-secondary); } .badge-complete { background: #1a4a2e; color: #00ff41; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-ingesting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } diff --git a/static/js/kiwix.js b/static/js/kiwix.js index aab8552..5b21f66 100644 --- a/static/js/kiwix.js +++ b/static/js/kiwix.js @@ -20,9 +20,15 @@ var sources = data.sources || []; var html = ''; sources.forEach(function(s) { - var pctDone = s.article_count > 0 ? (s.processed_count / s.article_count * 100).toFixed(1) : 0; - var statusBadge = s.status === 'complete' ? 
'COMPLETE' : - s.status === 'ingesting' ? 'INGESTING' : + var es = s.effective_status || s.status; + var pipe = s.pipeline || {}; + var pipeComplete = pipe.complete || 0; + var pipeTotal = 0; + for (var k in pipe) pipeTotal += pipe[k]; + var pctDone = pipeTotal > 0 ? (pipeComplete / pipeTotal * 100).toFixed(1) : 0; + var statusBadge = es === 'complete' ? 'COMPLETE' : + es === 'processing' ? 'PROCESSING' : + es === 'extracting' ? 'EXTRACTING' : 'DETECTED'; // Derive browse URL from zim_filename var zimName = s.zim_filename.replace(/_(?:maxi|mini|nopic)_[\d-]+\.zim$/, ''); @@ -38,8 +44,9 @@ '
' + s.zim_filename + '
' + '' + (s.language || '\u2014') + '' + '' + RECON.fmt(s.article_count) + '' + - '' + RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + - ' (' + pctDone + '%)' + + '' + (es === 'processing' ? + RECON.fmt(pipeComplete) + ' / ' + RECON.fmt(pipeTotal) + ' in Qdrant (' + pctDone + '%)' : + RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + ' extracted') + '' + '' + statusBadge + '' + '' + toggle + '' + 'Browse' + From a40ce47127b9801c13b6ed418e01bbafc3ac3445 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 17 Apr 2026 15:31:01 +0000 Subject: [PATCH 7/8] Fix progress column to show Qdrant count for completed sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Complete sources now show "19,344 in Qdrant" instead of misleading extraction counts. Each status gets contextual progress display: complete → X in Qdrant, processing → X/Y in Qdrant (%), extracting → X/Y extracted, detected → dash. Co-Authored-By: Claude Opus 4.6 --- static/js/kiwix.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/static/js/kiwix.js b/static/js/kiwix.js index 5b21f66..c85ee93 100644 --- a/static/js/kiwix.js +++ b/static/js/kiwix.js @@ -44,9 +44,13 @@ '
' + s.zim_filename + '
' + '' + (s.language || '\u2014') + '' + '' + RECON.fmt(s.article_count) + '' + - '' + (es === 'processing' ? + '' + (es === 'complete' && pipeComplete > 0 ? + RECON.fmt(pipeComplete) + ' in Qdrant' : + es === 'processing' ? RECON.fmt(pipeComplete) + ' / ' + RECON.fmt(pipeTotal) + ' in Qdrant (' + pctDone + '%)' : - RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + ' extracted') + '' + + es === 'extracting' ? + RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + ' extracted' : + '\u2014') + '' + '' + statusBadge + '' + '' + toggle + '' + 'Browse' + From b250d0c25777f2467f792762dbc32485e0c1cb97 Mon Sep 17 00:00:00 2001 From: Matt Date: Sat, 18 Apr 2026 00:06:52 +0000 Subject: [PATCH 8/8] Fix Kiwix download URL generation in embedder - Add /content/ prefix to wiki.echo6.co URLs (required by kiwix-serve) - Stop stripping ZIM flavor/date suffix (e.g. _maxi_2025-11) from filename - Use str.removesuffix instead of regex to strip only .zim extension Before: https://wiki.echo6.co/appropedia_en_all/Article After: https://wiki.echo6.co/content/appropedia_en_all_maxi_2025-11/Article Co-Authored-By: Claude Opus 4.6 --- lib/embedder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/embedder.py b/lib/embedder.py index 034624a..8dcc45a 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -296,11 +296,11 @@ def embed_single(file_hash, db, config): from urllib.parse import quote as url_quote zim_name = meta.get('zim_name', '') if not zim_name: - # Derive from zim_file: strip flavor/date suffix + # Derive from zim_file: strip only .zim extension, keep full name zf = meta.get('zim_file', '') - zim_name = re.sub(r'_(?:maxi|mini|nopic)_[\d-]+\.zim$', '', zf) + zim_name = zf.removesuffix('.zim') article_path = url_quote(meta['article_path'], safe='/:@!$&()*+,;=-._~') - download_url = f'https://wiki.echo6.co/{zim_name}/{article_path}' + download_url = f'https://wiki.echo6.co/content/{zim_name}/{article_path}' elif 
doc.get('path'): download_url = generate_download_url( doc['path'], config.get('library_root', '/mnt/library')