#!/usr/bin/env python3
"""
Wiki Location Index Pipeline — Wave 2

Processes places with extra.wikidata but NO extra.wikipedia tags from Photon JSONL dump.
Resolves Wikipedia titles via Wikidata API.

Usage:
    python wiki_index_wave2.py extract                 # Extract from JSONL (wikidata only)
    python wiki_index_wave2.py resolve                 # Resolve Wikipedia titles via Wikidata
    python wiki_index_wave2.py validate                # Validate titles against ZIM
    python wiki_index_wave2.py summarize               # Generate summaries with Gemini
    python wiki_index_wave2.py summarize --workers=10  # Use 10 concurrent workers
    python wiki_index_wave2.py summarize --dry-run     # Process only 5 places (test run)
    python wiki_index_wave2.py revalidate              # Re-validate corrected titles
    python wiki_index_wave2.py all                     # Run all stages
"""

import os
import sys
import json
import sqlite3
import logging
import time
import resource
import threading
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import zstandard as zstd
from bs4 import BeautifulSoup
from google import genai
from google.genai import types
import requests

# =============================================================================
# CONFIGURATION
# =============================================================================

# Paths
JSONL_PATH = "/mnt/pi-nas/nav/photon-dump-planet.jsonl.zst"
DB_PATH = "/mnt/pi-nas/nav/wiki-index/data/wiki_index.db"
LOG_DIR = "/mnt/pi-nas/nav/wiki-index/logs"
CHECKPOINT_DIR = "/mnt/pi-nas/nav/wiki-index/data"
GEMINI_LOG = f"{LOG_DIR}/gemini_responses_wave2.jsonl"

# Checkpoint files (wave 2 specific)
EXTRACT_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_extract_checkpoint.txt"
RESOLVE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_resolve_checkpoint.txt"
VALIDATE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_validate_checkpoint.txt"
SUMMARIZE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_summarize_checkpoint.txt"

# Single-pass line range for US + Canada (1-based line numbers in the dump)
COMBINED_START = 53694616
COMBINED_END = 175406527
# ZIM endpoints
WIKIPEDIA_INTERNAL = "http://192.168.1.130:8430/wikipedia_en_all_maxi_2026-02"
WIKIVOYAGE_INTERNAL = "http://192.168.1.130:8430/wikivoyage_en_all_maxi_2026-03"

# Wikidata API
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
WIKIDATA_BATCH_SIZE = 50

# Gemini
GEMINI_MODEL = "gemini-2.5-flash"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

# Concurrency
VALIDATION_WORKERS = 4
SUMMARIZE_WORKERS = 5

# Memory limit (MB)
MAX_RSS_MB = 10240  # 10GB

# Circuit breaker
CIRCUIT_BREAKER_THRESHOLD = 50  # 50 consecutive 429s
CIRCUIT_BREAKER_PAUSE = 300  # 5 minutes

# Included types (same as wave 1).  A value of None accepts every osm_value
# for that key.
INCLUDE_KEYS = {
    "place": {"city", "town", "village", "hamlet", "suburb", "island", "islet",
              "state", "county", "region", "locality"},
    "natural": {"peak", "volcano", "bay", "beach", "cape", "cliff", "water",
                "wetland", "wood", "glacier", "valley", "strait", "reef",
                "hot_spring", "geyser", "cave_entrance"},
    "waterway": {"river", "stream", "waterfall", "dam", "canal", "rapids"},
    "water": {"lake", "pond", "reservoir", "lagoon"},
    "boundary": {"protected_area", "national_park", "administrative"},
    "leisure": {"nature_reserve", "park"},
    "mountain_pass": None,
    "landuse": {"cemetery"},
    "historic": None,
    "tourism": {"attraction", "viewpoint"},
}

# Travel-relevant types (get Wikivoyage resolution)
TRAVEL_TYPES = {
    ("place", "city"),
    ("place", "town"),
    ("place", "state"),
    ("place", "country"),
    ("place", "island"),
    ("boundary", "national_park"),
    ("boundary", "protected_area"),
    ("leisure", "nature_reserve"),
    ("leisure", "park"),
    ("tourism", "attraction"),
}

# =============================================================================
# LOGGING SETUP
# =============================================================================

def setup_logging():
    """Configure root logging to the console and a timestamped file in LOG_DIR.

    This runs at import time (see ``log = setup_logging()`` below), so it must
    not raise: when LOG_DIR cannot be created or the log file cannot be opened
    (for example the NAS is not mounted), logging falls back to console-only
    and a warning is emitted instead of crashing the whole CLI.

    Returns:
        The logger for this module.
    """
    handlers = [logging.StreamHandler()]
    file_error = None
    try:
        Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = f"{LOG_DIR}/wave2_{timestamp}.log"
        # File handler first, matching the original handler order.
        handlers.insert(0, logging.FileHandler(log_file))
    except OSError as e:
        file_error = e

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers
    )
    logger = logging.getLogger(__name__)
    if file_error is not None:
        logger.warning(f"File logging disabled, cannot write to {LOG_DIR}: {file_error}")
    return logger


log = setup_logging()

# =============================================================================
# MEMORY MONITORING
# =============================================================================

