recon/scripts/wiki_index_wave2.py

1128 lines
39 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Wiki Location Index Pipeline Wave 2
Processes places with extra.wikidata but NO extra.wikipedia tags from Photon JSONL dump.
Resolves Wikipedia titles via Wikidata API.
Usage:
python wiki_index_wave2.py extract # Extract from JSONL (wikidata only)
python wiki_index_wave2.py resolve # Resolve Wikipedia titles via Wikidata
python wiki_index_wave2.py validate # Validate titles against ZIM
python wiki_index_wave2.py summarize # Generate summaries with Gemini
python wiki_index_wave2.py summarize --workers=10 # Use 10 concurrent workers
python wiki_index_wave2.py summarize --dry-run # Process only 5 places (test run)
python wiki_index_wave2.py revalidate # Re-validate corrected titles
python wiki_index_wave2.py all # Run all stages
"""
import os
import sys
import json
import sqlite3
import logging
import time
import resource
import threading
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import zstandard as zstd
from bs4 import BeautifulSoup
from google import genai
from google.genai import types
import requests
# =============================================================================
# CONFIGURATION
# =============================================================================
# Paths
# NOTE(review): absolute paths on what looks like a NAS mount — presumably
# deployment-specific; confirm before running on another host.
JSONL_PATH = "/mnt/pi-nas/nav/photon-dump-planet.jsonl.zst"
DB_PATH = "/mnt/pi-nas/nav/wiki-index/data/wiki_index.db"
LOG_DIR = "/mnt/pi-nas/nav/wiki-index/logs"
CHECKPOINT_DIR = "/mnt/pi-nas/nav/wiki-index/data"
# One JSON object per line is appended here for every parsed Gemini response.
GEMINI_LOG = f"{LOG_DIR}/gemini_responses_wave2.jsonl"
# Checkpoint files (wave 2 specific)
# Each is written periodically by its stage and deleted when the stage finishes.
EXTRACT_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_extract_checkpoint.txt"
RESOLVE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_resolve_checkpoint.txt"
VALIDATE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_validate_checkpoint.txt"
SUMMARIZE_CHECKPOINT = f"{CHECKPOINT_DIR}/wave2_summarize_checkpoint.txt"
# Single-pass line range for US + Canada
# 1-based line numbers in the decompressed dump; the extract stage skips lines
# before COMBINED_START (unless resuming) and stops after COMBINED_END.
COMBINED_START = 53694616
COMBINED_END = 175406527
# ZIM endpoints
# Articles are requested as "<endpoint>/A/<Title_with_underscores>".
WIKIPEDIA_INTERNAL = "http://192.168.1.130:8430/wikipedia_en_all_maxi_2026-02"
WIKIVOYAGE_INTERNAL = "http://192.168.1.130:8430/wikivoyage_en_all_maxi_2026-03"
# Wikidata API
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
# Number of Q-IDs sent per wbgetentities request in the resolve stage.
WIKIDATA_BATCH_SIZE = 50
# Gemini
GEMINI_MODEL = "gemini-2.5-flash"
# Attempts per place; RETRY_DELAYS[attempt] is the sleep (seconds) after a
# failed attempt, so it needs at least MAX_RETRIES - 1 entries.
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]
# Concurrency
# Thread-pool sizes: title validation, and the default for summarization
# (overridable with --workers=N).
VALIDATION_WORKERS = 4
SUMMARIZE_WORKERS = 5
# Memory limit (MB)
# check_memory() exits the process when this budget is exceeded.
MAX_RSS_MB = 10240 # 10GB
# Circuit breaker
# Threshold is compared against the shared consecutive rate-limit counter;
# pause is in seconds.
CIRCUIT_BREAKER_THRESHOLD = 50 # 50 consecutive 429s
CIRCUIT_BREAKER_PAUSE = 300 # 5 minutes
# Included types (same as wave 1)
# Maps osm_key -> set of accepted osm_value; None means every value of that
# key is accepted (see should_include).
INCLUDE_KEYS = {
    "place": {"city", "town", "village", "hamlet", "suburb", "island", "islet",
              "state", "county", "region", "locality"},
    "natural": {"peak", "volcano", "bay", "beach", "cape", "cliff", "water",
                "wetland", "wood", "glacier", "valley", "strait", "reef",
                "hot_spring", "geyser", "cave_entrance"},
    "waterway": {"river", "stream", "waterfall", "dam", "canal", "rapids"},
    "water": {"lake", "pond", "reservoir", "lagoon"},
    "boundary": {"protected_area", "national_park", "administrative"},
    "leisure": {"nature_reserve", "park"},
    "mountain_pass": None,
    "landuse": {"cemetery"},
    "historic": None,
    "tourism": {"attraction", "viewpoint"},
}
# Travel-relevant types (get Wikivoyage resolution)
# (osm_key, osm_value) pairs.
# NOTE(review): ("place", "country") is listed here but "country" is not in
# INCLUDE_KEYS["place"], so extracted rows never match it — confirm intent.
TRAVEL_TYPES = {
    ("place", "city"), ("place", "town"), ("place", "state"),
    ("place", "country"), ("place", "island"),
    ("boundary", "national_park"), ("boundary", "protected_area"),
    ("leisure", "nature_reserve"), ("leisure", "park"),
    ("tourism", "attraction"),
}
# =============================================================================
# LOGGING SETUP
# =============================================================================
def setup_logging():
    """Configure logging to a timestamped file in LOG_DIR plus the console.

    Creates LOG_DIR when it does not exist, installs a file handler and a
    stream handler via ``logging.basicConfig`` at INFO level, and returns the
    logger named after this module.
    """
    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
    started_at = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(f"{LOG_DIR}/wave2_{started_at}.log")
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)


# Module-level logger used by every stage; configured at import time.
log = setup_logging()
# =============================================================================
# MEMORY MONITORING
# =============================================================================
def check_memory(context=""):
    """Return this process's ``ru_maxrss`` in MB, exiting if over budget.

    The raw ``ru_maxrss`` value is divided by 1024 twice on darwin and once on
    every other platform to obtain megabytes. When the result exceeds
    MAX_RSS_MB an error is logged (including ``context``) and the process
    exits with status 1.
    """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    raw_maxrss = usage.ru_maxrss
    on_macos = sys.platform == 'darwin'
    rss_mb = raw_maxrss / 1024 / 1024 if on_macos else raw_maxrss / 1024
    if not rss_mb > MAX_RSS_MB:
        return rss_mb
    log.error(f"RSS {rss_mb:.0f}MB exceeds {MAX_RSS_MB}MB budget at {context}, aborting")
    sys.exit(1)
    return rss_mb
# =============================================================================
# CHECKPOINTS
# =============================================================================
def write_checkpoint(path, *values):
    """Write checkpoint values to ``path``, one value per line.

    The parent directory is created if needed. The data is written to a
    sibling ``<path>.tmp`` file first and then moved over ``path`` with
    ``os.replace``, so a crash in the middle of a write cannot leave a
    truncated checkpoint behind (an empty checkpoint would be read back as
    "no checkpoint" and restart the stage from the beginning).

    Args:
        path: Destination checkpoint file.
        *values: Values to store; each is formatted with ``str()``.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        for v in values:
            f.write(f"{v}\n")
    # Swap the finished file into place in a single rename step.
    os.replace(tmp_path, path)
    log.info(f"Checkpoint written: {path}")
