diff --git a/lib/google_places.py b/lib/google_places.py deleted file mode 100644 index 8272b81..0000000 --- a/lib/google_places.py +++ /dev/null @@ -1,397 +0,0 @@ -""" -Google Places (New) API client for tertiary enrichment. - -Searches for business POIs and fetches details (opening hours, phone, website) -when OSM + Overture data is incomplete. Uses field masks to minimize cost. - -API docs: https://developers.google.com/maps/documentation/places/web-service -""" -import json -import os -import sqlite3 -import time -from datetime import date, timezone, datetime - -import requests - -from .utils import setup_logging - -logger = setup_logging('recon.google_places') - -API_BASE = 'https://places.googleapis.com/v1' -DEFAULT_DAILY_CAP = 500 -REQUEST_TIMEOUT = 3 # seconds - -# Google day index → OSM abbreviation -_DAY_ABBR = ['Su', 'Mo', 'Tu', 'We', 'Th', 'Fr', 'Sa'] - -_db_conn = None - - -def _get_db(): - """Return a module-level SQLite connection (lazy init).""" - global _db_conn - if _db_conn is not None: - return _db_conn - - db_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data') - db_path = os.path.join(db_dir, 'place_cache.db') - _db_conn = sqlite3.connect(db_path, check_same_thread=False) - _db_conn.execute("PRAGMA journal_mode=WAL") - _db_conn.execute("PRAGMA synchronous=NORMAL") - # Ensure google_api_calls table exists - _db_conn.execute(""" - CREATE TABLE IF NOT EXISTS google_api_calls ( - call_date TEXT PRIMARY KEY, - call_count INTEGER NOT NULL DEFAULT 0 - ) - """) - _db_conn.commit() - return _db_conn - - -def _get_api_key(): - """Return the Google Places API key from environment.""" - key = os.environ.get('GOOGLE_PLACES_API_KEY') - if not key: - logger.error("GOOGLE_PLACES_API_KEY not set in environment") - return key - - -def _get_daily_cap(): - """Return the daily API call cap (configurable via deployment config).""" - try: - from .deployment_config import get_deployment_config - config = get_deployment_config() - 
return config.get('google_places', {}).get('daily_cap', DEFAULT_DAILY_CAP) - except Exception: - return DEFAULT_DAILY_CAP - - -# ── Daily call counter ────────────────────────────────────────────────── - -def check_daily_cap(): - """Return True if under daily cap, False if limit reached.""" - db = _get_db() - today = date.today().isoformat() - row = db.execute( - "SELECT call_count FROM google_api_calls WHERE call_date = ?", (today,) - ).fetchone() - current = row[0] if row else 0 - cap = _get_daily_cap() - if current >= cap: - logger.info(f"google_places: daily_cap_reached count={current} cap={cap}") - return False - return True - - -def get_daily_count(): - """Return today's API call count.""" - db = _get_db() - today = date.today().isoformat() - row = db.execute( - "SELECT call_count FROM google_api_calls WHERE call_date = ?", (today,) - ).fetchone() - return row[0] if row else 0 - - -def increment_call_counter(): - """Atomically increment today's API call counter.""" - db = _get_db() - today = date.today().isoformat() - db.execute(""" - INSERT INTO google_api_calls (call_date, call_count) VALUES (?, 1) - ON CONFLICT(call_date) DO UPDATE SET call_count = call_count + 1 - """, (today,)) - db.commit() - - -def _set_daily_count_to_cap(): - """Set today's counter to the cap value (soft-stop on quota error).""" - db = _get_db() - today = date.today().isoformat() - cap = _get_daily_cap() - db.execute(""" - INSERT INTO google_api_calls (call_date, call_count) VALUES (?, ?) - ON CONFLICT(call_date) DO UPDATE SET call_count = ? - """, (today, cap, cap)) - db.commit() - - -# ── Google Places cache (on place_cache table) ───────────────────────── - -def cache_get_google(osm_type, osm_id): - """Return (google_place_id, google_data_dict) or (None, None).""" - db = _get_db() - row = db.execute( - "SELECT google_place_id, google_data FROM place_cache WHERE osm_type=? 
AND osm_id=?", - (osm_type, osm_id) - ).fetchone() - if row and row[0]: - data = None - if row[1]: - try: - data = json.loads(row[1]) - except (json.JSONDecodeError, TypeError): - pass - return row[0], data - return None, None - - -def cache_put_google(osm_type, osm_id, place_id, data): - """Store Google Places data for a cache entry (UPSERT on google columns).""" - db = _get_db() - now = int(time.time()) - db.execute(""" - INSERT INTO place_cache (osm_type, osm_id, data, source, cached_at, google_place_id, google_data, google_fetched_at) - VALUES (?, ?, '', 'pending', 0, ?, ?, ?) - ON CONFLICT(osm_type, osm_id) DO UPDATE SET - google_place_id = excluded.google_place_id, - google_data = excluded.google_data, - google_fetched_at = excluded.google_fetched_at - """, (osm_type, osm_id, place_id, json.dumps(data) if data else None, now)) - db.commit() - - -# ── API calls ─────────────────────────────────────────────────────────── - -def search_place(name, lat, lon, radius_m=200): - """ - Search Google Places (New) for a business by name + location. - Returns the Google Place ID of the best match, or None. 
- """ - key = _get_api_key() - if not key: - return None - - if not check_daily_cap(): - return None - - try: - resp = requests.post( - f'{API_BASE}/places:searchText', - headers={ - 'Content-Type': 'application/json', - 'X-Goog-Api-Key': key, - 'X-Goog-FieldMask': 'places.id,places.displayName,places.location', - }, - json={ - 'textQuery': name, - 'locationBias': { - 'circle': { - 'center': {'latitude': lat, 'longitude': lon}, - 'radius': float(radius_m), - } - }, - 'maxResultCount': 1, - }, - timeout=REQUEST_TIMEOUT, - ) - - increment_call_counter() - - if resp.status_code == 429: - logger.warning("google_places: action=search place=%s result=rate_limited", name) - _set_daily_count_to_cap() - return None - - if resp.status_code == 403: - logger.error("google_places: action=search place=%s result=forbidden (invalid key?)", name) - return None - - if resp.status_code != 200: - logger.warning("google_places: action=search place=%s result=error status=%d", name, resp.status_code) - return None - - data = resp.json() - places = data.get('places', []) - if not places: - logger.info("google_places: action=search place=%s result=miss", name) - return None - - place_id = places[0].get('id') - display = places[0].get('displayName', {}).get('text', '?') - logger.info("google_places: action=search place=%s result=hit google_name=%s id=%s", name, display, place_id) - return place_id - - except requests.exceptions.Timeout: - logger.warning("google_places: action=search place=%s result=timeout", name) - return None - except Exception as e: - logger.error("google_places: action=search place=%s result=error err=%s", name, e) - return None - - -def get_place_details(place_id): - """ - Fetch details for a Google Place ID. - Returns dict with {opening_hours, phone_number, website} or None. 
- """ - key = _get_api_key() - if not key: - return None - - if not check_daily_cap(): - return None - - try: - resp = requests.get( - f'{API_BASE}/places/{place_id}', - headers={ - 'X-Goog-Api-Key': key, - 'X-Goog-FieldMask': 'regularOpeningHours,internationalPhoneNumber,websiteUri', - }, - timeout=REQUEST_TIMEOUT, - ) - - increment_call_counter() - - if resp.status_code == 429: - logger.warning("google_places: action=details id=%s result=rate_limited", place_id) - _set_daily_count_to_cap() - return None - - if resp.status_code != 200: - logger.warning("google_places: action=details id=%s result=error status=%d", place_id, resp.status_code) - return None - - data = resp.json() - result = { - 'opening_hours': None, - 'opening_hours_raw': None, - 'phone_number': None, - 'website': None, - } - - # Phone - phone = data.get('internationalPhoneNumber') - if phone: - result['phone_number'] = phone.replace(' ', '').replace('-', '') - - # Website - result['website'] = data.get('websiteUri') - - # Opening hours - hours = data.get('regularOpeningHours') - if hours: - # Try OSM-compatible format from periods - periods = hours.get('periods', []) - if periods: - osm_str = _periods_to_osm(periods) - if osm_str: - result['opening_hours'] = osm_str - - # Fallback: weekday descriptions (human-readable) - if not result['opening_hours']: - descriptions = hours.get('weekdayDescriptions') - if descriptions: - result['opening_hours_raw'] = descriptions - - logger.info("google_places: action=details id=%s result=hit hours=%s phone=%s website=%s", - place_id, - 'yes' if result['opening_hours'] or result['opening_hours_raw'] else 'no', - 'yes' if result['phone_number'] else 'no', - 'yes' if result['website'] else 'no') - return result - - except requests.exceptions.Timeout: - logger.warning("google_places: action=details id=%s result=timeout", place_id) - return None - except Exception as e: - logger.error("google_places: action=details id=%s result=error err=%s", place_id, e) - return 
None - - -# ── Opening hours conversion ──────────────────────────────────────────── - -def _periods_to_osm(periods): - """ - Convert Google Places periods array to OSM opening_hours string. - - Google periods: [{"open": {"day": 0-6, "hour": H, "minute": M}, - "close": {"day": 0-6, "hour": H, "minute": M}}, ...] - Where day 0 = Sunday. - - OSM format: "Mo-Fr 06:00-23:00; Sa-Su 07:00-23:00" - """ - if not periods: - return None - - # Check for 24/7: single period with no close, or open 00:00 close 00:00 next day - if len(periods) == 1: - p = periods[0] - o = p.get('open', {}) - c = p.get('close') - if c is None and o.get('hour', 0) == 0 and o.get('minute', 0) == 0: - return '24/7' - - # Build a map: day_index → "HH:MM-HH:MM" - day_hours = {} # day_index → time_range string - for p in periods: - o = p.get('open', {}) - c = p.get('close', {}) - day = o.get('day', 0) - open_time = f"{o.get('hour', 0):02d}:{o.get('minute', 0):02d}" - - if c: - close_time = f"{c.get('hour', 0):02d}:{c.get('minute', 0):02d}" - # Handle midnight closing (00:00 means end of day) - if close_time == '00:00': - close_time = '24:00' - else: - close_time = '24:00' - - time_range = f"{open_time}-{close_time}" - - # A day can have multiple periods (e.g., lunch break) - if day in day_hours: - day_hours[day] = day_hours[day] + ',' + time_range - else: - day_hours[day] = time_range - - if not day_hours: - return None - - # Check if all 7 days have same hours - unique_ranges = set(day_hours.values()) - if len(day_hours) == 7 and len(unique_ranges) == 1: - hours = unique_ranges.pop() - if hours == '00:00-24:00': - return '24/7' - return hours # implicit "every day" - - # Group consecutive days with same hours - # Reorder to OSM convention: Mo(1) Tu(2) We(3) Th(4) Fr(5) Sa(6) Su(0) - osm_day_order = [1, 2, 3, 4, 5, 6, 0] - groups = [] - current_days = [] - current_hours = None - - for day_idx in osm_day_order: - hours = day_hours.get(day_idx) - if hours == current_hours: - current_days.append(day_idx) - 
else: - if current_days and current_hours: - groups.append((current_days, current_hours)) - current_days = [day_idx] - current_hours = hours - - if current_days and current_hours: - groups.append((current_days, current_hours)) - - if not groups: - return None - - # Format each group - parts = [] - for days, hours in groups: - if len(days) == 1: - day_str = _DAY_ABBR[days[0]] - elif len(days) == 2: - day_str = f"{_DAY_ABBR[days[0]]},{_DAY_ABBR[days[1]]}" - else: - day_str = f"{_DAY_ABBR[days[0]]}-{_DAY_ABBR[days[-1]]}" - parts.append(f"{day_str} {hours}") - - return '; '.join(parts) diff --git a/lib/place_detail.py b/lib/place_detail.py index 46aa8b0..6f6f1ba 100644 --- a/lib/place_detail.py +++ b/lib/place_detail.py @@ -1,291 +1,17 @@ """ -Place detail proxy — local Nominatim first, Overpass API fallback, SQLite cache. -Overture Maps enrichment layer fills sparse extratags (phone, website, brand). +Wiki-index lookup for place enrichment. -Provides get_place_detail(osm_type, osm_id) which returns a cleaned dict -matching the response shape for /api/place//. +Provides lookup_wiki_index(wikidata_id, name, country_code) — a pure read of the +local wiki_index.db, used by the /api/wiki-enrich endpoint (navi-places +HTTP-fetches wiki enrichment instead of reading the 2.1 GB DB directly). 
""" -import json import os import sqlite3 -import time -import requests as http_requests - -from .osm_categories import humanize_category from .utils import setup_logging logger = setup_logging('recon.place_detail') -NOMINATIM_URL = "http://localhost:8010/details.php" -OVERPASS_URL = "https://overpass-api.de/api/interpreter" -OVERPASS_UA = "Navi/1.0 (forge.echo6.co/matt/recon)" -VALID_OSM_TYPES = {"N", "W", "R"} - -_db_conn = None - - -# ── SQLite cache ──────────────────────────────────────────────────────── - -def _get_db(): - """Return a module-level SQLite connection (lazy init).""" - global _db_conn - if _db_conn is not None: - return _db_conn - - db_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data') - os.makedirs(db_dir, exist_ok=True) - db_path = os.path.join(db_dir, 'place_cache.db') - - _db_conn = sqlite3.connect(db_path, check_same_thread=False) - _db_conn.execute("PRAGMA journal_mode=WAL") - _db_conn.execute("PRAGMA synchronous=NORMAL") - _db_conn.execute(""" - CREATE TABLE IF NOT EXISTS place_cache ( - osm_type TEXT NOT NULL, - osm_id INTEGER NOT NULL, - data TEXT NOT NULL, - source TEXT NOT NULL, - cached_at INTEGER NOT NULL, - PRIMARY KEY (osm_type, osm_id) - ) - """) - _db_conn.commit() - logger.info(f"Place cache DB ready at {db_path}") - return _db_conn - - -def cache_get(osm_type, osm_id): - """Return cached place dict or None.""" - db = _get_db() - row = db.execute( - "SELECT data FROM place_cache WHERE osm_type=? AND osm_id=?", - (osm_type, osm_id) - ).fetchone() - if row: - try: - result = json.loads(row[0]) - result['source'] = 'cache' - return result - except (json.JSONDecodeError, TypeError): - pass - return None - - -def cache_put(osm_type, osm_id, data, source): - """Store a place detail result in the cache (preserves google columns).""" - db = _get_db() - now = int(time.time()) - db.execute(""" - INSERT INTO place_cache (osm_type, osm_id, data, source, cached_at) - VALUES (?, ?, ?, ?, ?) 
- ON CONFLICT(osm_type, osm_id) DO UPDATE SET - data = excluded.data, - source = excluded.source, - cached_at = excluded.cached_at - """, (osm_type, osm_id, json.dumps(data), source, now)) - db.commit() - - -# ── Overture enrichment ───────────────────────────────────────────────── - -def _enrich_with_overture(result, osm_type, osm_id): - """ - Attempt to enrich a place result with Overture Maps data. - Fills sparse extratags (phone, website, brand) without overwriting existing values. - Returns the (possibly enriched) result dict. - """ - try: - from .deployment_config import get_deployment_config - deploy_config = get_deployment_config() - features = deploy_config.get('features', {}) - if not features.get('has_overture_enrichment', False): - return result - except Exception: - return result - - try: - from .overture import find_by_osm_id, find_by_coords_and_name - except ImportError: - logger.debug("Overture module not available") - return result - - enrichment = None - match_method = None - - # Strategy 1: OSM cross-reference (exact) - enrichment = find_by_osm_id(osm_type, osm_id) - if enrichment: - match_method = 'osm_xref' - - # Strategy 2: Coordinate + name fuzzy (fallback) - if not enrichment and result.get('centroid') and result.get('name'): - centroid = result['centroid'] - if centroid.get('lat') and centroid.get('lon'): - enrichment = find_by_coords_and_name( - centroid['lat'], centroid['lon'], result['name'] - ) - if enrichment: - match_method = 'coord_name_fuzzy' - - if not enrichment: - return result - - # Fill sparse extratags (never overwrite existing non-null values) - extratags = result.get('extratags', {}) - fill_map = [ - ('phone', 'phone'), - ('website', 'website'), - ('brand', 'brand_name'), - ('brand:wikidata', 'brand_wikidata'), - ] - for osm_key, overture_key in fill_map: - if not extratags.get(osm_key) and enrichment.get(overture_key): - extratags[osm_key] = enrichment[overture_key] - result['extratags'] = extratags - - # Add source 
metadata - result['sources'] = { - 'primary': result.get('source', 'unknown'), - 'enrichment': 'overture', - 'overture_match_method': match_method, - 'overture_gers_id': enrichment.get('gers_id'), - 'overture_confidence': enrichment.get('confidence'), - 'overture_basic_category': enrichment.get('basic_category'), - } - - logger.debug(f"Overture enrichment for {osm_type}/{osm_id}: {match_method}") - return result - - - -# ── Google Places enrichment (tertiary, gap-fill only) ────────────── - -# Business POI classes eligible for Google enrichment -_BUSINESS_CLASSES = {'amenity', 'shop', 'tourism', 'leisure', 'office', 'craft'} - -# Fields Google can fill -_GOOGLE_GAP_FIELDS = ('opening_hours', 'phone', 'website') - - -def _enrich_with_google(result, osm_type, osm_id): - """ - Tertiary enrichment via Google Places (New) API. - Only fires for business-type POIs when opening_hours, phone, or website - are still missing after OSM + Overture enrichment. - Fills only empty fields — never overwrites existing values. - """ - # Check feature flag - try: - from .deployment_config import get_deployment_config - deploy_config = get_deployment_config() - features = deploy_config.get('features', {}) - if not features.get('has_google_places_enrichment', False): - return result - except Exception: - return result - - # Only enrich business-type POIs - poi_class = result.get('class', '') - if poi_class not in _BUSINESS_CLASSES: - return result - - # Check if any gap fields are missing - extratags = result.get('extratags', {}) - gaps = [f for f in _GOOGLE_GAP_FIELDS if not extratags.get(f)] - if not gaps: - logger.debug(f"google_places: skip {osm_type}/{osm_id} — no gaps") - return result - - try: - from . 
import google_places - except ImportError: - logger.debug("google_places module not available") - return result - - # Check Google cache first - cached_pid, cached_data = google_places.cache_get_google(osm_type, osm_id) - if cached_pid and cached_data: - _apply_google_data(result, cached_data, gaps) - result.setdefault('sources', {})['google_places'] = { - 'place_id': cached_pid, - 'source': 'cache', - } - logger.debug(f"google_places: cache hit for {osm_type}/{osm_id}") - return result - - # Skip if already looked up and found nothing (cached_pid is None) - if cached_pid is not None: - return result - - # Skip new Google API calls for guest users (cached data already returned above) - from .auth import get_user_id - if not get_user_id(): - logger.debug(f"google_places: skip API call for {osm_type}/{osm_id} — guest user") - return result - - # Daily cap check - if not google_places.check_daily_cap(): - return result - - # Search for the place - name = result.get('name', '') - centroid = result.get('centroid', {}) - lat = centroid.get('lat') - lon = centroid.get('lon') - if not name or not lat or not lon: - return result - - place_id = google_places.search_place(name, lat, lon) - if not place_id: - # Cache the miss to avoid repeated lookups - google_places.cache_put_google(osm_type, osm_id, '__miss__', None) - return result - - # Get details - details = google_places.get_place_details(place_id) - if not details: - google_places.cache_put_google(osm_type, osm_id, place_id, None) - return result - - # Cache the result - google_places.cache_put_google(osm_type, osm_id, place_id, details) - - # Apply to result - _apply_google_data(result, details, gaps) - result.setdefault('sources', {})['google_places'] = { - 'place_id': place_id, - 'source': 'api', - 'daily_count': google_places.get_daily_count(), - } - - return result - - -def _apply_google_data(result, google_data, gaps): - """Apply Google Places data to fill gap fields only.""" - extratags = result.get('extratags', 
{}) - if 'opening_hours' in gaps: - osm_hours = google_data.get('opening_hours') - if osm_hours: - extratags['opening_hours'] = osm_hours - elif google_data.get('opening_hours_raw'): - extratags['opening_hours_raw'] = google_data['opening_hours_raw'] - if 'phone' in gaps and google_data.get('phone_number'): - extratags['phone'] = google_data['phone_number'] - if 'website' in gaps and google_data.get('website'): - extratags['website'] = google_data['website'] - result['extratags'] = extratags - - - - -# ── Wiki link rewriting ───────────────────────────────────────────────── - -# Extratag keys that may contain wiki references -_WIKI_TAGS = ('wikipedia', 'wikidata', 'wikivoyage', 'appropedia') - - # ── Wiki Index enrichment ─────────────────────────────────────────────── @@ -369,598 +95,3 @@ def lookup_wiki_index(wikidata_id=None, name=None, country_code=None): except Exception as e: logger.debug(f"wiki_index lookup error: {e}") return None - - -def _enrich_with_wiki_index(result): - try: - from .deployment_config import get_deployment_config - deploy_config = get_deployment_config() - features = deploy_config.get("features", {}) - if not features.get("has_kiwix_wiki", False): - return result - except Exception: - return result - - db = _get_wiki_index_db() - if not db: - return result - - try: - cur = db.cursor() - row = None - - extratags = result.get("extratags", {}) - wikidata_id = result.get("wikidata_id") or extratags.get("wikidata") - if wikidata_id: - if isinstance(wikidata_id, str) and wikidata_id.startswith("http"): - wikidata_id = wikidata_id.split("/")[-1] - cur.execute( - "SELECT summary, wiki_population, wikipedia_title, wikivoyage_title FROM wiki_places WHERE wikidata_id = ?", - (wikidata_id,) - ) - row = cur.fetchone() - - if not row: - name = result.get("name") - address = result.get("address") or {} - country_code = address.get("country_code") or result.get("country_code") - if name and country_code: - cur.execute( - "SELECT summary, 
wiki_population, wikipedia_title, wikivoyage_title FROM wiki_places WHERE place_name = ? AND country_code = ? LIMIT 1", - (name, country_code.lower()) - ) - row = cur.fetchone() - - if row: - if row["summary"]: - result["wiki_summary"] = row["summary"] - if row["wiki_population"]: - try: - result["wiki_population"] = int(row["wiki_population"]) - except (ValueError, TypeError): - result["wiki_population"] = row["wiki_population"] - if row["wikipedia_title"]: - title = row["wikipedia_title"].replace(" ", "_") - result["wiki_url"] = f"https://en.wikipedia.org/wiki/{title}" - if row["wikivoyage_title"]: - title = row["wikivoyage_title"].replace(" ", "_") - result["wikivoyage_url"] = f"https://en.wikivoyage.org/wiki/{title}" - logger.debug(f"Wiki index enrichment hit for {result.get(name)}") - - except Exception as e: - logger.debug(f"Wiki index enrichment error: {e}") - - return result - -def _enrich_wiki_links(result): - """ - Rewrite wiki-related extratags to local Kiwix URLs where available. - Falls back to public URLs. Only runs when has_wiki_rewriting is enabled. - Returns the (possibly enriched) result dict. 
- """ - try: - from .deployment_config import get_deployment_config - deploy_config = get_deployment_config() - features = deploy_config.get('features', {}) - if not features.get('has_wiki_rewriting', False): - return result - except Exception: - return result - - try: - from .wiki_rewrite import rewrite_wiki_link - except ImportError: - logger.debug("wiki_rewrite module not available") - return result - - extratags = result.get('extratags', {}) - if not extratags: - return result - - rewrites = {} - for tag in _WIKI_TAGS: - value = extratags.get(tag) - if not value: - continue - url, status = rewrite_wiki_link(tag, value) - if status != 'original': - extratags[tag] = url - rewrites[tag] = status - - if rewrites: - result['extratags'] = extratags - result.setdefault('sources', {})['wiki_rewrites'] = rewrites - logger.debug(f"Wiki rewrites for {result.get('osm_type')}/{result.get('osm_id')}: {rewrites}") - - return result - -# ── Nominatim parsing ─────────────────────────────────────────────────── - -# Nominatim address array uses rank_address to indicate what each entry is. -# We map rank ranges to our flat address fields. 
-RANK_TO_FIELD = { - 4: 'country', - 5: 'postcode', - 6: 'state', # rank 6 = county in US, but we try name matching - 8: 'state', - 12: 'county', - 16: 'city', - 20: 'neighbourhood', - 22: 'neighbourhood', - 26: 'road', - 28: 'house_number', -} - - -def _parse_nominatim_address(address_array, country_code=None): - """Parse Nominatim's ranked address array into a flat address dict.""" - addr = { - 'house_number': None, - 'road': None, - 'neighbourhood': None, - 'city': None, - 'county': None, - 'state': None, - 'postcode': None, - 'country': None, - 'country_code': country_code, - } - - if not address_array: - return addr - - for entry in address_array: - if not entry.get('isaddress', False): - continue - - name = entry.get('localname', '') - rank = entry.get('rank_address', 0) - etype = entry.get('type', '') - eclass = entry.get('class', '') - - # Explicit type-based assignments (more reliable than rank alone) - if etype == 'country' and eclass == 'place': - addr['country'] = name - elif etype == 'state' or (eclass == 'boundary' and etype == 'administrative' and rank == 8): - if not addr['state']: - addr['state'] = name - elif etype == 'county' or (eclass == 'boundary' and etype == 'administrative' and rank in (10, 12)): - if not addr['county']: - addr['county'] = name - elif etype in ('city', 'town', 'village', 'hamlet') and eclass == 'place': - if not addr['city']: - addr['city'] = name - elif eclass == 'boundary' and etype == 'administrative' and rank == 16: - # City-level admin boundary (common in US) - if not addr['city']: - addr['city'] = name - elif etype == 'postcode': - addr['postcode'] = name - elif eclass == 'highway' or rank == 26: - if not addr['road']: - addr['road'] = name - elif etype == 'house_number' or rank == 28: - addr['house_number'] = name - elif rank in (20, 22) and not addr['neighbourhood']: - addr['neighbourhood'] = name - - # Remove county from output (not in spec) - addr.pop('county', None) - - return addr - - -def 
_parse_nominatim(data): - """Parse a Nominatim /details response into our canonical shape.""" - osm_type = data.get('osm_type', '') - osm_id = data.get('osm_id', 0) - osm_class = data.get('category', '') - osm_type_tag = data.get('type', '') - - # Centroid - centroid_geom = data.get('centroid', {}) - coords = centroid_geom.get('coordinates', [0, 0]) - centroid = {'lat': coords[1], 'lon': coords[0]} if len(coords) >= 2 else {'lat': 0, 'lon': 0} - - # Names - names = data.get('names', {}) - display_name = data.get('localname', '') or names.get('name', '') - - # Address - address = _parse_nominatim_address( - data.get('address', []), - country_code=data.get('country_code') - ) - - # Use calculated_postcode if address parse didn't find one - if not address.get('postcode') and data.get('calculated_postcode'): - address['postcode'] = data['calculated_postcode'] - - # Extratags - raw_extra = data.get('extratags', {}) - extratags = { - 'opening_hours': raw_extra.get('opening_hours'), - 'phone': raw_extra.get('phone') or raw_extra.get('contact:phone'), - 'website': raw_extra.get('website') or raw_extra.get('contact:website') or raw_extra.get('url'), - 'email': raw_extra.get('email') or raw_extra.get('contact:email'), - 'wikipedia': raw_extra.get('wikipedia'), - 'wikidata': raw_extra.get('wikidata'), - 'cuisine': raw_extra.get('cuisine'), - 'operator': raw_extra.get('operator'), - 'wheelchair': raw_extra.get('wheelchair'), - 'fee': raw_extra.get('fee'), - 'takeaway': raw_extra.get('takeaway'), - } - - # Category: use extratags.place for boundaries (e.g. 
"city"), else class/type - effective_class = osm_class - effective_type = osm_type_tag - if osm_class == 'boundary' and osm_type_tag == 'administrative': - place_tag = raw_extra.get('place') or raw_extra.get('linked_place') - if place_tag: - effective_class = 'place' - effective_type = place_tag - - category = humanize_category(effective_class, effective_type) - - # Filter names: only include extra name tags, not the bare "name" - extra_names = {k: v for k, v in names.items() if k != 'name'} if names else {} - - # Boundary geometry (polygon/multipolygon from Nominatim) - boundary = None - geom = data.get('geometry') - if geom and geom.get('type') in ('Polygon', 'MultiPolygon'): - boundary = geom - - return { - 'osm_type': osm_type, - 'osm_id': osm_id, - 'name': display_name, - 'category': category, - 'class': osm_class, - 'type': osm_type_tag, - 'address': address, - 'centroid': centroid, - 'extratags': extratags, - 'names': extra_names if extra_names else None, - 'source': 'nominatim_local', - 'boundary': boundary, - } - - -# ── Overpass parsing ──────────────────────────────────────────────────── - -OVERPASS_TYPE_MAP = {'N': 'node', 'W': 'way', 'R': 'relation'} - - -def _build_overpass_query(osm_type, osm_id): - """Build an Overpass QL query for a single element.""" - elem = OVERPASS_TYPE_MAP.get(osm_type) - if not elem: - return None - return f"[out:json][timeout:10];{elem}({osm_id});out tags center;" - - -def _parse_overpass(data, osm_type, osm_id): - """Parse an Overpass API response into our canonical shape.""" - elements = data.get('elements', []) - if not elements: - return None - - elem = elements[0] - tags = elem.get('tags', {}) - - # Centroid: Overpass returns lat/lon for nodes, center for ways/relations - lat = elem.get('lat') or (elem.get('center', {}).get('lat')) - lon = elem.get('lon') or (elem.get('center', {}).get('lon')) - centroid = {'lat': lat, 'lon': lon} if lat and lon else {'lat': 0, 'lon': 0} - - # Determine class/type from tags — Overpass 
# ── Public API ──────────────────────────────────────────────────────────

