diff --git a/config/profiles/home.yaml b/config/profiles/home.yaml index d894d81..99430a8 100644 --- a/config/profiles/home.yaml +++ b/config/profiles/home.yaml @@ -39,6 +39,7 @@ features: has_traffic_overlay: true has_landclass: false has_address_book_write: false + has_overture_enrichment: true defaults: center: [42.5736, -114.6066] diff --git a/config/profiles/minimal_pi.yaml b/config/profiles/minimal_pi.yaml index e4fe651..07a61d5 100644 --- a/config/profiles/minimal_pi.yaml +++ b/config/profiles/minimal_pi.yaml @@ -34,6 +34,7 @@ features: has_traffic_overlay: false has_landclass: false has_address_book_write: true + has_overture_enrichment: false defaults: center: [44.0, -114.0] diff --git a/config/profiles/regional_pi.yaml b/config/profiles/regional_pi.yaml index 043e9e7..291ee81 100644 --- a/config/profiles/regional_pi.yaml +++ b/config/profiles/regional_pi.yaml @@ -39,6 +39,7 @@ features: has_traffic_overlay: true has_landclass: true has_address_book_write: true + has_overture_enrichment: false defaults: center: [44.0, -114.0] diff --git a/lib/overture.py b/lib/overture.py new file mode 100644 index 0000000..fcbdd18 --- /dev/null +++ b/lib/overture.py @@ -0,0 +1,170 @@ +""" +Overture Maps enrichment layer. + +Provides lookup functions against the local PostgreSQL Overture Places database. +Two strategies: + 1. find_by_osm_id — exact match via OSM cross-reference index + 2. find_by_coords_and_name — spatial + fuzzy name fallback + +Connection pool is lazy-initialized on first call. If PostgreSQL is unreachable, +functions return None gracefully (feature degrades, doesn't crash). +""" +import json +import os + +import psycopg2 +import psycopg2.pool + +from .utils import setup_logging + +logger = setup_logging('recon.overture') + +_pool = None +_pool_failed = False + +# Map full OSM type names to single-letter codes used in Overture sources +OSM_TYPE_MAP = { + 'N': 'n', 'W': 'w', 'R': 'r', + 'node': 'n', 'way': 'w', 'relation': 'r', + 'n': 'n', 'w': 'w', 'r': 'r', +} + + +def _get_pool(): + """Lazy-init the connection pool. Returns None if Postgres is unreachable.""" + global _pool, _pool_failed + if _pool is not None: + return _pool + if _pool_failed: + return None + + try: + _pool = psycopg2.pool.SimpleConnectionPool( + minconn=1, + maxconn=3, + host=os.environ.get('OVERTURE_DB_HOST', 'localhost'), + port=int(os.environ.get('OVERTURE_DB_PORT', '5432')), + dbname=os.environ.get('OVERTURE_DB_NAME', 'overture'), + user=os.environ.get('OVERTURE_DB_USER', 'overture'), + password=os.environ.get('OVERTURE_DB_PASSWORD', ''), + connect_timeout=5, + ) + logger.info("Overture PostgreSQL connection pool initialized") + return _pool + except Exception as e: + _pool_failed = True + logger.warning(f"Overture PostgreSQL unavailable, enrichment disabled: {e}") + return None + + +def _query(sql, params): + """Execute a query and return the first row as a dict, or None.""" + pool = _get_pool() + if pool is None: + return None + + conn = None + try: + conn = pool.getconn() + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + if row is None: + return None + cols = [desc[0] for desc in cur.description] + return dict(zip(cols, row)) + except Exception as e: + logger.warning(f"Overture query error: {e}") + if conn: + try: + conn.rollback() + except Exception: + pass + return None + finally: + if conn: + try: + pool.putconn(conn) + except Exception: + pass + + +def _format_result(row, match_method): + """Convert a database row dict to the enrichment result shape.""" + if not row: + return None + + socials = row.get('socials') + if isinstance(socials, str): + try: + socials = json.loads(socials) + except (json.JSONDecodeError, TypeError): + socials = None + + return { + 'phone': row.get('phone'), + 'website': row.get('website'), + 'socials': socials, + 'brand_name': row.get('brand_name'), + 'brand_wikidata': row.get('brand_wikidata'), + 'basic_category': row.get('basic_category'), + 'confidence': row.get('confidence'), + 'gers_id': row.get('id'), + 'match_method': match_method, + } + + +def find_by_osm_id(osm_type, osm_id): + """ + Look up an Overture place by its OSM cross-reference. + + Args: + osm_type: OSM type — 'N', 'W', 'R', 'node', 'way', 'relation', or single letter + osm_id: OSM numeric ID + + Returns: + Enrichment dict or None + """ + type_letter = OSM_TYPE_MAP.get(osm_type) + if not type_letter: + return None + + row = _query( + """SELECT id, name, basic_category, confidence, + phone, website, socials, brand_name, brand_wikidata + FROM places + WHERE osm_type = %s AND osm_id = %s + LIMIT 1""", + (type_letter, int(osm_id)) + ) + return _format_result(row, 'osm_xref') + + +def find_by_coords_and_name(lat, lon, name, radius_m=100): + """ + Look up an Overture place by spatial proximity + fuzzy name match. + + Args: + lat: Latitude + lon: Longitude + name: Place name to fuzzy-match + radius_m: Search radius in meters (default 100) + + Returns: + Enrichment dict or None + """ + if not name or not lat or not lon: + return None + + row = _query( + """SELECT id, name, basic_category, confidence, + phone, website, socials, brand_name, brand_wikidata, + similarity(name, %s) AS sim + FROM places + WHERE ST_DWithin(geometry::geography, ST_MakePoint(%s, %s)::geography, %s) + AND similarity(name, %s) > 0.4 + ORDER BY sim DESC, ST_Distance(geometry::geography, ST_MakePoint(%s, %s)::geography) ASC + LIMIT 1""", + (name, lon, lat, radius_m, name, lon, lat) + ) + return _format_result(row, 'coord_name_fuzzy') diff --git a/lib/place_detail.py b/lib/place_detail.py index f225a08..8ca2781 100644 --- a/lib/place_detail.py +++ b/lib/place_detail.py @@ -1,5 +1,6 @@ """ Place detail proxy — local Nominatim first, Overpass API fallback, SQLite cache. +Overture Maps enrichment layer fills sparse extratags (phone, website, brand). Provides get_place_detail(osm_type, osm_id) which returns a cleaned dict matching the response shape for /api/place//. @@ -82,6 +83,77 @@ def cache_put(osm_type, osm_id, data, source): db.commit() +# ── Overture enrichment ───────────────────────────────────────────────── + +def _enrich_with_overture(result, osm_type, osm_id): + """ + Attempt to enrich a place result with Overture Maps data. + Fills sparse extratags (phone, website, brand) without overwriting existing values. + Returns the (possibly enriched) result dict. + """ + try: + from .deployment_config import get_deployment_config + deploy_config = get_deployment_config() + features = deploy_config.get('features', {}) + if not features.get('has_overture_enrichment', False): + return result + except Exception: + return result + + try: + from .overture import find_by_osm_id, find_by_coords_and_name + except ImportError: + logger.debug("Overture module not available") + return result + + enrichment = None + match_method = None + + # Strategy 1: OSM cross-reference (exact) + enrichment = find_by_osm_id(osm_type, osm_id) + if enrichment: + match_method = 'osm_xref' + + # Strategy 2: Coordinate + name fuzzy (fallback) + if not enrichment and result.get('centroid') and result.get('name'): + centroid = result['centroid'] + if centroid.get('lat') and centroid.get('lon'): + enrichment = find_by_coords_and_name( + centroid['lat'], centroid['lon'], result['name'] + ) + if enrichment: + match_method = 'coord_name_fuzzy' + + if not enrichment: + return result + + # Fill sparse extratags (never overwrite existing non-null values) + extratags = result.get('extratags', {}) + fill_map = [ + ('phone', 'phone'), + ('website', 'website'), + ('brand', 'brand_name'), + ('brand:wikidata', 'brand_wikidata'), + ] + for osm_key, overture_key in fill_map: + if not extratags.get(osm_key) and enrichment.get(overture_key): + extratags[osm_key] = enrichment[overture_key] + result['extratags'] = extratags + + # Add source metadata + result['sources'] = { + 'primary': result.get('source', 'unknown'), + 'enrichment': 'overture', + 'overture_match_method': match_method, + 'overture_gers_id': enrichment.get('gers_id'), + 'overture_confidence': enrichment.get('confidence'), + 'overture_basic_category': enrichment.get('basic_category'), + } + + logger.debug(f"Overture enrichment for {osm_type}/{osm_id}: {match_method}") + return result + + # ── Nominatim parsing ─────────────────────────────────────────────────── # Nominatim address array uses rank_address to indicate what each entry is. @@ -368,6 +440,7 @@ def get_place_detail(osm_type, osm_id): logger.warning(f"Nominatim error for {osm_type}/{osm_id}: {e}") if nominatim_result: + nominatim_result = _enrich_with_overture(nominatim_result, osm_type, osm_id) cache_put(osm_type, osm_id, nominatim_result, 'nominatim_local') return nominatim_result, 200 @@ -398,6 +471,7 @@ def get_place_detail(osm_type, osm_id): logger.warning(f"Overpass error for {osm_type}/{osm_id}: {e}") if overpass_result: + overpass_result = _enrich_with_overture(overpass_result, osm_type, osm_id) cache_put(osm_type, osm_id, overpass_result, 'overpass') return overpass_result, 200 diff --git a/scripts/overture_import.py b/scripts/overture_import.py new file mode 100644 index 0000000..0b6ba67 --- /dev/null +++ b/scripts/overture_import.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +"""Overture Maps Places → PostgreSQL import script (v2). + +Downloads Overture Places Parquet from S3 via DuckDB (public bucket, no credentials), +filters to North America bounding box, and inserts into local PostgreSQL with PostGIS. + +Usage: + cd /opt/recon && venv/bin/python scripts/overture_import.py + +Re-runnable (idempotent via UPSERT). +""" + +import json +import logging +import os +import re +import sys +import time + +import duckdb +import psycopg2 +import psycopg2.extras + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%H:%M:%S' +) +log = logging.getLogger('overture_import') + +# --- Config --- +OVERTURE_RELEASE = '2026-04-15.0' +S3_PATH = f's3://overturemaps-us-west-2/release/{OVERTURE_RELEASE}/theme=places/type=place/*' + +# North America bounding box (generous — includes Hawaii, Puerto Rico, Canada) +BBOX = { + 'xmin': -170.0, + 'xmax': -50.0, + 'ymin': 15.0, + 'ymax': 85.0, +} + +BATCH_SIZE = 50_000 +OSM_RECORD_RE = re.compile(r'^([nwr])(\d+)@\d+$') + +DB_CONFIG = { + 'host': os.environ.get('OVERTURE_DB_HOST', 'localhost'), + 'port': int(os.environ.get('OVERTURE_DB_PORT', '5432')), + 'dbname': os.environ.get('OVERTURE_DB_NAME', 'overture'), + 'user': os.environ.get('OVERTURE_DB_USER', 'overture'), + 'password': os.environ.get('OVERTURE_DB_PASSWORD', ''), +} + + +def create_table(conn): + """Create places table and indexes if they don't exist.""" + with conn.cursor() as cur: + cur.execute(""" + CREATE TABLE IF NOT EXISTS places ( + id TEXT PRIMARY KEY, + geometry GEOMETRY(Point, 4326), + name TEXT, + basic_category TEXT, + confidence REAL, + phone TEXT, + website TEXT, + socials JSONB, + brand_name TEXT, + brand_wikidata TEXT, + osm_type CHAR(1), + osm_id BIGINT, + source_record_id TEXT, + raw_sources JSONB + ); + """) + cur.execute(""" + CREATE INDEX IF NOT EXISTS idx_places_osm + ON places(osm_type, osm_id) WHERE osm_type IS NOT NULL; + """) + cur.execute(""" + CREATE INDEX IF NOT EXISTS idx_places_geom + ON places USING GIST(geometry); + """) + cur.execute(""" + CREATE INDEX IF NOT EXISTS idx_places_name_trgm + ON places USING GIN(name gin_trgm_ops); + """) + conn.commit() + log.info('Table and indexes ready') + + +def parse_osm_ref(sources): + """Extract OSM type letter and ID from Overture sources array.""" + if not sources: + return None, None, None + for src in sources: + record_id = None + if isinstance(src, dict): + record_id = src.get('record_id', '') + elif hasattr(src, '__getitem__'): + # DuckDB struct — try attribute access + try: + record_id = src['record_id'] + except (KeyError, TypeError, IndexError): + pass + if not record_id: + continue + m = OSM_RECORD_RE.match(str(record_id)) + if m: + return m.group(1), int(m.group(2)), str(record_id) + return None, None, None + + +def run_import(): + """Main import: DuckDB reads S3 Parquet → PostgreSQL via chunked OFFSET/LIMIT.""" + log.info(f'Overture release: {OVERTURE_RELEASE}') + log.info(f'S3 path: {S3_PATH}') + log.info(f'Bounding box: {BBOX}') + + # Connect to PostgreSQL + conn = psycopg2.connect(**DB_CONFIG) + conn.autocommit = False + create_table(conn) + + # Set up DuckDB with httpfs and spatial for S3 access + duck = duckdb.connect() + duck.execute("INSTALL httpfs; LOAD httpfs;") + duck.execute("INSTALL spatial; LOAD spatial;") + duck.execute("SET s3_region='us-west-2';") + + # Use a materialized approach: DuckDB query → Arrow → iterate in Python + query = f""" + SELECT + id, + ST_X(geometry) AS lon, + ST_Y(geometry) AS lat, + names.primary AS name, + basic_category, + confidence, + phones, + websites, + socials, + brand, + sources + FROM read_parquet('{S3_PATH}', hive_partitioning=true) + WHERE bbox.xmin >= {BBOX['xmin']} + AND bbox.xmax <= {BBOX['xmax']} + AND bbox.ymin >= {BBOX['ymin']} + AND bbox.ymax <= {BBOX['ymax']} + """ + + log.info('Starting DuckDB query against S3 (this will take several minutes)...') + t_start = time.time() + + # Execute and fetch all as Arrow for efficient iteration + result_rel = duck.sql(query) + + upsert_sql = """ + INSERT INTO places (id, geometry, name, basic_category, confidence, + phone, website, socials, brand_name, brand_wikidata, + osm_type, osm_id, source_record_id, raw_sources) + VALUES %s + ON CONFLICT (id) DO UPDATE SET + geometry = EXCLUDED.geometry, + name = EXCLUDED.name, + basic_category = EXCLUDED.basic_category, + confidence = EXCLUDED.confidence, + phone = EXCLUDED.phone, + website = EXCLUDED.website, + socials = EXCLUDED.socials, + brand_name = EXCLUDED.brand_name, + brand_wikidata = EXCLUDED.brand_wikidata, + osm_type = EXCLUDED.osm_type, + osm_id = EXCLUDED.osm_id, + source_record_id = EXCLUDED.source_record_id, + raw_sources = EXCLUDED.raw_sources + """ + + template = """( + %(id)s, + ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326), + %(name)s, + %(basic_category)s, + %(confidence)s, + %(phone)s, + %(website)s, + %(socials)s::jsonb, + %(brand_name)s, + %(brand_wikidata)s, + %(osm_type)s, + %(osm_id)s, + %(source_record_id)s, + %(raw_sources)s::jsonb + )""" + + total = 0 + osm_refs = 0 + batch = [] + + log.info('DuckDB query executing, fetching results in chunks...') + + # Fetch in chunks using fetchmany on the relation + chunk_size = BATCH_SIZE + while True: + chunk = result_rel.fetchmany(chunk_size) + if not chunk: + break + + for row in chunk: + row_id = row[0] + lon = row[1] + lat = row[2] + name = row[3] + basic_cat = row[4] + conf = row[5] + phones = row[6] + websites = row[7] + socials_raw = row[8] + brand_raw = row[9] + sources_raw = row[10] + + if lon is None or lat is None: + continue + + # Phone: first element of VARCHAR[] + phone = None + if phones and len(phones) > 0: + phone = str(phones[0]) if phones[0] else None + + # Website: first element of VARCHAR[] + website = None + if websites and len(websites) > 0: + website = str(websites[0]) if websites[0] else None + + # Socials: VARCHAR[] → JSON array of strings + socials_json = None + if socials_raw and len(socials_raw) > 0: + socials_json = json.dumps([str(s) for s in socials_raw if s]) + + # Brand: struct with wikidata and names.primary + brand_name = None + brand_wikidata = None + if brand_raw: + try: + if isinstance(brand_raw, dict): + brand_wikidata = brand_raw.get('wikidata') + names_struct = brand_raw.get('names') + if names_struct and isinstance(names_struct, dict): + brand_name = names_struct.get('primary') + else: + # DuckDB struct — access by key + brand_wikidata = brand_raw['wikidata'] if 'wikidata' in dir(brand_raw) else None + try: + brand_wikidata = brand_raw[0] # wikidata is first field + names_struct = brand_raw[1] # names is second field + if names_struct: + brand_name = names_struct[0] # primary is first field + except (IndexError, TypeError): + pass + except Exception: + pass + + # Sources: parse OSM cross-reference + sources_list = None + if sources_raw: + if isinstance(sources_raw, (list, tuple)): + sources_list = [] + for s in sources_raw: + if isinstance(s, dict): + sources_list.append(s) + else: + # DuckDB struct tuple — convert + try: + sources_list.append({ + 'dataset': s[1] if len(s) > 1 else None, + 'record_id': s[3] if len(s) > 3 else None, + }) + except (TypeError, IndexError): + pass + + osm_type_letter, osm_id_val, source_record_id = parse_osm_ref(sources_list) + if osm_type_letter: + osm_refs += 1 + + raw_sources_json = json.dumps(sources_list) if sources_list else None + + batch.append({ + 'id': row_id, + 'lon': float(lon), + 'lat': float(lat), + 'name': name, + 'basic_category': basic_cat, + 'confidence': float(conf) if conf is not None else None, + 'phone': phone, + 'website': website, + 'socials': socials_json, + 'brand_name': brand_name, + 'brand_wikidata': brand_wikidata, + 'osm_type': osm_type_letter, + 'osm_id': osm_id_val, + 'source_record_id': source_record_id, + 'raw_sources': raw_sources_json, + }) + + if len(batch) >= BATCH_SIZE: + with conn.cursor() as cur: + psycopg2.extras.execute_values( + cur, upsert_sql, batch, + template=template, + page_size=BATCH_SIZE + ) + conn.commit() + total += len(batch) + elapsed = time.time() - t_start + rate = total / elapsed if elapsed > 0 else 0 + log.info(f'Inserted {total:,} rows ({osm_refs:,} OSM xrefs) ' + f'[{rate:.0f} rows/sec, {elapsed:.0f}s elapsed]') + batch = [] + + # Flush remaining + if batch: + with conn.cursor() as cur: + psycopg2.extras.execute_values( + cur, upsert_sql, batch, + template=template, + page_size=BATCH_SIZE + ) + conn.commit() + total += len(batch) + + duck.close() + + # Final stats + elapsed = time.time() - t_start + log.info(f'Import complete: {total:,} rows, {osm_refs:,} OSM cross-refs, ' + f'{elapsed:.0f}s total ({total/elapsed:.0f} rows/sec)') + + # Verify + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM places") + count = cur.fetchone()[0] + cur.execute("SELECT count(*) FROM places WHERE osm_type IS NOT NULL") + osm_count = cur.fetchone()[0] + log.info(f'Final table: {count:,} total rows, {osm_count:,} with OSM cross-references') + + conn.close() + + +if __name__ == '__main__': + run_import()