diff --git a/.gitignore b/.gitignore index 238cabb..bce13d8 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,7 @@ recon.db # OS .DS_Store + +# Kiwix binary tools (installed from tarball) +bin/ +status.db diff --git a/config.yaml b/config.yaml index 3e185f8..a2709b0 100644 --- a/config.yaml +++ b/config.yaml @@ -413,6 +413,14 @@ peertube: rate_limit_delay: 0.5 # Delay between video ingestions (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) +scraper: + workspace: /opt/recon/data/scraper # Working directory (tmp dirs for Zimit output) + output_dir: /mnt/kiwix # Finished .zim files land here (kiwix-serve library) + default_language: eng # ISO 639-3 language code for ZIM metadata + poll_interval: 300 # Seconds between checking for pending scrape jobs + docker_image: ghcr.io/openzim/zimit # Zimit Docker image for web crawling + docker_workers: 2 # Concurrent crawl workers inside Zimit container + # Stream B: New Library Pipeline new_pipeline: # Disabled 2026-04-14 for refactor — see refactored-recon repo for context @@ -440,3 +448,7 @@ pipeline: text: text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 + # Language filter: skip non-English content before Gemini enrichment + language_filter: true # Enable langdetect-based filtering + allowed_languages: # ISO 639-1 codes allowed through enrichment + - en diff --git a/config/address_book.yaml b/config/address_book.yaml new file mode 100644 index 0000000..24bc81c --- /dev/null +++ b/config/address_book.yaml @@ -0,0 +1,18 @@ +# RECON Address Book — saved locations for navigation shortcuts. +# Entries are matched by name and aliases (case-insensitive). +# Add new entries by appending to the list below. 
+ +entries: + - id: home + name: Home + aliases: + - home + - matt's house + - 214 north st + - 214 north street + address: "214 North St, Filer, ID 83328" + lat: 42.5735833 + lon: -114.6066389 + tags: + - residence + - primary diff --git a/config/profiles/home.yaml b/config/profiles/home.yaml new file mode 100644 index 0000000..de704d9 --- /dev/null +++ b/config/profiles/home.yaml @@ -0,0 +1,67 @@ +# Deployment profile: Home (VM 1130) +# Active on the main Echo6 deployment. Full stack with planet-scale NA tiles. +# Override via RECON_PROFILE env var in /etc/systemd/system/recon.service + +profile: home +region_name: "North America" + +tileset: + url: "/tiles/planet/current.pmtiles" + bounds: [-168, 14, -52, 72] + max_zoom: 15 + attribution: "Protomaps © OSM" + +tileset_hillshade: + url: "/tiles/planet-dem.pmtiles" + encoding: "terrarium" + max_zoom: 12 + +traffic: + provider: "tomtom" + proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" + +place_details: + local_source: "nominatim" + local_bbox: [-125.0, 31.3, -104.0, 49.0] + fallback_source: "overpass" + +services: + geocode: "/api/geocode" + reverse: "/api/reverse" + address_book: "/api/address_book" + valhalla: "/valhalla" + +auth: + login_url: "/outpost.goauthentik.io/start?rd=%2F" + logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" + +features: + has_nominatim_details: true + has_kiwix_wiki: true + has_hillshade: true + has_3d_terrain: false + has_traffic_overlay: true + has_landclass: true + has_public_lands_layer: true + has_contours: true + has_contours_test: false + has_contours_test_10ft: false + has_address_book_write: false + has_overture_enrichment: true + has_google_places_enrichment: true + has_contacts: true + has_wiki_rewriting: true + has_wiki_discovery: false + has_usfs_trails: true + has_blm_trails: true + +defaults: + center: [42.5736, -114.6066] + zoom: 10 + +# Offroute wilderness routing +offroute: + osm_pbf_path: 
"/mnt/nav/sources/idaho-latest.osm.pbf" + densify_interval_m: 100 + postgis_dsn: "dbname=padus" + diff --git a/config/profiles/minimal_pi.yaml b/config/profiles/minimal_pi.yaml new file mode 100644 index 0000000..c2fd90a --- /dev/null +++ b/config/profiles/minimal_pi.yaml @@ -0,0 +1,51 @@ +# Deployment profile: Minimal Pi (single-state pocket deployment) +# Template for the lightest possible field kit — Idaho only. +# Override via RECON_PROFILE env var. + +profile: minimal_pi +region_name: "Idaho" + +tileset: + url: "/tiles/idaho.pmtiles" + bounds: [-117.5, 42.0, -111.0, 49.0] + max_zoom: 15 + attribution: "Protomaps © OSM" + +tileset_hillshade: + url: "/tiles/hillshade-idaho.pmtiles" + encoding: "terrarium" + max_zoom: 12 + +traffic: + provider: "tomtom" + proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" + +services: + geocode: "/api/geocode" + reverse: "/api/reverse" + address_book: "/api/address_book" + valhalla: "/valhalla" + +# TODO(matt): confirm logout next= host for this profile +auth: + login_url: "/outpost.goauthentik.io/start?rd=%2F" + logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" + +features: + has_nominatim_details: false + has_kiwix_wiki: false + has_hillshade: false + has_3d_terrain: false + has_traffic_overlay: false + has_landclass: false + has_public_lands_layer: false + has_address_book_write: true + has_overture_enrichment: false + has_google_places_enrichment: false + has_contacts: false + has_wiki_rewriting: false + has_wiki_discovery: false + +defaults: + center: [44.0, -114.0] + zoom: 7 diff --git a/config/profiles/regional_pi.yaml b/config/profiles/regional_pi.yaml new file mode 100644 index 0000000..b6f2cad --- /dev/null +++ b/config/profiles/regional_pi.yaml @@ -0,0 +1,59 @@ +# Deployment profile: Regional Pi (multi-state field kit) +# Template for a Raspberry Pi covering Idaho + surrounding states. +# Override via RECON_PROFILE env var. 
+ +profile: regional_pi +region_name: "Idaho + Neighbors" + +tileset: + url: "/tiles/regional.pmtiles" + bounds: [-125, 40, -104, 49] + max_zoom: 15 + attribution: "Protomaps © OSM" + +tileset_hillshade: + url: "/tiles/hillshade-regional.pmtiles" + encoding: "terrarium" + max_zoom: 12 + +traffic: + provider: "tomtom" + proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" + +place_details: + local_source: "nominatim" + local_bbox: [-125.0, 40.0, -104.0, 49.0] + fallback_source: "overpass" + +services: + geocode: "/api/geocode" + reverse: "/api/reverse" + address_book: "/api/address_book" + valhalla: "/valhalla" + +# TODO(matt): confirm logout next= host for this profile +auth: + login_url: "/outpost.goauthentik.io/start?rd=%2F" + logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" + +features: + has_nominatim_details: true + has_kiwix_wiki: false + has_hillshade: true + has_3d_terrain: false + has_traffic_overlay: true + has_landclass: true + has_public_lands_layer: true + has_contours: true + has_contours_test: true + has_contours_test_10ft: true + has_address_book_write: true + has_overture_enrichment: false + has_google_places_enrichment: false + has_contacts: false + has_wiki_rewriting: true + has_wiki_discovery: false + +defaults: + center: [44.0, -114.0] + zoom: 7 diff --git a/lib/api.py b/lib/api.py index 757ebf4..a0697bf 100644 --- a/lib/api.py +++ b/lib/api.py @@ -35,12 +35,33 @@ _cache = { 'qdrant_scroll': None, 'qdrant_scroll_ts': 0, 'quick_stats': None, + 'kiwix_sources': None, } app = Flask(__name__, template_folder=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'templates'), static_folder=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'static')) +app.config['MAX_CONTENT_LENGTH'] = None # ZIM files can be multi-GB + + +# ── Large ZIM upload support ── +# Override stream factory so ZIM uploads write directly to /mnt/kiwix/ +# instead of /tmp (which is on the 
96GB root disk and can't hold 100GB+ ZIMs). +from flask import Request as _FlaskRequest + +class _LargeZimRequest(_FlaskRequest): + def _get_file_stream(self, total_content_length, content_type, filename=None, content_length=None): + if filename and filename.lower().endswith('.zim'): + return tempfile.NamedTemporaryFile('wb+', dir='/mnt/kiwix', prefix='.upload_', suffix='.tmp', delete=False) + return super()._get_file_stream(total_content_length, content_type, filename, content_length) + +app.request_class = _LargeZimRequest +# ── Netsyms Blueprint ── +from .netsyms_api import netsyms_bp +app.register_blueprint(netsyms_bp) + + # ── Navigation Constants ── KNOWLEDGE_SUBNAV = [ @@ -56,6 +77,11 @@ PEERTUBE_SUBNAV = [ {'href': '/peertube/channels', 'label': 'Channels'}, ] + +KIWIX_SUBNAV = [ + {'href': '/kiwix', 'label': 'Library'}, + {'href': '/kiwix/scraper', 'label': 'Scraper'}, +] SETTINGS_SUBNAV = [ {'href': '/settings/keys', 'label': 'API Keys'}, {'href': '/settings/cookies', 'label': 'YouTube Cookies'}, @@ -908,6 +934,7 @@ def _build_knowledge_stats(): c.source, CASE WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN c.source = 'kiwix' THEN 'wiki' WHEN c.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type, @@ -967,6 +994,7 @@ def _build_knowledge_stats(): d.status, d.concepts_extracted, d.vectors_inserted, CASE WHEN c.source = 'stream.echo6.co' THEN 'transcript' + WHEN c.source = 'kiwix' THEN 'wiki' WHEN d.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type @@ -1072,6 +1100,12 @@ def start_cache_warmer(stop_event=None): except Exception as e: logger.warning(f" Quick stats warm-up failed: {e}") + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + logger.info(" Kiwix sources cached") + except Exception as e: + logger.warning(f" Kiwix sources warm-up failed: {e}") + logger.info("Cache warmer ready — all data pre-loaded") # Continuous refresh loop @@ -1098,6 +1132,10 @@ def start_cache_warmer(stop_event=None): _cache['quick_stats'] = 
_build_quick_stats() except Exception: pass + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass # PeerTube dashboard: every 30s (cycle 2, offset) if cycle % 2 == 1: @@ -1281,6 +1319,9 @@ def api_keys_reload(): return jsonify({'count': count}) + + + # ── YouTube Cookie Management ── PEERTUBE_HOST = '192.168.1.170' @@ -1930,6 +1971,528 @@ def api_peertube_dashboard(): return jsonify(_cache['pt_dashboard']) + +# ── Kiwix Dashboard ── + +@app.route('/kiwix') +def kiwix_dashboard(): + return render_template('kiwix/dashboard.html', + domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix') + + +@app.route('/kiwix/scraper') +def kiwix_scraper(): + return render_template('kiwix/scraper.html', + domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix/scraper') + + +@app.route('/api/kiwix/sources') +def api_kiwix_sources(): + """Serve pre-cached Kiwix sources data (never blocks).""" + if _cache['kiwix_sources'] is None: + return jsonify({'error': 'Warming up, try again in a few seconds'}), 503 + return jsonify(_cache['kiwix_sources']) + + +@app.route('/api/kiwix/toggle-ingest/', methods=['POST']) +def api_kiwix_toggle_ingest(source_id): + """Toggle ingest_enabled on a ZIM source.""" + db = StatusDB() + conn = db._get_conn() + row = conn.execute("SELECT id, status, ingest_enabled FROM zim_sources WHERE id = ?", (source_id,)).fetchone() + if not row: + return jsonify({'error': 'Source not found'}), 404 + + data = request.get_json(silent=True) or {} + new_val = 1 if data.get('enabled', not row['ingest_enabled']) else 0 + conn.execute("UPDATE zim_sources SET ingest_enabled = ? 
WHERE id = ?", (new_val, source_id)) + conn.commit() + + # If toggling ON and source is eligible, spawn ingest in background + if new_val == 1 and row['status'] == 'detected': + _spawn_zim_ingest(source_id) + + return jsonify({'ok': True, 'ingest_enabled': new_val}) + + +@app.route('/api/kiwix/trigger-ingest/', methods=['POST']) +def api_kiwix_trigger_ingest(source_id): + """Explicit one-shot ingest trigger.""" + db = StatusDB() + conn = db._get_conn() + row = conn.execute("SELECT id FROM zim_sources WHERE id = ?", (source_id,)).fetchone() + if not row: + return jsonify({'error': 'Source not found'}), 404 + + _spawn_zim_ingest(source_id) + return jsonify({'ok': True}) + + +@app.route('/api/kiwix/upload', methods=['POST']) +def api_kiwix_upload(): + """Accept ZIM file upload, register with kiwix-serve, scan.""" + import subprocess + if 'file' not in request.files: + return jsonify({'error': 'No file provided'}), 400 + + f = request.files['file'] + if not f.filename or not f.filename.endswith('.zim'): + return jsonify({'error': 'File must be a .zim file'}), 400 + + filename = secure_filename(f.filename) + dest = os.path.join('/mnt/kiwix', filename) + + try: + # Stream was written directly to /mnt/kiwix/ by _LargeZimRequest — + # rename in-place instead of copying 100GB+ through f.save() + if hasattr(f.stream, 'name') and f.stream.name: + tmp_path = f.stream.name + f.stream.close() + os.rename(tmp_path, dest) + else: + tmp_dest = dest + '.tmp' + f.save(tmp_dest) + os.rename(tmp_dest, dest) + except Exception as e: + # Clean up any temp files on failure + for p in [locals().get('tmp_path', ''), locals().get('tmp_dest', '')]: + if p and os.path.exists(p): + os.remove(p) + return jsonify({'error': f'Save failed: {e}'}), 500 + + # Register with kiwix-serve library + try: + subprocess.run( + ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'add', dest], + capture_output=True, text=True, timeout=30 + ) + except Exception as e: + logger.warning(f"kiwix-manage add 
failed: {e}") + + # Scan for new entry (retry — monitorLibrary may need a moment to reload) + import time as _time + from .zim_monitor import scan_zims + for attempt in range(3): + try: + scan_zims() + break + except Exception as e: + logger.warning(f"scan_zims attempt {attempt+1} failed: {e}") + _time.sleep(2) + + # Refresh cache + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass + + return jsonify({'ok': True, 'filename': filename}) + + + +def _full_zim_cleanup(source_id): + """Full ZIM cleanup: Qdrant vectors, DB records, kiwix-manage, SIGHUP, file delete. + Returns dict with results. Caller handles cache refresh.""" + import subprocess + import signal + import requests as req + + db = StatusDB() + conn = db._get_conn() + row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() + if not row: + return None + + zim_source = dict(row) + zim_filename = zim_source['zim_filename'] + zim_path = zim_source['zim_path'] + zim_title = zim_source.get('title', zim_filename) + results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False, 'scrape_jobs_deleted': 0} + + # Step 1: Find all document hashes for this ZIM source + doc_hashes = [r['hash'] for r in conn.execute( + "SELECT c.hash FROM catalogue c WHERE c.source = 'kiwix' AND c.category = ?", + (zim_title,) + ).fetchall()] + + # Step 2: Delete vectors from Qdrant + if doc_hashes: + config = get_config() + qdrant_host = config.get('vector_db', {}).get('host', '100.64.0.14') + qdrant_port = config.get('vector_db', {}).get('port', 6333) + collection = config.get('vector_db', {}).get('collection', 'recon_knowledge') + + # Delete in batches of 100 hashes + for i in range(0, len(doc_hashes), 100): + batch = doc_hashes[i:i+100] + try: + resp = req.post( + f"http://{qdrant_host}:{qdrant_port}/collections/{collection}/points/delete", + json={ + "filter": { + "must": [{ + "key": "doc_hash", + "match": {"any": batch} + }] + } + }, + timeout=30 + ) + if 
resp.status_code == 200: + results['vectors_deleted'] += len(batch) + except Exception as e: + logger.warning(f"Qdrant delete batch failed: {e}") + + # Step 3: Delete DB records + for h in doc_hashes: + # Delete processing directory if it exists + text_dir_row = conn.execute("SELECT text_dir FROM documents WHERE hash = ?", (h,)).fetchone() + if text_dir_row and text_dir_row['text_dir']: + try: + import shutil + shutil.rmtree(text_dir_row['text_dir'], ignore_errors=True) + except Exception: + pass + conn.execute("DELETE FROM documents WHERE hash = ?", (h,)) + conn.execute("DELETE FROM catalogue WHERE hash = ?", (h,)) + results['docs_deleted'] = len(doc_hashes) + + # Delete zim_articles records + conn.execute("DELETE FROM zim_articles WHERE zim_source_id = ?", (source_id,)) + + # Delete zim_sources record + conn.execute("DELETE FROM zim_sources WHERE id = ?", (source_id,)) + conn.commit() + + # Step 4: Remove from kiwix-serve library + try: + subprocess.run( + ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', zim_filename.replace('.zim', '')], + capture_output=True, text=True, timeout=10 + ) + except Exception as e: + logger.warning(f"kiwix-manage remove failed: {e}") + + # Step 4b: SIGHUP kiwix-serve to reload library + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Failed to signal kiwix-serve: {e}") + + # Step 5: Delete the ZIM file + if os.path.isfile(zim_path): + try: + os.remove(zim_path) + results['file_deleted'] = True + except Exception as e: + logger.warning(f"ZIM file delete failed: {e}") + results['file_deleted'] = False + + # Step 6: Delete any linked scrape_jobs rows + try: + res = conn.execute("DELETE FROM scrape_jobs WHERE zim_source_id = ?", 
(source_id,)) + conn.commit() + results['scrape_jobs_deleted'] = res.rowcount + except Exception as e: + logger.warning(f"scrape_jobs cleanup failed: {e}") + + logger.info(f"Full ZIM cleanup for source {source_id} ('{zim_title}'): {results}") + return results + + +@app.route('/api/kiwix/remove/', methods=['POST']) +def api_kiwix_remove(source_id): + """Remove a ZIM source: delete vectors, DB records, library entry, and file.""" + db = StatusDB() + conn = db._get_conn() + row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() + if not row: + return jsonify({'error': 'Source not found'}), 404 + + results = _full_zim_cleanup(source_id) + if results is None: + return jsonify({'error': 'Source not found during cleanup'}), 404 + + # Refresh cache + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass + + return jsonify({'ok': True, 'results': results}) + + +def _spawn_zim_ingest(source_id): + """Start ZIM ingestion in a background thread.""" + def _run(): + try: + from .processors.zim_processor import ingest_zim + config = get_config() + db = StatusDB() + logger.info(f"Starting ZIM ingest for source {source_id}") + result = ingest_zim(source_id, db, config) + logger.info(f"ZIM ingest complete for source {source_id}: {result}") + # Refresh cache after completion + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass + except Exception as e: + logger.error(f"ZIM ingest failed for source {source_id}: {e}") + + t = threading.Thread(target=_run, daemon=True, name=f'zim-ingest-{source_id}') + t.start() + + +def _build_kiwix_sources(): + """Build Kiwix sources data for the dashboard cache.""" + import urllib.request + + db = StatusDB() + conn = db._get_conn() + + # Get all ZIM sources + rows = conn.execute(""" + SELECT id, zim_filename, title, description, language, category, + article_count, status, processed_count, skipped_count, error_count, + ingest_enabled, detected_at, started_at, 
completed_at + FROM zim_sources + ORDER BY detected_at DESC + """).fetchall() + + sources = [] + total_articles = 0 + total_processed = 0 + total_in_pipeline = 0 + + for r in rows: + source = dict(r) + zim_title = r['title'] or r['zim_filename'] + total_articles += r['article_count'] or 0 + total_processed += r['processed_count'] or 0 + + # Get pipeline stats for THIS source's documents (filtered by category) + pipeline = {} + try: + pipe_rows = conn.execute(""" + SELECT d.status, COUNT(*) as cnt + FROM documents d + JOIN catalogue c ON d.hash = c.hash + WHERE c.source = 'kiwix' AND c.category = ? + GROUP BY d.status + """, (zim_title,)).fetchall() + for pr in pipe_rows: + pipeline[pr['status']] = pr['cnt'] + except Exception: + pass + + in_pipe = sum(v for k, v in pipeline.items() if k not in ('complete', 'failed')) + total_in_pipeline += in_pipe + source['pipeline'] = pipeline + + # Compute effective status reflecting full pipeline state + db_status = r['status'] + if db_status == 'complete' and pipeline: + if in_pipe > 0: + source['effective_status'] = 'processing' + else: + source['effective_status'] = 'complete' + elif db_status == 'ingesting': + source['effective_status'] = 'extracting' + else: + source['effective_status'] = db_status # 'detected' + + sources.append(source) + + # Check kiwix-serve health + kiwix_status = 'inactive' + try: + resp = urllib.request.urlopen("http://localhost:8430", timeout=3) + if resp.status == 200: + kiwix_status = 'active' + except Exception: + pass + + return { + 'sources': sources, + 'kiwix_serve': {'status': kiwix_status, 'url': 'https://wiki.echo6.co'}, + 'totals': { + 'sources': len(sources), + 'articles': total_articles, + 'processed': total_processed, + 'in_pipeline': total_in_pipeline, + } + } + + + + +# ── Scraper API ── + +@app.route('/api/scraper/submit', methods=['POST']) +def api_scraper_submit(): + """Submit a new web scrape job.""" + data = request.get_json(silent=True) or {} + url = (data.get('url') or 
'').strip() + + if not url: + return jsonify({'error': 'url is required'}), 400 + if not url.startswith(('http://', 'https://')): + return jsonify({'error': 'URL must start with http:// or https://'}), 400 + + config = get_config() + scraper_cfg = config.get('scraper', {}) + language = data.get('language') or scraper_cfg.get('default_language', 'eng') + title = data.get('title', '').strip() or None + category = data.get('category', '').strip() or None + + db = StatusDB() + conn = db._get_conn() + conn.execute( + "INSERT INTO scrape_jobs (url, title, language, category, crawl_mode) VALUES (?, ?, ?, ?, ?)", + (url, title, language, category, 'zimit') + ) + conn.commit() + job_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + + logger.info(f"Scraper job {job_id} submitted: {url}") + return jsonify({'ok': True, 'job_id': job_id}), 201 + + +@app.route('/api/scraper/jobs') +def api_scraper_jobs(): + """List scrape jobs, optionally filtered by status.""" + status_filter = request.args.get('status') + db = StatusDB() + jobs = db.get_scrape_jobs(status=status_filter) + return jsonify({'jobs': jobs}) + + +@app.route('/api/scraper/cancel/', methods=['POST']) +def api_scraper_cancel(job_id): + """Cancel a scrape job.""" + + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] in ('complete', 'cancelled'): + return jsonify({'error': f"Job already {job['status']}"}), 400 + + # Set cancelled in DB — the runner loop checks this between phases + db.update_scrape_job(job_id, status='cancelled') + + # Stop the Docker container if running + container_name = f'recon-scraper-{job_id}' + try: + import subprocess as _subprocess + _subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + except Exception: + pass + + logger.info(f"Scraper job {job_id} cancelled") + return jsonify({'ok': True}) + + +@app.route('/api/scraper/retry/', methods=['POST']) +def 
api_scraper_retry(job_id): + """Retry a failed or cancelled scrape job.""" + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] not in ('failed', 'cancelled'): + return jsonify({'error': f"Job status is '{job['status']}', can only retry failed or cancelled jobs"}), 400 + + db.update_scrape_job(job_id, + status='pending', + error_message=None, + subprocess_pid=None, + crawl_mode=None, + started_at=None, + completed_at=None) + + logger.info(f"Scraper job {job_id} reset to pending for retry") + return jsonify({'ok': True}) + + +@app.route('/api/scraper/delete/', methods=['POST']) +def api_scraper_delete(job_id): + """Delete a scrape job and clean up any associated ZIM artifacts.""" + import subprocess + import signal + + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] == 'running': + return jsonify({'error': 'Cannot delete a running job \u2014 cancel it first'}), 400 + + zim_cleanup_results = None + + # If the job has a linked zim_source, do full cleanup + if job.get('zim_source_id'): + zim_cleanup_results = _full_zim_cleanup(job['zim_source_id']) + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass + elif job.get('zim_filename'): + # No zim_source row, but there may be an orphan file + library entry + zim_path = os.path.join('/mnt/kiwix', job['zim_filename']) + if os.path.isfile(zim_path): + try: + os.remove(zim_path) + logger.info(f"Deleted orphan ZIM file: {zim_path}") + except Exception as e: + logger.warning(f"Failed to delete orphan ZIM file {zim_path}: {e}") + try: + subprocess.run( + ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', + job['zim_filename'].replace('.zim', '')], + capture_output=True, text=True, timeout=10 + ) + except Exception as e: + logger.warning(f"kiwix-manage remove failed for orphan: {e}") + try: + result = 
subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Failed to signal kiwix-serve: {e}") + + # Delete the scrape_jobs row (may already be gone if _full_zim_cleanup deleted it) + conn = db._get_conn() + conn.execute("DELETE FROM scrape_jobs WHERE id = ?", (job_id,)) + conn.commit() + + logger.info(f"Scraper job {job_id} deleted (zim_cleanup={zim_cleanup_results})") + return jsonify({'ok': True, 'zim_cleanup': zim_cleanup_results}) + + +@app.route('/api/scraper/clear-failed', methods=['POST']) +def api_scraper_clear_failed(): + """Delete all failed and cancelled scrape jobs.""" + db = StatusDB() + conn = db._get_conn() + result = conn.execute("DELETE FROM scrape_jobs WHERE status IN ('failed', 'cancelled')") + conn.commit() + count = result.rowcount + logger.info(f"Cleared {count} failed/cancelled scraper jobs") + return jsonify({'ok': True, 'deleted': count}) + + # ── Metrics API ── @app.route('/api/metrics/history') diff --git a/lib/aurora_nav_tool.py b/lib/aurora_nav_tool.py new file mode 100644 index 0000000..2b7285d --- /dev/null +++ b/lib/aurora_nav_tool.py @@ -0,0 +1,117 @@ +""" +title: Navigation +author: Echo6 +version: 1.1.0 +description: Turn-by-turn directions and geocoding via Photon + Valhalla on recon-vm. Supports driving, walking, cycling, and truck routing with worldwide coverage (281M places). 
+""" + +import re +import json +import requests +from pydantic import BaseModel, Field + +_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') + + +class Tools: + class Valves(BaseModel): + photon_url: str = Field( + default="http://100.64.0.24:2322", + description="Photon geocoding service URL (recon-vm)", + ) + valhalla_url: str = Field( + default="http://100.64.0.24:8002", + description="Valhalla routing service URL (recon-vm)", + ) + + def __init__(self): + self.valves = self.Valves() + + def _geocode(self, query: str): + m = _COORD_RE.match(query.strip()) + if m: + lat, lon = float(m.group(1)), float(m.group(2)) + return lat, lon, query + resp = requests.get( + f"{self.valves.photon_url}/api", + params={"q": query, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + features = resp.json().get("features", []) + if not features: + return None, None, None + props = features[0]["properties"] + coords = features[0]["geometry"]["coordinates"] + parts = [props.get("name", "")] + for key in ("city", "state", "country"): + v = props.get(key) + if v and v != parts[-1]: + parts.append(v) + return coords[1], coords[0], ", ".join(p for p in parts if p) + + def get_directions( + self, + origin: str, + destination: str, + mode: str = "auto", + ) -> str: + """ + Get turn-by-turn directions between two locations. When this tool returns results, present the directions exactly as returned — do not summarize or rephrase. Include all steps. 
+ + :param origin: Starting location — address, place name, or lat,lon coordinates + :param destination: Destination — address, place name, or lat,lon coordinates + :param mode: Travel mode: auto, pedestrian, bicycle, or truck (default: auto) + :return: Formatted turn-by-turn directions + """ + if mode not in ("auto", "pedestrian", "bicycle", "truck"): + mode = "auto" + + orig_lat, orig_lon, orig_name = self._geocode(origin) + if orig_lat is None: + return f"Could not find location: {origin}" + + dest_lat, dest_lon, dest_name = self._geocode(destination) + if dest_lat is None: + return f"Could not find location: {destination}" + + try: + resp = requests.post( + f"{self.valves.valhalla_url}/route", + json={ + "locations": [ + {"lat": orig_lat, "lon": orig_lon}, + {"lat": dest_lat, "lon": dest_lon}, + ], + "costing": mode, + "directions_options": {"units": "miles"}, + }, + timeout=30, + ) + except requests.RequestException: + return "Navigation service unavailable" + + if resp.status_code != 200: + return "No route found between locations" + + trip = resp.json()["trip"] + summary = trip["summary"] + legs = trip["legs"][0]["maneuvers"] + + miles = round(summary["length"], 1) + minutes = round(summary["time"] / 60, 1) + + lines = [ + f"Directions from {orig_name} to {dest_name} ({mode}):", + f"Distance: {miles} miles | Time: {minutes} minutes", + "", + ] + for i, m in enumerate(legs, 1): + inst = m["instruction"] + dist = m.get("length", 0) + if dist > 0: + lines.append(f"{i}. {inst} — {round(dist, 1)} mi") + else: + lines.append(f"{i}. {inst}") + + return "\n".join(lines) diff --git a/lib/auth.py b/lib/auth.py new file mode 100644 index 0000000..22b08d2 --- /dev/null +++ b/lib/auth.py @@ -0,0 +1,22 @@ +""" +RECON Auth Helper — extract user identity from Authentik forward-auth headers. 
+""" +from functools import wraps +from flask import request, jsonify + + +def get_user_id(): + """Return X-Authentik-Username or None.""" + return request.headers.get('X-Authentik-Username') + + +def require_auth(f): + """Decorator: 401 if no Authentik auth header.""" + @wraps(f) + def wrapper(*args, **kwargs): + user_id = get_user_id() + if not user_id: + return jsonify({'error': 'Authentication required'}), 401 + request.user_id = user_id + return f(*args, **kwargs) + return wrapper diff --git a/lib/deployment_config.py b/lib/deployment_config.py new file mode 100644 index 0000000..ab6aa17 --- /dev/null +++ b/lib/deployment_config.py @@ -0,0 +1,54 @@ +""" +Deployment profile loader. + +Reads RECON_PROFILE env var (default: "home"), loads the matching YAML +from config/profiles/{profile}.yaml, and caches the parsed dict in memory. + +Exposes get_deployment_config() as the in-process accessor for the profile. + +Note: its former consumers (the /api/landclass gate, google_places, +place_detail, offroute/router) were all extracted to navi-* services or removed +across cleanups #4–#6/#27 — recon has no remaining caller of +get_deployment_config() today; the module is retained per cleanup #1. +(The former /api/config HTTP endpoint that served this dict to the frontend was +removed once navi-config (:8422) took over that route.) +""" +import os +import yaml +from .utils import setup_logging + +logger = setup_logging('recon.deployment_config') + +_config_cache = None + + +def load_deployment_config(): + """Load and cache the deployment profile. Called once at import time.""" + global _config_cache + + profile = os.environ.get('RECON_PROFILE', 'home') + config_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config', 'profiles') + config_path = os.path.join(config_dir, f'{profile}.yaml') + + if not os.path.exists(config_path): + raise FileNotFoundError( + f"Deployment profile '{profile}' not found at {config_path}. 
" + f"Available profiles: {', '.join(f.replace('.yaml','') for f in os.listdir(config_dir) if f.endswith('.yaml'))}" + ) + + with open(config_path, 'r') as f: + _config_cache = yaml.safe_load(f) + + logger.info(f"Loaded deployment profile: {profile} ({_config_cache.get('region_name', 'unknown')})") + return _config_cache + + +def get_deployment_config(): + """Return the cached deployment config dict.""" + if _config_cache is None: + load_deployment_config() + return _config_cache + + +# Load on import so startup fails fast if profile is missing +load_deployment_config() diff --git a/lib/embedder.py b/lib/embedder.py index 35fcb58..8dcc45a 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -10,6 +10,7 @@ Dependencies: requests, qdrant-client Config: embedding, vector_db, processing.embed_workers """ import json +import re import os import time import traceback @@ -290,7 +291,17 @@ def embed_single(file_hash, db, config): page_timestamps = meta['page_timestamps'] except Exception: pass - if doc.get('path'): + # For ZIM articles, build wiki.echo6.co URL from meta.json + if source_type == 'zim' and meta.get('article_path'): + from urllib.parse import quote as url_quote + zim_name = meta.get('zim_name', '') + if not zim_name: + # Derive from zim_file: strip only .zim extension, keep full name + zf = meta.get('zim_file', '') + zim_name = zf.removesuffix('.zim') + article_path = url_quote(meta['article_path'], safe='/:@!$&()*+,;=-._~') + download_url = f'https://wiki.echo6.co/content/{zim_name}/{article_path}' + elif doc.get('path'): download_url = generate_download_url( doc['path'], config.get('library_root', '/mnt/library') ) diff --git a/lib/enricher.py b/lib/enricher.py index d9540aa..e1e583c 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -27,6 +27,15 @@ from .utils import get_config, setup_logging from .status import StatusDB from .utils import resolve_text_dir +try: + from langdetect import detect as _detect_lang + from langdetect.lang_detect_exception 
import LangDetectException + _HAS_LANGDETECT = True +except ImportError: + _HAS_LANGDETECT = False + +ALLOWED_LANGUAGES = {'en'} # Default: English only + logger = setup_logging('recon.enricher') # Docs stuck in "enriching" longer than this get reset to "extracted" for retry @@ -341,6 +350,42 @@ def validate_and_fix_concepts(concepts, key, config): return concepts +def _check_language(text_dir, config): + """Check language of document text. Returns (is_allowed, detected_lang). + + Reads up to the first 1500 chars from first page file and uses langdetect. + Returns (True, lang) if language is allowed, (False, lang) if not. + Falls back to (True, <reason>) when detection is unavailable, disabled, or fails (benefit of the doubt). + """ + if not _HAS_LANGDETECT: + return True, 'unknown' + + # Check if language filter is enabled in config + pipeline_cfg = config.get('pipeline', {}) + if not pipeline_cfg.get('language_filter', True): + return True, 'disabled' + + allowed = set(pipeline_cfg.get('allowed_languages', ['en'])) + + # Read first page for detection + page_files = sorted([f for f in os.listdir(text_dir) + if f.startswith('page_') and f.endswith('.txt')]) + if not page_files: + return True, 'no_pages' + + try: + with open(os.path.join(text_dir, page_files[0]), encoding='utf-8') as f: + sample = f.read(1500) + if len(sample.strip()) < 50: + return True, 'too_short' + lang = _detect_lang(sample) + return (lang in allowed), lang + except LangDetectException: + return True, 'detection_failed' + except Exception: + return True, 'error' + + def enrich_single(file_hash, db, config, key_rotator): doc = db.get_document(file_hash) if not doc: @@ -359,6 +404,14 @@ def enrich_single(file_hash, db, config, key_rotator): db.mark_failed(file_hash, f"Text directory not found: {text_dir}") return False + # Language gate: skip non-English documents before burning Gemini quota + lang_ok, detected_lang = _check_language(text_dir, config) + if not lang_ok: + logger.info(f"Skipping {file_hash[:12]}... 
detected language '{detected_lang}' " + f"(allowed: {config.get('pipeline', {}).get('allowed_languages', ['en'])})") + db.mark_failed(file_hash, f"Language filter: detected '{detected_lang}', not in allowed list") + return False + db.update_status(file_hash, 'enriching') try: diff --git a/lib/extractor.py b/lib/extractor.py index 13159c9..bc236ab 100644 --- a/lib/extractor.py +++ b/lib/extractor.py @@ -21,6 +21,7 @@ Config: processing.extract_workers, processing.max_pdf_size_mb, processing.extract_timeout, processing.page_timeout """ import base64 +import re import json import os import random @@ -99,6 +100,40 @@ def _is_transient(error_str): return any(sig in s for sig in transient_signals) +def _text_quality_ok(text, min_length=50): + """Check if extracted text meets quality thresholds. + + Beyond the basic length check, validates: + - Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars) + - Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count + + Returns True if text passes all checks. + """ + text = text.strip() + if len(text) < min_length: + return False + + words = text.split() + if not words: + return False + + # Word-like ratio: tokens with 2+ alphabetic characters + word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2) + word_ratio = word_like / len(words) + if word_ratio < 0.60: + return False + + # Concatenation detector: lowercase immediately followed by uppercase + # Counts every such transition in the text (camelCase identifiers included; nothing is filtered out) + concat_hits = len(re.findall(r'[a-z][A-Z]', text)) + concat_ratio = concat_hits / len(words) if words else 0 + if concat_ratio > 0.10: + return False + + return True + + + def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30): """Render a single PDF page to PNG bytes using pdftoppm. 
@@ -224,7 +259,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): # Method 1: pdftotext (poppler) try: result = subprocess.run( - ['pdftotext', '-f', str(page_num_0indexed + 1), + ['pdftotext', '-layout', '-f', str(page_num_0indexed + 1), '-l', str(page_num_0indexed + 1), pdf_path, '-'], capture_output=True, text=True, timeout=page_timeout ) @@ -233,7 +268,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): except Exception: pass - if len(text.strip()) >= 50: + if _text_quality_ok(text): return text, 'pdftotext' # Method 2: pdftoppm + Tesseract OCR @@ -258,7 +293,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): except Exception: pass - if len(text.strip()) >= 50: + if _text_quality_ok(text): return text, 'tesseract' # Method 3: Gemini Vision (last resort) @@ -276,8 +311,26 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): # ── Core extraction functions ── def _pypdf2_extract(reader, page_num): - """Extract text from a PyPDF2 page object. Runs inside a thread for timeout.""" - return reader.pages[page_num].extract_text() or '' + """Extract text from a PyPDF2 page object. Runs inside a thread for timeout. + + Tries default extraction first (space_width=200). If quality check fails, + retries with space_width=100 which better detects word boundaries in + tightly-kerned PDFs (common in Haynes/workshop manuals). + + Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter + controls word-boundary detection tolerance. Lower values = more aggressive + space insertion between characters. 
+ """ + text = reader.pages[page_num].extract_text() or '' + if _text_quality_ok(text): + return text + + # Retry with tighter word-boundary detection + text_tight = reader.pages[page_num].extract_text(space_width=100.0) or '' + if len(text_tight.strip()) >= len(text.strip()): + return text_tight + + return text def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): @@ -302,13 +355,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: text = '' - if len(text.strip()) >= 50: + if _text_quality_ok(text): return text, 'pypdf2' # Method 2: pdftotext via subprocess (inherently timeout-safe) try: result = subprocess.run( - ['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'], + ['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'], capture_output=True, text=True, timeout=page_timeout ) if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()): @@ -316,7 +369,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: pass - if len(text.strip()) >= 50: + if _text_quality_ok(text): return text, 'pdftotext' # Method 3: pdftoppm + Tesseract OCR @@ -340,7 +393,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: pass - if len(text.strip()) >= 50: + if _text_quality_ok(text): return text, 'tesseract' # Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs) diff --git a/lib/netsyms.py b/lib/netsyms.py new file mode 100644 index 0000000..d51162e --- /dev/null +++ b/lib/netsyms.py @@ -0,0 +1,228 @@ +""" +RECON Netsyms AddressDatabase2025 — SQLite-backed US+CA address lookup. + +Provides 159.78M geocoded addresses as tier-2 between address book +(exact named locations) and Photon (full-text global geocoding). 
+ +Database: /mnt/nav/addresses/AddressDatabase2025.sqlite (read-only) +""" + +import os +import re +import sqlite3 +import threading + +from .utils import setup_logging + +logger = setup_logging('recon.netsyms') + +_DB_PATH = '/mnt/nav/addresses/AddressDatabase2025.sqlite' + +_conn = None +_lock = threading.Lock() +_cached_row_count = None + +# US states + DC + territories, CA provinces, for free-text parsing +_STATE_CODES = { + 'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', + 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', + 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', + 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', + 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY', + 'DC', 'PR', 'VI', 'GU', 'AS', 'MP', + # Canadian provinces + 'AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE', + 'QC', 'SK', 'YT', +} + +_NUMBER_RE = re.compile(r'^(\d+[\w-]*)(.*)$') + + +def _get_conn(): + """Lazy-open a read-only SQLite connection.""" + global _conn + if _conn is not None: + return _conn + with _lock: + if _conn is not None: + return _conn + uri = f'file:{_DB_PATH}?mode=ro' + _conn = sqlite3.connect(uri, uri=True, check_same_thread=False) + _conn.row_factory = sqlite3.Row + logger.info("Netsyms DB opened: %s", _DB_PATH) + return _conn + + +def _row_to_dict(row): + """Convert a sqlite3.Row to a plain dict with lat/lon keys.""" + return { + 'zipcode': row['zipcode'], + 'number': row['number'], + 'street': row['street'], + 'street2': row['street2'], + 'city': row['city'], + 'state': row['state'], + 'plus4': row['plus4'], + 'country': row['country'], + 'lat': float(row['latitude']), + 'lon': float(row['longitude']), + 'source': row['source'], + } + + +def lookup_by_street(number, street, city=None, state=None, + zipcode=None, country=None, limit=20): + """Match on number + street, with optional qualifiers.""" + conn = _get_conn() + clauses = ['number = ?', 'street = ?'] + params = [str(number).strip().upper(), 
street.strip().upper()] + + if city: + clauses.append('city = ?') + params.append(city.strip().upper()) + if state: + clauses.append('state = ?') + params.append(state.strip().upper()) + if zipcode: + clauses.append('zipcode = ?') + params.append(zipcode.strip()) + if country: + clauses.append('country = ?') + params.append(country.strip().upper()) + + sql = f"SELECT * FROM addresses WHERE {' AND '.join(clauses)} LIMIT ?" + params.append(limit) + + with _lock: + try: + rows = conn.execute(sql, params).fetchall() + except sqlite3.Error as e: + logger.warning("Netsyms lookup_by_street error: %s", e) + return [] + + results = [_row_to_dict(r) for r in rows] + logger.debug("lookup_by_street(%s, %s, city=%s, state=%s) → %d results", + number, street, city, state, len(results)) + return results + + +def lookup_free_text(query, country_hint=None): + """Parse a free-text address and look it up.""" + q = query.strip() + if not q: + return [] + + # Strip trailing zipcode if present + zipcode = None + zip_match = re.search(r'\b(\d{5})\s*$', q) + if zip_match: + zipcode = zip_match.group(1) + q = q[:zip_match.start()].strip().rstrip(',').strip() + + # Strip trailing state + tokens = re.split(r'[,\s]+', q) + tokens = [t for t in tokens if t] + if not tokens: + return [] + + state = None + if len(tokens) >= 2 and tokens[-1].upper() in _STATE_CODES: + state = tokens[-1].upper() + tokens = tokens[:-1] + + # Leading digits → number + number = None + if tokens and re.match(r'^\d', tokens[0]): + number = tokens[0] + tokens = tokens[1:] + + if not tokens: + # Only a number, or empty — try zipcode if we have one + if zipcode: + return lookup_by_zipcode(zipcode, limit=20) + return [] + + # If state was found and we have 2+ tokens remaining, last token is city + city = None + if state and len(tokens) >= 2: + city = tokens[-1] + tokens = tokens[:-1] + + street = ' '.join(tokens) + + if number: + results = lookup_by_street(number, street, city=city, state=state, + zipcode=zipcode, 
country=country_hint) + if results: + logger.debug("lookup_free_text(%r) → %d results via street match", + query, len(results)) + return results + + # Fallback: try zipcode only if available + if zipcode: + return lookup_by_zipcode(zipcode, limit=20) + + logger.debug("lookup_free_text(%r) → 0 results", query) + return [] + + +def lookup_by_zipcode(zipcode, limit=100): + """Direct zipcode lookup.""" + conn = _get_conn() + sql = "SELECT * FROM addresses WHERE zipcode = ? LIMIT ?" + params = [zipcode.strip(), limit] + + with _lock: + try: + rows = conn.execute(sql, params).fetchall() + except sqlite3.Error as e: + logger.warning("Netsyms lookup_by_zipcode error: %s", e) + return [] + + results = [_row_to_dict(r) for r in rows] + logger.debug("lookup_by_zipcode(%s) → %d results", zipcode, len(results)) + return results + + +def health(): + """Health check with cached row count.""" + global _cached_row_count + + try: + file_size = os.path.getsize(_DB_PATH) + except OSError: + return {'ok': False, 'row_count': 0, 'file_size_bytes': 0, + 'indexed_countries': []} + + try: + conn = _get_conn() + except Exception: + return {'ok': False, 'row_count': 0, 'file_size_bytes': file_size, + 'indexed_countries': []} + + if _cached_row_count is None: + with _lock: + if _cached_row_count is None: + try: + row = conn.execute( + "SELECT COUNT(*) AS cnt FROM addresses" + ).fetchone() + _cached_row_count = row['cnt'] + except sqlite3.Error: + _cached_row_count = 0 + + with _lock: + try: + rows = conn.execute( + "SELECT DISTINCT country FROM addresses" + ).fetchall() + countries = sorted(r['country'] for r in rows) + except sqlite3.Error: + countries = [] + + return { + 'ok': True, + 'row_count': _cached_row_count, + 'file_size_bytes': file_size, + 'indexed_countries': countries, + } diff --git a/lib/netsyms_api.py b/lib/netsyms_api.py new file mode 100644 index 0000000..dbae24e --- /dev/null +++ b/lib/netsyms_api.py @@ -0,0 +1,31 @@ +""" +RECON Netsyms API — Flask Blueprint. 
+ +GET /api/netsyms/lookup?q=&country= +GET /api/netsyms/health +""" + +from flask import Blueprint, request, jsonify + +from . import netsyms +from .utils import setup_logging + +logger = setup_logging('recon.netsyms_api') + +netsyms_bp = Blueprint('netsyms', __name__) + + +@netsyms_bp.route('/api/netsyms/lookup') +def api_netsyms_lookup(): + q = request.args.get('q', '').strip() + if not q: + return jsonify({'error': 'Missing q parameter'}), 400 + + country = request.args.get('country', '').strip() or None + results = netsyms.lookup_free_text(q, country_hint=country) + return jsonify({'results': results, 'count': len(results), 'query': q}) + + +@netsyms_bp.route('/api/netsyms/health') +def api_netsyms_health(): + return jsonify(netsyms.health()) diff --git a/lib/netsyms_test.py b/lib/netsyms_test.py new file mode 100644 index 0000000..ed70472 --- /dev/null +++ b/lib/netsyms_test.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +"""Tests for Netsyms address database module.""" + +import sys +import os + +# Ensure the lib directory is importable +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from lib import netsyms + + +def test_lookup_by_street_lowercase(): + results = netsyms.lookup_by_street("214", "North St", city="Filer", state="ID") + assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" + r = results[0] + assert abs(r['lat'] - 42.5736) < 0.01, f"Lat mismatch: {r['lat']}" + assert abs(r['lon'] - (-114.6066)) < 0.01, f"Lon mismatch: {r['lon']}" + print(" PASS: lookup_by_street (lowercase)") + + +def test_lookup_by_street_uppercase(): + results = netsyms.lookup_by_street("214", "NORTH ST", city="FILER", state="ID") + assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" + r = results[0] + assert abs(r['lat'] - 42.5736) < 0.01, f"Lat mismatch: {r['lat']}" + print(" PASS: lookup_by_street (uppercase)") + + +def test_lookup_nonexistent(): + results = netsyms.lookup_by_street("999999", 
"Nonexistent Rd", + city="Filer", state="ID") + assert results == [], f"Expected empty list, got {len(results)} results" + print(" PASS: lookup_by_street (nonexistent)") + + +def test_free_text_with_commas(): + results = netsyms.lookup_free_text("214 North St, Filer, ID") + assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" + r = results[0] + assert r['city'] == 'FILER', f"City mismatch: {r['city']}" + assert r['state'] == 'ID', f"State mismatch: {r['state']}" + print(" PASS: lookup_free_text (commas)") + + +def test_free_text_no_commas(): + results = netsyms.lookup_free_text("214 North St Filer ID") + assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" + r = results[0] + assert r['state'] == 'ID', f"State mismatch: {r['state']}" + print(" PASS: lookup_free_text (no commas)") + + +def test_lookup_by_zipcode(): + results = netsyms.lookup_by_zipcode("83328", limit=5) + assert len(results) == 5, f"Expected 5 results, got {len(results)}" + for r in results: + assert r['zipcode'] == '83328', f"Zipcode mismatch: {r['zipcode']}" + print(" PASS: lookup_by_zipcode") + + +def test_health(): + h = netsyms.health() + assert h['ok'] is True, f"Health not OK: {h}" + assert h['row_count'] >= 159_000_000, f"Row count too low: {h['row_count']}" + assert 'US' in h['indexed_countries'], f"US not in countries: {h['indexed_countries']}" + assert 'CA' in h['indexed_countries'], f"CA not in countries: {h['indexed_countries']}" + print(" PASS: health") + + +if __name__ == '__main__': + print("Running Netsyms tests...") + test_lookup_by_street_lowercase() + test_lookup_by_street_uppercase() + test_lookup_nonexistent() + test_free_text_with_commas() + test_free_text_no_commas() + test_lookup_by_zipcode() + test_health() + print("All tests passed.") diff --git a/lib/processors/zim_processor.py b/lib/processors/zim_processor.py new file mode 100644 index 0000000..6f5c887 --- /dev/null +++ b/lib/processors/zim_processor.py @@ -0,0 +1,493 @@ +""" 
+RECON ZIM Processor + +Batch importer for ZIM files. Opens a ZIM via python-libzim, iterates +HTML articles, strips to clean text, creates processing directories, +and registers each article as "extracted" for the enricher to pick up. + +This is NOT a dispatcher-style processor (no pre_flight). ZIMs contain +thousands of articles — ingestion is triggered explicitly or by the +ZIM monitor. + +Usage: + python3 -m lib.processors.zim_processor --zim-source-id 1 + python3 -m lib.processors.zim_processor --zim-source-id 1 --limit 100 --batch-size 50 +""" +import argparse +import hashlib +import json +import logging +import os +import re +import sys +import time + +from lxml import html as lxml_html + +sys.path.insert(0, "/opt/recon") + +from lib.utils import setup_logging, get_config +from lib.status import StatusDB +from lib.web_scraper import chunk_text + +logger = logging.getLogger("recon.processors.zim") + +WORDS_PER_PAGE = 2000 +MIN_TEXT_LENGTH = 200 + +# Elements to strip before text extraction +STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'} + +# Non-English article path suffix pattern (MediaWiki ZIMs use /XX or /XXX suffixes) +# Matches paths ending in /xx where xx is a 2-3 letter lowercase language code +_LANG_SUFFIX_RE = re.compile(r'/[a-z]{2,3}$') +# Common ISO 639-1/2 language codes to filter (excludes 'en') +_NON_EN_LANGS = { + 'aa','ab','af','ak','am','an','ar','as','av','ay','az', + 'ba','be','bg','bh','bi','bm','bn','bo','br','bs', + 'ca','ce','ch','co','cr','cs','cu','cv','cy', + 'da','de','dv','dz', + 'ee','el','eo','es','et','eu', + 'fa','ff','fi','fj','fo','fr','fy', + 'ga','gd','gl','gn','gu','gv', + 'ha','he','hi','ho','hr','ht','hu','hy','hz', + 'ia','id','ie','ig','ii','ik','io','is','it','iu', + 'ja','jv', + 'ka','kg','ki','kj','kk','kl','km','kn','ko','kr','ks','ku','kv','kw','ky', + 'la','lb','lg','li','ln','lo','lt','lu','lv', + 'mg','mh','mi','mk','ml','mn','mo','mr','ms','mt','my', + 
+ 'na','nb','nd','ne','ng','nl','nn','no','nr','nv','ny', + 'oc','oj','om','or','os', + 'pa','pi','pl','ps','pt', + 'qu', + 'rm','rn','ro','ru','rw', + 'sa','sc','sd','se','sg','sh','si','sk','sl','sm','sn','so','sq','sr','ss','st','su','sv','sw', + 'ta','te','tg','th','ti','tk','tl','tn','to','tr','ts','tt','tw','ty', + 'ug','uk','ur','uz', + 've','vi','vo', + 'wa','wo', + 'xh', + 'yi','yo', + 'za','zh','zu', +} + + +def _text_hash(text): + """Compute MD5 hash of text content (matching content_hash style).""" + return hashlib.md5(text.encode('utf-8')).hexdigest() + + +def _flatten_table(table_el): + """Convert a <table> element to pipe-delimited text. + + Each <tr> becomes a row with cells joined by ' | '. + Returns the rows joined by newlines, or '' when no row has any non-empty cell. + """ + rows = [] + for tr in table_el.iter('tr'): + cells = [] + for cell in tr: + if cell.tag in ('td', 'th'): + cell_text = (cell.text_content() or '').strip() + # Collapse internal whitespace in each cell + cell_text = re.sub(r'\s+', ' ', cell_text) + if cell_text: + cells.append(cell_text) + if cells: + rows.append(' | '.join(cells)) + if not rows: + return '' + return '\n'.join(rows) + + +def _preprocess_tree(doc): + """Pre-process HTML tree to add delimiters before text_content() flattens it. + + Handles: 
<table>, <br>, <li>, <dt>, <dd>
    -- elements that lxml's + text_content() would concatenate without any separators. + """ + from lxml import etree + + # 1. Replace