def check_memory(context=""):
    """Abort the process when peak memory use exceeds MAX_RSS_MB.

    Note that ``ru_maxrss`` is the *peak* resident set size of the process so
    far, not the current one, so once the budget is exceeded every later call
    would also abort.

    Args:
        context: Free-form text included in the error message.

    Returns:
        The peak RSS in megabytes.

    Raises:
        SystemExit: with status 1 when the budget is exceeded.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # The code treats ru_maxrss as bytes on macOS and kilobytes elsewhere.
    if sys.platform == 'darwin':
        rss_mb = peak / 1024 / 1024
    else:
        rss_mb = peak / 1024
    if rss_mb > MAX_RSS_MB:
        log.error(f"RSS {rss_mb:.0f}MB exceeds {MAX_RSS_MB}MB budget at {context}, aborting")
        sys.exit(1)
    return rss_mb

# =============================================================================
# CHECKPOINTS
# =============================================================================

def write_checkpoint(path, *values):
    """Write checkpoint values to ``path``, one value per line.

    The data is written to a sibling ``.tmp`` file and then moved into place
    with ``os.replace`` so that a crash mid-write can never leave a truncated
    checkpoint behind.

    Args:
        path: Destination file; parent directories are created as needed.
        *values: Values to store; each is formatted with ``str()``.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target.with_name(target.name + ".tmp")
    with open(tmp_path, 'w') as f:
        f.writelines(f"{v}\n" for v in values)
    os.replace(tmp_path, target)
    log.info(f"Checkpoint written: {path}")


def read_checkpoint(path):
    """Read checkpoint values from ``path``.

    Returns:
        A list with one stripped string per line, or None when the file does
        not exist.
    """
    try:
        with open(path, 'r') as f:
            return [line.strip() for line in f]
    except FileNotFoundError:
        return None

# =============================================================================
# DATABASE
# =============================================================================

def get_db():
    """Open a new SQLite connection to DB_PATH; the caller must close it."""
    return sqlite3.connect(DB_PATH)


def get_existing_place_names():
    """Load the identity tuples of every place already stored in wiki_places.

    Returns:
        A set of ``(place_name, osm_key, osm_value, county, state,
        country_code)`` tuples in which NULL county/state are replaced by
        ``''``.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT place_name, osm_key, osm_value,
                   COALESCE(county,''), COALESCE(state,''), country_code
            FROM wiki_places
        """)
        # sqlite3 rows are already tuples, so they can go into the set as-is.
        return set(c.fetchall())
    finally:
        conn.close()
# =============================================================================
# STAGE 1: EXTRACT (WIKIDATA ONLY, NO WIKIPEDIA)
# =============================================================================

def should_include(osm_key, osm_value):
    """Return True when the OSM key/value pair is one of the indexed types.

    A key mapped to None in INCLUDE_KEYS accepts every value.
    """
    if osm_key not in INCLUDE_KEYS:
        return False
    allowed = INCLUDE_KEYS[osm_key]
    return allowed is None or osm_value in allowed


