diff --git a/.gitignore b/.gitignore index bce13d8..238cabb 100644 --- a/.gitignore +++ b/.gitignore @@ -24,7 +24,3 @@ recon.db # OS .DS_Store - -# Kiwix binary tools (installed from tarball) -bin/ -status.db diff --git a/config.yaml b/config.yaml index a2709b0..3e185f8 100644 --- a/config.yaml +++ b/config.yaml @@ -413,14 +413,6 @@ peertube: rate_limit_delay: 0.5 # Delay between video ingestions (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) -scraper: - workspace: /opt/recon/data/scraper # Working directory (tmp dirs for Zimit output) - output_dir: /mnt/kiwix # Finished .zim files land here (kiwix-serve library) - default_language: eng # ISO 639-3 language code for ZIM metadata - poll_interval: 300 # Seconds between checking for pending scrape jobs - docker_image: ghcr.io/openzim/zimit # Zimit Docker image for web crawling - docker_workers: 2 # Concurrent crawl workers inside Zimit container - # Stream B: New Library Pipeline new_pipeline: # Disabled 2026-04-14 for refactor — see refactored-recon repo for context @@ -448,7 +440,3 @@ pipeline: text: text_processor # mtime stability threshold for picking up files from acquired/ mtime_stability_seconds: 10 - # Language filter: skip non-English content before Gemini enrichment - language_filter: true # Enable langdetect-based filtering - allowed_languages: # ISO 639-1 codes allowed through enrichment - - en diff --git a/config/address_book.yaml b/config/address_book.yaml deleted file mode 100644 index 24bc81c..0000000 --- a/config/address_book.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# RECON Address Book — saved locations for navigation shortcuts. -# Entries are matched by name and aliases (case-insensitive). -# Add new entries by appending to the list below. - -entries: - - id: home - name: Home - aliases: - - home - - matt's house - - 214 north st - - 214 north street - address: "214 North St, Filer, ID 83328" - lat: 42.5735833 - lon: -114.6066389 - tags: - - residence - - primary diff --git a/config/profiles/home.yaml b/config/profiles/home.yaml deleted file mode 100644 index de704d9..0000000 --- a/config/profiles/home.yaml +++ /dev/null @@ -1,67 +0,0 @@ -# Deployment profile: Home (VM 1130) -# Active on the main Echo6 deployment. Full stack with planet-scale NA tiles. -# Override via RECON_PROFILE env var in /etc/systemd/system/recon.service - -profile: home -region_name: "North America" - -tileset: - url: "/tiles/planet/current.pmtiles" - bounds: [-168, 14, -52, 72] - max_zoom: 15 - attribution: "Protomaps © OSM" - -tileset_hillshade: - url: "/tiles/planet-dem.pmtiles" - encoding: "terrarium" - max_zoom: 12 - -traffic: - provider: "tomtom" - proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" - -place_details: - local_source: "nominatim" - local_bbox: [-125.0, 31.3, -104.0, 49.0] - fallback_source: "overpass" - -services: - geocode: "/api/geocode" - reverse: "/api/reverse" - address_book: "/api/address_book" - valhalla: "/valhalla" - -auth: - login_url: "/outpost.goauthentik.io/start?rd=%2F" - logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" - -features: - has_nominatim_details: true - has_kiwix_wiki: true - has_hillshade: true - has_3d_terrain: false - has_traffic_overlay: true - has_landclass: true - has_public_lands_layer: true - has_contours: true - has_contours_test: false - has_contours_test_10ft: false - has_address_book_write: false - has_overture_enrichment: true - has_google_places_enrichment: true - has_contacts: true - has_wiki_rewriting: true - has_wiki_discovery: false - has_usfs_trails: true - has_blm_trails: true - -defaults: - center: [42.5736, -114.6066] - zoom: 10 - -# Offroute wilderness routing -offroute: - osm_pbf_path: "/mnt/nav/sources/idaho-latest.osm.pbf" - densify_interval_m: 100 - postgis_dsn: "dbname=padus" - diff --git a/config/profiles/minimal_pi.yaml b/config/profiles/minimal_pi.yaml deleted file mode 100644 index c2fd90a..0000000 --- a/config/profiles/minimal_pi.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# Deployment profile: Minimal Pi (single-state pocket deployment) -# Template for the lightest possible field kit — Idaho only. -# Override via RECON_PROFILE env var. - -profile: minimal_pi -region_name: "Idaho" - -tileset: - url: "/tiles/idaho.pmtiles" - bounds: [-117.5, 42.0, -111.0, 49.0] - max_zoom: 15 - attribution: "Protomaps © OSM" - -tileset_hillshade: - url: "/tiles/hillshade-idaho.pmtiles" - encoding: "terrarium" - max_zoom: 12 - -traffic: - provider: "tomtom" - proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" - -services: - geocode: "/api/geocode" - reverse: "/api/reverse" - address_book: "/api/address_book" - valhalla: "/valhalla" - -# TODO(matt): confirm logout next= host for this profile -auth: - login_url: "/outpost.goauthentik.io/start?rd=%2F" - logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" - -features: - has_nominatim_details: false - has_kiwix_wiki: false - has_hillshade: false - has_3d_terrain: false - has_traffic_overlay: false - has_landclass: false - has_public_lands_layer: false - has_address_book_write: true - has_overture_enrichment: false - has_google_places_enrichment: false - has_contacts: false - has_wiki_rewriting: false - has_wiki_discovery: false - -defaults: - center: [44.0, -114.0] - zoom: 7 diff --git a/config/profiles/regional_pi.yaml b/config/profiles/regional_pi.yaml deleted file mode 100644 index b6f2cad..0000000 --- a/config/profiles/regional_pi.yaml +++ /dev/null @@ -1,59 +0,0 @@ -# Deployment profile: Regional Pi (multi-state field kit) -# Template for a Raspberry Pi covering Idaho + surrounding states. -# Override via RECON_PROFILE env var. - -profile: regional_pi -region_name: "Idaho + Neighbors" - -tileset: - url: "/tiles/regional.pmtiles" - bounds: [-125, 40, -104, 49] - max_zoom: 15 - attribution: "Protomaps © OSM" - -tileset_hillshade: - url: "/tiles/hillshade-regional.pmtiles" - encoding: "terrarium" - max_zoom: 12 - -traffic: - provider: "tomtom" - proxy_url: "/api/traffic/flow/{z}/{x}/{y}.png" - -place_details: - local_source: "nominatim" - local_bbox: [-125.0, 40.0, -104.0, 49.0] - fallback_source: "overpass" - -services: - geocode: "/api/geocode" - reverse: "/api/reverse" - address_book: "/api/address_book" - valhalla: "/valhalla" - -# TODO(matt): confirm logout next= host for this profile -auth: - login_url: "/outpost.goauthentik.io/start?rd=%2F" - logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/" - -features: - has_nominatim_details: true - has_kiwix_wiki: false - has_hillshade: true - has_3d_terrain: false - has_traffic_overlay: true - has_landclass: true - has_public_lands_layer: true - has_contours: true - has_contours_test: true - has_contours_test_10ft: true - has_address_book_write: true - has_overture_enrichment: false - has_google_places_enrichment: false - has_contacts: false - has_wiki_rewriting: true - has_wiki_discovery: false - -defaults: - center: [44.0, -114.0] - zoom: 7 diff --git a/lib/api.py b/lib/api.py index a0697bf..757ebf4 100644 --- a/lib/api.py +++ b/lib/api.py @@ -35,33 +35,12 @@ _cache = { 'qdrant_scroll': None, 'qdrant_scroll_ts': 0, 'quick_stats': None, - 'kiwix_sources': None, } app = Flask(__name__, template_folder=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'templates'), static_folder=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'static')) -app.config['MAX_CONTENT_LENGTH'] = None # ZIM files can be multi-GB - - -# ── Large ZIM upload support ── -# Override stream factory so ZIM uploads write directly to /mnt/kiwix/ -# instead of /tmp (which is on the 96GB root disk and can't hold 100GB+ ZIMs). -from flask import Request as _FlaskRequest - -class _LargeZimRequest(_FlaskRequest): - def _get_file_stream(self, total_content_length, content_type, filename=None, content_length=None): - if filename and filename.lower().endswith('.zim'): - return tempfile.NamedTemporaryFile('wb+', dir='/mnt/kiwix', prefix='.upload_', suffix='.tmp', delete=False) - return super()._get_file_stream(total_content_length, content_type, filename, content_length) - -app.request_class = _LargeZimRequest -# ── Netsyms Blueprint ── -from .netsyms_api import netsyms_bp -app.register_blueprint(netsyms_bp) - - # ── Navigation Constants ── KNOWLEDGE_SUBNAV = [ @@ -77,11 +56,6 @@ PEERTUBE_SUBNAV = [ {'href': '/peertube/channels', 'label': 'Channels'}, ] - -KIWIX_SUBNAV = [ - {'href': '/kiwix', 'label': 'Library'}, - {'href': '/kiwix/scraper', 'label': 'Scraper'}, -] SETTINGS_SUBNAV = [ {'href': '/settings/keys', 'label': 'API Keys'}, {'href': '/settings/cookies', 'label': 'YouTube Cookies'}, @@ -934,7 +908,6 @@ def _build_knowledge_stats(): c.source, CASE WHEN c.source = 'stream.echo6.co' THEN 'transcript' - WHEN c.source = 'kiwix' THEN 'wiki' WHEN c.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type, @@ -994,7 +967,6 @@ def _build_knowledge_stats(): d.status, d.concepts_extracted, d.vectors_inserted, CASE WHEN c.source = 'stream.echo6.co' THEN 'transcript' - WHEN c.source = 'kiwix' THEN 'wiki' WHEN d.path LIKE 'http%' THEN 'web' ELSE 'pdf' END as type @@ -1100,12 +1072,6 @@ def start_cache_warmer(stop_event=None): except Exception as e: logger.warning(f" Quick stats warm-up failed: {e}") - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - logger.info(" Kiwix sources cached") - except Exception as e: - logger.warning(f" Kiwix sources warm-up failed: {e}") - logger.info("Cache warmer ready — all data pre-loaded") # Continuous refresh loop @@ -1132,10 +1098,6 @@ def start_cache_warmer(stop_event=None): _cache['quick_stats'] = _build_quick_stats() except Exception: pass - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - except Exception: - pass # PeerTube dashboard: every 30s (cycle 2, offset) if cycle % 2 == 1: @@ -1319,9 +1281,6 @@ def api_keys_reload(): return jsonify({'count': count}) - - - # ── YouTube Cookie Management ── PEERTUBE_HOST = '192.168.1.170' @@ -1971,528 +1930,6 @@ def api_peertube_dashboard(): return jsonify(_cache['pt_dashboard']) - -# ── Kiwix Dashboard ── - -@app.route('/kiwix') -def kiwix_dashboard(): - return render_template('kiwix/dashboard.html', - domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix') - - -@app.route('/kiwix/scraper') -def kiwix_scraper(): - return render_template('kiwix/scraper.html', - domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix/scraper') - - -@app.route('/api/kiwix/sources') -def api_kiwix_sources(): - """Serve pre-cached Kiwix sources data (never blocks).""" - if _cache['kiwix_sources'] is None: - return jsonify({'error': 'Warming up, try again in a few seconds'}), 503 - return jsonify(_cache['kiwix_sources']) - - -@app.route('/api/kiwix/toggle-ingest/', methods=['POST']) -def api_kiwix_toggle_ingest(source_id): - """Toggle ingest_enabled on a ZIM source.""" - db = StatusDB() - conn = db._get_conn() - row = conn.execute("SELECT id, status, ingest_enabled FROM zim_sources WHERE id = ?", (source_id,)).fetchone() - if not row: - return jsonify({'error': 'Source not found'}), 404 - - data = request.get_json(silent=True) or {} - new_val = 1 if data.get('enabled', not row['ingest_enabled']) else 0 - conn.execute("UPDATE zim_sources SET ingest_enabled = ? WHERE id = ?", (new_val, source_id)) - conn.commit() - - # If toggling ON and source is eligible, spawn ingest in background - if new_val == 1 and row['status'] == 'detected': - _spawn_zim_ingest(source_id) - - return jsonify({'ok': True, 'ingest_enabled': new_val}) - - -@app.route('/api/kiwix/trigger-ingest/', methods=['POST']) -def api_kiwix_trigger_ingest(source_id): - """Explicit one-shot ingest trigger.""" - db = StatusDB() - conn = db._get_conn() - row = conn.execute("SELECT id FROM zim_sources WHERE id = ?", (source_id,)).fetchone() - if not row: - return jsonify({'error': 'Source not found'}), 404 - - _spawn_zim_ingest(source_id) - return jsonify({'ok': True}) - - -@app.route('/api/kiwix/upload', methods=['POST']) -def api_kiwix_upload(): - """Accept ZIM file upload, register with kiwix-serve, scan.""" - import subprocess - if 'file' not in request.files: - return jsonify({'error': 'No file provided'}), 400 - - f = request.files['file'] - if not f.filename or not f.filename.endswith('.zim'): - return jsonify({'error': 'File must be a .zim file'}), 400 - - filename = secure_filename(f.filename) - dest = os.path.join('/mnt/kiwix', filename) - - try: - # Stream was written directly to /mnt/kiwix/ by _LargeZimRequest — - # rename in-place instead of copying 100GB+ through f.save() - if hasattr(f.stream, 'name') and f.stream.name: - tmp_path = f.stream.name - f.stream.close() - os.rename(tmp_path, dest) - else: - tmp_dest = dest + '.tmp' - f.save(tmp_dest) - os.rename(tmp_dest, dest) - except Exception as e: - # Clean up any temp files on failure - for p in [locals().get('tmp_path', ''), locals().get('tmp_dest', '')]: - if p and os.path.exists(p): - os.remove(p) - return jsonify({'error': f'Save failed: {e}'}), 500 - - # Register with kiwix-serve library - try: - subprocess.run( - ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'add', dest], - capture_output=True, text=True, timeout=30 - ) - except Exception as e: - logger.warning(f"kiwix-manage add failed: {e}") - - # Scan for new entry (retry — monitorLibrary may need a moment to reload) - import time as _time - from .zim_monitor import scan_zims - for attempt in range(3): - try: - scan_zims() - break - except Exception as e: - logger.warning(f"scan_zims attempt {attempt+1} failed: {e}") - _time.sleep(2) - - # Refresh cache - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - except Exception: - pass - - return jsonify({'ok': True, 'filename': filename}) - - - -def _full_zim_cleanup(source_id): - """Full ZIM cleanup: Qdrant vectors, DB records, kiwix-manage, SIGHUP, file delete. - Returns dict with results. Caller handles cache refresh.""" - import subprocess - import signal - import requests as req - - db = StatusDB() - conn = db._get_conn() - row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() - if not row: - return None - - zim_source = dict(row) - zim_filename = zim_source['zim_filename'] - zim_path = zim_source['zim_path'] - zim_title = zim_source.get('title', zim_filename) - results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False, 'scrape_jobs_deleted': 0} - - # Step 1: Find all document hashes for this ZIM source - doc_hashes = [r['hash'] for r in conn.execute( - "SELECT c.hash FROM catalogue c WHERE c.source = 'kiwix' AND c.category = ?", - (zim_title,) - ).fetchall()] - - # Step 2: Delete vectors from Qdrant - if doc_hashes: - config = get_config() - qdrant_host = config.get('vector_db', {}).get('host', '100.64.0.14') - qdrant_port = config.get('vector_db', {}).get('port', 6333) - collection = config.get('vector_db', {}).get('collection', 'recon_knowledge') - - # Delete in batches of 100 hashes - for i in range(0, len(doc_hashes), 100): - batch = doc_hashes[i:i+100] - try: - resp = req.post( - f"http://{qdrant_host}:{qdrant_port}/collections/{collection}/points/delete", - json={ - "filter": { - "must": [{ - "key": "doc_hash", - "match": {"any": batch} - }] - } - }, - timeout=30 - ) - if resp.status_code == 200: - results['vectors_deleted'] += len(batch) - except Exception as e: - logger.warning(f"Qdrant delete batch failed: {e}") - - # Step 3: Delete DB records - for h in doc_hashes: - # Delete processing directory if it exists - text_dir_row = conn.execute("SELECT text_dir FROM documents WHERE hash = ?", (h,)).fetchone() - if text_dir_row and text_dir_row['text_dir']: - try: - import shutil - shutil.rmtree(text_dir_row['text_dir'], ignore_errors=True) - except Exception: - pass - conn.execute("DELETE FROM documents WHERE hash = ?", (h,)) - conn.execute("DELETE FROM catalogue WHERE hash = ?", (h,)) - results['docs_deleted'] = len(doc_hashes) - - # Delete zim_articles records - conn.execute("DELETE FROM zim_articles WHERE zim_source_id = ?", (source_id,)) - - # Delete zim_sources record - conn.execute("DELETE FROM zim_sources WHERE id = ?", (source_id,)) - conn.commit() - - # Step 4: Remove from kiwix-serve library - try: - subprocess.run( - ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', zim_filename.replace('.zim', '')], - capture_output=True, text=True, timeout=10 - ) - except Exception as e: - logger.warning(f"kiwix-manage remove failed: {e}") - - # Step 4b: SIGHUP kiwix-serve to reload library - try: - result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) - if result.returncode == 0 and result.stdout.strip(): - pid = int(result.stdout.strip().split()[0]) - os.kill(pid, signal.SIGHUP) - logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") - except Exception as e: - logger.warning(f"Failed to signal kiwix-serve: {e}") - - # Step 5: Delete the ZIM file - if os.path.isfile(zim_path): - try: - os.remove(zim_path) - results['file_deleted'] = True - except Exception as e: - logger.warning(f"ZIM file delete failed: {e}") - results['file_deleted'] = False - - # Step 6: Delete any linked scrape_jobs rows - try: - res = conn.execute("DELETE FROM scrape_jobs WHERE zim_source_id = ?", (source_id,)) - conn.commit() - results['scrape_jobs_deleted'] = res.rowcount - except Exception as e: - logger.warning(f"scrape_jobs cleanup failed: {e}") - - logger.info(f"Full ZIM cleanup for source {source_id} ('{zim_title}'): {results}") - return results - - -@app.route('/api/kiwix/remove/', methods=['POST']) -def api_kiwix_remove(source_id): - """Remove a ZIM source: delete vectors, DB records, library entry, and file.""" - db = StatusDB() - conn = db._get_conn() - row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() - if not row: - return jsonify({'error': 'Source not found'}), 404 - - results = _full_zim_cleanup(source_id) - if results is None: - return jsonify({'error': 'Source not found during cleanup'}), 404 - - # Refresh cache - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - except Exception: - pass - - return jsonify({'ok': True, 'results': results}) - - -def _spawn_zim_ingest(source_id): - """Start ZIM ingestion in a background thread.""" - def _run(): - try: - from .processors.zim_processor import ingest_zim - config = get_config() - db = StatusDB() - logger.info(f"Starting ZIM ingest for source {source_id}") - result = ingest_zim(source_id, db, config) - logger.info(f"ZIM ingest complete for source {source_id}: {result}") - # Refresh cache after completion - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - except Exception: - pass - except Exception as e: - logger.error(f"ZIM ingest failed for source {source_id}: {e}") - - t = threading.Thread(target=_run, daemon=True, name=f'zim-ingest-{source_id}') - t.start() - - -def _build_kiwix_sources(): - """Build Kiwix sources data for the dashboard cache.""" - import urllib.request - - db = StatusDB() - conn = db._get_conn() - - # Get all ZIM sources - rows = conn.execute(""" - SELECT id, zim_filename, title, description, language, category, - article_count, status, processed_count, skipped_count, error_count, - ingest_enabled, detected_at, started_at, completed_at - FROM zim_sources - ORDER BY detected_at DESC - """).fetchall() - - sources = [] - total_articles = 0 - total_processed = 0 - total_in_pipeline = 0 - - for r in rows: - source = dict(r) - zim_title = r['title'] or r['zim_filename'] - total_articles += r['article_count'] or 0 - total_processed += r['processed_count'] or 0 - - # Get pipeline stats for THIS source's documents (filtered by category) - pipeline = {} - try: - pipe_rows = conn.execute(""" - SELECT d.status, COUNT(*) as cnt - FROM documents d - JOIN catalogue c ON d.hash = c.hash - WHERE c.source = 'kiwix' AND c.category = ? - GROUP BY d.status - """, (zim_title,)).fetchall() - for pr in pipe_rows: - pipeline[pr['status']] = pr['cnt'] - except Exception: - pass - - in_pipe = sum(v for k, v in pipeline.items() if k not in ('complete', 'failed')) - total_in_pipeline += in_pipe - source['pipeline'] = pipeline - - # Compute effective status reflecting full pipeline state - db_status = r['status'] - if db_status == 'complete' and pipeline: - if in_pipe > 0: - source['effective_status'] = 'processing' - else: - source['effective_status'] = 'complete' - elif db_status == 'ingesting': - source['effective_status'] = 'extracting' - else: - source['effective_status'] = db_status # 'detected' - - sources.append(source) - - # Check kiwix-serve health - kiwix_status = 'inactive' - try: - resp = urllib.request.urlopen("http://localhost:8430", timeout=3) - if resp.status == 200: - kiwix_status = 'active' - except Exception: - pass - - return { - 'sources': sources, - 'kiwix_serve': {'status': kiwix_status, 'url': 'https://wiki.echo6.co'}, - 'totals': { - 'sources': len(sources), - 'articles': total_articles, - 'processed': total_processed, - 'in_pipeline': total_in_pipeline, - } - } - - - - -# ── Scraper API ── - -@app.route('/api/scraper/submit', methods=['POST']) -def api_scraper_submit(): - """Submit a new web scrape job.""" - data = request.get_json(silent=True) or {} - url = (data.get('url') or '').strip() - - if not url: - return jsonify({'error': 'url is required'}), 400 - if not url.startswith(('http://', 'https://')): - return jsonify({'error': 'URL must start with http:// or https://'}), 400 - - config = get_config() - scraper_cfg = config.get('scraper', {}) - language = data.get('language') or scraper_cfg.get('default_language', 'eng') - title = data.get('title', '').strip() or None - category = data.get('category', '').strip() or None - - db = StatusDB() - conn = db._get_conn() - conn.execute( - "INSERT INTO scrape_jobs (url, title, language, category, crawl_mode) VALUES (?, ?, ?, ?, ?)", - (url, title, language, category, 'zimit') - ) - conn.commit() - job_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] - - logger.info(f"Scraper job {job_id} submitted: {url}") - return jsonify({'ok': True, 'job_id': job_id}), 201 - - -@app.route('/api/scraper/jobs') -def api_scraper_jobs(): - """List scrape jobs, optionally filtered by status.""" - status_filter = request.args.get('status') - db = StatusDB() - jobs = db.get_scrape_jobs(status=status_filter) - return jsonify({'jobs': jobs}) - - -@app.route('/api/scraper/cancel/', methods=['POST']) -def api_scraper_cancel(job_id): - """Cancel a scrape job.""" - - db = StatusDB() - job = db.get_scrape_job(job_id) - if not job: - return jsonify({'error': 'Job not found'}), 404 - - if job['status'] in ('complete', 'cancelled'): - return jsonify({'error': f"Job already {job['status']}"}), 400 - - # Set cancelled in DB — the runner loop checks this between phases - db.update_scrape_job(job_id, status='cancelled') - - # Stop the Docker container if running - container_name = f'recon-scraper-{job_id}' - try: - import subprocess as _subprocess - _subprocess.run(['docker', 'rm', '-f', container_name], - capture_output=True, timeout=10) - except Exception: - pass - - logger.info(f"Scraper job {job_id} cancelled") - return jsonify({'ok': True}) - - -@app.route('/api/scraper/retry/', methods=['POST']) -def api_scraper_retry(job_id): - """Retry a failed or cancelled scrape job.""" - db = StatusDB() - job = db.get_scrape_job(job_id) - if not job: - return jsonify({'error': 'Job not found'}), 404 - - if job['status'] not in ('failed', 'cancelled'): - return jsonify({'error': f"Job status is '{job['status']}', can only retry failed or cancelled jobs"}), 400 - - db.update_scrape_job(job_id, - status='pending', - error_message=None, - subprocess_pid=None, - crawl_mode=None, - started_at=None, - completed_at=None) - - logger.info(f"Scraper job {job_id} reset to pending for retry") - return jsonify({'ok': True}) - - -@app.route('/api/scraper/delete/', methods=['POST']) -def api_scraper_delete(job_id): - """Delete a scrape job and clean up any associated ZIM artifacts.""" - import subprocess - import signal - - db = StatusDB() - job = db.get_scrape_job(job_id) - if not job: - return jsonify({'error': 'Job not found'}), 404 - - if job['status'] == 'running': - return jsonify({'error': 'Cannot delete a running job \u2014 cancel it first'}), 400 - - zim_cleanup_results = None - - # If the job has a linked zim_source, do full cleanup - if job.get('zim_source_id'): - zim_cleanup_results = _full_zim_cleanup(job['zim_source_id']) - try: - _cache['kiwix_sources'] = _build_kiwix_sources() - except Exception: - pass - elif job.get('zim_filename'): - # No zim_source row, but there may be an orphan file + library entry - zim_path = os.path.join('/mnt/kiwix', job['zim_filename']) - if os.path.isfile(zim_path): - try: - os.remove(zim_path) - logger.info(f"Deleted orphan ZIM file: {zim_path}") - except Exception as e: - logger.warning(f"Failed to delete orphan ZIM file {zim_path}: {e}") - try: - subprocess.run( - ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', - job['zim_filename'].replace('.zim', '')], - capture_output=True, text=True, timeout=10 - ) - except Exception as e: - logger.warning(f"kiwix-manage remove failed for orphan: {e}") - try: - result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) - if result.returncode == 0 and result.stdout.strip(): - pid = int(result.stdout.strip().split()[0]) - os.kill(pid, signal.SIGHUP) - logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") - except Exception as e: - logger.warning(f"Failed to signal kiwix-serve: {e}") - - # Delete the scrape_jobs row (may already be gone if _full_zim_cleanup deleted it) - conn = db._get_conn() - conn.execute("DELETE FROM scrape_jobs WHERE id = ?", (job_id,)) - conn.commit() - - logger.info(f"Scraper job {job_id} deleted (zim_cleanup={zim_cleanup_results})") - return jsonify({'ok': True, 'zim_cleanup': zim_cleanup_results}) - - -@app.route('/api/scraper/clear-failed', methods=['POST']) -def api_scraper_clear_failed(): - """Delete all failed and cancelled scrape jobs.""" - db = StatusDB() - conn = db._get_conn() - result = conn.execute("DELETE FROM scrape_jobs WHERE status IN ('failed', 'cancelled')") - conn.commit() - count = result.rowcount - logger.info(f"Cleared {count} failed/cancelled scraper jobs") - return jsonify({'ok': True, 'deleted': count}) - - # ── Metrics API ── @app.route('/api/metrics/history') diff --git a/lib/aurora_nav_tool.py b/lib/aurora_nav_tool.py deleted file mode 100644 index 2b7285d..0000000 --- a/lib/aurora_nav_tool.py +++ /dev/null @@ -1,117 +0,0 @@ -""" -title: Navigation -author: Echo6 -version: 1.1.0 -description: Turn-by-turn directions and geocoding via Photon + Valhalla on recon-vm. Supports driving, walking, cycling, and truck routing with worldwide coverage (281M places). -""" - -import re -import json -import requests -from pydantic import BaseModel, Field - -_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') - - -class Tools: - class Valves(BaseModel): - photon_url: str = Field( - default="http://100.64.0.24:2322", - description="Photon geocoding service URL (recon-vm)", - ) - valhalla_url: str = Field( - default="http://100.64.0.24:8002", - description="Valhalla routing service URL (recon-vm)", - ) - - def __init__(self): - self.valves = self.Valves() - - def _geocode(self, query: str): - m = _COORD_RE.match(query.strip()) - if m: - lat, lon = float(m.group(1)), float(m.group(2)) - return lat, lon, query - resp = requests.get( - f"{self.valves.photon_url}/api", - params={"q": query, "limit": 1}, - timeout=10, - ) - resp.raise_for_status() - features = resp.json().get("features", []) - if not features: - return None, None, None - props = features[0]["properties"] - coords = features[0]["geometry"]["coordinates"] - parts = [props.get("name", "")] - for key in ("city", "state", "country"): - v = props.get(key) - if v and v != parts[-1]: - parts.append(v) - return coords[1], coords[0], ", ".join(p for p in parts if p) - - def get_directions( - self, - origin: str, - destination: str, - mode: str = "auto", - ) -> str: - """ - Get turn-by-turn directions between two locations. When this tool returns results, present the directions exactly as returned — do not summarize or rephrase. Include all steps. - - :param origin: Starting location — address, place name, or lat,lon coordinates - :param destination: Destination — address, place name, or lat,lon coordinates - :param mode: Travel mode: auto, pedestrian, bicycle, or truck (default: auto) - :return: Formatted turn-by-turn directions - """ - if mode not in ("auto", "pedestrian", "bicycle", "truck"): - mode = "auto" - - orig_lat, orig_lon, orig_name = self._geocode(origin) - if orig_lat is None: - return f"Could not find location: {origin}" - - dest_lat, dest_lon, dest_name = self._geocode(destination) - if dest_lat is None: - return f"Could not find location: {destination}" - - try: - resp = requests.post( - f"{self.valves.valhalla_url}/route", - json={ - "locations": [ - {"lat": orig_lat, "lon": orig_lon}, - {"lat": dest_lat, "lon": dest_lon}, - ], - "costing": mode, - "directions_options": {"units": "miles"}, - }, - timeout=30, - ) - except requests.RequestException: - return "Navigation service unavailable" - - if resp.status_code != 200: - return "No route found between locations" - - trip = resp.json()["trip"] - summary = trip["summary"] - legs = trip["legs"][0]["maneuvers"] - - miles = round(summary["length"], 1) - minutes = round(summary["time"] / 60, 1) - - lines = [ - f"Directions from {orig_name} to {dest_name} ({mode}):", - f"Distance: {miles} miles | Time: {minutes} minutes", - "", - ] - for i, m in enumerate(legs, 1): - inst = m["instruction"] - dist = m.get("length", 0) - if dist > 0: - lines.append(f"{i}. {inst} — {round(dist, 1)} mi") - else: - lines.append(f"{i}. {inst}") - - return "\n".join(lines) diff --git a/lib/auth.py b/lib/auth.py deleted file mode 100644 index 22b08d2..0000000 --- a/lib/auth.py +++ /dev/null @@ -1,22 +0,0 @@ -""" -RECON Auth Helper — extract user identity from Authentik forward-auth headers. -""" -from functools import wraps -from flask import request, jsonify - - -def get_user_id(): - """Return X-Authentik-Username or None.""" - return request.headers.get('X-Authentik-Username') - - -def require_auth(f): - """Decorator: 401 if no Authentik auth header.""" - @wraps(f) - def wrapper(*args, **kwargs): - user_id = get_user_id() - if not user_id: - return jsonify({'error': 'Authentication required'}), 401 - request.user_id = user_id - return f(*args, **kwargs) - return wrapper diff --git a/lib/deployment_config.py b/lib/deployment_config.py deleted file mode 100644 index ab6aa17..0000000 --- a/lib/deployment_config.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Deployment profile loader. - -Reads RECON_PROFILE env var (default: "home"), loads the matching YAML -from config/profiles/.yaml, and caches the parsed dict in memory. - -Exposes get_deployment_config() as the in-process accessor for the profile. - -Note: its former consumers (the /api/landclass gate, google_places, -place_detail, offroute/router) were all extracted to navi-* services or removed -across cleanups #4–#6/#27 — recon has no remaining caller of -get_deployment_config() today; the module is retained per cleanup #1. -(The former /api/config HTTP endpoint that served this dict to the frontend was -removed once navi-config (:8422) took over that route.) -""" -import os -import yaml -from .utils import setup_logging - -logger = setup_logging('recon.deployment_config') - -_config_cache = None - - -def load_deployment_config(): - """Load and cache the deployment profile. Called once at import time.""" - global _config_cache - - profile = os.environ.get('RECON_PROFILE', 'home') - config_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config', 'profiles') - config_path = os.path.join(config_dir, f'{profile}.yaml') - - if not os.path.exists(config_path): - raise FileNotFoundError( - f"Deployment profile '{profile}' not found at {config_path}. " - f"Available profiles: {', '.join(f.replace('.yaml','') for f in os.listdir(config_dir) if f.endswith('.yaml'))}" - ) - - with open(config_path, 'r') as f: - _config_cache = yaml.safe_load(f) - - logger.info(f"Loaded deployment profile: {profile} ({_config_cache.get('region_name', 'unknown')})") - return _config_cache - - -def get_deployment_config(): - """Return the cached deployment config dict.""" - if _config_cache is None: - load_deployment_config() - return _config_cache - - -# Load on import so startup fails fast if profile is missing -load_deployment_config() diff --git a/lib/embedder.py b/lib/embedder.py index 8dcc45a..35fcb58 100644 --- a/lib/embedder.py +++ b/lib/embedder.py @@ -10,7 +10,6 @@ Dependencies: requests, qdrant-client Config: embedding, vector_db, processing.embed_workers """ import json -import re import os import time import traceback @@ -291,17 +290,7 @@ def embed_single(file_hash, db, config): page_timestamps = meta['page_timestamps'] except Exception: pass - # For ZIM articles, build wiki.echo6.co URL from meta.json - if source_type == 'zim' and meta.get('article_path'): - from urllib.parse import quote as url_quote - zim_name = meta.get('zim_name', '') - if not zim_name: - # Derive from zim_file: strip only .zim extension, keep full name - zf = meta.get('zim_file', '') - zim_name = zf.removesuffix('.zim') - article_path = url_quote(meta['article_path'], safe='/:@!$&()*+,;=-._~') - download_url = f'https://wiki.echo6.co/content/{zim_name}/{article_path}' - elif doc.get('path'): + if doc.get('path'): download_url = generate_download_url( doc['path'], config.get('library_root', '/mnt/library') ) diff --git a/lib/enricher.py b/lib/enricher.py index e1e583c..d9540aa 100644 --- a/lib/enricher.py +++ b/lib/enricher.py @@ -27,15 +27,6 @@ from .utils import get_config, setup_logging from .status import StatusDB from .utils import resolve_text_dir -try: - from langdetect import detect as _detect_lang - from langdetect.lang_detect_exception import LangDetectException - _HAS_LANGDETECT = True -except ImportError: - _HAS_LANGDETECT = False - -ALLOWED_LANGUAGES = {'en'} # Default: English only - logger = setup_logging('recon.enricher') # Docs stuck in "enriching" longer than this get reset to "extracted" for retry @@ -350,42 +341,6 @@ def validate_and_fix_concepts(concepts, key, config): return concepts -def _check_language(text_dir, config): - """Check language of document text. Returns (is_allowed, detected_lang). - - Reads first 1000 chars from first page file and uses langdetect. - Returns (True, lang) if language is allowed, (False, lang) if not. - Falls back to (True, 'unknown') if detection fails (benefit of the doubt). - """ - if not _HAS_LANGDETECT: - return True, 'unknown' - - # Check if language filter is enabled in config - pipeline_cfg = config.get('pipeline', {}) - if not pipeline_cfg.get('language_filter', True): - return True, 'disabled' - - allowed = set(pipeline_cfg.get('allowed_languages', ['en'])) - - # Read first page for detection - page_files = sorted([f for f in os.listdir(text_dir) - if f.startswith('page_') and f.endswith('.txt')]) - if not page_files: - return True, 'no_pages' - - try: - with open(os.path.join(text_dir, page_files[0]), encoding='utf-8') as f: - sample = f.read(1500) - if len(sample.strip()) < 50: - return True, 'too_short' - lang = _detect_lang(sample) - return (lang in allowed), lang - except LangDetectException: - return True, 'detection_failed' - except Exception: - return True, 'error' - - def enrich_single(file_hash, db, config, key_rotator): doc = db.get_document(file_hash) if not doc: @@ -404,14 +359,6 @@ def enrich_single(file_hash, db, config, key_rotator): db.mark_failed(file_hash, f"Text directory not found: {text_dir}") return False - # Language gate: skip non-English documents before burning Gemini quota - lang_ok, detected_lang = _check_language(text_dir, config) - if not lang_ok: - logger.info(f"Skipping {file_hash[:12]}... detected language '{detected_lang}' " - f"(allowed: {config.get('pipeline', {}).get('allowed_languages', ['en'])})") - db.mark_failed(file_hash, f"Language filter: detected '{detected_lang}', not in allowed list") - return False - db.update_status(file_hash, 'enriching') try: diff --git a/lib/extractor.py b/lib/extractor.py index bc236ab..13159c9 100644 --- a/lib/extractor.py +++ b/lib/extractor.py @@ -21,7 +21,6 @@ Config: processing.extract_workers, processing.max_pdf_size_mb, processing.extract_timeout, processing.page_timeout """ import base64 -import re import json import os import random @@ -100,40 +99,6 @@ def _is_transient(error_str): return any(sig in s for sig in transient_signals) -def _text_quality_ok(text, min_length=50): - """Check if extracted text meets quality thresholds. - - Beyond the basic length check, validates: - - Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars) - - Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count - - Returns True if text passes all checks. - """ - text = text.strip() - if len(text) < min_length: - return False - - words = text.split() - if not words: - return False - - # Word-like ratio: tokens with 2+ alphabetic characters - word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2) - word_ratio = word_like / len(words) - if word_ratio < 0.60: - return False - - # Concatenation detector: lowercase immediately followed by uppercase - # Filter out common camelCase patterns in code (short tokens) - concat_hits = len(re.findall(r'[a-z][A-Z]', text)) - concat_ratio = concat_hits / len(words) if words else 0 - if concat_ratio > 0.10: - return False - - return True - - - def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30): """Render a single PDF page to PNG bytes using pdftoppm. @@ -259,7 +224,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): # Method 1: pdftotext (poppler) try: result = subprocess.run( - ['pdftotext', '-layout', '-f', str(page_num_0indexed + 1), + ['pdftotext', '-f', str(page_num_0indexed + 1), '-l', str(page_num_0indexed + 1), pdf_path, '-'], capture_output=True, text=True, timeout=page_timeout ) @@ -268,7 +233,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): except Exception: pass - if _text_quality_ok(text): + if len(text.strip()) >= 50: return text, 'pdftotext' # Method 2: pdftoppm + Tesseract OCR @@ -293,7 +258,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): except Exception: pass - if _text_quality_ok(text): + if len(text.strip()) >= 50: return text, 'tesseract' # Method 3: Gemini Vision (last resort) @@ -311,26 +276,8 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30): # ── Core extraction functions ── def _pypdf2_extract(reader, page_num): - """Extract text from a PyPDF2 page object. Runs inside a thread for timeout. - - Tries default extraction first (space_width=200). If quality check fails, - retries with space_width=100 which better detects word boundaries in - tightly-kerned PDFs (common in Haynes/workshop manuals). - - Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter - controls word-boundary detection tolerance. Lower values = more aggressive - space insertion between characters. - """ - text = reader.pages[page_num].extract_text() or '' - if _text_quality_ok(text): - return text - - # Retry with tighter word-boundary detection - text_tight = reader.pages[page_num].extract_text(space_width=100.0) or '' - if len(text_tight.strip()) >= len(text.strip()): - return text_tight - - return text + """Extract text from a PyPDF2 page object. Runs inside a thread for timeout.""" + return reader.pages[page_num].extract_text() or '' def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): @@ -355,13 +302,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: text = '' - if _text_quality_ok(text): + if len(text.strip()) >= 50: return text, 'pypdf2' # Method 2: pdftotext via subprocess (inherently timeout-safe) try: result = subprocess.run( - ['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'], + ['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'], capture_output=True, text=True, timeout=page_timeout ) if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()): @@ -369,7 +316,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: pass - if _text_quality_ok(text): + if len(text.strip()) >= 50: return text, 'pdftotext' # Method 3: pdftoppm + Tesseract OCR @@ -393,7 +340,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30): except Exception: pass - if _text_quality_ok(text): + if len(text.strip()) >= 50: return text, 'tesseract' # Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs) diff --git a/lib/netsyms.py b/lib/netsyms.py deleted file mode 100644 index d51162e..0000000 --- a/lib/netsyms.py +++ /dev/null @@ -1,228 +0,0 @@ -""" -RECON Netsyms AddressDatabase2025 — SQLite-backed US+CA address lookup. - -Provides 159.78M geocoded addresses as tier-2 between address book -(exact named locations) and Photon (full-text global geocoding). - -Database: /mnt/nav/addresses/AddressDatabase2025.sqlite (read-only) -""" - -import os -import re -import sqlite3 -import threading - -from .utils import setup_logging - -logger = setup_logging('recon.netsyms') - -_DB_PATH = '/mnt/nav/addresses/AddressDatabase2025.sqlite' - -_conn = None -_lock = threading.Lock() -_cached_row_count = None - -# US states + DC + territories, CA provinces, for free-text parsing -_STATE_CODES = { - 'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', - 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', - 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', - 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', - 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY', - 'DC', 'PR', 'VI', 'GU', 'AS', 'MP', - # Canadian provinces - 'AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE', - 'QC', 'SK', 'YT', -} - -_NUMBER_RE = re.compile(r'^(\d+[\w-]*)(.*)$') - - -def _get_conn(): - """Lazy-open a read-only SQLite connection.""" - global _conn - if _conn is not None: - return _conn - with _lock: - if _conn is not None: - return _conn - uri = f'file:{_DB_PATH}?mode=ro' - _conn = sqlite3.connect(uri, uri=True, check_same_thread=False) - _conn.row_factory = sqlite3.Row - logger.info("Netsyms DB opened: %s", _DB_PATH) - return _conn - - -def _row_to_dict(row): - """Convert a sqlite3.Row to a plain dict with lat/lon keys.""" - return { - 'zipcode': row['zipcode'], - 'number': row['number'], - 'street': row['street'], - 'street2': row['street2'], - 'city': row['city'], - 'state': row['state'], - 'plus4': row['plus4'], - 'country': row['country'], - 'lat': float(row['latitude']), - 'lon': float(row['longitude']), - 'source': row['source'], - } - - -def lookup_by_street(number, street, city=None, state=None, - zipcode=None, country=None, limit=20): - """Match on number + street, with optional qualifiers.""" - conn = _get_conn() - clauses = ['number = ?', 'street = ?'] - params = [str(number).strip().upper(), street.strip().upper()] - - if city: - clauses.append('city = ?') - params.append(city.strip().upper()) - if state: - clauses.append('state = ?') - params.append(state.strip().upper()) - if zipcode: - clauses.append('zipcode = ?') - params.append(zipcode.strip()) - if country: - clauses.append('country = ?') - params.append(country.strip().upper()) - - sql = f"SELECT * FROM addresses WHERE {' AND '.join(clauses)} LIMIT ?" - params.append(limit) - - with _lock: - try: - rows = conn.execute(sql, params).fetchall() - except sqlite3.Error as e: - logger.warning("Netsyms lookup_by_street error: %s", e) - return [] - - results = [_row_to_dict(r) for r in rows] - logger.debug("lookup_by_street(%s, %s, city=%s, state=%s) → %d results", - number, street, city, state, len(results)) - return results - - -def lookup_free_text(query, country_hint=None): - """Parse a free-text address and look it up.""" - q = query.strip() - if not q: - return [] - - # Strip trailing zipcode if present - zipcode = None - zip_match = re.search(r'\b(\d{5})\s*$', q) - if zip_match: - zipcode = zip_match.group(1) - q = q[:zip_match.start()].strip().rstrip(',').strip() - - # Strip trailing state - tokens = re.split(r'[,\s]+', q) - tokens = [t for t in tokens if t] - if not tokens: - return [] - - state = None - if len(tokens) >= 2 and tokens[-1].upper() in _STATE_CODES: - state = tokens[-1].upper() - tokens = tokens[:-1] - - # Leading digits → number - number = None - if tokens and re.match(r'^\d', tokens[0]): - number = tokens[0] - tokens = tokens[1:] - - if not tokens: - # Only a number, or empty — try zipcode if we have one - if zipcode: - return lookup_by_zipcode(zipcode, limit=20) - return [] - - # If state was found and we have 2+ tokens remaining, last token is city - city = None - if state and len(tokens) >= 2: - city = tokens[-1] - tokens = tokens[:-1] - - street = ' '.join(tokens) - - if number: - results = lookup_by_street(number, street, city=city, state=state, - zipcode=zipcode, country=country_hint) - if results: - logger.debug("lookup_free_text(%r) → %d results via street match", - query, len(results)) - return results - - # Fallback: try zipcode only if available - if zipcode: - return lookup_by_zipcode(zipcode, limit=20) - - logger.debug("lookup_free_text(%r) → 0 results", query) - return [] - - -def lookup_by_zipcode(zipcode, limit=100): - """Direct zipcode lookup.""" - conn = _get_conn() - sql = "SELECT * FROM addresses WHERE zipcode = ? LIMIT ?" - params = [zipcode.strip(), limit] - - with _lock: - try: - rows = conn.execute(sql, params).fetchall() - except sqlite3.Error as e: - logger.warning("Netsyms lookup_by_zipcode error: %s", e) - return [] - - results = [_row_to_dict(r) for r in rows] - logger.debug("lookup_by_zipcode(%s) → %d results", zipcode, len(results)) - return results - - -def health(): - """Health check with cached row count.""" - global _cached_row_count - - try: - file_size = os.path.getsize(_DB_PATH) - except OSError: - return {'ok': False, 'row_count': 0, 'file_size_bytes': 0, - 'indexed_countries': []} - - try: - conn = _get_conn() - except Exception: - return {'ok': False, 'row_count': 0, 'file_size_bytes': file_size, - 'indexed_countries': []} - - if _cached_row_count is None: - with _lock: - if _cached_row_count is None: - try: - row = conn.execute( - "SELECT COUNT(*) AS cnt FROM addresses" - ).fetchone() - _cached_row_count = row['cnt'] - except sqlite3.Error: - _cached_row_count = 0 - - with _lock: - try: - rows = conn.execute( - "SELECT DISTINCT country FROM addresses" - ).fetchall() - countries = sorted(r['country'] for r in rows) - except sqlite3.Error: - countries = [] - - return { - 'ok': True, - 'row_count': _cached_row_count, - 'file_size_bytes': file_size, - 'indexed_countries': countries, - } diff --git a/lib/netsyms_api.py b/lib/netsyms_api.py deleted file mode 100644 index dbae24e..0000000 --- a/lib/netsyms_api.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -RECON Netsyms API — Flask Blueprint. - -GET /api/netsyms/lookup?q=&country= -GET /api/netsyms/health -""" - -from flask import Blueprint, request, jsonify - -from . import netsyms -from .utils import setup_logging - -logger = setup_logging('recon.netsyms_api') - -netsyms_bp = Blueprint('netsyms', __name__) - - -@netsyms_bp.route('/api/netsyms/lookup') -def api_netsyms_lookup(): - q = request.args.get('q', '').strip() - if not q: - return jsonify({'error': 'Missing q parameter'}), 400 - - country = request.args.get('country', '').strip() or None - results = netsyms.lookup_free_text(q, country_hint=country) - return jsonify({'results': results, 'count': len(results), 'query': q}) - - -@netsyms_bp.route('/api/netsyms/health') -def api_netsyms_health(): - return jsonify(netsyms.health()) diff --git a/lib/netsyms_test.py b/lib/netsyms_test.py deleted file mode 100644 index ed70472..0000000 --- a/lib/netsyms_test.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 -"""Tests for Netsyms address database module.""" - -import sys -import os - -# Ensure the lib directory is importable -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from lib import netsyms - - -def test_lookup_by_street_lowercase(): - results = netsyms.lookup_by_street("214", "North St", city="Filer", state="ID") - assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" - r = results[0] - assert abs(r['lat'] - 42.5736) < 0.01, f"Lat mismatch: {r['lat']}" - assert abs(r['lon'] - (-114.6066)) < 0.01, f"Lon mismatch: {r['lon']}" - print(" PASS: lookup_by_street (lowercase)") - - -def test_lookup_by_street_uppercase(): - results = netsyms.lookup_by_street("214", "NORTH ST", city="FILER", state="ID") - assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" - r = results[0] - assert abs(r['lat'] - 42.5736) < 0.01, f"Lat mismatch: {r['lat']}" - print(" PASS: lookup_by_street (uppercase)") - - -def test_lookup_nonexistent(): - results = netsyms.lookup_by_street("999999", "Nonexistent Rd", - city="Filer", state="ID") - assert results == [], f"Expected empty list, got {len(results)} results" - print(" PASS: lookup_by_street (nonexistent)") - - -def test_free_text_with_commas(): - results = netsyms.lookup_free_text("214 North St, Filer, ID") - assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" - r = results[0] - assert r['city'] == 'FILER', f"City mismatch: {r['city']}" - assert r['state'] == 'ID', f"State mismatch: {r['state']}" - print(" PASS: lookup_free_text (commas)") - - -def test_free_text_no_commas(): - results = netsyms.lookup_free_text("214 North St Filer ID") - assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}" - r = results[0] - assert r['state'] == 'ID', f"State mismatch: {r['state']}" - print(" PASS: lookup_free_text (no commas)") - - -def test_lookup_by_zipcode(): - results = netsyms.lookup_by_zipcode("83328", limit=5) - assert len(results) == 5, f"Expected 5 results, got {len(results)}" - for r in results: - assert r['zipcode'] == '83328', f"Zipcode mismatch: {r['zipcode']}" - print(" PASS: lookup_by_zipcode") - - -def test_health(): - h = netsyms.health() - assert h['ok'] is True, f"Health not OK: {h}" - assert h['row_count'] >= 159_000_000, f"Row count too low: {h['row_count']}" - assert 'US' in h['indexed_countries'], f"US not in countries: {h['indexed_countries']}" - assert 'CA' in h['indexed_countries'], f"CA not in countries: {h['indexed_countries']}" - print(" PASS: health") - - -if __name__ == '__main__': - print("Running Netsyms tests...") - test_lookup_by_street_lowercase() - test_lookup_by_street_uppercase() - test_lookup_nonexistent() - test_free_text_with_commas() - test_free_text_no_commas() - test_lookup_by_zipcode() - test_health() - print("All tests passed.") diff --git a/lib/processors/zim_processor.py b/lib/processors/zim_processor.py deleted file mode 100644 index 6f5c887..0000000 --- a/lib/processors/zim_processor.py +++ /dev/null @@ -1,493 +0,0 @@ -""" -RECON ZIM Processor - -Batch importer for ZIM files. Opens a ZIM via python-libzim, iterates -HTML articles, strips to clean text, creates processing directories, -and registers each article as "extracted" for the enricher to pick up. - -This is NOT a dispatcher-style processor (no pre_flight). ZIMs contain -thousands of articles — ingestion is triggered explicitly or by the -ZIM monitor. - -Usage: - python3 -m lib.processors.zim_processor --zim-source-id 1 - python3 -m lib.processors.zim_processor --zim-source-id 1 --limit 100 --batch-size 50 -""" -import argparse -import hashlib -import json -import logging -import os -import re -import sys -import time - -from lxml import html as lxml_html - -sys.path.insert(0, "/opt/recon") - -from lib.utils import setup_logging, get_config -from lib.status import StatusDB -from lib.web_scraper import chunk_text - -logger = logging.getLogger("recon.processors.zim") - -WORDS_PER_PAGE = 2000 -MIN_TEXT_LENGTH = 200 - -# Elements to strip before text extraction -STRIP_TAGS = {'nav', 'footer', 'script', 'style', 'header', 'aside'} - -# Non-English article path suffix pattern (MediaWiki ZIMs use /XX or /XXX suffixes) -# Matches paths ending in /xx where xx is a 2-3 letter lowercase language code -_LANG_SUFFIX_RE = re.compile(r'/[a-z]{2,3}$') -# Common ISO 639-1/2 language codes to filter (excludes 'en') -_NON_EN_LANGS = { - 'aa','ab','af','ak','am','an','ar','as','av','ay','az', - 'ba','be','bg','bh','bi','bm','bn','bo','br','bs', - 'ca','ce','ch','co','cr','cs','cu','cv','cy', - 'da','de','dv','dz', - 'ee','el','eo','es','et','eu', - 'fa','ff','fi','fj','fo','fr','fy', - 'ga','gd','gl','gn','gu','gv', - 'ha','he','hi','ho','hr','ht','hu','hy','hz', - 'ia','id','ie','ig','ii','ik','io','is','it','iu', - 'ja','jv', - 'ka','kg','ki','kj','kk','kl','km','kn','ko','kr','ks','ku','kv','kw','ky', - 'la','lb','lg','li','ln','lo','lt','lu','lv', - 'mg','mh','mi','mk','ml','mn','mo','mr','ms','mt','my', - 'na','nb','nd','ne','ng','nl','nn','no','nr','nv','ny', - 'oc','oj','om','or','os', - 'pa','pi','pl','ps','pt', - 'qu', - 'rm','rn','ro','ru','rw', - 'sa','sc','sd','se','sg','sh','si','sk','sl','sm','sn','so','sq','sr','ss','st','su','sv','sw', - 'ta','te','tg','th','ti','tk','tl','tn','to','tr','ts','tt','tw','ty', - 'ug','uk','ur','uz', - 've','vi','vo', - 'wa','wo', - 'xh', - 'yi','yo', - 'za','zh','zu', -} - - -def _text_hash(text): - """Compute MD5 hash of text content (matching content_hash style).""" - return hashlib.md5(text.encode('utf-8')).hexdigest() - - -def _flatten_table(table_el): - """Convert a element to pipe-delimited text. - - Each becomes a row with cells joined by ' | '. - Returns the formatted table as a string with blank lines around it. - """ - rows = [] - for tr in table_el.iter('tr'): - cells = [] - for cell in tr: - if cell.tag in ('td', 'th'): - cell_text = (cell.text_content() or '').strip() - # Collapse internal whitespace in each cell - cell_text = re.sub(r'\s+', ' ', cell_text) - if cell_text: - cells.append(cell_text) - if cells: - rows.append(' | '.join(cells)) - if not rows: - return '' - return '\n'.join(rows) - - -def _preprocess_tree(doc): - """Pre-process HTML tree to add delimiters before text_content() flattens it. - - Handles:
,
,
  • ,
    ,
    -- elements that lxml's - text_content() would concatenate without any separators. - """ - from lxml import etree - - # 1. Replace
  • elements with their pipe-delimited text - for table in list(doc.iter('table')): - formatted = _flatten_table(table) - if formatted: - replacement = etree.Element('div') - replacement.text = '\n\n' + formatted + '\n\n' - parent = table.getparent() - if parent is not None: - parent.replace(table, replacement) - else: - table.drop_tree() - - # 2.
    -> inject newline - for br in list(doc.iter('br')): - br.tail = '\n' + (br.tail or '') - - # 3.
  • -> inject newline + "- " prefix - for li in list(doc.iter('li')): - li.text = '- ' + (li.text or '') - li.tail = '\n' + (li.tail or '') - - # 4.
    -> inject newline before - for dt in list(doc.iter('dt')): - dt.tail = '\n' + (dt.tail or '') - - # 5.
    -> inject newline + indent - for dd in list(doc.iter('dd')): - dd.text = ' ' + (dd.text or '') - dd.tail = '\n' + (dd.tail or '') - - -def _html_to_text(html_bytes): - """Convert HTML bytes to clean text via lxml. - - Strips nav, footer, script, style elements. Decodes entities. - Pre-processes tables, lists, and line breaks for proper delimiters. - Normalizes whitespace. - """ - try: - doc = lxml_html.fromstring(html_bytes) - except Exception: - return "" - - # Strip unwanted elements - for tag in STRIP_TAGS: - for el in doc.iter(tag): - el.drop_tree() - - # Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes - _preprocess_tree(doc) - - # Extract text - text = doc.text_content() - - # Normalize whitespace: collapse runs of spaces, normalize newlines - text = re.sub(r'[ \t]+', ' ', text) - text = re.sub(r'\n{3,}', '\n\n', text) - text = text.strip() - - return text - - -def ingest_zim(zim_source_id, db, config, stop_event=None, - batch_size=100, batch_delay=1.0, limit=None): - """Process all articles from a ZIM file registered in zim_sources. - - - Reads zim_path from zim_sources table - - Iterates articles, creates processing dirs, registers in DB - - Checkpoints progress via zim_sources.last_checkpoint - - Respects stop_event for graceful shutdown - - Yields after each batch to avoid monopolizing resources - - Args: - zim_source_id: ID in zim_sources table - db: StatusDB instance - config: RECON config dict - stop_event: threading.Event for graceful shutdown (optional) - batch_size: articles per batch before sleeping - batch_delay: seconds to sleep between batches - limit: max articles to process (None = all) - - Returns: - dict with counts: processed, skipped, duplicates, errors - """ - from libzim.reader import Archive - - conn = db._get_conn() - - # Load ZIM source record - row = conn.execute( - "SELECT * FROM zim_sources WHERE id = ?", (zim_source_id,) - ).fetchone() - if not row: - logger.error("ZIM source ID %d not found", zim_source_id) - return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} - - zim_source = dict(row) - zim_path = zim_source['zim_path'] - zim_filename = zim_source['zim_filename'] - zim_title = zim_source.get('title') or zim_filename - - if not os.path.isfile(zim_path): - logger.error("ZIM file not found: %s", zim_path) - return {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} - - logger.info("Opening ZIM: %s (%s)", zim_title, zim_filename) - zim = Archive(zim_path) - total_entries = zim.entry_count - - # Read checkpoint to resume from - last_checkpoint = zim_source.get('last_checkpoint') - start_idx = 0 - if last_checkpoint: - try: - start_idx = int(last_checkpoint) - logger.info("Resuming from checkpoint: entry %d", start_idx) - except ValueError: - logger.warning("Invalid checkpoint value: %s, starting from 0", last_checkpoint) - - # Update status to ingesting - conn.execute( - "UPDATE zim_sources SET status = 'ingesting', started_at = CURRENT_TIMESTAMP WHERE id = ?", - (zim_source_id,) - ) - conn.commit() - - processing_root = config.get('pipeline', {}).get( - 'processing_root', '/opt/recon/data/processing' - ) - - # Get already-processed article paths for this ZIM source (dedup within ZIM) - existing_paths = set() - for r in conn.execute( - "SELECT article_path FROM zim_articles WHERE zim_source_id = ?", - (zim_source_id,) - ).fetchall(): - existing_paths.add(r['article_path']) - - stats = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} - # Track what was already flushed to DB to avoid double-counting - flushed = {'processed': 0, 'skipped': 0, 'duplicates': 0, 'errors': 0} - batch_count = 0 - total_processed_this_run = 0 - last_entry_idx = start_idx - - for entry_idx in range(start_idx, total_entries): - if stop_event and stop_event.is_set(): - logger.info("Stop event set, halting ZIM ingest at entry %d", entry_idx) - break - - if limit and total_processed_this_run >= limit: - logger.info("Reached limit of %d articles", limit) - break - - last_entry_idx = entry_idx - - try: - entry = zim._get_entry_by_id(entry_idx) - except Exception: - continue - - # Skip redirects - if entry.is_redirect: - continue - - try: - item = entry.get_item() - except Exception: - continue - - # Skip non-HTML - if item.mimetype != "text/html": - continue - - article_path = entry.path - article_title = entry.title - - # Skip if already processed in a prior run - if article_path in existing_paths: - continue - - # Skip non-English articles (MediaWiki translation suffix pattern) - lang_match = _LANG_SUFFIX_RE.search(article_path) - if lang_match and lang_match.group(0)[1:] in _NON_EN_LANGS: - stats['skipped'] += 1 - total_processed_this_run += 1 - continue - - # Extract and clean text - try: - html_bytes = bytes(item.content) - clean_text = _html_to_text(html_bytes) - except Exception as e: - logger.debug("HTML extraction failed for %s: %s", article_path, e) - stats['errors'] += 1 - continue - - # Skip stubs - if len(clean_text) < MIN_TEXT_LENGTH: - stats['skipped'] += 1 - continue - - # Compute content hash - file_hash = _text_hash(clean_text) - - # Deduplicate against existing catalogue - cat_row = conn.execute( - "SELECT hash FROM catalogue WHERE hash = ?", (file_hash,) - ).fetchone() - if cat_row: - # Record in zim_articles as skipped duplicate - conn.execute( - """INSERT OR IGNORE INTO zim_articles - (zim_source_id, article_path, article_title, status, processed_at) - VALUES (?, ?, ?, 'skipped', CURRENT_TIMESTAMP)""", - (zim_source_id, article_path, article_title) - ) - stats['duplicates'] += 1 - total_processed_this_run += 1 - continue - - # Create processing directory - proc_dir = os.path.join(processing_root, file_hash) - try: - os.makedirs(proc_dir, exist_ok=True) - except Exception as e: - logger.error("Cannot create processing dir %s: %s", proc_dir, e) - stats['errors'] += 1 - continue - - # Split into page files - pages = chunk_text(clean_text, WORDS_PER_PAGE) - for i, page_text in enumerate(pages, start=1): - page_path = os.path.join(proc_dir, "page_{:04d}.txt".format(i)) - with open(page_path, 'w', encoding='utf-8') as f: - f.write(page_text) - - # Write meta.json - meta = { - 'hash': file_hash, - 'filename': article_title + '.html', - 'source_type': 'zim', - 'zim_file': zim_filename, - 'zim_source_id': zim_source_id, - 'article_title': article_title, - 'article_path': article_path, - 'page_count': len(pages), - 'text_length': len(clean_text), - } - with open(os.path.join(proc_dir, 'meta.json'), 'w', encoding='utf-8') as f: - json.dump(meta, f, indent=2) - - # Register in catalogue - db.add_to_catalogue( - file_hash, - article_title + '.html', - zim_path, # source path is the ZIM file - len(clean_text), # size in bytes (text) - 'kiwix', # source - zim_title, # category = ZIM title - ) - - # Queue document - db.queue_document(file_hash) - - # Set text_dir, page_count, book_title on documents row - # Mark organized_at immediately (ZIM articles don't get filed to library) - conn.execute( - "UPDATE documents SET text_dir = ?, page_count = ?, " - "book_title = ?, organized_at = CURRENT_TIMESTAMP " - "WHERE hash = ?", - (proc_dir, len(pages), article_title, file_hash) - ) - - # Update status to extracted - db.update_status(file_hash, 'extracted', pages_extracted=len(pages)) - - # Record in zim_articles - conn.execute( - """INSERT OR IGNORE INTO zim_articles - (zim_source_id, article_path, article_title, status, processed_at) - VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)""", - (zim_source_id, article_path, article_title) - ) - conn.commit() - - stats['processed'] += 1 - total_processed_this_run += 1 - batch_count += 1 - - # Progress logging - total_done = zim_source['processed_count'] + stats['processed'] - article_count = zim_source.get('article_count', 0) - if stats['processed'] % 500 == 0 and article_count > 0: - pct = total_done / article_count * 100 - logger.info( - "ZIM ingest [%s]: %s/%s (%.1f%%)", - zim_title, f"{total_done:,}", f"{article_count:,}", pct - ) - - # Batch checkpoint — flush only the delta since last flush - if batch_count >= batch_size: - delta_p = stats['processed'] - flushed['processed'] - delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) - delta_e = stats['errors'] - flushed['errors'] - conn.execute( - "UPDATE zim_sources SET processed_count = processed_count + ?, " - "skipped_count = skipped_count + ?, error_count = error_count + ?, " - "last_checkpoint = ? WHERE id = ?", - (delta_p, delta_s, delta_e, str(entry_idx + 1), zim_source_id) - ) - conn.commit() - flushed['processed'] = stats['processed'] - flushed['skipped'] = stats['skipped'] - flushed['duplicates'] = stats['duplicates'] - flushed['errors'] = stats['errors'] - - batch_count = 0 - - if batch_delay > 0: - time.sleep(batch_delay) - - # Final checkpoint — flush only the unflushed delta - final_status = 'complete' - if limit and total_processed_this_run >= limit: - final_status = 'ingesting' # not done yet, just hit the limit - - delta_p = stats['processed'] - flushed['processed'] - delta_s = (stats['skipped'] + stats['duplicates']) - (flushed['skipped'] + flushed['duplicates']) - delta_e = stats['errors'] - flushed['errors'] - - conn.execute( - "UPDATE zim_sources SET processed_count = processed_count + ?, " - "skipped_count = skipped_count + ?, error_count = error_count + ?, " - "last_checkpoint = ?, status = ?, completed_at = CASE WHEN ? = 'complete' THEN CURRENT_TIMESTAMP ELSE completed_at END " - "WHERE id = ?", - (delta_p, delta_s, delta_e, str(last_entry_idx + 1), - final_status, final_status, zim_source_id) - ) - conn.commit() - - logger.info( - "ZIM ingest [%s] %s: %d processed, %d skipped, %d duplicates, %d errors", - zim_title, final_status, - stats['processed'], stats['skipped'], stats['duplicates'], stats['errors'] - ) - - return stats - - -def main(): - """CLI entry point for standalone ZIM processing.""" - parser = argparse.ArgumentParser(description="RECON ZIM Processor") - parser.add_argument('--zim-source-id', type=int, required=True, - help="ID from zim_sources table") - parser.add_argument('--batch-size', type=int, default=100, - help="Articles per batch (default: 100)") - parser.add_argument('--batch-delay', type=float, default=1.0, - help="Seconds between batches (default: 1.0)") - parser.add_argument('--limit', type=int, default=None, - help="Max articles to process (default: all)") - args = parser.parse_args() - - setup_logging('recon.processors.zim') - - config = get_config() - db = StatusDB(config['paths']['db']) - - stats = ingest_zim( - zim_source_id=args.zim_source_id, - db=db, - config=config, - batch_size=args.batch_size, - batch_delay=args.batch_delay, - limit=args.limit, - ) - - print(f"\nResults: {stats['processed']} processed, {stats['skipped']} skipped, " - f"{stats['duplicates']} duplicates, {stats['errors']} errors") - - -if __name__ == "__main__": - main() diff --git a/lib/query_router.py b/lib/query_router.py deleted file mode 100644 index dda14a2..0000000 --- a/lib/query_router.py +++ /dev/null @@ -1,161 +0,0 @@ -"""Semantic query router for Aurora. - -Classifies user queries into routes (nav_route, nav_reverse_geocode, -direct_answer, rag_search) by comparing query embeddings against -pre-computed route centroids from example queries. - -TEI endpoint: http://100.64.0.14:8090/embed (cortex via Tailscale) -""" - -import math -import threading -import requests - -# ── Route examples ──────────────────────────────────────────────────────────── -ROUTE_EXAMPLES = { - "nav_route": [ - "how do I get to Boise", - "directions to Twin Falls", - "how do I get from Buhl to Boise", - "drive from Jerome to Sun Valley", - "route from Boise to McCall", - "what's the fastest way to Sun Valley", - "how far is it to Twin Falls", - "take me to Shoshone", - "navigate to the airport", - "how do I drive to Salt Lake City", - "walking directions to the park", - "bike route to downtown", - ], - "nav_reverse_geocode": [ - "what town is at 42.5, -114.7", - "where am I right now", - "what is at coordinates 43.6, -116.2", - "what location is 42.574, -114.607", - "where is this place 44.0, -114.3", - "what city is near 42.7, -114.5", - "reverse geocode 43.0, -115.0", - "what's at this location 42.9, -114.8", - ], - "direct_answer": [ - "hello", - "hey aurora", - "good morning", - "thanks", - "thank you", - "what's your name", - "who are you", - "tell me a joke", - "how are you", - "hi there", - ], - "rag_search": [ - "what does the survival manual say about water", - "how to purify water in the field", - "how to treat a gunshot wound", - "what is the ranger handbook chapter on patrolling", - "field manual water purification", - "how to build a shelter in the wilderness", - "tactical combat casualty care procedures", - "what does FM 21-76 say about fire starting", - ], -} - -# ── Module-level cache ──────────────────────────────────────────────────────── -_ROUTE_CENTROIDS: dict | None = None -_LOCK = threading.Lock() - - -def _embed_batch(texts: list[str], tei_url: str) -> list[list[float]]: - """Embed a batch of texts via TEI.""" - resp = requests.post(tei_url, json={"inputs": texts}, timeout=30) - resp.raise_for_status() - return resp.json() - - -def _compute_centroid(vectors: list[list[float]]) -> list[float]: - """Element-wise mean of vectors.""" - n = len(vectors) - dim = len(vectors[0]) - centroid = [0.0] * dim - for vec in vectors: - for i in range(dim): - centroid[i] += vec[i] - for i in range(dim): - centroid[i] /= n - return centroid - - -def _cosine_similarity(a: list[float], b: list[float]) -> float: - """Cosine similarity between two vectors (pure Python).""" - dot = 0.0 - norm_a = 0.0 - norm_b = 0.0 - for i in range(len(a)): - dot += a[i] * b[i] - norm_a += a[i] * a[i] - norm_b += b[i] * b[i] - denom = math.sqrt(norm_a) * math.sqrt(norm_b) - if denom == 0: - return 0.0 - return dot / denom - - -def _ensure_centroids(tei_url: str) -> dict[str, list[float]]: - """Lazy-init: embed all examples in one batch, compute centroids, cache.""" - global _ROUTE_CENTROIDS - if _ROUTE_CENTROIDS is not None: - return _ROUTE_CENTROIDS - - with _LOCK: - if _ROUTE_CENTROIDS is not None: - return _ROUTE_CENTROIDS - - # Flatten all examples into one batch - all_texts = [] - route_ranges: dict[str, tuple[int, int]] = {} - offset = 0 - for route, examples in ROUTE_EXAMPLES.items(): - route_ranges[route] = (offset, offset + len(examples)) - all_texts.extend(examples) - offset += len(examples) - - all_vectors = _embed_batch(all_texts, tei_url) - - centroids = {} - for route, (start, end) in route_ranges.items(): - centroids[route] = _compute_centroid(all_vectors[start:end]) - - _ROUTE_CENTROIDS = centroids - return _ROUTE_CENTROIDS - - -def classify( - query: str, - tei_url: str = "http://100.64.0.14:8090/embed", - threshold: float = 0.45, -) -> tuple[str, float]: - """Classify a query into a route. - - Returns (route_name, confidence). If no route exceeds the threshold, - returns ("rag_search", best_score) as the safe default. - """ - centroids = _ensure_centroids(tei_url) - - # Embed the query - vecs = _embed_batch([query], tei_url) - query_vec = vecs[0] - - # Compare against all centroids - best_route = "rag_search" - best_score = 0.0 - for route, centroid in centroids.items(): - sim = _cosine_similarity(query_vec, centroid) - if sim > best_score: - best_score = sim - best_route = route - - if best_score < threshold: - return ("rag_search", best_score) - - return (best_route, best_score) diff --git a/lib/query_router_test.py b/lib/query_router_test.py deleted file mode 100644 index 27ccefd..0000000 --- a/lib/query_router_test.py +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env python3 -"""Test suite for the semantic query router.""" - -import sys -import os - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from lib.query_router import classify - -TEST_QUERIES = [ - ("how do I get from Buhl to Boise", "nav_route"), - ("what does the survival manual say about water", "rag_search"), - ("what town is at 42.5, -114.7", "nav_reverse_geocode"), - ("hey aurora", "direct_answer"), - ("what's the fastest way to Sun Valley", "nav_route"), - ("how to purify water in the field", "rag_search"), - ("good morning", "direct_answer"), -] - - -def main(): - print("Query Router Test Suite") - print("=" * 70) - - passed = 0 - failed = 0 - - for query, expected in TEST_QUERIES: - route, confidence = classify(query) - status = "PASS" if route == expected else "FAIL" - if status == "PASS": - passed += 1 - else: - failed += 1 - print(f" [{status}] {query!r}") - print(f" → {route} ({confidence:.3f}) expected={expected}") - - print("=" * 70) - print(f"Results: {passed}/{passed + failed} passed") - if failed: - print(f" {failed} FAILED") - sys.exit(1) - else: - print(" All tests passed!") - - -if __name__ == "__main__": - main() diff --git a/lib/scraper_runner.py b/lib/scraper_runner.py deleted file mode 100644 index b83f145..0000000 --- a/lib/scraper_runner.py +++ /dev/null @@ -1,387 +0,0 @@ -""" -RECON Scraper Runner - -Daemon loop that processes scrape jobs: crawl via Zimit → kiwix-manage. -Zimit (openZIM Docker crawler) handles all site types and produces ZIM -files directly — no separate zimwriterfs step needed. - -Public entry point: scraper_loop(stop_event, config). - -Config section: scraper (output_dir, docker_image, docker_workers, poll_interval) -DB table: scrape_jobs (status flow: pending → scraping → registering → complete) -""" -import glob as _glob -import os -import re -import shutil -import signal -import subprocess -import time -from datetime import datetime, timezone -from urllib.parse import urlparse - -from .utils import setup_logging -from .status import StatusDB - -logger = setup_logging('recon.scraper_runner') - - -def scraper_loop(stop_event, config): - """Daemon loop: poll for pending scrape jobs, execute pipeline.""" - scraper_cfg = config.get('scraper', {}) - poll_interval = scraper_cfg.get('poll_interval', 300) - - logger.info("Scraper runner started") - - # Clean up any orphan Zimit containers from a previous crash - _cleanup_orphan_containers() - - while not stop_event.is_set(): - db = StatusDB() - job = db.get_pending_scrape_job() - if job: - try: - _process_job(job, config, stop_event) - except Exception as e: - logger.error(f"Scraper job {job['id']} unexpected error: {e}", exc_info=True) - try: - db.update_scrape_job(job['id'], - status='failed', - error_message=str(e)[:1000], - subprocess_pid=None, - completed_at=_now()) - except Exception: - pass - else: - stop_event.wait(poll_interval) - - logger.info("Scraper runner stopped") - - -def _now(): - return datetime.now(timezone.utc).isoformat() - - -def _sanitize_domain(url): - """Extract and sanitize domain from URL for use in filenames.""" - parsed = urlparse(url) - domain = parsed.hostname or 'unknown' - if domain.startswith('www.'): - domain = domain[4:] - return domain - - -def _sanitize_filename(s): - """Sanitize a string for safe filename use.""" - return re.sub(r'[^a-zA-Z0-9._-]', '_', s) - - -def _check_cancelled(db, job_id): - """Check if a job has been cancelled in the DB.""" - job = db.get_scrape_job(job_id) - return job and job['status'] == 'cancelled' - - -def _kill_process(proc, timeout=5): - """Gracefully terminate a subprocess, force kill if needed.""" - if proc.poll() is not None: - return - try: - proc.terminate() - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - proc.kill() - proc.wait(timeout=2) - - -def _cleanup_orphan_containers(): - """Remove any leftover recon-scraper-* Docker containers from a previous crash.""" - try: - result = subprocess.run( - ['docker', 'ps', '-a', '--filter', 'name=recon-scraper-', '--format', '{{.Names}}'], - capture_output=True, text=True, timeout=10 - ) - if result.returncode == 0 and result.stdout.strip(): - for name in result.stdout.strip().split('\n'): - name = name.strip() - if name: - subprocess.run(['docker', 'rm', '-f', name], capture_output=True, timeout=10) - logger.info(f"Cleaned up orphan container: {name}") - except Exception as e: - logger.warning(f"Orphan container cleanup failed: {e}") - - -# ── Zimit crawl backend ────────────────────────────────────────── - - -def _crawl_zimit(job, config, stop_event, db): - """ - Crawl a URL using Zimit (openZIM Docker crawler). - - Returns (page_count, zim_filename, error_msg). - On success: (count, filename, None) - On failure: (0, None, error_string) - """ - job_id = job['id'] - url = job['url'] - title = job.get('title') or _sanitize_domain(url) - language = job.get('language') or config.get('scraper', {}).get('default_language', 'eng') - category = job.get('category') or '' - - scraper_cfg = config.get('scraper', {}) - output_dir = scraper_cfg.get('output_dir', '/mnt/kiwix') - docker_image = scraper_cfg.get('docker_image', 'ghcr.io/openzim/zimit') - docker_workers = scraper_cfg.get('docker_workers', 2) - - domain = _sanitize_domain(url) - date_tag = datetime.now().strftime('%Y-%m') - container_name = f'recon-scraper-{job_id}' - tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') - - # Clean up any pre-existing container with same name (retry scenario) - subprocess.run(['docker', 'rm', '-f', container_name], capture_output=True, timeout=10) - - os.makedirs(tmp_dir, exist_ok=True) - - description = f"Mirror of {domain}" - if category: - description = f"{category} — mirror of {domain}" - - docker_cmd = [ - 'docker', 'run', - '--name', container_name, - '-v', f'{tmp_dir}:/output', - docker_image, - 'zimit', - '--seeds', url, - '--name', _sanitize_filename(domain), - '--zim-lang', language, - '--title', title, - '--description', description[:80], - '--output', '/output', - '-w', str(docker_workers), - ] - - logger.info(f"Job {job_id}: Zimit crawl starting — {url}") - try: - proc = subprocess.Popen( - docker_cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - db.update_scrape_job(job_id, subprocess_pid=proc.pid) - - last_progress_check = 0 - while proc.poll() is None: - if stop_event.is_set() or _check_cancelled(db, job_id): - # Stop the Docker container - subprocess.run(['docker', 'rm', '-f', container_name], - capture_output=True, timeout=10) - _kill_process(proc) - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, 'cancelled' - - # Check progress every 30s via docker logs - now = time.time() - if now - last_progress_check >= 30: - last_progress_check = now - try: - log_result = subprocess.run( - ['docker', 'logs', '--tail', '20', container_name], - capture_output=True, text=True, timeout=10 - ) - if log_result.returncode == 0: - # Browsertrix logs JSON with "crawled":N — check both stdout and stderr - log_text = log_result.stdout or log_result.stderr or '' - lines = log_text.strip().split('\n') - for line in reversed(lines): - match = re.search(r'"crawled":(\d+)', line) - if match: - count = int(match.group(1)) - if count > 0: - db.update_scrape_job(job_id, page_count=count) - break - except Exception: - pass - - try: - proc.wait(timeout=5) - except subprocess.TimeoutExpired: - pass - - db.update_scrape_job(job_id, subprocess_pid=None) - - if stop_event.is_set() or _check_cancelled(db, job_id): - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, 'cancelled' - - if proc.returncode != 0: - # Capture last 50 lines of docker logs for error context - error_msg = f"Zimit exited with code {proc.returncode}" - try: - log_result = subprocess.run( - ['docker', 'logs', '--tail', '50', container_name], - capture_output=True, text=True, timeout=10 - ) - log_text = (log_result.stderr or log_result.stdout or '').strip() - if log_text: - # Take last 500 chars - error_msg += f": {log_text[-500:]}" - except Exception: - pass - # Remove container (no --rm flag, so we clean up manually) - subprocess.run(['docker', 'rm', '-f', container_name], - capture_output=True, timeout=10) - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, error_msg - - except Exception as e: - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, f"Zimit error: {e}" - - # Remove container (no --rm flag, so we clean up manually after getting logs) - subprocess.run(['docker', 'rm', '-f', container_name], - capture_output=True, timeout=10) - - # Find the output ZIM file - zim_files = _glob.glob(os.path.join(tmp_dir, '*.zim')) - if not zim_files: - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, 'Zimit produced no ZIM file' - - src_zim = zim_files[0] # Should be exactly one - - # Get page count from file size as rough estimate if we don't have one - page_count = 0 - try: - job_state = db.get_scrape_job(job_id) - page_count = job_state.get('page_count') or 0 - except Exception: - pass - - # Rename to final location - zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}_{job_id}.zim" - zim_path = os.path.join(output_dir, zim_filename) - try: - shutil.move(src_zim, zim_path) - except Exception as e: - shutil.rmtree(tmp_dir, ignore_errors=True) - return 0, None, f"Failed to move ZIM to output dir: {e}" - - shutil.rmtree(tmp_dir, ignore_errors=True) - logger.info(f"Job {job_id}: Zimit complete — {zim_filename}") - - return page_count, zim_filename, None - - -# ── Main job pipeline ───────────────────────────────────────────── - - -def _process_job(job, config, stop_event): - """Execute the full scrape pipeline for a single job.""" - db = StatusDB() - job_id = job['id'] - - logger.info(f"Job {job_id}: starting scrape of {job['url']}") - - # ── Phase 1: Crawl via Zimit ─────────────────────────────────── - db.update_scrape_job(job_id, - status='scraping', - crawl_mode='zimit', - started_at=_now()) - - if stop_event.is_set() or _check_cancelled(db, job_id): - _handle_cancel(db, job_id) - return - - page_count, zim_filename, error = _crawl_zimit(job, config, stop_event, db) - - if error == 'cancelled': - _handle_cancel(db, job_id) - return - elif error: - db.update_scrape_job(job_id, - status='failed', - error_message=error[:1000], - subprocess_pid=None, - completed_at=_now()) - return - - db.update_scrape_job(job_id, page_count=page_count) - - # ── Phase 2: Register with kiwix-serve ───────────────────────── - if stop_event.is_set() or _check_cancelled(db, job_id): - _handle_cancel(db, job_id) - return - - db.update_scrape_job(job_id, status='registering') - - output_dir = config.get('scraper', {}).get('output_dir', '/mnt/kiwix') - zim_path = os.path.join(output_dir, zim_filename) - kiwix_manage = shutil.which('kiwix-manage') or '/opt/recon/bin/kiwix-manage' - library_xml = '/mnt/kiwix/library.xml' - - try: - subprocess.run( - [kiwix_manage, library_xml, 'add', zim_path], - capture_output=True, text=True, timeout=30 - ) - logger.info(f"Job {job_id}: registered with kiwix-serve library") - except Exception as e: - logger.warning(f"Job {job_id}: kiwix-manage add failed: {e}") - - try: - result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) - if result.returncode == 0 and result.stdout.strip(): - pid = int(result.stdout.strip().split()[0]) - os.kill(pid, signal.SIGHUP) - logger.info(f"Job {job_id}: sent SIGHUP to kiwix-serve (pid {pid})") - except Exception as e: - logger.warning(f"Job {job_id}: failed to signal kiwix-serve: {e}") - - # Wait for kiwix-serve to reload its catalog after SIGHUP - time.sleep(3) - - zim_source_id = None - try: - from .zim_monitor import scan_zims - scan_zims() - conn = db._get_conn() - row = conn.execute( - "SELECT id FROM zim_sources WHERE zim_filename = ?", (zim_filename,) - ).fetchone() - if row: - zim_source_id = row['id'] - logger.info(f"Job {job_id}: linked to zim_source_id={zim_source_id}") - except Exception as e: - logger.warning(f"Job {job_id}: scan_zims failed: {e}") - - # ── Phase 3: Complete ────────────────────────────────────────── - db.update_scrape_job(job_id, - status='complete', - zim_filename=zim_filename, - zim_source_id=zim_source_id, - completed_at=_now()) - - logger.info(f"Job {job_id}: complete — {zim_filename} ({page_count} pages)") - - -def _handle_cancel(db, job_id): - """Handle job cancellation: clean up Docker container and update status.""" - container_name = f'recon-scraper-{job_id}' - try: - subprocess.run(['docker', 'rm', '-f', container_name], - capture_output=True, timeout=10) - except Exception: - pass - - # Clean up tmp dir if it exists - output_dir = '/mnt/kiwix' - tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') - shutil.rmtree(tmp_dir, ignore_errors=True) - - logger.info(f"Job {job_id}: cancelled") - db.update_scrape_job(job_id, - status='cancelled', - subprocess_pid=None, - completed_at=_now()) diff --git a/lib/status.py b/lib/status.py index 974cabd..20cc77b 100644 --- a/lib/status.py +++ b/lib/status.py @@ -105,25 +105,6 @@ class StatusDB: except Exception: pass # column already exists - # Migration: add subprocess_pid column to scrape_jobs if missing - try: - conn.execute("ALTER TABLE scrape_jobs ADD COLUMN subprocess_pid INTEGER") - except Exception: - pass # column already exists - - # Migration: add reject pattern columns to scrape_jobs if missing - for col, coltype in [('additional_reject_patterns', 'TEXT'), ('skip_default_patterns', 'INTEGER DEFAULT 0')]: - try: - conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN {col} {coltype}") - except Exception: - pass # column already exists - - # Migration: add crawl_mode column to scrape_jobs if missing - try: - conn.execute("ALTER TABLE scrape_jobs ADD COLUMN crawl_mode TEXT") - except Exception: - pass # column already exists - # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -161,28 +142,6 @@ class StatusDB: resolved_at TEXT ); CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status); - - CREATE TABLE IF NOT EXISTS scrape_jobs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT NOT NULL, - title TEXT, - language TEXT DEFAULT 'eng', - category TEXT, - status TEXT DEFAULT 'pending', - page_count INTEGER DEFAULT 0, - error_message TEXT, - zim_filename TEXT, - zim_source_id INTEGER, - workspace_path TEXT, - subprocess_pid INTEGER, - additional_reject_patterns TEXT, - skip_default_patterns INTEGER DEFAULT 0, - crawl_mode TEXT, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - started_at TEXT, - completed_at TEXT - ); - CREATE INDEX IF NOT EXISTS idx_scrape_status ON scrape_jobs(status); """) conn.commit() @@ -447,50 +406,6 @@ class StatusDB: ) conn.commit() - - # ── Scraper Job Helpers ───────────────────────────────────── - - def get_pending_scrape_job(self): - """Fetch the oldest pending scrape job.""" - conn = self._get_conn() - row = conn.execute( - "SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1" - ).fetchone() - return dict(row) if row else None - - def update_scrape_job(self, job_id, **kwargs): - """Update arbitrary columns on a scrape job.""" - if not kwargs: - return - conn = self._get_conn() - sets = [] - vals = [] - for k, v in kwargs.items(): - sets.append(f"{k} = ?") - vals.append(v) - vals.append(job_id) - conn.execute(f"UPDATE scrape_jobs SET {', '.join(sets)} WHERE id = ?", vals) - conn.commit() - - def get_scrape_jobs(self, status=None): - """List scrape jobs, optionally filtered by status.""" - conn = self._get_conn() - if status: - rows = conn.execute( - "SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,) - ).fetchall() - else: - rows = conn.execute( - "SELECT * FROM scrape_jobs ORDER BY id DESC" - ).fetchall() - return [dict(r) for r in rows] - - def get_scrape_job(self, job_id): - """Get a single scrape job by ID.""" - conn = self._get_conn() - row = conn.execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone() - return dict(row) if row else None - # ── Stream B: File Operations ─────────────────────────────────── def log_file_operation(self, doc_hash, operation, source_path, target_path, diff --git a/lib/zim_monitor.py b/lib/zim_monitor.py deleted file mode 100644 index 248fc0f..0000000 --- a/lib/zim_monitor.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -ZIM Monitor — detects ZIMs loaded in kiwix-serve and tracks them in recon.db. - -Polls the kiwix-serve OPDS v2 catalog, compares against the zim_sources table, -and for new ZIMs reads accurate metadata via python-libzim's Counter field. - -Standalone: python3 /opt/recon/lib/zim_monitor.py -As module: from lib.zim_monitor import scan_zims -""" -import logging -import os -import sqlite3 -import sys -import urllib.request -from xml.etree import ElementTree as ET - -sys.path.insert(0, "/opt/recon") -from lib.utils import setup_logging - -try: - from libzim.reader import Archive - HAVE_LIBZIM = True -except ImportError: - HAVE_LIBZIM = False - -OPDS_URL = "http://localhost:8430/catalog/v2/entries?count=-1" -ZIM_DIR = "/mnt/kiwix" -DB_PATH = "/opt/recon/data/recon.db" - -ATOM_NS = "http://www.w3.org/2005/Atom" - -logger = logging.getLogger("recon.zim_monitor") - - -def _text(element, tag, ns=ATOM_NS): - """Get text content of a child element, or None.""" - child = element.find(f"{{{ns}}}{tag}") - if child is not None and child.text: - return child.text.strip() - return None - - -def parse_counter(counter_str): - """Parse ZIM Counter metadata into {mimetype: count}.""" - result = {} - for pair in counter_str.split(";"): - if "=" in pair: - mime, count = pair.split("=", 1) - try: - result[mime.strip()] = int(count.strip()) - except ValueError: - pass - return result - - -def fetch_opds(): - """Fetch OPDS v2 catalog from kiwix-serve. Returns list of dicts.""" - try: - with urllib.request.urlopen(OPDS_URL, timeout=10) as resp: - data = resp.read() - except Exception as e: - logger.error("Failed to fetch OPDS catalog: %s", e) - return [] - - root = ET.fromstring(data) - entries = [] - for entry in root.findall(f"{{{ATOM_NS}}}entry"): - uuid_raw = _text(entry, "id") - uuid = uuid_raw.replace("urn:uuid:", "") if uuid_raw else None - - # Derive ZIM filename from the content link href - zim_filename = None - for link in entry.findall(f"{{{ATOM_NS}}}link"): - if link.get("type") == "text/html": - href = link.get("href", "") - # href looks like /content/appropedia_en_all_maxi_2025-11 - name = href.rsplit("/", 1)[-1] if "/" in href else href - if name: - zim_filename = name + ".zim" - break - - entries.append({ - "uuid": uuid, - "title": _text(entry, "title"), - "name": _text(entry, "name"), - "flavour": _text(entry, "flavour"), - "language": _text(entry, "language"), - "category": _text(entry, "category") or None, - "summary": _text(entry, "summary"), - "article_count_opds": int(_text(entry, "articleCount") or 0), - "zim_filename": zim_filename, - }) - return entries - - -def get_libzim_metadata(zim_path): - """Open a ZIM file and read accurate metadata via python-libzim.""" - if not HAVE_LIBZIM: - logger.warning("python-libzim not available, skipping metadata read") - return {} - - zim = Archive(zim_path) - meta = {} - - def _get_meta(key): - try: - return zim.get_metadata(key).decode("utf-8", errors="replace") - except RuntimeError: - return None - - meta["title"] = _get_meta("Title") - meta["description"] = _get_meta("Description") - meta["language"] = _get_meta("Language") - meta["tags"] = _get_meta("Tags") - - counter_str = _get_meta("Counter") - if counter_str: - counts = parse_counter(counter_str) - meta["article_count"] = counts.get("text/html", 0) - meta["counter_raw"] = counter_str - else: - meta["article_count"] = 0 - meta["counter_raw"] = None - - return meta - - -def scan_zims(): - """Compare OPDS catalog against zim_sources table. Insert/update as needed.""" - logger.info("Scanning kiwix-serve OPDS catalog...") - opds_entries = fetch_opds() - if not opds_entries: - logger.info("No entries in OPDS catalog (or fetch failed)") - return - - logger.info("OPDS returned %d entries", len(opds_entries)) - - con = sqlite3.connect(DB_PATH) - con.row_factory = sqlite3.Row - - # Get existing zim_sources keyed by filename - existing = {} - for row in con.execute("SELECT id, zim_filename, status FROM zim_sources"): - existing[row["zim_filename"]] = dict(row) - - opds_filenames = set() - new_count = 0 - - for entry in opds_entries: - filename = entry["zim_filename"] - if not filename: - logger.warning("Skipping OPDS entry with no derivable filename: %s", entry) - continue - - opds_filenames.add(filename) - - if filename in existing: - logger.debug("Already tracked: %s (status=%s)", filename, existing[filename]["status"]) - continue - - # New ZIM — read accurate metadata via python-libzim - zim_path = os.path.join(ZIM_DIR, filename) - if not os.path.isfile(zim_path): - logger.warning("ZIM file not found on disk: %s", zim_path) - continue - - logger.info("New ZIM detected: %s — reading metadata via libzim", filename) - meta = get_libzim_metadata(zim_path) - - con.execute( - """INSERT INTO zim_sources - (zim_filename, zim_path, zim_uuid, title, description, - language, category, article_count, status) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'detected')""", - ( - filename, - zim_path, - entry["uuid"], - meta.get("title") or entry["title"], - meta.get("description") or entry["summary"], - meta.get("language") or entry["language"], - entry["category"], - meta.get("article_count", 0), - ), - ) - new_count += 1 - logger.info( - " Inserted: %s — title=%r, articles=%s (OPDS said %s)", - filename, - meta.get("title") or entry["title"], - meta.get("article_count", 0), - entry["article_count_opds"], - ) - - # Detect removed ZIMs (in DB but not in OPDS, and not already marked removed) - removed_count = 0 - for filename, row in existing.items(): - if filename not in opds_filenames and row["status"] != "removed": - con.execute( - "UPDATE zim_sources SET status = 'removed' WHERE id = ?", - (row["id"],), - ) - removed_count += 1 - logger.info("Marked removed: %s", filename) - - con.commit() - con.close() - - logger.info( - "Scan complete: %d new, %d removed, %d total in catalog", - new_count, removed_count, len(opds_entries), - ) - - -if __name__ == "__main__": - setup_logging("recon.zim_monitor") - scan_zims() diff --git a/recon.py b/recon.py index 9635a59..47dda7d 100755 --- a/recon.py +++ b/recon.py @@ -692,23 +692,12 @@ def cmd_service(args): daemon=True, name='dashboard'), ] - # Scraper daemon: polls for pending scrape jobs, runs wget+zimwriterfs pipeline - scraper_cfg = config.get('scraper', {}) - if scraper_cfg.get('workspace'): - from lib.scraper_runner import scraper_loop - threads.append( - threading.Thread(target=lambda: scraper_loop(stop_event, config), - daemon=True, name='scraper') - ) - logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") pt_interval = config.get("peertube", {}).get("poll_interval", 1800) logger.info(f" PeerTube acquisition: every {pt_interval}s") - if scraper_cfg.get('workspace'): - logger.info(f" Scraper: every {scraper_cfg.get('poll_interval', 300)}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: diff --git a/requirements.txt b/requirements.txt index 1da21bc..f643cd8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ anyio==4.12.1 babel==2.18.0 beautifulsoup4==4.14.3 blinker==1.9.0 -cachetools==7.1.3 certifi==2026.1.4 cffi==2.0.0 charset-normalizer==3.4.4 diff --git a/static/css/recon.css b/static/css/recon.css index a272876..95aed52 100644 --- a/static/css/recon.css +++ b/static/css/recon.css @@ -211,7 +211,6 @@ tr:hover { background: var(--bg-secondary); } .badge-web { background: #1e3a5f; color: #60a5fa; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-pdf { background: #2d5a2d; color: #4ade80; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-transcript { background: #3b1f5e; color: #c084fc; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-wiki { background: #1f4a3b; color: #34d399; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } /* ── Trend indicators ── */ .trend { font-size: 11px; margin-left: 6px; } @@ -316,19 +315,3 @@ tr:hover { background: var(--bg-secondary); } .errors-panel.has-errors { display: block; } .errors-panel summary { color: var(--red); cursor: pointer; font-size: 13px; margin-bottom: 8px; } .errors-panel .error-line { color: var(--text-muted); font-size: 11px; padding: 2px 0; border-bottom: 1px solid var(--border); } - -/* ── Toggle switch ── */ -.toggle-switch { position: relative; display: inline-block; width: 40px; height: 20px; } -.toggle-switch input { opacity: 0; width: 0; height: 0; } -.toggle-slider { position: absolute; cursor: pointer; inset: 0; background: #333; border-radius: 20px; transition: 0.3s; } -.toggle-slider:before { content: ''; position: absolute; height: 16px; width: 16px; left: 2px; bottom: 2px; background: #888; border-radius: 50%; transition: 0.3s; } -.toggle-switch input:checked + .toggle-slider { background: #1a4a2e; } -.toggle-switch input:checked + .toggle-slider:before { transform: translateX(20px); background: #00ff41; } - -/* ── Kiwix status badges ── */ -.badge-complete { background: #1a4a2e; color: #00ff41; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-ingesting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } -.badge-failed { background: #4a1a1a; color: #ff4444; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 0bd0b39..254d92a 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -88,7 +88,7 @@ var pipeCount = s.in_pipeline || 0; totalCat += catCount; totalComp += compCount; totalPipe += pipeCount; totalConcepts += s.concepts; totalVectors += s.vectors; - var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : s.type === 'wiki' ? 'WIKI' : 'PDF'; + var badge = s.type === 'transcript' ? 'TRANSCRIPT' : s.type === 'web' ? 'WEB' : 'PDF'; var compPct = catCount > 0 ? (compCount / catCount * 100) : 0; var pipePct = catCount > 0 ? (pipeCount / catCount * 100) : 0; var compColor = compPct >= 100 ? '#00ff41' : compPct > 0 ? '#ffa500' : '#666'; @@ -185,7 +185,7 @@ rtb.innerHTML = '
  • '; } else { rtb.innerHTML = data.recent_complete.map(function(r) { - var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : r.type === 'wiki' ? 'WIKI' : 'PDF'; + var badge = r.type === 'transcript' ? 'TRANSCRIPT' : r.type === 'web' ? 'WEB' : 'PDF'; return ''; }).join(''); diff --git a/static/js/kiwix.js b/static/js/kiwix.js deleted file mode 100644 index c85ee93..0000000 --- a/static/js/kiwix.js +++ /dev/null @@ -1,147 +0,0 @@ -/* RECON Kiwix Dashboard JS */ -(function() { - 'use strict'; - - function loadKiwixDashboard() { - return RECON.fetchJSON('/api/kiwix/sources').then(function(data) { - // Update stat cards - var t = data.totals || {}; - RECON.set('kx-sources', RECON.fmt(t.sources)); - RECON.set('kx-articles', RECON.fmt(t.articles)); - RECON.set('kx-processed', RECON.fmt(t.processed)); - RECON.set('kx-pipeline', RECON.fmt(t.in_pipeline)); - - // Kiwix-serve status dot - var ks = data.kiwix_serve || {}; - var dot = document.getElementById('svc-kiwix-serve'); - dot.className = 'svc-dot ' + (ks.status === 'active' ? 'active' : 'inactive'); - - // ZIM table - var sources = data.sources || []; - var html = ''; - sources.forEach(function(s) { - var es = s.effective_status || s.status; - var pipe = s.pipeline || {}; - var pipeComplete = pipe.complete || 0; - var pipeTotal = 0; - for (var k in pipe) pipeTotal += pipe[k]; - var pctDone = pipeTotal > 0 ? (pipeComplete / pipeTotal * 100).toFixed(1) : 0; - var statusBadge = es === 'complete' ? 'COMPLETE' : - es === 'processing' ? 'PROCESSING' : - es === 'extracting' ? 'EXTRACTING' : - 'DETECTED'; - // Derive browse URL from zim_filename - var zimName = s.zim_filename.replace(/_(?:maxi|mini|nopic)_[\d-]+\.zim$/, ''); - var browseUrl = 'https://wiki.echo6.co/' + zimName + '/'; - // Toggle switch - var checked = s.ingest_enabled ? ' checked' : ''; - var toggle = ''; - - html += '' + - '' + - '' + - '' + - '' + - '' + - '' + - '' + - '' + - ''; - }); - if (!html) html = ''; - RECON.setHTML('kx-table-body', html); - }).catch(function(err) { - console.error('Kiwix dashboard error:', err); - }); - } - - function toggleIngest(id, enabled) { - RECON.postJSON('/api/kiwix/toggle-ingest/' + id, {enabled: enabled}).then(function(data) { - if (data.ok) loadKiwixDashboard(); - }); - } - - function removeSource(id, title) { - if (!confirm('Remove "' + title + '"?\n\nThis will delete the ZIM file, all ingested documents, and associated vectors from Qdrant. This cannot be undone.')) return; - RECON.postJSON('/api/kiwix/remove/' + id).then(function(data) { - if (data.ok) { - var r = data.results || {}; - alert('Removed: ' + r.docs_deleted + ' docs, ~' + r.vectors_deleted + ' vector batches deleted, file ' + (r.file_deleted ? 'deleted' : 'not found')); - loadKiwixDashboard(); - } - }); - } - - function triggerIngest(id) { - RECON.postJSON('/api/kiwix/trigger-ingest/' + id).then(function(data) { - if (data.ok) loadKiwixDashboard(); - }); - } - - function uploadZim() { - var input = document.getElementById('kx-file-input'); - var file = input.files[0]; - if (!file) return; - - var statusEl = document.getElementById('kx-upload-status'); - var progressDiv = document.getElementById('kx-upload-progress'); - var progressBar = document.getElementById('kx-progress-bar'); - var progressText = document.getElementById('kx-progress-text'); - - statusEl.textContent = 'Uploading ' + file.name + '...'; - progressDiv.style.display = 'block'; - - var formData = new FormData(); - formData.append('file', file); - - var xhr = new XMLHttpRequest(); - xhr.open('POST', '/api/kiwix/upload', true); - - xhr.upload.onprogress = function(e) { - if (e.lengthComputable) { - var pct = (e.loaded / e.total * 100).toFixed(1); - progressBar.style.width = pct + '%'; - progressText.textContent = RECON.fmtBytes(e.loaded) + ' / ' + RECON.fmtBytes(e.total) + ' (' + pct + '%)'; - } - }; - - xhr.onload = function() { - if (xhr.status === 200) { - var resp = JSON.parse(xhr.responseText); - statusEl.textContent = resp.ok ? 'Upload complete: ' + resp.filename : 'Error: ' + (resp.error || 'Unknown'); - progressBar.style.width = '100%'; - progressBar.style.background = resp.ok ? '#16a34a' : '#dc2626'; - if (resp.ok) loadKiwixDashboard(); - } else { - statusEl.textContent = 'Upload failed (HTTP ' + xhr.status + ')'; - progressBar.style.background = '#dc2626'; - } - input.value = ''; - }; - - xhr.onerror = function() { - statusEl.textContent = 'Upload failed (network error)'; - progressBar.style.background = '#dc2626'; - input.value = ''; - }; - - xhr.send(formData); - } - - // Expose for inline onclick - window.KIWIX = { toggleIngest: toggleIngest, triggerIngest: triggerIngest, remove: removeSource }; - - document.addEventListener('DOMContentLoaded', function() { - RECON.startRefresh(loadKiwixDashboard, 30000); - document.getElementById('kx-file-input').addEventListener('change', uploadZim); - }); -})(); diff --git a/static/js/scraper.js b/static/js/scraper.js deleted file mode 100644 index 3988ffe..0000000 --- a/static/js/scraper.js +++ /dev/null @@ -1,173 +0,0 @@ -/* RECON Scraper Dashboard JS */ -(function() { - 'use strict'; - - function loadJobs() { - return RECON.fetchJSON('/api/scraper/jobs').then(function(data) { - var jobs = data.jobs || []; - - // Stats - var total = jobs.length; - var active = 0, complete = 0, failed = 0; - jobs.forEach(function(j) { - if (j.status === 'complete') complete++; - else if (j.status === 'failed' || j.status === 'cancelled') failed++; - else if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') active++; - }); - RECON.set('sc-total', RECON.fmt(total)); - RECON.set('sc-active', RECON.fmt(active)); - RECON.set('sc-complete', RECON.fmt(complete)); - RECON.set('sc-failed', RECON.fmt(failed)); - - // Show/hide Clear Failed button - var clearBtn = document.getElementById('sc-clear-btn'); - if (clearBtn) clearBtn.style.display = failed > 0 ? '' : 'none'; - - // Table - var html = ''; - jobs.forEach(function(j) { - var badge = statusBadge(j.status); - var pages = j.page_count ? RECON.fmt(j.page_count) : '\u2014'; - var zim = j.zim_filename ? - '' + j.zim_filename + '' : '\u2014'; - var actions = ''; - - if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') { - actions = ''; - } else if (j.status === 'failed' || j.status === 'cancelled') { - actions = ' ' + - ''; - } else if (j.status === 'complete') { - actions = ''; - } - - // Truncate URL for display - var displayUrl = j.url.length > 40 ? j.url.substring(0, 40) + '\u2026' : j.url; - - html += '' + - '' + - '' + - '' + - '' + - '' + - '' + - '' + - ''; - }); - if (!html) html = ''; - RECON.setHTML('sc-table-body', html); - }).catch(function(err) { - console.error('Scraper dashboard error:', err); - }); - } - - function statusBadge(status) { - var map = { - 'pending': 'PENDING', - 'scraping': 'SCRAPING', - 'registering': 'REGISTERING', - 'complete': 'COMPLETE', - 'failed': 'FAILED', - 'cancelled': 'CANCELLED' - }; - return map[status] || '' + (status || 'UNKNOWN').toUpperCase() + ''; - } - - function errorTooltip(job) { - if (!job.error_message) return ''; - var short = job.error_message.length > 80 ? - job.error_message.substring(0, 80) + '\u2026' : job.error_message; - return '
    ' + escHtml(short) + '
    '; - } - - function escHtml(str) { - if (!str) return ''; - return str.replace(/&/g, '&').replace(//g, '>') - .replace(/"/g, '"').replace(/'/g, '''); - } - - function submit(e) { - e.preventDefault(); - var url = document.getElementById('sf-url').value.trim(); - if (!url) return false; - - var body = { url: url }; - var title = document.getElementById('sf-title').value.trim(); - var lang = document.getElementById('sf-lang').value; - var category = document.getElementById('sf-category').value.trim(); - if (title) body.title = title; - if (lang) body.language = lang; - if (category) body.category = category; - - var btn = document.getElementById('sf-submit-btn'); - var feedback = document.getElementById('sf-feedback'); - btn.disabled = true; - btn.textContent = 'Submitting...'; - - RECON.postJSON('/api/scraper/submit', body).then(function(data) { - btn.disabled = false; - btn.textContent = 'Submit'; - if (data.ok) { - feedback.style.display = 'block'; - feedback.style.color = '#00ff41'; - feedback.textContent = 'Job #' + data.job_id + ' submitted successfully'; - document.getElementById('sf-url').value = ''; - document.getElementById('sf-title').value = ''; - document.getElementById('sf-category').value = ''; - setTimeout(function() { feedback.style.display = 'none'; }, 4000); - loadJobs(); - } else { - feedback.style.display = 'block'; - feedback.style.color = '#ff4444'; - feedback.textContent = 'Error: ' + (data.error || 'Unknown error'); - } - }).catch(function(err) { - btn.disabled = false; - btn.textContent = 'Submit'; - feedback.style.display = 'block'; - feedback.style.color = '#ff4444'; - feedback.textContent = 'Network error: ' + err.message; - }); - - return false; - } - - function cancel(jobId) { - if (!confirm('Cancel job #' + jobId + '?')) return; - RECON.postJSON('/api/scraper/cancel/' + jobId).then(function(data) { - if (data.ok) loadJobs(); - else alert('Error: ' + (data.error || 'Unknown')); - }); - } - - function retry(jobId) { - RECON.postJSON('/api/scraper/retry/' + jobId).then(function(data) { - if (data.ok) loadJobs(); - else alert('Error: ' + (data.error || 'Unknown')); - }); - } - - function remove(jobId) { - if (!confirm('Delete job #' + jobId + '? This cannot be undone.')) return; - RECON.postJSON('/api/scraper/delete/' + jobId).then(function(data) { - if (data.ok) loadJobs(); - else alert('Error: ' + (data.error || 'Unknown')); - }); - } - - function clearFailed() { - if (!confirm('Delete all failed and cancelled jobs?')) return; - RECON.postJSON('/api/scraper/clear-failed').then(function(data) { - if (data.ok) loadJobs(); - else alert('Error: ' + (data.error || 'Unknown')); - }); - } - - // Expose for inline onclick - window.SCRAPER = { submit: submit, cancel: cancel, retry: retry, remove: remove, clearFailed: clearFailed }; - - document.addEventListener('DOMContentLoaded', function() { - RECON.startRefresh(loadJobs, 10000); - }); -})(); diff --git a/templates/base.html b/templates/base.html index 49b1a21..09db6d8 100644 --- a/templates/base.html +++ b/templates/base.html @@ -19,7 +19,6 @@ diff --git a/templates/kiwix/dashboard.html b/templates/kiwix/dashboard.html deleted file mode 100644 index 72bbed4..0000000 --- a/templates/kiwix/dashboard.html +++ /dev/null @@ -1,48 +0,0 @@ -{% extends "base.html" %} -{% block content %} -
    - -
    -
    ZIM Sources
    -
    Total Articles
    -
    Processed
    -
    In Pipeline
    -
    - - -
    -
    Kiwix-Serve
    - -
    - - -
    -

    ZIM Library

    -
    None yet
    ' + r.title + '' + badge + '' + r.concepts + '' + r.vectors + '
    ' + (s.title || s.zim_filename) + '' + - '
    ' + s.zim_filename + '
    ' + (s.language || '\u2014') + '' + RECON.fmt(s.article_count) + '' + (es === 'complete' && pipeComplete > 0 ? - RECON.fmt(pipeComplete) + ' in Qdrant' : - es === 'processing' ? - RECON.fmt(pipeComplete) + ' / ' + RECON.fmt(pipeTotal) + ' in Qdrant (' + pctDone + '%)' : - es === 'extracting' ? - RECON.fmt(s.processed_count) + ' / ' + RECON.fmt(s.article_count) + ' extracted' : - '\u2014') + '' + statusBadge + '' + toggle + 'Browse
    No ZIM sources detected
    ' + j.id + '' + escHtml(displayUrl) + '' + escHtml(j.title || '\u2014') + '' + pages + '' + badge + errorTooltip(j) + '' + zim + '' + actions + '
    No scrape jobs
    - - - - - - -
    TitleLanguageArticlesProgressStatusIngestBrowse
    Loading...
    - - - -
    -

    Upload ZIM File

    -
    - - - -
    - -
    - -{% endblock %} -{% block scripts %} - -{% endblock %} diff --git a/templates/kiwix/scraper.html b/templates/kiwix/scraper.html deleted file mode 100644 index 862ba0a..0000000 --- a/templates/kiwix/scraper.html +++ /dev/null @@ -1,84 +0,0 @@ -{% extends "base.html" %} -{% block content %} -
    - -
    -

    Submit Scrape Job

    -
    -
    -
    - - -
    -
    - - -
    -
    -
    -
    - - -
    -
    - - -
    -
    - -
    -
    - -
    -
    - - -
    -
    Total Jobs
    -
    Active
    -
    Complete
    -
    Failed
    -
    - - -
    -
    -

    Scrape Jobs

    - -
    - - - - - - - - - - - - - - - -
    IDURLTitlePagesStatusZIM
    Loading...
    -
    -
    -{% endblock %} -{% block scripts %} - -{% endblock %}