Merge feature/scraper: Zimit-based web scraper

Replaces wget/SingleFile/Playwright crawl backends with Zimit (openZIM
Docker crawler). Produces ZIM files directly — no zimwriterfs step.
Validated with meshtastic.org (3400+ page Docusaurus site).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-04-19 19:37:04 +00:00
commit c5283ece3e
9 changed files with 993 additions and 13 deletions

1
.gitignore vendored
View file

@ -27,3 +27,4 @@ recon.db
# Kiwix binary tools (installed from tarball) # Kiwix binary tools (installed from tarball)
bin/ bin/
status.db

View file

@ -413,6 +413,14 @@ peertube:
rate_limit_delay: 0.5 # Delay between video ingestions (seconds) rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min) poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)
scraper:
workspace: /opt/recon/data/scraper # Working directory (tmp dirs for Zimit output)
output_dir: /mnt/kiwix # Finished .zim files land here (kiwix-serve library)
default_language: eng # ISO 639-3 language code for ZIM metadata
poll_interval: 300 # Seconds between checking for pending scrape jobs
docker_image: ghcr.io/openzim/zimit # Zimit Docker image for web crawling
docker_workers: 2 # Concurrent crawl workers inside Zimit container
# Stream B: New Library Pipeline # Stream B: New Library Pipeline
new_pipeline: new_pipeline:
# Disabled 2026-04-14 for refactor — see refactored-recon repo for context # Disabled 2026-04-14 for refactor — see refactored-recon repo for context

View file

