feat(offroute): Phase O2c — PAD-US barriers with three-mode boundary respect

- Add barriers.py: PAD-US raster reader + build_barriers_raster() function
- Rasterize PAD-US Pub_Access=XA (Closed) polygons to CONUS GeoTIFF
- Modify cost.py: boundary_mode parameter (strict/pragmatic/emergency)
  - strict: private land = impassable (np.inf)
  - pragmatic: private land = 5x friction penalty (default)
  - emergency: private land barriers ignored
- Modify prototype.py: three-way comparison output
- Output: padus_barriers.tif at /mnt/nav/worldcover/ (144MB, ~33m resolution)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-05-08 06:53:11 +00:00
commit e0eedcedfd
3 changed files with 711 additions and 393 deletions

266
lib/offroute/barriers.py Normal file
View file

@ -0,0 +1,266 @@
"""
PAD-US barrier layer for OFFROUTE.
Provides access to the PAD-US land ownership raster for routing decisions.
Cells with value 255 represent closed/restricted areas (Pub_Access = XA).
Build function rasterizes PAD-US geodatabase to aligned GeoTIFF.
Runtime functions read the raster and resample to match elevation grids.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
import subprocess
import tempfile
import os
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for barriers layer support")
# Paths
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
# CONUS bounding box in WGS84
CONUS_BOUNDS = {
"west": -125.0,
"east": -66.0,
"south": 24.0,
"north": 50.0,
}
# Resolution in degrees (~30m at mid-latitudes)
PIXEL_SIZE = 0.0003 # ~33m
class BarrierReader:
"""Reader for PAD-US barrier raster."""
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
self.barrier_path = barrier_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.barrier_path.exists():
raise FileNotFoundError(
f"Barrier raster not found at {self.barrier_path}. "
f"Run build_barriers_raster() first."
)
self._dataset = rasterio.open(self.barrier_path)
return self._dataset
def get_barrier_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get barrier values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 barrier values:
255 = closed/restricted (impassable when respect_boundaries=True)
0 = public/accessible
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
barriers = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return barriers
def sample_point(self, lat: float, lon: float) -> int:
"""Sample barrier value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = accessible
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def build_barriers_raster(
output_path: Path = DEFAULT_BARRIERS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US barriers raster from the source geodatabase.
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
Args:
output_path: Output GeoTIFF path
gdb_path: Path to PAD-US geodatabase
pixel_size: Pixel size in degrees
bounds: CONUS bounding box
Returns:
Path to the created raster
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
# Check for required tools
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US barriers raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
# Step 1: Extract closed areas and reproject to WGS84
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(closed_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Pub_Access = 'XA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "closed_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
# Check feature count
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
# Step 2: Create empty raster
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f" Memory estimate: {width * height / 1e6:.1f} MB")
# Step 3: Rasterize
print(f"\n[3/3] Rasterizing closed areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0", # No nodata - 0 means accessible
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "closed_areas",
str(closed_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
# Verify output
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
print(f" Bounds: {ds.bounds}")
# Sample a few tiles to check
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
closed_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {closed_count} closed cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "build":
# Build the raster
print("="*60)
print("PAD-US Barriers Raster Build")
print("="*60)
build_barriers_raster()
else:
# Test the reader
print("Testing BarrierReader...")
if not DEFAULT_BARRIERS_PATH.exists():
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run: python barriers.py build")
sys.exit(1)
reader = BarrierReader()
# Test grid read for Idaho area
barriers = reader.get_barrier_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {barriers.shape}")
print(f"Unique values: {np.unique(barriers)}")
closed_cells = np.sum(barriers == 255)
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
reader.close()
print("\nBarrierReader test complete.")

View file

@ -3,11 +3,11 @@ Tobler off-path hiking cost function for OFFROUTE.
Computes travel time cost based on terrain slope using Tobler's
hiking function with off-trail penalty. Optionally applies friction
multipliers from land cover data.
multipliers from land cover data and barrier grids from PAD-US.
"""
import math
import numpy as np
from typing import Optional
from typing import Optional, Literal
# Maximum passable slope in degrees
MAX_SLOPE_DEG = 40.0
@ -16,6 +16,9 @@ MAX_SLOPE_DEG = 40.0
TOBLER_BASE_SPEED = 6.0
TOBLER_OFF_TRAIL_MULT = 0.6
# Pragmatic mode friction multiplier for private land
PRAGMATIC_BARRIER_MULTIPLIER = 5.0
def tobler_speed(grade: float) -> float:
"""
@ -33,7 +36,9 @@ def compute_cost_grid(
cell_size_m: float,
cell_size_lat_m: float = None,
cell_size_lon_m: float = None,
friction: Optional[np.ndarray] = None
friction: Optional[np.ndarray] = None,
barriers: Optional[np.ndarray] = None,
boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic"
) -> np.ndarray:
"""
Compute isotropic travel cost grid from elevation data.
@ -50,11 +55,23 @@ def compute_cost_grid(
Values should be float (1.0 = baseline, 2.0 = 2x slower).
np.inf marks impassable cells.
If None, no friction is applied (backward compatible).
barriers: Optional 2D array of barrier values (uint8).
255 = closed/restricted area (from PAD-US Pub_Access = XA).
0 = accessible.
If None, no barriers are applied.
boundary_mode: How to handle private/restricted land barriers:
"strict" - cells with barrier=255 become impassable (np.inf)
"pragmatic" - cells with barrier=255 get 5.0x friction penalty
"emergency" - barriers are ignored entirely
Default: "pragmatic"
Returns:
2D array of travel cost in seconds per cell.
np.inf for impassable cells.
"""
if boundary_mode not in ("strict", "pragmatic", "emergency"):
raise ValueError(f"boundary_mode must be 'strict', 'pragmatic', or 'emergency', got '{boundary_mode}'")
if cell_size_lat_m is None:
cell_size_lat_m = cell_size_m
if cell_size_lon_m is None:
@ -103,6 +120,22 @@ def compute_cost_grid(
# Multiply cost by friction (inf * anything = inf, which is correct)
cost = cost * friction
# Apply barriers based on boundary_mode
if barriers is not None and boundary_mode != "emergency":
if barriers.shape != elevation.shape:
raise ValueError(
f"Barriers shape {barriers.shape} does not match elevation shape {elevation.shape}"
)
barrier_mask = barriers == 255
if boundary_mode == "strict":
# Mark closed/restricted areas as impassable
cost[barrier_mask] = np.inf
elif boundary_mode == "pragmatic":
# Apply friction penalty to closed/restricted areas
cost[barrier_mask] = cost[barrier_mask] * PRAGMATIC_BARRIER_MULTIPLIER
return cost
@ -112,7 +145,7 @@ if __name__ == "__main__":
speed = tobler_speed(grade)
print(f" Grade {grade:+.2f}: {speed:.2f} km/h")
print("\nTesting cost grid computation (no friction):")
print("\nTesting cost grid computation (no friction, no barriers):")
elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10
cost = compute_cost_grid(elev, cell_size_m=30.0)
print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m")
@ -130,3 +163,16 @@ if __name__ == "__main__":
print(f" Base cost (flat, 30m cell): {30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05)):.1f} s")
print(f" With 1.5x friction: {cost[0, 0]:.1f} s")
print(f" Impassable cells: {np.sum(np.isinf(cost))}")
print("\nTesting cost grid with barriers (three modes):")
elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain
barriers = np.zeros((10, 10), dtype=np.uint8)
barriers[3:7, 3:7] = 255 # 4x4 closed area in center
base_cost = 30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05))
for mode in ["strict", "pragmatic", "emergency"]:
cost = compute_cost_grid(elev, cell_size_m=30.0, barriers=barriers, boundary_mode=mode)
impassable = np.sum(np.isinf(cost))
barrier_cost = cost[5, 5] if not np.isinf(cost[5, 5]) else "inf"
print(f" {mode:10s}: {impassable} impassable, barrier cell cost = {barrier_cost}")

View file

@ -1,18 +1,16 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O2b Prototype
OFFROUTE Phase O2c Prototype
Validates the PMTiles decoder, Tobler cost function, WorldCover friction
integration, and MCP pathfinder on a real Idaho bounding box.
Validates the PMTiles decoder, Tobler cost function, WorldCover friction,
PAD-US barriers integration, and MCP pathfinder on a real Idaho bounding box.
Now includes friction layer to avoid water bodies like Murtaugh Lake.
Runs THREE pathfinding passes with different boundary modes:
1. boundary_mode="strict" - private land is impassable
2. boundary_mode="pragmatic" - private land has 5x friction penalty
3. boundary_mode="emergency" - private land barriers ignored
Test bbox (four Idaho towns as corners):
SW: Rogerson, ID (~42.21, -114.60)
NW: Buhl, ID (~42.60, -114.76)
NE: Burley, ID (~42.54, -113.79)
SE: Oakley, ID (~42.24, -113.88)
Approximate bbox: south=42.21, north=42.60, west=-114.76, east=-113.79
Outputs comparison showing impact of boundary mode on routing.
"""
import json
import time
@ -28,8 +26,9 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
from lib.offroute.friction import FrictionReader, friction_to_multiplier
from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH
# Test bounding box
# Test bounding box - Idaho area known to have mixed public/private land
BBOX = {
"south": 42.21,
"north": 42.60,
@ -38,26 +37,25 @@ BBOX = {
}
# Start point: wilderness area south of Twin Falls
# (in the Sawtooth National Forest foothills)
START_LAT = 42.35
START_LON = -114.50
START_LAT = 42.36
START_LON = -114.55
# End point: near Burley, ID (on road network)
END_LAT = 42.52
END_LON = -113.85
# Murtaugh Lake - actual water extent from WorldCover
LAKE_BOUNDS = {
"south": 42.44,
"north": 42.50,
"west": -114.20,
"east": -114.10,
}
LAKE_CENTER = (42.465, -114.155) # Verified water in WorldCover
END_LAT = 42.55
END_LON = -114.25
# Output files
OUTPUT_PATH_O1 = Path("/opt/recon/data/offroute-test.geojson")
OUTPUT_PATH_FRICTION = Path("/opt/recon/data/offroute-test-friction.geojson")
OUTPUT_PATHS = {
"strict": Path("/opt/recon/data/offroute-test-strict.geojson"),
"pragmatic": Path("/opt/recon/data/offroute-test-pragmatic.geojson"),
"emergency": Path("/opt/recon/data/offroute-test-emergency.geojson"),
}
# Old files to delete
OLD_FILES = [
Path("/opt/recon/data/offroute-test-barriers-on.geojson"),
Path("/opt/recon/data/offroute-test-barriers-off.geojson"),
]
# Memory limit in GB
MEMORY_LIMIT_GB = 12
@ -77,40 +75,139 @@ def check_memory_usage():
return 0
def path_crosses_lake(coordinates, lake_bounds):
"""Check if any path coordinates fall within the lake bounding box."""
for lon, lat in coordinates:
if (lake_bounds["south"] <= lat <= lake_bounds["north"] and
lake_bounds["west"] <= lon <= lake_bounds["east"]):
return True, (lat, lon)
return False, None
def run_pathfinder(
elevation: np.ndarray,
meta: dict,
friction_mult: np.ndarray,
barriers: np.ndarray,
boundary_mode: str,
start_row: int,
start_col: int,
end_row: int,
end_col: int,
dem_reader: DEMReader,
) -> dict:
"""
Run the MCP pathfinder with given parameters.
Returns dict with path info and stats.
"""
# Compute cost grid
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult,
barriers=barriers,
boundary_mode=boundary_mode,
)
# Count impassable cells
impassable_count = np.sum(np.isinf(cost))
barrier_count = np.sum(barriers == 255) if barriers is not None else 0
# Run MCP
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
end_cost = cumulative_costs[end_row, end_col]
if np.isinf(end_cost):
return {
"success": False,
"reason": "No path found (blocked by impassable terrain)",
"impassable_cells": int(impassable_count),
"barrier_cells": int(barrier_count),
}
# Traceback path
path_indices = mcp.traceback((end_row, end_col))
# Convert to coordinates
coordinates = []
elevations = []
barrier_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
barr = barriers[row, col] if barriers is not None else 0
coordinates.append([lon, lat])
elevations.append(elev)
barrier_values.append(barr)
# Compute distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
R = 6371000
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Elevation stats
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Barrier crossings on path
barr_arr = np.array(barrier_values)
barrier_crossings = np.sum(barr_arr == 255)
return {
"success": True,
"coordinates": coordinates,
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": len(path_indices),
"impassable_cells": int(impassable_count),
"barrier_cells": int(barrier_count),
"barrier_crossings": int(barrier_crossings),
}
def main():
print("=" * 60)
print("OFFROUTE Phase O2b Prototype (with Friction)")
print("=" * 60)
print("=" * 80)
print("OFFROUTE Phase O2c Prototype (Three-Mode Boundary Respect)")
print("=" * 80)
t0 = time.time()
# Delete old output files
for old_file in OLD_FILES:
if old_file.exists():
old_file.unlink()
print(f"Deleted old file: {old_file}")
# Check if barrier raster exists
if not DEFAULT_BARRIERS_PATH.exists():
print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run first: python /opt/recon/lib/offroute/barriers.py build")
sys.exit(1)
# Step 1: Load elevation data
print(f"\n[1] Loading DEM for bbox: {BBOX}")
dem_reader = DEMReader()
t1 = time.time()
elevation, meta = dem_reader.get_elevation_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
)
t2 = time.time()
print(f" Elevation grid shape: {elevation.shape}")
print(f" Cell count: {elevation.size:,}")
print(f" Cell size: {meta['cell_size_m']:.1f} m")
print(f" Elevation range: {np.nanmin(elevation):.0f} - {np.nanmax(elevation):.0f} m")
print(f" Load time: {t2 - t1:.1f}s")
mem = check_memory_usage()
if mem > 0:
@ -118,19 +215,8 @@ def main():
# Step 2: Load friction data
print(f"\n[2] Loading WorldCover friction layer...")
t2a = time.time()
friction_reader = FrictionReader()
# Validate lake is marked as impassable
lake_friction = friction_reader.sample_point(LAKE_CENTER[0], LAKE_CENTER[1])
print(f" Murtaugh Lake center ({LAKE_CENTER[0]}, {LAKE_CENTER[1]}): friction = {lake_friction}")
if lake_friction != 255:
print(f" WARNING: Lake not marked as water (expected 255, got {lake_friction})")
else:
print(f" Lake correctly marked as impassable (255)")
# Load friction grid matching elevation shape
friction_raw = friction_reader.get_friction_grid(
south=BBOX["south"],
north=BBOX["north"],
@ -138,36 +224,30 @@ def main():
east=BBOX["east"],
target_shape=elevation.shape
)
t2b = time.time()
# Convert to multipliers
friction_mult = friction_to_multiplier(friction_raw)
impassable_count = np.sum(np.isinf(friction_mult))
print(f" Friction grid shape: {friction_raw.shape}")
print(f" Unique friction values: {np.unique(friction_raw[friction_raw > 0])}")
print(f" Impassable cells (water/nodata): {impassable_count:,} ({100*impassable_count/friction_raw.size:.1f}%)")
print(f" Load time: {t2b - t2a:.1f}s")
print(f" Water/impassable cells: {np.sum(np.isinf(friction_mult)):,}")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 3: Load barrier data
print(f"\n[3] Loading PAD-US barrier layer...")
barrier_reader = BarrierReader()
# Step 3: Compute cost grid with friction
print(f"\n[3] Computing Tobler cost grid with friction...")
t3 = time.time()
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult
barriers = barrier_reader.get_barrier_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
t4 = time.time()
finite_cost = cost[~np.isinf(cost)]
total_impassable = np.sum(np.isinf(cost))
print(f" Cost range: {finite_cost.min():.1f} - {finite_cost.max():.1f} s/cell")
print(f" Total impassable cells: {total_impassable:,} ({100*total_impassable/cost.size:.1f}%)")
print(f" Compute time: {t4 - t3:.1f}s")
closed_cells = np.sum(barriers == 255)
print(f" Barrier grid shape: {barriers.shape}")
print(f" Closed/restricted cells: {closed_cells:,} ({100*closed_cells/barriers.size:.2f}%)")
if closed_cells == 0:
print("\n WARNING: No closed/restricted areas in this bbox.")
print(" The test may not show meaningful differences between modes.")
mem = check_memory_usage()
if mem > 0:
@ -190,196 +270,122 @@ def main():
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
start_elev = elevation[start_row, start_col]
end_elev = elevation[end_row, end_col]
print(f" Start elevation: {start_elev:.0f} m")
print(f" End elevation: {end_elev:.0f} m")
# Step 5: Run pathfinder THREE times
results = {}
modes = ["strict", "pragmatic", "emergency"]
# Step 5: Run MCP pathfinder
print(f"\n[5] Running MCP_Geometric pathfinder...")
t5 = time.time()
for i, mode in enumerate(modes, start=5):
print(f"\n[{i}] Running pathfinder (boundary_mode=\"{mode}\")...")
t_start = time.time()
results[mode] = run_pathfinder(
elevation, meta, friction_mult, barriers,
boundary_mode=mode,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t_end = time.time()
print(f" Completed in {t_end - t_start:.1f}s")
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
t6 = time.time()
# Step 6: Save GeoJSON outputs
print(f"\n[8] Saving GeoJSON outputs...")
print(f" Dijkstra completed in {t6 - t5:.1f}s")
OUTPUT_PATHS["strict"].parent.mkdir(parents=True, exist_ok=True)
end_cost = cumulative_costs[end_row, end_col]
print(f" Total cost to endpoint: {end_cost:.0f} seconds ({end_cost/60:.1f} minutes)")
if np.isinf(end_cost):
print("ERROR: No path found to endpoint (blocked by impassable terrain)")
sys.exit(1)
t7 = time.time()
path_indices = mcp.traceback((end_row, end_col))
t8 = time.time()
print(f" Traceback completed in {t8 - t7:.2f}s")
print(f" Path length: {len(path_indices)} cells")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 6: Convert path to coordinates and compute stats
print(f"\n[6] Converting path to GeoJSON...")
coordinates = []
elevations = []
friction_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
fric = friction_raw[row, col]
coordinates.append([lon, lat])
elevations.append(elev)
friction_values.append(fric)
# Compute path distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
R = 6371000
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Compute elevation gain/loss
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Friction stats along path
fric_arr = np.array(friction_values)
valid_fric = fric_arr[(fric_arr > 0) & (fric_arr < 255)]
# Build GeoJSON
for mode, result in results.items():
output_path = OUTPUT_PATHS[mode]
if result["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_prototype_friction",
"phase": "O2b",
"type": f"offroute_{mode}",
"phase": "O2c",
"boundary_mode": mode,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"friction_min": int(valid_fric.min()) if len(valid_fric) > 0 else 0,
"friction_max": int(valid_fric.max()) if len(valid_fric) > 0 else 0,
"friction_mean": float(valid_fric.mean()) if len(valid_fric) > 0 else 0,
"cell_count": len(path_indices),
"cell_size_m": meta["cell_size_m"],
**{k: v for k, v in result.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": coordinates,
"coordinates": result["coordinates"],
}
}
# Write output
OUTPUT_PATH_FRICTION.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_PATH_FRICTION, "w") as f:
with open(output_path, "w") as f:
json.dump(geojson, f, indent=2)
t_end = time.time()
# Final report
print(f"\n" + "=" * 60)
print("RESULTS (Phase O2b with Friction)")
print("=" * 60)
print(f"Start: ({START_LAT:.4f}, {START_LON:.4f})")
print(f"End: ({END_LAT:.4f}, {END_LON:.4f})")
print(f"Total effort: {end_cost/60:.1f} minutes ({end_cost/3600:.2f} hours)")
print(f"Distance: {total_distance_m/1000:.2f} km")
print(f"Elevation gain: {elev_gain:.0f} m")
print(f"Elevation loss: {elev_loss:.0f} m")
print(f"Elevation range: {np.min(elev_arr):.0f} - {np.max(elev_arr):.0f} m")
if len(valid_fric) > 0:
print(f"Friction (path): min={valid_fric.min()}, max={valid_fric.max()}, mean={valid_fric.mean():.1f}")
print(f"Path cells: {len(path_indices):,}")
print(f"Wall time: {t_end - t0:.1f}s")
print(f"\nOutput saved to: {OUTPUT_PATH_FRICTION}")
# Validation
print(f"\n" + "-" * 60)
print("VALIDATION")
print("-" * 60)
# Check coordinates are within bbox
lons = [c[0] for c in coordinates]
lats = [c[1] for c in coordinates]
lon_ok = BBOX["west"] <= min(lons) and max(lons) <= BBOX["east"]
lat_ok = BBOX["south"] <= min(lats) and max(lats) <= BBOX["north"]
print(f"Coordinates within bbox: {'PASS' if lon_ok and lat_ok else 'FAIL'}")
# Check path is not trivial
is_nontrivial = len(path_indices) > 10 and total_distance_m > 1000
print(f"Path is non-trivial: {'PASS' if is_nontrivial else 'FAIL'}")
# Check sinuosity
straight_line_dist = np.sqrt(
(coordinates[-1][0] - coordinates[0][0])**2 +
(coordinates[-1][1] - coordinates[0][1])**2
) * 111000
sinuosity = total_distance_m / max(straight_line_dist, 1)
print(f"Sinuosity: {sinuosity:.2f} (>1.0 means path curves around obstacles)")
# CRITICAL: Check no water cells (friction=255) on path
# This is the authoritative test - friction layer prevents water crossings
print(f"\n--- Water Avoidance Check ---")
water_on_path = np.sum(fric_arr == 255)
if water_on_path > 0:
print(f"FAIL: Path crosses {water_on_path} water cells (friction=255)")
sys.exit(1)
print(f" Saved: {output_path}")
else:
print(f"PASS: No water cells (friction=255) on path")
print(f" SKIPPED ({mode}): {result['reason']}")
# Informational: Check if path goes through lake bounding box
# Path may go through land cells within the bbox, which is fine
print(f"\n--- Lake Bounding Box Check (informational) ---")
print(f"Murtaugh Lake bounds: {LAKE_BOUNDS}")
crosses_lake, crossing_point = path_crosses_lake(coordinates, LAKE_BOUNDS)
if crosses_lake:
print(f"INFO: Path passes through lake bbox at {crossing_point}")
print(f" (This is OK if friction check passed - path uses land cells)")
t_total = time.time()
# Final report - three-way comparison
print(f"\n" + "=" * 80)
print("THREE-WAY COMPARISON")
print("=" * 80)
# Check how many succeeded
success_count = sum(1 for r in results.values() if r["success"])
if success_count == 3:
print(f"{'Metric':<22} {'STRICT':<18} {'PRAGMATIC':<18} {'EMERGENCY':<18}")
print("-" * 80)
metrics = [
("Distance (km)", "total_distance_km", ".2f"),
("Effort time (min)", "total_time_minutes", ".1f"),
("Cell count", "cell_count", "d"),
("Elevation gain (m)", "elevation_gain_m", ".0f"),
("Elevation loss (m)", "elevation_loss_m", ".0f"),
("Barrier crossings", "barrier_crossings", "d"),
("Impassable cells", "impassable_cells", ",d"),
]
for label, key, fmt in metrics:
vals = [results[m][key] for m in modes]
print(f"{label:<22} {vals[0]:<18{fmt}} {vals[1]:<18{fmt}} {vals[2]:<18{fmt}}")
# Analysis
print(f"\n" + "-" * 80)
print("ANALYSIS")
print("-" * 80)
strict_crossings = results["strict"]["barrier_crossings"]
pragmatic_crossings = results["pragmatic"]["barrier_crossings"]
emergency_crossings = results["emergency"]["barrier_crossings"]
print(f"Barrier crossings: strict={strict_crossings}, pragmatic={pragmatic_crossings}, emergency={emergency_crossings}")
if strict_crossings == 0 and pragmatic_crossings == 0 and emergency_crossings == 0:
print("No path crosses private land - terrain naturally avoids barriers.")
else:
print(f"PASS: Path does not enter lake bounding box")
if emergency_crossings > pragmatic_crossings:
print(f"Pragmatic mode reduces barrier crossings vs emergency: {emergency_crossings} -> {pragmatic_crossings}")
if pragmatic_crossings > 0 and strict_crossings == 0:
print(f"Strict mode completely avoids private land (pragmatic crosses {pragmatic_crossings} cells)")
# Compare with Phase O1 if available
print(f"\n" + "-" * 60)
print("COMPARISON: Phase O1 vs O2b")
print("-" * 60)
# Time/distance comparison
if results["strict"]["total_time_minutes"] > results["emergency"]["total_time_minutes"]:
time_penalty = results["strict"]["total_time_minutes"] - results["emergency"]["total_time_minutes"]
print(f"Time cost of strict boundary respect: +{time_penalty:.1f} min")
if OUTPUT_PATH_O1.exists():
with open(OUTPUT_PATH_O1) as f:
o1_data = json.load(f)
o1_props = o1_data["properties"]
print(f"{'Metric':<20} {'O1 (no friction)':<20} {'O2b (with friction)':<20}")
print("-" * 60)
print(f"{'Distance (km)':<20} {o1_props['total_distance_km']:<20.2f} {total_distance_m/1000:<20.2f}")
print(f"{'Effort (min)':<20} {o1_props['total_time_minutes']:<20.1f} {end_cost/60:<20.1f}")
print(f"{'Cell count':<20} {o1_props['cell_count']:<20} {len(path_indices):<20}")
print(f"{'Elev gain (m)':<20} {o1_props['elevation_gain_m']:<20.0f} {elev_gain:<20.0f}")
else:
print(f"Phase O1 output not found at {OUTPUT_PATH_O1}")
print(f"Run the O1 prototype first to enable comparison.")
print(f"Only {success_count}/3 modes found a path:")
for mode, result in results.items():
if result["success"]:
print(f" {mode}: {result['total_distance_km']:.2f} km, {result['total_time_minutes']:.1f} min")
else:
print(f" {mode}: FAILED - {result.get('reason', 'unknown')}")
print(f"\n" + "-" * 80)
print(f"Total wall time: {t_total - t0:.1f}s")
print(f"Closed cells in bbox: {closed_cells:,}")
# Cleanup
dem_reader.close()
friction_reader.close()
print("\nPrototype completed successfully.")
barrier_reader.close()
print("\nPrototype completed.")
if __name__ == "__main__":