#!/usr/bin/env python3
"""
Wiki Location Index Pipeline — Wave 2
Processes places with extra.wikidata but NO extra.wikipedia tags from Photon JSONL dump.
Resolves Wikipedia titles via Wikidata API.

Pipeline overview (US + CA places only; entries already in the DB are skipped):
    1. extract     Stream the Photon dump and insert wikidata-only places
    2. resolve     Batch resolve Q-IDs via the Wikidata API (50 per request)
    3. validate    Validate resolved titles against the local ZIM server
    4. summarize   Generate 3-4 sentence summaries with the Gemini API
                   (circuit breaker: 50 consecutive 429s trigger a 5 minute pause)
    5. revalidate  Re-validate titles that may have been corrected by Gemini

Usage:
    python wiki_index_wave2.py extract                 # Extract from JSONL (wikidata only)
    python wiki_index_wave2.py resolve                 # Resolve Wikipedia titles via Wikidata
    python wiki_index_wave2.py validate                # Validate titles against ZIM
    python wiki_index_wave2.py summarize               # Generate summaries with Gemini
    python wiki_index_wave2.py summarize --workers=10  # Use 10 concurrent workers
    python wiki_index_wave2.py summarize --dry-run     # Process only 5 places (test run)
    python wiki_index_wave2.py revalidate              # Re-validate corrected titles
    python wiki_index_wave2.py all                     # Run all stages
"""

import io
import os
import re
import sys
import json
import sqlite3
import logging
import time
import resource
import threading
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote

import zstandard as zstd
from bs4 import BeautifulSoup
from google import genai
from google.genai import types
import requests

# =============================================================================
# CONFIGURATION
# =============================================================================

# Paths
JSONL_PATH = "/mnt/pi-nas/nav/photon-dump-planet.jsonl.zst"
DB_PATH = "/mnt/pi-nas/nav/wiki-index/data/wiki_index.db"
LOG_DIR = "/mnt/pi-nas/nav/wiki-index/logs"
CHECKPOINT_DIR = "/mnt/pi-nas/nav/wiki-index/data"
GEMINI_LOG = f"{LOG_DIR}/gemini_responses_wave2.jsonl"

# Checkpoint files (wave 2 specific)
EXTRACT_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_extract_checkpoint.txt"
RESOLVE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_resolve_checkpoint.txt"
VALIDATE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_validate_checkpoint.txt"
SUMMARIZE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_summarize_checkpoint.txt"

# Single-pass line range for US + Canada
COMBINED_START = 53694616
COMBINED_END = 175406527

# ZIM endpoints
WIKIPEDIA_INTERNAL = "http://192.168.1.130:8430/wikipedia_en_all_maxi_2026-02"
WIKIVOYAGE_INTERNAL = "http://192.168.1.130:8430/wikivoyage_en_all_maxi_2026-03"

# Wikidata API
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
WIKIDATA_BATCH_SIZE = 50
# Wikimedia asks API clients to send a descriptive User-Agent; override via the
# environment to add contact details.
WIKIDATA_USER_AGENT = os.environ.get(
    "WIKIDATA_USER_AGENT", "wiki-index-wave2/1.0 (python-requests)"
)

# Gemini
GEMINI_MODEL = "gemini-2.5-flash"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

# Concurrency
VALIDATION_WORKERS = 4
SUMMARIZE_WORKERS = 5

# Memory limit (MB)
MAX_RSS_MB = 10240  # 10GB

# Circuit breaker
CIRCUIT_BREAKER_THRESHOLD = 50  # 50 consecutive 429s
CIRCUIT_BREAKER_PAUSE = 300     # 5 minutes

# Included types (same as wave 1). A value of None means "any osm_value".
INCLUDE_KEYS = {
    "place": {"city", "town", "village", "hamlet", "suburb", "island", "islet",
              "state", "county", "region", "locality"},
    "natural": {"peak", "volcano", "bay", "beach", "cape", "cliff", "water",
                "wetland", "wood", "glacier", "valley", "strait", "reef",
                "hot_spring", "geyser", "cave_entrance"},
    "waterway": {"river", "stream", "waterfall", "dam", "canal", "rapids"},
    "water": {"lake", "pond", "reservoir", "lagoon"},
    "boundary": {"protected_area", "national_park", "administrative"},
    "leisure": {"nature_reserve", "park"},
    "mountain_pass": None,
    "landuse": {"cemetery"},
    "historic": None,
    "tourism": {"attraction", "viewpoint"},
}

# Travel-relevant types (get Wikivoyage resolution)
TRAVEL_TYPES = {
    ("place", "city"), ("place", "town"), ("place", "state"),
    ("place", "country"), ("place", "island"),
    ("boundary", "national_park"), ("boundary", "protected_area"),
    ("leisure", "nature_reserve"), ("leisure", "park"),
    ("tourism", "attraction"),
}

# A single well-formed Wikidata item ID.
_QID_RE = re.compile(r"Q[1-9]\d*")

# Field labels of the Gemini response format, tolerating markdown decoration
# such as "**SUMMARY:** ..." around the (upper-case) label.
_FIELD_RE = re.compile(
    r"^[\s*_#>-]*(WIKIPEDIA_TITLE|WIKIVOYAGE_TITLE|SUMMARY|POPULATION)"
    r"\s*\**\s*:\s*\**\s*(.*)$"
)
_FIELD_KEYS = {
    "WIKIPEDIA_TITLE": "wikipedia_title",
    "WIKIVOYAGE_TITLE": "wikivoyage_title",
    "POPULATION": "population",
}