def extract_from_jsonl():
    """Extract places with extra.wikidata but NO extra.wikipedia from JSONL.

    Streams the zstd-compressed dump line by line, keeps US/CA records of an
    included type within COMBINED_START..COMBINED_END, skips records already
    present in wiki_places, and inserts the rest with source 'photon_wave2'.
    Every 1,000,000 lines the memory budget is checked, pending inserts are
    committed and EXTRACT_CHECKPOINT is updated so a later run can resume.
    """
    import io  # only this stage needs it

    # Load existing places from wave 1 to skip
    log.info("Loading existing place names from DB...")
    existing = get_existing_place_names()
    log.info(f"Found {len(existing):,} existing places to skip")

    # Check for checkpoint
    checkpoint = read_checkpoint(EXTRACT_CHECKPOINT)
    if checkpoint:
        resume_line = int(checkpoint[0])
        log.info(f"Resuming extraction from line {resume_line:,}")
    else:
        resume_line = COMBINED_START

    seen = set()
    inserted = 0
    skipped_has_wikipedia = 0
    skipped_no_wikidata = 0
    skipped_filtered = 0
    skipped_no_name = 0
    skipped_dupe = 0
    skipped_existing = 0

    log.info(f"Wave 2 extraction: lines {resume_line:,} to {COMBINED_END:,}")
    log.info("Filtering: wikidata present AND wikipedia absent")

    conn = get_db()
    try:
        c = conn.cursor()
        with open(JSONL_PATH, 'rb') as fh:
            dctx = zstd.ZstdDecompressor()
            with dctx.stream_reader(fh) as reader:
                text_reader = io.TextIOWrapper(reader, encoding='utf-8')
                for line_num, line in enumerate(text_reader, start=1):
                    if line_num < resume_line:
                        if line_num % 10_000_000 == 0:
                            log.info(f" Seeking... line {line_num:,}")
                        continue
                    if line_num > COMBINED_END:
                        break

                    if line_num % 1_000_000 == 0:
                        rss = check_memory(f"line {line_num}")
                        log.info(f" Line {line_num:,}, inserted {inserted:,}, RSS {rss:.0f}MB")
                        conn.commit()
                        write_checkpoint(EXTRACT_CHECKPOINT, line_num)

                    try:
                        record = json.loads(line)
                        content = record.get("content", [{}])[0]

                        country_code = content.get("country_code", "")
                        if country_code not in ("us", "ca"):
                            continue

                        osm_key = content.get("osm_key", "")
                        osm_value = content.get("osm_value", "")
                        if not should_include(osm_key, osm_value):
                            skipped_filtered += 1
                            continue

                        extra = content.get("extra", {})

                        # Wave 2: require wikidata, reject if has wikipedia
                        wikidata_id = extra.get("wikidata")
                        if not wikidata_id:
                            skipped_no_wikidata += 1
                            continue
                        wiki_tag = extra.get("wikipedia")
                        if wiki_tag:
                            skipped_has_wikipedia += 1
                            continue

                        # Get name
                        name_obj = content.get("name", {})
                        name = name_obj.get("name:en") or name_obj.get("name")
                        if not name:
                            skipped_no_name += 1
                            continue

                        # Parse address
                        address = content.get("address", {})
                        state = address.get("state") or address.get("state:en")
                        county = address.get("county") or address.get("county:en")

                        # Dedup key (same shape as get_existing_place_names())
                        dedup_key = (name, osm_key, osm_value,
                                     county or "", state or "", country_code)

                        # Skip if already in DB from wave 1
                        if dedup_key in existing:
                            skipped_existing += 1
                            continue
                        if dedup_key in seen:
                            skipped_dupe += 1
                            continue
                        seen.add(dedup_key)

                        # Get other fields
                        osm_id = f"{content.get('object_type', '')}{content.get('object_id', '')}"
                        importance = content.get("importance")
                        extra_fields = {k: v for k, v in extra.items()
                                        if k not in ("wikipedia", "wikidata")}
                        extra_json = json.dumps(extra_fields) if extra_fields else None

                        # Insert (no wikipedia_title yet - will resolve via Wikidata)
                        c.execute("""
                            INSERT OR IGNORE INTO wiki_places
                                (place_name, osm_key, osm_value, county, state, country_code,
                                 wikidata_id, osm_id, importance, extra_json, source)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'photon_wave2')
                        """, (name, osm_key, osm_value, county, state, country_code,
                              wikidata_id, osm_id, importance, extra_json))

                        if c.rowcount > 0:
                            inserted += 1
                            # Commit only when the counter actually advanced;
                            # otherwise inserted == 0 would commit on every line.
                            if inserted % 10000 == 0:
                                conn.commit()

                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        log.error(f"Error on line {line_num}: {e}")
                        continue

        conn.commit()
    finally:
        conn.close()

    if Path(EXTRACT_CHECKPOINT).exists():
        Path(EXTRACT_CHECKPOINT).unlink()
        log.info("Extract checkpoint cleared (completed)")

    log.info("Wave 2 extraction complete:")
    log.info(f" Inserted: {inserted:,}")
    log.info(f" Skipped (has wikipedia): {skipped_has_wikipedia:,}")
    log.info(f" Skipped (no wikidata): {skipped_no_wikidata:,}")
    log.info(f" Skipped (filtered type): {skipped_filtered:,}")
    log.info(f" Skipped (no name): {skipped_no_name:,}")
    log.info(f" Skipped (duplicate): {skipped_dupe:,}")
    log.info(f" Skipped (existing wave1): {skipped_existing:,}")

# =============================================================================
# STAGE 2: RESOLVE WIKIPEDIA TITLES VIA WIKIDATA
# =============================================================================

def _is_valid_qid(qid):
    """Return True for a well-formed Wikidata item ID such as 'Q42'."""
    if not isinstance(qid, str) or len(qid) < 2 or qid[0] != "Q":
        return False
    digits = qid[1:]
    return digits.isascii() and digits.isdigit()