def read_checkpoint(path):
    """Load the values stored in a checkpoint file.

    Returns:
        None when ``path`` does not exist; otherwise a list with one
        whitespace-stripped string per line of the file (possibly empty).
    """
    checkpoint_file = Path(path)
    if not checkpoint_file.exists():
        return None
    values = []
    with open(path, 'r') as handle:
        for raw_line in handle:
            values.append(raw_line.strip())
    return values
# =============================================================================
# DATABASE
# =============================================================================
def get_db():
    """Open and return a new SQLite connection to DB_PATH.

    The caller owns the connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    return connection
def get_existing_place_names():
    """Return the dedup keys of every row currently in ``wiki_places``.

    Returns:
        A set of ``(place_name, osm_key, osm_value, county, state,
        country_code)`` tuples. NULL ``county``/``state`` values are returned
        as empty strings so the tuples compare equal to the keys built during
        extraction. The query has no ``source`` filter, so rows from every
        wave are included.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT place_name, osm_key, osm_value,
                   COALESCE(county,''), COALESCE(state,''), country_code
            FROM wiki_places
        """)
        return {tuple(row) for row in c.fetchall()}
    finally:
        # Release the connection even when the query fails.
        conn.close()
# =============================================================================
# STAGE 1: EXTRACT (WIKIDATA ONLY, NO WIKIPEDIA)
# =============================================================================
def should_include(osm_key, osm_value):
    """Tell whether an (osm_key, osm_value) pair passes the INCLUDE_KEYS filter.

    A key that is absent from INCLUDE_KEYS is rejected. A key mapped to None
    accepts every value; otherwise the value must be in the mapped set.
    """
    try:
        accepted_values = INCLUDE_KEYS[osm_key]
    except KeyError:
        return False
    if accepted_values is None:
        return True
    return osm_value in accepted_values
def extract_from_jsonl():
    """Extract places with extra.wikidata but NO extra.wikipedia from JSONL.

    Streams the zstd-compressed dump at JSONL_PATH line by line, keeps
    US/Canada records whose type passes ``should_include`` and that carry a
    ``wikidata`` tag without a ``wikipedia`` tag, and inserts them into
    ``wiki_places`` with ``source = 'photon_wave2'``.

    Resumability: every 1,000,000 lines the transaction is committed and the
    current line number is written to EXTRACT_CHECKPOINT; a later run resumes
    from that line. Rows inserted by an interrupted run are skipped on resume
    because the dedup set is loaded from every row already in the table.

    Compared with the previous version, the DB connection is now closed on
    every exit path, malformed JSON lines are counted and reported instead of
    being dropped silently, and null/empty ``content``, ``extra``, ``name``
    and ``address`` values are treated as empty rather than raising.
    """
    import io  # local import: ``io`` is not among the module-level imports

    # Load existing places (all rows already in the table) to skip
    log.info("Loading existing place names from DB...")
    existing = get_existing_place_names()
    log.info(f"Found {len(existing):,} existing places to skip")

    # Check for checkpoint
    checkpoint = read_checkpoint(EXTRACT_CHECKPOINT)
    if checkpoint:
        resume_line = int(checkpoint[0])
        log.info(f"Resuming extraction from line {resume_line:,}")
    else:
        resume_line = COMBINED_START

    seen = set()
    inserted = 0
    skipped_has_wikipedia = 0
    skipped_no_wikidata = 0
    skipped_filtered = 0
    skipped_no_name = 0
    skipped_dupe = 0
    skipped_existing = 0
    skipped_bad_json = 0

    log.info(f"Wave 2 extraction: lines {resume_line:,} to {COMBINED_END:,}")
    log.info("Filtering: wikidata present AND wikipedia absent")

    conn = get_db()
    try:
        c = conn.cursor()
        with open(JSONL_PATH, 'rb') as fh:
            dctx = zstd.ZstdDecompressor()
            with dctx.stream_reader(fh) as reader:
                text_reader = io.TextIOWrapper(reader, encoding='utf-8')
                for line_num, line in enumerate(text_reader, start=1):
                    # The stream cannot be seeked, so earlier lines are read
                    # and discarded.
                    if line_num < resume_line:
                        if line_num % 10_000_000 == 0:
                            log.info(f" Seeking... line {line_num:,}")
                        continue
                    if line_num > COMBINED_END:
                        break
                    if line_num % 1_000_000 == 0:
                        rss = check_memory(f"line {line_num}")
                        log.info(f" Line {line_num:,}, inserted {inserted:,}, RSS {rss:.0f}MB")
                        conn.commit()
                        # Written before this line is processed, so a resume
                        # re-processes it; the dedup set makes that harmless.
                        write_checkpoint(EXTRACT_CHECKPOINT, line_num)

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        skipped_bad_json += 1
                        continue

                    try:
                        # Only the first element of "content" is used.
                        content = (record.get("content") or [{}])[0]
                        country_code = content.get("country_code", "")
                        if country_code not in ("us", "ca"):
                            continue

                        osm_key = content.get("osm_key", "")
                        osm_value = content.get("osm_value", "")
                        if not should_include(osm_key, osm_value):
                            skipped_filtered += 1
                            continue

                        extra = content.get("extra") or {}
                        # Wave 2: require wikidata, reject if has wikipedia
                        wikidata_id = extra.get("wikidata")
                        if not wikidata_id:
                            skipped_no_wikidata += 1
                            continue
                        if extra.get("wikipedia"):
                            skipped_has_wikipedia += 1
                            continue

                        # Get name (English name preferred)
                        name_obj = content.get("name") or {}
                        name = name_obj.get("name:en") or name_obj.get("name")
                        if not name:
                            skipped_no_name += 1
                            continue

                        # Parse address
                        address = content.get("address") or {}
                        state = address.get("state") or address.get("state:en")
                        county = address.get("county") or address.get("county:en")

                        # Dedup key; missing county/state become "" to match
                        # the COALESCE'd keys loaded from the DB.
                        dedup_key = (name, osm_key, osm_value,
                                     county or "", state or "", country_code)
                        # Skip if already in DB
                        if dedup_key in existing:
                            skipped_existing += 1
                            continue
                        if dedup_key in seen:
                            skipped_dupe += 1
                            continue
                        seen.add(dedup_key)

                        # Get other fields
                        osm_id = f"{content.get('object_type', '')}{content.get('object_id', '')}"
                        importance = content.get("importance")
                        extra_fields = {k: v for k, v in extra.items()
                                        if k not in ("wikipedia", "wikidata")}
                        extra_json = json.dumps(extra_fields) if extra_fields else None

                        # Insert (no wikipedia_title yet - will resolve via Wikidata)
                        c.execute("""
                            INSERT OR IGNORE INTO wiki_places
                            (place_name, osm_key, osm_value, county, state, country_code,
                             wikidata_id, osm_id, importance, extra_json, source)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'photon_wave2')
                        """, (name, osm_key, osm_value, county, state, country_code,
                              wikidata_id, osm_id, importance, extra_json))
                        if c.rowcount > 0:
                            inserted += 1
                            if inserted % 10000 == 0:
                                conn.commit()
                    except Exception as e:
                        # Best-effort: one bad record must not stop the scan.
                        log.error(f"Error on line {line_num}: {e}")
                        continue
        conn.commit()
    finally:
        # Uncommitted inserts are discarded if we get here via an exception
        # (including the SystemExit raised by check_memory); the checkpoint
        # has not advanced past them, so a resume redoes that work.
        conn.close()

    if Path(EXTRACT_CHECKPOINT).exists():
        Path(EXTRACT_CHECKPOINT).unlink()
        log.info("Extract checkpoint cleared (completed)")

    log.info(f"Wave 2 extraction complete:")
    log.info(f" Inserted: {inserted:,}")
    log.info(f" Skipped (has wikipedia): {skipped_has_wikipedia:,}")
    log.info(f" Skipped (no wikidata): {skipped_no_wikidata:,}")
    log.info(f" Skipped (filtered type): {skipped_filtered:,}")
    log.info(f" Skipped (no name): {skipped_no_name:,}")
    log.info(f" Skipped (duplicate): {skipped_dupe:,}")
    log.info(f" Skipped (existing wave1): {skipped_existing:,}")
    if skipped_bad_json:
        log.warning(f" Skipped (malformed JSON): {skipped_bad_json:,}")
# =============================================================================
# STAGE 2: RESOLVE WIKIPEDIA TITLES VIA WIKIDATA
# =============================================================================
def batch_wikidata_lookup(qids):
    """Batch lookup Wikidata Q-IDs to get wiki titles.

    Args:
        qids: Iterable of Wikidata item IDs (e.g. ``"Q42"``).

    Returns:
        Dict mapping each requested ID the API returned to
        ``{"enwiki": title_or_None, "enwikivoyage": title_or_None}``. Keys are
        always a subset of the IDs passed in. An empty dict is returned for
        empty input, when no ID is well-formed, or when the request fails
        (best-effort: failures are logged, not raised).
    """
    if not qids:
        return {}
    import re  # local import: ``re`` is not among the module-level imports

    # OSM wikidata tags are free text. A malformed value such as "Q1;Q2" is
    # expected to make the API reject the whole request, which would silently
    # lose every other ID in the batch — so filter those out first.
    valid_qids = []
    rejected = []
    for qid in qids:
        if isinstance(qid, str) and re.fullmatch(r"Q[1-9][0-9]*", qid):
            valid_qids.append(qid)
        else:
            rejected.append(qid)
    if rejected:
        log.warning(f"Skipping {len(rejected)} malformed Wikidata ID(s), e.g. {rejected[:5]}")
    if not valid_qids:
        return {}

    requested = set(valid_qids)
    params = {
        "action": "wbgetentities",
        "ids": "|".join(valid_qids),
        "props": "sitelinks",
        "format": "json"
    }
    try:
        resp = requests.get(WIKIDATA_API, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # API-level failures arrive as HTTP 200 with an "error" object.
        if "error" in data:
            log.error(f"Wikidata API error: {data['error']}")
            return {}
        results = {}
        for key, entity in data.get("entities", {}).items():
            # For a redirected item, fall back to the ID it was requested
            # under so callers can always match results to their input.
            redirect = entity.get("redirects") or {}
            qid = key if key in requested else redirect.get("from")
            if qid not in requested:
                continue
            sitelinks = entity.get("sitelinks", {})
            results[qid] = {
                "enwiki": sitelinks.get("enwiki", {}).get("title"),
                "enwikivoyage": sitelinks.get("enwikivoyage", {}).get("title")
            }
        return results
    except Exception as e:
        log.error(f"Wikidata API error: {e}")
        return {}
def is_travel_type(osm_key, osm_value):
    """Tell whether the (osm_key, osm_value) pair is listed in TRAVEL_TYPES.

    Travel types are the ones that also get a Wikivoyage title resolved.
    """
    type_pair = (osm_key, osm_value)
    return type_pair in TRAVEL_TYPES
def resolve_wikipedia_titles():
    """Resolve Wikipedia/Wikivoyage titles via Wikidata API for wave 2 records.

    Selects wave 2 rows that have a ``wikidata_id`` but no
    ``wikipedia_title``, looks their Q-IDs up in batches of
    WIKIDATA_BATCH_SIZE, and stores the English Wikipedia title (and, for
    travel types, the English Wikivoyage title) with spaces replaced by
    underscores. Progress is committed and checkpointed every 20 batches.

    Several rows may share one Q-ID (the dedup key does not include it), so
    every row in a batch with that Q-ID is updated, not just one of them.
    The DB connection is closed on every exit path.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # Get wave 2 records with wikidata_id but no wikipedia_title
        c.execute("""
            SELECT id, wikidata_id, osm_key, osm_value FROM wiki_places
            WHERE source = 'photon_wave2'
            AND wikidata_id IS NOT NULL
            AND wikipedia_title IS NULL
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 records need Wikipedia resolution")
            return

        log.info(f"Resolving Wikipedia titles for {len(rows):,} wave 2 records via Wikidata...")
        resolved_wiki = 0
        resolved_voyage = 0

        for i in range(0, len(rows), WIKIDATA_BATCH_SIZE):
            batch = rows[i:i + WIKIDATA_BATCH_SIZE]
            # Q-ID -> all rows in this batch carrying it.
            rows_by_qid = {}
            for row in batch:
                rows_by_qid.setdefault(row[1], []).append(row)

            results = batch_wikidata_lookup(list(rows_by_qid))
            for qid, titles in results.items():
                wiki_title = titles.get("enwiki")
                voyage_title = titles.get("enwikivoyage")
                if wiki_title:
                    wiki_title = wiki_title.replace(' ', '_')
                if voyage_title:
                    voyage_title = voyage_title.replace(' ', '_')

                # .get(): ignore any key the lookup returns that we did not ask for.
                for row_id, _, osm_key, osm_value in rows_by_qid.get(qid, ()):
                    if wiki_title:
                        c.execute("UPDATE wiki_places SET wikipedia_title = ? WHERE id = ?",
                                  (wiki_title, row_id))
                        resolved_wiki += 1
                    # Also set wikivoyage if travel type
                    if voyage_title and is_travel_type(osm_key, osm_value):
                        c.execute("UPDATE wiki_places SET wikivoyage_title = ? WHERE id = ?",
                                  (voyage_title, row_id))
                        resolved_voyage += 1

            if ((i // WIKIDATA_BATCH_SIZE + 1) % 20) == 0:
                log.info(f" Processed {i + len(batch):,}/{len(rows):,} - "
                         f"wiki: {resolved_wiki:,}, voyage: {resolved_voyage:,}")
                conn.commit()
                write_checkpoint(RESOLVE_CHECKPOINT, i + len(batch), resolved_wiki, resolved_voyage)
            time.sleep(0.1)  # Be nice to Wikidata API

        conn.commit()
    finally:
        conn.close()

    if Path(RESOLVE_CHECKPOINT).exists():
        Path(RESOLVE_CHECKPOINT).unlink()
    log.info(f"Resolution complete:")
    log.info(f" Wikipedia titles: {resolved_wiki:,}")
    log.info(f" Wikivoyage titles: {resolved_voyage:,}")
# =============================================================================
# STAGE 3: VALIDATE
# =============================================================================
def validate_title_worker(args):
    """Worker function for thread pool: check that an article exists in a ZIM.

    Args:
        args: ``(row_id, title, base_url)`` tuple.

    Returns:
        ``(row_id, True)`` when ``<base_url>/A/<title>`` answers 200,
        ``(row_id, False)`` when the title is empty, looks like a placeholder
        or disambiguation page, is longer than 100 characters, or the server
        answers with another non-5xx status. ``None`` when the check could not
        be completed (request failed or the server answered 5xx): callers skip
        ``None`` results, so the row stays unvalidated and is retried on the
        next run instead of being recorded as "does not exist".
    """
    row_id, title, base_url = args
    if not title:
        return (row_id, False)
    from urllib.parse import quote  # local import: not imported at module level

    title = title.replace(" ", "_")
    # NOTE(review): the "NONE" test is a substring match, so a real title that
    # merely contains "none" (e.g. "Nonesuch_River") is rejected too — confirm
    # whether that is intended.
    if "NONE" in title.upper() or "(disambiguation" in title.lower() or len(title) > 100:
        return (row_id, False)
    # Escape characters that would otherwise change the URL's meaning
    # ("?" starts a query, "#" a fragment, "%" an escape sequence).
    url = f"{base_url}/A/{quote(title, safe='/:@!$&()*+,;=' + chr(39))}"
    try:
        resp = requests.head(url, allow_redirects=True, timeout=10)
    except Exception:
        return None
    if resp.status_code >= 500:
        return None
    return (row_id, resp.status_code == 200)
def _validate_titles(site, title_column, exists_column, base_url, commit_every):
    """Validate one title column of wave 2 rows against a ZIM endpoint.

    Shared implementation of ``validate_wikipedia_titles`` and
    ``validate_wikivoyage_titles``.

    Args:
        site: Label used in log messages ("Wikipedia" / "Wikivoyage").
        title_column: Column holding the title to check.
        exists_column: Column receiving 1/0; only rows where it is NULL are
            processed.
        base_url: ZIM endpoint passed to ``validate_title_worker``.
        commit_every: Commit and log progress after this many completed checks.

    The column names are interpolated into the SQL text; they must only ever
    be the hard-coded identifiers passed by the two wrappers below.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(f"""
            SELECT id, {title_column} FROM wiki_places
            WHERE source = 'photon_wave2'
            AND {title_column} IS NOT NULL
            AND {exists_column} IS NULL
        """)
        rows = c.fetchall()
        if not rows:
            log.info(f"No wave 2 {site} titles to validate")
            return

        log.info(f"Validating {len(rows):,} {site} titles...")
        valid_count = 0
        invalid_count = 0
        update_sql = f"""
            UPDATE wiki_places
            SET {exists_column} = ?, zim_validated_at = ?
            WHERE id = ?
        """
        with ThreadPoolExecutor(max_workers=VALIDATION_WORKERS) as executor:
            futures = [
                executor.submit(validate_title_worker, (row_id, title, base_url))
                for row_id, title in rows
            ]
            # DB writes stay on this thread; workers only do the HTTP check.
            for done, future in enumerate(as_completed(futures), start=1):
                result = future.result()
                if result is None:
                    # Check could not be completed: leave the row unvalidated.
                    continue
                row_id, exists = result
                c.execute(update_sql,
                          (1 if exists else 0, datetime.now().isoformat(), row_id))
                if exists:
                    valid_count += 1
                else:
                    invalid_count += 1
                if done % commit_every == 0:
                    log.info(f" Validated {done:,}/{len(rows):,} - valid: {valid_count:,}")
                    conn.commit()
        conn.commit()
    finally:
        conn.close()
    log.info(f"{site} validation complete: valid={valid_count:,}, invalid={invalid_count:,}")


def validate_wikipedia_titles():
    """Validate Wikipedia titles against ZIM for wave 2 records."""
    _validate_titles("Wikipedia", "wikipedia_title", "wikipedia_exists",
                     WIKIPEDIA_INTERNAL, commit_every=1000)


def validate_wikivoyage_titles():
    """Validate Wikivoyage titles against ZIM for wave 2 records."""
    _validate_titles("Wikivoyage", "wikivoyage_title", "wikivoyage_exists",
                     WIKIVOYAGE_INTERNAL, commit_every=500)
# =============================================================================
# STAGE 4: SUMMARY GENERATION
# =============================================================================
def fetch_article_content(title, base_url, max_chars=8000):
    """Fetch and extract text content from ZIM article.

    Args:
        title: Article title; spaces are converted to underscores.
        base_url: ZIM endpoint; the article is requested at
            ``<base_url>/A/<title>``.
        max_chars: Maximum number of characters kept; longer text is cut and
            suffixed with ``"..."``.

    Returns:
        The article's visible text (script/style/nav/header/footer/aside
        elements removed), or None when the title is empty or rejected by the
        placeholder/disambiguation/length checks, the server does not answer
        200, or the request/parsing fails (logged, not raised).
    """
    if not title:
        return None
    from urllib.parse import quote  # local import: not imported at module level

    title = title.replace(" ", "_")
    if "NONE" in title.upper() or "(disambiguation" in title.lower() or len(title) > 100:
        return None
    # Escape characters that would otherwise change the URL's meaning
    # ("?" starts a query, "#" a fragment, "%" an escape sequence).
    url = f"{base_url}/A/{quote(title, safe='/:@!$&()*+,;=' + chr(39))}"
    try:
        resp = requests.get(url, timeout=30)
        if resp.status_code != 200:
            return None
        soup = BeautifulSoup(resp.text, 'html.parser')
        # Drop page chrome and non-content elements before extracting text.
        for tag in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            tag.decompose()
        text = soup.get_text(separator=' ', strip=True)
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        return text
    except Exception as e:
        log.error(f"Error fetching {title}: {e}")
        return None
def build_summary_prompt(place, wiki_content=None, voyage_content=None):
    """Assemble the Gemini prompt for one place.

    Args:
        place: Mapping with place_name, osm_key, osm_value, county, state,
            country_code, wikipedia_title and wikivoyage_title.
        wiki_content: Wikipedia article text to embed, if any.
        voyage_content: Wikivoyage article text to embed, if any.

    Returns:
        The prompt as one newline-joined string: place header, the supplied
        article sections, the writing instructions, an extra Wikivoyage hint
        for travel types that have no Wikivoyage content, and the required
        response format.
    """
    header = [
        "Generate a 3-4 sentence summary for a map app user who tapped on this location.",
        "",
        f"Place: {place['place_name']} ({place['osm_key']}:{place['osm_value']})",
        f"Location: {place['county'] or 'N/A'}, {place['state'] or 'N/A'}, {place['country_code'].upper()}",
        "",
    ]

    articles = []
    if wiki_content:
        articles += [
            f"=== WIKIPEDIA ARTICLE: {place['wikipedia_title']} ===",
            wiki_content,
            "",
        ]
    if voyage_content:
        articles += [
            f"=== WIKIVOYAGE ARTICLE: {place['wikivoyage_title']} ===",
            voyage_content,
            "",
        ]

    instructions = [
        "Instructions:",
        "- If either article appears to be about a DIFFERENT place, ignore it and provide",
        " the correct title if you know it.",
        "",
        "- Write based on place type:",
        " * Settlements: what's notable, regional context, key attractions",
        " * Natural features: terrain, activities, access, best season",
        " * Parks/reserves: what you'll see, trails, camping, logistics",
        " * Historic sites: significance, what remains, visiting info",
        "",
        "- Engaging but informative tone.",
        "",
    ]

    # The hint sits directly above the response-format header, preceded by one
    # additional blank line.
    voyage_hint = []
    if is_travel_type(place['osm_key'], place['osm_value']) and not voyage_content:
        voyage_hint = ["", "If this place has a Wikivoyage article, include the title."]

    response_format = [
        "Response format (REQUIRED):",
        "WIKIPEDIA_TITLE: <verified or corrected title, or NONE>",
        "WIKIVOYAGE_TITLE: <verified or corrected title, or NONE>",
        "SUMMARY: <your 3-4 sentence summary>",
        "POPULATION: <number if mentioned, or NONE>",
    ]

    return "\n".join(header + articles + instructions + voyage_hint + response_format)
def parse_gemini_response(text):
    """Split a Gemini reply into its labelled fields.

    Recognises lines starting with ``WIKIPEDIA_TITLE:``, ``WIKIVOYAGE_TITLE:``,
    ``SUMMARY:`` and ``POPULATION:``. Title and population values that begin
    with "NONE" (any case) become None. The summary is the text after
    ``SUMMARY:`` plus every following non-empty line up to the next labelled
    line, joined with single spaces.

    Returns:
        Dict with keys wikipedia_title, wikivoyage_title, summary and
        population; fields that were not found are None.
    """
    parsed = dict.fromkeys(
        ("wikipedia_title", "wikivoyage_title", "summary", "population")
    )
    # Single-line fields: label prefix -> result key.
    single_line_fields = {
        "WIKIPEDIA_TITLE:": "wikipedia_title",
        "WIKIVOYAGE_TITLE:": "wikivoyage_title",
        "POPULATION:": "population",
    }

    def after_label(entry):
        return entry.partition(":")[2].strip()

    summary_parts = []
    collecting_summary = False
    for raw_line in text.strip().split('\n'):
        entry = raw_line.strip()

        if entry.startswith("SUMMARY:"):
            opening = after_label(entry)
            if opening:
                summary_parts.append(opening)
            collecting_summary = True
            continue

        matched_key = None
        for label, key in single_line_fields.items():
            if entry.startswith(label):
                matched_key = key
                break

        if matched_key is not None:
            value = after_label(entry)
            parsed[matched_key] = None if value.upper().startswith("NONE") else value
            collecting_summary = False
        elif collecting_summary and entry:
            summary_parts.append(entry)

    if summary_parts:
        parsed["summary"] = " ".join(summary_parts)
    return parsed
# Serializes appends to GEMINI_LOG: log_gemini_response is called from the
# summarization worker threads, and unsynchronized writes of large records
# could interleave and corrupt the JSONL file.
_GEMINI_LOG_LOCK = threading.Lock()


def log_gemini_response(place_id, prompt, response, parsed, output_tokens=None):
    """Append one Gemini exchange to GEMINI_LOG as a JSON line.

    Args:
        place_id: ID of the place the response belongs to.
        prompt: Prompt text; only its length is recorded.
        response: Raw response text.
        parsed: Parsed response fields.
        output_tokens: Output token count reported for the call, if known.
    """
    record = json.dumps({
        "timestamp": datetime.now().isoformat(),
        "place_id": place_id,
        "prompt_length": len(prompt),
        "output_tokens": output_tokens,
        "response": response,
        "parsed": parsed
    }) + "\n"
    with _GEMINI_LOG_LOCK:
        Path(GEMINI_LOG).parent.mkdir(parents=True, exist_ok=True)
        with open(GEMINI_LOG, 'a') as f:
            f.write(record)
def is_rate_limit_error(error):
    """Decide from an error's text whether it signals rate limiting.

    The lower-cased ``str(error)`` counts as a rate-limit error when it
    contains "429", "resource_exhausted" or "quota", or contains both "rate"
    and "limit".
    """
    message = str(error).lower()
    for marker in ("429", "resource_exhausted"):
        if marker in message:
            return True
    if "rate" in message and "limit" in message:
        return True
    return "quota" in message
def summarize_worker(args):
    """Produce the summary result for one place (runs in a worker thread).

    Args:
        args: ``(place, client, circuit_breaker)`` where ``place`` is the
            place dict, ``client`` the Gemini client and ``circuit_breaker``
            the shared dict with "lock", "consecutive_429" and "abort".

    Returns:
        A dict that always has "place_id" and "success". On success it also
        carries summary, summary_source, population, wikipedia_title and
        wikivoyage_title; on failure it carries "error" and "error_message"
        (plus "is_rate_limit" when the final Gemini attempt raised).

    The shared "consecutive_429" counter is incremented on every rate-limit
    error and reset to 0 on success or on any other error.
    """
    place, client, circuit_breaker = args
    place_id = place["id"]
    breaker_lock = circuit_breaker["lock"]

    def failure(error, message, **extra):
        outcome = {
            "place_id": place_id,
            "success": False,
            "error": error,
            "error_message": message,
        }
        outcome.update(extra)
        return outcome

    # Only fetch articles that validation marked as existing.
    wiki_content = (
        fetch_article_content(place["wikipedia_title"], WIKIPEDIA_INTERNAL)
        if place["wikipedia_exists"] else None
    )
    voyage_content = (
        fetch_article_content(place["wikivoyage_title"], WIKIVOYAGE_INTERNAL)
        if place["wikivoyage_exists"] else None
    )
    if not (wiki_content or voyage_content):
        return failure("no_content", f"No content for place {place['id']}")

    prompt = build_summary_prompt(place, wiki_content, voyage_content)
    used_sources = [
        label
        for label, body in (("wikipedia", wiki_content), ("wikivoyage", voyage_content))
        if body
    ]
    summary_source = "+".join(used_sources)

    response_text = None
    output_tokens = None
    for attempt in range(MAX_RETRIES):
        is_final_attempt = attempt >= MAX_RETRIES - 1

        with breaker_lock:
            if circuit_breaker["abort"]:
                return failure("circuit_breaker_abort", "Circuit breaker aborted")

        try:
            response = client.models.generate_content(
                model=GEMINI_MODEL,
                contents=prompt,
                config=types.GenerateContentConfig(
                    temperature=0.3,
                    max_output_tokens=3000
                )
            )
            response_text = response.text
            usage = getattr(response, 'usage_metadata', None)
            if usage:
                output_tokens = getattr(usage, 'candidates_token_count', None)
            with breaker_lock:
                circuit_breaker["consecutive_429"] = 0
            break
        except Exception as e:
            rate_limited = is_rate_limit_error(e)
            if rate_limited:
                with breaker_lock:
                    circuit_breaker["consecutive_429"] += 1
                    consecutive = circuit_breaker["consecutive_429"]
                log.warning(f"Rate limit ({consecutive} consecutive) for {place['id']}: {e}")
                if not is_final_attempt:
                    time.sleep(RETRY_DELAYS[attempt])
                    continue
            else:
                with breaker_lock:
                    circuit_breaker["consecutive_429"] = 0

            # Reached by non-rate-limit errors, and by a rate-limit error on
            # the final attempt.
            if is_final_attempt:
                return failure(type(e).__name__, str(e), is_rate_limit=rate_limited)
            log.warning(f"Gemini retry {attempt+1} for {place['id']}: {e}")
            time.sleep(RETRY_DELAYS[attempt])

    if not response_text:
        return failure("no_response", "No response from Gemini")

    parsed = parse_gemini_response(response_text)
    log_gemini_response(place['id'], prompt, response_text, parsed, output_tokens)
    if not parsed["summary"]:
        return failure("parse_failed", "No summary parsed")

    return {
        "place_id": place_id,
        "success": True,
        "summary": parsed["summary"],
        "summary_source": summary_source,
        "population": parsed["population"],
        "wikipedia_title": parsed["wikipedia_title"],
        "wikivoyage_title": parsed["wikivoyage_title"]
    }
def generate_summaries(dry_run=False, workers=None):
    """Generate summaries for wave 2 validated places.

    Selects wave 2 rows with an existing Wikipedia or Wikivoyage article and
    no summary yet, runs ``summarize_worker`` for each on a thread pool, and
    stores the results. All DB writes happen on the calling thread. Progress
    is committed and checkpointed every 50 processed places.

    Args:
        dry_run: When true, only the first 5 selected places are processed.
        workers: Thread-pool size; defaults to SUMMARIZE_WORKERS.

    The API key is read from the GEMINI_API_KEY environment variable, falling
    back to a ``GEMINI_API_KEY=`` line in a ``.env`` file next to this script.

    Compared with the previous version, the DB connection is closed on every
    exit path, and an unexpected exception escaping a worker is recorded as a
    failure for that place instead of aborting the whole run.
    """
    if workers is None:
        workers = SUMMARIZE_WORKERS

    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        env_path = Path(__file__).parent / ".env"
        if env_path.exists():
            for line in env_path.read_text().splitlines():
                if line.startswith("GEMINI_API_KEY="):
                    api_key = line.split("=", 1)[1].strip().strip('"\'')
                    break
    if not api_key:
        log.error("GEMINI_API_KEY not set")
        return
    client = genai.Client(api_key=api_key)

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, place_name, osm_key, osm_value, county, state, country_code,
                   wikipedia_title, wikivoyage_title, wikipedia_exists, wikivoyage_exists,
                   wikidata_id
            FROM wiki_places
            WHERE source = 'photon_wave2'
            AND (wikipedia_exists = 1 OR wikivoyage_exists = 1)
            AND summary IS NULL
            ORDER BY id
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 places need summaries")
            return

        if dry_run:
            rows = rows[:5]
            log.info(f"DRY RUN: Processing only {len(rows)} places with {workers} workers")
        else:
            log.info(f"Generating summaries for {len(rows):,} wave 2 places with {workers} workers...")

        # Same order as the SELECT column list above.
        columns = ("id", "place_name", "osm_key", "osm_value", "county", "state",
                   "country_code", "wikipedia_title", "wikivoyage_title",
                   "wikipedia_exists", "wikivoyage_exists", "wikidata_id")
        places = [dict(zip(columns, row)) for row in rows]

        # Shared with the workers; "lock" guards the other two fields.
        # NOTE(review): nothing in this function or in summarize_worker sets
        # "abort" to True, so the abort paths look unreachable — confirm.
        circuit_breaker = {
            "lock": threading.Lock(),
            "consecutive_429": 0,
            "abort": False
        }
        processed = 0
        success = 0
        errors = 0
        last_place_id = 0

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(summarize_worker, (place, client, circuit_breaker)): place
                for place in places
            }
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    # A bug or I/O error inside one worker must not discard
                    # the rest of the run; record it like any other failure.
                    result = {
                        "place_id": futures[future]["id"],
                        "success": False,
                        "error": type(e).__name__,
                        "error_message": str(e)
                    }
                processed += 1
                # Results arrive in completion order, so this is the most
                # recently finished place, not a resume position.
                last_place_id = result["place_id"]

                if processed % 500 == 0:
                    rss = check_memory(f"summary {processed}")
                    log.info(f" Memory: RSS {rss:.0f}MB")

                if result["success"]:
                    now = datetime.now().isoformat()
                    # COALESCE keeps the stored title when Gemini returned none.
                    c.execute("""
                        UPDATE wiki_places SET
                            summary = ?,
                            summary_source = ?,
                            wiki_population = ?,
                            wikipedia_title = COALESCE(?, wikipedia_title),
                            wikivoyage_title = COALESCE(?, wikivoyage_title),
                            summary_generated_at = ?,
                            updated_at = ?
                        WHERE id = ?
                    """, (
                        result["summary"],
                        result["summary_source"],
                        result["population"],
                        result["wikipedia_title"],
                        result["wikivoyage_title"],
                        now,
                        now,
                        result["place_id"]
                    ))
                    success += 1
                else:
                    if result["error"] != "circuit_breaker_abort":
                        log.warning(f"Failed {result['place_id']}: {result['error_message']}")
                    if result["error"] not in ("no_content", "parse_failed"):
                        c.execute("""
                            INSERT INTO wiki_failures (place_id, wave, stage, error_type, error_message)
                            VALUES (?, 2, 'summarize', ?, ?)
                        """, (result["place_id"], result["error"], result["error_message"]))
                    errors += 1

                # Circuit breaker check
                if result.get("is_rate_limit"):
                    with circuit_breaker["lock"]:
                        if circuit_breaker["consecutive_429"] >= CIRCUIT_BREAKER_THRESHOLD:
                            log.warning(f"CIRCUIT BREAKER: Pausing {CIRCUIT_BREAKER_PAUSE//60} minutes...")
                            circuit_breaker["consecutive_429"] = 0
                            # The pause is taken while holding the shared
                            # lock, so workers block at their next lock
                            # acquisition for its duration — presumably
                            # intentional; confirm.
                            time.sleep(CIRCUIT_BREAKER_PAUSE)
                            log.info("CIRCUIT BREAKER: Resuming...")

                if processed % 50 == 0:
                    conn.commit()
                    write_checkpoint(SUMMARIZE_CHECKPOINT, last_place_id, success, errors)
                if processed % 100 == 0:
                    log.info(f" Processed {processed:,}/{len(places):,} - success: {success:,}, errors: {errors:,}")

                with circuit_breaker["lock"]:
                    if circuit_breaker["abort"]:
                        log.error("Aborting due to circuit breaker")
                        break
        conn.commit()
    finally:
        conn.close()

    if Path(SUMMARIZE_CHECKPOINT).exists():
        Path(SUMMARIZE_CHECKPOINT).unlink()
    log.info(f"Wave 2 summary generation complete:")
    log.info(f" Success: {success:,}")
    log.info(f" Errors: {errors:,}")