# Serializes appends to GEMINI_LOG, which is written from worker threads.
_GEMINI_LOG_LOCK = threading.Lock()

# =============================================================================
# LOGGING SETUP
# =============================================================================

def setup_logging():
    """Configure root logging to a timestamped file in LOG_DIR plus the console.

    Returns:
        The module logger.
    """
    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = f"{LOG_DIR}/wave2_{timestamp}.log"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)

log = setup_logging()

# =============================================================================
# MEMORY MONITORING
# =============================================================================

def check_memory(context=""):
    """Abort the process if peak RSS exceeds MAX_RSS_MB.

    Note that ``ru_maxrss`` is the *peak* resident set size of the process,
    not the current one, so the reported value never decreases.

    Args:
        context: Short description of the call site, used in the abort message.

    Returns:
        Peak RSS in megabytes.
    """
    rss_raw = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == 'darwin':
        # macOS reports bytes; Linux reports kilobytes.
        rss_mb = rss_raw / 1024 / 1024
    else:
        rss_mb = rss_raw / 1024

    if rss_mb > MAX_RSS_MB:
        log.error(f"RSS {rss_mb:.0f}MB exceeds {MAX_RSS_MB}MB budget at {context}, aborting")
        sys.exit(1)

    return rss_mb

# =============================================================================
# CHECKPOINTS
# =============================================================================

