feat(offroute): Phase O1 foundation — PMTiles decoder, Tobler cost, MCP pathfinder prototype

- dem.py: Terrarium-encoded PMTiles tile reader with LRU cache
  - Decodes WebP tiles from planet-dem.pmtiles
  - Stitches tiles into numpy elevation grids for arbitrary bboxes
  - Provides pixel-to-latlon coordinate conversion

- cost.py: Tobler off-path hiking cost function
  - speed = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) km/h
  - Max slope cutoff: 40 degrees → impassable
  - Returns time-to-traverse (seconds/cell) as cost metric

- prototype.py: Standalone validation on Idaho test bbox
  - 43km × 80km bbox (~17M cells at 14m resolution)
  - scikit-image MCP_Geometric Dijkstra pathfinder
  - Outputs GeoJSON LineString with path metadata
  - Validated: 61.6km path, 21.3 hours effort time

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-05-07 23:43:56 +00:00
commit f2a0f81580
4 changed files with 559 additions and 0 deletions

1
lib/offroute/__init__.py Normal file
View file

@ -0,0 +1 @@
"""OFFROUTE: Off-network effort-based routing module."""

94
lib/offroute/cost.py Normal file
View file

@ -0,0 +1,94 @@
"""
Tobler off-path hiking cost function for OFFROUTE.
Computes travel time cost based on terrain slope using Tobler's
hiking function with off-trail penalty.
"""
import math
import numpy as np
from typing import Tuple
# Maximum passable slope in degrees
MAX_SLOPE_DEG = 40.0
# Tobler off-path parameters
TOBLER_BASE_SPEED = 6.0
TOBLER_OFF_TRAIL_MULT = 0.6
def tobler_speed(grade: float) -> float:
"""
Calculate hiking speed using Tobler's off-path function.
speed_kmh = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|)
Peak speed is ~3.6 km/h at grade = -0.05 (slight downhill).
"""
return TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * math.exp(-3.5 * abs(grade + 0.05))
def compute_cost_grid(
elevation: np.ndarray,
cell_size_m: float,
cell_size_lat_m: float = None,
cell_size_lon_m: float = None
) -> np.ndarray:
"""
Compute isotropic travel cost grid from elevation data.
Each cell's cost represents the time (in seconds) to traverse that cell,
based on the average slope from neighboring cells.
"""
if cell_size_lat_m is None:
cell_size_lat_m = cell_size_m
if cell_size_lon_m is None:
cell_size_lon_m = cell_size_m
rows, cols = elevation.shape
# Compute gradients in both directions
dy = np.zeros_like(elevation)
dx = np.zeros_like(elevation)
# Central differences for interior, forward/backward at edges
dy[1:-1, :] = (elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m)
dy[0, :] = (elevation[0, :] - elevation[1, :]) / cell_size_lat_m
dy[-1, :] = (elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m
dx[:, 1:-1] = (elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m)
dx[:, 0] = (elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m
dx[:, -1] = (elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m
# Compute slope magnitude (grade = rise/run)
grade_magnitude = np.sqrt(dx**2 + dy**2)
# Convert to slope angle in degrees
slope_deg = np.degrees(np.arctan(grade_magnitude))
# Compute speed for each cell using Tobler function
speed_kmh = TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * np.exp(-3.5 * np.abs(grade_magnitude + 0.05))
# Convert speed to time cost (seconds to traverse one cell)
avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2
cost = avg_cell_size * 3.6 / speed_kmh
# Set impassable cells (slope > MAX_SLOPE_DEG) to infinity
cost[slope_deg > MAX_SLOPE_DEG] = np.inf
# Handle NaN elevations (no data)
cost[np.isnan(elevation)] = np.inf
return cost
if __name__ == "__main__":
print("Testing Tobler speed function:")
for grade in [-0.3, -0.1, -0.05, 0.0, 0.05, 0.1, 0.3]:
speed = tobler_speed(grade)
print(f" Grade {grade:+.2f}: {speed:.2f} km/h")
print("\nTesting cost grid computation:")
elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10
cost = compute_cost_grid(elev, cell_size_m=30.0)
print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m")
print(f" Cost range: {cost[~np.isinf(cost)].min():.1f} - {cost[~np.isinf(cost)].max():.1f} s")

190
lib/offroute/dem.py Normal file
View file

@ -0,0 +1,190 @@
"""
DEM tile reader for OFFROUTE.
Reads elevation tiles from planet-dem.pmtiles (Terrarium-encoded WebP),
decodes them into numpy arrays, and provides a stitched elevation grid
for a given bounding box.
"""
import math
from functools import lru_cache
from io import BytesIO
from pathlib import Path
from typing import Tuple, Optional
import numpy as np
from PIL import Image
from pmtiles.reader import MmapSource, Reader as PMTilesReader
# Default path to the planet DEM PMTiles file
DEFAULT_DEM_PATH = Path("/mnt/nas/nav/planet-dem.pmtiles")
# Tile size in pixels (z12 tiles are 512x512 in this tileset)
TILE_SIZE = 512
# Zoom level to use for elevation data
ZOOM_LEVEL = 12
def terrarium_decode(rgb_array: np.ndarray) -> np.ndarray:
"""
Decode Terrarium-encoded RGB values to elevation in meters.
Formula: elevation = (R * 256 + G + B/256) - 32768
"""
r = rgb_array[:, :, 0].astype(np.float32)
g = rgb_array[:, :, 1].astype(np.float32)
b = rgb_array[:, :, 2].astype(np.float32)
elevation = (r * 256.0 + g + b / 256.0) - 32768.0
return elevation
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
"""Convert lat/lon to tile coordinates at given zoom level."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_rad = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return x, y
def tile_to_lat_lon(x: int, y: int, zoom: int) -> Tuple[float, float, float, float]:
"""Convert tile coordinates to bounding box (north, south, west, east)."""
n = 2 ** zoom
lon_west = x / n * 360.0 - 180.0
lon_east = (x + 1) / n * 360.0 - 180.0
lat_north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
lat_south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return lat_north, lat_south, lon_west, lon_east
class DEMReader:
"""Reader for Terrarium-encoded DEM tiles from PMTiles."""
def __init__(self, pmtiles_path: Path = DEFAULT_DEM_PATH, tile_cache_size: int = 128):
self.pmtiles_path = pmtiles_path
self._source = MmapSource(open(pmtiles_path, "rb"))
self._reader = PMTilesReader(self._source)
self._header = self._reader.header()
self._decode_tile = lru_cache(maxsize=tile_cache_size)(self._decode_tile_impl)
def _decode_tile_impl(self, z: int, x: int, y: int) -> Optional[np.ndarray]:
"""Fetch and decode a single tile."""
tile_data = self._reader.get(z, x, y)
if tile_data is None:
return None
img = Image.open(BytesIO(tile_data))
rgb_array = np.array(img)
if rgb_array.shape[2] == 4:
rgb_array = rgb_array[:, :, :3]
elevation = terrarium_decode(rgb_array)
return elevation
def get_elevation_grid(
self,
south: float,
north: float,
west: float,
east: float,
zoom: int = ZOOM_LEVEL
) -> Tuple[np.ndarray, dict]:
"""Get a stitched elevation grid for the given bounding box."""
x_min, y_max = lat_lon_to_tile(south, west, zoom)
x_max, y_min = lat_lon_to_tile(north, east, zoom)
n = 2 ** zoom
x_min = max(0, x_min)
x_max = min(n - 1, x_max)
y_min = max(0, y_min)
y_max = min(n - 1, y_max)
n_tiles_x = x_max - x_min + 1
n_tiles_y = y_max - y_min + 1
out_height = n_tiles_y * TILE_SIZE
out_width = n_tiles_x * TILE_SIZE
elevation = np.full((out_height, out_width), np.nan, dtype=np.float32)
for ty in range(y_min, y_max + 1):
for tx in range(x_min, x_max + 1):
tile_elev = self._decode_tile(zoom, tx, ty)
if tile_elev is not None:
out_y = (ty - y_min) * TILE_SIZE
out_x = (tx - x_min) * TILE_SIZE
elevation[out_y:out_y + TILE_SIZE, out_x:out_x + TILE_SIZE] = tile_elev
grid_north, _, grid_west, _ = tile_to_lat_lon(x_min, y_min, zoom)
_, grid_south, _, grid_east = tile_to_lat_lon(x_max, y_max, zoom)
pixel_size_lat = (grid_north - grid_south) / out_height
pixel_size_lon = (grid_east - grid_west) / out_width
origin_lat = grid_north - pixel_size_lat / 2
origin_lon = grid_west + pixel_size_lon / 2
center_lat = (south + north) / 2
lat_m = 111320.0
lon_m = 111320.0 * math.cos(math.radians(center_lat))
cell_size_lat_m = abs(pixel_size_lat) * lat_m
cell_size_lon_m = abs(pixel_size_lon) * lon_m
cell_size_m = (cell_size_lat_m + cell_size_lon_m) / 2
row_start = int((grid_north - north) / abs(pixel_size_lat))
row_end = int((grid_north - south) / abs(pixel_size_lat))
col_start = int((west - grid_west) / pixel_size_lon)
col_end = int((east - grid_west) / pixel_size_lon)
row_start = max(0, row_start)
row_end = min(out_height, row_end)
col_start = max(0, col_start)
col_end = min(out_width, col_end)
elevation = elevation[row_start:row_end, col_start:col_end]
origin_lat = grid_north - (row_start + 0.5) * abs(pixel_size_lat)
origin_lon = grid_west + (col_start + 0.5) * pixel_size_lon
metadata = {
"bounds": (south, north, west, east),
"pixel_size_lat": -abs(pixel_size_lat),
"pixel_size_lon": pixel_size_lon,
"origin_lat": origin_lat,
"origin_lon": origin_lon,
"cell_size_m": cell_size_m,
"shape": elevation.shape,
}
return elevation, metadata
def pixel_to_latlon(self, row: int, col: int, metadata: dict) -> Tuple[float, float]:
"""Convert pixel coordinates to lat/lon."""
lat = metadata["origin_lat"] + row * metadata["pixel_size_lat"]
lon = metadata["origin_lon"] + col * metadata["pixel_size_lon"]
return lat, lon
def latlon_to_pixel(self, lat: float, lon: float, metadata: dict) -> Tuple[int, int]:
"""Convert lat/lon to pixel coordinates."""
row = int((metadata["origin_lat"] - lat) / abs(metadata["pixel_size_lat"]))
col = int((lon - metadata["origin_lon"]) / metadata["pixel_size_lon"])
return row, col
def close(self):
"""Close the PMTiles file."""
pass # MmapSource handles cleanup
if __name__ == "__main__":
reader = DEMReader()
elevation, meta = reader.get_elevation_grid(
south=42.4, north=42.6, west=-114.5, east=-114.3
)
print(f"Elevation grid shape: {elevation.shape}")
print(f"Cell size: {meta['cell_size_m']:.1f} m")
print(f"Elevation range: {np.nanmin(elevation):.1f} - {np.nanmax(elevation):.1f} m")
center_row, center_col = elevation.shape[0] // 2, elevation.shape[1] // 2
lat, lon = reader.pixel_to_latlon(center_row, center_col, meta)
print(f"Center pixel lat/lon: {lat:.4f}, {lon:.4f}")
reader.close()

274
lib/offroute/prototype.py Executable file
View file

@ -0,0 +1,274 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O1 Prototype
Validates the PMTiles decoder, Tobler cost function, and MCP pathfinder
on a real Idaho bounding box.
Test bbox (four Idaho towns as corners):
SW: Rogerson, ID (~42.21, -114.60)
NW: Buhl, ID (~42.60, -114.76)
NE: Burley, ID (~42.54, -113.79)
SE: Oakley, ID (~42.24, -113.88)
Approximate bbox: south=42.21, north=42.60, west=-114.76, east=-113.79
"""
import json
import time
import sys
from pathlib import Path
import numpy as np
from skimage.graph import MCP_Geometric
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
# Test bounding box
BBOX = {
"south": 42.21,
"north": 42.60,
"west": -114.76,
"east": -113.79,
}
# Start point: wilderness area south of Twin Falls
# (in the Sawtooth National Forest foothills)
START_LAT = 42.35
START_LON = -114.50
# End point: near Burley, ID (on road network)
END_LAT = 42.52
END_LON = -113.85
# Output file
OUTPUT_PATH = Path("/opt/recon/data/offroute-test.geojson")
# Memory limit in GB
MEMORY_LIMIT_GB = 12
def check_memory_usage():
"""Check current memory usage and abort if over limit."""
try:
import psutil
process = psutil.Process()
mem_gb = process.memory_info().rss / (1024**3)
if mem_gb > MEMORY_LIMIT_GB:
print(f"ERROR: Memory usage {mem_gb:.1f}GB exceeds {MEMORY_LIMIT_GB}GB limit")
sys.exit(1)
return mem_gb
except ImportError:
return 0
def main():
print("=" * 60)
print("OFFROUTE Phase O1 Prototype")
print("=" * 60)
t0 = time.time()
# Step 1: Load elevation data
print(f"\n[1] Loading DEM for bbox: {BBOX}")
reader = DEMReader()
t1 = time.time()
elevation, meta = reader.get_elevation_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
)
t2 = time.time()
print(f" Elevation grid shape: {elevation.shape}")
print(f" Cell count: {elevation.size:,}")
print(f" Cell size: {meta['cell_size_m']:.1f} m")
print(f" Elevation range: {np.nanmin(elevation):.0f} - {np.nanmax(elevation):.0f} m")
print(f" Load time: {t2 - t1:.1f}s")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 2: Compute cost grid
print(f"\n[2] Computing Tobler cost grid...")
t3 = time.time()
cost = compute_cost_grid(elevation, cell_size_m=meta["cell_size_m"])
t4 = time.time()
finite_cost = cost[~np.isinf(cost)]
print(f" Cost range: {finite_cost.min():.1f} - {finite_cost.max():.1f} s/cell")
print(f" Impassable cells: {np.sum(np.isinf(cost)):,} ({100*np.sum(np.isinf(cost))/cost.size:.1f}%)")
print(f" Compute time: {t4 - t3:.1f}s")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 3: Convert start/end to pixel coordinates
print(f"\n[3] Converting coordinates...")
start_row, start_col = reader.latlon_to_pixel(START_LAT, START_LON, meta)
end_row, end_col = reader.latlon_to_pixel(END_LAT, END_LON, meta)
print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})")
print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})")
# Validate coordinates are within bounds
rows, cols = elevation.shape
if not (0 <= start_row < rows and 0 <= start_col < cols):
print(f"ERROR: Start point outside grid bounds")
sys.exit(1)
if not (0 <= end_row < rows and 0 <= end_col < cols):
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
start_elev = elevation[start_row, start_col]
end_elev = elevation[end_row, end_col]
print(f" Start elevation: {start_elev:.0f} m")
print(f" End elevation: {end_elev:.0f} m")
# Step 4: Run MCP pathfinder
print(f"\n[4] Running MCP_Geometric pathfinder...")
t5 = time.time()
# MCP_Geometric finds minimum cost path
# It uses Dijkstra's algorithm internally
mcp = MCP_Geometric(cost, fully_connected=True)
# Find costs from start to all reachable cells
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
t6 = time.time()
print(f" Dijkstra completed in {t6 - t5:.1f}s")
# Get cost to reach end point
end_cost = cumulative_costs[end_row, end_col]
print(f" Total cost to endpoint: {end_cost:.0f} seconds ({end_cost/60:.1f} minutes)")
if np.isinf(end_cost):
print("ERROR: No path found to endpoint (blocked by impassable terrain)")
sys.exit(1)
# Trace back the path
t7 = time.time()
path_indices = mcp.traceback((end_row, end_col))
t8 = time.time()
print(f" Traceback completed in {t8 - t7:.2f}s")
print(f" Path length: {len(path_indices)} cells")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 5: Convert path to coordinates and compute stats
print(f"\n[5] Converting path to GeoJSON...")
coordinates = []
elevations = []
for row, col in path_indices:
lat, lon = reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
coordinates.append([lon, lat]) # GeoJSON is [lon, lat]
elevations.append(elev)
# Compute path distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
# Haversine formula
R = 6371000 # Earth radius in meters
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Compute elevation gain/loss
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Build GeoJSON
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_prototype",
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": len(path_indices),
"cell_size_m": meta["cell_size_m"],
},
"geometry": {
"type": "LineString",
"coordinates": coordinates,
}
}
# Write output
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_PATH, "w") as f:
json.dump(geojson, f, indent=2)
t_end = time.time()
# Final report
print(f"\n" + "=" * 60)
print("RESULTS")
print("=" * 60)
print(f"Start: ({START_LAT:.4f}, {START_LON:.4f})")
print(f"End: ({END_LAT:.4f}, {END_LON:.4f})")
print(f"Total effort: {end_cost/60:.1f} minutes ({end_cost/3600:.2f} hours)")
print(f"Distance: {total_distance_m/1000:.2f} km")
print(f"Elevation gain: {elev_gain:.0f} m")
print(f"Elevation loss: {elev_loss:.0f} m")
print(f"Elevation range: {np.min(elev_arr):.0f} - {np.max(elev_arr):.0f} m")
print(f"Path cells: {len(path_indices):,}")
print(f"Wall time: {t_end - t0:.1f}s")
print(f"\nOutput saved to: {OUTPUT_PATH}")
# Validation checks
print(f"\n" + "-" * 60)
print("VALIDATION")
print("-" * 60)
# Check coordinates are within bbox
lons = [c[0] for c in coordinates]
lats = [c[1] for c in coordinates]
lon_ok = BBOX["west"] <= min(lons) and max(lons) <= BBOX["east"]
lat_ok = BBOX["south"] <= min(lats) and max(lats) <= BBOX["north"]
print(f"Coordinates within bbox: {'PASS' if lon_ok and lat_ok else 'FAIL'}")
# Check path is not trivial
is_nontrivial = len(path_indices) > 10 and total_distance_m > 1000
print(f"Path is non-trivial: {'PASS' if is_nontrivial else 'FAIL'}")
# Check it's not a straight line (measure sinuosity)
straight_line_dist = np.sqrt(
(coordinates[-1][0] - coordinates[0][0])**2 +
(coordinates[-1][1] - coordinates[0][1])**2
) * 111000 # rough degrees to meters
sinuosity = total_distance_m / max(straight_line_dist, 1)
print(f"Sinuosity: {sinuosity:.2f} (>1.0 means path curves around obstacles)")
reader.close()
print("\nPrototype completed successfully.")
if __name__ == "__main__":
main()