From 5d618da2a4e920ab9fc8bfef049ca37f3579729a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 30 Apr 2026 21:37:51 +0000 Subject: [PATCH] Add wiki_index_wave3.py with parallel resolve Wave 3 pipeline for processing 253K+ place types with NO wiki/wikidata tags (US+CA only). Uses Gemini to resolve Wikipedia titles. Key feature: resolve_wikipedia_titles() now uses ThreadPoolExecutor with 5 parallel workers, improving throughput from ~14/min to ~75/min. Co-Authored-By: Claude Opus 4.5 --- scripts/wiki_index_wave3.py | 932 ++++++++++++++++++++++++++++++++++++ 1 file changed, 932 insertions(+) create mode 100755 scripts/wiki_index_wave3.py diff --git a/scripts/wiki_index_wave3.py b/scripts/wiki_index_wave3.py new file mode 100755 index 0000000..c3abc70 --- /dev/null +++ b/scripts/wiki_index_wave3.py @@ -0,0 +1,932 @@ +#!/usr/bin/env python3 +""" +Wiki Location Index Pipeline — Wave 3 +Processes high-coverage place types with NO wiki/wikidata tags (US+CA only). +Uses Gemini to resolve Wikipedia titles. + +CRITICAL: Every Gemini call commits immediately. DB is the checkpoint. + +Usage: + python wiki_index_wave3.py extract # Extract from JSONL + python wiki_index_wave3.py resolve # Resolve Wikipedia titles via Gemini + python wiki_index_wave3.py validate # Validate titles against ZIM + python wiki_index_wave3.py wikivoyage # Resolve+validate Wikivoyage titles + python wiki_index_wave3.py summarize # Generate summaries with Gemini +""" + +import os +import sys +import json +import sqlite3 +import logging +import time +import resource +import threading +import signal +from datetime import datetime +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + +import zstandard as zstd +from bs4 import BeautifulSoup +from google import genai +from google.genai import types +import requests + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +JSONL_PATH = "/mnt/pi-nas/nav/photon-dump-planet.jsonl.zst" +DB_PATH = "/mnt/pi-nas/nav/wiki-index/data/wiki_index.db" +LOG_DIR = "/mnt/pi-nas/nav/wiki-index/logs" +GEMINI_JSONL = f"{LOG_DIR}/wave3_gemini.jsonl" + +# US + Canada line range +COMBINED_START = 53694616 +COMBINED_END = 175406527 + +# ZIM endpoints (VM 1130) +WIKIPEDIA_INTERNAL = "http://192.168.1.130:8430/wikipedia_en_all_maxi_2026-02" +WIKIVOYAGE_INTERNAL = "http://192.168.1.130:8430/wikivoyage_en_all_maxi_2026-03" + +# Gemini +GEMINI_MODEL = "gemini-2.5-flash" +RESOLVE_WORKERS = 5 + +# Memory limit +MAX_RSS_MB = 10240 # 10GB + +# Circuit breaker settings +CIRCUIT_BREAKER_CONSECUTIVE_429 = 10 +CIRCUIT_BREAKER_PAUSE_1 = 300 # 5 minutes +CIRCUIT_BREAKER_PAUSE_2 = 600 # 10 minutes +CIRCUIT_BREAKER_MAX_PAUSES = 3 + +# Wave 3 type filter +WAVE3_TYPES = { + ("place", "city"), ("place", "town"), ("place", "village"), + ("place", "hamlet"), ("place", "borough"), ("place", "suburb"), + ("boundary", "administrative"), + ("natural", "peak"), ("natural", "volcano"), ("natural", "bay"), + ("natural", "cape"), ("natural", "glacier"), + ("leisure", "nature_reserve"), ("boundary", "national_park"), + ("tourism", "museum"), ("amenity", "university"), ("aeroway", "aerodrome"), +} + +TRAVEL_TYPES = { + ("place", "city"), ("place", "town"), ("place", "village"), + ("boundary", "national_park"), ("leisure", "nature_reserve"), +} + +# Graceful shutdown flag +_shutdown_requested = False +_current_place_id = None + +def signal_handler(signum, frame): + global _shutdown_requested + _shutdown_requested = True + log.warning(f"Shutdown requested (signal {signum}), finishing current place...") + +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + +# Thread-safe HTTP session +_http_session = None +_http_lock = threading.Lock() + +def get_http_session(): + global _http_session + if _http_session is None: + with _http_lock: + if _http_session is None: + _http_session = requests.Session() + _http_session.headers.update({"User-Agent": "Echo6WikiIndex/3.0"}) + return _http_session + +# ============================================================================= +# LOGGING +# ============================================================================= + +def setup_logging(): + Path(LOG_DIR).mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = f"{LOG_DIR}/wave3_{ts}.log" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.FileHandler(log_file), logging.StreamHandler()] + ) + return logging.getLogger(__name__) + +log = setup_logging() + +# ============================================================================= +# GEMINI AUDIT LOG +# ============================================================================= + +_gemini_log_lock = threading.Lock() + +def log_gemini_response(place_id, stage, prompt, response, error=None): + """Append every Gemini call to audit log.""" + entry = { + "timestamp": datetime.utcnow().isoformat(), + "place_id": place_id, + "stage": stage, + "prompt": prompt[:500], # Truncate long prompts + "response": response, + "error": str(error) if error else None, + } + with _gemini_log_lock: + with open(GEMINI_JSONL, "a") as f: + f.write(json.dumps(entry) + "\n") + +# ============================================================================= +# MEMORY MONITORING +# ============================================================================= + +def check_memory(context=""): + rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + rss_mb = rss_kb / 1024 if sys.platform != 'darwin' else rss_kb / 1024 / 1024 + if rss_mb > MAX_RSS_MB: + log.error(f"RSS {rss_mb:.0f}MB exceeds {MAX_RSS_MB}MB at {context}, aborting") + sys.exit(1) + return rss_mb + +# ============================================================================= +# DATABASE +# ============================================================================= + +def get_db(): + conn = sqlite3.connect(DB_PATH) + # Ensure wiki_failures table exists + conn.execute(""" + CREATE TABLE IF NOT EXISTS wiki_failures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + place_id INTEGER, + stage TEXT, + error_type TEXT, + error_message TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.commit() + return conn + +def log_failure(conn, place_id, stage, error): + """Log a failure to wiki_failures table.""" + error_type = type(error).__name__ + error_msg = str(error)[:1000] + conn.execute( + "INSERT INTO wiki_failures (place_id, stage, error_type, error_message) VALUES (?, ?, ?, ?)", + (place_id, stage, error_type, error_msg) + ) + conn.commit() + +def get_existing_place_keys(conn): + c = conn.cursor() + c.execute(""" + SELECT place_name, osm_key, osm_value, + COALESCE(county,''), COALESCE(state,''), country_code + FROM wiki_places + """) + return set(tuple(row) for row in c.fetchall()) + +# ============================================================================= +# CIRCUIT BREAKER +# ============================================================================= + +class CircuitBreaker: + def __init__(self): + self.consecutive_429s = 0 + self.pause_count = 0 + self.lock = threading.Lock() + + def record_success(self): + with self.lock: + self.consecutive_429s = 0 + + def record_429(self): + """Returns True if we should abort, False if we can continue.""" + with self.lock: + self.consecutive_429s += 1 + + if self.consecutive_429s >= CIRCUIT_BREAKER_CONSECUTIVE_429: + self.pause_count += 1 + + if self.pause_count > CIRCUIT_BREAKER_MAX_PAUSES: + log.error(f"Circuit breaker: {CIRCUIT_BREAKER_MAX_PAUSES} pauses failed, aborting gracefully") + return True # Signal to abort + + pause_time = CIRCUIT_BREAKER_PAUSE_1 if self.pause_count == 1 else CIRCUIT_BREAKER_PAUSE_2 + log.warning(f"Circuit breaker: {self.consecutive_429s} consecutive 429s, pausing {pause_time}s (pause {self.pause_count}/{CIRCUIT_BREAKER_MAX_PAUSES})") + time.sleep(pause_time) + self.consecutive_429s = 0 + + return False # Continue + +circuit_breaker = CircuitBreaker() + +# ============================================================================= +# RESPONSE VALIDATION +# ============================================================================= + +def validate_gemini_response(text): + """Validate response is not empty, error, or HTML.""" + if not text: + return False, "empty response" + if text.startswith(" 8: + return False + except ValueError: + return False + return True + +def extract_from_jsonl(): + """Extract Wave 3 places from JSONL.""" + conn = get_db() + c = conn.cursor() + + log.info("Loading existing place keys...") + existing = get_existing_place_keys(conn) + log.info(f"Found {len(existing):,} existing places") + + seen = set() + inserted = 0 + skipped_type = 0 + skipped_wiki = 0 + skipped_no_name = 0 + skipped_dupe = 0 + skipped_existing = 0 + + log.info(f"Wave 3 extraction: lines {COMBINED_START:,} to {COMBINED_END:,}") + log.info(f"Type filter: {len(WAVE3_TYPES)} types") + + with open(JSONL_PATH, 'rb') as fh: + dctx = zstd.ZstdDecompressor() + with dctx.stream_reader(fh) as reader: + import io + text_reader = io.TextIOWrapper(reader, encoding='utf-8') + + line_num = 0 + for line in text_reader: + line_num += 1 + + if line_num < COMBINED_START: + if line_num % 10_000_000 == 0: + log.info(f" Seeking... line {line_num:,}") + continue + + if line_num > COMBINED_END: + break + + if line_num % 1_000_000 == 0: + rss = check_memory(f"line {line_num}") + log.info(f" Line {line_num:,}, inserted {inserted:,}, RSS {rss:.0f}MB") + conn.commit() + + try: + record = json.loads(line) + content = record.get("content", [{}])[0] + + country_code = content.get("country_code", "") + if country_code not in ("us", "ca"): + continue + + osm_key = content.get("osm_key", "") + osm_value = content.get("osm_value", "") + extra = content.get("extra", {}) + + if not should_include_wave3(osm_key, osm_value, extra): + if extra.get("wikipedia") or extra.get("wikidata"): + skipped_wiki += 1 + else: + skipped_type += 1 + continue + + name_obj = content.get("name", {}) + name = name_obj.get("name:en") or name_obj.get("name") + if not name: + skipped_no_name += 1 + continue + + address = content.get("address", {}) + state = address.get("state") or address.get("state:en") + county = address.get("county") or address.get("county:en") + + dedup_key = (name, osm_key, osm_value, county or "", state or "", country_code) + + if dedup_key in existing: + skipped_existing += 1 + continue + + if dedup_key in seen: + skipped_dupe += 1 + continue + seen.add(dedup_key) + + osm_id = f"{content.get('object_type', '')}{content.get('object_id', '')}" + importance = content.get("importance") + extra_json = json.dumps({k: v for k, v in extra.items() + if k not in ("wikipedia", "wikidata")}) or None + + c.execute(""" + INSERT OR IGNORE INTO wiki_places + (place_name, osm_key, osm_value, county, state, country_code, + osm_id, importance, extra_json, source) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'gemini') + """, (name, osm_key, osm_value, county, state, country_code, + osm_id, importance, extra_json)) + + if c.rowcount > 0: + inserted += 1 + + if inserted % 10000 == 0: + conn.commit() + + except json.JSONDecodeError: + continue + except Exception as e: + log.error(f"Error line {line_num}: {e}") + continue + + conn.commit() + conn.close() + + log.info(f"Wave 3 extraction complete:") + log.info(f" Inserted: {inserted:,}") + log.info(f" Skipped (has wiki/wikidata): {skipped_wiki:,}") + log.info(f" Skipped (type filter): {skipped_type:,}") + log.info(f" Skipped (no name): {skipped_no_name:,}") + log.info(f" Skipped (duplicate): {skipped_dupe:,}") + log.info(f" Skipped (existing): {skipped_existing:,}") + + return inserted + +# ============================================================================= +# STAGE 2: RESOLVE WIKIPEDIA TITLES VIA GEMINI +# ============================================================================= + +def resolve_worker(args): + """Worker function for parallel Wikipedia title resolution.""" + pid, name, state, country_code, client, circuit_breaker, db_lock = args + + country = "United States" if country_code == "us" else "Canada" + location = f"{state}, {country}" if state else country + prompt = f"What is the exact Wikipedia article title for {name}, {location}? Reply with just the article title, or NONE if no article exists." + + result = { + "place_id": pid, + "success": False, + "title": None, + "is_none": False, + "error": None, + "is_rate_limit": False + } + + try: + response = client.models.generate_content( + model=GEMINI_MODEL, + contents=prompt, + config=types.GenerateContentConfig(max_output_tokens=100, temperature=0.1) + ) + circuit_breaker.record_success() + + text = response.text.strip() if response.text else "" + log_gemini_response(pid, "resolve", prompt, text) + + # Validate response + valid, reason = validate_gemini_response(text) + if not valid: + result["error"] = f"Invalid response: {reason}" + return result + + # Clean up and store + if text.upper() == "NONE": + result["success"] = True + result["is_none"] = True + result["title"] = None + else: + result["success"] = True + result["title"] = text.replace("**", "").strip().strip('"') + + return result + + except Exception as e: + err_str = str(e).lower() + log_gemini_response(pid, "resolve", prompt, None, error=e) + + if "429" in err_str or "quota" in err_str or "rate" in err_str: + result["is_rate_limit"] = True + should_abort = circuit_breaker.record_429() + if should_abort: + result["error"] = "circuit_breaker_abort" + return result + + result["error"] = str(e) + return result + + +def resolve_wikipedia_titles(): + """Resolve Wikipedia titles for Wave 3 places via Gemini (parallel).""" + global _shutdown_requested, _current_place_id + + api_key = os.environ.get("GEMINI_API_KEY") + if not api_key: + env_path = Path("/home/zvx/projects/wiki-index/.env") + if env_path.exists(): + for line in env_path.read_text().splitlines(): + if line.startswith("GEMINI_API_KEY="): + api_key = line.split("=", 1)[1].strip().strip('"') + break + if not api_key: + log.error("GEMINI_API_KEY not set") + return + + client = genai.Client(api_key=api_key) + conn = get_db() + c = conn.cursor() + db_lock = threading.Lock() + + # STARTUP VERIFICATION: count already processed + c.execute("SELECT COUNT(*) FROM wiki_places WHERE source = 'gemini' AND wikipedia_title IS NOT NULL") + already_done = c.fetchone()[0] + + # Get unprocessed rows (DB is the checkpoint) + c.execute(""" + SELECT id, place_name, state, country_code FROM wiki_places + WHERE source = 'gemini' AND wikipedia_title IS NULL + ORDER BY id + """) + rows = c.fetchall() + + total = len(rows) + log.info(f"[RESOLVE] Resuming: {already_done:,} already resolved, {total:,} remaining") + log.info(f"[RESOLVE] Using {RESOLVE_WORKERS} parallel workers") + + if not rows: + log.info("[RESOLVE] No places need title resolution") + return + + resolved = 0 + no_article = 0 + errors = 0 + processed = 0 + start_time = time.time() + abort_requested = False + + # Prepare work items + work_items = [ + (pid, name, state, country_code, client, circuit_breaker, db_lock) + for pid, name, state, country_code in rows + ] + + with ThreadPoolExecutor(max_workers=RESOLVE_WORKERS) as executor: + futures = { + executor.submit(resolve_worker, item): item[0] + for item in work_items + } + + for future in as_completed(futures): + if _shutdown_requested or abort_requested: + log.info(f"[RESOLVE] Shutdown requested, cancelling remaining futures...") + executor.shutdown(wait=False, cancel_futures=True) + break + + result = future.result() + processed += 1 + pid = result["place_id"] + _current_place_id = pid + + # Progress logging every 100 + if processed % 100 == 0: + elapsed = time.time() - start_time + rate = processed / (elapsed / 60) if elapsed > 0 else 0 + pct = (processed / total) * 100 + log.info(f"[RESOLVE] Progress: {processed:,}/{total:,} ({pct:.1f}%) | Resolved: {resolved} | None: {no_article} | Errors: {errors} | Rate: {rate:.1f}/min") + check_memory(f"resolve {processed}") + + if result["success"]: + # Thread-safe DB write + with db_lock: + c.execute("UPDATE wiki_places SET wikipedia_title = ? WHERE id = ?", (result["title"], pid)) + conn.commit() + + if result["is_none"]: + no_article += 1 + else: + resolved += 1 + else: + if result["error"] == "circuit_breaker_abort": + log.error(f"[RESOLVE] Circuit breaker abort at place_id {pid}") + abort_requested = True + else: + # Log failure + with db_lock: + log_failure(conn, pid, "resolve", Exception(result["error"])) + errors += 1 + + conn.close() + log.info(f"[RESOLVE] Complete: resolved={resolved:,}, none={no_article:,}, errors={errors:,}") + + +# ============================================================================= +# STAGE 3: VALIDATE AGAINST ZIM +# ============================================================================= + +def validate_wikipedia_title(title): + if not title: + return False + encoded = requests.utils.quote(title.replace(" ", "_")) + url = f"{WIKIPEDIA_INTERNAL}/A/{encoded}" + try: + resp = get_http_session().head(url, timeout=5) + return resp.status_code == 200 + except: + return False + +def validate_titles(): + """Validate resolved Wikipedia titles against ZIM.""" + conn = get_db() + c = conn.cursor() + + # STARTUP VERIFICATION + c.execute("SELECT COUNT(*) FROM wiki_places WHERE source = 'gemini' AND wikipedia_exists IS NOT NULL") + already_done = c.fetchone()[0] + + c.execute(""" + SELECT id, wikipedia_title FROM wiki_places + WHERE source = 'gemini' AND wikipedia_title IS NOT NULL AND wikipedia_exists IS NULL + """) + rows = c.fetchall() + + total = len(rows) + log.info(f"[VALIDATE] Resuming: {already_done:,} already validated, {total:,} remaining") + + if not rows: + log.info("[VALIDATE] No titles need validation") + return + + exists_count = 0 + missing_count = 0 + start_time = time.time() + + for i, (pid, title) in enumerate(rows): + if _shutdown_requested: + log.info(f"[VALIDATE] Shutting down at place_id {pid}") + break + + if i > 0 and i % 100 == 0: + elapsed = time.time() - start_time + rate = i / (elapsed / 60) if elapsed > 0 else 0 + pct = (i / total) * 100 + log.info(f"[VALIDATE] Progress: {i:,}/{total:,} ({pct:.1f}%) | Exists: {exists_count} | Missing: {missing_count} | Rate: {rate:.1f}/min") + + exists = 1 if validate_wikipedia_title(title) else 0 + c.execute("UPDATE wiki_places SET wikipedia_exists = ? WHERE id = ?", (exists, pid)) + conn.commit() + + if exists: + exists_count += 1 + else: + missing_count += 1 + + conn.close() + log.info(f"[VALIDATE] Complete: exists={exists_count:,}, missing={missing_count:,}") + +# ============================================================================= +# STAGE 4: WIKIVOYAGE RESOLUTION +# ============================================================================= + +def validate_wikivoyage_title(title): + if not title: + return False + encoded = requests.utils.quote(title.replace(" ", "_")) + url = f"{WIKIVOYAGE_INTERNAL}/A/{encoded}" + try: + resp = get_http_session().head(url, timeout=5) + return resp.status_code == 200 + except: + return False + +def resolve_wikivoyage(): + """Resolve and validate Wikivoyage titles for travel-relevant types.""" + global _shutdown_requested, _current_place_id + + api_key = os.environ.get("GEMINI_API_KEY") + if not api_key: + env_path = Path("/home/zvx/projects/wiki-index/.env") + if env_path.exists(): + for line in env_path.read_text().splitlines(): + if line.startswith("GEMINI_API_KEY="): + api_key = line.split("=", 1)[1].strip().strip('"') + break + if not api_key: + log.error("GEMINI_API_KEY not set") + return + + client = genai.Client(api_key=api_key) + conn = get_db() + c = conn.cursor() + + # STARTUP VERIFICATION + c.execute("SELECT COUNT(*) FROM wiki_places WHERE source = 'gemini' AND wikivoyage_title IS NOT NULL") + already_done = c.fetchone()[0] + + type_conditions = " OR ".join([f"(osm_key = ? AND osm_value = ?)" for _ in TRAVEL_TYPES]) + params = [p for t in TRAVEL_TYPES for p in t] + + c.execute(f""" + SELECT id, place_name, state, country_code FROM wiki_places + WHERE source = 'gemini' + AND wikipedia_exists = 1 + AND wikivoyage_title IS NULL + AND ({type_conditions}) + ORDER BY id + """, params) + rows = c.fetchall() + + total = len(rows) + log.info(f"[WIKIVOYAGE] Resuming: {already_done:,} already resolved, {total:,} remaining") + + if not rows: + log.info("[WIKIVOYAGE] No places need Wikivoyage resolution") + return + + resolved = 0 + validated = 0 + errors = 0 + start_time = time.time() + + for i, (pid, name, state, country_code) in enumerate(rows): + if _shutdown_requested: + log.info(f"[WIKIVOYAGE] Shutting down at place_id {pid}") + break + + _current_place_id = pid + + if i > 0 and i % 100 == 0: + elapsed = time.time() - start_time + rate = i / (elapsed / 60) if elapsed > 0 else 0 + pct = (i / total) * 100 + log.info(f"[WIKIVOYAGE] Progress: {i:,}/{total:,} ({pct:.1f}%) | Resolved: {resolved} | Validated: {validated} | Errors: {errors} | Rate: {rate:.1f}/min") + + country = "United States" if country_code == "us" else "Canada" + location = f"{state}, {country}" if state else country + prompt = f"What is the exact Wikivoyage article title for {name}, {location}? Reply with just the title, or NONE if no article exists." + + try: + response = client.models.generate_content( + model=GEMINI_MODEL, + contents=prompt, + config=types.GenerateContentConfig(max_output_tokens=100, temperature=0.1) + ) + circuit_breaker.record_success() + + text = response.text.strip() if response.text else "" + log_gemini_response(pid, "wikivoyage", prompt, text) + + valid, reason = validate_gemini_response(text) + if not valid: + log_failure(conn, pid, "wikivoyage", Exception(f"Invalid response: {reason}")) + c.execute("UPDATE wiki_places SET wikivoyage_exists = 0 WHERE id = ?", (pid,)) + conn.commit() + errors += 1 + continue + + if text.upper() == "NONE": + c.execute("UPDATE wiki_places SET wikivoyage_exists = 0 WHERE id = ?", (pid,)) + else: + title = text.replace("**", "").strip().strip('"') + exists = 1 if validate_wikivoyage_title(title) else 0 + c.execute("UPDATE wiki_places SET wikivoyage_title = ?, wikivoyage_exists = ? WHERE id = ?", + (title, exists, pid)) + resolved += 1 + if exists: + validated += 1 + + conn.commit() + + except Exception as e: + err_str = str(e).lower() + log_gemini_response(pid, "wikivoyage", prompt, None, error=e) + + if "429" in err_str or "quota" in err_str: + should_abort = circuit_breaker.record_429() + if should_abort: + log.error(f"[WIKIVOYAGE] Circuit breaker abort at place_id {pid}") + break + + log_failure(conn, pid, "wikivoyage", e) + errors += 1 + continue + + time.sleep(0.1) + + conn.close() + log.info(f"[WIKIVOYAGE] Complete: resolved={resolved:,}, validated={validated:,}, errors={errors:,}") + +# ============================================================================= +# STAGE 5: SUMMARIZE +# ============================================================================= + +def fetch_wiki_content(title, is_wikivoyage=False): + base = WIKIVOYAGE_INTERNAL if is_wikivoyage else WIKIPEDIA_INTERNAL + encoded = requests.utils.quote(title.replace(" ", "_")) + url = f"{base}/A/{encoded}" + try: + resp = get_http_session().get(url, timeout=10) + if resp.status_code != 200: + return None + soup = BeautifulSoup(resp.text, 'html.parser') + for tag in soup(['script', 'style', 'nav', 'footer', 'header']): + tag.decompose() + text = soup.get_text(separator=' ', strip=True) + return text[:15000] + except: + return None + +def summarize(): + """Generate summaries for validated Wave 3 places.""" + global _shutdown_requested, _current_place_id + + api_key = os.environ.get("GEMINI_API_KEY") + if not api_key: + env_path = Path("/home/zvx/projects/wiki-index/.env") + if env_path.exists(): + for line in env_path.read_text().splitlines(): + if line.startswith("GEMINI_API_KEY="): + api_key = line.split("=", 1)[1].strip().strip('"') + break + if not api_key: + log.error("GEMINI_API_KEY not set") + return + + client = genai.Client(api_key=api_key) + conn = get_db() + c = conn.cursor() + + # STARTUP VERIFICATION + c.execute("SELECT COUNT(*) FROM wiki_places WHERE source = 'gemini' AND summary IS NOT NULL") + already_done = c.fetchone()[0] + + c.execute(""" + SELECT id, place_name, wikipedia_title, wikivoyage_title, + wikipedia_exists, wikivoyage_exists + FROM wiki_places + WHERE source = 'gemini' + AND (wikipedia_exists = 1 OR wikivoyage_exists = 1) + AND summary IS NULL + ORDER BY id + """) + rows = c.fetchall() + + total = len(rows) + log.info(f"[SUMMARIZE] Resuming: {already_done:,} already summarized, {total:,} remaining") + + if not rows: + log.info("[SUMMARIZE] No places need summaries") + return + + success = 0 + errors = 0 + start_time = time.time() + + for i, (pid, name, wp_title, wv_title, wp_exists, wv_exists) in enumerate(rows): + if _shutdown_requested: + log.info(f"[SUMMARIZE] Shutting down at place_id {pid}") + break + + _current_place_id = pid + + if i > 0 and i % 100 == 0: + elapsed = time.time() - start_time + rate = i / (elapsed / 60) if elapsed > 0 else 0 + pct = (i / total) * 100 + log.info(f"[SUMMARIZE] Progress: {i:,}/{total:,} ({pct:.1f}%) | Success: {success} | Errors: {errors} | Rate: {rate:.1f}/min") + check_memory(f"summarize {i}") + + # Fetch content + content = None + if wp_exists and wp_title: + content = fetch_wiki_content(wp_title, is_wikivoyage=False) + if not content and wv_exists and wv_title: + content = fetch_wiki_content(wv_title, is_wikivoyage=True) + + if not content: + log_failure(conn, pid, "summarize", Exception("No content fetched")) + errors += 1 + continue + + prompt = f"""Summarize this Wikipedia article about {name} in 2-3 sentences for a map application. Focus on what makes it notable or interesting to visitors. Be concise. + +Article: +{content[:8000]}""" + + try: + response = client.models.generate_content( + model=GEMINI_MODEL, + contents=prompt, + config=types.GenerateContentConfig(max_output_tokens=500, temperature=0.3) + ) + circuit_breaker.record_success() + + text = response.text.strip() if response.text else "" + log_gemini_response(pid, "summarize", f"Summarize {name}", text) + + valid, reason = validate_gemini_response(text) + if not valid: + log_failure(conn, pid, "summarize", Exception(f"Invalid response: {reason}")) + errors += 1 + continue + + # COMMIT BEFORE NEXT + c.execute("UPDATE wiki_places SET summary = ? WHERE id = ?", (text, pid)) + conn.commit() + success += 1 + + except Exception as e: + err_str = str(e).lower() + log_gemini_response(pid, "summarize", f"Summarize {name}", None, error=e) + + if "429" in err_str or "quota" in err_str: + should_abort = circuit_breaker.record_429() + if should_abort: + log.error(f"[SUMMARIZE] Circuit breaker abort at place_id {pid}") + break + + log_failure(conn, pid, "summarize", e) + errors += 1 + continue + + time.sleep(0.1) + + conn.close() + log.info(f"[SUMMARIZE] Complete: success={success:,}, errors={errors:,}") + +# ============================================================================= +# MAIN +# ============================================================================= + +def main(): + if len(sys.argv) < 2: + print(__doc__) + sys.exit(1) + + cmd = sys.argv[1] + + if cmd == "extract": + count = extract_from_jsonl() + log.info(f"\n*** EXTRACT COMPLETE: {count:,} places ***") + log.info("Review count before proceeding to resolve stage.") + + elif cmd == "resolve": + resolve_wikipedia_titles() + + elif cmd == "validate": + validate_titles() + + elif cmd == "wikivoyage": + resolve_wikivoyage() + + elif cmd == "summarize": + summarize() + + elif cmd == "all": + count = extract_from_jsonl() + log.info(f"Extracted {count:,} places") + if not _shutdown_requested: + resolve_wikipedia_titles() + if not _shutdown_requested: + validate_titles() + if not _shutdown_requested: + resolve_wikivoyage() + if not _shutdown_requested: + summarize() + + else: + print(f"Unknown command: {cmd}") + print(__doc__) + sys.exit(1) + +if __name__ == "__main__": + main()