def write_checkpoint(path, *values):
    """Write checkpoint values to ``path``, one per line.

    The file is written to a temporary sibling and renamed into place so an
    interrupted run never leaves a truncated checkpoint behind.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target.with_name(target.name + ".tmp")
    with open(tmp_path, 'w') as f:
        f.writelines(f"{v}\n" for v in values)
    os.replace(tmp_path, target)
    log.info(f"Checkpoint written: {path}")

def read_checkpoint(path):
    """Read checkpoint values from ``path``.

    Returns:
        A list of stripped lines, or None if the file does not exist.
    """
    if not Path(path).exists():
        return None
    with open(path, 'r') as f:
        return [line.strip() for line in f]

# =============================================================================
# DATABASE
# =============================================================================

def get_db():
    """Open a new SQLite connection to DB_PATH."""
    return sqlite3.connect(DB_PATH)

def get_existing_place_names():
    """Get set of (place_name, osm_key, osm_value, county, state, country_code) from DB.

    NULL county/state values are returned as empty strings so the tuples match
    the dedup keys built during extraction.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT place_name, osm_key, osm_value,
                   COALESCE(county,''), COALESCE(state,''), country_code
            FROM wiki_places
        """)
        return set(tuple(row) for row in c.fetchall())
    finally:
        conn.close()

# =============================================================================
# STAGE 1: EXTRACT (WIKIDATA ONLY, NO WIKIPEDIA)
# =============================================================================

def should_include(osm_key, osm_value):
    """Check if this type should be included."""
    if osm_key not in INCLUDE_KEYS:
        return False
    allowed = INCLUDE_KEYS[osm_key]
    return allowed is None or osm_value in allowed

def extract_from_jsonl():
    """Extract places with extra.wikidata but NO extra.wikipedia from JSONL.

    Streams lines COMBINED_START..COMBINED_END of the zstd-compressed dump,
    keeps US/CA records of an included type, and inserts those not already in
    ``wiki_places``. Progress is checkpointed every million lines; the
    checkpoint is removed once the pass completes.
    """
    # Load existing places to skip (wave 1 plus anything from an earlier,
    # interrupted wave 2 run).
    log.info("Loading existing place names from DB...")
    existing = get_existing_place_names()
    log.info(f"Found {len(existing):,} existing places to skip")

    # Check for checkpoint
    resume_line = COMBINED_START
    checkpoint = read_checkpoint(EXTRACT_CHECKPOINT)
    if checkpoint:
        try:
            resume_line = int(checkpoint[0])
        except ValueError:
            log.warning(f"Ignoring unreadable extract checkpoint: {checkpoint!r}")
        else:
            log.info(f"Resuming extraction from line {resume_line:,}")

    seen = set()
    inserted = 0
    skipped = {
        "has_wikipedia": 0,
        "no_wikidata": 0,
        "filtered": 0,
        "no_name": 0,
        "dupe": 0,
        "existing": 0,
    }

    log.info(f"Wave 2 extraction: lines {resume_line:,} to {COMBINED_END:,}")
    log.info("Filtering: wikidata present AND wikipedia absent")

    conn = get_db()
    try:
        c = conn.cursor()

        with open(JSONL_PATH, 'rb') as fh:
            dctx = zstd.ZstdDecompressor()
            with dctx.stream_reader(fh) as reader:
                text_reader = io.TextIOWrapper(reader, encoding='utf-8')

                for line_num, line in enumerate(text_reader, start=1):
                    if line_num < resume_line:
                        if line_num % 10_000_000 == 0:
                            log.info(f"  Seeking... line {line_num:,}")
                        continue

                    if line_num > COMBINED_END:
                        break

                    if line_num % 1_000_000 == 0:
                        rss = check_memory(f"line {line_num}")
                        log.info(f"  Line {line_num:,}, inserted {inserted:,}, RSS {rss:.0f}MB")
                        # Commit before checkpointing so the checkpoint never
                        # points past data that was not persisted.
                        conn.commit()
                        write_checkpoint(EXTRACT_CHECKPOINT, line_num)

                    try:
                        record = json.loads(line)
                        content = record.get("content", [{}])[0]

                        country_code = content.get("country_code", "")
                        if country_code not in ("us", "ca"):
                            continue

                        osm_key = content.get("osm_key", "")
                        osm_value = content.get("osm_value", "")

                        if not should_include(osm_key, osm_value):
                            skipped["filtered"] += 1
                            continue

                        extra = content.get("extra", {})

                        # Wave 2: require wikidata, reject if has wikipedia
                        wikidata_id = extra.get("wikidata")
                        if not wikidata_id:
                            skipped["no_wikidata"] += 1
                            continue

                        if extra.get("wikipedia"):
                            skipped["has_wikipedia"] += 1
                            continue

                        # Get name
                        name_obj = content.get("name", {})
                        name = name_obj.get("name:en") or name_obj.get("name")
                        if not name:
                            skipped["no_name"] += 1
                            continue

                        # Parse address
                        address = content.get("address", {})
                        state = address.get("state") or address.get("state:en")
                        county = address.get("county") or address.get("county:en")

                        # Dedup key
                        dedup_key = (name, osm_key, osm_value,
                                     county or "", state or "", country_code)

                        # Skip if already in DB
                        if dedup_key in existing:
                            skipped["existing"] += 1
                            continue

                        if dedup_key in seen:
                            skipped["dupe"] += 1
                            continue
                        seen.add(dedup_key)

                        # Get other fields
                        osm_id = f"{content.get('object_type', '')}{content.get('object_id', '')}"
                        importance = content.get("importance")

                        extra_fields = {k: v for k, v in extra.items()
                                        if k not in ("wikipedia", "wikidata")}
                        extra_json = json.dumps(extra_fields) if extra_fields else None

                        # Insert (no wikipedia_title yet - will resolve via Wikidata)
                        c.execute("""
                            INSERT OR IGNORE INTO wiki_places
                            (place_name, osm_key, osm_value, county, state, country_code,
                             wikidata_id, osm_id, importance, extra_json, source)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'photon_wave2')
                        """, (name, osm_key, osm_value, county, state, country_code,
                              wikidata_id, osm_id, importance, extra_json))

                        if c.rowcount > 0:
                            inserted += 1
                            # Only commit when the count actually advanced;
                            # otherwise every scanned record would trigger a
                            # commit while the count sits on a multiple.
                            if inserted % 10000 == 0:
                                conn.commit()

                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        # Best effort: one malformed record must not end the pass.
                        log.error(f"Error on line {line_num}: {e}")
                        continue

        conn.commit()
    finally:
        conn.close()

    if Path(EXTRACT_CHECKPOINT).exists():
        Path(EXTRACT_CHECKPOINT).unlink()
        log.info("Extract checkpoint cleared (completed)")

    log.info(f"Wave 2 extraction complete:")
    log.info(f"  Inserted: {inserted:,}")
    log.info(f"  Skipped (has wikipedia): {skipped['has_wikipedia']:,}")
    log.info(f"  Skipped (no wikidata): {skipped['no_wikidata']:,}")
    log.info(f"  Skipped (filtered type): {skipped['filtered']:,}")
    log.info(f"  Skipped (no name): {skipped['no_name']:,}")
    log.info(f"  Skipped (duplicate): {skipped['dupe']:,}")
    log.info(f"  Skipped (existing wave1): {skipped['existing']:,}")

# =============================================================================
# STAGE 2: RESOLVE WIKIPEDIA TITLES VIA WIKIDATA
# =============================================================================

def _retry_delay(attempt):
    """Return the back-off for a 0-based attempt, clamped to the last entry."""
    return RETRY_DELAYS[min(attempt, len(RETRY_DELAYS) - 1)]

def _normalize_qid(raw):
    """Return the first well-formed Q-ID in a raw ``wikidata`` tag, or None.

    Tags may hold several ``;``-separated values or stray whitespace/case; a
    single malformed ID would otherwise fail the whole batch request.
    """
    if not raw:
        return None
    for part in str(raw).split(";"):
        candidate = part.strip().upper()
        if _QID_RE.fullmatch(candidate):
            return candidate
    return None

def batch_wikidata_lookup(qids):
    """Batch lookup Wikidata Q-IDs to get wiki titles.

    Args:
        qids: Iterable of Q-ID strings (at most WIKIDATA_BATCH_SIZE).

    Returns:
        Dict mapping each returned entity ID to
        ``{"enwiki": title_or_None, "enwikivoyage": title_or_None}``.
        Empty on empty input or when the request ultimately fails.
    """
    if not qids:
        return {}

    params = {
        "action": "wbgetentities",
        "ids": "|".join(qids),
        "props": "sitelinks",
        "format": "json"
    }
    headers = {"User-Agent": WIKIDATA_USER_AGENT}

    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(WIKIDATA_API, params=params, headers=headers, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            log.error(f"Wikidata API error: {e}")
            if attempt == MAX_RETRIES - 1:
                return {}
            time.sleep(_retry_delay(attempt))
            continue

        if "error" in data:
            # API-level errors (e.g. a bad ID) will not succeed on retry.
            log.error(f"Wikidata API error: {data['error']}")
            return {}
        break
    else:
        return {}

    results = {}
    for qid, entity in data.get("entities", {}).items():
        sitelinks = entity.get("sitelinks", {})
        results[qid] = {
            "enwiki": sitelinks.get("enwiki", {}).get("title"),
            "enwikivoyage": sitelinks.get("enwikivoyage", {}).get("title")
        }
    return results

def is_travel_type(osm_key, osm_value):
    """Check if this type should get Wikivoyage resolution."""
    return (osm_key, osm_value) in TRAVEL_TYPES

def resolve_wikipedia_titles():
    """Resolve Wikipedia/Wikivoyage titles via Wikidata API for wave 2 records.

    Rows sharing a Q-ID are all updated from the same lookup. Rows whose
    ``wikidata_id`` holds no well-formed Q-ID are skipped and counted.
    """
    conn = get_db()
    try:
        c = conn.cursor()

        # Get wave 2 records with wikidata_id but no wikipedia_title
        c.execute("""
            SELECT id, wikidata_id, osm_key, osm_value FROM wiki_places
            WHERE source = 'photon_wave2'
              AND wikidata_id IS NOT NULL
              AND wikipedia_title IS NULL
        """)
        rows = c.fetchall()

        if not rows:
            log.info("No wave 2 records need Wikipedia resolution")
            return

        log.info(f"Resolving Wikipedia titles for {len(rows):,} wave 2 records via Wikidata...")

        resolved_wiki = 0
        resolved_voyage = 0
        skipped_invalid = 0

        for i in range(0, len(rows), WIKIDATA_BATCH_SIZE):
            batch = rows[i:i + WIKIDATA_BATCH_SIZE]

            rows_by_qid = defaultdict(list)
            for row in batch:
                qid = _normalize_qid(row[1])
                if qid is None:
                    skipped_invalid += 1
                    continue
                rows_by_qid[qid].append(row)

            results = batch_wikidata_lookup(list(rows_by_qid))

            for qid, titles in results.items():
                wiki_title = titles.get("enwiki")
                voyage_title = titles.get("enwikivoyage")

                # .get: the API may answer under an ID we did not request.
                for row_id, _, osm_key, osm_value in rows_by_qid.get(qid, ()):
                    if wiki_title:
                        c.execute("UPDATE wiki_places SET wikipedia_title = ? WHERE id = ?",
                                  (wiki_title.replace(' ', '_'), row_id))
                        resolved_wiki += 1

                    # Also set wikivoyage if travel type
                    if voyage_title and is_travel_type(osm_key, osm_value):
                        c.execute("UPDATE wiki_places SET wikivoyage_title = ? WHERE id = ?",
                                  (voyage_title.replace(' ', '_'), row_id))
                        resolved_voyage += 1

            if ((i // WIKIDATA_BATCH_SIZE + 1) % 20) == 0:
                log.info(f"  Processed {i + len(batch):,}/{len(rows):,} - "
                         f"wiki: {resolved_wiki:,}, voyage: {resolved_voyage:,}")
                conn.commit()
                write_checkpoint(RESOLVE_CHECKPOINT, i + len(batch), resolved_wiki, resolved_voyage)

            time.sleep(0.1)  # Be nice to Wikidata API

        conn.commit()
    finally:
        conn.close()

    if Path(RESOLVE_CHECKPOINT).exists():
        Path(RESOLVE_CHECKPOINT).unlink()

    log.info(f"Resolution complete:")
    log.info(f"  Wikipedia titles: {resolved_wiki:,}")
    log.info(f"  Wikivoyage titles: {resolved_voyage:,}")
    if skipped_invalid:
        log.info(f"  Skipped (invalid wikidata id): {skipped_invalid:,}")

# =============================================================================
# STAGE 3: VALIDATE
# =============================================================================

def _usable_title(title):
    """Return ``title`` with spaces as underscores, or None if it cannot name an article.

    Rejected: empty titles, the literal ``NONE`` placeholder, disambiguation
    pages and over-long strings. Only an exact ``NONE`` is rejected so real
    titles that merely contain those letters (e.g. "Nonesuch") still pass.
    """
    if not title:
        return None
    title = title.replace(" ", "_")
    if title.upper() == "NONE" or "(disambiguation" in title.lower() or len(title) > 100:
        return None
    return title

def _article_url(base_url, title):
    """Build the ZIM article URL, percent-encoding characters such as ``?`` and ``#``."""
    return f"{base_url}/A/{quote(title)}"

