Compare commits

..

5 commits

Author SHA1 Message Date
248f4bded4 Fix wiki lookup to match on name+state+country instead of osm_key/osm_value
- Remove osm_key/osm_value from wiki_places lookup query
- Add fallback matching: try state first, then country only
- Parse state/country from wikipedia extratag when address is empty
- Add US states and Canadian provinces parsing for wikipedia titles
- Apply wiki enrichment to cached results (was missing)

Fixes wiki_summary and wiki_url not appearing for boundary/administrative
places like Joliet, IL where OSM returns boundary/administrative but
wiki_places has place/city.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-03 00:17:49 +00:00
1b3ad1bf9e fix(google_places): add schema migration for Google columns
The place_cache table was missing google_place_id, google_data, and
google_fetched_at columns needed by the Google Places enrichment.

Adds ALTER TABLE migration in _get_db() to add columns if missing,
wrapped in try/except to handle existing columns gracefully.

Fixes HTTP 500 on /api/place/W/* endpoints.
2026-04-30 03:22:38 +00:00
Ubuntu
bb7d0169e0 fix(place): preserve extratags.wikipedia format when has_kiwix_wiki enabled
When has_kiwix_wiki is enabled, skip rewriting extratags.wikipedia to
a Kiwix content URL. The wiki_url field already provides the correct
local Kiwix viewer link for the WikiSummarySection.

This preserves the original OSM format (e.g. "en:Title") in
extratags.wikipedia so the frontend LINKS section can properly build
wikipedia.org URLs.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-04-29 18:53:03 +00:00
c92178d90f fix(place): handle boundary/administrative for wiki lookup
Add place/linked_place to extratags so wiki enrichment can detect
actual place type (e.g. boundary:administrative with place=city
should match wiki_places osm_key=place, osm_value=city).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-29 17:21:38 +00:00
Ubuntu
2d1dcbf70c feat(place): add wiki location index lookup
Add wiki summary, URLs, and population data from wiki_index.db to
place detail and reverse geocode responses.

- Add lib/wiki_index.py: SQLite read-only lookup module
- Enrich /api/place responses via _enrich_with_wiki_index()
- Enrich /api/reverse responses via _enrich_reverse_result_with_wiki()
- Gate on has_kiwix_wiki feature flag (default false in home.yaml)
- Direct match only on place_name + osm_key + osm_value + state + country_code
- Additive fields: wiki_summary, wiki_url, wikivoyage_url, wiki_population

DB path: /opt/recon/data/wiki_index.db

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-29 17:07:31 +00:00
19 changed files with 368 additions and 4805 deletions

View file

@ -6,13 +6,13 @@ profile: home
region_name: "North America"
tileset:
url: "/tiles/planet/current.pmtiles"
url: "/tiles/na.pmtiles"
bounds: [-168, 14, -52, 72]
max_zoom: 15
attribution: "Protomaps © OSM"
tileset_hillshade:
url: "/tiles/planet-dem.pmtiles"
url: "/tiles/hillshade-na.pmtiles"
encoding: "terrarium"
max_zoom: 12
@ -33,14 +33,14 @@ services:
features:
has_nominatim_details: true
has_kiwix_wiki: true
has_kiwix_wiki: false
has_hillshade: true
has_3d_terrain: false
has_traffic_overlay: true
has_landclass: true
has_public_lands_layer: true
has_contours: true
has_contours_test: false
has_contours_test: true
has_contours_test_10ft: false
has_address_book_write: false
has_overture_enrichment: true
@ -48,16 +48,7 @@ features:
has_contacts: true
has_wiki_rewriting: true
has_wiki_discovery: false
has_usfs_trails: true
has_blm_trails: true
defaults:
center: [42.5736, -114.6066]
zoom: 10
# Offroute wilderness routing
offroute:
osm_pbf_path: "/mnt/nav/sources/idaho-latest.osm.pbf"
densify_interval_m: 100
postgis_dsn: "dbname=padus"

View file

@ -2722,214 +2722,3 @@ def api_auth_whoami():
'authenticated': False,
'username': None,
})
# ── OFFROUTE API ──
@app.route("/api/offroute", methods=["POST"])
def api_offroute():
"""
Off-network routing from wilderness to destination.
Request body:
{
"start": [lat, lon],
"end": [lat, lon],
"mode": "foot" | "mtb" | "atv", (default: "foot")
"boundary_mode": "strict" | "pragmatic" | "emergency" (default: "pragmatic")
}
Response:
{
"status": "ok",
"route": { GeoJSON FeatureCollection with wilderness + network segments },
"summary": { total_distance_km, total_effort_minutes, ... }
}
"""
try:
data = request.get_json()
if not data:
return jsonify({"status": "error", "message": "No JSON body provided"}), 400
# Parse coordinates
start = data.get("start")
end = data.get("end")
if not start or not end:
return jsonify({"status": "error", "message": "Missing start or end coordinates"}), 400
if not isinstance(start, (list, tuple)) or len(start) != 2:
return jsonify({"status": "error", "message": "start must be [lat, lon]"}), 400
if not isinstance(end, (list, tuple)) or len(end) != 2:
return jsonify({"status": "error", "message": "end must be [lat, lon]"}), 400
start_lat, start_lon = float(start[0]), float(start[1])
end_lat, end_lon = float(end[0]), float(end[1])
# Parse options
mode = data.get("mode", "foot")
if mode not in ("auto", "foot", "mtb", "atv", "vehicle"):
return jsonify({"status": "error", "message": "mode must be auto, foot, mtb, atv, or vehicle"}), 400
boundary_mode = data.get("boundary_mode", "pragmatic")
if boundary_mode not in ("strict", "pragmatic", "emergency"):
return jsonify({"status": "error", "message": "boundary_mode must be strict, pragmatic, or emergency"}), 400
# Import and run router
from .offroute.router import OffrouteRouter
router = OffrouteRouter()
try:
result = router.route(
start_lat=start_lat,
start_lon=start_lon,
end_lat=end_lat,
end_lon=end_lon,
mode=mode,
boundary_mode=boundary_mode
)
finally:
router.close()
if result.get("status") == "error":
return jsonify(result), 400
return jsonify(result)
except Exception as e:
logger.exception("Offroute error")
return jsonify({"status": "error", "message": str(e)}), 500
# ── MVUM Places Panel API ──
@app.route("/api/mvum", methods=["GET"])
def api_mvum():
"""
Query MVUM (Motor Vehicle Use Map) features near a point.
Used by the Navi frontend places panel when a user taps near a road/trail.
Query params:
lat: Latitude
lon: Longitude
radius: Search radius in meters (default: 50)
Response:
{
"status": "ok",
"feature": {
"id": "FR 123",
"name": "Some Forest Road",
"forest": "Sawtooth National Forest",
"district": "Ketchum Ranger District",
"surface": "NAT",
"maintenance_level": 2,
"seasonal": "Seasonal",
"symbol": 2,
"access": {
"passenger_vehicle": { "status": "Open", "dates": "06/15-10/15" },
"high_clearance": { "status": "Open", "dates": "06/15-10/15" },
"atv": { "status": "Open", "dates": "06/15-10/15" },
...
},
"geometry": { GeoJSON LineString }
}
}
If no MVUM feature within radius:
{ "status": "ok", "feature": null }
"""
try:
lat = request.args.get("lat", type=float)
lon = request.args.get("lon", type=float)
radius = request.args.get("radius", 50, type=float)
if lat is None or lon is None:
return jsonify({"status": "error", "message": "lat and lon required"}), 400
from .offroute.mvum import MVUMReader
reader = MVUMReader()
try:
# Try roads first, then trails
feature = reader.query_nearest(lat, lon, radius, "mvum_roads")
if feature is None:
feature = reader.query_nearest(lat, lon, radius, "mvum_trails")
if feature is None:
return jsonify({"status": "ok", "feature": None})
# Format access info
access = {
"passenger_vehicle": {
"status": feature.get("passengervehicle"),
"dates": feature.get("passengervehicle_datesopen")
},
"high_clearance": {
"status": feature.get("highclearancevehicle"),
"dates": feature.get("highclearancevehicle_datesopen")
},
"atv": {
"status": feature.get("atv"),
"dates": feature.get("atv_datesopen")
},
"motorcycle": {
"status": feature.get("motorcycle"),
"dates": feature.get("motorcycle_datesopen")
},
"4wd_gt50": {
"status": feature.get("fourwd_gt50inches"),
"dates": feature.get("fourwd_gt50_datesopen")
},
"2wd_gt50": {
"status": feature.get("twowd_gt50inches"),
"dates": feature.get("twowd_gt50_datesopen")
},
"e_bike_class1": {
"status": feature.get("e_bike_class1"),
"dates": feature.get("e_bike_class1_dur")
},
"e_bike_class2": {
"status": feature.get("e_bike_class2"),
"dates": feature.get("e_bike_class2_dur")
},
"e_bike_class3": {
"status": feature.get("e_bike_class3"),
"dates": feature.get("e_bike_class3_dur")
},
}
# Parse maintenance level
maint_level = feature.get("operationalmaintlevel", "")
maint_num = None
if maint_level:
# Extract first digit: "2 - HIGH CLEARANCE VEHICLES" -> 2
import re
match = re.match(r"(\d+)", maint_level)
if match:
maint_num = int(match.group(1))
result = {
"id": feature.get("id"),
"name": feature.get("name"),
"forest": feature.get("forestname"),
"district": feature.get("districtname"),
"surface": feature.get("surfacetype"),
"maintenance_level": maint_num,
"seasonal": feature.get("seasonal"),
"symbol": feature.get("symbol"),
"trail_class": feature.get("trailclass"),
"trail_system": feature.get("trailsystem"),
"access": access,
"geometry": feature.get("geojson")
}
return jsonify({"status": "ok", "feature": result})
finally:
reader.close()
except Exception as e:
logger.exception("MVUM query error")
return jsonify({"status": "error", "message": str(e)}), 500

View file

@ -21,7 +21,6 @@ Config: processing.extract_workers, processing.max_pdf_size_mb,
processing.extract_timeout, processing.page_timeout
"""
import base64
import re
import json
import os
import random
@ -100,40 +99,6 @@ def _is_transient(error_str):
return any(sig in s for sig in transient_signals)
def _text_quality_ok(text, min_length=50):
"""Check if extracted text meets quality thresholds.
Beyond the basic length check, validates:
- Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars)
- Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count
Returns True if text passes all checks.
"""
text = text.strip()
if len(text) < min_length:
return False
words = text.split()
if not words:
return False
# Word-like ratio: tokens with 2+ alphabetic characters
word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2)
word_ratio = word_like / len(words)
if word_ratio < 0.60:
return False
# Concatenation detector: lowercase immediately followed by uppercase
# Filter out common camelCase patterns in code (short tokens)
concat_hits = len(re.findall(r'[a-z][A-Z]', text))
concat_ratio = concat_hits / len(words) if words else 0
if concat_ratio > 0.10:
return False
return True
def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30):
"""Render a single PDF page to PNG bytes using pdftoppm.
@ -259,7 +224,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# Method 1: pdftotext (poppler)
try:
result = subprocess.run(
['pdftotext', '-layout', '-f', str(page_num_0indexed + 1),
['pdftotext', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
@ -268,7 +233,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if _text_quality_ok(text):
if len(text.strip()) >= 50:
return text, 'pdftotext'
# Method 2: pdftoppm + Tesseract OCR
@ -293,7 +258,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if _text_quality_ok(text):
if len(text.strip()) >= 50:
return text, 'tesseract'
# Method 3: Gemini Vision (last resort)
@ -311,26 +276,8 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# ── Core extraction functions ──
def _pypdf2_extract(reader, page_num):
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout.
Tries default extraction first (space_width=200). If quality check fails,
retries with space_width=100 which better detects word boundaries in
tightly-kerned PDFs (common in Haynes/workshop manuals).
Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter
controls word-boundary detection tolerance. Lower values = more aggressive
space insertion between characters.
"""
text = reader.pages[page_num].extract_text() or ''
if _text_quality_ok(text):
return text
# Retry with tighter word-boundary detection
text_tight = reader.pages[page_num].extract_text(space_width=100.0) or ''
if len(text_tight.strip()) >= len(text.strip()):
return text_tight
return text
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout."""
return reader.pages[page_num].extract_text() or ''
def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
@ -355,13 +302,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
text = ''
if _text_quality_ok(text):
if len(text.strip()) >= 50:
return text, 'pypdf2'
# Method 2: pdftotext via subprocess (inherently timeout-safe)
try:
result = subprocess.run(
['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()):
@ -369,7 +316,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if _text_quality_ok(text):
if len(text.strip()) >= 50:
return text, 'pdftotext'
# Method 3: pdftoppm + Tesseract OCR
@ -393,7 +340,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if _text_quality_ok(text):
if len(text.strip()) >= 50:
return text, 'tesseract'
# Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs)

View file

@ -47,6 +47,18 @@ def _get_db():
)
""")
_db_conn.commit()
# Schema migration: add Google columns to place_cache if missing
for col, coldef in [
('google_place_id', 'TEXT'),
('google_data', 'TEXT'),
('google_fetched_at', 'INTEGER'),
]:
try:
_db_conn.execute(f'ALTER TABLE place_cache ADD COLUMN {col} {coldef}')
logger.info(f'Added column {col} to place_cache')
except sqlite3.OperationalError:
pass # Column already exists
_db_conn.commit()
return _db_conn

View file

@ -4,23 +4,77 @@ RECON Netsyms API + Geocode — Flask Blueprints.
GET /api/netsyms/lookup?q=<free text>&country=<optional>
GET /api/netsyms/health
GET /api/geocode?q=<query>&limit=<N> (Photon-first search with ranked results)
GET /api/reverse/<lat>/<lon> (localhost-sourced enrichment bundle for Central)
"""
import sqlite3
import threading
from cachetools import TTLCache
from flask import Blueprint, request, jsonify
from . import netsyms
from . import address_book
from . import nav_tools
from .geocode import PHOTON_URL
from .utils import setup_logging
logger = setup_logging('recon.netsyms_api')
def _enrich_reverse_result_with_wiki(result):
"""
Add wiki data to a reverse geocode result if available.
Only runs when has_kiwix_wiki is enabled.
"""
try:
from .deployment_config import get_deployment_config
deploy_config = get_deployment_config()
features = deploy_config.get('features', {})
if not features.get('has_kiwix_wiki', False):
return result
except Exception:
return result
try:
from . import wiki_index
except ImportError:
return result
if not wiki_index.is_available():
return result
# Extract match criteria from Photon raw props
raw = result.get('raw', {})
place_name = raw.get('name', '')
osm_key = raw.get('osm_key', '')
osm_value = raw.get('osm_value', '')
state = raw.get('state', '')
country = raw.get('country', '')
# Extract country code (Photon uses full country name, we need code)
country_code = raw.get('countrycode', '').lower()
if not country_code:
country_lower = country.lower() if country else ''
if 'united states' in country_lower or country_lower == 'usa':
country_code = 'us'
elif 'canada' in country_lower:
country_code = 'ca'
if not place_name or not osm_key or not osm_value or not country_code:
return result
# Look up wiki data
wiki_data = wiki_index.lookup_wiki(place_name, osm_key, osm_value, state, country_code)
if wiki_data:
# Add wiki fields to result (additive only)
if 'wiki_summary' in wiki_data:
result['wiki_summary'] = wiki_data['wiki_summary']
if 'wiki_url' in wiki_data:
result['wiki_url'] = wiki_data['wiki_url']
if 'wikivoyage_url' in wiki_data:
result['wikivoyage_url'] = wiki_data['wikivoyage_url']
if 'wiki_population' in wiki_data:
result['wiki_population'] = wiki_data['wiki_population']
return result
netsyms_bp = Blueprint('netsyms', __name__)
geocode_bp = Blueprint('geocode', __name__)
@ -129,159 +183,7 @@ def api_reverse():
from .geocode import _parse_photon_features
results = _parse_photon_features(features, source='photon_reverse')
# Enrich results with wiki data
results = [_enrich_reverse_result_with_wiki(r) for r in results]
return jsonify({'query': query_str, 'results': results, 'count': len(results)})
# ─────────────────────────────────────────────────────────────────────────
# /api/reverse/<lat>/<lon> — localhost-sourced enrichment bundle (Central)
#
# Sibling to the query-string /api/reverse above; that route is unchanged.
# Every component is sourced from localhost only (Photon, timezones.sqlite,
# in-process landclass/PostGIS, Valhalla). Each lookup is independent: a
# component failure logs a warning and yields null — never a 5xx.
# ─────────────────────────────────────────────────────────────────────────
_TZ_DB_PATH = "/mnt/nav/sources/timezones.sqlite"
_VALHALLA_HEIGHT_URL = "http://localhost:8002/height"
# Full bundle cache: key=(round(lat,4), round(lon,4)) -> dict. ~10k entries, 24h TTL.
_REVERSE_BUNDLE_CACHE = TTLCache(maxsize=10_000, ttl=86_400)
_REVERSE_BUNDLE_LOCK = threading.Lock()
_BUNDLE_KEYS = ('name', 'city', 'county', 'state', 'country',
'postal_code', 'timezone', 'landclass', 'elevation_m')
def _spatialite_blob_to_wkb(blob):
"""Recover standard WKB from a SpatiaLite geometry BLOB.
Layout: [00][endian][srid:4][mbr:32][7C][WKB body][FE]. The body omits the
leading byte-order marker, so we re-prepend it and drop the trailing 0xFE.
"""
return bytes([blob[1]]) + blob[39:-1]
def _reverse_photon(lat, lon):
"""Nearest-feature admin fields from local Photon. Returns the six address
fields (any value may be None). Mirrors the existing /api/reverse call."""
import requests as http_requests
resp = http_requests.get(
f"{PHOTON_URL}/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
features = resp.json().get("features", [])
if not features:
return {}
props = features[0].get("properties", {})
return {
"name": props.get("name"),
"city": props.get("city"),
"county": props.get("county"),
"state": props.get("state"),
"country": props.get("country"),
"postal_code": props.get("postcode"),
}
def _reverse_timezone(lat, lon):
"""IANA tzid for the point from local timezones.sqlite (SpatiaLite tz_world).
Uses the table's R-tree index for an MBR prefilter, then shapely
point-in-polygon on the few candidates. Returns None if unresolved.
"""
from shapely import wkb
from shapely.geometry import Point
con = sqlite3.connect(f"file:{_TZ_DB_PATH}?mode=ro", uri=True)
try:
cur = con.cursor()
cur.execute(
"SELECT pkid FROM idx_tz_world_geom "
"WHERE xmin<=? AND xmax>=? AND ymin<=? AND ymax>=?",
(lon, lon, lat, lat),
)
candidates = [r[0] for r in cur.fetchall()]
if not candidates:
return None
pt = Point(lon, lat)
for pk in candidates:
row = cur.execute(
"SELECT tzid, geom FROM tz_world WHERE pk_uid=?", (pk,)
).fetchone()
if row and wkb.loads(_spatialite_blob_to_wkb(row[1])).contains(pt):
return row[0]
return None
finally:
con.close()
def _reverse_landclass(lat, lon):
"""Most-specific PAD-US land class for the point, looked up in-process.
Returns None when there is no coverage or landclass is unavailable."""
from .landclass import lookup_landclass, format_summary
return format_summary(lookup_landclass(lat, lon))
def _reverse_elevation(lat, lon):
"""Elevation in metres from local Valhalla /height. None on failure."""
import requests as http_requests
resp = http_requests.post(
_VALHALLA_HEIGHT_URL,
json={"shape": [{"lat": lat, "lon": lon}]},
timeout=10,
)
resp.raise_for_status()
heights = resp.json().get("height", [])
return heights[0] if heights else None
@geocode_bp.route('/api/reverse/<lat>/<lon>')
def api_reverse_bundle(lat, lon):
"""Localhost-sourced reverse-geocode enrichment bundle for Central.
GET /api/reverse/<lat>/<lon>
Always returns 200 with EXACTLY these keys (any may be null):
name, city, county, state, country, postal_code, timezone, landclass, elevation_m
lat/lon are parsed manually (not via Flask's <float:> converter, which
rejects negative and integer coordinates) so out-of-range or unparseable
input yields 400 per contract; 503 is reserved for catastrophic failure.
"""
try:
lat = float(lat)
lon = float(lon)
except (ValueError, TypeError):
return jsonify({'error': 'lat and lon must be numbers'}), 400
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return jsonify({'error': 'lat must be -90..90, lon must be -180..180'}), 400
key = (round(lat, 4), round(lon, 4))
with _REVERSE_BUNDLE_LOCK:
cached = _REVERSE_BUNDLE_CACHE.get(key)
if cached is not None:
return jsonify(cached)
bundle = {k: None for k in _BUNDLE_KEYS}
try:
bundle.update(_reverse_photon(lat, lon))
except Exception:
logger.warning("reverse-bundle: Photon lookup failed for %s,%s", lat, lon)
try:
bundle['timezone'] = _reverse_timezone(lat, lon)
except Exception:
logger.warning("reverse-bundle: timezone lookup failed for %s,%s", lat, lon)
try:
bundle['landclass'] = _reverse_landclass(lat, lon)
except Exception:
logger.warning("reverse-bundle: landclass lookup failed for %s,%s", lat, lon)
try:
bundle['elevation_m'] = _reverse_elevation(lat, lon)
except Exception:
logger.warning("reverse-bundle: elevation lookup failed for %s,%s", lat, lon)
with _REVERSE_BUNDLE_LOCK:
_REVERSE_BUNDLE_CACHE[key] = bundle
return jsonify(bundle)

View file

@ -1 +0,0 @@
"""OFFROUTE: Off-network effort-based routing module."""

View file

@ -1,440 +0,0 @@
"""
PAD-US barrier and wilderness layers for OFFROUTE.
Provides access to:
1. Barrier raster (Pub_Access = 'XA' - closed/restricted areas)
2. Wilderness raster (Des_Tp = 'WA' - designated wilderness areas)
Build functions rasterize PAD-US geodatabase to aligned GeoTIFFs.
Runtime functions read the rasters and resample to match elevation grids.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
import subprocess
import tempfile
import os
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for barriers layer support")
# Paths
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
DEFAULT_WILDERNESS_PATH = Path("/mnt/nav/worldcover/wilderness.tif")
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
# CONUS bounding box in WGS84
CONUS_BOUNDS = {
"west": -125.0,
"east": -66.0,
"south": 24.0,
"north": 50.0,
}
# Resolution in degrees (~30m at mid-latitudes)
PIXEL_SIZE = 0.0003 # ~33m
class BarrierReader:
"""Reader for PAD-US barrier raster (closed/restricted areas)."""
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
self.barrier_path = barrier_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.barrier_path.exists():
raise FileNotFoundError(
f"Barrier raster not found at {self.barrier_path}. "
f"Run build_barriers_raster() first."
)
self._dataset = rasterio.open(self.barrier_path)
return self._dataset
def get_barrier_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get barrier values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 barrier values:
255 = closed/restricted (impassable when respect_boundaries=True)
0 = public/accessible
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
barriers = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return barriers
def sample_point(self, lat: float, lon: float) -> int:
"""Sample barrier value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
class WildernessReader:
"""Reader for PAD-US wilderness raster (designated wilderness areas)."""
def __init__(self, wilderness_path: Path = DEFAULT_WILDERNESS_PATH):
self.wilderness_path = wilderness_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.wilderness_path.exists():
raise FileNotFoundError(
f"Wilderness raster not found at {self.wilderness_path}. "
f"Run build_wilderness_raster() first."
)
self._dataset = rasterio.open(self.wilderness_path)
return self._dataset
def get_wilderness_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get wilderness values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 wilderness values:
255 = designated wilderness area
0 = not wilderness
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
wilderness = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return wilderness
def sample_point(self, lat: float, lon: float) -> int:
"""Sample wilderness value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def build_barriers_raster(
output_path: Path = DEFAULT_BARRIERS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US barriers raster from the source geodatabase.
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US barriers raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(closed_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Pub_Access = 'XA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "closed_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing closed areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "closed_areas",
str(closed_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
closed_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {closed_count} closed cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
def build_wilderness_raster(
output_path: Path = DEFAULT_WILDERNESS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US wilderness raster from the source geodatabase.
Extracts polygons where Des_Tp = 'WA' (Wilderness Area) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US wilderness raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
wilderness_gpkg = Path(tmpdir) / "wilderness_areas.gpkg"
print(f"\n[1/3] Extracting wilderness areas (Des_Tp = 'WA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(wilderness_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Des_Tp = 'WA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "wilderness_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(wilderness_gpkg), "wilderness_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing wilderness areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "wilderness_areas",
str(wilderness_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
wilderness_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {wilderness_count} wilderness cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == "build":
print("=" * 60)
print("PAD-US Barriers Raster Build")
print("=" * 60)
build_barriers_raster()
elif cmd == "build-wilderness":
print("=" * 60)
print("PAD-US Wilderness Raster Build")
print("=" * 60)
build_wilderness_raster()
elif cmd == "build-all":
print("=" * 60)
print("Building all PAD-US rasters")
print("=" * 60)
build_barriers_raster()
print("\n")
build_wilderness_raster()
else:
print(f"Unknown command: {cmd}")
print("Usage:")
print(" python barriers.py build # Build barriers raster")
print(" python barriers.py build-wilderness # Build wilderness raster")
print(" python barriers.py build-all # Build both rasters")
sys.exit(1)
else:
# Test readers
print("Testing BarrierReader...")
if not DEFAULT_BARRIERS_PATH.exists():
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run: python barriers.py build")
sys.exit(1)
reader = BarrierReader()
barriers = reader.get_barrier_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"\nBarrier grid shape: {barriers.shape}")
print(f"Unique values: {np.unique(barriers)}")
closed_cells = np.sum(barriers == 255)
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
reader.close()
print("\nTesting WildernessReader...")
if not DEFAULT_WILDERNESS_PATH.exists():
print(f"Wilderness raster not found at {DEFAULT_WILDERNESS_PATH}")
print(f"Run: python barriers.py build-wilderness")
else:
wilderness_reader = WildernessReader()
wilderness = wilderness_reader.get_wilderness_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"Wilderness grid shape: {wilderness.shape}")
print(f"Unique values: {np.unique(wilderness)}")
wilderness_cells = np.sum(wilderness == 255)
print(f"Wilderness cells: {wilderness_cells} ({100*wilderness_cells/wilderness.size:.2f}%)")
wilderness_reader.close()
print("\nDone.")

View file

@ -1,494 +0,0 @@
"""
Multi-mode travel cost functions for OFFROUTE.
Supports four travel modes: foot, mtb, atv, vehicle.
Each mode has its own speed function, max slope, trail access rules,
and terrain friction overrides.
Mode profiles are data-driven adding a new mode means adding a profile entry.
"""
import math
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Literal, Dict, Callable
# ═══════════════════════════════════════════════════════════════════════════════
# SPEED FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_off_path_speed(grade: np.ndarray, base_speed: float = 6.0) -> np.ndarray:
"""
Tobler off-path hiking function.
W = 0.6 * base_speed * exp(-3.5 * |S + 0.05|)
Peak ~3.6 km/h at grade = -0.05 (slight downhill).
The 0.6 multiplier is the off-trail penalty.
"""
return 0.6 * base_speed * np.exp(-3.5 * np.abs(grade + 0.05))
def herzog_wheeled_speed(grade: np.ndarray, base_speed: float = 12.0) -> np.ndarray:
"""
Herzog wheeled-transport polynomial.
Relative speed factor:
1 / (1337.8·S^6 + 278.19·S^5 517.39·S^4 78.199·S^3 + 93.419·S^2 + 19.825·|S| + 1.64)
Multiply by base_speed to get km/h.
"""
S = grade
S_abs = np.abs(S)
# Herzog polynomial (returns relative speed factor 0-1)
denom = (1337.8 * S**6 + 278.19 * S**5 - 517.39 * S**4
- 78.199 * S**3 + 93.419 * S**2 + 19.825 * S_abs + 1.64)
# Avoid division by zero and negative speeds
denom = np.maximum(denom, 0.1)
rel_speed = 1.0 / denom
# Clamp relative speed to reasonable bounds (0.05 to 1.5)
rel_speed = np.clip(rel_speed, 0.05, 1.5)
return base_speed * rel_speed
def linear_degrade_speed(grade: np.ndarray, base_speed: float = 40.0, max_grade: float = 0.364) -> np.ndarray:
"""
Linear speed degradation with slope.
speed = base_speed * max(0, 1 - |grade| / max_grade)
max_grade = tan(20°) 0.364 for 20° max slope.
"""
speed = base_speed * np.maximum(0, 1.0 - np.abs(grade) / max_grade)
return np.maximum(speed, 0.1) # Minimum crawl speed
# ═══════════════════════════════════════════════════════════════════════════════
# MODE PROFILES (Data-driven configuration)
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class ModeProfile:
"""Configuration for a travel mode."""
name: str
description: str
# Speed function parameters
speed_function: str # "tobler", "herzog", "linear"
base_speed_kmh: float
max_slope_deg: float
# Trail access: trail_value -> friction multiplier (None = impassable)
# Trail values: 5=road, 15=track, 25=foot trail
trail_friction: Dict[int, Optional[float]] = field(default_factory=dict)
# Off-trail terrain friction overrides (by WorldCover class)
# These MULTIPLY the base WorldCover friction
# None = use default, np.inf = impassable
# WorldCover values: 10=tree, 20=shrub, 30=grass, 40=crop, 50=urban,
# 60=bare, 80=water, 90=wetland, 95=mangrove, 100=moss
terrain_friction_override: Dict[int, Optional[float]] = field(default_factory=dict)
# Should wilderness areas be impassable?
wilderness_impassable: bool = False
# For vehicle mode: can traverse off-trail flat terrain?
off_trail_flat_threshold_deg: float = 0.0 # 0 = no off-trail allowed
off_trail_flat_friction: float = np.inf # friction if allowed
# Define all mode profiles
MODE_PROFILES: Dict[str, ModeProfile] = {
"foot": ModeProfile(
name="foot",
description="Hiking on foot (Tobler off-path model)",
speed_function="tobler",
base_speed_kmh=6.0,
max_slope_deg=40.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
},
terrain_friction_override={
# Use default WorldCover friction for foot mode
},
wilderness_impassable=False,
),
"mtb": ModeProfile(
name="mtb",
description="Mountain bike / dirt bike (Herzog wheeled model)",
speed_function="herzog",
base_speed_kmh=12.0,
max_slope_deg=25.0,
trail_friction={
5: 0.1, # road
15: 0.2, # track
25: 0.5, # foot trail (rideable but slow)
},
terrain_friction_override={
30: 2.0, # Grassland: rideable but slow
20: 4.0, # Shrubland: barely rideable
10: 8.0, # Tree cover/forest: effectively impassable
60: 3.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"atv": ModeProfile(
name="atv",
description="ATV / side-by-side (Herzog wheeled model, higher base speed)",
speed_function="herzog",
base_speed_kmh=25.0,
max_slope_deg=30.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: None, # foot trail: impassable (too narrow)
},
terrain_friction_override={
30: 1.5, # Grassland: passable
20: 3.0, # Shrubland: rough
10: np.inf, # Forest: impassable
60: 2.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"vehicle": ModeProfile(
name="vehicle",
description="4x4 truck / jeep (linear speed degradation)",
speed_function="linear",
base_speed_kmh=40.0,
max_slope_deg=20.0,
trail_friction={
5: 0.1, # road
15: 0.5, # track (rough but passable)
25: None, # foot trail: impassable
},
terrain_friction_override={
# All off-trail terrain is impassable by default
10: np.inf, # Forest
20: np.inf, # Shrubland
30: np.inf, # Grassland (except flat - see below)
40: np.inf, # Cropland (except flat - see below)
60: np.inf, # Bare
90: np.inf, # Wetland
95: np.inf, # Mangrove
80: np.inf, # Water
},
wilderness_impassable=True,
off_trail_flat_threshold_deg=5.0, # Can drive on flat fields
off_trail_flat_friction=5.0, # But very slow
),
}
# Pragmatic mode friction multiplier for private land
PRAGMATIC_BARRIER_MULTIPLIER = 5.0
# ═══════════════════════════════════════════════════════════════════════════════
# COST GRID COMPUTATION
# ═══════════════════════════════════════════════════════════════════════════════
def compute_cost_grid(
elevation: np.ndarray,
cell_size_m: float,
cell_size_lat_m: float = None,
cell_size_lon_m: float = None,
friction: Optional[np.ndarray] = None,
friction_raw: Optional[np.ndarray] = None,
trails: Optional[np.ndarray] = None,
barriers: Optional[np.ndarray] = None,
wilderness: Optional[np.ndarray] = None,
mvum: Optional[np.ndarray] = None,
boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic",
mode: Literal["foot", "mtb", "atv", "vehicle"] = "foot"
) -> np.ndarray:
"""
Compute isotropic travel cost grid from elevation data.
Args:
elevation: 2D array of elevation values in meters
cell_size_m: Average cell size in meters
cell_size_lat_m: Cell size in latitude direction (optional)
cell_size_lon_m: Cell size in longitude direction (optional)
friction: Optional 2D array of friction multipliers (WorldCover).
Values should be float (1.0 = baseline, 2.0 = 2x slower).
np.inf marks impassable cells.
friction_raw: Optional 2D array of raw WorldCover class values (uint8).
Used for mode-specific terrain overrides.
Values: 10=tree, 20=shrub, 30=grass, etc.
trails: Optional 2D array of trail values (uint8).
0 = no trail, 5 = road, 15 = track, 25 = foot trail
barriers: Optional 2D array of barrier values (uint8).
255 = closed/restricted area (PAD-US Pub_Access = XA).
wilderness: Optional[np.ndarray] of wilderness values (uint8).
255 = designated wilderness area.
mvum: Optional[np.ndarray] of MVUM access values (uint8).
0 = no MVUM data, 1 = open, 255 = closed to this mode.
MVUM closures respond to boundary_mode (strict/pragmatic/emergency).
Foot mode should pass None (MVUM is motor-vehicle specific).
boundary_mode: How to handle barriers ("strict", "pragmatic", "emergency")
mode: Travel mode ("foot", "mtb", "atv", "vehicle")
Returns:
2D array of travel cost in seconds per cell.
np.inf for impassable cells.
"""
if boundary_mode not in ("strict", "pragmatic", "emergency"):
raise ValueError(f"boundary_mode must be 'strict', 'pragmatic', or 'emergency'")
if mode not in MODE_PROFILES:
raise ValueError(f"mode must be one of {list(MODE_PROFILES.keys())}")
profile = MODE_PROFILES[mode]
if cell_size_lat_m is None:
cell_size_lat_m = cell_size_m
if cell_size_lon_m is None:
cell_size_lon_m = cell_size_m
rows, cols = elevation.shape
# ─── Compute gradients (in-place where possible) ─────────────────────────
# Use float32 to reduce memory footprint
grade = np.zeros(elevation.shape, dtype=np.float32)
# Compute dy contribution to grade squared
dy_contrib = np.zeros(elevation.shape, dtype=np.float32)
dy_contrib[1:-1, :] = ((elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m)) ** 2
dy_contrib[0, :] = ((elevation[0, :] - elevation[1, :]) / cell_size_lat_m) ** 2
dy_contrib[-1, :] = ((elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m) ** 2
# Compute dx contribution and add to dy_contrib in-place
dy_contrib[:, 1:-1] += ((elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m)) ** 2
dy_contrib[:, 0] += ((elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m) ** 2
dy_contrib[:, -1] += ((elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m) ** 2
# grade = sqrt(dx^2 + dy^2)
np.sqrt(dy_contrib, out=grade)
del dy_contrib # Free memory immediately
# ─── Compute speed based on mode ─────────────────────────────────────────
max_grade_val = np.tan(np.radians(profile.max_slope_deg))
if profile.speed_function == "tobler":
speed_kmh = tobler_off_path_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "herzog":
speed_kmh = herzog_wheeled_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "linear":
speed_kmh = linear_degrade_speed(grade, profile.base_speed_kmh, max_grade_val)
else:
raise ValueError(f"Unknown speed function: {profile.speed_function}")
# ─── Base cost (seconds per cell) ─────────────────────────────────────────
avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2
cost = (avg_cell_size * 3.6) / speed_kmh
del speed_kmh
# ─── Max slope limit ──────────────────────────────────────────────────────
cost[grade > max_grade_val] = np.inf
# ─── NaN elevations ──────────────────────────────────────────────────────
cost[np.isnan(elevation)] = np.inf
# ─── Apply friction in-place ─────────────────────────────────────────────
# Instead of creating effective_friction copy, apply directly to cost
# Start with base friction
if friction is not None:
if friction.shape != elevation.shape:
raise ValueError(f"Friction shape mismatch")
np.multiply(cost, friction, out=cost)
# ─── Mode-specific terrain friction overrides (memory-efficient) ─────────
if friction_raw is not None and profile.terrain_friction_override:
if friction_raw.shape != elevation.shape:
raise ValueError(f"Friction_raw shape mismatch")
# Process all overrides without creating large intermediate masks
for wc_class, override in profile.terrain_friction_override.items():
if override is not None:
if override == np.inf:
# Use np.where for in-place-like behavior
np.putmask(cost, friction_raw == wc_class, np.inf)
else:
# Multiply cost where friction_raw matches
# Using a loop with putmask is more memory efficient
mask = friction_raw == wc_class
cost[mask] *= override
del mask
# ─── Vehicle mode: allow flat grassland/cropland ─────────────────────────
if mode == "vehicle" and profile.off_trail_flat_threshold_deg > 0:
if friction_raw is not None:
# Compute slope in degrees for flat terrain check
slope_deg = np.degrees(np.arctan(grade))
# Flat grassland or cropland - recompute cost for these cells
flat_field_mask = (
(slope_deg <= profile.off_trail_flat_threshold_deg) &
((friction_raw == 30) | (friction_raw == 40))
)
del slope_deg
# Recalculate cost for these cells with flat field friction
if np.any(flat_field_mask):
base_time = avg_cell_size * 3.6 / linear_degrade_speed(
grade[flat_field_mask], profile.base_speed_kmh, max_grade_val
)
cost[flat_field_mask] = base_time * profile.off_trail_flat_friction
del base_time
del flat_field_mask
# ─── Trail friction (mode-specific) ──────────────────────────────────────
if trails is not None:
if trails.shape != elevation.shape:
raise ValueError(f"Trails shape mismatch")
for trail_value, trail_friction in profile.trail_friction.items():
if trail_friction is None:
# Impassable for this mode
np.putmask(cost, trails == trail_value, np.inf)
else:
# Trail friction REPLACES terrain friction
# Recalculate cost = base_time * trail_friction
trail_mask = trails == trail_value
if np.any(trail_mask):
# Get base travel time (without friction)
if profile.speed_function == "tobler":
trail_speed = tobler_off_path_speed(grade[trail_mask], profile.base_speed_kmh)
elif profile.speed_function == "herzog":
trail_speed = herzog_wheeled_speed(grade[trail_mask], profile.base_speed_kmh)
else:
trail_speed = linear_degrade_speed(
grade[trail_mask], profile.base_speed_kmh, max_grade_val
)
cost[trail_mask] = (avg_cell_size * 3.6 / trail_speed) * trail_friction
del trail_speed
del trail_mask
# ─── Wilderness areas (mode-specific) ────────────────────────────────────
if wilderness is not None and profile.wilderness_impassable:
if wilderness.shape != elevation.shape:
raise ValueError(f"Wilderness shape mismatch")
np.putmask(cost, wilderness == 255, np.inf)
# ─── Barriers (private land) ─────────────────────────────────────────────
if barriers is not None and boundary_mode != "emergency":
if barriers.shape != elevation.shape:
raise ValueError(f"Barriers shape mismatch")
if boundary_mode == "strict":
np.putmask(cost, barriers == 255, np.inf)
elif boundary_mode == "pragmatic":
barrier_mask = barriers == 255
cost[barrier_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del barrier_mask
# ─── MVUM closures (motor vehicle restrictions) ──────────────────────────
# MVUM only applies to motorized modes, not foot. Foot mode should pass mvum=None.
# MVUM closures respond to the same boundary_mode as PAD-US barriers:
# "strict" = MVUM-closed road/trail is impassable
# "pragmatic" = MVUM-closed road/trail gets 5× friction penalty
# "emergency" = MVUM closures ignored entirely
if mvum is not None and mode != "foot" and boundary_mode != "emergency":
if mvum.shape != elevation.shape:
raise ValueError(f"MVUM shape mismatch")
# Value 255 = road/trail exists but is closed to this mode
mvum_closed_mask = mvum == 255
if boundary_mode == "strict":
np.putmask(cost, mvum_closed_mask, np.inf)
elif boundary_mode == "pragmatic":
cost[mvum_closed_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del mvum_closed_mask
return cost
# ═══════════════════════════════════════════════════════════════════════════════
# LEGACY API (backward compatibility)
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_speed(grade: float) -> float:
"""Legacy single-value Tobler speed function."""
return 0.6 * 6.0 * math.exp(-3.5 * abs(grade + 0.05))
# ═══════════════════════════════════════════════════════════════════════════════
# TESTING
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("=" * 70)
print("OFFROUTE Multi-Mode Cost Function Tests")
print("=" * 70)
print("\n[1] Speed functions at various grades:")
print(f"{'Grade':<10} {'Foot':<12} {'MTB':<12} {'ATV':<12} {'Vehicle':<12}")
print("-" * 60)
for grade_val in [-0.3, -0.1, 0.0, 0.1, 0.2, 0.3]:
grade_arr = np.array([grade_val])
foot = tobler_off_path_speed(grade_arr, 6.0)[0]
mtb = herzog_wheeled_speed(grade_arr, 12.0)[0]
atv = herzog_wheeled_speed(grade_arr, 25.0)[0]
veh = linear_degrade_speed(grade_arr, 40.0, np.tan(np.radians(20)))[0]
print(f"{grade_val:+.2f} {foot:>6.2f} km/h {mtb:>6.2f} km/h {atv:>6.2f} km/h {veh:>6.2f} km/h")
print("\n[2] Mode profiles:")
for name, profile in MODE_PROFILES.items():
print(f"\n {name.upper()}: {profile.description}")
print(f" Max slope: {profile.max_slope_deg}°")
print(f" Trail access: {profile.trail_friction}")
print(f" Wilderness blocked: {profile.wilderness_impassable}")
print("\n[3] Cost grid test (flat terrain, forest):")
elev = np.ones((10, 10), dtype=np.float32) * 1000
friction = np.ones((10, 10), dtype=np.float32) * 2.0 # Forest friction
friction_raw = np.ones((10, 10), dtype=np.uint8) * 10 # Tree cover class
trails = np.zeros((10, 10), dtype=np.uint8)
trails[5, :] = 5 # Road across middle
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
friction=friction,
friction_raw=friction_raw,
trails=trails,
mode=mode_name
)
off_trail_cost = cost[0, 0]
road_cost = cost[5, 0]
impassable = np.sum(np.isinf(cost))
print(f" {mode_name:8s}: off-trail={off_trail_cost:>8.1f}s, road={road_cost:>6.1f}s, impassable={impassable}")
print("\n[4] Wilderness blocking test:")
wilderness = np.zeros((10, 10), dtype=np.uint8)
wilderness[3:7, 3:7] = 255
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
wilderness=wilderness,
mode=mode_name
)
wilderness_impassable = np.sum(np.isinf(cost[3:7, 3:7]))
print(f" {mode_name:8s}: wilderness cells impassable = {wilderness_impassable}/16")
print("\nDone.")

View file

@ -1,190 +0,0 @@
"""
DEM tile reader for OFFROUTE.
Reads elevation tiles from planet-dem.pmtiles (Terrarium-encoded WebP),
decodes them into numpy arrays, and provides a stitched elevation grid
for a given bounding box.
"""
import math
from functools import lru_cache
from io import BytesIO
from pathlib import Path
from typing import Tuple, Optional
import numpy as np
from PIL import Image
from pmtiles.reader import MmapSource, Reader as PMTilesReader
# Default path to the planet DEM PMTiles file
DEFAULT_DEM_PATH = Path("/mnt/nas/nav/planet-dem.pmtiles")
# Tile size in pixels (z12 tiles are 512x512 in this tileset)
TILE_SIZE = 512
# Zoom level to use for elevation data
ZOOM_LEVEL = 12
def terrarium_decode(rgb_array: np.ndarray) -> np.ndarray:
"""
Decode Terrarium-encoded RGB values to elevation in meters.
Formula: elevation = (R * 256 + G + B/256) - 32768
"""
r = rgb_array[:, :, 0].astype(np.float32)
g = rgb_array[:, :, 1].astype(np.float32)
b = rgb_array[:, :, 2].astype(np.float32)
elevation = (r * 256.0 + g + b / 256.0) - 32768.0
return elevation
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
"""Convert lat/lon to tile coordinates at given zoom level."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_rad = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return x, y
def tile_to_lat_lon(x: int, y: int, zoom: int) -> Tuple[float, float, float, float]:
"""Convert tile coordinates to bounding box (north, south, west, east)."""
n = 2 ** zoom
lon_west = x / n * 360.0 - 180.0
lon_east = (x + 1) / n * 360.0 - 180.0
lat_north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
lat_south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return lat_north, lat_south, lon_west, lon_east
class DEMReader:
"""Reader for Terrarium-encoded DEM tiles from PMTiles."""
def __init__(self, pmtiles_path: Path = DEFAULT_DEM_PATH, tile_cache_size: int = 128):
self.pmtiles_path = pmtiles_path
self._source = MmapSource(open(pmtiles_path, "rb"))
self._reader = PMTilesReader(self._source)
self._header = self._reader.header()
self._decode_tile = lru_cache(maxsize=tile_cache_size)(self._decode_tile_impl)
def _decode_tile_impl(self, z: int, x: int, y: int) -> Optional[np.ndarray]:
"""Fetch and decode a single tile."""
tile_data = self._reader.get(z, x, y)
if tile_data is None:
return None
img = Image.open(BytesIO(tile_data))
rgb_array = np.array(img)
if rgb_array.shape[2] == 4:
rgb_array = rgb_array[:, :, :3]
elevation = terrarium_decode(rgb_array)
return elevation
def get_elevation_grid(
self,
south: float,
north: float,
west: float,
east: float,
zoom: int = ZOOM_LEVEL
) -> Tuple[np.ndarray, dict]:
"""Get a stitched elevation grid for the given bounding box."""
x_min, y_max = lat_lon_to_tile(south, west, zoom)
x_max, y_min = lat_lon_to_tile(north, east, zoom)
n = 2 ** zoom
x_min = max(0, x_min)
x_max = min(n - 1, x_max)
y_min = max(0, y_min)
y_max = min(n - 1, y_max)
n_tiles_x = x_max - x_min + 1
n_tiles_y = y_max - y_min + 1
out_height = n_tiles_y * TILE_SIZE
out_width = n_tiles_x * TILE_SIZE
elevation = np.full((out_height, out_width), np.nan, dtype=np.float32)
for ty in range(y_min, y_max + 1):
for tx in range(x_min, x_max + 1):
tile_elev = self._decode_tile(zoom, tx, ty)
if tile_elev is not None:
out_y = (ty - y_min) * TILE_SIZE
out_x = (tx - x_min) * TILE_SIZE
elevation[out_y:out_y + TILE_SIZE, out_x:out_x + TILE_SIZE] = tile_elev
grid_north, _, grid_west, _ = tile_to_lat_lon(x_min, y_min, zoom)
_, grid_south, _, grid_east = tile_to_lat_lon(x_max, y_max, zoom)
pixel_size_lat = (grid_north - grid_south) / out_height
pixel_size_lon = (grid_east - grid_west) / out_width
origin_lat = grid_north - pixel_size_lat / 2
origin_lon = grid_west + pixel_size_lon / 2
center_lat = (south + north) / 2
lat_m = 111320.0
lon_m = 111320.0 * math.cos(math.radians(center_lat))
cell_size_lat_m = abs(pixel_size_lat) * lat_m
cell_size_lon_m = abs(pixel_size_lon) * lon_m
cell_size_m = (cell_size_lat_m + cell_size_lon_m) / 2
row_start = int((grid_north - north) / abs(pixel_size_lat))
row_end = int((grid_north - south) / abs(pixel_size_lat))
col_start = int((west - grid_west) / pixel_size_lon)
col_end = int((east - grid_west) / pixel_size_lon)
row_start = max(0, row_start)
row_end = min(out_height, row_end)
col_start = max(0, col_start)
col_end = min(out_width, col_end)
elevation = elevation[row_start:row_end, col_start:col_end]
origin_lat = grid_north - (row_start + 0.5) * abs(pixel_size_lat)
origin_lon = grid_west + (col_start + 0.5) * pixel_size_lon
metadata = {
"bounds": (south, north, west, east),
"pixel_size_lat": -abs(pixel_size_lat),
"pixel_size_lon": pixel_size_lon,
"origin_lat": origin_lat,
"origin_lon": origin_lon,
"cell_size_m": cell_size_m,
"shape": elevation.shape,
}
return elevation, metadata
def pixel_to_latlon(self, row: int, col: int, metadata: dict) -> Tuple[float, float]:
"""Convert pixel coordinates to lat/lon."""
lat = metadata["origin_lat"] + row * metadata["pixel_size_lat"]
lon = metadata["origin_lon"] + col * metadata["pixel_size_lon"]
return lat, lon
def latlon_to_pixel(self, lat: float, lon: float, metadata: dict) -> Tuple[int, int]:
"""Convert lat/lon to pixel coordinates."""
row = int((metadata["origin_lat"] - lat) / abs(metadata["pixel_size_lat"]))
col = int((lon - metadata["origin_lon"]) / metadata["pixel_size_lon"])
return row, col
def close(self):
"""Close the PMTiles file."""
pass # MmapSource handles cleanup
if __name__ == "__main__":
reader = DEMReader()
elevation, meta = reader.get_elevation_grid(
south=42.4, north=42.6, west=-114.5, east=-114.3
)
print(f"Elevation grid shape: {elevation.shape}")
print(f"Cell size: {meta['cell_size_m']:.1f} m")
print(f"Elevation range: {np.nanmin(elevation):.1f} - {np.nanmax(elevation):.1f} m")
center_row, center_col = elevation.shape[0] // 2, elevation.shape[1] // 2
lat, lon = reader.pixel_to_latlon(center_row, center_col, meta)
print(f"Center pixel lat/lon: {lat:.4f}, {lon:.4f}")
reader.close()

View file

@ -1,137 +0,0 @@
"""
Friction layer reader for OFFROUTE.
Reads friction values from the WorldCover friction VRT and resamples
to match the elevation grid dimensions.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for friction layer support")
# Default path to the friction VRT
DEFAULT_FRICTION_PATH = Path("/mnt/nav/worldcover/friction/friction_conus.vrt")
class FrictionReader:
"""Reader for WorldCover friction raster."""
def __init__(self, friction_path: Path = DEFAULT_FRICTION_PATH):
self.friction_path = friction_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
self._dataset = rasterio.open(self.friction_path)
return self._dataset
def get_friction_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get friction values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 friction values, same shape as target_shape.
Values: 10-40 = friction multiplier (divide by 10)
255 = impassable
0 = nodata (treat as impassable)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor for categorical data
friction = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return friction
def sample_point(self, lat: float, lon: float) -> int:
"""Sample friction value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = nodata
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def friction_to_multiplier(friction: np.ndarray) -> np.ndarray:
"""
Convert friction values to cost multipliers.
Args:
friction: uint8 array of friction values
Returns:
float32 array of multipliers.
Values 10-40 become 1.0-4.0 (divide by 10).
Values 0 or 255 become np.inf (impassable).
"""
multiplier = friction.astype(np.float32) / 10.0
# Mark impassable cells
multiplier[friction == 0] = np.inf # nodata
multiplier[friction == 255] = np.inf # water/impassable
return multiplier
if __name__ == "__main__":
print("Testing FrictionReader...")
reader = FrictionReader()
# Test point sampling - Murtaugh Lake (should be water = 255)
lake_lat, lake_lon = 42.47, -114.15
lake_friction = reader.sample_point(lake_lat, lake_lon)
print(f"Murtaugh Lake ({lake_lat}, {lake_lon}): friction = {lake_friction}")
print(f" Expected: 255 (water/impassable)")
# Test grid read for small bbox
friction = reader.get_friction_grid(
south=42.4, north=42.5, west=-114.2, east=-114.1,
target_shape=(100, 100)
)
print(f"\nGrid test shape: {friction.shape}")
print(f"Unique values: {np.unique(friction)}")
print(f"Water cells (255): {np.sum(friction == 255)}")
reader.close()
print("\nFrictionReader test complete.")

View file

@ -1,623 +0,0 @@
"""
MVUM (Motor Vehicle Use Map) legal access layer for OFFROUTE.
Queries USFS MVUM data from navi.db and provides rasterized access grids
indicating which roads/trails are open or closed to specific vehicle modes.
MVUM is motor-vehicle specific foot mode should skip this layer entirely.
"""
import re
import sqlite3
import warnings
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Literal
import numpy as np
# Path to navi.db
NAVI_DB_PATH = Path("/mnt/nav/navi.db")
def parse_date_range(date_str: str) -> List[Tuple[int, int, int, int]]:
"""
Parse MVUM date range strings like "05/01-11/30" or "06/15-10/15,12/01-03/31".
Returns list of (start_month, start_day, end_month, end_day) tuples.
Returns empty list if unparseable.
"""
if not date_str or date_str.strip() == "":
return []
ranges = []
# Split by comma for multi-period strings
for part in date_str.split(","):
part = part.strip()
# Match MM/DD-MM/DD pattern
match = re.match(r"(\d{1,2})/(\d{1,2})-(\d{1,2})/(\d{1,2})", part)
if match:
try:
sm, sd, em, ed = int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))
if 1 <= sm <= 12 and 1 <= sd <= 31 and 1 <= em <= 12 and 1 <= ed <= 31:
ranges.append((sm, sd, em, ed))
except ValueError:
pass
return ranges
def is_date_in_range(month: int, day: int, ranges: List[Tuple[int, int, int, int]]) -> bool:
"""
Check if a given month/day falls within any of the date ranges.
Handles ranges that wrap around year end (e.g., 12/01-03/31).
"""
if not ranges:
return True # No ranges = assume open
date_num = month * 100 + day # Simple numeric comparison
for sm, sd, em, ed in ranges:
start_num = sm * 100 + sd
end_num = em * 100 + ed
if start_num <= end_num:
# Normal range (e.g., 05/01-11/30)
if start_num <= date_num <= end_num:
return True
else:
# Wrapping range (e.g., 12/01-03/31)
if date_num >= start_num or date_num <= end_num:
return True
return False
def check_access(
status_field: Optional[str],
dates_field: Optional[str],
seasonal: Optional[str],
check_date: Optional[Tuple[int, int]] = None
) -> Optional[bool]:
"""
Determine if a road/trail is open to a vehicle type.
Args:
status_field: Value of vehicle-class field (e.g., "open", null)
dates_field: Value of *_DATESOPEN field (e.g., "05/01-11/30")
seasonal: Value of SEASONAL field ("yearlong", "seasonal")
check_date: Optional (month, day) tuple to check against date ranges
Returns:
True = open
False = closed
None = no data (field not populated, defer to SYMBOL)
"""
if status_field is None or status_field.strip() == "":
return None # No data
status = status_field.strip().lower()
if status != "open":
return False # Explicitly closed or restricted
# Status is "open" - check seasonal restrictions
if check_date is not None:
month, day = check_date
# Parse date ranges
if dates_field:
ranges = parse_date_range(dates_field)
if ranges:
return is_date_in_range(month, day, ranges)
# No date field but seasonal = "yearlong" means always open
if seasonal and seasonal.strip().lower() == "yearlong":
return True
# Seasonal with no dates - assume open (data quality issue)
if seasonal and seasonal.strip().lower() == "seasonal":
warnings.warn(f"Seasonal road/trail with no DATESOPEN, assuming open")
return True
return True # Open with no date check
def get_mode_field(mode: str) -> Tuple[str, str]:
"""
Get the MVUM field names for a given travel mode.
Returns (status_field, dates_field) tuple.
"""
mode_mapping = {
"atv": ("atv", "atv_datesopen"),
"motorcycle": ("motorcycle", "motorcycle_datesopen"),
"mtb": ("e_bike_class1", "e_bike_class1_dur"), # Closest analog for e-bikes
"vehicle": ("highclearancevehicle", "highclearancevehicle_datesopen"),
"passenger": ("passengervehicle", "passengervehicle_datesopen"),
}
return mode_mapping.get(mode, ("highclearancevehicle", "highclearancevehicle_datesopen"))
def symbol_to_access(symbol: str, mode: str, maint_level: Optional[str] = None) -> Optional[bool]:
"""
Fallback: interpret SYMBOL field when per-vehicle-class fields are null.
MVUM SYMBOL meanings (roads):
1 = Open to all vehicles
2 = Open to highway legal vehicles only
3 = Road closed to motorized
4 = Road open seasonally
11 = Administrative use only
12 = Decommissioned
For trails, similar logic applies based on TRAILCLASS.
"""
if symbol is None:
return None
sym = str(symbol).strip()
# Symbol 1: Open to all
if sym == "1":
return True
# Symbol 2: Highway legal only
if sym == "2":
# ATVs/motorcycles typically not highway legal
if mode in ("atv", "motorcycle"):
return False
return True
# Symbol 3: Closed to motorized
if sym == "3":
return False
# Symbol 4: Seasonally open (assume open if no date check)
if sym == "4":
return True
# Symbol 11/12: Administrative/decommissioned = closed
if sym in ("11", "12"):
return False
# Unknown symbol - defer
return None
class MVUMReader:
"""
Reader for MVUM data from navi.db.
Queries roads and trails by bounding box and returns access grids.
"""
def __init__(self, db_path: Path = NAVI_DB_PATH):
self.db_path = db_path
self._conn = None
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
if not self.db_path.exists():
raise FileNotFoundError(f"navi.db not found at {self.db_path}")
self._conn = sqlite3.connect(str(self.db_path))
self._conn.row_factory = sqlite3.Row
# Load Spatialite extension if available
try:
self._conn.enable_load_extension(True)
self._conn.load_extension("mod_spatialite")
except Exception:
pass # Spatialite not available, will use manual bbox queries
return self._conn
def table_exists(self, table_name: str) -> bool:
"""Check if an MVUM table exists."""
conn = self._get_conn()
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
return cur.fetchone() is not None
def query_roads_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM roads within a bounding box.
Returns list of dicts with access info for the given mode.
"""
if not self.table_exists("mvum_roads"):
return []
conn = self._get_conn()
# Query using bbox on geometry
# Since we don't have spatialite, we'll query all and filter in Python
# For production, consider pre-computing bbox columns
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, operationalmaintlevel, seasonal,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_roads
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
# Parse geometry to check bbox intersection
# The shape is stored as WKB blob
shape = row["shape"]
if shape is None:
continue
# Quick bbox check using geometry extent
# Since we don't have Spatialite functions, we'll include all
# and let the rasterization handle it
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
# Fallback to SYMBOL if no per-vehicle data
if access is None:
access = symbol_to_access(row["symbol"], mode, row["operationalmaintlevel"])
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"maint_level": row["operationalmaintlevel"],
"shape": shape,
})
return results
def query_trails_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM trails within a bounding box.
"""
if not self.table_exists("mvum_trails"):
return []
conn = self._get_conn()
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, seasonal, trailclass,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_trails
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
shape = row["shape"]
if shape is None:
continue
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
if access is None:
access = symbol_to_access(row["symbol"], mode)
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"trail_class": row["trailclass"],
"shape": shape,
})
return results
def query_nearest(
self,
lat: float, lon: float,
radius_m: float = 50,
table: str = "mvum_roads"
) -> Optional[Dict]:
"""
Query the nearest MVUM feature to a point.
Used for the places panel API.
"""
if not self.table_exists(table):
return None
conn = self._get_conn()
# Convert radius to degrees (approximate)
radius_deg = radius_m / 111000
# Query features in bbox around point
if table == "mvum_roads":
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
operationalmaintlevel, surfacetype, seasonal, jurisdiction,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_roads
LIMIT 1000
""")
else:
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
seasonal, jurisdiction, trailclass, trailsystem,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_trails
LIMIT 1000
""")
# Find nearest feature
# This is a simplified approach - for production, use spatial index
try:
from shapely import wkb
from shapely.geometry import Point
query_point = Point(lon, lat)
nearest = None
min_dist = float('inf')
for row in cur:
try:
geom = wkb.loads(row["shape"])
dist = query_point.distance(geom)
if dist < min_dist and dist < radius_deg:
min_dist = dist
nearest = dict(row)
nearest["geometry"] = geom
except Exception:
continue
if nearest:
# Convert geometry to GeoJSON
nearest["geojson"] = nearest["geometry"].__geo_interface__
del nearest["geometry"]
del nearest["shape"]
return nearest
except ImportError:
warnings.warn("shapely not available for nearest query")
return None
def close(self):
if self._conn:
self._conn.close()
self._conn = None
def get_mvum_access_grid(
south: float, north: float, west: float, east: float,
target_shape: Tuple[int, int],
mode: Literal["foot", "mtb", "atv", "vehicle"] = "atv",
check_date: Optional[str] = None,
db_path: Path = NAVI_DB_PATH
) -> np.ndarray:
"""
Get MVUM access grid for pathfinding.
Args:
south, north, west, east: Bounding box (WGS84)
target_shape: (rows, cols) to match elevation grid
mode: Travel mode (foot skips MVUM entirely)
check_date: Optional "MM/DD" string for seasonal checking
db_path: Path to navi.db
Returns:
np.ndarray of uint8:
0 = no MVUM data (defer to existing trail/friction logic)
1 = road/trail is OPEN to this vehicle mode
255 = road/trail EXISTS but is CLOSED to this mode
"""
# Foot mode bypasses MVUM entirely
if mode == "foot":
return np.zeros(target_shape, dtype=np.uint8)
# Parse check_date if provided
parsed_date = None
if check_date:
match = re.match(r"(\d{1,2})/(\d{1,2})", check_date)
if match:
parsed_date = (int(match.group(1)), int(match.group(2)))
# Initialize output grid
grid = np.zeros(target_shape, dtype=np.uint8)
rows, cols = target_shape
# Pixel size
pixel_lat = (north - south) / rows
pixel_lon = (east - west) / cols
reader = MVUMReader(db_path)
try:
# Query roads and trails
roads = reader.query_roads_bbox(south, north, west, east, mode, parsed_date)
trails = reader.query_trails_bbox(south, north, west, east, mode, parsed_date)
# Rasterize features
try:
from shapely import wkb
for features in [roads, trails]:
for feat in features:
try:
geom = wkb.loads(feat["shape"])
# Get geometry bounds
minx, miny, maxx, maxy = geom.bounds
# Check if intersects our bbox
if maxx < west or minx > east or maxy < south or miny > north:
continue
# Rasterize line
value = 1 if feat["access"] else 255
# Simple line rasterization
if geom.geom_type in ("LineString", "MultiLineString"):
if geom.geom_type == "MultiLineString":
coords_list = [list(line.coords) for line in geom.geoms]
else:
coords_list = [list(geom.coords)]
for coords in coords_list:
for i in range(len(coords) - 1):
x1, y1 = coords[i]
x2, y2 = coords[i + 1]
# Convert to pixel coordinates
col1 = int((x1 - west) / pixel_lon)
row1 = int((north - y1) / pixel_lat)
col2 = int((x2 - west) / pixel_lon)
row2 = int((north - y2) / pixel_lat)
# Bresenham's line algorithm
_draw_line(grid, row1, col1, row2, col2, value)
except Exception as e:
continue
except ImportError:
warnings.warn("shapely not available, MVUM rasterization skipped")
finally:
reader.close()
return grid
def _draw_line(grid: np.ndarray, r1: int, c1: int, r2: int, c2: int, value: int):
"""Draw a line on the grid using Bresenham's algorithm."""
rows, cols = grid.shape
dr = abs(r2 - r1)
dc = abs(c2 - c1)
sr = 1 if r1 < r2 else -1
sc = 1 if c1 < c2 else -1
err = dr - dc
r, c = r1, c1
while True:
if 0 <= r < rows and 0 <= c < cols:
# Only overwrite if current value is 0 (no data) or we're marking closed
if grid[r, c] == 0 or value == 255:
grid[r, c] = value
if r == r2 and c == c2:
break
e2 = 2 * err
if e2 > -dc:
err -= dc
r += sr
if e2 < dr:
err += dr
c += sc
if __name__ == "__main__":
import sys
print("=" * 60)
print("MVUM Reader Test")
print("=" * 60)
reader = MVUMReader()
if not reader.table_exists("mvum_roads"):
print("ERROR: mvum_roads table not found in navi.db")
sys.exit(1)
# Test bbox query (Sawtooth NF area)
print("\n[1] Testing bbox query (Sawtooth NF area)...")
roads = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv"
)
print(f" Found {len(roads)} roads")
open_count = sum(1 for r in roads if r["access"])
closed_count = sum(1 for r in roads if not r["access"])
print(f" Open to ATV: {open_count}")
print(f" Closed to ATV: {closed_count}")
# Test with seasonal date
print("\n[2] Testing with date check (July 15)...")
roads_summer = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(7, 15)
)
open_summer = sum(1 for r in roads_summer if r["access"])
print(f" Open to ATV on 07/15: {open_summer}")
print("\n[3] Testing with date check (January 15)...")
roads_winter = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(1, 15)
)
open_winter = sum(1 for r in roads_winter if r["access"])
print(f" Open to ATV on 01/15: {open_winter}")
# Test grid generation
print("\n[4] Testing grid generation...")
grid = get_mvum_access_grid(
south=43.5, north=44.0, west=-115.0, east=-114.0,
target_shape=(500, 1000),
mode="atv"
)
print(f" Grid shape: {grid.shape}")
print(f" No data (0): {np.sum(grid == 0)}")
print(f" Open (1): {np.sum(grid == 1)}")
print(f" Closed (255): {np.sum(grid == 255)}")
reader.close()
print("\nDone.")

View file

@ -1,414 +0,0 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O3a Prototype
Validates trail burn-in integration with the MCP pathfinder.
The path should actively seek out trails and roads when nearby.
Compares paths with and without trail burn-in to show the benefit
of trail-seeking behavior.
"""
import json
import time
import sys
from pathlib import Path
import numpy as np
from skimage.graph import MCP_Geometric
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
from lib.offroute.friction import FrictionReader, friction_to_multiplier
from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH
from lib.offroute.trails import TrailReader, DEFAULT_TRAILS_PATH
# Test bounding box - Idaho area
BBOX = {
"south": 42.21,
"north": 42.60,
"west": -114.76,
"east": -113.79,
}
# Start point: wilderness area away from roads
START_LAT = 42.35
START_LON = -114.60
# End point: near Twin Falls (has roads/trails)
END_LAT = 42.55
END_LON = -114.20
# Output files
OUTPUT_PATH_WITH_TRAILS = Path("/opt/recon/data/offroute-test-trails.geojson")
OUTPUT_PATH_NO_TRAILS = Path("/opt/recon/data/offroute-test-no-trails.geojson")
# Memory limit in GB
MEMORY_LIMIT_GB = 12
def check_memory_usage():
"""Check current memory usage and abort if over limit."""
try:
import psutil
process = psutil.Process()
mem_gb = process.memory_info().rss / (1024**3)
if mem_gb > MEMORY_LIMIT_GB:
print(f"ERROR: Memory usage {mem_gb:.1f}GB exceeds {MEMORY_LIMIT_GB}GB limit")
sys.exit(1)
return mem_gb
except ImportError:
return 0
def run_pathfinder(
elevation: np.ndarray,
meta: dict,
friction_mult: np.ndarray,
trails: np.ndarray,
barriers: np.ndarray,
use_trails: bool,
start_row: int,
start_col: int,
end_row: int,
end_col: int,
dem_reader: DEMReader,
) -> dict:
"""Run the MCP pathfinder with given parameters."""
# Compute cost grid
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult,
trails=trails if use_trails else None,
barriers=barriers,
boundary_mode="pragmatic",
)
# Run MCP
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
end_cost = cumulative_costs[end_row, end_col]
if np.isinf(end_cost):
return {
"success": False,
"reason": "No path found (blocked by impassable terrain)",
}
# Traceback path
path_indices = mcp.traceback((end_row, end_col))
# Convert to coordinates and collect stats
coordinates = []
elevations = []
trail_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
trail_val = trails[row, col] if trails is not None else 0
coordinates.append([lon, lat])
elevations.append(elev)
trail_values.append(trail_val)
# Compute distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
R = 6371000
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Elevation stats
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Trail stats
trail_arr = np.array(trail_values)
road_cells = np.sum(trail_arr == 5)
track_cells = np.sum(trail_arr == 15)
trail_cells = np.sum(trail_arr == 25)
off_trail_cells = np.sum(trail_arr == 0)
on_trail_cells = road_cells + track_cells + trail_cells
total_cells = len(trail_arr)
return {
"success": True,
"coordinates": coordinates,
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": total_cells,
"road_cells": int(road_cells),
"track_cells": int(track_cells),
"trail_cells": int(trail_cells),
"off_trail_cells": int(off_trail_cells),
"on_trail_pct": float(100 * on_trail_cells / total_cells) if total_cells > 0 else 0,
}
def main():
print("=" * 80)
print("OFFROUTE Phase O3a Prototype (Trail Burn-In)")
print("=" * 80)
t0 = time.time()
# Check for required rasters
if not DEFAULT_BARRIERS_PATH.exists():
print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
sys.exit(1)
if not DEFAULT_TRAILS_PATH.exists():
print(f"\nERROR: Trails raster not found at {DEFAULT_TRAILS_PATH}")
sys.exit(1)
# Step 1: Load elevation data
print(f"\n[1] Loading DEM for bbox: {BBOX}")
dem_reader = DEMReader()
elevation, meta = dem_reader.get_elevation_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
)
print(f" Elevation grid shape: {elevation.shape}")
print(f" Cell count: {elevation.size:,}")
print(f" Cell size: {meta['cell_size_m']:.1f} m")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 2: Load friction data
print(f"\n[2] Loading WorldCover friction layer...")
friction_reader = FrictionReader()
friction_raw = friction_reader.get_friction_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
friction_mult = friction_to_multiplier(friction_raw)
print(f" Friction grid shape: {friction_raw.shape}")
print(f" Water/impassable cells: {np.sum(np.isinf(friction_mult)):,}")
# Step 3: Load barrier data
print(f"\n[3] Loading PAD-US barrier layer...")
barrier_reader = BarrierReader()
barriers = barrier_reader.get_barrier_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
closed_cells = np.sum(barriers == 255)
print(f" Barrier grid shape: {barriers.shape}")
print(f" Closed/restricted cells: {closed_cells:,}")
# Step 4: Load trails data
print(f"\n[4] Loading OSM trails layer...")
trail_reader = TrailReader()
trails = trail_reader.get_trails_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
road_cells = np.sum(trails == 5)
track_cells = np.sum(trails == 15)
trail_cells = np.sum(trails == 25)
print(f" Trails grid shape: {trails.shape}")
print(f" Road cells: {road_cells:,}")
print(f" Track cells: {track_cells:,}")
print(f" Trail cells: {trail_cells:,}")
print(f" Total trail coverage: {100*(road_cells+track_cells+trail_cells)/trails.size:.2f}%")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 5: Convert start/end to pixel coordinates
print(f"\n[5] Converting coordinates...")
start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta)
end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta)
print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})")
print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})")
# Validate coordinates
rows, cols = elevation.shape
if not (0 <= start_row < rows and 0 <= start_col < cols):
print(f"ERROR: Start point outside grid bounds")
sys.exit(1)
if not (0 <= end_row < rows and 0 <= end_col < cols):
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
# Step 6: Run pathfinder WITH trails
print(f"\n[6] Running pathfinder WITH trail burn-in...")
t6a = time.time()
result_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=True,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t6b = time.time()
print(f" Completed in {t6b - t6a:.1f}s")
# Step 7: Run pathfinder WITHOUT trails
print(f"\n[7] Running pathfinder WITHOUT trail burn-in...")
t7a = time.time()
result_no_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=False,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t7b = time.time()
print(f" Completed in {t7b - t7a:.1f}s")
# Step 8: Save GeoJSON outputs
print(f"\n[8] Saving GeoJSON outputs...")
OUTPUT_PATH_WITH_TRAILS.parent.mkdir(parents=True, exist_ok=True)
if result_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_with_trails",
"phase": "O3a",
"trail_burn_in": True,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_trails["coordinates"],
}
}
with open(OUTPUT_PATH_WITH_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_WITH_TRAILS}")
if result_no_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_no_trails",
"phase": "O3a",
"trail_burn_in": False,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_no_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_no_trails["coordinates"],
}
}
with open(OUTPUT_PATH_NO_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_NO_TRAILS}")
t_total = time.time()
# Final report
print(f"\n" + "=" * 80)
print("SIDE-BY-SIDE COMPARISON: Trail Burn-In Effect")
print("=" * 80)
if result_trails["success"] and result_no_trails["success"]:
print(f"{'Metric':<25} {'WITH TRAILS':<20} {'WITHOUT TRAILS':<20} {'Delta':<15}")
print("-" * 80)
metrics = [
("Distance (km)", "total_distance_km", ".2f"),
("Effort time (min)", "total_time_minutes", ".1f"),
("Cell count", "cell_count", "d"),
("Elevation gain (m)", "elevation_gain_m", ".0f"),
("On-trail %", "on_trail_pct", ".1f"),
("Road cells", "road_cells", "d"),
("Track cells", "track_cells", "d"),
("Trail cells", "trail_cells", "d"),
]
for label, key, fmt in metrics:
val_with = result_trails[key]
val_without = result_no_trails[key]
if isinstance(val_with, int):
delta = val_with - val_without
delta_str = f"{delta:+d}"
else:
delta = val_with - val_without
delta_str = f"{delta:+.2f}"
print(f"{label:<25} {val_with:<20{fmt}} {val_without:<20{fmt}} {delta_str:<15}")
# Analysis
print(f"\n" + "-" * 80)
print("ANALYSIS")
print("-" * 80)
time_saved = result_no_trails["total_time_minutes"] - result_trails["total_time_minutes"]
if time_saved > 0:
print(f"Trail burn-in saves {time_saved:.1f} minutes ({100*time_saved/result_no_trails['total_time_minutes']:.1f}% faster)")
elif time_saved < 0:
print(f"Trail burn-in adds {-time_saved:.1f} minutes (path seeks trails even if longer)")
on_trail_with = result_trails["on_trail_pct"]
on_trail_without = result_no_trails["on_trail_pct"]
if on_trail_with > on_trail_without:
print(f"Trail burn-in increases on-trail travel: {on_trail_without:.1f}% → {on_trail_with:.1f}%")
else:
print(f"Both paths have similar on-trail percentage")
else:
if not result_trails["success"]:
print(f"WITH TRAILS: FAILED - {result_trails.get('reason', 'unknown')}")
if not result_no_trails["success"]:
print(f"WITHOUT TRAILS: FAILED - {result_no_trails.get('reason', 'unknown')}")
print(f"\n" + "-" * 80)
print(f"Total wall time: {t_total - t0:.1f}s")
# Cleanup
dem_reader.close()
friction_reader.close()
barrier_reader.close()
trail_reader.close()
print("\nPrototype completed.")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load diff

View file

@ -1,174 +0,0 @@
"""
Trail corridor reader for OFFROUTE.
Provides access to the OSM-derived trail raster for pathfinding.
Trail values replace WorldCover friction where trails exist.
Raster values:
0 = no trail (use WorldCover friction)
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for trails layer support")
# Default path to the trails raster
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
# Trail value to friction multiplier mapping
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
class TrailReader:
"""Reader for OSM-derived trail corridor raster."""
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
self.trails_path = trails_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.trails_path.exists():
raise FileNotFoundError(
f"Trails raster not found at {self.trails_path}. "
f"Run the Phase B rasterization script first."
)
self._dataset = rasterio.open(self.trails_path)
return self._dataset
def get_trails_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get trail values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 trail values:
0 = no trail
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor to preserve discrete values
trails = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return trails
def sample_point(self, lat: float, lon: float) -> int:
"""Sample trail value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = no trail
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert trail values to friction multipliers.
Args:
trails: uint8 array of trail values (0, 5, 15, or 25)
Returns:
Tuple of:
- friction: float32 array of friction multipliers
- has_trail: bool array indicating where trails exist
"""
friction = np.ones_like(trails, dtype=np.float32)
has_trail = trails > 0
# Apply friction values where trails exist
friction[trails == 5] = 0.1 # road
friction[trails == 15] = 0.3 # track
friction[trails == 25] = 0.5 # foot trail
return friction, has_trail
if __name__ == "__main__":
print("Testing TrailReader...")
if not DEFAULT_TRAILS_PATH.exists():
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
print("Run Phase B rasterization first.")
exit(1)
reader = TrailReader()
# Test point sampling - Twin Falls downtown (should have roads)
test_lat, test_lon = 42.563, -114.461
trail_value = reader.sample_point(test_lat, test_lon)
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
print(f" Type: {label}")
# Test grid read for test bbox
trails = reader.get_trails_grid(
south=42.21, north=42.60, west=-114.76, east=-113.79,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {trails.shape}")
unique, counts = np.unique(trails, return_counts=True)
print("Value distribution:")
for v, c in zip(unique, counts):
pct = 100 * c / trails.size
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
# Test conversion to friction
friction, has_trail = trails_to_friction(trails)
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
reader.close()
print("\nTrailReader test complete.")

View file

@ -22,6 +22,41 @@ OVERPASS_URL = "https://overpass-api.de/api/interpreter"
OVERPASS_UA = "Navi/1.0 (forge.echo6.co/matt/recon)"
VALID_OSM_TYPES = {"N", "W", "R"}
# US states and Canadian provinces for wikipedia title parsing
US_STATES = {
'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado',
'Connecticut', 'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho',
'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana',
'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada',
'New Hampshire', 'New Jersey', 'New Mexico', 'New York',
'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon',
'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota',
'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington',
'West Virginia', 'Wisconsin', 'Wyoming', 'District of Columbia'
}
CANADIAN_PROVINCES = {
'Alberta', 'British Columbia', 'Manitoba', 'New Brunswick',
'Newfoundland and Labrador', 'Northwest Territories', 'Nova Scotia',
'Nunavut', 'Ontario', 'Prince Edward Island', 'Quebec', 'Saskatchewan', 'Yukon'
}
def _parse_state_from_wikipedia(wikipedia_tag):
"""Parse state/province and country from wikipedia extratag like 'en:Joliet, Illinois'"""
if not wikipedia_tag or not wikipedia_tag.startswith('en:'):
return None, None
title = wikipedia_tag[3:]
for state in US_STATES:
if state in title:
return state, 'us'
for prov in CANADIAN_PROVINCES:
if prov in title:
return prov, 'ca'
return None, None
_db_conn = None
@ -290,6 +325,11 @@ def _enrich_wiki_links(result):
"""
Rewrite wiki-related extratags to local Kiwix URLs where available.
Falls back to public URLs. Only runs when has_wiki_rewriting is enabled.
Note: When has_kiwix_wiki is enabled, we skip rewriting 'wikipedia' since
the wiki_index enrichment provides a proper wiki_url field. This keeps
extratags.wikipedia in the original OSM format for frontend link builders.
Returns the (possibly enriched) result dict.
"""
try:
@ -298,6 +338,8 @@ def _enrich_wiki_links(result):
features = deploy_config.get('features', {})
if not features.get('has_wiki_rewriting', False):
return result
# When has_kiwix_wiki is enabled, skip wikipedia rewriting (wiki_url handles it)
has_kiwix_wiki = features.get('has_kiwix_wiki', False)
except Exception:
return result
@ -313,6 +355,9 @@ def _enrich_wiki_links(result):
rewrites = {}
for tag in _WIKI_TAGS:
# Skip wikipedia when has_kiwix_wiki is enabled (wiki_url provides the local link)
if tag == 'wikipedia' and has_kiwix_wiki:
continue
value = extratags.get(tag)
if not value:
continue
@ -328,6 +373,82 @@ def _enrich_wiki_links(result):
return result
# ── Wiki Index enrichment ───────────────────────────────────────────────
def _enrich_with_wiki_index(result):
"""
Add wiki summary, URLs, and population from wiki_index.db.
Only runs when has_kiwix_wiki is enabled. Direct match only.
Returns the (possibly enriched) result dict.
"""
try:
from .deployment_config import get_deployment_config
deploy_config = get_deployment_config()
features = deploy_config.get('features', {})
if not features.get('has_kiwix_wiki', False):
return result
except Exception:
return result
try:
from . import wiki_index
except ImportError:
logger.debug("wiki_index module not available")
return result
if not wiki_index.is_available():
return result
# Extract match criteria from result
name = result.get('name', '')
osm_class = result.get('class', '')
osm_type_tag = result.get('type', '')
address = result.get('address', {})
state = address.get('state', '')
country_code = address.get('country_code', '')
# If state/country missing, try to derive from wikipedia extratag
extratags = result.get('extratags', {})
if (not state or not country_code) and extratags.get('wikipedia'):
derived_state, derived_country = _parse_state_from_wikipedia(extratags['wikipedia'])
if not state and derived_state:
state = derived_state
if not country_code and derived_country:
country_code = derived_country
# Handle boundary/administrative - get actual place type from extratags
# (e.g. boundary:administrative with extratags.place='city' -> place:city)
if osm_class == 'boundary' and osm_type_tag == 'administrative':
place_tag = extratags.get('place') or extratags.get('linked_place')
if place_tag:
osm_class = 'place'
osm_type_tag = place_tag
if not name or not osm_class or not osm_type_tag:
return result
# Look up wiki data
wiki_data = wiki_index.lookup_wiki(name, osm_class, osm_type_tag, state, country_code)
if not wiki_data:
return result
# Add wiki fields to result (additive only)
if 'wiki_summary' in wiki_data:
result['wiki_summary'] = wiki_data['wiki_summary']
if 'wiki_url' in wiki_data:
result['wiki_url'] = wiki_data['wiki_url']
if 'wikivoyage_url' in wiki_data:
result['wikivoyage_url'] = wiki_data['wikivoyage_url']
if 'wiki_population' in wiki_data:
result['wiki_population'] = wiki_data['wiki_population']
result.setdefault('sources', {})['wiki_index'] = True
logger.debug(f"Wiki index enrichment for {name}")
return result
# ── Nominatim parsing ───────────────────────────────────────────────────
# Nominatim address array uses rank_address to indicate what each entry is.
@ -444,6 +565,8 @@ def _parse_nominatim(data):
'wheelchair': raw_extra.get('wheelchair'),
'fee': raw_extra.get('fee'),
'takeaway': raw_extra.get('takeaway'),
'place': raw_extra.get('place'),
'linked_place': raw_extra.get('linked_place'),
}
# Category: use extratags.place for boundaries (e.g. "city"), else class/type
@ -593,6 +716,7 @@ def get_place_detail(osm_type, osm_id):
# 1. Check cache
cached = cache_get(osm_type, osm_id)
if cached:
cached = _enrich_with_wiki_index(cached)
logger.debug(f"Cache hit: {osm_type}/{osm_id}")
return cached, 200
@ -625,6 +749,7 @@ def get_place_detail(osm_type, osm_id):
nominatim_result = _enrich_with_overture(nominatim_result, osm_type, osm_id)
nominatim_result = _enrich_with_google(nominatim_result, osm_type, osm_id)
nominatim_result = _enrich_wiki_links(nominatim_result)
nominatim_result = _enrich_with_wiki_index(nominatim_result)
cache_put(osm_type, osm_id, nominatim_result, 'nominatim_local')
return nominatim_result, 200
@ -658,6 +783,7 @@ def get_place_detail(osm_type, osm_id):
overpass_result = _enrich_with_overture(overpass_result, osm_type, osm_id)
overpass_result = _enrich_with_google(overpass_result, osm_type, osm_id)
overpass_result = _enrich_wiki_links(overpass_result)
overpass_result = _enrich_with_wiki_index(overpass_result)
cache_put(osm_type, osm_id, overpass_result, 'overpass')
return overpass_result, 200

View file

@ -77,73 +77,10 @@ def _text_hash(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _flatten_table(table_el):
"""Convert a <table> element to pipe-delimited text.
Each <tr> becomes a row with cells joined by ' | '.
Returns the formatted table as a string with blank lines around it.
"""
rows = []
for tr in table_el.iter('tr'):
cells = []
for cell in tr:
if cell.tag in ('td', 'th'):
cell_text = (cell.text_content() or '').strip()
# Collapse internal whitespace in each cell
cell_text = re.sub(r'\s+', ' ', cell_text)
if cell_text:
cells.append(cell_text)
if cells:
rows.append(' | '.join(cells))
if not rows:
return ''
return '\n'.join(rows)
def _preprocess_tree(doc):
"""Pre-process HTML tree to add delimiters before text_content() flattens it.
Handles: <table>, <br>, <li>, <dt>, <dd> -- elements that lxml's
text_content() would concatenate without any separators.
"""
from lxml import etree
# 1. Replace <table> elements with their pipe-delimited text
for table in list(doc.iter('table')):
formatted = _flatten_table(table)
if formatted:
replacement = etree.Element('div')
replacement.text = '\n\n' + formatted + '\n\n'
parent = table.getparent()
if parent is not None:
parent.replace(table, replacement)
else:
table.drop_tree()
# 2. <br> -> inject newline
for br in list(doc.iter('br')):
br.tail = '\n' + (br.tail or '')
# 3. <li> -> inject newline + "- " prefix
for li in list(doc.iter('li')):
li.text = '- ' + (li.text or '')
li.tail = '\n' + (li.tail or '')
# 4. <dt> -> inject newline before
for dt in list(doc.iter('dt')):
dt.tail = '\n' + (dt.tail or '')
# 5. <dd> -> inject newline + indent
for dd in list(doc.iter('dd')):
dd.text = ' ' + (dd.text or '')
dd.tail = '\n' + (dd.tail or '')
def _html_to_text(html_bytes):
"""Convert HTML bytes to clean text via lxml.
Strips nav, footer, script, style elements. Decodes entities.
Pre-processes tables, lists, and line breaks for proper delimiters.
Normalizes whitespace.
"""
try:
@ -156,9 +93,6 @@ def _html_to_text(html_bytes):
for el in doc.iter(tag):
el.drop_tree()
# Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes
_preprocess_tree(doc)
# Extract text
text = doc.text_content()

View file

@ -1,136 +0,0 @@
#!/usr/bin/env python3
"""Tests for the /api/reverse/<lat>/<lon> enrichment bundle (lib.netsyms_api).
Photon/Valhalla/landclass are mocked so the suite runs without live services;
one timezone test exercises the real SpatiaLite DB when it is present. Plain
asserts + a __main__ runner, matching the rest of lib/*_test.py.
"""
import os
import sys
from unittest import mock
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from flask import Flask
from lib import netsyms_api
EXPECTED_KEYS = {'name', 'city', 'county', 'state', 'country',
'postal_code', 'timezone', 'landclass', 'elevation_m'}
def _client():
app = Flask(__name__)
app.register_blueprint(netsyms_api.geocode_bp)
return app.test_client()
def _clear_cache():
netsyms_api._REVERSE_BUNDLE_CACHE.clear()
def test_happy_path():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={
'name': 'Where you are', 'city': 'Boise', 'county': 'Ada',
'state': 'Idaho', 'country': 'United States', 'postal_code': '83701'}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='America/Boise'), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value='Boise National Forest'), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=824):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS, data.keys()
assert data['city'] == 'Boise' and data['timezone'] == 'America/Boise'
assert data['landclass'] == 'Boise National Forest' and data['elevation_m'] == 824
print(" PASS: happy path — all 9 fields populated, exact key set")
def test_negative_and_integer_coords_parse():
# Regression: Flask's <float:> converter would 404 these; manual parse must not.
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=None):
for path in ('/api/reverse/43.6/-116.2', '/api/reverse/43/-116'):
resp = _client().get(path)
assert resp.status_code == 200, f"{path} -> {resp.status_code}"
assert set(resp.get_json().keys()) == EXPECTED_KEYS
print(" PASS: negative and integer coordinates parse (200, not 404)")
def test_partial_failure_returns_200_with_nulls():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon',
side_effect=RuntimeError('photon down')), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='America/Boise'), \
mock.patch.object(netsyms_api, '_reverse_landclass',
side_effect=RuntimeError('postgis down')), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=824):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['name'] is None and data['city'] is None # photon failed -> nulls
assert data['landclass'] is None # landclass failed -> null
assert data['timezone'] == 'America/Boise' and data['elevation_m'] == 824
print(" PASS: per-component failure -> 200 with nulls, no 5xx")
def test_ocean_point_mostly_null():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='Etc/GMT+2'), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=0):
resp = _client().get('/api/reverse/0.0/-30.0')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['city'] is None and data['country'] is None and data['landclass'] is None
print(" PASS: ocean point -> 200, mostly null")
def test_invalid_input_400():
_clear_cache()
client = _client()
for path in ('/api/reverse/9999/0', '/api/reverse/0/9999', '/api/reverse/abc/0'):
resp = client.get(path)
assert resp.status_code == 400, f"{path} -> {resp.status_code}"
print(" PASS: out-of-range / unparseable input -> 400")
def test_cache_hit_serves_without_recompute():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon',
return_value={'name': 'X'}) as m_photon, \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=None):
client = _client()
client.get('/api/reverse/12.3456/-65.4321')
client.get('/api/reverse/12.3456/-65.4321') # same key (rounded) -> cached
assert m_photon.call_count == 1, f"expected 1 compute, got {m_photon.call_count}"
print(" PASS: second identical request served from cache (no recompute)")
def test_real_timezone_db():
if not os.path.exists(netsyms_api._TZ_DB_PATH):
print(" SKIP: real timezone test (timezones.sqlite not present)")
return
assert netsyms_api._reverse_timezone(43.6150, -116.2023) == 'America/Boise'
assert netsyms_api._reverse_timezone(40.7128, -74.0060) == 'America/New_York'
print(" PASS: real timezones.sqlite point-in-polygon")
if __name__ == '__main__':
print("Running reverse-bundle tests...")
test_happy_path()
test_negative_and_integer_coords_parse()
test_partial_failure_returns_200_with_nulls()
test_ocean_point_mostly_null()
test_invalid_input_400()
test_cache_hit_serves_without_recompute()
test_real_timezone_db()
print("All tests passed.")

154
lib/wiki_index.py Normal file
View file

@ -0,0 +1,154 @@
"""
Wiki location index lookup.
Provides wiki summaries, URLs, and population data from the wiki_index.db
for place detail enrichment. Read-only, opened once at startup.
DB path: /opt/recon/data/wiki_index.db
"""
import os
import sqlite3
from .utils import setup_logging
logger = setup_logging('recon.wiki_index')
_db_conn = None
_zim_books = {}
def _get_db():
"""Return a module-level SQLite connection (lazy init, read-only)."""
global _db_conn, _zim_books
if _db_conn is not None:
return _db_conn
db_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'data', 'wiki_index.db'
)
if not os.path.exists(db_path):
logger.warning(f"Wiki index DB not found at {db_path}")
return None
try:
# Open read-only with URI
_db_conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, check_same_thread=False)
_db_conn.row_factory = sqlite3.Row
# Load zim_books for URL construction
rows = _db_conn.execute("SELECT book_type, public_url FROM zim_books").fetchall()
for row in rows:
_zim_books[row['book_type']] = row['public_url']
logger.info(f"Wiki index DB ready at {db_path} ({len(_zim_books)} ZIM books)")
return _db_conn
except Exception as e:
logger.error(f"Failed to open wiki index DB: {e}")
return None
def lookup_wiki(place_name, osm_key, osm_value, state, country_code):
"""
Look up wiki data for a place by name and country, with optional state.
Args:
place_name: Name of the place (e.g., "Twin Falls")
osm_key: OSM key (unused, kept for API compatibility)
osm_value: OSM value (unused, kept for API compatibility)
state: State/province name (may be None or empty)
country_code: ISO country code (e.g., "us", "ca")
Returns:
dict with wiki_summary, wiki_url, wikivoyage_url, wiki_population
or None if no match found.
"""
db = _get_db()
if db is None:
return None
# Normalize inputs
place_name = (place_name or '').strip()
state = (state or '').strip() if state else ''
country_code = (country_code or '').strip().lower()
if not place_name or not country_code:
return None
try:
row = None
# Try exact match with state first (if state provided)
if state:
row = db.execute("""
SELECT
summary,
wikipedia_title,
wikivoyage_title,
wikipedia_exists,
wikivoyage_exists,
wiki_population
FROM wiki_places
WHERE place_name = ?
AND state = ?
AND country_code = ?
AND summary IS NOT NULL
ORDER BY importance DESC
LIMIT 1
""", (place_name, state, country_code)).fetchone()
# Fall back to name + country only (for places without state in query)
if not row:
row = db.execute("""
SELECT
summary,
wikipedia_title,
wikivoyage_title,
wikipedia_exists,
wikivoyage_exists,
wiki_population
FROM wiki_places
WHERE place_name = ?
AND country_code = ?
AND summary IS NOT NULL
ORDER BY importance DESC
LIMIT 1
""", (place_name, country_code)).fetchone()
if not row:
return None
result = {}
# Summary
if row['summary']:
result['wiki_summary'] = row['summary']
# Wikipedia URL
if row['wikipedia_exists'] and row['wikipedia_title'] and 'wikipedia' in _zim_books:
base_url = _zim_books['wikipedia']
title = row['wikipedia_title'].replace(' ', '_')
result['wiki_url'] = f"{base_url}/A/{title}"
# Wikivoyage URL
if row['wikivoyage_exists'] and row['wikivoyage_title'] and 'wikivoyage' in _zim_books:
base_url = _zim_books['wikivoyage']
title = row['wikivoyage_title'].replace(' ', '_')
result['wikivoyage_url'] = f"{base_url}/A/{title}"
# Population
if row['wiki_population']:
result['wiki_population'] = row['wiki_population']
return result if result else None
except Exception as e:
logger.warning(f"Wiki lookup error for {place_name}: {e}")
return None
def is_available():
"""Check if the wiki index DB is available."""
return _get_db() is not None

View file

@ -3,7 +3,6 @@ anyio==4.12.1
babel==2.18.0
beautifulsoup4==4.14.3
blinker==1.9.0
cachetools==7.1.3
certifi==2026.1.4
cffi==2.0.0
charset-normalizer==3.4.4