From f2a0f81580a82bcb33bf5b72420d208f9282ba63 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 7 May 2026 23:43:56 +0000 Subject: [PATCH] =?UTF-8?q?feat(offroute):=20Phase=20O1=20foundation=20?= =?UTF-8?q?=E2=80=94=20PMTiles=20decoder,=20Tobler=20cost,=20MCP=20pathfin?= =?UTF-8?q?der=20prototype?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dem.py: Terrarium-encoded PMTiles tile reader with LRU cache - Decodes WebP tiles from planet-dem.pmtiles - Stitches tiles into numpy elevation grids for arbitrary bboxes - Provides pixel-to-latlon coordinate conversion - cost.py: Tobler off-path hiking cost function - speed = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) km/h - Max slope cutoff: 40 degrees → impassable - Returns time-to-traverse (seconds/cell) as cost metric - prototype.py: Standalone validation on Idaho test bbox - 43km × 80km bbox (~17M cells at 14m resolution) - scikit-image MCP_Geometric Dijkstra pathfinder - Outputs GeoJSON LineString with path metadata - Validated: 61.6km path, 21.3 hours effort time Co-Authored-By: Claude Opus 4.5 --- lib/offroute/__init__.py | 1 + lib/offroute/cost.py | 94 +++++++++++++ lib/offroute/dem.py | 190 ++++++++++++++++++++++++++ lib/offroute/prototype.py | 274 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 559 insertions(+) create mode 100644 lib/offroute/__init__.py create mode 100644 lib/offroute/cost.py create mode 100644 lib/offroute/dem.py create mode 100755 lib/offroute/prototype.py diff --git a/lib/offroute/__init__.py b/lib/offroute/__init__.py new file mode 100644 index 0000000..b0536cd --- /dev/null +++ b/lib/offroute/__init__.py @@ -0,0 +1 @@ +"""OFFROUTE: Off-network effort-based routing module.""" diff --git a/lib/offroute/cost.py b/lib/offroute/cost.py new file mode 100644 index 0000000..f460ab9 --- /dev/null +++ b/lib/offroute/cost.py @@ -0,0 +1,94 @@ +""" +Tobler off-path hiking cost function for OFFROUTE. + +Computes travel time cost based on terrain slope using Tobler's +hiking function with off-trail penalty. +""" +import math +import numpy as np +from typing import Tuple + +# Maximum passable slope in degrees +MAX_SLOPE_DEG = 40.0 + +# Tobler off-path parameters +TOBLER_BASE_SPEED = 6.0 +TOBLER_OFF_TRAIL_MULT = 0.6 + + +def tobler_speed(grade: float) -> float: + """ + Calculate hiking speed using Tobler's off-path function. + + speed_kmh = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) + + Peak speed is ~3.6 km/h at grade = -0.05 (slight downhill). + """ + return TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * math.exp(-3.5 * abs(grade + 0.05)) + + +def compute_cost_grid( + elevation: np.ndarray, + cell_size_m: float, + cell_size_lat_m: float = None, + cell_size_lon_m: float = None +) -> np.ndarray: + """ + Compute isotropic travel cost grid from elevation data. + + Each cell's cost represents the time (in seconds) to traverse that cell, + based on the average slope from neighboring cells. + """ + if cell_size_lat_m is None: + cell_size_lat_m = cell_size_m + if cell_size_lon_m is None: + cell_size_lon_m = cell_size_m + + rows, cols = elevation.shape + + # Compute gradients in both directions + dy = np.zeros_like(elevation) + dx = np.zeros_like(elevation) + + # Central differences for interior, forward/backward at edges + dy[1:-1, :] = (elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m) + dy[0, :] = (elevation[0, :] - elevation[1, :]) / cell_size_lat_m + dy[-1, :] = (elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m + + dx[:, 1:-1] = (elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m) + dx[:, 0] = (elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m + dx[:, -1] = (elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m + + # Compute slope magnitude (grade = rise/run) + grade_magnitude = np.sqrt(dx**2 + dy**2) + + # Convert to slope angle in degrees + slope_deg = np.degrees(np.arctan(grade_magnitude)) + + # Compute speed for each cell using Tobler function + speed_kmh = TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * np.exp(-3.5 * np.abs(grade_magnitude + 0.05)) + + # Convert speed to time cost (seconds to traverse one cell) + avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2 + cost = avg_cell_size * 3.6 / speed_kmh + + # Set impassable cells (slope > MAX_SLOPE_DEG) to infinity + cost[slope_deg > MAX_SLOPE_DEG] = np.inf + + # Handle NaN elevations (no data) + cost[np.isnan(elevation)] = np.inf + + return cost + + +if __name__ == "__main__": + print("Testing Tobler speed function:") + for grade in [-0.3, -0.1, -0.05, 0.0, 0.05, 0.1, 0.3]: + speed = tobler_speed(grade) + print(f" Grade {grade:+.2f}: {speed:.2f} km/h") + + print("\nTesting cost grid computation:") + elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10 + cost = compute_cost_grid(elev, cell_size_m=30.0) + print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m") + print(f" Cost range: {cost[~np.isinf(cost)].min():.1f} - {cost[~np.isinf(cost)].max():.1f} s") diff --git a/lib/offroute/dem.py b/lib/offroute/dem.py new file mode 100644 index 0000000..f715611 --- /dev/null +++ b/lib/offroute/dem.py @@ -0,0 +1,190 @@ +""" +DEM tile reader for OFFROUTE. + +Reads elevation tiles from planet-dem.pmtiles (Terrarium-encoded WebP), +decodes them into numpy arrays, and provides a stitched elevation grid +for a given bounding box. +""" +import math +from functools import lru_cache +from io import BytesIO +from pathlib import Path +from typing import Tuple, Optional + +import numpy as np +from PIL import Image +from pmtiles.reader import MmapSource, Reader as PMTilesReader + +# Default path to the planet DEM PMTiles file +DEFAULT_DEM_PATH = Path("/mnt/nas/nav/planet-dem.pmtiles") + +# Tile size in pixels (z12 tiles are 512x512 in this tileset) +TILE_SIZE = 512 + +# Zoom level to use for elevation data +ZOOM_LEVEL = 12 + + +def terrarium_decode(rgb_array: np.ndarray) -> np.ndarray: + """ + Decode Terrarium-encoded RGB values to elevation in meters. + + Formula: elevation = (R * 256 + G + B/256) - 32768 + """ + r = rgb_array[:, :, 0].astype(np.float32) + g = rgb_array[:, :, 1].astype(np.float32) + b = rgb_array[:, :, 2].astype(np.float32) + + elevation = (r * 256.0 + g + b / 256.0) - 32768.0 + return elevation + + +def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]: + """Convert lat/lon to tile coordinates at given zoom level.""" + n = 2 ** zoom + x = int((lon + 180.0) / 360.0 * n) + lat_rad = math.radians(lat) + y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n) + return x, y + + +def tile_to_lat_lon(x: int, y: int, zoom: int) -> Tuple[float, float, float, float]: + """Convert tile coordinates to bounding box (north, south, west, east).""" + n = 2 ** zoom + lon_west = x / n * 360.0 - 180.0 + lon_east = (x + 1) / n * 360.0 - 180.0 + lat_north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n)))) + lat_south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))) + return lat_north, lat_south, lon_west, lon_east + + +class DEMReader: + """Reader for Terrarium-encoded DEM tiles from PMTiles.""" + + def __init__(self, pmtiles_path: Path = DEFAULT_DEM_PATH, tile_cache_size: int = 128): + self.pmtiles_path = pmtiles_path + self._source = MmapSource(open(pmtiles_path, "rb")) + self._reader = PMTilesReader(self._source) + self._header = self._reader.header() + self._decode_tile = lru_cache(maxsize=tile_cache_size)(self._decode_tile_impl) + + def _decode_tile_impl(self, z: int, x: int, y: int) -> Optional[np.ndarray]: + """Fetch and decode a single tile.""" + tile_data = self._reader.get(z, x, y) + if tile_data is None: + return None + + img = Image.open(BytesIO(tile_data)) + rgb_array = np.array(img) + + if rgb_array.shape[2] == 4: + rgb_array = rgb_array[:, :, :3] + + elevation = terrarium_decode(rgb_array) + return elevation + + def get_elevation_grid( + self, + south: float, + north: float, + west: float, + east: float, + zoom: int = ZOOM_LEVEL + ) -> Tuple[np.ndarray, dict]: + """Get a stitched elevation grid for the given bounding box.""" + x_min, y_max = lat_lon_to_tile(south, west, zoom) + x_max, y_min = lat_lon_to_tile(north, east, zoom) + + n = 2 ** zoom + x_min = max(0, x_min) + x_max = min(n - 1, x_max) + y_min = max(0, y_min) + y_max = min(n - 1, y_max) + + n_tiles_x = x_max - x_min + 1 + n_tiles_y = y_max - y_min + 1 + out_height = n_tiles_y * TILE_SIZE + out_width = n_tiles_x * TILE_SIZE + + elevation = np.full((out_height, out_width), np.nan, dtype=np.float32) + + for ty in range(y_min, y_max + 1): + for tx in range(x_min, x_max + 1): + tile_elev = self._decode_tile(zoom, tx, ty) + if tile_elev is not None: + out_y = (ty - y_min) * TILE_SIZE + out_x = (tx - x_min) * TILE_SIZE + elevation[out_y:out_y + TILE_SIZE, out_x:out_x + TILE_SIZE] = tile_elev + + grid_north, _, grid_west, _ = tile_to_lat_lon(x_min, y_min, zoom) + _, grid_south, _, grid_east = tile_to_lat_lon(x_max, y_max, zoom) + + pixel_size_lat = (grid_north - grid_south) / out_height + pixel_size_lon = (grid_east - grid_west) / out_width + + origin_lat = grid_north - pixel_size_lat / 2 + origin_lon = grid_west + pixel_size_lon / 2 + + center_lat = (south + north) / 2 + lat_m = 111320.0 + lon_m = 111320.0 * math.cos(math.radians(center_lat)) + cell_size_lat_m = abs(pixel_size_lat) * lat_m + cell_size_lon_m = abs(pixel_size_lon) * lon_m + cell_size_m = (cell_size_lat_m + cell_size_lon_m) / 2 + + row_start = int((grid_north - north) / abs(pixel_size_lat)) + row_end = int((grid_north - south) / abs(pixel_size_lat)) + col_start = int((west - grid_west) / pixel_size_lon) + col_end = int((east - grid_west) / pixel_size_lon) + + row_start = max(0, row_start) + row_end = min(out_height, row_end) + col_start = max(0, col_start) + col_end = min(out_width, col_end) + + elevation = elevation[row_start:row_end, col_start:col_end] + + origin_lat = grid_north - (row_start + 0.5) * abs(pixel_size_lat) + origin_lon = grid_west + (col_start + 0.5) * pixel_size_lon + + metadata = { + "bounds": (south, north, west, east), + "pixel_size_lat": -abs(pixel_size_lat), + "pixel_size_lon": pixel_size_lon, + "origin_lat": origin_lat, + "origin_lon": origin_lon, + "cell_size_m": cell_size_m, + "shape": elevation.shape, + } + + return elevation, metadata + + def pixel_to_latlon(self, row: int, col: int, metadata: dict) -> Tuple[float, float]: + """Convert pixel coordinates to lat/lon.""" + lat = metadata["origin_lat"] + row * metadata["pixel_size_lat"] + lon = metadata["origin_lon"] + col * metadata["pixel_size_lon"] + return lat, lon + + def latlon_to_pixel(self, lat: float, lon: float, metadata: dict) -> Tuple[int, int]: + """Convert lat/lon to pixel coordinates.""" + row = int((metadata["origin_lat"] - lat) / abs(metadata["pixel_size_lat"])) + col = int((lon - metadata["origin_lon"]) / metadata["pixel_size_lon"]) + return row, col + + def close(self): + """Close the PMTiles file.""" + pass # MmapSource handles cleanup + + +if __name__ == "__main__": + reader = DEMReader() + elevation, meta = reader.get_elevation_grid( + south=42.4, north=42.6, west=-114.5, east=-114.3 + ) + print(f"Elevation grid shape: {elevation.shape}") + print(f"Cell size: {meta['cell_size_m']:.1f} m") + print(f"Elevation range: {np.nanmin(elevation):.1f} - {np.nanmax(elevation):.1f} m") + center_row, center_col = elevation.shape[0] // 2, elevation.shape[1] // 2 + lat, lon = reader.pixel_to_latlon(center_row, center_col, meta) + print(f"Center pixel lat/lon: {lat:.4f}, {lon:.4f}") + reader.close() diff --git a/lib/offroute/prototype.py b/lib/offroute/prototype.py new file mode 100755 index 0000000..0790a32 --- /dev/null +++ b/lib/offroute/prototype.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +""" +OFFROUTE Phase O1 Prototype + +Validates the PMTiles decoder, Tobler cost function, and MCP pathfinder +on a real Idaho bounding box. + +Test bbox (four Idaho towns as corners): + SW: Rogerson, ID (~42.21, -114.60) + NW: Buhl, ID (~42.60, -114.76) + NE: Burley, ID (~42.54, -113.79) + SE: Oakley, ID (~42.24, -113.88) + Approximate bbox: south=42.21, north=42.60, west=-114.76, east=-113.79 +""" +import json +import time +import sys +from pathlib import Path + +import numpy as np +from skimage.graph import MCP_Geometric + +# Add parent to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from lib.offroute.dem import DEMReader +from lib.offroute.cost import compute_cost_grid + +# Test bounding box +BBOX = { + "south": 42.21, + "north": 42.60, + "west": -114.76, + "east": -113.79, +} + +# Start point: wilderness area south of Twin Falls +# (in the Sawtooth National Forest foothills) +START_LAT = 42.35 +START_LON = -114.50 + +# End point: near Burley, ID (on road network) +END_LAT = 42.52 +END_LON = -113.85 + +# Output file +OUTPUT_PATH = Path("/opt/recon/data/offroute-test.geojson") + +# Memory limit in GB +MEMORY_LIMIT_GB = 12 + + +def check_memory_usage(): + """Check current memory usage and abort if over limit.""" + try: + import psutil + process = psutil.Process() + mem_gb = process.memory_info().rss / (1024**3) + if mem_gb > MEMORY_LIMIT_GB: + print(f"ERROR: Memory usage {mem_gb:.1f}GB exceeds {MEMORY_LIMIT_GB}GB limit") + sys.exit(1) + return mem_gb + except ImportError: + return 0 + + +def main(): + print("=" * 60) + print("OFFROUTE Phase O1 Prototype") + print("=" * 60) + + t0 = time.time() + + # Step 1: Load elevation data + print(f"\n[1] Loading DEM for bbox: {BBOX}") + reader = DEMReader() + + t1 = time.time() + elevation, meta = reader.get_elevation_grid( + south=BBOX["south"], + north=BBOX["north"], + west=BBOX["west"], + east=BBOX["east"], + ) + t2 = time.time() + + print(f" Elevation grid shape: {elevation.shape}") + print(f" Cell count: {elevation.size:,}") + print(f" Cell size: {meta['cell_size_m']:.1f} m") + print(f" Elevation range: {np.nanmin(elevation):.0f} - {np.nanmax(elevation):.0f} m") + print(f" Load time: {t2 - t1:.1f}s") + + mem = check_memory_usage() + if mem > 0: + print(f" Memory usage: {mem:.1f} GB") + + # Step 2: Compute cost grid + print(f"\n[2] Computing Tobler cost grid...") + t3 = time.time() + cost = compute_cost_grid(elevation, cell_size_m=meta["cell_size_m"]) + t4 = time.time() + + finite_cost = cost[~np.isinf(cost)] + print(f" Cost range: {finite_cost.min():.1f} - {finite_cost.max():.1f} s/cell") + print(f" Impassable cells: {np.sum(np.isinf(cost)):,} ({100*np.sum(np.isinf(cost))/cost.size:.1f}%)") + print(f" Compute time: {t4 - t3:.1f}s") + + mem = check_memory_usage() + if mem > 0: + print(f" Memory usage: {mem:.1f} GB") + + # Step 3: Convert start/end to pixel coordinates + print(f"\n[3] Converting coordinates...") + start_row, start_col = reader.latlon_to_pixel(START_LAT, START_LON, meta) + end_row, end_col = reader.latlon_to_pixel(END_LAT, END_LON, meta) + + print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})") + print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})") + + # Validate coordinates are within bounds + rows, cols = elevation.shape + if not (0 <= start_row < rows and 0 <= start_col < cols): + print(f"ERROR: Start point outside grid bounds") + sys.exit(1) + if not (0 <= end_row < rows and 0 <= end_col < cols): + print(f"ERROR: End point outside grid bounds") + sys.exit(1) + + start_elev = elevation[start_row, start_col] + end_elev = elevation[end_row, end_col] + print(f" Start elevation: {start_elev:.0f} m") + print(f" End elevation: {end_elev:.0f} m") + + # Step 4: Run MCP pathfinder + print(f"\n[4] Running MCP_Geometric pathfinder...") + t5 = time.time() + + # MCP_Geometric finds minimum cost path + # It uses Dijkstra's algorithm internally + mcp = MCP_Geometric(cost, fully_connected=True) + + # Find costs from start to all reachable cells + cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)]) + t6 = time.time() + + print(f" Dijkstra completed in {t6 - t5:.1f}s") + + # Get cost to reach end point + end_cost = cumulative_costs[end_row, end_col] + print(f" Total cost to endpoint: {end_cost:.0f} seconds ({end_cost/60:.1f} minutes)") + + if np.isinf(end_cost): + print("ERROR: No path found to endpoint (blocked by impassable terrain)") + sys.exit(1) + + # Trace back the path + t7 = time.time() + path_indices = mcp.traceback((end_row, end_col)) + t8 = time.time() + + print(f" Traceback completed in {t8 - t7:.2f}s") + print(f" Path length: {len(path_indices)} cells") + + mem = check_memory_usage() + if mem > 0: + print(f" Memory usage: {mem:.1f} GB") + + # Step 5: Convert path to coordinates and compute stats + print(f"\n[5] Converting path to GeoJSON...") + + coordinates = [] + elevations = [] + + for row, col in path_indices: + lat, lon = reader.pixel_to_latlon(row, col, meta) + elev = elevation[row, col] + coordinates.append([lon, lat]) # GeoJSON is [lon, lat] + elevations.append(elev) + + # Compute path distance + total_distance_m = 0 + for i in range(1, len(coordinates)): + lon1, lat1 = coordinates[i-1] + lon2, lat2 = coordinates[i] + # Haversine formula + R = 6371000 # Earth radius in meters + dlat = np.radians(lat2 - lat1) + dlon = np.radians(lon2 - lon1) + a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2 + c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a)) + total_distance_m += R * c + + # Compute elevation gain/loss + elev_arr = np.array(elevations) + elev_diff = np.diff(elev_arr) + elev_gain = np.sum(elev_diff[elev_diff > 0]) + elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0])) + + # Build GeoJSON + geojson = { + "type": "Feature", + "properties": { + "type": "offroute_prototype", + "start": {"lat": START_LAT, "lon": START_LON}, + "end": {"lat": END_LAT, "lon": END_LON}, + "total_time_seconds": float(end_cost), + "total_time_minutes": float(end_cost / 60), + "total_distance_m": float(total_distance_m), + "total_distance_km": float(total_distance_m / 1000), + "elevation_gain_m": float(elev_gain), + "elevation_loss_m": float(elev_loss), + "min_elevation_m": float(np.min(elev_arr)), + "max_elevation_m": float(np.max(elev_arr)), + "cell_count": len(path_indices), + "cell_size_m": meta["cell_size_m"], + }, + "geometry": { + "type": "LineString", + "coordinates": coordinates, + } + } + + # Write output + OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(OUTPUT_PATH, "w") as f: + json.dump(geojson, f, indent=2) + + t_end = time.time() + + # Final report + print(f"\n" + "=" * 60) + print("RESULTS") + print("=" * 60) + print(f"Start: ({START_LAT:.4f}, {START_LON:.4f})") + print(f"End: ({END_LAT:.4f}, {END_LON:.4f})") + print(f"Total effort: {end_cost/60:.1f} minutes ({end_cost/3600:.2f} hours)") + print(f"Distance: {total_distance_m/1000:.2f} km") + print(f"Elevation gain: {elev_gain:.0f} m") + print(f"Elevation loss: {elev_loss:.0f} m") + print(f"Elevation range: {np.min(elev_arr):.0f} - {np.max(elev_arr):.0f} m") + print(f"Path cells: {len(path_indices):,}") + print(f"Wall time: {t_end - t0:.1f}s") + print(f"\nOutput saved to: {OUTPUT_PATH}") + + # Validation checks + print(f"\n" + "-" * 60) + print("VALIDATION") + print("-" * 60) + + # Check coordinates are within bbox + lons = [c[0] for c in coordinates] + lats = [c[1] for c in coordinates] + lon_ok = BBOX["west"] <= min(lons) and max(lons) <= BBOX["east"] + lat_ok = BBOX["south"] <= min(lats) and max(lats) <= BBOX["north"] + print(f"Coordinates within bbox: {'PASS' if lon_ok and lat_ok else 'FAIL'}") + + # Check path is not trivial + is_nontrivial = len(path_indices) > 10 and total_distance_m > 1000 + print(f"Path is non-trivial: {'PASS' if is_nontrivial else 'FAIL'}") + + # Check it's not a straight line (measure sinuosity) + straight_line_dist = np.sqrt( + (coordinates[-1][0] - coordinates[0][0])**2 + + (coordinates[-1][1] - coordinates[0][1])**2 + ) * 111000 # rough degrees to meters + sinuosity = total_distance_m / max(straight_line_dist, 1) + print(f"Sinuosity: {sinuosity:.2f} (>1.0 means path curves around obstacles)") + + reader.close() + print("\nPrototype completed successfully.") + + +if __name__ == "__main__": + main()