def validate_title_worker(args):
    """Worker function for thread pool.

    Args:
        args: ``(row_id, title, base_url)`` tuple.

    Returns:
        ``(row_id, exists)`` when the check is conclusive, or None when the
        ZIM server could not be reached or answered with a 5xx, so the caller
        leaves the row unvalidated instead of recording a false negative.
    """
    row_id, title, base_url = args
    title = _usable_title(title)
    if title is None:
        return (row_id, False)

    try:
        resp = requests.head(_article_url(base_url, title), allow_redirects=True, timeout=10)
    except requests.RequestException as e:
        log.warning(f"Validation request failed for {title}: {e}")
        return None

    if resp.status_code >= 500:
        log.warning(f"Validation got HTTP {resp.status_code} for {title}")
        return None
    return (row_id, resp.status_code == 200)

def _validate_titles(label, title_col, exists_col, base_url, commit_every):
    """Validate one title column of wave 2 rows against a ZIM endpoint.

    Args:
        label: Human-readable site name used in log messages.
        title_col: Column holding the title (internal constant, not user input).
        exists_col: Column receiving the 0/1 result; only NULL rows are checked.
        base_url: ZIM endpoint to check against.
        commit_every: Commit and log progress after this many results.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(f"""
            SELECT id, {title_col} FROM wiki_places
            WHERE source = 'photon_wave2'
              AND {title_col} IS NOT NULL
              AND {exists_col} IS NULL
        """)
        rows = c.fetchall()

        if not rows:
            log.info(f"No wave 2 {label} titles to validate")
            return

        log.info(f"Validating {len(rows):,} {label} titles...")

        valid_count = 0
        invalid_count = 0
        deferred_count = 0

        with ThreadPoolExecutor(max_workers=VALIDATION_WORKERS) as executor:
            futures = [executor.submit(validate_title_worker, (row_id, title, base_url))
                       for row_id, title in rows]

            for done, future in enumerate(as_completed(futures), start=1):
                result = future.result()
                if result is None:
                    # Inconclusive check: keep the row NULL for a later run.
                    deferred_count += 1
                else:
                    row_id, exists = result
                    c.execute(f"""
                        UPDATE wiki_places
                        SET {exists_col} = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, datetime.now().isoformat(), row_id))

                    if exists:
                        valid_count += 1
                    else:
                        invalid_count += 1

                if done % commit_every == 0:
                    log.info(f"  Validated {done:,}/{len(rows):,} - valid: {valid_count:,}")
                    conn.commit()

        conn.commit()
    finally:
        conn.close()

    log.info(f"{label} validation complete: valid={valid_count:,}, invalid={invalid_count:,}")
    if deferred_count:
        log.warning(f"{label} validation deferred for {deferred_count:,} titles (server errors)")