def _finalize_place_detail(result, osm_type, osm_id, source):
    """Run the shared enrichment chain on *result*, cache it under *source*, and return it."""
    result = _enrich_with_overture(result, osm_type, osm_id)
    result = _enrich_with_google(result, osm_type, osm_id)
    result = _enrich_wiki_links(result)
    result = _enrich_with_wiki_index(result)
    cache_put(osm_type, osm_id, result, source)
    return result


def get_place_detail(osm_type, osm_id):
    """
    Fetch place details for an OSM element.

    Lookup order: cache, then the Nominatim endpoint at NOMINATIM_URL, then
    Overpass. A hit from either live source is enriched and cached before
    being returned.

    Args:
        osm_type: OSM element type letter (case-insensitive); must be one of
            VALID_OSM_TYPES after upper-casing.
        osm_id: Positive integer OSM element ID.

    Returns (dict, status_code):
      - (data, 200) on success
      - (error_dict, 400) if osm_type or osm_id is invalid
      - (error_dict, 404) if not found in any source
      - (error_dict, 502) if both sources error
    """
    osm_type = osm_type.upper()
    if osm_type not in VALID_OSM_TYPES:
        return {'error': f'Invalid osm_type: {osm_type}. Must be N, W, or R.'}, 400

    if osm_id <= 0:
        return {'error': 'osm_id must be a positive integer'}, 400

    # 1. Check cache
    cached = cache_get(osm_type, osm_id)
    if cached:
        logger.debug(f"Cache hit: {osm_type}/{osm_id}")
        return cached, 200

    # 2. Try local Nominatim first
    nominatim_result = None
    nominatim_error = None
    try:
        resp = http_requests.get(NOMINATIM_URL, params={
            'osmtype': osm_type,
            'osmid': osm_id,
            'format': 'json',
            'addressdetails': 1,
            'hierarchy': 0,
            'keywords': 0,
            'polygon_geojson': 1,
        }, timeout=5)

        if resp.status_code == 200:
            data = resp.json()
            # Only accept the payload when it is actually about the requested
            # element; anything else is treated as "not in Nominatim".
            if data.get('osm_id') == osm_id:
                nominatim_result = _parse_nominatim(data)
                logger.debug(f"Nominatim hit: {osm_type}/{osm_id}")
        elif resp.status_code != 404:
            # A 404 means "not found" (fall through to Overpass quietly).
            # Any other non-200 status is an outage and must count towards
            # the 502 decision below, mirroring the Overpass handling.
            nominatim_error = f"Nominatim HTTP {resp.status_code}"
            logger.warning(f"Nominatim HTTP {resp.status_code} for {osm_type}/{osm_id}")
    except Exception as e:
        # Best-effort: network/JSON/parsing failures must not abort the
        # lookup, because Overpass may still answer.
        nominatim_error = str(e)
        logger.warning(f"Nominatim error for {osm_type}/{osm_id}: {e}")

    if nominatim_result:
        return _finalize_place_detail(nominatim_result, osm_type, osm_id, 'nominatim_local'), 200

    # 3. Fallback to Overpass
    overpass_result = None
    overpass_error = None
    try:
        query = _build_overpass_query(osm_type, osm_id)
        if query:
            resp = http_requests.post(
                OVERPASS_URL,
                data={'data': query},
                headers={'User-Agent': OVERPASS_UA},
                timeout=10,
            )
            if resp.status_code == 200:
                data = resp.json()
                overpass_result = _parse_overpass(data, osm_type, osm_id)
                if overpass_result:
                    logger.debug(f"Overpass hit: {osm_type}/{osm_id}")
            elif resp.status_code == 429:
                overpass_error = "Overpass rate limited"
                logger.warning(f"Overpass 429 for {osm_type}/{osm_id}")
            else:
                overpass_error = f"Overpass HTTP {resp.status_code}"
    except Exception as e:
        overpass_error = str(e)
        logger.warning(f"Overpass error for {osm_type}/{osm_id}: {e}")

    if overpass_result:
        return _finalize_place_detail(overpass_result, osm_type, osm_id, 'overpass'), 200

    # 4. Both failed
    if nominatim_error and overpass_error:
        logger.error(f"Both sources failed for {osm_type}/{osm_id}: "
                     f"Nominatim={nominatim_error}, Overpass={overpass_error}")
        return {'error': 'Both data sources unavailable'}, 502

    # Not found in at least one source that answered cleanly
    return {'error': f'{osm_type}/{osm_id} not found'}, 404


