diff --git a/.gitignore b/.gitignore index 3fb01ef..bce13d8 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ recon.db # Kiwix binary tools (installed from tarball) bin/ +status.db diff --git a/config.yaml b/config.yaml index 4b147fd..a2709b0 100644 --- a/config.yaml +++ b/config.yaml @@ -413,6 +413,14 @@ peertube: rate_limit_delay: 0.5 # Delay between video ingestions (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) +scraper: + workspace: /opt/recon/data/scraper # Working directory (tmp dirs for Zimit output) + output_dir: /mnt/kiwix # Finished .zim files land here (kiwix-serve library) + default_language: eng # ISO 639-3 language code for ZIM metadata + poll_interval: 300 # Seconds between checking for pending scrape jobs + docker_image: ghcr.io/openzim/zimit # Zimit Docker image for web crawling + docker_workers: 2 # Concurrent crawl workers inside Zimit container + # Stream B: New Library Pipeline new_pipeline: # Disabled 2026-04-14 for refactor — see refactored-recon repo for context diff --git a/lib/api.py b/lib/api.py index a739ec0..6a3d627 100644 --- a/lib/api.py +++ b/lib/api.py @@ -44,6 +44,20 @@ app = Flask(__name__, app.config['MAX_CONTENT_LENGTH'] = None # ZIM files can be multi-GB + +# ── Large ZIM upload support ── +# Override stream factory so ZIM uploads write directly to /mnt/kiwix/ +# instead of /tmp (which is on the 96GB root disk and can't hold 100GB+ ZIMs). +from flask import Request as _FlaskRequest + +class _LargeZimRequest(_FlaskRequest): + def _get_file_stream(self, total_content_length, content_type, filename=None, content_length=None): + if filename and filename.lower().endswith('.zim'): + return tempfile.NamedTemporaryFile('wb+', dir='/mnt/kiwix', prefix='.upload_', suffix='.tmp', delete=False) + return super()._get_file_stream(total_content_length, content_type, filename, content_length) + +app.request_class = _LargeZimRequest + # ── Navigation Constants ── KNOWLEDGE_SUBNAV = [ @@ -60,7 +74,10 @@ PEERTUBE_SUBNAV = [ ] -KIWIX_SUBNAV = [] # Single-page, no subnav needed +KIWIX_SUBNAV = [ + {'href': '/kiwix', 'label': 'Library'}, + {'href': '/kiwix/scraper', 'label': 'Scraper'}, +] SETTINGS_SUBNAV = [ {'href': '/settings/keys', 'label': 'API Keys'}, {'href': '/settings/cookies', 'label': 'YouTube Cookies'}, @@ -1956,6 +1973,12 @@ def kiwix_dashboard(): domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix') +@app.route('/kiwix/scraper') +def kiwix_scraper(): + return render_template('kiwix/scraper.html', + domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix/scraper') + + @app.route('/api/kiwix/sources') def api_kiwix_sources(): """Serve pre-cached Kiwix sources data (never blocks).""" @@ -2011,14 +2034,23 @@ def api_kiwix_upload(): filename = secure_filename(f.filename) dest = os.path.join('/mnt/kiwix', filename) - tmp_dest = dest + '.tmp' try: - f.save(tmp_dest) - os.rename(tmp_dest, dest) + # Stream was written directly to /mnt/kiwix/ by _LargeZimRequest — + # rename in-place instead of copying 100GB+ through f.save() + if hasattr(f.stream, 'name') and f.stream.name: + tmp_path = f.stream.name + f.stream.close() + os.rename(tmp_path, dest) + else: + tmp_dest = dest + '.tmp' + f.save(tmp_dest) + os.rename(tmp_dest, dest) except Exception as e: - if os.path.exists(tmp_dest): - os.remove(tmp_dest) + # Clean up any temp files on failure + for p in [locals().get('tmp_path', ''), locals().get('tmp_dest', '')]: + if p and os.path.exists(p): + os.remove(p) return jsonify({'error': f'Save failed: {e}'}), 500 # Register with kiwix-serve library @@ -2051,23 +2083,24 @@ def api_kiwix_upload(): -@app.route('/api/kiwix/remove/', methods=['POST']) -def api_kiwix_remove(source_id): - """Remove a ZIM source: delete vectors, DB records, library entry, and file.""" +def _full_zim_cleanup(source_id): + """Full ZIM cleanup: Qdrant vectors, DB records, kiwix-manage, SIGHUP, file delete. + Returns dict with results. Caller handles cache refresh.""" import subprocess + import signal import requests as req db = StatusDB() conn = db._get_conn() row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() if not row: - return jsonify({'error': 'Source not found'}), 404 + return None zim_source = dict(row) zim_filename = zim_source['zim_filename'] zim_path = zim_source['zim_path'] zim_title = zim_source.get('title', zim_filename) - results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False} + results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False, 'scrape_jobs_deleted': 0} # Step 1: Find all document hashes for this ZIM source doc_hashes = [r['hash'] for r in conn.execute( @@ -2126,7 +2159,6 @@ def api_kiwix_remove(source_id): # Step 4: Remove from kiwix-serve library try: - # Get the book ID from library.xml subprocess.run( ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', zim_filename.replace('.zim', '')], capture_output=True, text=True, timeout=10 @@ -2134,6 +2166,16 @@ def api_kiwix_remove(source_id): except Exception as e: logger.warning(f"kiwix-manage remove failed: {e}") + # Step 4b: SIGHUP kiwix-serve to reload library + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Failed to signal kiwix-serve: {e}") + # Step 5: Delete the ZIM file if os.path.isfile(zim_path): try: @@ -2143,13 +2185,37 @@ def api_kiwix_remove(source_id): logger.warning(f"ZIM file delete failed: {e}") results['file_deleted'] = False + # Step 6: Delete any linked scrape_jobs rows + try: + res = conn.execute("DELETE FROM scrape_jobs WHERE zim_source_id = ?", (source_id,)) + conn.commit() + results['scrape_jobs_deleted'] = res.rowcount + except Exception as e: + logger.warning(f"scrape_jobs cleanup failed: {e}") + + logger.info(f"Full ZIM cleanup for source {source_id} ('{zim_title}'): {results}") + return results + + +@app.route('/api/kiwix/remove/', methods=['POST']) +def api_kiwix_remove(source_id): + """Remove a ZIM source: delete vectors, DB records, library entry, and file.""" + db = StatusDB() + conn = db._get_conn() + row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() + if not row: + return jsonify({'error': 'Source not found'}), 404 + + results = _full_zim_cleanup(source_id) + if results is None: + return jsonify({'error': 'Source not found during cleanup'}), 404 + # Refresh cache try: _cache['kiwix_sources'] = _build_kiwix_sources() except Exception: pass - logger.info(f"Removed ZIM source '{zim_title}': {results}") return jsonify({'ok': True, 'results': results}) @@ -2256,6 +2322,170 @@ def _build_kiwix_sources(): } + + +# ── Scraper API ── + +@app.route('/api/scraper/submit', methods=['POST']) +def api_scraper_submit(): + """Submit a new web scrape job.""" + data = request.get_json(silent=True) or {} + url = (data.get('url') or '').strip() + + if not url: + return jsonify({'error': 'url is required'}), 400 + if not url.startswith(('http://', 'https://')): + return jsonify({'error': 'URL must start with http:// or https://'}), 400 + + config = get_config() + scraper_cfg = config.get('scraper', {}) + language = data.get('language') or scraper_cfg.get('default_language', 'eng') + title = data.get('title', '').strip() or None + category = data.get('category', '').strip() or None + + db = StatusDB() + conn = db._get_conn() + conn.execute( + "INSERT INTO scrape_jobs (url, title, language, category, crawl_mode) VALUES (?, ?, ?, ?, ?)", + (url, title, language, category, 'zimit') + ) + conn.commit() + job_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + + logger.info(f"Scraper job {job_id} submitted: {url}") + return jsonify({'ok': True, 'job_id': job_id}), 201 + + +@app.route('/api/scraper/jobs') +def api_scraper_jobs(): + """List scrape jobs, optionally filtered by status.""" + status_filter = request.args.get('status') + db = StatusDB() + jobs = db.get_scrape_jobs(status=status_filter) + return jsonify({'jobs': jobs}) + + +@app.route('/api/scraper/cancel/', methods=['POST']) +def api_scraper_cancel(job_id): + """Cancel a scrape job.""" + + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] in ('complete', 'cancelled'): + return jsonify({'error': f"Job already {job['status']}"}), 400 + + # Set cancelled in DB — the runner loop checks this between phases + db.update_scrape_job(job_id, status='cancelled') + + # Stop the Docker container if running + container_name = f'recon-scraper-{job_id}' + try: + import subprocess as _subprocess + _subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + except Exception: + pass + + logger.info(f"Scraper job {job_id} cancelled") + return jsonify({'ok': True}) + + +@app.route('/api/scraper/retry/', methods=['POST']) +def api_scraper_retry(job_id): + """Retry a failed or cancelled scrape job.""" + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] not in ('failed', 'cancelled'): + return jsonify({'error': f"Job status is '{job['status']}', can only retry failed or cancelled jobs"}), 400 + + db.update_scrape_job(job_id, + status='pending', + error_message=None, + subprocess_pid=None, + crawl_mode=None, + started_at=None, + completed_at=None) + + logger.info(f"Scraper job {job_id} reset to pending for retry") + return jsonify({'ok': True}) + + +@app.route('/api/scraper/delete/', methods=['POST']) +def api_scraper_delete(job_id): + """Delete a scrape job and clean up any associated ZIM artifacts.""" + import subprocess + import signal + + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] == 'running': + return jsonify({'error': 'Cannot delete a running job \u2014 cancel it first'}), 400 + + zim_cleanup_results = None + + # If the job has a linked zim_source, do full cleanup + if job.get('zim_source_id'): + zim_cleanup_results = _full_zim_cleanup(job['zim_source_id']) + try: + _cache['kiwix_sources'] = _build_kiwix_sources() + except Exception: + pass + elif job.get('zim_filename'): + # No zim_source row, but there may be an orphan file + library entry + zim_path = os.path.join('/mnt/kiwix', job['zim_filename']) + if os.path.isfile(zim_path): + try: + os.remove(zim_path) + logger.info(f"Deleted orphan ZIM file: {zim_path}") + except Exception as e: + logger.warning(f"Failed to delete orphan ZIM file {zim_path}: {e}") + try: + subprocess.run( + ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', + job['zim_filename'].replace('.zim', '')], + capture_output=True, text=True, timeout=10 + ) + except Exception as e: + logger.warning(f"kiwix-manage remove failed for orphan: {e}") + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Failed to signal kiwix-serve: {e}") + + # Delete the scrape_jobs row (may already be gone if _full_zim_cleanup deleted it) + conn = db._get_conn() + conn.execute("DELETE FROM scrape_jobs WHERE id = ?", (job_id,)) + conn.commit() + + logger.info(f"Scraper job {job_id} deleted (zim_cleanup={zim_cleanup_results})") + return jsonify({'ok': True, 'zim_cleanup': zim_cleanup_results}) + + +@app.route('/api/scraper/clear-failed', methods=['POST']) +def api_scraper_clear_failed(): + """Delete all failed and cancelled scrape jobs.""" + db = StatusDB() + conn = db._get_conn() + result = conn.execute("DELETE FROM scrape_jobs WHERE status IN ('failed', 'cancelled')") + conn.commit() + count = result.rowcount + logger.info(f"Cleared {count} failed/cancelled scraper jobs") + return jsonify({'ok': True, 'deleted': count}) + + # ── Metrics API ── @app.route('/api/metrics/history') diff --git a/lib/scraper_runner.py b/lib/scraper_runner.py new file mode 100644 index 0000000..b83f145 --- /dev/null +++ b/lib/scraper_runner.py @@ -0,0 +1,387 @@ +""" +RECON Scraper Runner + +Daemon loop that processes scrape jobs: crawl via Zimit → kiwix-manage. +Zimit (openZIM Docker crawler) handles all site types and produces ZIM +files directly — no separate zimwriterfs step needed. + +Public entry point: scraper_loop(stop_event, config). + +Config section: scraper (output_dir, docker_image, docker_workers, poll_interval) +DB table: scrape_jobs (status flow: pending → scraping → registering → complete) +""" +import glob as _glob +import os +import re +import shutil +import signal +import subprocess +import time +from datetime import datetime, timezone +from urllib.parse import urlparse + +from .utils import setup_logging +from .status import StatusDB + +logger = setup_logging('recon.scraper_runner') + + +def scraper_loop(stop_event, config): + """Daemon loop: poll for pending scrape jobs, execute pipeline.""" + scraper_cfg = config.get('scraper', {}) + poll_interval = scraper_cfg.get('poll_interval', 300) + + logger.info("Scraper runner started") + + # Clean up any orphan Zimit containers from a previous crash + _cleanup_orphan_containers() + + while not stop_event.is_set(): + db = StatusDB() + job = db.get_pending_scrape_job() + if job: + try: + _process_job(job, config, stop_event) + except Exception as e: + logger.error(f"Scraper job {job['id']} unexpected error: {e}", exc_info=True) + try: + db.update_scrape_job(job['id'], + status='failed', + error_message=str(e)[:1000], + subprocess_pid=None, + completed_at=_now()) + except Exception: + pass + else: + stop_event.wait(poll_interval) + + logger.info("Scraper runner stopped") + + +def _now(): + return datetime.now(timezone.utc).isoformat() + + +def _sanitize_domain(url): + """Extract and sanitize domain from URL for use in filenames.""" + parsed = urlparse(url) + domain = parsed.hostname or 'unknown' + if domain.startswith('www.'): + domain = domain[4:] + return domain + + +def _sanitize_filename(s): + """Sanitize a string for safe filename use.""" + return re.sub(r'[^a-zA-Z0-9._-]', '_', s) + + +def _check_cancelled(db, job_id): + """Check if a job has been cancelled in the DB.""" + job = db.get_scrape_job(job_id) + return job and job['status'] == 'cancelled' + + +def _kill_process(proc, timeout=5): + """Gracefully terminate a subprocess, force kill if needed.""" + if proc.poll() is not None: + return + try: + proc.terminate() + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=2) + + +def _cleanup_orphan_containers(): + """Remove any leftover recon-scraper-* Docker containers from a previous crash.""" + try: + result = subprocess.run( + ['docker', 'ps', '-a', '--filter', 'name=recon-scraper-', '--format', '{{.Names}}'], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0 and result.stdout.strip(): + for name in result.stdout.strip().split('\n'): + name = name.strip() + if name: + subprocess.run(['docker', 'rm', '-f', name], capture_output=True, timeout=10) + logger.info(f"Cleaned up orphan container: {name}") + except Exception as e: + logger.warning(f"Orphan container cleanup failed: {e}") + + +# ── Zimit crawl backend ────────────────────────────────────────── + + +def _crawl_zimit(job, config, stop_event, db): + """ + Crawl a URL using Zimit (openZIM Docker crawler). + + Returns (page_count, zim_filename, error_msg). + On success: (count, filename, None) + On failure: (0, None, error_string) + """ + job_id = job['id'] + url = job['url'] + title = job.get('title') or _sanitize_domain(url) + language = job.get('language') or config.get('scraper', {}).get('default_language', 'eng') + category = job.get('category') or '' + + scraper_cfg = config.get('scraper', {}) + output_dir = scraper_cfg.get('output_dir', '/mnt/kiwix') + docker_image = scraper_cfg.get('docker_image', 'ghcr.io/openzim/zimit') + docker_workers = scraper_cfg.get('docker_workers', 2) + + domain = _sanitize_domain(url) + date_tag = datetime.now().strftime('%Y-%m') + container_name = f'recon-scraper-{job_id}' + tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') + + # Clean up any pre-existing container with same name (retry scenario) + subprocess.run(['docker', 'rm', '-f', container_name], capture_output=True, timeout=10) + + os.makedirs(tmp_dir, exist_ok=True) + + description = f"Mirror of {domain}" + if category: + description = f"{category} — mirror of {domain}" + + docker_cmd = [ + 'docker', 'run', + '--name', container_name, + '-v', f'{tmp_dir}:/output', + docker_image, + 'zimit', + '--seeds', url, + '--name', _sanitize_filename(domain), + '--zim-lang', language, + '--title', title, + '--description', description[:80], + '--output', '/output', + '-w', str(docker_workers), + ] + + logger.info(f"Job {job_id}: Zimit crawl starting — {url}") + try: + proc = subprocess.Popen( + docker_cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + db.update_scrape_job(job_id, subprocess_pid=proc.pid) + + last_progress_check = 0 + while proc.poll() is None: + if stop_event.is_set() or _check_cancelled(db, job_id): + # Stop the Docker container + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + _kill_process(proc) + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, 'cancelled' + + # Check progress every 30s via docker logs + now = time.time() + if now - last_progress_check >= 30: + last_progress_check = now + try: + log_result = subprocess.run( + ['docker', 'logs', '--tail', '20', container_name], + capture_output=True, text=True, timeout=10 + ) + if log_result.returncode == 0: + # Browsertrix logs JSON with "crawled":N — check both stdout and stderr + log_text = log_result.stdout or log_result.stderr or '' + lines = log_text.strip().split('\n') + for line in reversed(lines): + match = re.search(r'"crawled":(\d+)', line) + if match: + count = int(match.group(1)) + if count > 0: + db.update_scrape_job(job_id, page_count=count) + break + except Exception: + pass + + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + db.update_scrape_job(job_id, subprocess_pid=None) + + if stop_event.is_set() or _check_cancelled(db, job_id): + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, 'cancelled' + + if proc.returncode != 0: + # Capture last 50 lines of docker logs for error context + error_msg = f"Zimit exited with code {proc.returncode}" + try: + log_result = subprocess.run( + ['docker', 'logs', '--tail', '50', container_name], + capture_output=True, text=True, timeout=10 + ) + log_text = (log_result.stderr or log_result.stdout or '').strip() + if log_text: + # Take last 500 chars + error_msg += f": {log_text[-500:]}" + except Exception: + pass + # Remove container (no --rm flag, so we clean up manually) + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, error_msg + + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, f"Zimit error: {e}" + + # Remove container (no --rm flag, so we clean up manually after getting logs) + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + + # Find the output ZIM file + zim_files = _glob.glob(os.path.join(tmp_dir, '*.zim')) + if not zim_files: + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, 'Zimit produced no ZIM file' + + src_zim = zim_files[0] # Should be exactly one + + # Get page count from file size as rough estimate if we don't have one + page_count = 0 + try: + job_state = db.get_scrape_job(job_id) + page_count = job_state.get('page_count') or 0 + except Exception: + pass + + # Rename to final location + zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}_{job_id}.zim" + zim_path = os.path.join(output_dir, zim_filename) + try: + shutil.move(src_zim, zim_path) + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + return 0, None, f"Failed to move ZIM to output dir: {e}" + + shutil.rmtree(tmp_dir, ignore_errors=True) + logger.info(f"Job {job_id}: Zimit complete — {zim_filename}") + + return page_count, zim_filename, None + + +# ── Main job pipeline ───────────────────────────────────────────── + + +def _process_job(job, config, stop_event): + """Execute the full scrape pipeline for a single job.""" + db = StatusDB() + job_id = job['id'] + + logger.info(f"Job {job_id}: starting scrape of {job['url']}") + + # ── Phase 1: Crawl via Zimit ─────────────────────────────────── + db.update_scrape_job(job_id, + status='scraping', + crawl_mode='zimit', + started_at=_now()) + + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id) + return + + page_count, zim_filename, error = _crawl_zimit(job, config, stop_event, db) + + if error == 'cancelled': + _handle_cancel(db, job_id) + return + elif error: + db.update_scrape_job(job_id, + status='failed', + error_message=error[:1000], + subprocess_pid=None, + completed_at=_now()) + return + + db.update_scrape_job(job_id, page_count=page_count) + + # ── Phase 2: Register with kiwix-serve ───────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id) + return + + db.update_scrape_job(job_id, status='registering') + + output_dir = config.get('scraper', {}).get('output_dir', '/mnt/kiwix') + zim_path = os.path.join(output_dir, zim_filename) + kiwix_manage = shutil.which('kiwix-manage') or '/opt/recon/bin/kiwix-manage' + library_xml = '/mnt/kiwix/library.xml' + + try: + subprocess.run( + [kiwix_manage, library_xml, 'add', zim_path], + capture_output=True, text=True, timeout=30 + ) + logger.info(f"Job {job_id}: registered with kiwix-serve library") + except Exception as e: + logger.warning(f"Job {job_id}: kiwix-manage add failed: {e}") + + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Job {job_id}: sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Job {job_id}: failed to signal kiwix-serve: {e}") + + # Wait for kiwix-serve to reload its catalog after SIGHUP + time.sleep(3) + + zim_source_id = None + try: + from .zim_monitor import scan_zims + scan_zims() + conn = db._get_conn() + row = conn.execute( + "SELECT id FROM zim_sources WHERE zim_filename = ?", (zim_filename,) + ).fetchone() + if row: + zim_source_id = row['id'] + logger.info(f"Job {job_id}: linked to zim_source_id={zim_source_id}") + except Exception as e: + logger.warning(f"Job {job_id}: scan_zims failed: {e}") + + # ── Phase 3: Complete ────────────────────────────────────────── + db.update_scrape_job(job_id, + status='complete', + zim_filename=zim_filename, + zim_source_id=zim_source_id, + completed_at=_now()) + + logger.info(f"Job {job_id}: complete — {zim_filename} ({page_count} pages)") + + +def _handle_cancel(db, job_id): + """Handle job cancellation: clean up Docker container and update status.""" + container_name = f'recon-scraper-{job_id}' + try: + subprocess.run(['docker', 'rm', '-f', container_name], + capture_output=True, timeout=10) + except Exception: + pass + + # Clean up tmp dir if it exists + output_dir = '/mnt/kiwix' + tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}') + shutil.rmtree(tmp_dir, ignore_errors=True) + + logger.info(f"Job {job_id}: cancelled") + db.update_scrape_job(job_id, + status='cancelled', + subprocess_pid=None, + completed_at=_now()) diff --git a/lib/status.py b/lib/status.py index 20cc77b..974cabd 100644 --- a/lib/status.py +++ b/lib/status.py @@ -105,6 +105,25 @@ class StatusDB: except Exception: pass # column already exists + # Migration: add subprocess_pid column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN subprocess_pid INTEGER") + except Exception: + pass # column already exists + + # Migration: add reject pattern columns to scrape_jobs if missing + for col, coltype in [('additional_reject_patterns', 'TEXT'), ('skip_default_patterns', 'INTEGER DEFAULT 0')]: + try: + conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + + # Migration: add crawl_mode column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN crawl_mode TEXT") + except Exception: + pass # column already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -142,6 +161,28 @@ class StatusDB: resolved_at TEXT ); CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status); + + CREATE TABLE IF NOT EXISTS scrape_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + title TEXT, + language TEXT DEFAULT 'eng', + category TEXT, + status TEXT DEFAULT 'pending', + page_count INTEGER DEFAULT 0, + error_message TEXT, + zim_filename TEXT, + zim_source_id INTEGER, + workspace_path TEXT, + subprocess_pid INTEGER, + additional_reject_patterns TEXT, + skip_default_patterns INTEGER DEFAULT 0, + crawl_mode TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + started_at TEXT, + completed_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_scrape_status ON scrape_jobs(status); """) conn.commit() @@ -406,6 +447,50 @@ class StatusDB: ) conn.commit() + + # ── Scraper Job Helpers ───────────────────────────────────── + + def get_pending_scrape_job(self): + """Fetch the oldest pending scrape job.""" + conn = self._get_conn() + row = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1" + ).fetchone() + return dict(row) if row else None + + def update_scrape_job(self, job_id, **kwargs): + """Update arbitrary columns on a scrape job.""" + if not kwargs: + return + conn = self._get_conn() + sets = [] + vals = [] + for k, v in kwargs.items(): + sets.append(f"{k} = ?") + vals.append(v) + vals.append(job_id) + conn.execute(f"UPDATE scrape_jobs SET {', '.join(sets)} WHERE id = ?", vals) + conn.commit() + + def get_scrape_jobs(self, status=None): + """List scrape jobs, optionally filtered by status.""" + conn = self._get_conn() + if status: + rows = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,) + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM scrape_jobs ORDER BY id DESC" + ).fetchall() + return [dict(r) for r in rows] + + def get_scrape_job(self, job_id): + """Get a single scrape job by ID.""" + conn = self._get_conn() + row = conn.execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone() + return dict(row) if row else None + # ── Stream B: File Operations ─────────────────────────────────── def log_file_operation(self, doc_hash, operation, source_path, target_path, diff --git a/recon.py b/recon.py index 47dda7d..9635a59 100755 --- a/recon.py +++ b/recon.py @@ -692,12 +692,23 @@ def cmd_service(args): daemon=True, name='dashboard'), ] + # Scraper daemon: polls for pending scrape jobs, runs wget+zimwriterfs pipeline + scraper_cfg = config.get('scraper', {}) + if scraper_cfg.get('workspace'): + from lib.scraper_runner import scraper_loop + threads.append( + threading.Thread(target=lambda: scraper_loop(stop_event, config), + daemon=True, name='scraper') + ) + logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") pt_interval = config.get("peertube", {}).get("poll_interval", 1800) logger.info(f" PeerTube acquisition: every {pt_interval}s") + if scraper_cfg.get('workspace'): + logger.info(f" Scraper: every {scraper_cfg.get('poll_interval', 300)}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: diff --git a/static/css/recon.css b/static/css/recon.css index 31d6306..a272876 100644 --- a/static/css/recon.css +++ b/static/css/recon.css @@ -331,3 +331,4 @@ tr:hover { background: var(--bg-secondary); } .badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } +.badge-failed { background: #4a1a1a; color: #ff4444; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } diff --git a/static/js/scraper.js b/static/js/scraper.js new file mode 100644 index 0000000..3988ffe --- /dev/null +++ b/static/js/scraper.js @@ -0,0 +1,173 @@ +/* RECON Scraper Dashboard JS */ +(function() { + 'use strict'; + + function loadJobs() { + return RECON.fetchJSON('/api/scraper/jobs').then(function(data) { + var jobs = data.jobs || []; + + // Stats + var total = jobs.length; + var active = 0, complete = 0, failed = 0; + jobs.forEach(function(j) { + if (j.status === 'complete') complete++; + else if (j.status === 'failed' || j.status === 'cancelled') failed++; + else if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') active++; + }); + RECON.set('sc-total', RECON.fmt(total)); + RECON.set('sc-active', RECON.fmt(active)); + RECON.set('sc-complete', RECON.fmt(complete)); + RECON.set('sc-failed', RECON.fmt(failed)); + + // Show/hide Clear Failed button + var clearBtn = document.getElementById('sc-clear-btn'); + if (clearBtn) clearBtn.style.display = failed > 0 ? '' : 'none'; + + // Table + var html = ''; + jobs.forEach(function(j) { + var badge = statusBadge(j.status); + var pages = j.page_count ? RECON.fmt(j.page_count) : '\u2014'; + var zim = j.zim_filename ? + '' + j.zim_filename + '' : '\u2014'; + var actions = ''; + + if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') { + actions = ''; + } else if (j.status === 'failed' || j.status === 'cancelled') { + actions = ' ' + + ''; + } else if (j.status === 'complete') { + actions = ''; + } + + // Truncate URL for display + var displayUrl = j.url.length > 40 ? j.url.substring(0, 40) + '\u2026' : j.url; + + html += '' + + '' + j.id + '' + + '' + escHtml(displayUrl) + '' + + '' + escHtml(j.title || '\u2014') + '' + + '' + pages + '' + + '' + badge + errorTooltip(j) + '' + + '' + zim + '' + + '' + actions + '' + + ''; + }); + if (!html) html = 'No scrape jobs'; + RECON.setHTML('sc-table-body', html); + }).catch(function(err) { + console.error('Scraper dashboard error:', err); + }); + } + + function statusBadge(status) { + var map = { + 'pending': 'PENDING', + 'scraping': 'SCRAPING', + 'registering': 'REGISTERING', + 'complete': 'COMPLETE', + 'failed': 'FAILED', + 'cancelled': 'CANCELLED' + }; + return map[status] || '' + (status || 'UNKNOWN').toUpperCase() + ''; + } + + function errorTooltip(job) { + if (!job.error_message) return ''; + var short = job.error_message.length > 80 ? + job.error_message.substring(0, 80) + '\u2026' : job.error_message; + return '
' + escHtml(short) + '
'; + } + + function escHtml(str) { + if (!str) return ''; + return str.replace(/&/g, '&').replace(//g, '>') + .replace(/"/g, '"').replace(/'/g, '''); + } + + function submit(e) { + e.preventDefault(); + var url = document.getElementById('sf-url').value.trim(); + if (!url) return false; + + var body = { url: url }; + var title = document.getElementById('sf-title').value.trim(); + var lang = document.getElementById('sf-lang').value; + var category = document.getElementById('sf-category').value.trim(); + if (title) body.title = title; + if (lang) body.language = lang; + if (category) body.category = category; + + var btn = document.getElementById('sf-submit-btn'); + var feedback = document.getElementById('sf-feedback'); + btn.disabled = true; + btn.textContent = 'Submitting...'; + + RECON.postJSON('/api/scraper/submit', body).then(function(data) { + btn.disabled = false; + btn.textContent = 'Submit'; + if (data.ok) { + feedback.style.display = 'block'; + feedback.style.color = '#00ff41'; + feedback.textContent = 'Job #' + data.job_id + ' submitted successfully'; + document.getElementById('sf-url').value = ''; + document.getElementById('sf-title').value = ''; + document.getElementById('sf-category').value = ''; + setTimeout(function() { feedback.style.display = 'none'; }, 4000); + loadJobs(); + } else { + feedback.style.display = 'block'; + feedback.style.color = '#ff4444'; + feedback.textContent = 'Error: ' + (data.error || 'Unknown error'); + } + }).catch(function(err) { + btn.disabled = false; + btn.textContent = 'Submit'; + feedback.style.display = 'block'; + feedback.style.color = '#ff4444'; + feedback.textContent = 'Network error: ' + err.message; + }); + + return false; + } + + function cancel(jobId) { + if (!confirm('Cancel job #' + jobId + '?')) return; + RECON.postJSON('/api/scraper/cancel/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function retry(jobId) { + RECON.postJSON('/api/scraper/retry/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function remove(jobId) { + if (!confirm('Delete job #' + jobId + '? This cannot be undone.')) return; + RECON.postJSON('/api/scraper/delete/' + jobId).then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + function clearFailed() { + if (!confirm('Delete all failed and cancelled jobs?')) return; + RECON.postJSON('/api/scraper/clear-failed').then(function(data) { + if (data.ok) loadJobs(); + else alert('Error: ' + (data.error || 'Unknown')); + }); + } + + // Expose for inline onclick + window.SCRAPER = { submit: submit, cancel: cancel, retry: retry, remove: remove, clearFailed: clearFailed }; + + document.addEventListener('DOMContentLoaded', function() { + RECON.startRefresh(loadJobs, 10000); + }); +})(); diff --git a/templates/kiwix/scraper.html b/templates/kiwix/scraper.html new file mode 100644 index 0000000..862ba0a --- /dev/null +++ b/templates/kiwix/scraper.html @@ -0,0 +1,84 @@ +{% extends "base.html" %} +{% block content %} +
+ +
+

Submit Scrape Job

+
+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+ +
+
+ +
+
+ + +
+
Total Jobs
+
Active
+
Complete
+
Failed
+
+ + +
+
+

Scrape Jobs

+ +
+ + + + + + + + + + + + + + + +
IDURLTitlePagesStatusZIM
Loading...
+
+
+{% endblock %} +{% block scripts %} + +{% endblock %}