def validate_wikipedia_titles():
    """Validate Wikipedia titles against ZIM for wave 2 records."""
    _validate_titles("Wikipedia", "wikipedia_title", "wikipedia_exists",
                     WIKIPEDIA_INTERNAL, commit_every=1000)

def validate_wikivoyage_titles():
    """Validate Wikivoyage titles against ZIM for wave 2 records."""
    _validate_titles("Wikivoyage", "wikivoyage_title", "wikivoyage_exists",
                     WIKIVOYAGE_INTERNAL, commit_every=500)

# =============================================================================
# STAGE 4: SUMMARY GENERATION
# =============================================================================

def fetch_article_content(title, base_url, max_chars=8000):
    """Fetch and extract text content from ZIM article.

    Args:
        title: Article title (spaces or underscores).
        base_url: ZIM endpoint to fetch from.
        max_chars: Truncate the extracted text to this many characters
            (an ellipsis is appended when truncated).

    Returns:
        The visible article text, or None when the title is unusable, the
        article is not served with HTTP 200, or the request fails.
    """
    title = _usable_title(title)
    if title is None:
        return None

    try:
        resp = requests.get(_article_url(base_url, title), timeout=30)
        if resp.status_code != 200:
            return None

        soup = BeautifulSoup(resp.text, 'html.parser')

        # Drop page chrome so only article prose reaches the prompt.
        for tag in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            tag.decompose()

        text = soup.get_text(separator=' ', strip=True)

        if len(text) > max_chars:
            text = text[:max_chars] + "..."

        return text

    except Exception as e:
        log.error(f"Error fetching {title}: {e}")
        return None

def build_summary_prompt(place, wiki_content=None, voyage_content=None):
    """Build summary prompt.

    Args:
        place: Dict with place_name, osm_key, osm_value, county, state,
            country_code, wikipedia_title and wikivoyage_title.
        wiki_content: Extracted Wikipedia article text, if any.
        voyage_content: Extracted Wikivoyage article text, if any.

    Returns:
        The prompt as a single newline-joined string.
    """
    country = (place['country_code'] or '').upper()

    prompt_parts = [
        "Generate a 3-4 sentence summary for a map app user who tapped on this location.",
        "",
        f"Place: {place['place_name']} ({place['osm_key']}:{place['osm_value']})",
        f"Location: {place['county'] or 'N/A'}, {place['state'] or 'N/A'}, {country}",
        ""
    ]

    if wiki_content:
        prompt_parts.extend([
            f"=== WIKIPEDIA ARTICLE: {place['wikipedia_title']} ===",
            wiki_content,
            ""
        ])

    if voyage_content:
        prompt_parts.extend([
            f"=== WIKIVOYAGE ARTICLE: {place['wikivoyage_title']} ===",
            voyage_content,
            ""
        ])

    prompt_parts.extend([
        "Instructions:",
        "- If either article appears to be about a DIFFERENT place, ignore it and provide",
        "  the correct title if you know it.",
        "",
        "- Write based on place type:",
        "  * Settlements: what's notable, regional context, key attractions",
        "  * Natural features: terrain, activities, access, best season",
        "  * Parks/reserves: what you'll see, trails, camping, logistics",
        "  * Historic sites: significance, what remains, visiting info",
        "",
        "- Engaging but informative tone.",
        "",
    ])

    # The hint sits in its own paragraph ahead of the response format.
    if is_travel_type(place['osm_key'], place['osm_value']) and not voyage_content:
        prompt_parts.extend([
            "If this place has a Wikivoyage article, include the title.",
            "",
        ])

    prompt_parts.extend([
        "Response format (REQUIRED):",
        "WIKIPEDIA_TITLE: ",
        "WIKIVOYAGE_TITLE: ",
        "SUMMARY: ",
        "POPULATION: ",
    ])

    return "\n".join(prompt_parts)