# ── Wikidata lookup ─────────────────────────────────────────────────────

WIKIDATA_API_URL = "https://www.wikidata.org/w/api.php"

# Lower number = better statement rank when several values exist.
_WIKIDATA_RANK_ORDER = {"preferred": 0, "normal": 1, "deprecated": 2}


def _wikidata_claim_value(claim):
    """Return the datavalue payload of one Wikidata claim, or None if it has none."""
    datavalue = claim.get("mainsnak", {}).get("datavalue")
    if not datavalue:
        return None
    return datavalue.get("value")


def _wikidata_first_value(claims, prop):
    """Return the value of the first claim listed for *prop*, or None."""
    prop_claims = claims.get(prop)
    if not prop_claims:
        return None
    return _wikidata_claim_value(prop_claims[0])


def _wikidata_population(claims):
    """Return the best-ranked integer population (P1082), or None.

    Statements ranked ``preferred`` win over ``normal``, which win over
    ``deprecated``; within a rank the first parseable statement is used.
    """
    best = None
    best_rank = None
    for claim in claims.get("P1082") or []:
        value = _wikidata_claim_value(claim)
        try:
            amount = int(value["amount"].lstrip("+"))
        except (AttributeError, KeyError, TypeError, ValueError):
            # No value, unexpected shape, or a non-integer amount: skip it.
            continue
        rank = _WIKIDATA_RANK_ORDER.get(claim.get("rank"), 1)
        if best_rank is None or rank < best_rank:
            best, best_rank = amount, rank
            if rank == 0:
                break
    return best