def batch_wikidata_lookup(qids):
    """Batch lookup Wikidata Q-IDs to get wiki titles.

    Args:
        qids: Iterable of Q-ID strings (at most WIKIDATA_BATCH_SIZE per call
            as used by the resolve stage).

    Returns:
        Dict mapping Q-ID to ``{"enwiki": title_or_None, "enwikivoyage":
        title_or_None}``.  When an entity reports that it was reached through
        a redirect, the same titles are also stored under the redirected-from
        ID so callers can look results up by the ID they asked for.  Returns
        ``{}`` on any request or parsing error (the error is logged).
    """
    if not qids:
        return {}

    params = {
        "action": "wbgetentities",
        "ids": "|".join(qids),
        "props": "sitelinks",
        "format": "json"
    }
    try:
        resp = requests.get(WIKIDATA_API, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()

        # Surface an API-level "error" object in the JSON body instead of
        # silently returning nothing.
        if "error" in data:
            log.error(f"Wikidata API error: {data['error']}")

        results = {}
        for qid, entity in data.get("entities", {}).items():
            sitelinks = entity.get("sitelinks", {})
            titles = {
                "enwiki": sitelinks.get("enwiki", {}).get("title"),
                "enwikivoyage": sitelinks.get("enwikivoyage", {}).get("title")
            }
            results[qid] = titles
            redirected_from = entity.get("redirects", {}).get("from")
            if redirected_from:
                results[redirected_from] = titles
        return results
    except Exception as e:
        log.error(f"Wikidata API error: {e}")
        return {}


def is_travel_type(osm_key, osm_value):
    """Check if this type should get Wikivoyage resolution."""
    return (osm_key, osm_value) in TRAVEL_TYPES


def resolve_wikipedia_titles():
    """Resolve Wikipedia/Wikivoyage titles via Wikidata API for wave 2 records.

    Selects wave 2 rows that have a wikidata_id but no wikipedia_title, looks
    the IDs up in batches and stores the English Wikipedia title (and, for
    travel types, the Wikivoyage title) with spaces replaced by underscores.
    Several rows may share one Q-ID; all of them are updated.  Progress is
    committed every 20 batches.
    """
    conn = get_db()
    try:
        c = conn.cursor()

        # Get wave 2 records with wikidata_id but no wikipedia_title
        c.execute("""
            SELECT id, wikidata_id, osm_key, osm_value
            FROM wiki_places
            WHERE source = 'photon_wave2'
              AND wikidata_id IS NOT NULL
              AND wikipedia_title IS NULL
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 records need Wikipedia resolution")
            return

        log.info(f"Resolving Wikipedia titles for {len(rows):,} wave 2 records via Wikidata...")

        resolved_wiki = 0
        resolved_voyage = 0
        skipped_invalid = 0

        for i in range(0, len(rows), WIKIDATA_BATCH_SIZE):
            batch = rows[i:i + WIKIDATA_BATCH_SIZE]

            # Group by Q-ID: different OSM objects can carry the same ID and
            # every one of them needs the resolved title.
            rows_by_qid = {}
            for row in batch:
                rows_by_qid.setdefault(row[1], []).append(row)

            # Drop malformed IDs before calling the API.
            # NOTE(review): assumes one malformed ID makes wbgetentities reject
            # the whole request — confirm against the API docs.
            qids = []
            for qid, qid_rows in rows_by_qid.items():
                if _is_valid_qid(qid):
                    qids.append(qid)
                else:
                    skipped_invalid += len(qid_rows)

            results = batch_wikidata_lookup(qids)

            for qid, titles in results.items():
                wiki_title = titles.get("enwiki")
                voyage_title = titles.get("enwikivoyage")
                if wiki_title:
                    wiki_title = wiki_title.replace(' ', '_')
                if voyage_title:
                    voyage_title = voyage_title.replace(' ', '_')

                # .get(): the API may return IDs we did not ask for (e.g. the
                # target of a redirect); those have no rows in this batch.
                for row_id, _, osm_key, osm_value in rows_by_qid.get(qid, ()):
                    if wiki_title:
                        c.execute("UPDATE wiki_places SET wikipedia_title = ? WHERE id = ?",
                                  (wiki_title, row_id))
                        resolved_wiki += 1

                    # Also set wikivoyage if travel type
                    if voyage_title and is_travel_type(osm_key, osm_value):
                        c.execute("UPDATE wiki_places SET wikivoyage_title = ? WHERE id = ?",
                                  (voyage_title, row_id))
                        resolved_voyage += 1

            if ((i // WIKIDATA_BATCH_SIZE + 1) % 20) == 0:
                log.info(f" Processed {i + len(batch):,}/{len(rows):,} - "
                         f"wiki: {resolved_wiki:,}, voyage: {resolved_voyage:,}")
                conn.commit()
                write_checkpoint(RESOLVE_CHECKPOINT, i + len(batch),
                                 resolved_wiki, resolved_voyage)

            time.sleep(0.1)  # Be nice to Wikidata API

        conn.commit()
    finally:
        conn.close()

    if Path(RESOLVE_CHECKPOINT).exists():
        Path(RESOLVE_CHECKPOINT).unlink()

    log.info("Resolution complete:")
    log.info(f" Wikipedia titles: {resolved_wiki:,}")
    log.info(f" Wikivoyage titles: {resolved_voyage:,}")
    if skipped_invalid:
        log.warning(f" Skipped (malformed wikidata id): {skipped_invalid:,}")

# =============================================================================
# STAGE 3: VALIDATE
# =============================================================================

def _is_none_marker(value):
    """Return True when ``value`` is the NONE sentinel rather than real text.

    Matches 'NONE' in any case, optionally followed by non-alphanumeric text
    ('None (no article)', 'NONE.'), but not words that merely begin with
    those letters such as 'Nonesuch River'.
    """
    upper = value.strip().upper()
    return upper.startswith("NONE") and not upper[4:5].isalnum()


def _is_fetchable_title(title):
    """Return True when ``title`` is worth requesting from the ZIM server."""
    if _is_none_marker(title):
        return False
    if "(disambiguation" in title.lower():
        return False
    return len(title) <= 100


def validate_title_worker(args):
    """Worker function for thread pool: check one title against a ZIM server.

    Args:
        args: ``(row_id, title, base_url)`` tuple.

    Returns:
        ``(row_id, exists)`` where ``exists`` is True only for an HTTP 200
        answer, or None when the request itself failed (timeout, connection
        error).  Callers skip None results so the row stays unvalidated and
        is picked up again on the next run instead of being recorded as
        missing.
    """
    row_id, title, base_url = args
    if not title:
        return (row_id, False)

    title = title.replace(" ", "_")
    if not _is_fetchable_title(title):
        return (row_id, False)

    url = f"{base_url}/A/{title}"
    try:
        resp = requests.head(url, allow_redirects=True, timeout=10)
    except Exception as e:
        log.warning(f"Could not check {url}: {e}")
        return None
    return (row_id, resp.status_code == 200)


def _validate_titles(label, title_column, exists_column, base_url,
                     commit_every, log_progress):
    """Validate one title column of the wave 2 rows against a ZIM server.

    ``title_column`` and ``exists_column`` are fixed identifiers supplied by
    the two public wrappers below, never user input.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(f"""
            SELECT id, {title_column}
            FROM wiki_places
            WHERE source = 'photon_wave2'
              AND {title_column} IS NOT NULL
              AND {exists_column} IS NULL
        """)
        rows = c.fetchall()
        if not rows:
            log.info(f"No wave 2 {label} titles to validate")
            return

        log.info(f"Validating {len(rows):,} {label} titles...")

        valid_count = 0
        invalid_count = 0
        unchecked_count = 0
        work_items = [(row_id, title, base_url) for row_id, title in rows]

        with ThreadPoolExecutor(max_workers=VALIDATION_WORKERS) as executor:
            futures = [executor.submit(validate_title_worker, item)
                       for item in work_items]
            for done, future in enumerate(as_completed(futures), start=1):
                result = future.result()
                if result is None:
                    unchecked_count += 1
                else:
                    row_id, exists = result
                    c.execute(f"""
                        UPDATE wiki_places
                        SET {exists_column} = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, datetime.now().isoformat(), row_id))
                    if exists:
                        valid_count += 1
                    else:
                        invalid_count += 1

                if done % commit_every == 0:
                    if log_progress:
                        log.info(f" Validated {done:,}/{len(rows):,} - valid: {valid_count:,}")
                    conn.commit()

        conn.commit()
    finally:
        conn.close()

    log.info(f"{label} validation complete: valid={valid_count:,}, invalid={invalid_count:,}")
    if unchecked_count:
        log.warning(f"{label} validation: {unchecked_count:,} titles could not be "
                    f"checked and remain unvalidated")


def validate_wikipedia_titles():
    """Validate Wikipedia titles against ZIM for wave 2 records."""
    _validate_titles("Wikipedia", "wikipedia_title", "wikipedia_exists",
                     WIKIPEDIA_INTERNAL, commit_every=1000, log_progress=True)


def validate_wikivoyage_titles():
    """Validate Wikivoyage titles against ZIM for wave 2 records."""
    _validate_titles("Wikivoyage", "wikivoyage_title", "wikivoyage_exists",
                     WIKIVOYAGE_INTERNAL, commit_every=500, log_progress=False)

# =============================================================================
# STAGE 4: SUMMARY GENERATION
# =============================================================================

def fetch_article_content(title, base_url, max_chars=8000):
    """Fetch and extract text content from ZIM article.

    Args:
        title: Article title; spaces are converted to underscores.
        base_url: ZIM server base URL (the article is read from ``/A/<title>``).
        max_chars: Longer texts are cut to this length and suffixed with '...'.

    Returns:
        The visible article text, or None when the title is empty or a
        placeholder, the server does not answer 200, or the request fails.
    """
    if not title:
        return None

    title = title.replace(" ", "_")
    if not _is_fetchable_title(title):
        return None

    url = f"{base_url}/A/{title}"
    try:
        resp = requests.get(url, timeout=30)
        if resp.status_code != 200:
            return None

        soup = BeautifulSoup(resp.text, 'html.parser')
        # Drop page chrome and non-content elements before extracting text.
        for tag in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            tag.decompose()

        text = soup.get_text(separator=' ', strip=True)
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        return text
    except Exception as e:
        log.error(f"Error fetching {title}: {e}")
        return None


def build_summary_prompt(place, wiki_content=None, voyage_content=None):
    """Build summary prompt.

    Args:
        place: Dict with at least place_name, osm_key, osm_value, county,
            state, country_code, wikipedia_title and wikivoyage_title.
        wiki_content: Wikipedia article text, if any.
        voyage_content: Wikivoyage article text, if any.

    Returns:
        The prompt as a single newline-joined string.
    """
    prompt_parts = [
        "Generate a 3-4 sentence summary for a map app user who tapped on this location.",
        "",
        f"Place: {place['place_name']} ({place['osm_key']}:{place['osm_value']})",
        f"Location: {place['county'] or 'N/A'}, {place['state'] or 'N/A'}, {place['country_code'].upper()}",
        ""
    ]

    if wiki_content:
        prompt_parts.extend([
            f"=== WIKIPEDIA ARTICLE: {place['wikipedia_title']} ===",
            wiki_content,
            ""
        ])

    if voyage_content:
        prompt_parts.extend([
            f"=== WIKIVOYAGE ARTICLE: {place['wikivoyage_title']} ===",
            voyage_content,
            ""
        ])

    # NOTE(review): the four response-format lines below carry no value
    # placeholders, yet parse_gemini_response() expects NONE as the "no value"
    # marker — confirm the placeholder text was not lost from these strings.
    prompt_parts.extend([
        "Instructions:",
        "- If either article appears to be about a DIFFERENT place, ignore it and provide",
        " the correct title if you know it.",
        "",
        "- Write based on place type:",
        " * Settlements: what's notable, regional context, key attractions",
        " * Natural features: terrain, activities, access, best season",
        " * Parks/reserves: what you'll see, trails, camping, logistics",
        " * Historic sites: significance, what remains, visiting info",
        "",
        "- Engaging but informative tone.",
        "",
        "Response format (REQUIRED):",
        "WIKIPEDIA_TITLE: ",
        "WIKIVOYAGE_TITLE: ",
        "SUMMARY: ",
        "POPULATION: ",
    ])

    if is_travel_type(place['osm_key'], place['osm_value']) and not voyage_content:
        # Both inserts land directly above "Response format (REQUIRED):".
        prompt_parts.insert(-5, "")
        prompt_parts.insert(-5, "If this place has a Wikivoyage article, include the title.")

    return "\n".join(prompt_parts)


def parse_gemini_response(text):
    """Parse Gemini response.

    Recognises lines starting with ``WIKIPEDIA_TITLE:``, ``WIKIVOYAGE_TITLE:``,
    ``SUMMARY:`` and ``POPULATION:``.  Non-empty lines following ``SUMMARY:``
    are appended to the summary until another field starts.

    Returns:
        Dict with keys wikipedia_title, wikivoyage_title, summary and
        population.  A field is None when it is absent, empty, or the NONE
        marker.  Empty values must become None (not '') because the caller
        stores titles with ``COALESCE(?, column)``, which only keeps the old
        value for NULL.
    """
    result = {
        "wikipedia_title": None,
        "wikivoyage_title": None,
        "summary": None,
        "population": None
    }
    simple_fields = (
        ("WIKIPEDIA_TITLE:", "wikipedia_title"),
        ("WIKIVOYAGE_TITLE:", "wikivoyage_title"),
        ("POPULATION:", "population"),
    )

    summary_lines = []
    in_summary = False

    for line in text.strip().split('\n'):
        line_stripped = line.strip()

        if line_stripped.startswith("SUMMARY:"):
            first_part = line_stripped.split(":", 1)[1].strip()
            if first_part:
                summary_lines.append(first_part)
            in_summary = True
            continue

        for prefix, key in simple_fields:
            if line_stripped.startswith(prefix):
                val = line_stripped[len(prefix):].strip()
                result[key] = None if (not val or _is_none_marker(val)) else val
                in_summary = False
                break
        else:
            # Not a field line: continuation of a multi-line summary.
            if in_summary and line_stripped:
                summary_lines.append(line_stripped)

    if summary_lines:
        result["summary"] = " ".join(summary_lines)

    return result


def log_gemini_response(place_id, prompt, response, parsed, output_tokens=None):
    """Append one JSON line describing a Gemini exchange to GEMINI_LOG."""
    Path(GEMINI_LOG).parent.mkdir(parents=True, exist_ok=True)
    entry = {
        "timestamp": datetime.now().isoformat(),
        "place_id": place_id,
        "prompt_length": len(prompt),
        "output_tokens": output_tokens,
        "response": response,
        "parsed": parsed
    }
    with open(GEMINI_LOG, 'a') as f:
        f.write(json.dumps(entry) + "\n")


def is_rate_limit_error(error):
    """Check if error is a rate limit error (judged from its message text)."""
    error_str = str(error).lower()
    return (
        "429" in error_str
        or "resource_exhausted" in error_str
        or ("rate" in error_str and "limit" in error_str)
        or "quota" in error_str
    )


def summarize_worker(args):
    """Worker function for summary generation.

    Args:
        args: ``(place, client, circuit_breaker)`` where ``circuit_breaker``
            is the shared dict with keys "lock", "consecutive_429", "abort".

    Returns:
        A result dict that always contains "place_id" and "success".  On
        success it carries summary, summary_source, population and the titles
        reported by Gemini; on failure it carries error and error_message
        (plus is_rate_limit when all retries were exhausted).
    """
    place, client, circuit_breaker = args

    wiki_content = None
    voyage_content = None
    if place["wikipedia_exists"]:
        wiki_content = fetch_article_content(place["wikipedia_title"], WIKIPEDIA_INTERNAL)
    if place["wikivoyage_exists"]:
        voyage_content = fetch_article_content(place["wikivoyage_title"], WIKIVOYAGE_INTERNAL)

    if not wiki_content and not voyage_content:
        return {
            "place_id": place["id"],
            "success": False,
            "error": "no_content",
            "error_message": f"No content for place {place['id']}"
        }

    prompt = build_summary_prompt(place, wiki_content, voyage_content)

    if wiki_content and voyage_content:
        summary_source = "wikipedia+wikivoyage"
    elif wiki_content:
        summary_source = "wikipedia"
    else:
        summary_source = "wikivoyage"

    response_text = None
    output_tokens = None

    for attempt in range(MAX_RETRIES):
        # Taking the lock here also makes the worker wait while the main
        # thread holds it during a circuit-breaker pause.
        with circuit_breaker["lock"]:
            # NOTE(review): no code visible in this file sets "abort" to True.
            if circuit_breaker["abort"]:
                return {
                    "place_id": place["id"],
                    "success": False,
                    "error": "circuit_breaker_abort",
                    "error_message": "Circuit breaker aborted"
                }

        try:
            response = client.models.generate_content(
                model=GEMINI_MODEL,
                contents=prompt,
                config=types.GenerateContentConfig(
                    temperature=0.3,
                    max_output_tokens=3000
                )
            )
            response_text = response.text
            if hasattr(response, 'usage_metadata') and response.usage_metadata:
                output_tokens = getattr(response.usage_metadata,
                                        'candidates_token_count', None)
            with circuit_breaker["lock"]:
                circuit_breaker["consecutive_429"] = 0
            break

        except Exception as e:
            if is_rate_limit_error(e):
                with circuit_breaker["lock"]:
                    circuit_breaker["consecutive_429"] += 1
                    consecutive = circuit_breaker["consecutive_429"]
                log.warning(f"Rate limit ({consecutive} consecutive) for {place['id']}: {e}")
                if attempt < MAX_RETRIES - 1:
                    time.sleep(RETRY_DELAYS[attempt])
                    continue
            else:
                with circuit_breaker["lock"]:
                    circuit_breaker["consecutive_429"] = 0

            if attempt < MAX_RETRIES - 1:
                log.warning(f"Gemini retry {attempt+1} for {place['id']}: {e}")
                time.sleep(RETRY_DELAYS[attempt])
            else:
                return {
                    "place_id": place["id"],
                    "success": False,
                    "error": type(e).__name__,
                    "error_message": str(e),
                    "is_rate_limit": is_rate_limit_error(e)
                }

    if not response_text:
        return {
            "place_id": place["id"],
            "success": False,
            "error": "no_response",
            "error_message": "No response from Gemini"
        }

    parsed = parse_gemini_response(response_text)
    log_gemini_response(place['id'], prompt, response_text, parsed, output_tokens)

    if not parsed["summary"]:
        return {
            "place_id": place["id"],
            "success": False,
            "error": "parse_failed",
            "error_message": "No summary parsed"
        }

    return {
        "place_id": place["id"],
        "success": True,
        "summary": parsed["summary"],
        "summary_source": summary_source,
        "population": parsed["population"],
        "wikipedia_title": parsed["wikipedia_title"],
        "wikivoyage_title": parsed["wikivoyage_title"]
    }


def generate_summaries(dry_run=False, workers=None):
    """Generate summaries for wave 2 validated places.

    Args:
        dry_run: When True only the first 5 candidate places are processed.
        workers: Number of concurrent workers; defaults to SUMMARIZE_WORKERS.

    The API key is read from the GEMINI_API_KEY environment variable or, as a
    fallback, from a ``.env`` file next to this script.
    """
    if workers is None:
        workers = SUMMARIZE_WORKERS

    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        env_path = Path(__file__).parent / ".env"
        if env_path.exists():
            for line in env_path.read_text().splitlines():
                if line.startswith("GEMINI_API_KEY="):
                    api_key = line.split("=", 1)[1].strip().strip('"\'')
                    break
    if not api_key:
        log.error("GEMINI_API_KEY not set")
        return

    client = genai.Client(api_key=api_key)

    columns = ("id", "place_name", "osm_key", "osm_value", "county", "state",
               "country_code", "wikipedia_title", "wikivoyage_title",
               "wikipedia_exists", "wikivoyage_exists", "wikidata_id")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, place_name, osm_key, osm_value, county, state, country_code,
                   wikipedia_title, wikivoyage_title, wikipedia_exists, wikivoyage_exists,
                   wikidata_id
            FROM wiki_places
            WHERE source = 'photon_wave2'
              AND (wikipedia_exists = 1 OR wikivoyage_exists = 1)
              AND summary IS NULL
            ORDER BY id
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 places need summaries")
            return

        if dry_run:
            rows = rows[:5]
            log.info(f"DRY RUN: Processing only {len(rows)} places with {workers} workers")
        else:
            log.info(f"Generating summaries for {len(rows):,} wave 2 places with {workers} workers...")

        # Column order above matches the SELECT list.
        places = [dict(zip(columns, row)) for row in rows]

        circuit_breaker = {
            "lock": threading.Lock(),
            "consecutive_429": 0,
            "abort": False
        }
        circuit_breaker_paused = False

        processed = 0
        success = 0
        errors = 0
        last_place_id = 0

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(summarize_worker, (place, client, circuit_breaker)): place
                for place in places
            }

            for future in as_completed(futures):
                result = future.result()
                processed += 1
                last_place_id = result["place_id"]

                if processed % 500 == 0:
                    rss = check_memory(f"summary {processed}")
                    log.info(f" Memory: RSS {rss:.0f}MB")

                if result["success"]:
                    now = datetime.now().isoformat()
                    # COALESCE keeps the stored title when Gemini gave none.
                    c.execute("""
                        UPDATE wiki_places
                        SET summary = ?,
                            summary_source = ?,
                            wiki_population = ?,
                            wikipedia_title = COALESCE(?, wikipedia_title),
                            wikivoyage_title = COALESCE(?, wikivoyage_title),
                            summary_generated_at = ?,
                            updated_at = ?
                        WHERE id = ?
                    """, (
                        result["summary"],
                        result["summary_source"],
                        result["population"],
                        result["wikipedia_title"],
                        result["wikivoyage_title"],
                        now,
                        now,
                        result["place_id"]
                    ))
                    success += 1
                else:
                    if result["error"] != "circuit_breaker_abort":
                        log.warning(f"Failed {result['place_id']}: {result['error_message']}")
                        if result["error"] not in ("no_content", "parse_failed"):
                            c.execute("""
                                INSERT INTO wiki_failures
                                    (place_id, wave, stage, error_type, error_message)
                                VALUES (?, 2, 'summarize', ?, ?)
                            """, (result["place_id"], result["error"], result["error_message"]))
                    errors += 1

                    # Circuit breaker check
                    if result.get("is_rate_limit"):
                        # The sleep happens while holding the lock, so every
                        # worker stalls at its next lock acquisition too.
                        with circuit_breaker["lock"]:
                            if circuit_breaker["consecutive_429"] >= CIRCUIT_BREAKER_THRESHOLD:
                                if not circuit_breaker_paused:
                                    log.warning(f"CIRCUIT BREAKER: Pausing {CIRCUIT_BREAKER_PAUSE//60} minutes...")
                                    circuit_breaker_paused = True
                                    circuit_breaker["consecutive_429"] = 0
                                    time.sleep(CIRCUIT_BREAKER_PAUSE)
                                    circuit_breaker_paused = False
                                    log.info("CIRCUIT BREAKER: Resuming...")

                if processed % 50 == 0:
                    conn.commit()
                    write_checkpoint(SUMMARIZE_CHECKPOINT, last_place_id, success, errors)

                if processed % 100 == 0:
                    log.info(f" Processed {processed:,}/{len(places):,} - success: {success:,}, errors: {errors:,}")

                with circuit_breaker["lock"]:
                    if circuit_breaker["abort"]:
                        log.error("Aborting due to circuit breaker")
                        break

        conn.commit()
    finally:
        conn.close()

    if Path(SUMMARIZE_CHECKPOINT).exists():
        Path(SUMMARIZE_CHECKPOINT).unlink()

    log.info("Wave 2 summary generation complete:")
    log.info(f" Success: {success:,}")
    log.info(f" Errors: {errors:,}")

# =============================================================================
# STAGE 5: RE-VALIDATE CORRECTED TITLES
# =============================================================================

def revalidate_corrected_titles():
    """Re-validate titles corrected by Gemini.

    Picks wave 2 rows whose summary was generated after their last ZIM
    validation (or that were never validated) and checks both titles again.
    A title whose check could not be performed is left untouched.  Changes
    are committed every 500 rows.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, wikipedia_title, wikivoyage_title
            FROM wiki_places
            WHERE source = 'photon_wave2'
              AND summary_generated_at IS NOT NULL
              AND (zim_validated_at IS NULL OR zim_validated_at < summary_generated_at)
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 corrected titles need re-validation")
            return

        log.info(f"Re-validating {len(rows):,} wave 2 corrected titles...")

        wiki_revalidated = 0
        voyage_revalidated = 0

        for row_number, (row_id, wiki_title, voyage_title) in enumerate(rows, start=1):
            now = datetime.now().isoformat()

            if wiki_title:
                result = validate_title_worker((row_id, wiki_title, WIKIPEDIA_INTERNAL))
                if result:
                    _, exists = result
                    c.execute("""
                        UPDATE wiki_places
                        SET wikipedia_exists = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, now, row_id))
                    wiki_revalidated += 1

            if voyage_title:
                result = validate_title_worker((row_id, voyage_title, WIKIVOYAGE_INTERNAL))
                if result:
                    _, exists = result
                    c.execute("""
                        UPDATE wiki_places
                        SET wikivoyage_exists = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, now, row_id))
                    voyage_revalidated += 1

            # Count rows, not checks: the sum of the two counters can advance
            # by two per row and step over a multiple of 500.
            if row_number % 500 == 0:
                conn.commit()
                log.info(f" Re-validated {wiki_revalidated + voyage_revalidated:,}")

        conn.commit()
    finally:
        conn.close()

    log.info(f"Re-validation complete: {wiki_revalidated:,} Wikipedia, {voyage_revalidated:,} Wikivoyage")

# =============================================================================
# MAIN
# =============================================================================

def main():
    """Command-line entry point; dispatches on the first argument.

    Exits with status 1 (after printing usage or an error) when no command,
    an unknown command, or an invalid ``--workers=`` value is given.
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    command = sys.argv[1].lower()

    if command == "extract":
        log.info("=== WAVE 2 STAGE 1: EXTRACT ===")
        extract_from_jsonl()

    elif command == "resolve":
        log.info("=== WAVE 2 STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()

    elif command == "validate":
        log.info("=== WAVE 2 STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()

    elif command == "summarize":
        dry_run = "--dry-run" in sys.argv
        workers = None
        for arg in sys.argv:
            if arg.startswith("--workers="):
                raw_value = arg.split("=", 1)[1]
                try:
                    workers = int(raw_value)
                except ValueError:
                    print(f"Invalid --workers value: {raw_value!r} (expected a positive integer)")
                    sys.exit(1)
                if workers < 1:
                    print(f"Invalid --workers value: {raw_value!r} (expected a positive integer)")
                    sys.exit(1)
        if dry_run:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE (DRY RUN) ===")
        else:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE ===")
        generate_summaries(dry_run=dry_run, workers=workers)

    elif command == "revalidate":
        log.info("=== WAVE 2 STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()

    elif command == "all":
        log.info("=== WAVE 2: ALL STAGES ===")
        log.info("=== STAGE 1: EXTRACT ===")
        extract_from_jsonl()
        log.info("=== STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()
        log.info("=== STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()
        log.info("=== STAGE 4: SUMMARIZE ===")
        generate_summaries()
        log.info("=== STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()

    else:
        print(f"Unknown command: {command}")
        print(__doc__)
        sys.exit(1)


if __name__ == "__main__":
    main()