def parse_gemini_response(text):
    """Parse Gemini response.

    Recognizes the ``WIKIPEDIA_TITLE``, ``WIKIVOYAGE_TITLE``, ``SUMMARY`` and
    ``POPULATION`` labels (optionally wrapped in markdown emphasis). The
    summary may span several lines and ends at the next label.

    Returns:
        Dict with keys wikipedia_title, wikivoyage_title, summary, population.
        A value is None when its label is missing, empty, or starts with
        ``NONE`` — empty must map to None so it can never overwrite a stored
        title.
    """
    result = {
        "wikipedia_title": None,
        "wikivoyage_title": None,
        "summary": None,
        "population": None
    }

    summary_lines = []
    in_summary = False

    for line in (text or "").strip().split('\n'):
        stripped = line.strip()
        match = _FIELD_RE.match(stripped)

        if match:
            label = match.group(1)
            value = match.group(2).strip().strip("*").strip()
            if label == "SUMMARY":
                if value:
                    summary_lines.append(value)
                in_summary = True
            else:
                in_summary = False
                is_blank = not value or value.upper().startswith("NONE")
                result[_FIELD_KEYS[label]] = None if is_blank else value
        elif in_summary and stripped:
            summary_lines.append(stripped)

    if summary_lines:
        result["summary"] = " ".join(summary_lines)

    return result

def log_gemini_response(place_id, prompt, response, parsed, output_tokens=None):
    """Append one JSON line describing a Gemini exchange to GEMINI_LOG.

    Called from worker threads, so the append is serialized by a lock to keep
    records from interleaving.
    """
    record = json.dumps({
        "timestamp": datetime.now().isoformat(),
        "place_id": place_id,
        "prompt_length": len(prompt),
        "output_tokens": output_tokens,
        "response": response,
        "parsed": parsed
    })
    with _GEMINI_LOG_LOCK:
        Path(GEMINI_LOG).parent.mkdir(parents=True, exist_ok=True)
        with open(GEMINI_LOG, 'a', encoding='utf-8') as f:
            f.write(record + "\n")

def is_rate_limit_error(error):
    """Check if error is a rate limit error."""
    error_str = str(error).lower()
    return (
        "429" in error_str or
        "resource_exhausted" in error_str or
        ("rate" in error_str and "limit" in error_str) or
        "quota" in error_str
    )

def summarize_worker(args):
    """Worker function for summary generation.

    Args:
        args: ``(place, client, circuit_breaker)``. ``circuit_breaker`` is a
            shared dict with ``lock``, ``consecutive_429`` and ``abort``; an
            optional ``resume_at`` (``time.monotonic()`` value) marks the end
            of a pause and is set here when the 429 threshold is reached.

    Returns:
        A result dict with ``place_id`` and ``success``. On success it also
        carries summary, summary_source, population and both titles; on
        failure it carries ``error`` and ``error_message`` (plus
        ``is_rate_limit`` when the Gemini retries were exhausted).
    """
    place, client, circuit_breaker = args
    place_id = place["id"]
    lock = circuit_breaker["lock"]

    def failure(error, message, **extra):
        return {"place_id": place_id, "success": False,
                "error": error, "error_message": message, **extra}

    wiki_content = None
    voyage_content = None

    if place["wikipedia_exists"]:
        wiki_content = fetch_article_content(place["wikipedia_title"], WIKIPEDIA_INTERNAL)

    if place["wikivoyage_exists"]:
        voyage_content = fetch_article_content(place["wikivoyage_title"], WIKIVOYAGE_INTERNAL)

    if not wiki_content and not voyage_content:
        return failure("no_content", f"No content for place {place_id}")

    prompt = build_summary_prompt(place, wiki_content, voyage_content)

    if wiki_content and voyage_content:
        summary_source = "wikipedia+wikivoyage"
    elif wiki_content:
        summary_source = "wikipedia"
    else:
        summary_source = "wikivoyage"

    response_text = None
    output_tokens = None

    for attempt in range(MAX_RETRIES):
        with lock:
            if circuit_breaker["abort"]:
                return failure("circuit_breaker_abort", "Circuit breaker aborted")
            pause = circuit_breaker.get("resume_at", 0.0) - time.monotonic()
        if pause > 0:
            # Breaker is open: wait outside the lock so other threads can
            # still read and update the shared state.
            time.sleep(pause)

        try:
            response = client.models.generate_content(
                model=GEMINI_MODEL,
                contents=prompt,
                config=types.GenerateContentConfig(
                    temperature=0.3,
                    max_output_tokens=3000
                )
            )
            response_text = response.text

            usage = getattr(response, 'usage_metadata', None)
            if usage:
                output_tokens = getattr(usage, 'candidates_token_count', None)

            with lock:
                circuit_breaker["consecutive_429"] = 0
            break

        except Exception as e:
            rate_limited = is_rate_limit_error(e)
            tripped = False
            consecutive = 0
            with lock:
                if rate_limited:
                    circuit_breaker["consecutive_429"] += 1
                    consecutive = circuit_breaker["consecutive_429"]
                    if consecutive >= CIRCUIT_BREAKER_THRESHOLD:
                        circuit_breaker["resume_at"] = time.monotonic() + CIRCUIT_BREAKER_PAUSE
                        circuit_breaker["consecutive_429"] = 0
                        tripped = True
                else:
                    circuit_breaker["consecutive_429"] = 0

            if rate_limited:
                log.warning(f"Rate limit ({consecutive} consecutive) for {place_id}: {e}")
            if tripped:
                log.warning(f"CIRCUIT BREAKER: Pausing {CIRCUIT_BREAKER_PAUSE//60} minutes...")

            if attempt >= MAX_RETRIES - 1:
                return failure(type(e).__name__, str(e), is_rate_limit=rate_limited)

            if not rate_limited:
                log.warning(f"Gemini retry {attempt+1} for {place_id}: {e}")
            time.sleep(_retry_delay(attempt))

    if not response_text:
        return failure("no_response", "No response from Gemini")

    parsed = parse_gemini_response(response_text)
    log_gemini_response(place_id, prompt, response_text, parsed, output_tokens)

    if not parsed["summary"]:
        return failure("parse_failed", "No summary parsed")

    return {
        "place_id": place_id,
        "success": True,
        "summary": parsed["summary"],
        "summary_source": summary_source,
        "population": parsed["population"],
        "wikipedia_title": parsed["wikipedia_title"],
        "wikivoyage_title": parsed["wikivoyage_title"]
    }