@ -44,6 +44,20 @@ app = Flask(__name__,
app.config['MAX_CONTENT_LENGTH'] = None # ZIM files can be multi-GB app.config['MAX_CONTENT_LENGTH'] = None # ZIM files can be multi-GB
# ── Large ZIM upload support ──
# Override stream factory so ZIM uploads write directly to /mnt/kiwix/
# instead of /tmp (which is on the 96GB root disk and can't hold 100GB+ ZIMs).
from flask import Request as _FlaskRequest
class _LargeZimRequest(_FlaskRequest):
def _get_file_stream(self, total_content_length, content_type, filename=None, content_length=None):
if filename and filename.lower().endswith('.zim'):
return tempfile.NamedTemporaryFile('wb+', dir='/mnt/kiwix', prefix='.upload_', suffix='.tmp', delete=False)
return super()._get_file_stream(total_content_length, content_type, filename, content_length)
app.request_class = _LargeZimRequest
# ── Navigation Constants ── # ── Navigation Constants ──
KNOWLEDGE_SUBNAV = [ KNOWLEDGE_SUBNAV = [
@ -60,7 +74,10 @@ PEERTUBE_SUBNAV = [
] ]
KIWIX_SUBNAV = [] # Single-page, no subnav needed KIWIX_SUBNAV = [
{'href': '/kiwix', 'label': 'Library'},
{'href': '/kiwix/scraper', 'label': 'Scraper'},
]
SETTINGS_SUBNAV = [ SETTINGS_SUBNAV = [
{'href': '/settings/keys', 'label': 'API Keys'}, {'href': '/settings/keys', 'label': 'API Keys'},
{'href': '/settings/cookies', 'label': 'YouTube Cookies'}, {'href': '/settings/cookies', 'label': 'YouTube Cookies'},
@ -1956,6 +1973,12 @@ def kiwix_dashboard():
domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix') domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix')
@app.route('/kiwix/scraper')
def kiwix_scraper():
return render_template('kiwix/scraper.html',
domain='kiwix', subnav=KIWIX_SUBNAV, active_page='/kiwix/scraper')
@app.route('/api/kiwix/sources') @app.route('/api/kiwix/sources')
def api_kiwix_sources(): def api_kiwix_sources():
"""Serve pre-cached Kiwix sources data (never blocks).""" """Serve pre-cached Kiwix sources data (never blocks)."""
@ -2011,14 +2034,23 @@ def api_kiwix_upload():
filename = secure_filename(f.filename) filename = secure_filename(f.filename)
dest = os.path.join('/mnt/kiwix', filename) dest = os.path.join('/mnt/kiwix', filename)
tmp_dest = dest + '.tmp'
try: try:
f.save(tmp_dest) # Stream was written directly to /mnt/kiwix/ by _LargeZimRequest —
os.rename(tmp_dest, dest) # rename in-place instead of copying 100GB+ through f.save()
if hasattr(f.stream, 'name') and f.stream.name:
tmp_path = f.stream.name
f.stream.close()
os.rename(tmp_path, dest)
else:
tmp_dest = dest + '.tmp'
f.save(tmp_dest)
os.rename(tmp_dest, dest)
except Exception as e: except Exception as e:
if os.path.exists(tmp_dest): # Clean up any temp files on failure
os.remove(tmp_dest) for p in [locals().get('tmp_path', ''), locals().get('tmp_dest', '')]:
if p and os.path.exists(p):
os.remove(p)
return jsonify({'error': f'Save failed: {e}'}), 500 return jsonify({'error': f'Save failed: {e}'}), 500
# Register with kiwix-serve library # Register with kiwix-serve library
@ -2051,23 +2083,24 @@ def api_kiwix_upload():
@app.route('/api/kiwix/remove/<int:source_id>', methods=['POST']) def _full_zim_cleanup(source_id):
def api_kiwix_remove(source_id): """Full ZIM cleanup: Qdrant vectors, DB records, kiwix-manage, SIGHUP, file delete.
"""Remove a ZIM source: delete vectors, DB records, library entry, and file.""" Returns dict with results. Caller handles cache refresh."""
import subprocess import subprocess
import signal
import requests as req import requests as req
db = StatusDB() db = StatusDB()
conn = db._get_conn() conn = db._get_conn()
row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone() row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone()
if not row: if not row:
return jsonify({'error': 'Source not found'}), 404 return None
zim_source = dict(row) zim_source = dict(row)
zim_filename = zim_source['zim_filename'] zim_filename = zim_source['zim_filename']
zim_path = zim_source['zim_path'] zim_path = zim_source['zim_path']
zim_title = zim_source.get('title', zim_filename) zim_title = zim_source.get('title', zim_filename)
results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False} results = {'vectors_deleted': 0, 'docs_deleted': 0, 'file_deleted': False, 'scrape_jobs_deleted': 0}
# Step 1: Find all document hashes for this ZIM source # Step 1: Find all document hashes for this ZIM source
doc_hashes = [r['hash'] for r in conn.execute( doc_hashes = [r['hash'] for r in conn.execute(
@ -2126,7 +2159,6 @@ def api_kiwix_remove(source_id):
# Step 4: Remove from kiwix-serve library # Step 4: Remove from kiwix-serve library
try: try:
# Get the book ID from library.xml
subprocess.run( subprocess.run(
['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', zim_filename.replace('.zim', '')], ['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove', zim_filename.replace('.zim', '')],
capture_output=True, text=True, timeout=10 capture_output=True, text=True, timeout=10
@ -2134,6 +2166,16 @@ def api_kiwix_remove(source_id):
except Exception as e: except Exception as e:
logger.warning(f"kiwix-manage remove failed: {e}") logger.warning(f"kiwix-manage remove failed: {e}")
# Step 4b: SIGHUP kiwix-serve to reload library
try:
result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
pid = int(result.stdout.strip().split()[0])
os.kill(pid, signal.SIGHUP)
logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})")
except Exception as e:
logger.warning(f"Failed to signal kiwix-serve: {e}")
# Step 5: Delete the ZIM file # Step 5: Delete the ZIM file
if os.path.isfile(zim_path): if os.path.isfile(zim_path):
try: try:
@ -2143,13 +2185,37 @@ def api_kiwix_remove(source_id):
logger.warning(f"ZIM file delete failed: {e}") logger.warning(f"ZIM file delete failed: {e}")
results['file_deleted'] = False results['file_deleted'] = False
# Step 6: Delete any linked scrape_jobs rows
try:
res = conn.execute("DELETE FROM scrape_jobs WHERE zim_source_id = ?", (source_id,))
conn.commit()
results['scrape_jobs_deleted'] = res.rowcount
except Exception as e:
logger.warning(f"scrape_jobs cleanup failed: {e}")
logger.info(f"Full ZIM cleanup for source {source_id} ('{zim_title}'): {results}")
return results
@app.route('/api/kiwix/remove/<int:source_id>', methods=['POST'])
def api_kiwix_remove(source_id):
"""Remove a ZIM source: delete vectors, DB records, library entry, and file."""
db = StatusDB()
conn = db._get_conn()
row = conn.execute("SELECT * FROM zim_sources WHERE id = ?", (source_id,)).fetchone()
if not row:
return jsonify({'error': 'Source not found'}), 404
results = _full_zim_cleanup(source_id)
if results is None:
return jsonify({'error': 'Source not found during cleanup'}), 404
# Refresh cache # Refresh cache
try: try:
_cache['kiwix_sources'] = _build_kiwix_sources() _cache['kiwix_sources'] = _build_kiwix_sources()
except Exception: except Exception:
pass pass
logger.info(f"Removed ZIM source '{zim_title}': {results}")
return jsonify({'ok': True, 'results': results}) return jsonify({'ok': True, 'results': results})
@ -2256,6 +2322,170 @@ def _build_kiwix_sources():
} }
# ── Scraper API ──
@app.route('/api/scraper/submit', methods=['POST'])
def api_scraper_submit():
"""Submit a new web scrape job."""
data = request.get_json(silent=True) or {}
url = (data.get('url') or '').strip()
if not url:
return jsonify({'error': 'url is required'}), 400
if not url.startswith(('http://', 'https://')):
return jsonify({'error': 'URL must start with http:// or https://'}), 400
config = get_config()
scraper_cfg = config.get('scraper', {})
language = data.get('language') or scraper_cfg.get('default_language', 'eng')
title = data.get('title', '').strip() or None
category = data.get('category', '').strip() or None
db = StatusDB()
conn = db._get_conn()
conn.execute(
"INSERT INTO scrape_jobs (url, title, language, category, crawl_mode) VALUES (?, ?, ?, ?, ?)",
(url, title, language, category, 'zimit')
)
conn.commit()
job_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
logger.info(f"Scraper job {job_id} submitted: {url}")
return jsonify({'ok': True, 'job_id': job_id}), 201
@app.route('/api/scraper/jobs')
def api_scraper_jobs():
"""List scrape jobs, optionally filtered by status."""
status_filter = request.args.get('status')
db = StatusDB()
jobs = db.get_scrape_jobs(status=status_filter)
return jsonify({'jobs': jobs})
@app.route('/api/scraper/cancel/<int:job_id>', methods=['POST'])
def api_scraper_cancel(job_id):
"""Cancel a scrape job."""
db = StatusDB()
job = db.get_scrape_job(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
if job['status'] in ('complete', 'cancelled'):
return jsonify({'error': f"Job already {job['status']}"}), 400
# Set cancelled in DB — the runner loop checks this between phases
db.update_scrape_job(job_id, status='cancelled')
# Stop the Docker container if running
container_name = f'recon-scraper-{job_id}'
try:
import subprocess as _subprocess
_subprocess.run(['docker', 'rm', '-f', container_name],
capture_output=True, timeout=10)
except Exception:
pass
logger.info(f"Scraper job {job_id} cancelled")
return jsonify({'ok': True})
@app.route('/api/scraper/retry/<int:job_id>', methods=['POST'])
def api_scraper_retry(job_id):
"""Retry a failed or cancelled scrape job."""
db = StatusDB()
job = db.get_scrape_job(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
if job['status'] not in ('failed', 'cancelled'):
return jsonify({'error': f"Job status is '{job['status']}', can only retry failed or cancelled jobs"}), 400
db.update_scrape_job(job_id,
status='pending',
error_message=None,
subprocess_pid=None,
crawl_mode=None,
started_at=None,
completed_at=None)
logger.info(f"Scraper job {job_id} reset to pending for retry")
return jsonify({'ok': True})
@app.route('/api/scraper/delete/<int:job_id>', methods=['POST'])
def api_scraper_delete(job_id):
"""Delete a scrape job and clean up any associated ZIM artifacts."""
import subprocess
import signal
db = StatusDB()
job = db.get_scrape_job(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
if job['status'] == 'running':
return jsonify({'error': 'Cannot delete a running job \u2014 cancel it first'}), 400
zim_cleanup_results = None
# If the job has a linked zim_source, do full cleanup
if job.get('zim_source_id'):
zim_cleanup_results = _full_zim_cleanup(job['zim_source_id'])
try:
_cache['kiwix_sources'] = _build_kiwix_sources()
except Exception:
pass
elif job.get('zim_filename'):
# No zim_source row, but there may be an orphan file + library entry
zim_path = os.path.join('/mnt/kiwix', job['zim_filename'])
if os.path.isfile(zim_path):
try:
os.remove(zim_path)
logger.info(f"Deleted orphan ZIM file: {zim_path}")
except Exception as e:
logger.warning(f"Failed to delete orphan ZIM file {zim_path}: {e}")
try:
subprocess.run(
['/opt/recon/bin/kiwix-manage', '/mnt/kiwix/library.xml', 'remove',
job['zim_filename'].replace('.zim', '')],
capture_output=True, text=True, timeout=10
)
except Exception as e:
logger.warning(f"kiwix-manage remove failed for orphan: {e}")
try:
result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
pid = int(result.stdout.strip().split()[0])
os.kill(pid, signal.SIGHUP)
logger.info(f"Sent SIGHUP to kiwix-serve (pid {pid})")
except Exception as e:
logger.warning(f"Failed to signal kiwix-serve: {e}")
# Delete the scrape_jobs row (may already be gone if _full_zim_cleanup deleted it)
conn = db._get_conn()
conn.execute("DELETE FROM scrape_jobs WHERE id = ?", (job_id,))
conn.commit()
logger.info(f"Scraper job {job_id} deleted (zim_cleanup={zim_cleanup_results})")
return jsonify({'ok': True, 'zim_cleanup': zim_cleanup_results})
@app.route('/api/scraper/clear-failed', methods=['POST'])
def api_scraper_clear_failed():
"""Delete all failed and cancelled scrape jobs."""
db = StatusDB()
conn = db._get_conn()
result = conn.execute("DELETE FROM scrape_jobs WHERE status IN ('failed', 'cancelled')")
conn.commit()
count = result.rowcount
logger.info(f"Cleared {count} failed/cancelled scraper jobs")
return jsonify({'ok': True, 'deleted': count})
# ── Metrics API ── # ── Metrics API ──
@app.route('/api/metrics/history') @app.route('/api/metrics/history')

387
lib/scraper_runner.py Normal file
View file

@ -0,0 +1,387 @@
"""
RECON Scraper Runner
Daemon loop that processes scrape jobs: crawl via Zimit kiwix-manage.
Zimit (openZIM Docker crawler) handles all site types and produces ZIM
files directly no separate zimwriterfs step needed.
Public entry point: scraper_loop(stop_event, config).
Config section: scraper (output_dir, docker_image, docker_workers, poll_interval)
DB table: scrape_jobs (status flow: pending scraping registering complete)
"""
import glob as _glob
import os
import re
import shutil
import signal
import subprocess
import time
from datetime import datetime, timezone
from urllib.parse import urlparse
from .utils import setup_logging
from .status import StatusDB
logger = setup_logging('recon.scraper_runner')
def scraper_loop(stop_event, config):
"""Daemon loop: poll for pending scrape jobs, execute pipeline."""
scraper_cfg = config.get('scraper', {})
poll_interval = scraper_cfg.get('poll_interval', 300)
logger.info("Scraper runner started")
# Clean up any orphan Zimit containers from a previous crash
_cleanup_orphan_containers()
while not stop_event.is_set():
db = StatusDB()
job = db.get_pending_scrape_job()
if job:
try:
_process_job(job, config, stop_event)
except Exception as e:
logger.error(f"Scraper job {job['id']} unexpected error: {e}", exc_info=True)
try:
db.update_scrape_job(job['id'],
status='failed',
error_message=str(e)[:1000],
subprocess_pid=None,
completed_at=_now())
except Exception:
pass
else:
stop_event.wait(poll_interval)
logger.info("Scraper runner stopped")
def _now():
return datetime.now(timezone.utc).isoformat()
def _sanitize_domain(url):
"""Extract and sanitize domain from URL for use in filenames."""
parsed = urlparse(url)
domain = parsed.hostname or 'unknown'
if domain.startswith('www.'):
domain = domain[4:]
return domain
def _sanitize_filename(s):
"""Sanitize a string for safe filename use."""
return re.sub(r'[^a-zA-Z0-9._-]', '_', s)
def _check_cancelled(db, job_id):
"""Check if a job has been cancelled in the DB."""
job = db.get_scrape_job(job_id)
return job and job['status'] == 'cancelled'
def _kill_process(proc, timeout=5):
"""Gracefully terminate a subprocess, force kill if needed."""
if proc.poll() is not None:
return
try:
proc.terminate()
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=2)
def _cleanup_orphan_containers():
"""Remove any leftover recon-scraper-* Docker containers from a previous crash."""
try:
result = subprocess.run(
['docker', 'ps', '-a', '--filter', 'name=recon-scraper-', '--format', '{{.Names}}'],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
for name in result.stdout.strip().split('\n'):
name = name.strip()
if name:
subprocess.run(['docker', 'rm', '-f', name], capture_output=True, timeout=10)
logger.info(f"Cleaned up orphan container: {name}")
except Exception as e:
logger.warning(f"Orphan container cleanup failed: {e}")
# ── Zimit crawl backend ──────────────────────────────────────────
def _crawl_zimit(job, config, stop_event, db):
"""
Crawl a URL using Zimit (openZIM Docker crawler).
Returns (page_count, zim_filename, error_msg).
On success: (count, filename, None)
On failure: (0, None, error_string)
"""
job_id = job['id']
url = job['url']
title = job.get('title') or _sanitize_domain(url)
language = job.get('language') or config.get('scraper', {}).get('default_language', 'eng')
category = job.get('category') or ''
scraper_cfg = config.get('scraper', {})
output_dir = scraper_cfg.get('output_dir', '/mnt/kiwix')
docker_image = scraper_cfg.get('docker_image', 'ghcr.io/openzim/zimit')
docker_workers = scraper_cfg.get('docker_workers', 2)
domain = _sanitize_domain(url)
date_tag = datetime.now().strftime('%Y-%m')
container_name = f'recon-scraper-{job_id}'
tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}')
# Clean up any pre-existing container with same name (retry scenario)
subprocess.run(['docker', 'rm', '-f', container_name], capture_output=True, timeout=10)
os.makedirs(tmp_dir, exist_ok=True)
description = f"Mirror of {domain}"
if category:
description = f"{category} — mirror of {domain}"
docker_cmd = [
'docker', 'run',
'--name', container_name,
'-v', f'{tmp_dir}:/output',
docker_image,
'zimit',
'--seeds', url,
'--name', _sanitize_filename(domain),
'--zim-lang', language,
'--title', title,
'--description', description[:80],
'--output', '/output',
'-w', str(docker_workers),
]
logger.info(f"Job {job_id}: Zimit crawl starting — {url}")
try:
proc = subprocess.Popen(
docker_cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
db.update_scrape_job(job_id, subprocess_pid=proc.pid)
last_progress_check = 0
while proc.poll() is None:
if stop_event.is_set() or _check_cancelled(db, job_id):
# Stop the Docker container
subprocess.run(['docker', 'rm', '-f', container_name],
capture_output=True, timeout=10)
_kill_process(proc)
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, 'cancelled'
# Check progress every 30s via docker logs
now = time.time()
if now - last_progress_check >= 30:
last_progress_check = now
try:
log_result = subprocess.run(
['docker', 'logs', '--tail', '20', container_name],
capture_output=True, text=True, timeout=10
)
if log_result.returncode == 0:
# Browsertrix logs JSON with "crawled":N — check both stdout and stderr
log_text = log_result.stdout or log_result.stderr or ''
lines = log_text.strip().split('\n')
for line in reversed(lines):
match = re.search(r'"crawled":(\d+)', line)
if match:
count = int(match.group(1))
if count > 0:
db.update_scrape_job(job_id, page_count=count)
break
except Exception:
pass
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
pass
db.update_scrape_job(job_id, subprocess_pid=None)
if stop_event.is_set() or _check_cancelled(db, job_id):
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, 'cancelled'
if proc.returncode != 0:
# Capture last 50 lines of docker logs for error context
error_msg = f"Zimit exited with code {proc.returncode}"
try:
log_result = subprocess.run(
['docker', 'logs', '--tail', '50', container_name],
capture_output=True, text=True, timeout=10
)
log_text = (log_result.stderr or log_result.stdout or '').strip()
if log_text:
# Take last 500 chars
error_msg += f": {log_text[-500:]}"
except Exception:
pass
# Remove container (no --rm flag, so we clean up manually)
subprocess.run(['docker', 'rm', '-f', container_name],
capture_output=True, timeout=10)
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, error_msg
except Exception as e:
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, f"Zimit error: {e}"
# Remove container (no --rm flag, so we clean up manually after getting logs)
subprocess.run(['docker', 'rm', '-f', container_name],
capture_output=True, timeout=10)
# Find the output ZIM file
zim_files = _glob.glob(os.path.join(tmp_dir, '*.zim'))
if not zim_files:
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, 'Zimit produced no ZIM file'
src_zim = zim_files[0] # Should be exactly one
# Get page count from file size as rough estimate if we don't have one
page_count = 0
try:
job_state = db.get_scrape_job(job_id)
page_count = job_state.get('page_count') or 0
except Exception:
pass
# Rename to final location
zim_filename = f"{_sanitize_filename(domain)}_{language}_{date_tag}_{job_id}.zim"
zim_path = os.path.join(output_dir, zim_filename)
try:
shutil.move(src_zim, zim_path)
except Exception as e:
shutil.rmtree(tmp_dir, ignore_errors=True)
return 0, None, f"Failed to move ZIM to output dir: {e}"
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info(f"Job {job_id}: Zimit complete — {zim_filename}")
return page_count, zim_filename, None
# ── Main job pipeline ─────────────────────────────────────────────
def _process_job(job, config, stop_event):
"""Execute the full scrape pipeline for a single job."""
db = StatusDB()
job_id = job['id']
logger.info(f"Job {job_id}: starting scrape of {job['url']}")
# ── Phase 1: Crawl via Zimit ───────────────────────────────────
db.update_scrape_job(job_id,
status='scraping',
crawl_mode='zimit',
started_at=_now())
if stop_event.is_set() or _check_cancelled(db, job_id):
_handle_cancel(db, job_id)
return
page_count, zim_filename, error = _crawl_zimit(job, config, stop_event, db)
if error == 'cancelled':
_handle_cancel(db, job_id)
return
elif error:
db.update_scrape_job(job_id,
status='failed',
error_message=error[:1000],
subprocess_pid=None,
completed_at=_now())
return
db.update_scrape_job(job_id, page_count=page_count)
# ── Phase 2: Register with kiwix-serve ─────────────────────────
if stop_event.is_set() or _check_cancelled(db, job_id):
_handle_cancel(db, job_id)
return
db.update_scrape_job(job_id, status='registering')
output_dir = config.get('scraper', {}).get('output_dir', '/mnt/kiwix')
zim_path = os.path.join(output_dir, zim_filename)
kiwix_manage = shutil.which('kiwix-manage') or '/opt/recon/bin/kiwix-manage'
library_xml = '/mnt/kiwix/library.xml'
try:
subprocess.run(
[kiwix_manage, library_xml, 'add', zim_path],
capture_output=True, text=True, timeout=30
)
logger.info(f"Job {job_id}: registered with kiwix-serve library")
except Exception as e:
logger.warning(f"Job {job_id}: kiwix-manage add failed: {e}")
try:
result = subprocess.run(['pidof', 'kiwix-serve'], capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
pid = int(result.stdout.strip().split()[0])
os.kill(pid, signal.SIGHUP)
logger.info(f"Job {job_id}: sent SIGHUP to kiwix-serve (pid {pid})")
except Exception as e:
logger.warning(f"Job {job_id}: failed to signal kiwix-serve: {e}")
# Wait for kiwix-serve to reload its catalog after SIGHUP
time.sleep(3)
zim_source_id = None
try:
from .zim_monitor import scan_zims
scan_zims()
conn = db._get_conn()
row = conn.execute(
"SELECT id FROM zim_sources WHERE zim_filename = ?", (zim_filename,)
).fetchone()
if row:
zim_source_id = row['id']
logger.info(f"Job {job_id}: linked to zim_source_id={zim_source_id}")
except Exception as e:
logger.warning(f"Job {job_id}: scan_zims failed: {e}")
# ── Phase 3: Complete ──────────────────────────────────────────
db.update_scrape_job(job_id,
status='complete',
zim_filename=zim_filename,
zim_source_id=zim_source_id,
completed_at=_now())
logger.info(f"Job {job_id}: complete — {zim_filename} ({page_count} pages)")
def _handle_cancel(db, job_id):
"""Handle job cancellation: clean up Docker container and update status."""
container_name = f'recon-scraper-{job_id}'
try:
subprocess.run(['docker', 'rm', '-f', container_name],
capture_output=True, timeout=10)
except Exception:
pass
# Clean up tmp dir if it exists
output_dir = '/mnt/kiwix'
tmp_dir = os.path.join(output_dir, f'.zimit-tmp-{job_id}')
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info(f"Job {job_id}: cancelled")
db.update_scrape_job(job_id,
status='cancelled',
subprocess_pid=None,
completed_at=_now())

View file

@ -105,6 +105,25 @@ class StatusDB:
except Exception: except Exception:
pass # column already exists pass # column already exists
# Migration: add subprocess_pid column to scrape_jobs if missing
try:
conn.execute("ALTER TABLE scrape_jobs ADD COLUMN subprocess_pid INTEGER")
except Exception:
pass # column already exists
# Migration: add reject pattern columns to scrape_jobs if missing
for col, coltype in [('additional_reject_patterns', 'TEXT'), ('skip_default_patterns', 'INTEGER DEFAULT 0')]:
try:
conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN {col} {coltype}")
except Exception:
pass # column already exists
# Migration: add crawl_mode column to scrape_jobs if missing
try:
conn.execute("ALTER TABLE scrape_jobs ADD COLUMN crawl_mode TEXT")
except Exception:
pass # column already exists
# Stream B: file_operations + duplicate_review tables # Stream B: file_operations + duplicate_review tables
conn.executescript(""" conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations ( CREATE TABLE IF NOT EXISTS file_operations (
@ -142,6 +161,28 @@ class StatusDB:
resolved_at TEXT resolved_at TEXT
); );
CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status); CREATE INDEX IF NOT EXISTS idx_dupreview_status ON duplicate_review(status);
CREATE TABLE IF NOT EXISTS scrape_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
title TEXT,
language TEXT DEFAULT 'eng',
category TEXT,
status TEXT DEFAULT 'pending',
page_count INTEGER DEFAULT 0,
error_message TEXT,
zim_filename TEXT,
zim_source_id INTEGER,
workspace_path TEXT,
subprocess_pid INTEGER,
additional_reject_patterns TEXT,
skip_default_patterns INTEGER DEFAULT 0,
crawl_mode TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
started_at TEXT,
completed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_scrape_status ON scrape_jobs(status);
""") """)
conn.commit() conn.commit()
@ -406,6 +447,50 @@ class StatusDB:
) )
conn.commit() conn.commit()
# ── Scraper Job Helpers ─────────────────────────────────────
def get_pending_scrape_job(self):
"""Fetch the oldest pending scrape job."""
conn = self._get_conn()
row = conn.execute(
"SELECT * FROM scrape_jobs WHERE status = 'pending' ORDER BY id ASC LIMIT 1"
).fetchone()
return dict(row) if row else None
def update_scrape_job(self, job_id, **kwargs):
"""Update arbitrary columns on a scrape job."""
if not kwargs:
return
conn = self._get_conn()
sets = []
vals = []
for k, v in kwargs.items():
sets.append(f"{k} = ?")
vals.append(v)
vals.append(job_id)
conn.execute(f"UPDATE scrape_jobs SET {', '.join(sets)} WHERE id = ?", vals)
conn.commit()
def get_scrape_jobs(self, status=None):
"""List scrape jobs, optionally filtered by status."""
conn = self._get_conn()
if status:
rows = conn.execute(
"SELECT * FROM scrape_jobs WHERE status = ? ORDER BY id DESC", (status,)
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM scrape_jobs ORDER BY id DESC"
).fetchall()
return [dict(r) for r in rows]
def get_scrape_job(self, job_id):
"""Get a single scrape job by ID."""
conn = self._get_conn()
row = conn.execute("SELECT * FROM scrape_jobs WHERE id = ?", (job_id,)).fetchone()
return dict(row) if row else None
# ── Stream B: File Operations ─────────────────────────────────── # ── Stream B: File Operations ───────────────────────────────────
def log_file_operation(self, doc_hash, operation, source_path, target_path, def log_file_operation(self, doc_hash, operation, source_path, target_path,

View file

@ -692,12 +692,23 @@ def cmd_service(args):
daemon=True, name='dashboard'), daemon=True, name='dashboard'),
] ]
# Scraper daemon: polls for pending scrape jobs, runs wget+zimwriterfs pipeline
scraper_cfg = config.get('scraper', {})
if scraper_cfg.get('workspace'):
from lib.scraper_runner import scraper_loop
threads.append(
threading.Thread(target=lambda: scraper_loop(stop_event, config),
daemon=True, name='scraper')
)
logger.info("=== RECON Service Starting ===") logger.info("=== RECON Service Starting ===")
logger.info(f" Dashboard: {web_host}:{web_port}") logger.info(f" Dashboard: {web_host}:{web_port}")
logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}") logger.info(f" Workers: enrich={enrich_workers}, embed={embed_workers}")
logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s") logger.info(f" Dispatcher: every {dispatch_interval}s | Filing: every {filing_interval}s")
pt_interval = config.get("peertube", {}).get("poll_interval", 1800) pt_interval = config.get("peertube", {}).get("poll_interval", 1800)
logger.info(f" PeerTube acquisition: every {pt_interval}s") logger.info(f" PeerTube acquisition: every {pt_interval}s")
if scraper_cfg.get('workspace'):
logger.info(f" Scraper: every {scraper_cfg.get('poll_interval', 300)}s")
logger.info(f" Progress: every {progress_interval}s") logger.info(f" Progress: every {progress_interval}s")
for t in threads: for t in threads:

View file

@ -331,3 +331,4 @@ tr:hover { background: var(--bg-secondary); }
.badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-detected { background: #333; color: #888; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
.badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-processing { background: #4a3a1a; color: #f59e0b; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
.badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; } .badge-extracting { background: #1a3a5a; color: #0ea5e9; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }
.badge-failed { background: #4a1a1a; color: #ff4444; padding: 2px 8px; border-radius: var(--radius); font-size: 11px; }

173
static/js/scraper.js Normal file
View file

@ -0,0 +1,173 @@
/* RECON Scraper Dashboard JS */
(function() {
'use strict';
function loadJobs() {
return RECON.fetchJSON('/api/scraper/jobs').then(function(data) {
var jobs = data.jobs || [];
// Stats
var total = jobs.length;
var active = 0, complete = 0, failed = 0;
jobs.forEach(function(j) {
if (j.status === 'complete') complete++;
else if (j.status === 'failed' || j.status === 'cancelled') failed++;
else if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') active++;
});
RECON.set('sc-total', RECON.fmt(total));
RECON.set('sc-active', RECON.fmt(active));
RECON.set('sc-complete', RECON.fmt(complete));
RECON.set('sc-failed', RECON.fmt(failed));
// Show/hide Clear Failed button
var clearBtn = document.getElementById('sc-clear-btn');
if (clearBtn) clearBtn.style.display = failed > 0 ? '' : 'none';
// Table
var html = '';
jobs.forEach(function(j) {
var badge = statusBadge(j.status);
var pages = j.page_count ? RECON.fmt(j.page_count) : '\u2014';
var zim = j.zim_filename ?
'<span class="text-small">' + j.zim_filename + '</span>' : '\u2014';
var actions = '';
if (j.status === 'scraping' || j.status === 'registering' || j.status === 'pending') {
actions = '<button class="btn btn-danger" onclick="SCRAPER.cancel(' + j.id + ')">Cancel</button>';
} else if (j.status === 'failed' || j.status === 'cancelled') {
actions = '<button class="btn" onclick="SCRAPER.retry(' + j.id + ')">Retry</button> ' +
'<button class="btn btn-danger" onclick="SCRAPER.remove(' + j.id + ')">Delete</button>';
} else if (j.status === 'complete') {
actions = '<button class="btn btn-danger" onclick="SCRAPER.remove(' + j.id + ')">Delete</button>';
}
// Truncate URL for display
var displayUrl = j.url.length > 40 ? j.url.substring(0, 40) + '\u2026' : j.url;
html += '<tr>' +
'<td>' + j.id + '</td>' +
'<td><a href="' + escHtml(j.url) + '" target="_blank" title="' + escHtml(j.url) + '">' + escHtml(displayUrl) + '</a></td>' +
'<td>' + escHtml(j.title || '\u2014') + '</td>' +
'<td>' + pages + '</td>' +
'<td>' + badge + errorTooltip(j) + '</td>' +
'<td>' + zim + '</td>' +
'<td>' + actions + '</td>' +
'</tr>';
});
if (!html) html = '<tr><td colspan="7" class="text-muted">No scrape jobs</td></tr>';
RECON.setHTML('sc-table-body', html);
}).catch(function(err) {
console.error('Scraper dashboard error:', err);
});
}
function statusBadge(status) {
var map = {
'pending': '<span class="badge-detected">PENDING</span>',
'scraping': '<span class="badge-processing">SCRAPING</span>',
'registering': '<span class="badge-processing">REGISTERING</span>',
'complete': '<span class="badge-complete">COMPLETE</span>',
'failed': '<span class="badge-failed">FAILED</span>',
'cancelled': '<span class="badge-detected">CANCELLED</span>'
};
return map[status] || '<span class="badge-detected">' + (status || 'UNKNOWN').toUpperCase() + '</span>';
}
function errorTooltip(job) {
if (!job.error_message) return '';
var short = job.error_message.length > 80 ?
job.error_message.substring(0, 80) + '\u2026' : job.error_message;
return '<div class="text-small text-muted" style="max-width:200px;word-break:break-all;" title="' +
escHtml(job.error_message) + '">' + escHtml(short) + '</div>';
}
function escHtml(str) {
if (!str) return '';
return str.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
.replace(/"/g, '&quot;').replace(/'/g, '&#39;');
}
function submit(e) {
e.preventDefault();
var url = document.getElementById('sf-url').value.trim();
if (!url) return false;
var body = { url: url };
var title = document.getElementById('sf-title').value.trim();
var lang = document.getElementById('sf-lang').value;
var category = document.getElementById('sf-category').value.trim();
if (title) body.title = title;
if (lang) body.language = lang;
if (category) body.category = category;
var btn = document.getElementById('sf-submit-btn');
var feedback = document.getElementById('sf-feedback');
btn.disabled = true;
btn.textContent = 'Submitting...';
RECON.postJSON('/api/scraper/submit', body).then(function(data) {
btn.disabled = false;
btn.textContent = 'Submit';
if (data.ok) {
feedback.style.display = 'block';
feedback.style.color = '#00ff41';
feedback.textContent = 'Job #' + data.job_id + ' submitted successfully';
document.getElementById('sf-url').value = '';
document.getElementById('sf-title').value = '';
document.getElementById('sf-category').value = '';
setTimeout(function() { feedback.style.display = 'none'; }, 4000);
loadJobs();
} else {
feedback.style.display = 'block';
feedback.style.color = '#ff4444';
feedback.textContent = 'Error: ' + (data.error || 'Unknown error');
}
}).catch(function(err) {
btn.disabled = false;
btn.textContent = 'Submit';
feedback.style.display = 'block';
feedback.style.color = '#ff4444';
feedback.textContent = 'Network error: ' + err.message;
});
return false;
}
function cancel(jobId) {
if (!confirm('Cancel job #' + jobId + '?')) return;
RECON.postJSON('/api/scraper/cancel/' + jobId).then(function(data) {
if (data.ok) loadJobs();
else alert('Error: ' + (data.error || 'Unknown'));
});
}
function retry(jobId) {
RECON.postJSON('/api/scraper/retry/' + jobId).then(function(data) {
if (data.ok) loadJobs();
else alert('Error: ' + (data.error || 'Unknown'));
});
}
function remove(jobId) {
if (!confirm('Delete job #' + jobId + '? This cannot be undone.')) return;
RECON.postJSON('/api/scraper/delete/' + jobId).then(function(data) {
if (data.ok) loadJobs();
else alert('Error: ' + (data.error || 'Unknown'));
});
}
function clearFailed() {
if (!confirm('Delete all failed and cancelled jobs?')) return;
RECON.postJSON('/api/scraper/clear-failed').then(function(data) {
if (data.ok) loadJobs();
else alert('Error: ' + (data.error || 'Unknown'));
});
}
// Expose for inline onclick
window.SCRAPER = { submit: submit, cancel: cancel, retry: retry, remove: remove, clearFailed: clearFailed };
document.addEventListener('DOMContentLoaded', function() {
RECON.startRefresh(loadJobs, 10000);
});
})();

View file

@ -0,0 +1,84 @@
{% extends "base.html" %}
{% block content %}
<div id="scraper-page">
<!-- Submit Form -->
<div class="panel">
<h3 class="section-title" style="margin-bottom:12px;">Submit Scrape Job</h3>
<form id="scraper-form" onsubmit="return SCRAPER.submit(event)">
<div style="display:grid;grid-template-columns:1fr 1fr;gap:12px;margin-bottom:12px;">
<div>
<label class="text-small text-muted" style="display:block;margin-bottom:4px;">URL *</label>
<input type="url" id="sf-url" placeholder="https://example.com/" required
style="width:100%;padding:8px 12px;background:var(--bg-secondary);border:1px solid var(--border);color:var(--text-primary);border-radius:var(--radius);font-family:inherit;font-size:13px;">
</div>
<div>
<label class="text-small text-muted" style="display:block;margin-bottom:4px;">Title</label>
<input type="text" id="sf-title" placeholder="Optional display title"
style="width:100%;padding:8px 12px;background:var(--bg-secondary);border:1px solid var(--border);color:var(--text-primary);border-radius:var(--radius);font-family:inherit;font-size:13px;">
</div>
</div>
<div style="display:grid;grid-template-columns:1fr 1fr auto;gap:12px;align-items:end;">
<div>
<label class="text-small text-muted" style="display:block;margin-bottom:4px;">Language</label>
<select id="sf-lang"
style="width:100%;padding:8px 12px;background:var(--bg-secondary);border:1px solid var(--border);color:var(--text-primary);border-radius:var(--radius);font-family:inherit;font-size:13px;">
<option value="eng" selected>English</option>
<option value="spa">Spanish</option>
<option value="fra">French</option>
<option value="deu">German</option>
<option value="por">Portuguese</option>
<option value="rus">Russian</option>
<option value="jpn">Japanese</option>
<option value="zho">Chinese</option>
<option value="mul">Multilingual</option>
</select>
</div>
<div>
<label class="text-small text-muted" style="display:block;margin-bottom:4px;">Category</label>
<input type="text" id="sf-category" placeholder="Optional"
style="width:100%;padding:8px 12px;background:var(--bg-secondary);border:1px solid var(--border);color:var(--text-primary);border-radius:var(--radius);font-family:inherit;font-size:13px;">
</div>
<div>
<button type="submit" class="btn" id="sf-submit-btn">Submit</button>
</div>
</div>
<div id="sf-feedback" style="margin-top:8px;font-size:12px;display:none;"></div>
</form>
</div>
<!-- Stats row -->
<div class="stat-grid" style="grid-template-columns:repeat(4, 1fr);">
<div class="stat-card"><div class="label">Total Jobs</div><div class="value" id="sc-total">&mdash;</div></div>
<div class="stat-card"><div class="label">Active</div><div class="value" id="sc-active">&mdash;</div></div>
<div class="stat-card"><div class="label">Complete</div><div class="value" id="sc-complete">&mdash;</div></div>
<div class="stat-card"><div class="label">Failed</div><div class="value" id="sc-failed">&mdash;</div></div>
</div>
<!-- Jobs Table -->
<div class="panel">
<div style="display:flex;justify-content:space-between;align-items:center;margin-bottom:12px;">
<h3 class="section-title" style="margin:0;">Scrape Jobs</h3>
<button class="btn btn-danger" onclick="SCRAPER.clearFailed()" id="sc-clear-btn" style="display:none;">Clear Failed</button>
</div>
<table class="data-table" id="sc-table">
<thead>
<tr>
<th>ID</th>
<th>URL</th>
<th>Title</th>
<th>Pages</th>
<th>Status</th>
<th>ZIM</th>
<th></th>
</tr>
</thead>
<tbody id="sc-table-body">
<tr><td colspan="7" class="text-muted">Loading...</td></tr>
</tbody>
</table>
</div>
</div>
{% endblock %}
{% block scripts %}
<script src="/static/js/scraper.js"></script>
{% endblock %}