def _fetch_wikidata_boundary(osm_relation_id, wikidata_id):
    """Return the (Multi)Polygon geometry Nominatim has for an OSM relation, or None.

    Best-effort: any failure is logged at debug level and yields None.
    """
    try:
        nom_resp = http_requests.get(NOMINATIM_URL, params={
            'osmtype': 'R',
            'osmid': osm_relation_id,
            'format': 'json',
            'polygon_geojson': 1,
        }, timeout=5)
        if nom_resp.status_code == 200:
            geom = nom_resp.json().get('geometry')
            if geom and geom.get('type') in ('Polygon', 'MultiPolygon'):
                logger.debug(f"Wikidata boundary hit for {wikidata_id}")
                return geom
    except Exception as e:
        logger.debug(f"Wikidata boundary fetch failed: {e}")
    return None


def get_place_by_wikidata(wikidata_id):
    """
    Fetch place details from Wikidata entity.

    Args:
        wikidata_id: Entity ID such as "Q42"; case and surrounding
            whitespace are normalised before validation.

    Returns (dict, status_code):
      - (data, 200) on success
      - (error_dict, 404) if entity not found
      - (error_dict, 400) if invalid ID format
      - (error_dict, 502) on API error
    """
    invalid_msg = "Invalid wikidata ID: {}. Must be Q followed by digits."
    if not isinstance(wikidata_id, str):
        return {"error": invalid_msg.format(wikidata_id)}, 400

    # Validate wikidata ID format (Q followed by ASCII digits). str.isdigit()
    # is not used because it also accepts characters such as "²".
    wikidata_id = wikidata_id.upper().strip()
    digits = wikidata_id[1:]
    if (not wikidata_id.startswith("Q") or not digits
            or any(ch not in "0123456789" for ch in digits)):
        return {"error": invalid_msg.format(wikidata_id)}, 400

    try:
        resp = http_requests.get(WIKIDATA_API_URL, params={
            "action": "wbgetentities",
            "ids": wikidata_id,
            "format": "json",
            "languages": "en",
            "props": "labels|descriptions|claims|sitelinks",
        }, timeout=10, headers={"User-Agent": "Navi/1.0 (forge.echo6.co/matt/recon)"})

        if resp.status_code != 200:
            logger.warning(f"Wikidata API error for {wikidata_id}: HTTP {resp.status_code}")
            return {"error": "Wikidata API error"}, 502

        entity = resp.json().get("entities", {}).get(wikidata_id)

        # The API flags unknown entities with a "missing" key whose value is
        # an empty string, so test for the key rather than its truthiness.
        if not entity or "missing" in entity:
            return {"error": f"Wikidata entity {wikidata_id} not found"}, 404

        # Extract basic info
        claims = entity.get("claims", {})
        name = entity.get("labels", {}).get("en", {}).get("value", wikidata_id)
        description = entity.get("descriptions", {}).get("en", {}).get("value", "")

        # Extract coordinates from P625 (coordinate location)
        lat, lon = None, None
        coord_val = _wikidata_first_value(claims, "P625")
        if isinstance(coord_val, dict):
            lat = coord_val.get("latitude")
            lon = coord_val.get("longitude")
        # Compare against None: 0.0 is a valid latitude/longitude.
        centroid = {"lat": lat, "lon": lon} if lat is not None and lon is not None else None

        # Extract population from P1082
        population = _wikidata_population(claims)

        # NOTE: country (P17) is not part of the result; turning the entity
        # ID into a name would need a second lookup.

        # Extract instance of (P31) for type classification
        instance_of = []
        for claim in claims.get("P31") or []:
            value = _wikidata_claim_value(claim)
            if isinstance(value, dict) and "id" in value:
                instance_of.append(value["id"])

        # Extract OSM relation ID if available (P402)
        osm_relation_id = _wikidata_first_value(claims, "P402")

        # Extract Wikipedia sitelink
        wiki_title = entity.get("sitelinks", {}).get("enwiki", {}).get("title", "")
        wikipedia = f"en:{wiki_title}" if wiki_title else None

        result = {
            "wikidata_id": wikidata_id,
            "name": name,
            "description": description,
            "centroid": centroid,
            "population": population,
            "instance_of": instance_of,
            "osm_relation_id": osm_relation_id,
            "source": "wikidata",
            "extratags": {
                "wikidata": wikidata_id,
            },
        }

        if wikipedia:
            result["extratags"]["wikipedia"] = wikipedia

        # Fetch boundary polygon from Nominatim if we have an OSM relation ID
        result["boundary"] = (
            _fetch_wikidata_boundary(osm_relation_id, wikidata_id)
            if osm_relation_id else None
        )

        result = _enrich_with_wiki_index(result)
        logger.debug(f"Wikidata hit: {wikidata_id} -> {name}")
        return result, 200

    except Exception as e:
        # Boundary of the lookup: any unexpected failure becomes a 502.
        logger.warning(f"Wikidata error for {wikidata_id}: {e}")
        return {"error": "Wikidata lookup failed"}, 502