mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 14:44:54 +02:00
- Add ModeProfile dataclass for data-driven mode configuration - Implement three speed functions: * Tobler off-path hiking (foot) * Herzog wheeled-transport polynomial (mtb/atv) * Linear speed degradation (vehicle) - Add WildernessReader for PAD-US Des_Tp=WA wilderness areas - Mode-specific terrain friction overrides: * Forest impassable for ATV/vehicle, high friction for MTB * Wetland/mangrove impassable for all wheeled modes - Trail access rules: * Foot trails (value 25) impassable for ATV/vehicle - Wilderness blocking for mtb/atv/vehicle modes - Vehicle mode allows flat grassland/cropland traversal - Memory optimization: limit entry points, constrain bbox size - Update router to pass mode and wilderness to cost function - Add vehicle to API mode validation Validated all four modes with test route: - foot: 0.46km off-network, 12.11km network, 89% on trail - mtb: 0.47km off-network, 13.13km network, 90% on trail - atv: 0.47km off-network, 12.81km network, 90% on trail - vehicle: 0.46km off-network, 12.81km network, 89% on trail Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
440 lines
15 KiB
Python
440 lines
15 KiB
Python
"""
|
|
PAD-US barrier and wilderness layers for OFFROUTE.
|
|
|
|
Provides access to:
|
|
1. Barrier raster (Pub_Access = 'XA' - closed/restricted areas)
|
|
2. Wilderness raster (Des_Tp = 'WA' - designated wilderness areas)
|
|
|
|
Build functions rasterize PAD-US geodatabase to aligned GeoTIFFs.
|
|
Runtime functions read the rasters and resample to match elevation grids.
|
|
"""
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
|
|
try:
|
|
import rasterio
|
|
from rasterio.windows import from_bounds
|
|
from rasterio.enums import Resampling
|
|
except ImportError:
|
|
raise ImportError("rasterio is required for barriers layer support")
|
|
|
|
# Paths
|
|
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
|
|
DEFAULT_WILDERNESS_PATH = Path("/mnt/nav/worldcover/wilderness.tif")
|
|
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
|
|
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
|
|
|
|
# CONUS bounding box in WGS84
|
|
CONUS_BOUNDS = {
|
|
"west": -125.0,
|
|
"east": -66.0,
|
|
"south": 24.0,
|
|
"north": 50.0,
|
|
}
|
|
|
|
# Resolution in degrees (~30m at mid-latitudes)
|
|
PIXEL_SIZE = 0.0003 # ~33m
|
|
|
|
|
|
class BarrierReader:
|
|
"""Reader for PAD-US barrier raster (closed/restricted areas)."""
|
|
|
|
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
|
|
self.barrier_path = barrier_path
|
|
self._dataset = None
|
|
|
|
def _open(self):
|
|
"""Lazy open the dataset."""
|
|
if self._dataset is None:
|
|
if not self.barrier_path.exists():
|
|
raise FileNotFoundError(
|
|
f"Barrier raster not found at {self.barrier_path}. "
|
|
f"Run build_barriers_raster() first."
|
|
)
|
|
self._dataset = rasterio.open(self.barrier_path)
|
|
return self._dataset
|
|
|
|
def get_barrier_grid(
|
|
self,
|
|
south: float,
|
|
north: float,
|
|
west: float,
|
|
east: float,
|
|
target_shape: Tuple[int, int]
|
|
) -> np.ndarray:
|
|
"""
|
|
Get barrier values for a bounding box, resampled to target shape.
|
|
|
|
Args:
|
|
south, north, west, east: Bounding box coordinates (WGS84)
|
|
target_shape: (rows, cols) to resample to (matches elevation grid)
|
|
|
|
Returns:
|
|
np.ndarray of uint8 barrier values:
|
|
255 = closed/restricted (impassable when respect_boundaries=True)
|
|
0 = public/accessible
|
|
"""
|
|
ds = self._open()
|
|
window = from_bounds(west, south, east, north, ds.transform)
|
|
barriers = ds.read(
|
|
1,
|
|
window=window,
|
|
out_shape=target_shape,
|
|
resampling=Resampling.nearest
|
|
)
|
|
return barriers
|
|
|
|
def sample_point(self, lat: float, lon: float) -> int:
|
|
"""Sample barrier value at a single point."""
|
|
ds = self._open()
|
|
row, col = ds.index(lon, lat)
|
|
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
|
|
return 0
|
|
window = rasterio.windows.Window(col, row, 1, 1)
|
|
value = ds.read(1, window=window)
|
|
return int(value[0, 0])
|
|
|
|
def close(self):
|
|
"""Close the dataset."""
|
|
if self._dataset is not None:
|
|
self._dataset.close()
|
|
self._dataset = None
|
|
|
|
|
|
class WildernessReader:
|
|
"""Reader for PAD-US wilderness raster (designated wilderness areas)."""
|
|
|
|
def __init__(self, wilderness_path: Path = DEFAULT_WILDERNESS_PATH):
|
|
self.wilderness_path = wilderness_path
|
|
self._dataset = None
|
|
|
|
def _open(self):
|
|
"""Lazy open the dataset."""
|
|
if self._dataset is None:
|
|
if not self.wilderness_path.exists():
|
|
raise FileNotFoundError(
|
|
f"Wilderness raster not found at {self.wilderness_path}. "
|
|
f"Run build_wilderness_raster() first."
|
|
)
|
|
self._dataset = rasterio.open(self.wilderness_path)
|
|
return self._dataset
|
|
|
|
def get_wilderness_grid(
|
|
self,
|
|
south: float,
|
|
north: float,
|
|
west: float,
|
|
east: float,
|
|
target_shape: Tuple[int, int]
|
|
) -> np.ndarray:
|
|
"""
|
|
Get wilderness values for a bounding box, resampled to target shape.
|
|
|
|
Args:
|
|
south, north, west, east: Bounding box coordinates (WGS84)
|
|
target_shape: (rows, cols) to resample to (matches elevation grid)
|
|
|
|
Returns:
|
|
np.ndarray of uint8 wilderness values:
|
|
255 = designated wilderness area
|
|
0 = not wilderness
|
|
"""
|
|
ds = self._open()
|
|
window = from_bounds(west, south, east, north, ds.transform)
|
|
wilderness = ds.read(
|
|
1,
|
|
window=window,
|
|
out_shape=target_shape,
|
|
resampling=Resampling.nearest
|
|
)
|
|
return wilderness
|
|
|
|
def sample_point(self, lat: float, lon: float) -> int:
|
|
"""Sample wilderness value at a single point."""
|
|
ds = self._open()
|
|
row, col = ds.index(lon, lat)
|
|
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
|
|
return 0
|
|
window = rasterio.windows.Window(col, row, 1, 1)
|
|
value = ds.read(1, window=window)
|
|
return int(value[0, 0])
|
|
|
|
def close(self):
|
|
"""Close the dataset."""
|
|
if self._dataset is not None:
|
|
self._dataset.close()
|
|
self._dataset = None
|
|
|
|
|
|
def build_barriers_raster(
|
|
output_path: Path = DEFAULT_BARRIERS_PATH,
|
|
gdb_path: Path = PADUS_GDB_PATH,
|
|
pixel_size: float = PIXEL_SIZE,
|
|
bounds: dict = CONUS_BOUNDS,
|
|
) -> Path:
|
|
"""
|
|
Build the PAD-US barriers raster from the source geodatabase.
|
|
|
|
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
|
|
"""
|
|
import shutil
|
|
|
|
if not gdb_path.exists():
|
|
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
|
|
|
|
if not shutil.which('ogr2ogr'):
|
|
raise RuntimeError("ogr2ogr not found. Install GDAL.")
|
|
if not shutil.which('gdal_rasterize'):
|
|
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
print(f"Building PAD-US barriers raster...")
|
|
print(f" Source: {gdb_path}")
|
|
print(f" Output: {output_path}")
|
|
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
|
|
print(f" Bounds: {bounds}")
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
|
|
|
|
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
|
|
|
|
ogr_cmd = [
|
|
"ogr2ogr",
|
|
"-f", "GPKG",
|
|
str(closed_gpkg),
|
|
str(gdb_path),
|
|
PADUS_LAYER,
|
|
"-where", "Pub_Access = 'XA'",
|
|
"-t_srs", "EPSG:4326",
|
|
"-nlt", "MULTIPOLYGON",
|
|
"-nln", "closed_areas",
|
|
]
|
|
|
|
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print(f"STDERR: {result.stderr}")
|
|
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
|
|
|
|
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
|
|
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
|
|
print(f" Extraction result:\n{info_result.stdout}")
|
|
|
|
print(f"\n[2/3] Creating raster grid...")
|
|
|
|
width = int((bounds['east'] - bounds['west']) / pixel_size)
|
|
height = int((bounds['north'] - bounds['south']) / pixel_size)
|
|
print(f" Grid size: {width} x {height} pixels")
|
|
|
|
print(f"\n[3/3] Rasterizing closed areas...")
|
|
|
|
rasterize_cmd = [
|
|
"gdal_rasterize",
|
|
"-burn", "255",
|
|
"-init", "0",
|
|
"-a_nodata", "0",
|
|
"-te", str(bounds['west']), str(bounds['south']),
|
|
str(bounds['east']), str(bounds['north']),
|
|
"-tr", str(pixel_size), str(pixel_size),
|
|
"-ot", "Byte",
|
|
"-co", "COMPRESS=LZW",
|
|
"-co", "TILED=YES",
|
|
"-l", "closed_areas",
|
|
str(closed_gpkg),
|
|
str(output_path),
|
|
]
|
|
|
|
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print(f"STDERR: {result.stderr}")
|
|
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
|
|
|
|
print(f"\n[Done] Verifying output...")
|
|
with rasterio.open(output_path) as ds:
|
|
print(f" Size: {ds.width} x {ds.height}")
|
|
print(f" CRS: {ds.crs}")
|
|
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
|
|
closed_count = np.sum(sample == 255)
|
|
print(f" Sample (1000x1000): {closed_count} closed cells")
|
|
|
|
file_size = output_path.stat().st_size / (1024**2)
|
|
print(f" File size: {file_size:.1f} MB")
|
|
|
|
return output_path
|
|
|
|
|
|
def build_wilderness_raster(
|
|
output_path: Path = DEFAULT_WILDERNESS_PATH,
|
|
gdb_path: Path = PADUS_GDB_PATH,
|
|
pixel_size: float = PIXEL_SIZE,
|
|
bounds: dict = CONUS_BOUNDS,
|
|
) -> Path:
|
|
"""
|
|
Build the PAD-US wilderness raster from the source geodatabase.
|
|
|
|
Extracts polygons where Des_Tp = 'WA' (Wilderness Area) and rasterizes them.
|
|
"""
|
|
import shutil
|
|
|
|
if not gdb_path.exists():
|
|
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
|
|
|
|
if not shutil.which('ogr2ogr'):
|
|
raise RuntimeError("ogr2ogr not found. Install GDAL.")
|
|
if not shutil.which('gdal_rasterize'):
|
|
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
print(f"Building PAD-US wilderness raster...")
|
|
print(f" Source: {gdb_path}")
|
|
print(f" Output: {output_path}")
|
|
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
|
|
print(f" Bounds: {bounds}")
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
wilderness_gpkg = Path(tmpdir) / "wilderness_areas.gpkg"
|
|
|
|
print(f"\n[1/3] Extracting wilderness areas (Des_Tp = 'WA')...")
|
|
|
|
ogr_cmd = [
|
|
"ogr2ogr",
|
|
"-f", "GPKG",
|
|
str(wilderness_gpkg),
|
|
str(gdb_path),
|
|
PADUS_LAYER,
|
|
"-where", "Des_Tp = 'WA'",
|
|
"-t_srs", "EPSG:4326",
|
|
"-nlt", "MULTIPOLYGON",
|
|
"-nln", "wilderness_areas",
|
|
]
|
|
|
|
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print(f"STDERR: {result.stderr}")
|
|
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
|
|
|
|
info_cmd = ["ogrinfo", "-so", str(wilderness_gpkg), "wilderness_areas"]
|
|
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
|
|
print(f" Extraction result:\n{info_result.stdout}")
|
|
|
|
print(f"\n[2/3] Creating raster grid...")
|
|
|
|
width = int((bounds['east'] - bounds['west']) / pixel_size)
|
|
height = int((bounds['north'] - bounds['south']) / pixel_size)
|
|
print(f" Grid size: {width} x {height} pixels")
|
|
|
|
print(f"\n[3/3] Rasterizing wilderness areas...")
|
|
|
|
rasterize_cmd = [
|
|
"gdal_rasterize",
|
|
"-burn", "255",
|
|
"-init", "0",
|
|
"-a_nodata", "0",
|
|
"-te", str(bounds['west']), str(bounds['south']),
|
|
str(bounds['east']), str(bounds['north']),
|
|
"-tr", str(pixel_size), str(pixel_size),
|
|
"-ot", "Byte",
|
|
"-co", "COMPRESS=LZW",
|
|
"-co", "TILED=YES",
|
|
"-l", "wilderness_areas",
|
|
str(wilderness_gpkg),
|
|
str(output_path),
|
|
]
|
|
|
|
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print(f"STDERR: {result.stderr}")
|
|
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
|
|
|
|
print(f"\n[Done] Verifying output...")
|
|
with rasterio.open(output_path) as ds:
|
|
print(f" Size: {ds.width} x {ds.height}")
|
|
print(f" CRS: {ds.crs}")
|
|
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
|
|
wilderness_count = np.sum(sample == 255)
|
|
print(f" Sample (1000x1000): {wilderness_count} wilderness cells")
|
|
|
|
file_size = output_path.stat().st_size / (1024**2)
|
|
print(f" File size: {file_size:.1f} MB")
|
|
|
|
return output_path
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
cmd = sys.argv[1]
|
|
|
|
if cmd == "build":
|
|
print("=" * 60)
|
|
print("PAD-US Barriers Raster Build")
|
|
print("=" * 60)
|
|
build_barriers_raster()
|
|
|
|
elif cmd == "build-wilderness":
|
|
print("=" * 60)
|
|
print("PAD-US Wilderness Raster Build")
|
|
print("=" * 60)
|
|
build_wilderness_raster()
|
|
|
|
elif cmd == "build-all":
|
|
print("=" * 60)
|
|
print("Building all PAD-US rasters")
|
|
print("=" * 60)
|
|
build_barriers_raster()
|
|
print("\n")
|
|
build_wilderness_raster()
|
|
|
|
else:
|
|
print(f"Unknown command: {cmd}")
|
|
print("Usage:")
|
|
print(" python barriers.py build # Build barriers raster")
|
|
print(" python barriers.py build-wilderness # Build wilderness raster")
|
|
print(" python barriers.py build-all # Build both rasters")
|
|
sys.exit(1)
|
|
|
|
else:
|
|
# Test readers
|
|
print("Testing BarrierReader...")
|
|
|
|
if not DEFAULT_BARRIERS_PATH.exists():
|
|
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
|
|
print(f"Run: python barriers.py build")
|
|
sys.exit(1)
|
|
|
|
reader = BarrierReader()
|
|
barriers = reader.get_barrier_grid(
|
|
south=42.2, north=42.6, west=-114.8, east=-113.8,
|
|
target_shape=(400, 1000)
|
|
)
|
|
print(f"\nBarrier grid shape: {barriers.shape}")
|
|
print(f"Unique values: {np.unique(barriers)}")
|
|
closed_cells = np.sum(barriers == 255)
|
|
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
|
|
reader.close()
|
|
|
|
print("\nTesting WildernessReader...")
|
|
|
|
if not DEFAULT_WILDERNESS_PATH.exists():
|
|
print(f"Wilderness raster not found at {DEFAULT_WILDERNESS_PATH}")
|
|
print(f"Run: python barriers.py build-wilderness")
|
|
else:
|
|
wilderness_reader = WildernessReader()
|
|
wilderness = wilderness_reader.get_wilderness_grid(
|
|
south=42.2, north=42.6, west=-114.8, east=-113.8,
|
|
target_shape=(400, 1000)
|
|
)
|
|
print(f"Wilderness grid shape: {wilderness.shape}")
|
|
print(f"Unique values: {np.unique(wilderness)}")
|
|
wilderness_cells = np.sum(wilderness == 255)
|
|
print(f"Wilderness cells: {wilderness_cells} ({100*wilderness_cells/wilderness.size:.2f}%)")
|
|
wilderness_reader.close()
|
|
|
|
print("\nDone.")
|