diff --git a/config.yaml b/config.yaml index 4b147fd..c98a866 100644 --- a/config.yaml +++ b/config.yaml @@ -413,6 +413,84 @@ peertube: rate_limit_delay: 0.5 # Delay between video ingestions (seconds) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) +scraper: + workspace: /opt/recon/data/scraper # Working directory for wget mirrors + ZIM builds + output_dir: /mnt/kiwix # Finished .zim files land here (kiwix-serve library) + rate_limit_delay: 0.5 # Seconds between wget requests (--wait) + wait_random: 1.0 # Random jitter added to wait (--random-wait range) + default_language: eng # ISO 639-3 language code for ZIM metadata + user_agent: "Mozilla/5.0 (compatible; RECON/1.0; +https://echo6.co)" + poll_interval: 300 # Seconds between checking for pending scrape jobs + keep_workspace_on_failure: true # Retain workspace for debugging when a job fails + + # Default URL patterns rejected by wget --reject-regex. + # Covers common CMS junk across WordPress, Squarespace, Wix, Ghost, Drupal, etc. + # Per-job overrides: additional_reject_patterns (appended) or skip_default_patterns (bypass). + default_reject_patterns: + # WordPress + - '\?share=' + - '\?replytocom=' + - '\?like_comment=' + - '/feed/' + - '/wp-json/' + - '/wp-login' + - '/wp-admin' + - '/wp-cron' + - '\?attachment_id=' + - '/xmlrpc' + - '/trackback' + - '/comment-page-' + - '\?doing_wp_cron' + # Squarespace + - '\?format=json' + - '\?format=rss' + - '/api/' + # Wix + - '/_api/' + - '/_partials/' + # Ghost + - '/ghost/' + - '/p/' + # Drupal + - '\?q=comment' + - '\?q=node' + - '/user/login' + - '/user/register' + # General CMS / site chrome + - '/login' + - '/signup' + - '/register' + - '/cart' + - '/checkout' + - '/search\?' 
+ - '/tag/' + - '/author/' + - '\?print=' + - '\?pdf=' + - '\?format=amp' + - '\?preview=' + - '/rss' + - '/atom' + - '/cdn-cgi/' + + # Pre-flight mode detection + preflight: + enabled: true + timeout: 30 # Seconds for single-page Playwright fetch + min_static_size: 5120 # 5KB - wget HTML below this = suspect JS site + min_browser_size: 20480 # 20KB - browser HTML above this confirms JS + spa_markers: + - 'div#root' + - 'div#app' + - 'div#__next' + + # SingleFile CLI settings (browser crawl mode) + singlefile: + executable: single-file + chromium_path: "" # Auto-detected from Playwright if empty + crawl_max_depth: 10 + crawl_delay: 2 # Seconds between page fetches + # Stream B: New Library Pipeline new_pipeline: # Disabled 2026-04-14 for refactor — see refactored-recon repo for context diff --git a/lib/api.py b/lib/api.py index a739ec0..cbb3377 100644 --- a/lib/api.py +++ b/lib/api.py @@ -2256,6 +2256,114 @@ def _build_kiwix_sources(): } + + +# ── Scraper API ── + +@app.route('/api/scraper/submit', methods=['POST']) +def api_scraper_submit(): + """Submit a new web scrape job.""" + data = request.get_json(silent=True) or {} + url = (data.get('url') or '').strip() + + if not url: + return jsonify({'error': 'url is required'}), 400 + if not url.startswith(('http://', 'https://')): + return jsonify({'error': 'URL must start with http:// or https://'}), 400 + + config = get_config() + scraper_cfg = config.get('scraper', {}) + language = data.get('language') or scraper_cfg.get('default_language', 'eng') + title = (data.get('title') or '').strip() or None + category = (data.get('category') or '').strip() or None + + # Optional per-job reject pattern overrides + additional_reject_patterns = data.get('additional_reject_patterns') + skip_default_patterns = bool(data.get('skip_default_patterns', False)) + + # Optional crawl mode override (static, browser, redirect, or null for auto-detect) + crawl_mode = data.get('crawl_mode') + if crawl_mode and crawl_mode not in ('static', 
'browser', 'redirect'): + return jsonify({'error': "crawl_mode must be 'static', 'browser', 'redirect', or null"}), 400 + + # Serialize additional patterns as JSON if provided + import json as _json + additional_json = _json.dumps(additional_reject_patterns) if additional_reject_patterns else None + + db = StatusDB() + conn = db._get_conn() + conn.execute( + "INSERT INTO scrape_jobs (url, title, language, category, additional_reject_patterns, skip_default_patterns, crawl_mode) VALUES (?, ?, ?, ?, ?, ?, ?)", + (url, title, language, category, additional_json, int(skip_default_patterns), crawl_mode) + ) + conn.commit() + job_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + + logger.info(f"Scraper job {job_id} submitted: {url}") + return jsonify({'ok': True, 'job_id': job_id}), 201 + + +@app.route('/api/scraper/jobs') +def api_scraper_jobs(): + """List scrape jobs, optionally filtered by status.""" + status_filter = request.args.get('status') + db = StatusDB() + jobs = db.get_scrape_jobs(status=status_filter) + return jsonify({'jobs': jobs}) + + +@app.route('/api/scraper/cancel/<int:job_id>', methods=['POST']) +def api_scraper_cancel(job_id): + """Cancel a scrape job.""" + import os as _os + import signal as _signal + + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] in ('complete', 'cancelled'): + return jsonify({'error': f"Job already {job['status']}"}), 400 + + # Set cancelled in DB — the runner loop checks this between phases + db.update_scrape_job(job_id, status='cancelled') + + # If there's an active subprocess, send SIGTERM + pid = job.get('subprocess_pid') + if pid: + try: + _os.kill(pid, _signal.SIGTERM) + except (ProcessLookupError, PermissionError): + pass # Process already gone + + logger.info(f"Scraper job {job_id} cancelled") + return jsonify({'ok': True}) + + +@app.route('/api/scraper/retry/<int:job_id>', methods=['POST']) +def api_scraper_retry(job_id): + """Retry a failed or 
cancelled scrape job.""" + db = StatusDB() + job = db.get_scrape_job(job_id) + if not job: + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] not in ('failed', 'cancelled'): + return jsonify({'error': f"Job status is '{job['status']}', can only retry failed or cancelled jobs"}), 400 + + db.update_scrape_job(job_id, + status='pending', + error_message=None, + subprocess_pid=None, + crawl_mode=None, + started_at=None, + completed_at=None) + + logger.info(f"Scraper job {job_id} reset to pending for retry") + return jsonify({'ok': True}) + + # ── Metrics API ── @app.route('/api/metrics/history') diff --git a/lib/scraper_runner.py b/lib/scraper_runner.py new file mode 100644 index 0000000..1599f2e --- /dev/null +++ b/lib/scraper_runner.py @@ -0,0 +1,695 @@ +""" +RECON Scraper Runner + +Daemon loop that processes scrape jobs: crawl → zimwriterfs → kiwix-manage. +Supports two crawl backends: + - wget (static sites) — default + - SingleFile CLI (JS-rendered sites) — browser mode + +Pre-flight detection automatically chooses the right backend unless +crawl_mode is pre-set on the job. + +Public entry point: scraper_loop(stop_event, config). 
+ +Config section: scraper (workspace, output_dir, rate_limit_delay, preflight, singlefile) +DB table: scrape_jobs (status flow: pending → scraping → packaging → complete) +""" +import glob as _glob +import json as _json +import os +import re +import shutil +import signal +import subprocess +import tempfile +import time +from datetime import datetime, timezone +from urllib.parse import urlparse + +from .utils import setup_logging +from .status import StatusDB + +logger = setup_logging('recon.scraper_runner') + + +def scraper_loop(stop_event, config): + """Daemon loop: poll for pending scrape jobs, execute pipeline.""" + scraper_cfg = config.get('scraper', {}) + poll_interval = scraper_cfg.get('poll_interval', 300) + + logger.info("Scraper runner started") + + while not stop_event.is_set(): + db = StatusDB() + job = db.get_pending_scrape_job() + if job: + try: + _process_job(job, config, stop_event) + except Exception as e: + logger.error(f"Scraper job {job['id']} unexpected error: {e}", exc_info=True) + try: + db.update_scrape_job(job['id'], + status='failed', + error_message=str(e)[:1000], + subprocess_pid=None, + completed_at=_now()) + except Exception: + pass + else: + stop_event.wait(poll_interval) + + logger.info("Scraper runner stopped") + + +def _now(): + return datetime.now(timezone.utc).isoformat() + + +def _sanitize_domain(url): + """Extract and sanitize domain from URL for use in filenames.""" + parsed = urlparse(url) + domain = parsed.hostname or 'unknown' + if domain.startswith('www.'): + domain = domain[4:] + return domain + + +def _sanitize_filename(s): + """Sanitize a string for safe filename use.""" + return re.sub(r'[^a-zA-Z0-9._-]', '_', s) + + +def _check_cancelled(db, job_id): + """Check if a job has been cancelled in the DB.""" + job = db.get_scrape_job(job_id) + return job and job['status'] == 'cancelled' + + +def _kill_process(proc, timeout=5): + """Gracefully terminate a subprocess, force kill if needed.""" + if proc.poll() is not None: + 
return + try: + proc.terminate() + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=2) + + +def _count_html_files(directory): + """Count HTML files in a directory tree.""" + count = 0 + for root, dirs, files in os.walk(directory): + for f in files: + if f.lower().endswith(('.html', '.htm')): + count += 1 + return count + + +def _find_welcome_page(content_dir, domain): + """Find the welcome page (index.html) in the wget mirror.""" + domain_dir = None + for entry in os.listdir(content_dir): + entry_path = os.path.join(content_dir, entry) + if os.path.isdir(entry_path): + domain_dir = entry_path + break + + if not domain_dir: + return None, content_dir + + for candidate in ['index.html', 'index.htm']: + path = os.path.join(domain_dir, candidate) + if os.path.isfile(path): + return candidate, domain_dir + + for root, dirs, files in os.walk(domain_dir): + for f in sorted(files): + if f.lower().endswith(('.html', '.htm')): + rel = os.path.relpath(os.path.join(root, f), domain_dir) + return rel, domain_dir + + return 'index.html', domain_dir + + +def _create_placeholder_illustration(path): + """Create a 48x48 placeholder PNG for zimwriterfs --illustration.""" + from PIL import Image + img = Image.new('RGB', (48, 48), color=(40, 192, 232)) + img.save(path, 'PNG') + + +# ── Crawl mode detection ────────────────────────────────────────── + + +def _get_chromium_path(config): + """Auto-detect Chromium from Playwright's cache, or use config override.""" + configured = config.get('scraper', {}).get('singlefile', {}).get('chromium_path', '') + if configured and os.path.isfile(configured): + return configured + # Playwright stores Chromium — check both root and user caches + search_paths = [ + os.path.expanduser('~/.cache/ms-playwright/chromium-*/chrome-linux*/chrome'), + '/root/.cache/ms-playwright/chromium-*/chrome-linux*/chrome', + ] + for pattern in search_paths: + matches = sorted(_glob.glob(pattern)) + if matches: + return 
matches[-1] + return None + + +def _detect_crawl_mode(url, config): + """ + Pre-flight detection: determine whether a URL needs a browser to crawl. + + Returns (mode, resolved_url) where mode is 'static', 'browser', or 'redirect'. + 'redirect' means the URL redirected to a different domain (parking page etc.); + resolved_url will be the final browser URL in that case. + """ + preflight_cfg = config.get('scraper', {}).get('preflight', {}) + if not preflight_cfg.get('enabled', True): + return 'static', url + + timeout = preflight_cfg.get('timeout', 30) + min_static = preflight_cfg.get('min_static_size', 5120) + min_browser = preflight_cfg.get('min_browser_size', 20480) + spa_markers = preflight_cfg.get('spa_markers', ['div#root', 'div#app', 'div#__next']) + + input_domain = urlparse(url).hostname or '' + if input_domain.startswith('www.'): + input_domain = input_domain[4:] + + # Step 1: wget single-page fetch + wget_html = '' + wget_size = 0 + try: + with tempfile.NamedTemporaryFile(suffix='.html', delete=False) as tmp: + tmp_path = tmp.name + result = subprocess.run( + ['wget', '-q', '-O', tmp_path, '--timeout=30', '--tries=1', url], + capture_output=True, text=True, timeout=timeout + 5 + ) + if os.path.isfile(tmp_path): + wget_size = os.path.getsize(tmp_path) + with open(tmp_path, 'r', errors='replace') as f: + wget_html = f.read() + os.unlink(tmp_path) + except Exception as e: + logger.debug(f"Preflight wget failed for {url}: {e}") + try: + os.unlink(tmp_path) + except Exception: + pass + + # Step 2: Playwright headless fetch + browser_html = '' + browser_size = 0 + browser_url = url + try: + from playwright.sync_api import sync_playwright + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=['--no-sandbox', '--disable-dev-shm-usage'] + ) + page = browser.new_page() + page.goto(url, wait_until='networkidle', timeout=timeout * 1000) + browser_url = page.url + browser_html = page.content() + browser_size = 
len(browser_html.encode('utf-8')) + browser.close() + except Exception as e: + logger.debug(f"Preflight Playwright failed for {url}: {e}") + # If Playwright fails entirely, fall back to static + return 'static', url + + # Step 3: Decision logic + browser_domain = urlparse(browser_url).hostname or '' + if browser_domain.startswith('www.'): + browser_domain = browser_domain[4:] + + # Check for cross-domain redirect (parking page detection) + if browser_domain and input_domain and browser_domain != input_domain: + logger.info(f"Preflight: {url} redirected to different domain {browser_domain}, mode=redirect") + return 'redirect', browser_url + + # Check size disparity: small wget + large browser = JS-rendered + if wget_size < min_static and browser_size > min_browser: + logger.info(f"Preflight: {url} wget={wget_size}B browser={browser_size}B, mode=browser") + return 'browser', url + + # Check for SPA shell markers in wget HTML + if wget_html: + try: + from bs4 import BeautifulSoup + soup = BeautifulSoup(wget_html, 'html.parser') + for marker in spa_markers: + # marker is like 'div#root' — split tag and id + parts = marker.split('#', 1) + tag = parts[0] if parts[0] else 'div' + elem_id = parts[1] if len(parts) > 1 else None + elem = soup.find(tag, id=elem_id) if elem_id else soup.find(tag) + if elem: + text_content = elem.get_text(strip=True) + if len(text_content) < 100: + logger.info(f"Preflight: {url} has SPA marker {marker} with {len(text_content)} chars text, mode=browser") + return 'browser', url + except Exception as e: + logger.debug(f"Preflight SPA marker check failed: {e}") + + logger.info(f"Preflight: {url} wget={wget_size}B browser={browser_size}B, mode=static") + return 'static', url + + +# ── Crawl backends ──────────────────────────────────────────────── + + +def _crawl_wget(job, url, site_dir, config, stop_event, db): + """ + wget mirror crawl backend. + Returns (page_count, error_msg) — error_msg is None on success, 'cancelled' on cancel. 
+ """ + job_id = job['id'] + scraper_cfg = config.get('scraper', {}) + rate_limit_delay = scraper_cfg.get('rate_limit_delay', 0.5) + user_agent = scraper_cfg.get('user_agent', 'Mozilla/5.0 (compatible; RECON/1.0)') + keep_workspace = scraper_cfg.get('keep_workspace_on_failure', True) + workspace = os.path.dirname(site_dir) + + # Build reject-regex from config defaults + per-job overrides + reject_patterns = [] + skip_defaults = bool(job.get('skip_default_patterns')) + if not skip_defaults: + reject_patterns.extend(scraper_cfg.get('default_reject_patterns', [])) + additional_raw = job.get('additional_reject_patterns') + if additional_raw: + try: + additional = _json.loads(additional_raw) if isinstance(additional_raw, str) else additional_raw + if isinstance(additional, list): + reject_patterns.extend(additional) + except (ValueError, TypeError): + pass + + wget_cmd = [ + 'wget', '--mirror', '--convert-links', '--adjust-extension', + '--page-requisites', '--no-parent', + '--restrict-file-names=windows', + f'--wait={rate_limit_delay}', '--random-wait', + f'--user-agent={user_agent}', + f'--directory-prefix={site_dir}', + '--timeout=30', '--tries=3', + ] + if reject_patterns: + combined_regex = '|'.join(f'({p})' for p in reject_patterns) + wget_cmd.extend([f'--reject-regex={combined_regex}']) + logger.info(f"Job {job_id}: reject-regex has {len(reject_patterns)} patterns") + wget_cmd.append(url) + + logger.info(f"Job {job_id}: wget mirror starting") + wget_log = os.path.join(workspace, 'wget.log') + try: + with open(wget_log, 'w') as log_fh: + proc = subprocess.Popen( + wget_cmd, + stdout=log_fh, stderr=subprocess.STDOUT, + ) + db.update_scrape_job(job_id, subprocess_pid=proc.pid) + + while proc.poll() is None: + if stop_event.is_set() or _check_cancelled(db, job_id): + _kill_process(proc) + return 0, 'cancelled' + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + db.update_scrape_job(job_id, subprocess_pid=None) + + if stop_event.is_set() or 
_check_cancelled(db, job_id): + return 0, 'cancelled' + + # wget exit codes 4 (network failure), 6 (auth failure) and 8 (server error response) may still leave useful content + if proc.returncode not in (0, 4, 6, 8): + output = '' + try: + with open(wget_log, 'r') as f: + f.seek(max(0, os.path.getsize(wget_log) - 500)) + output = f.read() + except Exception: + pass + return 0, f"wget failed with code {proc.returncode}: {output[-500:]}" + + except Exception as e: + return 0, f"wget error: {e}" + + page_count = _count_html_files(site_dir) + logger.info(f"Job {job_id}: wget complete, {page_count} HTML pages found") + + if page_count == 0: + return 0, 'wget produced no HTML files' + + return page_count, None + + +def _crawl_singlefile(job, url, site_dir, config, stop_event, db): + """ + SingleFile CLI crawl backend for JS-rendered sites. + Returns (page_count, error_msg) — error_msg is None on success, 'cancelled' on cancel. + """ + job_id = job['id'] + scraper_cfg = config.get('scraper', {}) + sf_cfg = scraper_cfg.get('singlefile', {}) + keep_workspace = scraper_cfg.get('keep_workspace_on_failure', True) + workspace = os.path.dirname(site_dir) + + executable = sf_cfg.get('executable', 'single-file') + chromium_path = _get_chromium_path(config) + crawl_max_depth = sf_cfg.get('crawl_max_depth', 10) + crawl_delay = sf_cfg.get('crawl_delay', 2) + + if not chromium_path: + return 0, 'Chromium not found — cannot use browser crawl mode' + + # SingleFile outputs into site_dir/<domain>/ to match wget's structure + domain = _sanitize_domain(url) + output_dir = os.path.join(site_dir, domain) + os.makedirs(output_dir, exist_ok=True) + + sf_cmd = [ + executable, + '--crawl-links=true', + '--crawl-inner-links-only=true', + f'--crawl-max-depth={crawl_max_depth}', + f'--crawl-delay={crawl_delay * 1000}', # milliseconds + f'--browser-executable-path={chromium_path}', + '--browser-headless=true', + '--browser-args=["--no-sandbox","--disable-dev-shm-usage"]', + f'--output-directory={output_dir}', + url, + ] + + logger.info(f"Job 
{job_id}: SingleFile crawl starting (depth={crawl_max_depth}, delay={crawl_delay}s)") + sf_log = os.path.join(workspace, 'singlefile.log') + try: + with open(sf_log, 'w') as log_fh: + proc = subprocess.Popen( + sf_cmd, + stdout=log_fh, stderr=subprocess.STDOUT, + ) + db.update_scrape_job(job_id, subprocess_pid=proc.pid) + + while proc.poll() is None: + if stop_event.is_set() or _check_cancelled(db, job_id): + _kill_process(proc) + return 0, 'cancelled' + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + db.update_scrape_job(job_id, subprocess_pid=None) + + if stop_event.is_set() or _check_cancelled(db, job_id): + return 0, 'cancelled' + + if proc.returncode != 0: + output = '' + try: + with open(sf_log, 'r') as f: + f.seek(max(0, os.path.getsize(sf_log) - 500)) + output = f.read() + except Exception: + pass + # SingleFile may still produce some files even with non-zero exit + page_count = _count_html_files(site_dir) + if page_count == 0: + return 0, f"SingleFile failed with code {proc.returncode}: {output[-500:]}" + logger.warning(f"Job {job_id}: SingleFile exited {proc.returncode} but produced {page_count} pages, continuing") + + except Exception as e: + return 0, f"SingleFile error: {e}" + + # If no index.html exists, rename the first HTML file to index.html + index_path = os.path.join(output_dir, 'index.html') + if not os.path.isfile(index_path): + for f in sorted(os.listdir(output_dir)): + if f.lower().endswith(('.html', '.htm')): + src = os.path.join(output_dir, f) + os.rename(src, index_path) + logger.info(f"Job {job_id}: renamed {f} → index.html") + break + + page_count = _count_html_files(site_dir) + logger.info(f"Job {job_id}: SingleFile complete, {page_count} HTML pages found") + + if page_count == 0: + return 0, 'SingleFile produced no HTML files' + + return page_count, None + + +# ── Main job pipeline ───────────────────────────────────────────── + + +def _process_job(job, config, stop_event): + """Execute the full scrape 
pipeline for a single job.""" + db = StatusDB() + job_id = job['id'] + url = job['url'] + title = job.get('title') or _sanitize_domain(url) + language = job.get('language') or config.get('scraper', {}).get('default_language', 'eng') + category = job.get('category') or '' + + scraper_cfg = config.get('scraper', {}) + workspace_root = scraper_cfg.get('workspace', '/opt/recon/data/scraper') + output_dir = scraper_cfg.get('output_dir', '/mnt/kiwix') + keep_workspace = scraper_cfg.get('keep_workspace_on_failure', True) + + workspace = os.path.join(workspace_root, str(job_id)) + site_dir = os.path.join(workspace, 'site') + os.makedirs(site_dir, exist_ok=True) + + domain = _sanitize_domain(url) + date_tag = datetime.now().strftime('%Y-%m') + zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}.zim" + zim_path = os.path.join(output_dir, zim_filename) + + logger.info(f"Job {job_id}: starting scrape of {url}") + db.update_scrape_job(job_id, + status='scraping', + workspace_path=workspace, + started_at=_now()) + + # ── Phase 0: Pre-flight mode detection ───────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + pre_set = job.get('crawl_mode') + if pre_set: + crawl_mode, resolved_url = pre_set, url + logger.info(f"Job {job_id}: using pre-set crawl_mode={crawl_mode}") + else: + crawl_mode, resolved_url = _detect_crawl_mode(url, config) + logger.info(f"Job {job_id}: detected crawl_mode={crawl_mode}") + + db.update_scrape_job(job_id, crawl_mode=crawl_mode) + + # If redirect detected, update domain/filename to match resolved URL + if crawl_mode == 'redirect' and resolved_url != url: + logger.info(f"Job {job_id}: URL resolved from {url} → {resolved_url}") + domain = _sanitize_domain(resolved_url) + zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}.zim" + zim_path = os.path.join(output_dir, zim_filename) + + # ── Phase A: Crawl (dispatch to backend) 
──────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + if crawl_mode == 'browser': + page_count, error = _crawl_singlefile(job, resolved_url, site_dir, config, stop_event, db) + else: # 'static' or 'redirect' + page_count, error = _crawl_wget(job, resolved_url, site_dir, config, stop_event, db) + + if error == 'cancelled': + _handle_cancel(db, job_id, workspace, keep_workspace) + return + elif error: + db.update_scrape_job(job_id, + status='failed', + error_message=error, + subprocess_pid=None, + completed_at=_now()) + if not keep_workspace: + shutil.rmtree(workspace, ignore_errors=True) + return + + db.update_scrape_job(job_id, page_count=page_count) + + # ── Phase B: Prepare zimwriterfs inputs ──────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + welcome_page, content_dir = _find_welcome_page(site_dir, domain) + if welcome_page is None: + welcome_page = 'index.html' + + illustration_path = os.path.join(workspace, 'illustration.png') + _create_placeholder_illustration(illustration_path) + illust_dest = os.path.join(content_dir, 'illustration.png') + shutil.copy2(illustration_path, illust_dest) + + description = f"Mirror of {domain}" + if category: + description = f"{category} — mirror of {domain}" + + logger.info(f"Job {job_id}: packaging ZIM (welcome={welcome_page}, content_dir={content_dir})") + db.update_scrape_job(job_id, status='packaging') + + # ── Phase C: zimwriterfs ─────────────────────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + zim_name = _sanitize_filename(domain) + long_description = f"Offline mirror of {resolved_url} created by RECON web scraper" + + zim_cmd = [ + 'zimwriterfs', + f'--welcome={welcome_page}', + f'--illustration=illustration.png', + 
f'--language={language}', + f'--title={title}', + f'--description={description[:80]}', + f'--longDescription={long_description[:4096]}', + f'--name={zim_name}', + f'--creator={domain}', + '--publisher=RECON', + content_dir, + zim_path, + ] + + zim_log = os.path.join(workspace, 'zimwriterfs.log') + try: + with open(zim_log, 'w') as log_fh: + proc = subprocess.Popen( + zim_cmd, + stdout=log_fh, stderr=subprocess.STDOUT, + ) + db.update_scrape_job(job_id, subprocess_pid=proc.pid) + + while proc.poll() is None: + if stop_event.is_set() or _check_cancelled(db, job_id): + _kill_process(proc) + _handle_cancel(db, job_id, workspace, keep_workspace) + return + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + db.update_scrape_job(job_id, subprocess_pid=None) + + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + if proc.returncode != 0: + output = '' + try: + with open(zim_log, 'r') as f: + f.seek(max(0, os.path.getsize(zim_log) - 500)) + output = f.read() + except Exception: + pass + raise RuntimeError(f"zimwriterfs failed with code {proc.returncode}: {output[-500:]}") + + except RuntimeError: + raise + except Exception as e: + db.update_scrape_job(job_id, + status='failed', + error_message=f"zimwriterfs error: {e}", + subprocess_pid=None, + completed_at=_now()) + if not keep_workspace: + shutil.rmtree(workspace, ignore_errors=True) + return + + if not os.path.isfile(zim_path): + db.update_scrape_job(job_id, + status='failed', + error_message='zimwriterfs produced no output file', + completed_at=_now()) + return + + logger.info(f"Job {job_id}: ZIM created at {zim_path}") + + # ── Phase D: kiwix-manage + registration ─────────────────────── + if stop_event.is_set() or _check_cancelled(db, job_id): + _handle_cancel(db, job_id, workspace, keep_workspace) + return + + kiwix_manage = shutil.which('kiwix-manage') or '/opt/recon/bin/kiwix-manage' + library_xml = 
'/mnt/kiwix/library.xml' + + try: + subprocess.run( + [kiwix_manage, library_xml, 'add', zim_path], + capture_output=True, text=True, timeout=30 + ) + logger.info(f"Job {job_id}: registered with kiwix-serve library") + except Exception as e: + logger.warning(f"Job {job_id}: kiwix-manage add failed: {e}") + + try: + result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pid = int(result.stdout.strip().split()[0]) + os.kill(pid, signal.SIGHUP) + logger.info(f"Job {job_id}: sent SIGHUP to kiwix-serve (pid {pid})") + except Exception as e: + logger.warning(f"Job {job_id}: failed to signal kiwix-serve: {e}") + + zim_source_id = None + try: + from .zim_monitor import scan_zims + scan_zims() + conn = db._get_conn() + row = conn.execute( + "SELECT id FROM zim_sources WHERE zim_filename = ?", (zim_filename,) + ).fetchone() + if row: + zim_source_id = row['id'] + logger.info(f"Job {job_id}: linked to zim_source_id={zim_source_id}") + except Exception as e: + logger.warning(f"Job {job_id}: scan_zims failed: {e}") + + try: + shutil.rmtree(workspace, ignore_errors=True) + except Exception: + pass + + db.update_scrape_job(job_id, + status='complete', + zim_filename=zim_filename, + zim_source_id=zim_source_id, + completed_at=_now()) + + logger.info(f"Job {job_id}: complete — {zim_filename} ({page_count} pages, mode={crawl_mode})") + + +def _handle_cancel(db, job_id, workspace, keep_workspace): + """Handle job cancellation: clean up and update status.""" + logger.info(f"Job {job_id}: cancelled") + db.update_scrape_job(job_id, + status='cancelled', + subprocess_pid=None, + completed_at=_now()) + if not keep_workspace: + shutil.rmtree(workspace, ignore_errors=True) diff --git a/lib/status.py b/lib/status.py index 20cc77b..974cabd 100644 --- a/lib/status.py +++ b/lib/status.py @@ -105,6 +105,25 @@ class StatusDB: except Exception: pass # column already exists + # Migration: add 
subprocess_pid column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN subprocess_pid INTEGER") + except Exception: + pass # column already exists + + # Migration: add reject pattern columns to scrape_jobs if missing + for col, coltype in [('additional_reject_patterns', 'TEXT'), ('skip_default_patterns', 'INTEGER DEFAULT 0')]: + try: + conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN {col} {coltype}") + except Exception: + pass # column already exists + + # Migration: add crawl_mode column to scrape_jobs if missing + try: + conn.execute("ALTER TABLE scrape_jobs ADD COLUMN crawl_mode TEXT") + except Exception: + pass # column already exists + # Stream B: file_operations + duplicate_review tables conn.executescript(""" CREATE TABLE IF NOT EXISTS file_operations ( @@ -142,6 +161,28 @@ class StatusDB: resolved_at TEXT ); CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status); + + CREATE TABLE IF NOT EXISTS scrape_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + title TEXT, + language TEXT DEFAULT 'eng', + category TEXT, + status TEXT DEFAULT 'pending', + page_count INTEGER DEFAULT 0, + error_message TEXT, + zim_filename TEXT, + zim_source_id INTEGER, + workspace_path TEXT, + subprocess_pid INTEGER, + additional_reject_patterns TEXT, + skip_default_patterns INTEGER DEFAULT 0, + crawl_mode TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + started_at TEXT, + completed_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_scrape_status ON scrape_jobs(status); """) conn.commit() @@ -406,6 +447,50 @@ class StatusDB: ) conn.commit() + + # ── Scraper Job Helpers ───────────────────────────────────── + + def get_pending_scrape_job(self): + """Fetch the oldest pending scrape job.""" + conn = self._get_conn() + row = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1" + ).fetchone() + return dict(row) if row else None + + def update_scrape_job(self, job_id, **kwargs): 
+ """Update arbitrary columns on a scrape job.""" + if not kwargs: + return + conn = self._get_conn() + sets = [] + vals = [] + for k, v in kwargs.items(): + sets.append(f"{k} = ?") + vals.append(v) + vals.append(job_id) + conn.execute(f"UPDATE scrape_jobs SET {', '.join(sets)} WHERE id = ?", vals) + conn.commit() + + def get_scrape_jobs(self, status=None): + """List scrape jobs, optionally filtered by status.""" + conn = self._get_conn() + if status: + rows = conn.execute( + "SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,) + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM scrape_jobs ORDER BY id DESC" + ).fetchall() + return [dict(r) for r in rows] + + def get_scrape_job(self, job_id): + """Get a single scrape job by ID.""" + conn = self._get_conn() + row = conn.execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone() + return dict(row) if row else None + # ── Stream B: File Operations ─────────────────────────────────── def log_file_operation(self, doc_hash, operation, source_path, target_path, diff --git a/recon.py b/recon.py index 47dda7d..9635a59 100755 --- a/recon.py +++ b/recon.py @@ -692,12 +692,23 @@ def cmd_service(args): daemon=True, name='dashboard'), ] + # Scraper daemon: polls for pending scrape jobs, runs wget+zimwriterfs pipeline + scraper_cfg = config.get('scraper', {}) + if scraper_cfg.get('workspace'): + from lib.scraper_runner import scraper_loop + threads.append( + threading.Thread(target=lambda: scraper_loop(stop_event, config), + daemon=True, name='scraper') + ) + logger.info("=== RECON Service Starting ===") logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") pt_interval = config.get("peertube", {}).get("poll_interval", 1800) logger.info(f" PeerTube acquisition: every {pt_interval}s") + if scraper_cfg.get('workspace'): + 
logger.info(f" Scraper: every {scraper_cfg.get('poll_interval', 300)}s") logger.info(f" Progress: every {progress_interval}s") for t in threads: