recon/lib/offroute/barriers.py
Matt bc463188d5 feat(offroute): Phase O4 — multi-mode cost functions (foot/mtb/atv/vehicle)
- Add ModeProfile dataclass for data-driven mode configuration
- Implement three speed functions:
  * Tobler off-path hiking (foot)
  * Herzog wheeled-transport polynomial (mtb/atv)
  * Linear speed degradation (vehicle)
- Add WildernessReader for PAD-US Des_Tp=WA wilderness areas
- Mode-specific terrain friction overrides:
  * Forest impassable for ATV/vehicle, high friction for MTB
  * Wetland/mangrove impassable for all wheeled modes
- Trail access rules:
  * Foot trails (value 25) impassable for ATV/vehicle
- Wilderness blocking for mtb/atv/vehicle modes
- Vehicle mode allows flat grassland/cropland traversal
- Memory optimization: limit entry points, constrain bbox size
- Update router to pass mode and wilderness to cost function
- Add vehicle to API mode validation

Validated all four modes with test route:
- foot: 0.46km off-network, 12.11km network, 89% on trail
- mtb: 0.47km off-network, 13.13km network, 90% on trail
- atv: 0.47km off-network, 12.81km network, 90% on trail
- vehicle: 0.46km off-network, 12.81km network, 89% on trail

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:11:56 +00:00

440 lines
15 KiB
Python

"""
PAD-US barrier and wilderness layers for OFFROUTE.
Provides access to:
1. Barrier raster (Pub_Access = 'XA' - closed/restricted areas)
2. Wilderness raster (Des_Tp = 'WA' - designated wilderness areas)
Build functions rasterize PAD-US geodatabase to aligned GeoTIFFs.
Runtime functions read the rasters and resample to match elevation grids.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
import subprocess
import tempfile
import os
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for barriers layer support")
# Paths
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
DEFAULT_WILDERNESS_PATH = Path("/mnt/nav/worldcover/wilderness.tif")
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
# CONUS bounding box in WGS84
CONUS_BOUNDS = {
"west": -125.0,
"east": -66.0,
"south": 24.0,
"north": 50.0,
}
# Resolution in degrees (~30m at mid-latitudes)
PIXEL_SIZE = 0.0003 # ~33m
class BarrierReader:
"""Reader for PAD-US barrier raster (closed/restricted areas)."""
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
self.barrier_path = barrier_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.barrier_path.exists():
raise FileNotFoundError(
f"Barrier raster not found at {self.barrier_path}. "
f"Run build_barriers_raster() first."
)
self._dataset = rasterio.open(self.barrier_path)
return self._dataset
def get_barrier_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get barrier values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 barrier values:
255 = closed/restricted (impassable when respect_boundaries=True)
0 = public/accessible
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
barriers = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return barriers
def sample_point(self, lat: float, lon: float) -> int:
"""Sample barrier value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
class WildernessReader:
"""Reader for PAD-US wilderness raster (designated wilderness areas)."""
def __init__(self, wilderness_path: Path = DEFAULT_WILDERNESS_PATH):
self.wilderness_path = wilderness_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.wilderness_path.exists():
raise FileNotFoundError(
f"Wilderness raster not found at {self.wilderness_path}. "
f"Run build_wilderness_raster() first."
)
self._dataset = rasterio.open(self.wilderness_path)
return self._dataset
def get_wilderness_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get wilderness values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 wilderness values:
255 = designated wilderness area
0 = not wilderness
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
wilderness = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return wilderness
def sample_point(self, lat: float, lon: float) -> int:
"""Sample wilderness value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def build_barriers_raster(
output_path: Path = DEFAULT_BARRIERS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US barriers raster from the source geodatabase.
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US barriers raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(closed_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Pub_Access = 'XA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "closed_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing closed areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "closed_areas",
str(closed_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
closed_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {closed_count} closed cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
def build_wilderness_raster(
output_path: Path = DEFAULT_WILDERNESS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US wilderness raster from the source geodatabase.
Extracts polygons where Des_Tp = 'WA' (Wilderness Area) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US wilderness raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
wilderness_gpkg = Path(tmpdir) / "wilderness_areas.gpkg"
print(f"\n[1/3] Extracting wilderness areas (Des_Tp = 'WA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(wilderness_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Des_Tp = 'WA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "wilderness_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(wilderness_gpkg), "wilderness_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing wilderness areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "wilderness_areas",
str(wilderness_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
wilderness_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {wilderness_count} wilderness cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == "build":
print("=" * 60)
print("PAD-US Barriers Raster Build")
print("=" * 60)
build_barriers_raster()
elif cmd == "build-wilderness":
print("=" * 60)
print("PAD-US Wilderness Raster Build")
print("=" * 60)
build_wilderness_raster()
elif cmd == "build-all":
print("=" * 60)
print("Building all PAD-US rasters")
print("=" * 60)
build_barriers_raster()
print("\n")
build_wilderness_raster()
else:
print(f"Unknown command: {cmd}")
print("Usage:")
print(" python barriers.py build # Build barriers raster")
print(" python barriers.py build-wilderness # Build wilderness raster")
print(" python barriers.py build-all # Build both rasters")
sys.exit(1)
else:
# Test readers
print("Testing BarrierReader...")
if not DEFAULT_BARRIERS_PATH.exists():
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run: python barriers.py build")
sys.exit(1)
reader = BarrierReader()
barriers = reader.get_barrier_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"\nBarrier grid shape: {barriers.shape}")
print(f"Unique values: {np.unique(barriers)}")
closed_cells = np.sum(barriers == 255)
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
reader.close()
print("\nTesting WildernessReader...")
if not DEFAULT_WILDERNESS_PATH.exists():
print(f"Wilderness raster not found at {DEFAULT_WILDERNESS_PATH}")
print(f"Run: python barriers.py build-wilderness")
else:
wilderness_reader = WildernessReader()
wilderness = wilderness_reader.get_wilderness_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"Wilderness grid shape: {wilderness.shape}")
print(f"Unique values: {np.unique(wilderness)}")
wilderness_cells = np.sum(wilderness == 255)
print(f"Wilderness cells: {wilderness_cells} ({100*wilderness_cells/wilderness.size:.2f}%)")
wilderness_reader.close()
print("\nDone.")