From 21c0f11eff17d3de8f44dc771ff2bd55bb05c71d Mon Sep 17 00:00:00 2001 From: malice Date: Sat, 23 May 2026 13:57:14 -0600 Subject: [PATCH] decouple: remove scripts/overture_import.py (migrated to navi-backend) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-B of the Overture-import relocation. The ETL now lives in zvx-echo6/navi-backend at the same scripts/ path (PR-A, navi-backend 475739d: script ported verbatim + duckdb dep + docs; verified live — imports cleanly, overture PG reachable with ~20.9M rows). recon no longer produces overture data it doesn't consume. - DELETE scripts/overture_import.py. Context: cleanup #29 removed lib/overture.py (recon's only overture *reader*), leaving this ETL as recon's last orphan overture code path. PR-A moved the writer to the navi side; this removes recon's now-orphan copy. The `overture` PG database is unchanged — only the writer moved. OVERTURE_DB_* vars in /opt/recon/.env are now dead in recon (zero overture code paths remain) — flagged for out-of-band post-merge prune, same pattern as PADUS_DB_* (cleanup #5). Co-authored-by: Claude Opus 4.7 (1M context) --- scripts/overture_import.py | 350 ------------------------------------- 1 file changed, 350 deletions(-) delete mode 100644 scripts/overture_import.py diff --git a/scripts/overture_import.py b/scripts/overture_import.py deleted file mode 100644 index 0b6ba67..0000000 --- a/scripts/overture_import.py +++ /dev/null @@ -1,350 +0,0 @@ -#!/usr/bin/env python3 -"""Overture Maps Places → PostgreSQL import script (v2). - -Downloads Overture Places Parquet from S3 via DuckDB (public bucket, no credentials), -filters to North America bounding box, and inserts into local PostgreSQL with PostGIS. - -Usage: - cd /opt/recon && venv/bin/python scripts/overture_import.py - -Re-runnable (idempotent via UPSERT). -""" - -import json -import logging -import os -import re -import sys -import time - -import duckdb -import psycopg2 -import psycopg2.extras - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(levelname)s %(message)s', - datefmt='%H:%M:%S' -) -log = logging.getLogger('overture_import') - -# --- Config --- -OVERTURE_RELEASE = '2026-04-15.0' -S3_PATH = f's3://overturemaps-us-west-2/release/{OVERTURE_RELEASE}/theme=places/type=place/*' - -# North America bounding box (generous — includes Hawaii, Puerto Rico, Canada) -BBOX = { - 'xmin': -170.0, - 'xmax': -50.0, - 'ymin': 15.0, - 'ymax': 85.0, -} - -BATCH_SIZE = 50_000 -OSM_RECORD_RE = re.compile(r'^([nwr])(\d+)@\d+$') - -DB_CONFIG = { - 'host': os.environ.get('OVERTURE_DB_HOST', 'localhost'), - 'port': int(os.environ.get('OVERTURE_DB_PORT', '5432')), - 'dbname': os.environ.get('OVERTURE_DB_NAME', 'overture'), - 'user': os.environ.get('OVERTURE_DB_USER', 'overture'), - 'password': os.environ.get('OVERTURE_DB_PASSWORD', ''), -} - - -def create_table(conn): - """Create places table and indexes if they don't exist.""" - with conn.cursor() as cur: - cur.execute(""" - CREATE TABLE IF NOT EXISTS places ( - id TEXT PRIMARY KEY, - geometry GEOMETRY(Point, 4326), - name TEXT, - basic_category TEXT, - confidence REAL, - phone TEXT, - website TEXT, - socials JSONB, - brand_name TEXT, - brand_wikidata TEXT, - osm_type CHAR(1), - osm_id BIGINT, - source_record_id TEXT, - raw_sources JSONB - ); - """) - cur.execute(""" - CREATE INDEX IF NOT EXISTS idx_places_osm - ON places(osm_type, osm_id) WHERE osm_type IS NOT NULL; - """) - cur.execute(""" - CREATE INDEX IF NOT EXISTS idx_places_geom - ON places USING GIST(geometry); - """) - cur.execute(""" - CREATE INDEX IF NOT EXISTS idx_places_name_trgm - ON places USING GIN(name gin_trgm_ops); - """) - conn.commit() - log.info('Table and indexes ready') - - -def parse_osm_ref(sources): - """Extract OSM type letter and ID from Overture sources array.""" - if not sources: - return None, None, None - for src in sources: - record_id = None - if isinstance(src, dict): - record_id = src.get('record_id', '') - elif hasattr(src, '__getitem__'): - # DuckDB struct — try attribute access - try: - record_id = src['record_id'] - except (KeyError, TypeError, IndexError): - pass - if not record_id: - continue - m = OSM_RECORD_RE.match(str(record_id)) - if m: - return m.group(1), int(m.group(2)), str(record_id) - return None, None, None - - -def run_import(): - """Main import: DuckDB reads S3 Parquet → PostgreSQL via chunked OFFSET/LIMIT.""" - log.info(f'Overture release: {OVERTURE_RELEASE}') - log.info(f'S3 path: {S3_PATH}') - log.info(f'Bounding box: {BBOX}') - - # Connect to PostgreSQL - conn = psycopg2.connect(**DB_CONFIG) - conn.autocommit = False - create_table(conn) - - # Set up DuckDB with httpfs and spatial for S3 access - duck = duckdb.connect() - duck.execute("INSTALL httpfs; LOAD httpfs;") - duck.execute("INSTALL spatial; LOAD spatial;") - duck.execute("SET s3_region='us-west-2';") - - # Use a materialized approach: DuckDB query → Arrow → iterate in Python - query = f""" - SELECT - id, - ST_X(geometry) AS lon, - ST_Y(geometry) AS lat, - names.primary AS name, - basic_category, - confidence, - phones, - websites, - socials, - brand, - sources - FROM read_parquet('{S3_PATH}', hive_partitioning=true) - WHERE bbox.xmin >= {BBOX['xmin']} - AND bbox.xmax <= {BBOX['xmax']} - AND bbox.ymin >= {BBOX['ymin']} - AND bbox.ymax <= {BBOX['ymax']} - """ - - log.info('Starting DuckDB query against S3 (this will take several minutes)...') - t_start = time.time() - - # Execute and fetch all as Arrow for efficient iteration - result_rel = duck.sql(query) - - upsert_sql = """ - INSERT INTO places (id, geometry, name, basic_category, confidence, - phone, website, socials, brand_name, brand_wikidata, - osm_type, osm_id, source_record_id, raw_sources) - VALUES %s - ON CONFLICT (id) DO UPDATE SET - geometry = EXCLUDED.geometry, - name = EXCLUDED.name, - basic_category = EXCLUDED.basic_category, - confidence = EXCLUDED.confidence, - phone = EXCLUDED.phone, - website = EXCLUDED.website, - socials = EXCLUDED.socials, - brand_name = EXCLUDED.brand_name, - brand_wikidata = EXCLUDED.brand_wikidata, - osm_type = EXCLUDED.osm_type, - osm_id = EXCLUDED.osm_id, - source_record_id = EXCLUDED.source_record_id, - raw_sources = EXCLUDED.raw_sources - """ - - template = """( - %(id)s, - ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326), - %(name)s, - %(basic_category)s, - %(confidence)s, - %(phone)s, - %(website)s, - %(socials)s::jsonb, - %(brand_name)s, - %(brand_wikidata)s, - %(osm_type)s, - %(osm_id)s, - %(source_record_id)s, - %(raw_sources)s::jsonb - )""" - - total = 0 - osm_refs = 0 - batch = [] - - log.info('DuckDB query executing, fetching results in chunks...') - - # Fetch in chunks using fetchmany on the relation - chunk_size = BATCH_SIZE - while True: - chunk = result_rel.fetchmany(chunk_size) - if not chunk: - break - - for row in chunk: - row_id = row[0] - lon = row[1] - lat = row[2] - name = row[3] - basic_cat = row[4] - conf = row[5] - phones = row[6] - websites = row[7] - socials_raw = row[8] - brand_raw = row[9] - sources_raw = row[10] - - if lon is None or lat is None: - continue - - # Phone: first element of VARCHAR[] - phone = None - if phones and len(phones) > 0: - phone = str(phones[0]) if phones[0] else None - - # Website: first element of VARCHAR[] - website = None - if websites and len(websites) > 0: - website = str(websites[0]) if websites[0] else None - - # Socials: VARCHAR[] → JSON array of strings - socials_json = None - if socials_raw and len(socials_raw) > 0: - socials_json = json.dumps([str(s) for s in socials_raw if s]) - - # Brand: struct with wikidata and names.primary - brand_name = None - brand_wikidata = None - if brand_raw: - try: - if isinstance(brand_raw, dict): - brand_wikidata = brand_raw.get('wikidata') - names_struct = brand_raw.get('names') - if names_struct and isinstance(names_struct, dict): - brand_name = names_struct.get('primary') - else: - # DuckDB struct — access by key - brand_wikidata = brand_raw['wikidata'] if 'wikidata' in dir(brand_raw) else None - try: - brand_wikidata = brand_raw[0] # wikidata is first field - names_struct = brand_raw[1] # names is second field - if names_struct: - brand_name = names_struct[0] # primary is first field - except (IndexError, TypeError): - pass - except Exception: - pass - - # Sources: parse OSM cross-reference - sources_list = None - if sources_raw: - if isinstance(sources_raw, (list, tuple)): - sources_list = [] - for s in sources_raw: - if isinstance(s, dict): - sources_list.append(s) - else: - # DuckDB struct tuple — convert - try: - sources_list.append({ - 'dataset': s[1] if len(s) > 1 else None, - 'record_id': s[3] if len(s) > 3 else None, - }) - except (TypeError, IndexError): - pass - - osm_type_letter, osm_id_val, source_record_id = parse_osm_ref(sources_list) - if osm_type_letter: - osm_refs += 1 - - raw_sources_json = json.dumps(sources_list) if sources_list else None - - batch.append({ - 'id': row_id, - 'lon': float(lon), - 'lat': float(lat), - 'name': name, - 'basic_category': basic_cat, - 'confidence': float(conf) if conf is not None else None, - 'phone': phone, - 'website': website, - 'socials': socials_json, - 'brand_name': brand_name, - 'brand_wikidata': brand_wikidata, - 'osm_type': osm_type_letter, - 'osm_id': osm_id_val, - 'source_record_id': source_record_id, - 'raw_sources': raw_sources_json, - }) - - if len(batch) >= BATCH_SIZE: - with conn.cursor() as cur: - psycopg2.extras.execute_values( - cur, upsert_sql, batch, - template=template, - page_size=BATCH_SIZE - ) - conn.commit() - total += len(batch) - elapsed = time.time() - t_start - rate = total / elapsed if elapsed > 0 else 0 - log.info(f'Inserted {total:,} rows ({osm_refs:,} OSM xrefs) ' - f'[{rate:.0f} rows/sec, {elapsed:.0f}s elapsed]') - batch = [] - - # Flush remaining - if batch: - with conn.cursor() as cur: - psycopg2.extras.execute_values( - cur, upsert_sql, batch, - template=template, - page_size=BATCH_SIZE - ) - conn.commit() - total += len(batch) - - duck.close() - - # Final stats - elapsed = time.time() - t_start - log.info(f'Import complete: {total:,} rows, {osm_refs:,} OSM cross-refs, ' - f'{elapsed:.0f}s total ({total/elapsed:.0f} rows/sec)') - - # Verify - with conn.cursor() as cur: - cur.execute("SELECT count(*) FROM places") - count = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM places WHERE osm_type IS NOT NULL") - osm_count = cur.fetchone()[0] - log.info(f'Final table: {count:,} total rows, {osm_count:,} with OSM cross-references') - - conn.close() - - -if __name__ == '__main__': - run_import()