<table> elements with their pipe-delimited text + for table in list(doc.iter('table')): + formatted = _flatten_table(table) + if formatted: + replacement = etree.Element('div') + replacement.text = '\n\n' + formatted + '\n\n' + parent = table.getparent() + if parent is not None: + parent.replace(table, replacement) + else: + table.drop_tree() + + # 2. 
<br> -> inject newline + for br in list(doc.iter('br')): + br.tail = '\n' + (br.tail or '') + + # 3. 
<li> -> inject newline + "- " prefix + for li in list(doc.iter('li')): + li.text = '- ' + (li.text or '') + li.tail = '\n' + (li.tail or '') + + # 4. 
<dt> -> inject newline after + for dt in list(doc.iter('dt')): + dt.tail = '\n' + (dt.tail or '') + + # 5. 
<dd> -> inject newline + indent + for dd in list(doc.iter('dd')): + dd.text = ' ' + (dd.text or '') + dd.tail = '\n' + (dd.tail or '') + + +def _html_to_text(html_bytes): + """Convert HTML bytes to clean text via lxml. + + Strips nav, footer, script, style, header, and aside elements. Decodes entities. + Pre-processes tables, lists, and line breaks for proper delimiters. + Normalizes whitespace. + """ + try: + doc = lxml_html.fromstring(html_bytes) + except Exception: + return "" + + # Strip unwanted elements + for tag in STRIP_TAGS: + for el in doc.iter(tag): + el.drop_tree() + + # Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes + _preprocess_tree(doc) + + # Extract text + text = doc.text_content() + + # Normalize whitespace: collapse runs of spaces, normalize newlines + text = re.sub(r'[ \t]+', ' ', text) + text = re.sub(r'\n{3,}', '\n\n', text) + text = text.strip() + + return text + + +def ingest_zim(zim_source_id, db, config, stop_event=None, + batch_size=100, batch_delay=1.0, limit=None): + """Process all articles from a ZIM file registered in zim_sources. 
+ + - Reads zim_path from zim_sources table + - Iterates articles, creates processing dirs, registers in DB + - Checkpoints progress via zim_sources.last_checkpoint + - Respects stop_event for graceful shutdown + - Yields after each batch to avoid monopolizing resources + + Args: + zim_source_id: ID in zim_sources table + db: StatusDB instance + config: RECON config dict + stop_event: threading.Event for graceful shutdown (optional) + batch_size: articles per batch before sleeping + batch_delay: seconds to sleep between batches + limit: max articles to process (None = all) + + Returns: + dict with counts: processed, skipped, duplicates, errors + """ + from libzim.reader import Archive + + conn = db._get_conn() + + # Load ZIM source record + row = conn.execute( + "SELECT * FROM zim_sources WHERE id = ?", (zim_source_id,) + ).fetchone() + if not row: + logger.error("ZIM source ID %d not found", zim_source_id) + return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} + + zim_source = dict(row) + zim_path = zim_source['zim_path'] + zim_filename = zim_source['zim_filename'] + zim_title = zim_source.get('title') or zim_filename + + if not os.path.isfile(zim_path): + logger.error("ZIM file not found: %s", zim_path) + return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} + + logger.info("Opening ZIM: %s (%s)", zim_title, zim_filename) + zim = Archive(zim_path) + total_entries = zim.entry_count + + # Read checkpoint to resume from + last_checkpoint = zim_source.get('last_checkpoint') + start_idx = 0 + if last_checkpoint: + try: + start_idx = int(last_checkpoint) + logger.info("Resuming from checkpoint: entry %d", start_idx) + except ValueError: + logger.warning("Invalid checkpoint value: %s, starting from 0", last_checkpoint) + + # Update status to ingesting + conn.execute( + "UPDATE zim_sources SET status = 'ingesting', started_at = CURRENT_TIMESTAMP WHERE id = ?", + (zim_source_id,) + ) + conn.commit() + + processing_root = config.get('pipeline', 
{}).get( + 'processing_root', '/opt/recon/data/processing' + ) + + # Get already-processed article paths for this ZIM source (dedup within ZIM) + existing_paths = set() + for r in conn.execute( + "SELECT article_path FROM zim_articles WHERE zim_source_id = ?", + (zim_source_id,) + ).fetchall(): + existing_paths.add(r['article_path']) + + stats = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} + # Track what was already flushed to DB to avoid double-counting + flushed = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} + batch_count = 0 + total_processed_this_run = 0 + last_entry_idx = start_idx + + for entry_idx in range(start_idx, total_entries): + if stop_event and stop_event.is_set(): + logger.info("Stop event set, halting ZIM ingest at entry %d", entry_idx) + break + + if limit and total_processed_this_run >= limit: + logger.info("Reached limit of %d articles", limit) + break + + last_entry_idx = entry_idx + + try: + entry = zim._get_entry_by_id(entry_idx) + except Exception: + continue + + # Skip redirects + if entry.is_redirect: + continue + + try: + item = entry.get_item() + except Exception: + continue + + # Skip non-HTML + if item.mimetype != "text/html": + continue + + article_path = entry.path + article_title = entry.title + + # Skip if already processed in a prior run + if article_path in existing_paths: + continue + + # Skip non-English articles (MediaWiki translation suffix pattern) + lang_match = _LANG_SUFFIX_RE.search(article_path) + if lang_match and lang_match.group(0)[1:] in _NON_EN_LANGS: + stats['skipped'] += 1 + total_processed_this_run += 1 + continue + + # Extract and clean text + try: + html_bytes = bytes(item.content) + clean_text = _html_to_text(html_bytes) + except Exception as e: + logger.debug("HTML extraction failed for %s: %s", article_path, e) + stats['errors'] += 1 + continue + + # Skip stubs + if len(clean_text) < MIN_TEXT_LENGTH: + stats['skipped'] += 1 + continue + + # Compute content hash + file_hash = 
_text_hash(clean_text) + + # Deduplicate against existing catalogue + cat_row = conn.execute( + "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) + ).fetchone() + if cat_row: + # Record in zim_articles as skipped duplicate + conn.execute( + """INSERT OR IGNORE INTO zim_articles + (zim_source_id, article_path, article_title, status, processed_at) + VALUES (?, ?, ?, 'skipped', CURRENT_TIMESTAMP)""", + (zim_source_id, article_path, article_title) + ) + stats['duplicates'] += 1 + total_processed_this_run += 1 + continue + + # Create processing directory + proc_dir = os.path.join(processing_root, file_hash) + try: + os.makedirs(proc_dir, exist_ok=True) + except Exception as e: + logger.error("Cannot create processing dir %s: %s", proc_dir, e) + stats['errors'] += 1 + continue + + # Split into page files + pages = chunk_text(clean_text, WORDS_PER_PAGE) + for i, page_text in enumerate(pages, start=1): + page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) + with open(page_path, 'w', encoding='utf-8') as f: + f.write(page_text) + + # Write meta.json + meta = { + 'hash': file_hash, + 'filename': article_title + '.html', + 'source_type': 'zim', + 'zim_file': zim_filename, + 'zim_source_id': zim_source_id, + 'article_title': article_title, + 'article_path': article_path, + 'page_count': len(pages), + 'text_length': len(clean_text), + } + with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: + json.dump(meta, f, indent=2) + + # Register in catalogue + db.add_to_catalogue( + file_hash, + article_title + '.html', + zim_path, # source path is the ZIM file + len(clean_text), # size in bytes (text) + 'kiwix', # source + zim_title, # category = ZIM title + ) + + # Queue document + db.queue_document(file_hash) + + # Set text_dir, page_count, book_title on documents row + # Mark organized_at immediately (ZIM articles don't get filed to library) + conn.execute( + "UPDATE documents SET text_dir = ?, page_count = ?, " + "book_title = ?, 
organized_at = CURRENT_TIMESTAMP " + "WHERE hash = ?", + (proc_dir, len(pages), article_title, file_hash) + ) + + # Update status to extracted + db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) + + # Record in zim_articles + conn.execute( + """INSERT OR IGNORE INTO zim_articles + (zim_source_id, article_path, article_title, status, processed_at) + VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)""", + (zim_source_id, article_path, article_title) + ) + conn.commit() + + stats['processed'] += 1 + total_processed_this_run += 1 + batch_count += 1 + + # Progress logging + total_done = zim_source['processed_count'] + stats['processed'] + article_count = zim_source.get('article_count', 0) + if stats['processed'] % 500 == 0 and article_count > 0: + pct = total_done / article_count * 100 + logger.info( + "ZIM ingest [%s]: %s/%s (%.1f%%)", + zim_title, f"{total_done:,}", f"{article_count:,}", pct + ) + + # Batch checkpoint — flush only the delta since last flush + if batch_count >= batch_size: + delta_p = stats['processed'] - flushed['processed'] + delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) + delta_e = stats['errors'] - flushed['errors'] + conn.execute( + "UPDATE zim_sources SET processed_count = processed_count + ?, " + "skipped_count = skipped_count + ?, error_count = error_count + ?, " + "last_checkpoint = ? 
WHERE id = ?", + (delta_p, delta_s, delta_e, str(entry_idx + 1), zim_source_id) + ) + conn.commit() + flushed['processed'] = stats['processed'] + flushed['skipped'] = stats['skipped'] + flushed['duplicates'] = stats['duplicates'] + flushed['errors'] = stats['errors'] + + batch_count = 0 + + if batch_delay > 0: + time.sleep(batch_delay) + + # Final checkpoint — flush only the unflushed delta + final_status = 'complete' + if limit and total_processed_this_run >= limit: + final_status = 'ingesting' # not done yet, just hit the limit + + delta_p = stats['processed'] - flushed['processed'] + delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) + delta_e = stats['errors'] - flushed['errors'] + + conn.execute( + "UPDATE zim_sources SET processed_count = processed_count + ?, " + "skipped_count = skipped_count + ?, error_count = error_count + ?, " + "last_checkpoint = ?, status = ?, completed_at = CASE WHEN ? = 'complete' THEN CURRENT_TIMESTAMP ELSE completed_at END " + "WHERE id = ?", + (delta_p, delta_s, delta_e, str(last_entry_idx + 1), + final_status, final_status, zim_source_id) + ) + conn.commit() + + logger.info( + "ZIM ingest [%s] %s: %d processed, %d skipped, %d duplicates, %d errors", + zim_title, final_status, + stats['processed'], stats['skipped'], stats['duplicates'], stats['errors'] + ) + + return stats + + +def main(): + """CLI entry point for standalone ZIM processing.""" + parser = argparse.ArgumentParser(description="RECON ZIM Processor") + parser.add_argument('--zim-source-id', type=int, required=True, + help="ID from zim_sources table") + parser.add_argument('--batch-size', type=int, default=100, + help="Articles per batch (default: 100)") + parser.add_argument('--batch-delay', type=float, default=1.0, + help="Seconds between batches (default: 1.0)") + parser.add_argument('--limit', type=int, default=None, + help="Max articles to process (default: all)") + args = parser.parse_args() + + 
setup_logging('recon.processors.zim') + + config = get_config() + db = StatusDB(config['paths']['db']) + + stats = ingest_zim( + zim_source_id=args.zim_source_id, + db=db, + config=config, + batch_size=args.batch_size, + batch_delay=args.batch_delay, + limit=args.limit, + ) + + print(f"\nResults: {stats['processed']} processed, {stats['skipped']} skipped, " + f"{stats['duplicates']} duplicates, {stats['errors']} errors") + + +if __name__ == "__main__": + main() diff --git a/lib/query_router.py b/lib/query_router.py new file mode 100644 index 0000000..dda14a2 --- /dev/null +++ b/lib/query_router.py @@ -0,0 +1,161 @@ +"""Semantic query router for Aurora. + +Classifies user queries into routes (nav_route, nav_reverse_geocode, +direct_answer, rag_search) by comparing query embeddings against +pre-computed route centroids from example queries. + +TEI endpoint: http://100.64.0.14:8090/embed (cortex via Tailscale) +""" + +import math +import threading +import requests + +# ── Route examples ──────────────────────────────────────────────────────────── +ROUTE_EXAMPLES = { + "nav_route": [ + "how do I get to Boise", + "directions to Twin Falls", + "how do I get from Buhl to Boise", + "drive from Jerome to Sun Valley", + "route from Boise to McCall", + "what's the fastest way to Sun Valley", + "how far is it to Twin Falls", + "take me to Shoshone", + "navigate to the airport", + "how do I drive to Salt Lake City", + "walking directions to the park", + "bike route to downtown", + ], + "nav_reverse_geocode": [ + "what town is at 42.5, -114.7", + "where am I right now", + "what is at coordinates 43.6, -116.2", + "what location is 42.574, -114.607", + "where is this place 44.0, -114.3", + "what city is near 42.7, -114.5", + "reverse geocode 43.0, -115.0", + "what's at this location 42.9, -114.8", + ], + "direct_answer": [ + "hello", + "hey aurora", + "good morning", + "thanks", + "thank you", + "what's your name", + "who are you", + "tell me a joke", + "how are you", + "hi 
there", + ], + "rag_search": [ + "what does the survival manual say about water", + "how to purify water in the field", + "how to treat a gunshot wound", + "what is the ranger handbook chapter on patrolling", + "field manual water purification", + "how to build a shelter in the wilderness", + "tactical combat casualty care procedures", + "what does FM 21-76 say about fire starting", + ], +} + +# ── Module-level cache ──────────────────────────────────────────────────────── +_ROUTE_CENTROIDS: dict | None = None +_LOCK = threading.Lock() + + +def _embed_batch(texts: list[str], tei_url: str) -> list[list[float]]: + """Embed a batch of texts via TEI.""" + resp = requests.post(tei_url, json={"inputs": texts}, timeout=30) + resp.raise_for_status() + return resp.json() + + +def _compute_centroid(vectors: list[list[float]]) -> list[float]: + """Element-wise mean of vectors.""" + n = len(vectors) + dim = len(vectors[0]) + centroid = [0.0] * dim + for vec in vectors: + for i in range(dim): + centroid[i] += vec[i] + for i in range(dim): + centroid[i] /= n + return centroid + + +def _cosine_similarity(a: list[float], b: list[float]) -> float: + """Cosine similarity between two vectors (pure Python).""" + dot = 0.0 + norm_a = 0.0 + norm_b = 0.0 + for i in range(len(a)): + dot += a[i] * b[i] + norm_a += a[i] * a[i] + norm_b += b[i] * b[i] + denom = math.sqrt(norm_a) * math.sqrt(norm_b) + if denom == 0: + return 0.0 + return dot / denom + + +def _ensure_centroids(tei_url: str) -> dict[str, list[float]]: + """Lazy-init: embed all examples in one batch, compute centroids, cache.""" + global _ROUTE_CENTROIDS + if _ROUTE_CENTROIDS is not None: + return _ROUTE_CENTROIDS + + with _LOCK: + if _ROUTE_CENTROIDS is not None: + return _ROUTE_CENTROIDS + + # Flatten all examples into one batch + all_texts = [] + route_ranges: dict[str, tuple[int, int]] = {} + offset = 0 + for route, examples in ROUTE_EXAMPLES.items(): + route_ranges[route] = (offset, offset + len(examples)) + 
all_texts.extend(examples) + offset += len(examples) + + all_vectors = _embed_batch(all_texts, tei_url) + + centroids = {} + for route, (start, end) in route_ranges.items(): + centroids[route] = _compute_centroid(all_vectors[start:end]) + + _ROUTE_CENTROIDS = centroids + return _ROUTE_CENTROIDS + + +def classify( + query: str, + tei_url: str = "http://100.64.0.14:8090/embed", + threshold: float = 0.45, +) -> tuple[str, float]: + """Classify a query into a route. + + Returns (route_name, confidence). If no route exceeds the threshold, + returns ("rag_search", best_score) as the safe default. + """ + centroids = _ensure_centroids(tei_url) + + # Embed the query + vecs = _embed_batch([query], tei_url) + query_vec = vecs[0] + + # Compare against all centroids + best_route = "rag_search" + best_score = 0.0 + for route, centroid in centroids.items(): + sim = _cosine_similarity(query_vec, centroid) + if sim > best_score: + best_score = sim + best_route = route + + if best_score < threshold: + return ("rag_search", best_score) + + return (best_route, best_score) diff --git a/lib/query_router_test.py b/lib/query_router_test.py new file mode 100644 index 0000000..27ccefd --- /dev/null +++ b/lib/query_router_test.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +"""Test suite for the semantic query router.""" + +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from lib.query_router import classify + +TEST_QUERIES = [ + ("how do I get from Buhl to Boise", "nav_route"), + ("what does the survival manual say about water", "rag_search"), + ("what town is at 42.5, -114.7", "nav_reverse_geocode"), + ("hey aurora", "direct_answer"), + ("what's the fastest way to Sun Valley", "nav_route"), + ("how to purify water in the field", "rag_search"), + ("good morning", "direct_answer"), +] + + +def main(): + print("Query Router Test Suite") + print("=" * 70) + + passed = 0 + failed = 0 + + for query, expected in TEST_QUERIES: + route, 
confidence = classify(query) + status = "PASS" if route == expected else "FAIL" + if status == "PASS": + passed += 1 + else: + failed += 1 + print(f" [{status}] {query!r}") + print(f" → {route} ({confidence:.3f}) expected={expected}") + + print("=" * 70) + print(f"Results: {passed}/{passed + failed} passed") + if failed: + print(f" {failed} FAILED") + sys.exit(1) + else: + print(" All tests passed!") + + +if __name__ == "__main__": + main() diff --git a/lib/scraper_runner.py b/lib/scraper_runner.py new file mode 100644 index 0000000..b83f145 --- /dev/null +++ b/lib/scraper_runner.py @@ -0,0 +1,387 @@ +""" +RECON Scraper Runner + +Daemon loop that processes scrape jobs: crawl via Zimit → kiwix-manage. +Zimit (openZIM Docker crawler) handles all site types and produces ZIM +files directly — no separate zimwriterfs step needed. + +Public entry point: scraper_loop(stop_event, config). + +Config section: scraper (output_dir, docker_image, docker_workers, poll_interval) +DB table: scrape_jobs (status flow: pending → scraping → registering → complete) +""" +import glob as _glob +import os +import re +import shutil +import signal +import subprocess +import time +from datetime import datetime, timezone +from urllib.parse import urlparse + +from .utils import setup_logging +from .status import StatusDB + +logger = setup_logging('recon.scraper_runner') + + +def scraper_loop(stop_event, config): + """Daemon loop: poll for pending scrape jobs, execute pipeline.""" + scraper_cfg = config.get('scraper', {}) + poll_interval = scraper_cfg.get('poll_interval', 300) + + logger.info("Scraper runner started") + + # Clean up any orphan Zimit containers from a previous crash + _cleanup_orphan_containers() + + while not stop_event.is_set(): + db = StatusDB() + job = db.get_pending_scrape_job() + if job: + try: + _process_job(job, config, stop_event) + except Exception as e: + logger.error(f"Scraper job {job['id']} unexpected error: {e}", exc_info=True) + try: + 
db.update_scrape_job(job['id'], + status='failed', + error_message=str(e)[:1000], + subprocess_pid=None, + completed_at=_now()) + except Exception: + pass + else: + stop_event.wait(poll_interval) + + logger.info("Scraper runner stopped") + + +def _now(): + return datetime.now(timezone.utc).isoformat() + + +def _sanitize_domain(url): + """Extract and sanitize domain from URL for use in filenames.""" + parsed = urlparse(url) + domain = parsed.hostname or 'unknown' + if domain.startswith('www.'): + domain = domain[4:] + return domain + + +def _sanitize_filename(s): + """Sanitize a string for safe filename use.""" + return re.sub(r'[^a-zA-Z0-9._-]', '_', s) + + +def _check_cancelled(db, job_id): + """Check if a job has been cancelled in the DB.""" + job = db.get_scrape_job(job_id) + return job and job['status'] == 'cancelled' + + +def _kill_process(proc, timeout=5): + """Gracefully terminate a subprocess, force kill if needed.""" + if proc.poll() is not None: + return + try: + proc.terminate() + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=2) + + +def _cleanup_orphan_containers(): + """Remove any leftover recon-scraper-* Docker containers from a previous crash.""" + try: + result = subprocess.run( + ['docker', 'ps', '-a', '--filter', 'name=recon-scraper-', '--format', '{{.Names}}'], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0 and result.stdout.strip(): + for name in result.stdout.strip().split('\n'): + name = name.strip() + if name: + subprocess.run(['docker', 'rm', '-f', name], capture_output=True, timeout=10) + logger.info(f"Cleaned up orphan container: {name}") + except Exception as e: + logger.warning(f"Orphan container cleanup failed: {e}") + + +# ── Zimit crawl backend ────────────────────────────────────────── + + +def _crawl_zimit(job, config, stop_event, db): + """ + Crawl a URL using Zimit (openZIM Docker crawler). + + Returns (page_count, zim_filename, error_msg). 
+ On success: (count, filename, None) + On failure: (0, None, error_string) + """ + job_id = job['id'] + url = job['url'] + title = job.get('title') or _sanitize_domain(url) + language = job.get('language') or config.get('scraper', {}).get('default_language', 'eng') + category = job.get('category') or '' + + scraper_cfg = config.get('scraper', {}) + output_dir = scraper_cfg.get('output_dir', '/mnt/kiwix') + docker_image = scraper_cfg.get('docker_image', 'ghcr.io/openzim/zimit') + docker_workers = scraper_cfg.get('docker_workers', 2) + + domain = _sanitize_domain(url) + date_tag = datetime.now().strftime('%Y-%m') + container_name = f'recon-scraper-{job_id}' + tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') + + # Clean up any pre-existing container with same name (retry scenario) + subprocess.run(['docker', 'rm', '-f', container_name], capture_output=True, timeout=10) + + os.makedirs(tmp_dir, exist_ok=True) + + description = f"Mirror of {domain}" + if category: + description = f"{category} — mirror of {domain}" + + docker_cmd = [ + 'docker', 'run', + '--name', container_name, + '-v', f'{tmp_dir}:/output', + docker_image, + 'zimit', + '--seeds', url, + '--name', _sanitize_filename(domain), + '--zim-lang', language, + '--title', title, + '--description', description[:80], + '--output', '/output', + '-w', str(docker_workers), + ] + + logger.info(f"Job {job_id}: Zimit crawl starting — {url}") + try: + proc = subprocess.Popen( + docker_cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + db.update_scrape_job(job_id, subprocess_pid=proc.pid) + + last_progress_check = 0 + while proc.poll() is None: + if stop_event.is_set() or _check_cancelled(db, job_id): + # Stop the Docker container + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + _kill_process(proc) + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, 'cancelled' + + # Check progress every 30s via docker logs + now = time.time() + if now - 
last_progress_check >= 30: + last_progress_check = now + try: + log_result = subprocess.run( + ['docker', 'logs', '--tail', '20', container_name], + capture_output=True, text=True, timeout=10 + ) + if log_result.returncode == 0: + # Browsertrix logs JSON with "crawled":N — check both stdout and stderr + log_text = log_result.stdout or log_result.stderr or '' + lines = log_text.strip().split('\n') + for line in reversed(lines): + match = re.search(r'"crawled":(\d+)', line) + if match: + count = int(match.group(1)) + if count > 0: + db.update_scrape_job(job_id, page_count=count) + break + except Exception: + pass + + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + db.update_scrape_job(job_id, subprocess_pid=None) + + if stop_event.is_set() or _check_cancelled(db, job_id): + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, 'cancelled' + + if proc.returncode != 0: + # Capture last 50 lines of docker logs for error context + error_msg = f"Zimit exited with code {proc.returncode}" + try: + log_result = subprocess.run( + ['docker', 'logs', '--tail', '50', container_name], + capture_output=True, text=True, timeout=10 + ) + log_text = (log_result.stderr or log_result.stdout or '').strip() + if log_text: + # Take last 500 chars + error_msg += f": {log_text[-500:]}" + except Exception: + pass + # Remove container (no --rm flag, so we clean up manually) + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, error_msg + + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, f"Zimit error: {e}" + + # Remove container (no --rm flag, so we clean up manually after getting logs) + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + + # Find the output ZIM file + zim_files = _glob.glob(os.path.join(tmp_dir, '*.zim')) + if not zim_files: + shutil.rmtree(tmp_dir, 
ignore_errors=True) + return 0, None, 'Zimit produced no ZIM file' + + src_zim = zim_files[0] # Should be exactly one + + # Use the last page count recorded on the job row during the crawl (0 if none) + page_count = 0 + try: + job_state = db.get_scrape_job(job_id) + page_count = job_state.get('page_count') or 0 + except Exception: + pass + + # Rename to final location + zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}_{job_id}.zim" + zim_path = os.path.join(output_dir, zim_filename) + try: + shutil.move(src_zim, zim_path) + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, f"Failed to move ZIM to output dir: {e}" + + shutil.rmtree(tmp_dir, ignore_errors=True) + logger.info(f"Job {job_id}: Zimit complete — {zim_filename}") + + return page_count, zim_filename, None + + +# ── Main job pipeline ───────────────────────────────────────────── + + +def _process_job(job, config, stop_event): + """Execute the full scrape pipeline for a single job.""" + db = StatusDB() + job_id = job['id'] + + logger.info(f"Job {job_id}: starting scrape of {job['url']}") + + # ── Phase 1: Crawl via Zimit ─────────────────────────────────── + db.update_scrape_job(job_id, + status='scraping', + crawl_mode='zimit', + started_at=_now()) + + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id) + return + + page_count, zim_filename, error = _crawl_zimit(job, config, stop_event, db) + + if error == 'cancelled': + _handle_cancel(db, job_id) + return + elif error: + db.update_scrape_job(job_id, + status='failed', + error_message=error[:1000], + subprocess_pid=None, + completed_at=_now()) + return + + db.update_scrape_job(job_id, page_count=page_count) + + # ── Phase 2: Register with kiwix-serve ───────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id) + return + + db.update_scrape_job(job_id, status='registering') + + output_dir = config.get('scraper',
{}).get('output_dir', '/mnt/kiwix') + zim_path = os.path.join(output_dir, zim_filename) + kiwix_manage = shutil.which('kiwix-manage') or '/opt/recon/bin/kiwix-manage' + library_xml = '/mnt/kiwix/library.xml' + + try: + subprocess.run( + [kiwix_manage, library_xml, 'add', zim_path], + capture_output=True, text=True, timeout=30 + ) + logger.info(f"Job {job_id}: registered with kiwix-serve library") + except Exception as e: + logger.warning(f"Job {job_id}: kiwix-manage add failed: {e}") + + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Job {job_id}: sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Job {job_id}: failed to signal kiwix-serve: {e}") + + # Wait for kiwix-serve to reload its catalog after SIGHUP + time.sleep(3) + + zim_source_id = None + try: + from .zim_monitor import scan_zims + scan_zims() + conn = db._get_conn() + row = conn.execute( + "SELECT id FROM zim_sources WHERE zim_filename = ?", (zim_filename,) + ).fetchone() + if row: + zim_source_id = row['id'] + logger.info(f"Job {job_id}: linked to zim_source_id={zim_source_id}") + except Exception as e: + logger.warning(f"Job {job_id}: scan_zims failed: {e}") + + # ── Phase 3: Complete ────────────────────────────────────────── + db.update_scrape_job(job_id, + status='complete', + zim_filename=zim_filename, + zim_source_id=zim_source_id, + completed_at=_now()) + + logger.info(f"Job {job_id}: complete — {zim_filename} ({page_count} pages)") + + +def _handle_cancel(db, job_id): + """Handle job cancellation: clean up Docker container and update status.""" + container_name = f'recon-scraper-{job_id}' + try: + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + except Exception: + pass + + # Clean up tmp dir if it exists + output_dir = 
'/mnt/kiwix' + tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') + shutil.rmtree(tmp_dir, ignore_errors=True) + + logger.info(f"Job {job_id}: cancelled") + db.update_scrape_job(job_id, + status='cancelled', + subprocess_pid=None, + completed_at=_now()) diff --git a/lib/status.py b/lib/status.py index 20cc77b..974cabd 100644 --- a/lib/status.py +++ b/lib/status.py @@ -105,6 +105,25 @@ class StatusDB: except Exception: pass # column already exists + # Migration: add subprocess_pid column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN subprocess_pid INTEGER") + except Exception: + pass # column already exists + + # Migration: add reject pattern columns to scrape_jobs if missing + for col, coltype in [('additional_reject_patterns', 'TEXT'), ('skip_default_patterns', 'INTEGER DEFAULT 0')]: + try: + conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + + # Migration: add crawl_mode column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN crawl_mode TEXT") + except Exception: + pass # column already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -142,6 +161,28 @@ class StatusDB: resolved_at TEXT ); CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status); + + CREATE TABLE IF NOT EXISTS scrape_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + title TEXT, + language TEXT DEFAULT 'eng', + category TEXT, + status TEXT DEFAULT 'pending', + page_count INTEGER DEFAULT 0, + error_message TEXT, + zim_filename TEXT, + zim_source_id INTEGER, + workspace_path TEXT, + subprocess_pid INTEGER, + additional_reject_patterns TEXT, + skip_default_patterns INTEGER DEFAULT 0, + crawl_mode TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + started_at TEXT, + completed_at TEXT + ); + CREATE INDEX IF NOT EXISTS 
idx_scrape_status ON scrape_jobs(status); """) conn.commit() @@ -406,6 +447,50 @@ class StatusDB: ) conn.commit() + + # ── Scraper Job Helpers ───────────────────────────────────── + + def get_pending_scrape_job(self): + """Fetch the oldest pending scrape job.""" + conn = self._get_conn() + row = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1" + ).fetchone() + return dict(row) if row else None + + def update_scrape_job(self, job_id, **kwargs): + """Update arbitrary columns on a scrape job.""" + if not kwargs: + return + conn = self._get_conn() + sets = [] + vals = [] + for k, v in kwargs.items(): + sets.append(f"{k} = ?") + vals.append(v) + vals.append(job_id) + conn.execute(f"UPDATE scrape_jobs SET {', '.join(sets)} WHERE id = ?", vals) + conn.commit() + + def get_scrape_jobs(self, status=None): + """List scrape jobs, optionally filtered by status.""" + conn = self._get_conn() + if status: + rows = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,) + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM scrape_jobs ORDER BY id DESC" + ).fetchall() + return [dict(r) for r in rows] + + def get_scrape_job(self, job_id): + """Get a single scrape job by ID.""" + conn = self._get_conn() + row = conn.execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone() + return dict(row) if row else None + # ── Stream B: File Operations ─────────────────────────────────── def log_file_operation(self, doc_hash, operation, source_path, target_path, diff --git a/lib/zim_monitor.py b/lib/zim_monitor.py new file mode 100644 index 0000000..248fc0f --- /dev/null +++ b/lib/zim_monitor.py @@ -0,0 +1,217 @@ +""" +ZIM Monitor — detects ZIMs loaded in kiwix-serve and tracks them in recon.db. + +Polls the kiwix-serve OPDS v2 catalog, compares against the zim_sources table, +and for new ZIMs reads accurate metadata via python-libzim's Counter field. 
+ +Standalone: python3 /opt/recon/lib/zim_monitor.py +As module: from lib.zim_monitor import scan_zims +""" +import logging +import os +import sqlite3 +import sys +import urllib.request +from xml.etree import ElementTree as ET + +sys.path.insert(0, "/opt/recon") +from lib.utils import setup_logging + +try: + from libzim.reader import Archive + HAVE_LIBZIM = True +except ImportError: + HAVE_LIBZIM = False + +OPDS_URL = "http://localhost:8430/catalog/v2/entries?count=-1" +ZIM_DIR = "/mnt/kiwix" +DB_PATH = "/opt/recon/data/recon.db" + +ATOM_NS = "http://www.w3.org/2005/Atom" + +logger = logging.getLogger("recon.zim_monitor") + + +def _text(element, tag, ns=ATOM_NS): + """Get text content of a child element, or None.""" + child = element.find(f"{{{ns}}}{tag}") + if child is not None and child.text: + return child.text.strip() + return None + + +def parse_counter(counter_str): + """Parse ZIM Counter metadata into {mimetype: count}.""" + result = {} + for pair in counter_str.split(";"): + if "=" in pair: + mime, count = pair.split("=", 1) + try: + result[mime.strip()] = int(count.strip()) + except ValueError: + pass + return result + + +def fetch_opds(): + """Fetch OPDS v2 catalog from kiwix-serve. 
Returns list of dicts.""" + try: + with urllib.request.urlopen(OPDS_URL, timeout=10) as resp: + data = resp.read() + except Exception as e: + logger.error("Failed to fetch OPDS catalog: %s", e) + return [] + + root = ET.fromstring(data) + entries = [] + for entry in root.findall(f"{{{ATOM_NS}}}entry"): + uuid_raw = _text(entry, "id") + uuid = uuid_raw.replace("urn:uuid:", "") if uuid_raw else None + + # Derive ZIM filename from the content link href + zim_filename = None + for link in entry.findall(f"{{{ATOM_NS}}}link"): + if link.get("type") == "text/html": + href = link.get("href", "") + # href looks like /content/appropedia_en_all_maxi_2025-11 + name = href.rsplit("/", 1)[-1] if "/" in href else href + if name: + zim_filename = name + ".zim" + break + + entries.append({ + "uuid": uuid, + "title": _text(entry, "title"), + "name": _text(entry, "name"), + "flavour": _text(entry, "flavour"), + "language": _text(entry, "language"), + "category": _text(entry, "category") or None, + "summary": _text(entry, "summary"), + "article_count_opds": int(_text(entry, "articleCount") or 0), + "zim_filename": zim_filename, + }) + return entries + + +def get_libzim_metadata(zim_path): + """Open a ZIM file and read accurate metadata via python-libzim.""" + if not HAVE_LIBZIM: + logger.warning("python-libzim not available, skipping metadata read") + return {} + + zim = Archive(zim_path) + meta = {} + + def _get_meta(key): + try: + return zim.get_metadata(key).decode("utf-8", errors="replace") + except RuntimeError: + return None + + meta["title"] = _get_meta("Title") + meta["description"] = _get_meta("Description") + meta["language"] = _get_meta("Language") + meta["tags"] = _get_meta("Tags") + + counter_str = _get_meta("Counter") + if counter_str: + counts = parse_counter(counter_str) + meta["article_count"] = counts.get("text/html", 0) + meta["counter_raw"] = counter_str + else: + meta["article_count"] = 0 + meta["counter_raw"] = None + + return meta + + +def scan_zims(): + 
"""Compare OPDS catalog against zim_sources table. Insert/update as needed.""" + logger.info("Scanning kiwix-serve OPDS catalog...") + opds_entries = fetch_opds() + if not opds_entries: + logger.info("No entries in OPDS catalog (or fetch failed)") + return + + logger.info("OPDS returned %d entries", len(opds_entries)) + + con = sqlite3.connect(DB_PATH) + con.row_factory = sqlite3.Row + + # Get existing zim_sources keyed by filename + existing = {} + for row in con.execute("SELECT id, zim_filename, status FROM zim_sources"): + existing[row["zim_filename"]] = dict(row) + + opds_filenames = set() + new_count = 0 + + for entry in opds_entries: + filename = entry["zim_filename"] + if not filename: + logger.warning("Skipping OPDS entry with no derivable filename: %s", entry) + continue + + opds_filenames.add(filename) + + if filename in existing: + logger.debug("Already tracked: %s (status=%s)", filename, existing[filename]["status"]) + continue + + # New ZIM — read accurate metadata via python-libzim + zim_path = os.path.join(ZIM_DIR, filename) + if not os.path.isfile(zim_path): + logger.warning("ZIM file not found on disk: %s", zim_path) + continue + + logger.info("New ZIM detected: %s — reading metadata via libzim", filename) + meta = get_libzim_metadata(zim_path) + + con.execute( + """INSERT INTO zim_sources + (zim_filename, zim_path, zim_uuid, title, description, + language, category, article_count, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'detected')""", + ( + filename, + zim_path, + entry["uuid"], + meta.get("title") or entry["title"], + meta.get("description") or entry["summary"], + meta.get("language") or entry["language"], + entry["category"], + meta.get("article_count", 0), + ), + ) + new_count += 1 + logger.info( + " Inserted: %s — title=%r, articles=%s (OPDS said %s)", + filename, + meta.get("title") or entry["title"], + meta.get("article_count", 0), + entry["article_count_opds"], + ) + + # Detect removed ZIMs (in DB but not in OPDS, and not already marked 
removed) + removed_count = 0 + for filename, row in existing.items(): + if filename not in opds_filenames and row["status"] != "removed": + con.execute( + "UPDATE zim_sources SET status = 'removed' WHERE id = ?", + (row["id"],), + ) + removed_count += 1 + logger.info("Marked removed: %s", filename) + + con.commit() + con.close() + + logger.info( + "Scan complete: %d new, %d removed, %d total in catalog", + new_count, removed_count, len(opds_entries), + ) + + +if __name__ == "__main__": + setup_logging("recon.zim_monitor") + scan_zims() diff --git a/recon.py b/recon.py index 47dda7d..9635a59 100755 --- a/recon.py +++ b/recon.py @@ -692,12 +692,23 @@ def cmd_service(args): daemon=True, name='dashboard'), ] + # Scraper daemon: polls for pending scrape jobs, runs the Zimit crawl and kiwix-serve registration pipeline + scraper_cfg = config.get('scraper', {}) + if scraper_cfg.get('workspace'): + from lib.scraper_runner import scraper_loop + threads.append( + threading.Thread(target=lambda: scraper_loop(stop_event, config), + daemon=True, name='scraper') + ) + logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") pt_interval = config.get("peertube", {}).get("poll_interval", 1800) logger.info(f" PeerTube acquisition: every {pt_interval}s") + if scraper_cfg.get('workspace'): + logger.info(f" Scraper: every {scraper_cfg.get('poll_interval', 300)}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: diff --git a/requirements.txt b/requirements.txt index f643cd8..1da21bc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ anyio==4.12.1 babel==2.18.0 beautifulsoup4==4.14.3 blinker==1.9.0 +cachetools==7.1.3 certifi==2026.1.4 cffi==2.0.0 charset-normalizer==3.4.4 diff --git a/static/css/recon.css b/static/css/recon.css index 95aed52..a272876 100644 ---
a/static/css/recon.css +++ b/static/css/recon.css @@ -211,6 +211,7 @@ tr:hover { background: var(--bg-secondary); } .badge-web { background: #1e3a5f; color: #60a5fa; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-pdf { background: #2d5a2d; color: #4ade80; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-transcript { background: #3b1f5e; color: #c084fc; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-wiki { background: #1f4a3b; color: #34d399; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } /* ── Trend indicators ── */ .trend { font-size: 11px; margin-left: 6px; } @@ -315,3 +316,19 @@ tr:hover { background: var(--bg-secondary); } .errors-panel.has-errors { display: block; } .errors-panel summary { color: var(--red); cursor: pointer; font-size: 13px; margin-bottom: 8px; } .errors-panel .error-line { color: var(--text-muted); font-size: 11px; padding: 2px 0; border-bottom: 1px solid var(--border); } + +/* ── Toggle switch ── */ +.toggle-switch { position: relative; display: inline-block; width: 40px; height: 20px; } +.toggle-switch input { opacity: 0; width: 0; height: 0; } +.toggle-slider { position: absolute; cursor: pointer; inset: 0; background: #333; border-radius: 20px; transition: 0.3s; } +.toggle-slider:before { content: ''; position: absolute; height: 16px; width: 16px; left: 2px; bottom: 2px; background: #888; border-radius: 50%; transition: 0.3s; } +.toggle-switch input:checked + .toggle-slider { background: #1a4a2e; } +.toggle-switch input:checked + .toggle-slider:before { transform: translateX(20px); background: #00ff41; } + +/* ── Kiwix status badges ── */ +.badge-complete { background: #1a4a2e; color: #00ff41; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-ingesting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-detected { background: #333; color: #888; padding: 
2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-failed { background: #4a1a1a; color: #ff4444; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 254d92a..0bd0b39 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -88,7 +88,7 @@ var pipeCount = s.in_pipeline || 0; totalCat += catCount; totalComp += compCount; totalPipe += pipeCount; totalConcepts += s.concepts; totalVectors += s.vectors; - var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : 'PDF'; + var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : s.type === 'wiki' ? 'WIKI' : 'PDF'; var compPct = catCount > 0 ? (compCount / catCount * 100) : 0; var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0; var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666'; @@ -185,7 +185,7 @@ rtb.innerHTML = '
  • '; } else { rtb.innerHTML = data.recent_complete.map(function(r) { - var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : 'PDF'; + var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : r.type === 'wiki' ? 'WIKI' : 'PDF'; return ''; }).join(''); diff --git a/static/js/kiwix.js b/static/js/kiwix.js new file mode 100644 index 0000000..c85ee93 --- /dev/null +++ b/static/js/kiwix.js @@ -0,0 +1,147 @@ +/* RECON Kiwix Dashboard JS */ +(function() { + 'use strict'; + + function loadKiwixDashboard() { + return RECON.fetchJSON('/api/kiwix/sources').then(function(data) { + // Update stat cards + var t = data.totals || {}; + RECON.set('kx-sources', RECON.fmt(t.sources)); + RECON.set('kx-articles', RECON.fmt(t.articles)); + RECON.set('kx-processed', RECON.fmt(t.processed)); + RECON.set('kx-pipeline', RECON.fmt(t.in_pipeline)); + + // Kiwix-serve status dot + var ks = data.kiwix_serve || {}; + var dot = document.getElementById('svc-kiwix-serve'); + dot.className = 'svc-dot ' + (ks.status === 'active' ? 'active' : 'inactive'); + + // ZIM table + var sources = data.sources || []; + var html = ''; + sources.forEach(function(s) { + var es = s.effective_status || s.status; + var pipe = s.pipeline || {}; + var pipeComplete = pipe.complete || 0; + var pipeTotal = 0; + for (var k in pipe) pipeTotal += pipe[k]; + var pctDone = pipeTotal > 0 ? (pipeComplete / pipeTotal * 100).toFixed(1) : 0; + var statusBadge = es === 'complete' ? 'COMPLETE' : + es === 'processing' ? 'PROCESSING' : + es === 'extracting' ? 'EXTRACTING' : + 'DETECTED'; + // Derive browse URL from zim_filename + var zimName = s.zim_filename.replace(/_(?:maxi|mini|nopic)_[\d-]+\.zim$/, ''); + var browseUrl = 'https://wiki.echo6.co/' + zimName + '/'; + // Toggle switch + var checked = s.ingest_enabled ? 
' checked' : ''; + var toggle = ''; + + html += '' + + '' + + '' + + '' + + '' + + '' + + '' + + '' + + '' + + ''; + }); + if (!html) html = ''; + RECON.setHTML('kx-table-body', html); + }).catch(function(err) { + console.error('Kiwix dashboard error:', err); + }); + } + + function toggleIngest(id, enabled) { + RECON.postJSON('/api/kiwix/toggle-ingest/' + id, {enabled: enabled}).then(function(data) { + if (data.ok) loadKiwixDashboard(); + }); + } + + function removeSource(id, title) { + if (!confirm('Remove "' + title + '"?\n\nThis will delete the ZIM file, all ingested documents, and associated vectors from Qdrant. This cannot be undone.')) return; + RECON.postJSON('/api/kiwix/remove/' + id).then(function(data) { + if (data.ok) { + var r = data.results || {}; + alert('Removed: ' + r.docs_deleted + ' docs, ~' + r.vectors_deleted + ' vector batches deleted, file ' + (r.file_deleted ? 'deleted' : 'not found')); + loadKiwixDashboard(); + } + }); + } + + function triggerIngest(id) { + RECON.postJSON('/api/kiwix/trigger-ingest/' + id).then(function(data) { + if (data.ok) loadKiwixDashboard(); + }); + } + + function uploadZim() { + var input = document.getElementById('kx-file-input'); + var file = input.files[0]; + if (!file) return; + + var statusEl = document.getElementById('kx-upload-status'); + var progressDiv = document.getElementById('kx-upload-progress'); + var progressBar = document.getElementById('kx-progress-bar'); + var progressText = document.getElementById('kx-progress-text'); + + statusEl.textContent = 'Uploading ' + file.name + '...'; + progressDiv.style.display = 'block'; + + var formData = new FormData(); + formData.append('file', file); + + var xhr = new XMLHttpRequest(); + xhr.open('POST', '/api/kiwix/upload', true); + + xhr.upload.onprogress = function(e) { + if (e.lengthComputable) { + var pct = (e.loaded / e.total * 100).toFixed(1); + progressBar.style.width = pct + '%'; + progressText.textContent = RECON.fmtBytes(e.loaded) + ' / ' + 
RECON.fmtBytes(e.total) + ' (' + pct + '%)'; + } + }; + + xhr.onload = function() { + if (xhr.status === 200) { + var resp = JSON.parse(xhr.responseText); + statusEl.textContent = resp.ok ? 'Upload complete: ' + resp.filename : 'Error: ' + (resp.error || 'Unknown'); + progressBar.style.width = '100%'; + progressBar.style.background = resp.ok ? '#16a34a' : '#dc2626'; + if (resp.ok) loadKiwixDashboard(); + } else { + statusEl.textContent = 'Upload failed (HTTP ' + xhr.status + ')'; + progressBar.style.background = '#dc2626'; + } + input.value = ''; + }; + + xhr.onerror = function() { + statusEl.textContent = 'Upload failed (network error)'; + progressBar.style.background = '#dc2626'; + input.value = ''; + }; + + xhr.send(formData); + } + + // Expose for inline onclick + window.KIWIX = { toggleIngest: toggleIngest, triggerIngest: triggerIngest, remove: removeSource }; + + document.addEventListener('DOMContentLoaded', function() { + RECON.startRefresh(loadKiwixDashboard, 30000); + document.getElementById('kx-file-input').addEventListener('change', uploadZim); + }); +})(); diff --git a/static/js/scraper.js b/static/js/scraper.js new file mode 100644 index 0000000..3988ffe --- /dev/null +++ b/static/js/scraper.js @@ -0,0 +1,173 @@ +/* RECON Scraper Dashboard JS */ +(function() { + 'use strict'; + + function loadJobs() { + return RECON.fetchJSON('/api/scraper/jobs').then(function(data) { + var jobs = data.jobs || []; + + // Stats + var total = jobs.length; + var active = 0, complete = 0, failed = 0; + jobs.forEach(function(j) { + if (j.status === 'complete') complete++; + else if (j.status === 'failed' || j.status === 'cancelled') failed++; + else if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') active++; + }); + RECON.set('sc-total', RECON.fmt(total)); + RECON.set('sc-active', RECON.fmt(active)); + RECON.set('sc-complete', RECON.fmt(complete)); + RECON.set('sc-failed', RECON.fmt(failed)); + + // Show/hide Clear Failed button + var 
clearBtn = document.getElementById('sc-clear-btn'); + if (clearBtn) clearBtn.style.display = failed > 0 ? '' : 'none'; + + // Table + var html = ''; + jobs.forEach(function(j) { + var badge = statusBadge(j.status); + var pages = j.page_count ? RECON.fmt(j.page_count) : '\u2014'; + var zim = j.zim_filename ? + '' + j.zim_filename + '' : '\u2014'; + var actions = ''; + + if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') { + actions = ''; + } else if (j.status === 'failed' || j.status === 'cancelled') { + actions = ' ' + + ''; + } else if (j.status === 'complete') { + actions = ''; + } + + // Truncate URL for display + var displayUrl = j.url.length > 40 ? j.url.substring(0, 40) + '\u2026' : j.url; + + html += '' + + '' + + '' + + '' + + '' + + '' + + '' + + '' + + ''; + }); + if (!html) html = ''; + RECON.setHTML('sc-table-body', html); + }).catch(function(err) { + console.error('Scraper dashboard error:', err); + }); + } + + function statusBadge(status) { + var map = { + 'pending': 'PENDING', + 'scraping': 'SCRAPING', + 'registering': 'REGISTERING', + 'complete': 'COMPLETE', + 'failed': 'FAILED', + 'cancelled': 'CANCELLED' + }; + return map[status] || '' + (status || 'UNKNOWN').toUpperCase() + ''; + } + + function errorTooltip(job) { + if (!job.error_message) return ''; + var short = job.error_message.length > 80 ? + job.error_message.substring(0, 80) + '\u2026' : job.error_message; + return '
    ' + escHtml(short) + '
    '; + } + + function escHtml(str) { + if (!str) return ''; + return str.replace(/&/g, '&').replace(//g, '>') + .replace(/"/g, '"').replace(/'/g, '''); + } + + function submit(e) { + e.preventDefault(); + var url = document.getElementById('sf-url').value.trim(); + if (!url) return false; + + var body = { url: url }; + var title = document.getElementById('sf-title').value.trim(); + var lang = document.getElementById('sf-lang').value; + var category = document.getElementById('sf-category').value.trim(); + if (title) body.title = title; + if (lang) body.language = lang; + if (category) body.category = category; + + var btn = document.getElementById('sf-submit-btn'); + var feedback = document.getElementById('sf-feedback'); + btn.disabled = true; + btn.textContent = 'Submitting...'; + + RECON.postJSON('/api/scraper/submit', body).then(function(data) { + btn.disabled = false; + btn.textContent = 'Submit'; + if (data.ok) { + feedback.style.display = 'block'; + feedback.style.color = '#00ff41'; + feedback.textContent = 'Job #' + data.job_id + ' submitted successfully'; + document.getElementById('sf-url').value = ''; + document.getElementById('sf-title').value = ''; + document.getElementById('sf-category').value = ''; + setTimeout(function() { feedback.style.display = 'none'; }, 4000); + loadJobs(); + } else { + feedback.style.display = 'block'; + feedback.style.color = '#ff4444'; + feedback.textContent = 'Error: ' + (data.error || 'Unknown error'); + } + }).catch(function(err) { + btn.disabled = false; + btn.textContent = 'Submit'; + feedback.style.display = 'block'; + feedback.style.color = '#ff4444'; + feedback.textContent = 'Network error: ' + err.message; + }); + + return false; + } + + function cancel(jobId) { + if (!confirm('Cancel job #' + jobId + '?')) return; + RECON.postJSON('/api/scraper/cancel/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function retry(jobId) { + 
RECON.postJSON('/api/scraper/retry/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function remove(jobId) { + if (!confirm('Delete job #' + jobId + '? This cannot be undone.')) return; + RECON.postJSON('/api/scraper/delete/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function clearFailed() { + if (!confirm('Delete all failed and cancelled jobs?')) return; + RECON.postJSON('/api/scraper/clear-failed').then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + // Expose for inline onclick + window.SCRAPER = { submit: submit, cancel: cancel, retry: retry, remove: remove, clearFailed: clearFailed }; + + document.addEventListener('DOMContentLoaded', function() { + RECON.startRefresh(loadJobs, 10000); + }); +})(); diff --git a/templates/base.html b/templates/base.html index 09db6d8..49b1a21 100644 --- a/templates/base.html +++ b/templates/base.html @@ -19,6 +19,7 @@ diff --git a/templates/kiwix/dashboard.html b/templates/kiwix/dashboard.html new file mode 100644 index 0000000..72bbed4 --- /dev/null +++ b/templates/kiwix/dashboard.html @@ -0,0 +1,48 @@ +{% extends "base.html" %} +{% block content %} +
    + +
    +
    ZIM Sources
    +
    Total Articles
    +
    Processed
    +
    In Pipeline
    +
    + + +
    +
    Kiwix-Serve
    + +
    + + +
    +

    ZIM Library

    +
    None yet
    ' + r.title + '' + badge + '' + r.concepts + '' + r.vectors + '
    ' + (s.title || s.zim_filename) + '' + + '
    ' + s.zim_filename + '
    ' + (s.language || '\u2014') + '' + RECON.fmt(s.article_count) + '' + (es === 'complete' && pipeComplete > 0 ? + RECON.fmt(pipeComplete) + ' in Qdrant' : + es === 'processing' ? + RECON.fmt(pipeComplete) + ' / ' + RECON.fmt(pipeTotal) + ' in Qdrant (' + pctDone + '%)' : + es === 'extracting' ? + RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + ' extracted' : + '\u2014') + '' + statusBadge + '' + toggle + 'Browse
    No ZIM sources detected
    ' + j.id + '' + escHtml(displayUrl) + '' + escHtml(j.title || '\u2014') + '' + pages + '' + badge + errorTooltip(j) + '' + zim + '' + actions + '
    No scrape jobs
    + + + + + + +
    TitleLanguageArticlesProgressStatusIngestBrowse
    Loading...
    + + + +
    +

    Upload ZIM File

    +
    + + + +
    + +
    + +{% endblock %} +{% block scripts %} + +{% endblock %} diff --git a/templates/kiwix/scraper.html b/templates/kiwix/scraper.html new file mode 100644 index 0000000..862ba0a --- /dev/null +++ b/templates/kiwix/scraper.html @@ -0,0 +1,84 @@ +{% extends "base.html" %} +{% block content %} +
    + +
    +

    Submit Scrape Job

    +
    +
    +
    + + +
    +
    + + +
    +
    +
    +
    + + +
    +
    + + +
    +
    + +
    +
    + +
    +
    + + +
    +
    Total Jobs
    +
    Active
    +
    Complete
    +
    Failed
    +
    + + +
    +
    +

    Scrape Jobs

    + +
    + + + + + + + + + + + + + + + +
    IDURLTitlePagesStatusZIM
    Loading...
    +
    +
    +{% endblock %} +{% block scripts %} + +{% endblock %}