def _load_gemini_api_key():
    """Return the Gemini API key from the environment or a sibling .env file, else None."""
    api_key = os.environ.get("GEMINI_API_KEY")
    if api_key:
        return api_key

    env_path = Path(__file__).parent / ".env"
    if env_path.exists():
        for line in env_path.read_text().splitlines():
            if line.startswith("GEMINI_API_KEY="):
                return line.split("=", 1)[1].strip().strip('"\'')
    return None

def _underscored(title):
    """Return ``title`` with spaces as underscores (as stored by the resolve stage), or None."""
    return title.replace(' ', '_') if title else None

def generate_summaries(dry_run=False, workers=None):
    """Generate summaries for wave 2 validated places.

    Args:
        dry_run: Process only the first 5 eligible places.
        workers: Number of concurrent Gemini workers (default SUMMARIZE_WORKERS).
    """
    if workers is None:
        workers = SUMMARIZE_WORKERS

    api_key = _load_gemini_api_key()
    if not api_key:
        log.error("GEMINI_API_KEY not set")
        return

    client = genai.Client(api_key=api_key)

    columns = ("id", "place_name", "osm_key", "osm_value", "county", "state",
               "country_code", "wikipedia_title", "wikivoyage_title",
               "wikipedia_exists", "wikivoyage_exists", "wikidata_id")

    conn = get_db()
    try:
        c = conn.cursor()

        c.execute("""
            SELECT id, place_name, osm_key, osm_value, county, state, country_code,
                   wikipedia_title, wikivoyage_title, wikipedia_exists, wikivoyage_exists,
                   wikidata_id
            FROM wiki_places
            WHERE source = 'photon_wave2'
              AND (wikipedia_exists = 1 OR wikivoyage_exists = 1)
              AND summary IS NULL
            ORDER BY id
        """)
        rows = c.fetchall()

        if not rows:
            log.info("No wave 2 places need summaries")
            return

        if dry_run:
            rows = rows[:5]
            log.info(f"DRY RUN: Processing only {len(rows)} places with {workers} workers")
        else:
            log.info(f"Generating summaries for {len(rows):,} wave 2 places with {workers} workers...")

        places = [dict(zip(columns, row)) for row in rows]

        circuit_breaker = {
            "lock": threading.Lock(),
            "consecutive_429": 0,
            "abort": False,
            "resume_at": 0.0,
        }

        processed = 0
        success = 0
        errors = 0
        last_place_id = 0

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(summarize_worker, (place, client, circuit_breaker)): place
                for place in places
            }

            # All DB writes happen here, on the thread that owns the connection.
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    # An unexpected worker crash must not abort the whole run.
                    result = {
                        "place_id": futures[future]["id"],
                        "success": False,
                        "error": type(e).__name__,
                        "error_message": str(e)
                    }
                processed += 1
                last_place_id = result["place_id"]

                if processed % 500 == 0:
                    rss = check_memory(f"summary {processed}")
                    log.info(f"  Memory: RSS {rss:.0f}MB")

                if result["success"]:
                    now = datetime.now().isoformat()
                    c.execute("""
                        UPDATE wiki_places SET
                            summary = ?,
                            summary_source = ?,
                            wiki_population = ?,
                            wikipedia_title = COALESCE(?, wikipedia_title),
                            wikivoyage_title = COALESCE(?, wikivoyage_title),
                            summary_generated_at = ?,
                            updated_at = ?
                        WHERE id = ?
                    """, (
                        result["summary"],
                        result["summary_source"],
                        result["population"],
                        _underscored(result["wikipedia_title"]),
                        _underscored(result["wikivoyage_title"]),
                        now,
                        now,
                        result["place_id"]
                    ))
                    success += 1
                else:
                    if result["error"] != "circuit_breaker_abort":
                        log.warning(f"Failed {result['place_id']}: {result['error_message']}")
                        if result["error"] not in ("no_content", "parse_failed"):
                            c.execute("""
                                INSERT INTO wiki_failures (place_id, wave, stage, error_type, error_message)
                                VALUES (?, 2, 'summarize', ?, ?)
                            """, (result["place_id"], result["error"], result["error_message"]))
                    errors += 1

                if processed % 50 == 0:
                    conn.commit()
                    write_checkpoint(SUMMARIZE_CHECKPOINT, last_place_id, success, errors)

                if processed % 100 == 0:
                    log.info(f"  Processed {processed:,}/{len(places):,} - success: {success:,}, errors: {errors:,}")

                with circuit_breaker["lock"]:
                    aborted = circuit_breaker["abort"]
                if aborted:
                    log.error("Aborting due to circuit breaker")
                    break

        conn.commit()
    finally:
        conn.close()

    if Path(SUMMARIZE_CHECKPOINT).exists():
        Path(SUMMARIZE_CHECKPOINT).unlink()

    log.info(f"Wave 2 summary generation complete:")
    log.info(f"  Success: {success:,}")
    log.info(f"  Errors: {errors:,}")