# =============================================================================
# STAGE 5: RE-VALIDATE CORRECTED TITLES
# =============================================================================
def revalidate_corrected_titles():
    """Re-validate titles corrected by Gemini.

    Selects wave 2 rows whose summary was generated after their last ZIM
    validation (or that were never validated) and re-checks the stored
    Wikipedia and Wikivoyage titles, updating the ``*_exists`` flags and
    ``zim_validated_at``.

    Progress is committed every 500 rows. The previous version keyed the
    periodic commit on the sum of the two counters, which can advance by two
    per row (skipping multiples of 500) or stay at 0 (committing on every
    row). The DB connection is closed on every exit path.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, wikipedia_title, wikivoyage_title FROM wiki_places
            WHERE source = 'photon_wave2'
            AND summary_generated_at IS NOT NULL
            AND (zim_validated_at IS NULL OR zim_validated_at < summary_generated_at)
        """)
        rows = c.fetchall()
        if not rows:
            log.info("No wave 2 corrected titles need re-validation")
            return

        log.info(f"Re-validating {len(rows):,} wave 2 corrected titles...")
        wiki_revalidated = 0
        voyage_revalidated = 0

        for rows_done, (row_id, wiki_title, voyage_title) in enumerate(rows, start=1):
            now = datetime.now().isoformat()
            if wiki_title:
                result = validate_title_worker((row_id, wiki_title, WIKIPEDIA_INTERNAL))
                if result:
                    _, exists = result
                    c.execute("""
                        UPDATE wiki_places SET wikipedia_exists = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, now, row_id))
                    wiki_revalidated += 1
            if voyage_title:
                result = validate_title_worker((row_id, voyage_title, WIKIVOYAGE_INTERNAL))
                if result:
                    _, exists = result
                    c.execute("""
                        UPDATE wiki_places SET wikivoyage_exists = ?, zim_validated_at = ?
                        WHERE id = ?
                    """, (1 if exists else 0, now, row_id))
                    voyage_revalidated += 1

            if rows_done % 500 == 0:
                conn.commit()
                log.info(f" Re-validated {wiki_revalidated + voyage_revalidated:,}")

        conn.commit()
    finally:
        conn.close()
    log.info(f"Re-validation complete: {wiki_revalidated:,} Wikipedia, {voyage_revalidated:,} Wikivoyage")
# =============================================================================
# MAIN
# =============================================================================
def main():
    """Command-line entry point: run one pipeline stage, or all of them.

    The first argument selects the stage (extract, resolve, validate,
    summarize, revalidate, all). ``summarize`` additionally accepts
    ``--dry-run`` and ``--workers=N``. With no or an unknown command the
    module usage text is printed and the process exits with status 1.

    ``--workers`` must be a positive integer; anything else is reported and
    exits with status 1 instead of failing with a traceback (non-numeric) or
    later inside the thread pool (zero/negative).
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    command = sys.argv[1].lower()
    if command == "extract":
        log.info("=== WAVE 2 STAGE 1: EXTRACT ===")
        extract_from_jsonl()
    elif command == "resolve":
        log.info("=== WAVE 2 STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()
    elif command == "validate":
        log.info("=== WAVE 2 STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()
    elif command == "summarize":
        dry_run = "--dry-run" in sys.argv
        workers = None
        for arg in sys.argv:
            if arg.startswith("--workers="):
                raw_workers = arg.split("=", 1)[1]
                try:
                    workers = int(raw_workers)
                except ValueError:
                    workers = 0
                if workers < 1:
                    print(f"Invalid --workers value: {raw_workers!r} (expected a positive integer)")
                    sys.exit(1)
        if dry_run:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE (DRY RUN) ===")
        else:
            log.info("=== WAVE 2 STAGE 4: SUMMARIZE ===")
        generate_summaries(dry_run=dry_run, workers=workers)
    elif command == "revalidate":
        log.info("=== WAVE 2 STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()
    elif command == "all":
        log.info("=== WAVE 2: ALL STAGES ===")
        log.info("=== STAGE 1: EXTRACT ===")
        extract_from_jsonl()
        log.info("=== STAGE 2: RESOLVE ===")
        resolve_wikipedia_titles()
        log.info("=== STAGE 3: VALIDATE ===")
        validate_wikipedia_titles()
        validate_wikivoyage_titles()
        log.info("=== STAGE 4: SUMMARIZE ===")
        generate_summaries()
        log.info("=== STAGE 5: RE-VALIDATE ===")
        revalidate_corrected_titles()
    else:
        print(f"Unknown command: {command}")
        print(__doc__)
        sys.exit(1)


if __name__ == "__main__":
    main()