# =============================================================================
# STAGE 5: RE-VALIDATE CORRECTED TITLES
# =============================================================================

def revalidate_corrected_titles():
    """Re-validate titles corrected by Gemini.

    Selects wave 2 rows whose summary is newer than their last ZIM validation
    and re-checks both titles. A row is only updated when every check for it
    was conclusive, so a transient server error leaves it queued for the next
    run.
    """
    conn = get_db()
    try:
        c = conn.cursor()

        c.execute("""
            SELECT id, wikipedia_title, wikivoyage_title FROM wiki_places
            WHERE source = 'photon_wave2'
              AND summary_generated_at IS NOT NULL
              AND (zim_validated_at IS NULL OR zim_validated_at < summary_generated_at)
        """)
        rows = c.fetchall()

        if not rows:
            log.info("No wave 2 corrected titles need re-validation")
            return

        log.info(f"Re-validating {len(rows):,} wave 2 corrected titles...")

        wiki_revalidated = 0
        voyage_revalidated = 0
        deferred = 0

        for done, (row_id, wiki_title, voyage_title) in enumerate(rows, start=1):
            now = datetime.now().isoformat()

            checks = []
            if wiki_title:
                checks.append(("wikipedia_exists",
                               validate_title_worker((row_id, wiki_title, WIKIPEDIA_INTERNAL))))
            if voyage_title:
                checks.append(("wikivoyage_exists",
                               validate_title_worker((row_id, voyage_title, WIKIVOYAGE_INTERNAL))))

            if any(result is None for _, result in checks):
                deferred += 1
            else:
                for exists_col, (_, exists) in checks:
                    c.execute(f"""
                        UPDATE wiki_places SET {exists_col} = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, now, row_id))
                    if exists_col == "wikipedia_exists":
                        wiki_revalidated += 1
                    else:
                        voyage_revalidated += 1

            # Count rows, not updates: the update total can step by two and
            # would skip over multiples of 500.
            if done % 500 == 0:
                conn.commit()
                log.info(f"  Re-validated {wiki_revalidated + voyage_revalidated:,}")

        conn.commit()
    finally:
        conn.close()

    log.info(f"Re-validation complete: {wiki_revalidated:,} Wikipedia, {voyage_revalidated:,} Wikivoyage")
    if deferred:
        log.warning(f"Re-validation deferred for {deferred:,} rows (server errors)")

# =============================================================================
# MAIN
# =============================================================================

def _parse_workers(argv):
    """Return the last ``--workers=N`` value in ``argv`` as a positive int, or None.

    Exits with status 1 on a value that is not a positive integer.
    """
    workers = None
    for arg in argv:
        if arg.startswith("--workers="):
            raw = arg.split("=", 1)[1]
            try:
                workers = int(raw)
            except ValueError:
                workers = 0
            if workers < 1:
                print(f"Invalid --workers value: {raw!r}")
                sys.exit(1)
    return workers

def main():
    """Command-line entry point: run the stage named by the first argument."""
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    command = sys.argv[1].lower()

    if command == "extract":
        log.info("=== WAVE 2 STAGE 1: EXTRACT ===")
        extract_from_jsonl()

    elif command == "resolve":
        log.info("=== WAVE 2 STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()

    elif command == "validate":
        log.info("=== WAVE 2 STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()

    elif command == "summarize":
        dry_run = "--dry-run" in sys.argv
        workers = _parse_workers(sys.argv)
        if dry_run:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE (DRY RUN) ===")
        else:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE ===")
        generate_summaries(dry_run=dry_run, workers=workers)

    elif command == "revalidate":
        log.info("=== WAVE 2 STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()

    elif command == "all":
        log.info("=== WAVE 2: ALL STAGES ===")
        log.info("=== STAGE 1: EXTRACT ===")
        extract_from_jsonl()
        log.info("=== STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()
        log.info("=== STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()
        log.info("=== STAGE 4: SUMMARIZE ===")
        generate_summaries()
        log.info("=== STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()

    else:
        print(f"Unknown command: {command}")
        print(__doc__)
        sys.exit(1)

if __name__ == "__main__":
    main()