feat(offroute): Phase O3a — trail burn-in, pathfinder seeks trail corridors

Trail friction REPLACES land cover friction where trails exist:
- Road (value 5): 0.1× friction
- Track (value 15): 0.3× friction
- Foot trail (value 25): 0.5× friction

TrailReader loads /mnt/nav/worldcover/trails.tif rasterized from OSM highways.

Validation shows trail-seeking behavior:
- On-trail travel: 17.3% → 98.7%
- Effort time: 1047 min → 155 min (-85.2%)
- Path travels farther but stays on roads for speed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt 2026-05-08 07:26:25 +00:00
commit 3293cb4238
3 changed files with 392 additions and 160 deletions

View file

@ -3,7 +3,7 @@ Tobler off-path hiking cost function for OFFROUTE.
Computes travel time cost based on terrain slope using Tobler's
hiking function with off-trail penalty. Optionally applies friction
multipliers from land cover data and barrier grids from PAD-US.
multipliers from land cover data, trail corridors, and barrier grids.
"""
import math
import numpy as np
@ -19,6 +19,14 @@ TOBLER_OFF_TRAIL_MULT = 0.6
# Pragmatic mode friction multiplier for private land
PRAGMATIC_BARRIER_MULTIPLIER = 5.0
# Trail value to friction multiplier mapping
# Trail friction REPLACES land cover friction (a road through forest is still easy)
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
def tobler_speed(grade: float) -> float:
"""
@ -37,6 +45,7 @@ def compute_cost_grid(
cell_size_lat_m: float = None,
cell_size_lon_m: float = None,
friction: Optional[np.ndarray] = None,
trails: Optional[np.ndarray] = None,
barriers: Optional[np.ndarray] = None,
boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic"
) -> np.ndarray:
@ -51,10 +60,17 @@ def compute_cost_grid(
cell_size_m: Average cell size in meters
cell_size_lat_m: Cell size in latitude direction (optional)
cell_size_lon_m: Cell size in longitude direction (optional)
friction: Optional 2D array of friction multipliers.
friction: Optional 2D array of friction multipliers (WorldCover).
Values should be float (1.0 = baseline, 2.0 = 2x slower).
np.inf marks impassable cells.
If None, no friction is applied (backward compatible).
trails: Optional 2D array of trail values (uint8).
0 = no trail (use friction)
5 = road (0.1× friction, replaces WorldCover)
15 = track (0.3× friction, replaces WorldCover)
25 = foot trail (0.5× friction, replaces WorldCover)
Trail friction REPLACES land cover friction where trails exist.
If None, no trail burn-in is applied.
barriers: Optional 2D array of barrier values (uint8).
255 = closed/restricted area (from PAD-US Pub_Access = XA).
0 = accessible.
@ -111,14 +127,30 @@ def compute_cost_grid(
# Handle NaN elevations (no data)
cost[np.isnan(elevation)] = np.inf
# Apply friction multipliers if provided
# Build effective friction array
# Start with WorldCover friction if provided, else 1.0
if friction is not None:
if friction.shape != elevation.shape:
raise ValueError(
f"Friction shape {friction.shape} does not match elevation shape {elevation.shape}"
)
# Multiply cost by friction (inf * anything = inf, which is correct)
cost = cost * friction
effective_friction = friction.copy()
else:
effective_friction = np.ones(elevation.shape, dtype=np.float32)
# Apply trail burn-in: trails REPLACE land cover friction
if trails is not None:
if trails.shape != elevation.shape:
raise ValueError(
f"Trails shape {trails.shape} does not match elevation shape {elevation.shape}"
)
# Replace friction where trails exist
for trail_value, trail_friction in TRAIL_FRICTION_MAP.items():
trail_mask = trails == trail_value
effective_friction[trail_mask] = trail_friction
# Apply friction to cost
cost = cost * effective_friction
# Apply barriers based on boundary_mode
if barriers is not None and boundary_mode != "emergency":
@ -145,7 +177,7 @@ if __name__ == "__main__":
speed = tobler_speed(grade)
print(f" Grade {grade:+.2f}: {speed:.2f} km/h")
print("\nTesting cost grid computation (no friction, no barriers):")
print("\nTesting cost grid computation (no friction, no trails):")
elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10
cost = compute_cost_grid(elev, cell_size_m=30.0)
print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m")
@ -155,21 +187,25 @@ if __name__ == "__main__":
else:
print(f" All cells impassable (test data too steep)")
print("\nTesting cost grid with friction:")
print("\nTesting cost grid with friction and trails:")
elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain
friction = np.ones((10, 10), dtype=np.float32) * 1.5 # 1.5x friction
friction[5, 5] = np.inf # one impassable cell
cost = compute_cost_grid(elev, cell_size_m=30.0, friction=friction)
print(f" Base cost (flat, 30m cell): {30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05)):.1f} s")
print(f" With 1.5x friction: {cost[0, 0]:.1f} s")
print(f" Impassable cells: {np.sum(np.isinf(cost))}")
friction = np.ones((10, 10), dtype=np.float32) * 2.0 # 2.0x friction (forest)
trails = np.zeros((10, 10), dtype=np.uint8)
trails[5, :] = 5 # road across middle row
print("\nTesting cost grid with barriers (three modes):")
elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain
barriers = np.zeros((10, 10), dtype=np.uint8)
barriers[3:7, 3:7] = 255 # 4x4 closed area in center
cost_no_trail = compute_cost_grid(elev, cell_size_m=30.0, friction=friction)
cost_with_trail = compute_cost_grid(elev, cell_size_m=30.0, friction=friction, trails=trails)
base_cost = 30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05))
print(f" Base cost (flat, 30m cell): {base_cost:.1f} s")
print(f" Forest cell (2.0x friction): {cost_no_trail[0, 0]:.1f} s")
print(f" Road cell (0.1x friction, replaces forest): {cost_with_trail[5, 0]:.1f} s")
print(f" Road friction advantage: {cost_no_trail[0, 0] / cost_with_trail[5, 0]:.1f}x faster")
print("\nTesting cost grid with barriers (three modes):")
elev = np.ones((10, 10), dtype=np.float32) * 1000
barriers = np.zeros((10, 10), dtype=np.uint8)
barriers[3:7, 3:7] = 255
for mode in ["strict", "pragmatic", "emergency"]:
cost = compute_cost_grid(elev, cell_size_m=30.0, barriers=barriers, boundary_mode=mode)

View file

@ -1,16 +1,12 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O2c Prototype
OFFROUTE Phase O3a Prototype
Validates the PMTiles decoder, Tobler cost function, WorldCover friction,
PAD-US barriers integration, and MCP pathfinder on a real Idaho bounding box.
Validates trail burn-in integration with the MCP pathfinder.
The path should actively seek out trails and roads when nearby.
Runs THREE pathfinding passes with different boundary modes:
1. boundary_mode="strict" - private land is impassable
2. boundary_mode="pragmatic" - private land has 5x friction penalty
3. boundary_mode="emergency" - private land barriers ignored
Outputs comparison showing impact of boundary mode on routing.
Compares paths with and without trail burn-in to show the benefit
of trail-seeking behavior.
"""
import json
import time
@ -27,8 +23,9 @@ from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
from lib.offroute.friction import FrictionReader, friction_to_multiplier
from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH
from lib.offroute.trails import TrailReader, DEFAULT_TRAILS_PATH
# Test bounding box - Idaho area known to have mixed public/private land
# Test bounding box - Idaho area
BBOX = {
"south": 42.21,
"north": 42.60,
@ -36,26 +33,17 @@ BBOX = {
"east": -113.79,
}
# Start point: wilderness area south of Twin Falls
START_LAT = 42.36
START_LON = -114.55
# Start point: wilderness area away from roads
START_LAT = 42.35
START_LON = -114.60
# End point: near Burley, ID (on road network)
# End point: near Twin Falls (has roads/trails)
END_LAT = 42.55
END_LON = -114.25
END_LON = -114.20
# Output files
OUTPUT_PATHS = {
"strict": Path("/opt/recon/data/offroute-test-strict.geojson"),
"pragmatic": Path("/opt/recon/data/offroute-test-pragmatic.geojson"),
"emergency": Path("/opt/recon/data/offroute-test-emergency.geojson"),
}
# Old files to delete
OLD_FILES = [
Path("/opt/recon/data/offroute-test-barriers-on.geojson"),
Path("/opt/recon/data/offroute-test-barriers-off.geojson"),
]
OUTPUT_PATH_WITH_TRAILS = Path("/opt/recon/data/offroute-test-trails.geojson")
OUTPUT_PATH_NO_TRAILS = Path("/opt/recon/data/offroute-test-no-trails.geojson")
# Memory limit in GB
MEMORY_LIMIT_GB = 12
@ -79,32 +67,26 @@ def run_pathfinder(
elevation: np.ndarray,
meta: dict,
friction_mult: np.ndarray,
trails: np.ndarray,
barriers: np.ndarray,
boundary_mode: str,
use_trails: bool,
start_row: int,
start_col: int,
end_row: int,
end_col: int,
dem_reader: DEMReader,
) -> dict:
"""
Run the MCP pathfinder with given parameters.
Returns dict with path info and stats.
"""
"""Run the MCP pathfinder with given parameters."""
# Compute cost grid
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult,
trails=trails if use_trails else None,
barriers=barriers,
boundary_mode=boundary_mode,
boundary_mode="pragmatic",
)
# Count impassable cells
impassable_count = np.sum(np.isinf(cost))
barrier_count = np.sum(barriers == 255) if barriers is not None else 0
# Run MCP
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
@ -115,25 +97,23 @@ def run_pathfinder(
return {
"success": False,
"reason": "No path found (blocked by impassable terrain)",
"impassable_cells": int(impassable_count),
"barrier_cells": int(barrier_count),
}
# Traceback path
path_indices = mcp.traceback((end_row, end_col))
# Convert to coordinates
# Convert to coordinates and collect stats
coordinates = []
elevations = []
barrier_values = []
trail_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
barr = barriers[row, col] if barriers is not None else 0
trail_val = trails[row, col] if trails is not None else 0
coordinates.append([lon, lat])
elevations.append(elev)
barrier_values.append(barr)
trail_values.append(trail_val)
# Compute distance
total_distance_m = 0
@ -153,9 +133,14 @@ def run_pathfinder(
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Barrier crossings on path
barr_arr = np.array(barrier_values)
barrier_crossings = np.sum(barr_arr == 255)
# Trail stats
trail_arr = np.array(trail_values)
road_cells = np.sum(trail_arr == 5)
track_cells = np.sum(trail_arr == 15)
trail_cells = np.sum(trail_arr == 25)
off_trail_cells = np.sum(trail_arr == 0)
on_trail_cells = road_cells + track_cells + trail_cells
total_cells = len(trail_arr)
return {
"success": True,
@ -168,30 +153,28 @@ def run_pathfinder(
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": len(path_indices),
"impassable_cells": int(impassable_count),
"barrier_cells": int(barrier_count),
"barrier_crossings": int(barrier_crossings),
"cell_count": total_cells,
"road_cells": int(road_cells),
"track_cells": int(track_cells),
"trail_cells": int(trail_cells),
"off_trail_cells": int(off_trail_cells),
"on_trail_pct": float(100 * on_trail_cells / total_cells) if total_cells > 0 else 0,
}
def main():
print("=" * 80)
print("OFFROUTE Phase O2c Prototype (Three-Mode Boundary Respect)")
print("OFFROUTE Phase O3a Prototype (Trail Burn-In)")
print("=" * 80)
t0 = time.time()
# Delete old output files
for old_file in OLD_FILES:
if old_file.exists():
old_file.unlink()
print(f"Deleted old file: {old_file}")
# Check if barrier raster exists
# Check for required rasters
if not DEFAULT_BARRIERS_PATH.exists():
print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run first: python /opt/recon/lib/offroute/barriers.py build")
sys.exit(1)
if not DEFAULT_TRAILS_PATH.exists():
print(f"\nERROR: Trails raster not found at {DEFAULT_TRAILS_PATH}")
sys.exit(1)
# Step 1: Load elevation data
@ -243,25 +226,42 @@ def main():
closed_cells = np.sum(barriers == 255)
print(f" Barrier grid shape: {barriers.shape}")
print(f" Closed/restricted cells: {closed_cells:,} ({100*closed_cells/barriers.size:.2f}%)")
print(f" Closed/restricted cells: {closed_cells:,}")
if closed_cells == 0:
print("\n WARNING: No closed/restricted areas in this bbox.")
print(" The test may not show meaningful differences between modes.")
# Step 4: Load trails data
print(f"\n[4] Loading OSM trails layer...")
trail_reader = TrailReader()
trails = trail_reader.get_trails_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
road_cells = np.sum(trails == 5)
track_cells = np.sum(trails == 15)
trail_cells = np.sum(trails == 25)
print(f" Trails grid shape: {trails.shape}")
print(f" Road cells: {road_cells:,}")
print(f" Track cells: {track_cells:,}")
print(f" Trail cells: {trail_cells:,}")
print(f" Total trail coverage: {100*(road_cells+track_cells+trail_cells)/trails.size:.2f}%")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 4: Convert start/end to pixel coordinates
print(f"\n[4] Converting coordinates...")
# Step 5: Convert start/end to pixel coordinates
print(f"\n[5] Converting coordinates...")
start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta)
end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta)
print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})")
print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})")
# Validate coordinates are within bounds
# Validate coordinates
rows, cols = elevation.shape
if not (0 <= start_row < rows and 0 <= start_col < cols):
print(f"ERROR: Start point outside grid bounds")
@ -270,64 +270,86 @@ def main():
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
# Step 5: Run pathfinder THREE times
results = {}
modes = ["strict", "pragmatic", "emergency"]
# Step 6: Run pathfinder WITH trails
print(f"\n[6] Running pathfinder WITH trail burn-in...")
t6a = time.time()
result_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=True,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t6b = time.time()
print(f" Completed in {t6b - t6a:.1f}s")
for i, mode in enumerate(modes, start=5):
print(f"\n[{i}] Running pathfinder (boundary_mode=\"{mode}\")...")
t_start = time.time()
results[mode] = run_pathfinder(
elevation, meta, friction_mult, barriers,
boundary_mode=mode,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t_end = time.time()
print(f" Completed in {t_end - t_start:.1f}s")
# Step 7: Run pathfinder WITHOUT trails
print(f"\n[7] Running pathfinder WITHOUT trail burn-in...")
t7a = time.time()
result_no_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=False,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t7b = time.time()
print(f" Completed in {t7b - t7a:.1f}s")
# Step 6: Save GeoJSON outputs
# Step 8: Save GeoJSON outputs
print(f"\n[8] Saving GeoJSON outputs...")
OUTPUT_PATHS["strict"].parent.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH_WITH_TRAILS.parent.mkdir(parents=True, exist_ok=True)
for mode, result in results.items():
output_path = OUTPUT_PATHS[mode]
if result["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": f"offroute_{mode}",
"phase": "O2c",
"boundary_mode": mode,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result["coordinates"],
}
if result_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_with_trails",
"phase": "O3a",
"trail_burn_in": True,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_trails["coordinates"],
}
with open(output_path, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {output_path}")
else:
print(f" SKIPPED ({mode}): {result['reason']}")
}
with open(OUTPUT_PATH_WITH_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_WITH_TRAILS}")
if result_no_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_no_trails",
"phase": "O3a",
"trail_burn_in": False,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_no_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_no_trails["coordinates"],
}
}
with open(OUTPUT_PATH_NO_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_NO_TRAILS}")
t_total = time.time()
# Final report - three-way comparison
# Final report
print(f"\n" + "=" * 80)
print("THREE-WAY COMPARISON")
print("SIDE-BY-SIDE COMPARISON: Trail Burn-In Effect")
print("=" * 80)
# Check how many succeeded
success_count = sum(1 for r in results.values() if r["success"])
if success_count == 3:
print(f"{'Metric':<22} {'STRICT':<18} {'PRAGMATIC':<18} {'EMERGENCY':<18}")
if result_trails["success"] and result_no_trails["success"]:
print(f"{'Metric':<25} {'WITH TRAILS':<20} {'WITHOUT TRAILS':<20} {'Delta':<15}")
print("-" * 80)
metrics = [
@ -335,55 +357,55 @@ def main():
("Effort time (min)", "total_time_minutes", ".1f"),
("Cell count", "cell_count", "d"),
("Elevation gain (m)", "elevation_gain_m", ".0f"),
("Elevation loss (m)", "elevation_loss_m", ".0f"),
("Barrier crossings", "barrier_crossings", "d"),
("Impassable cells", "impassable_cells", ",d"),
("On-trail %", "on_trail_pct", ".1f"),
("Road cells", "road_cells", "d"),
("Track cells", "track_cells", "d"),
("Trail cells", "trail_cells", "d"),
]
for label, key, fmt in metrics:
vals = [results[m][key] for m in modes]
print(f"{label:<22} {vals[0]:<18{fmt}} {vals[1]:<18{fmt}} {vals[2]:<18{fmt}}")
val_with = result_trails[key]
val_without = result_no_trails[key]
if isinstance(val_with, int):
delta = val_with - val_without
delta_str = f"{delta:+d}"
else:
delta = val_with - val_without
delta_str = f"{delta:+.2f}"
print(f"{label:<25} {val_with:<20{fmt}} {val_without:<20{fmt}} {delta_str:<15}")
# Analysis
print(f"\n" + "-" * 80)
print("ANALYSIS")
print("-" * 80)
strict_crossings = results["strict"]["barrier_crossings"]
pragmatic_crossings = results["pragmatic"]["barrier_crossings"]
emergency_crossings = results["emergency"]["barrier_crossings"]
time_saved = result_no_trails["total_time_minutes"] - result_trails["total_time_minutes"]
if time_saved > 0:
print(f"Trail burn-in saves {time_saved:.1f} minutes ({100*time_saved/result_no_trails['total_time_minutes']:.1f}% faster)")
elif time_saved < 0:
print(f"Trail burn-in adds {-time_saved:.1f} minutes (path seeks trails even if longer)")
print(f"Barrier crossings: strict={strict_crossings}, pragmatic={pragmatic_crossings}, emergency={emergency_crossings}")
if strict_crossings == 0 and pragmatic_crossings == 0 and emergency_crossings == 0:
print("No path crosses private land - terrain naturally avoids barriers.")
on_trail_with = result_trails["on_trail_pct"]
on_trail_without = result_no_trails["on_trail_pct"]
if on_trail_with > on_trail_without:
print(f"Trail burn-in increases on-trail travel: {on_trail_without:.1f}% → {on_trail_with:.1f}%")
else:
if emergency_crossings > pragmatic_crossings:
print(f"Pragmatic mode reduces barrier crossings vs emergency: {emergency_crossings} -> {pragmatic_crossings}")
if pragmatic_crossings > 0 and strict_crossings == 0:
print(f"Strict mode completely avoids private land (pragmatic crosses {pragmatic_crossings} cells)")
# Time/distance comparison
if results["strict"]["total_time_minutes"] > results["emergency"]["total_time_minutes"]:
time_penalty = results["strict"]["total_time_minutes"] - results["emergency"]["total_time_minutes"]
print(f"Time cost of strict boundary respect: +{time_penalty:.1f} min")
print(f"Both paths have similar on-trail percentage")
else:
print(f"Only {success_count}/3 modes found a path:")
for mode, result in results.items():
if result["success"]:
print(f" {mode}: {result['total_distance_km']:.2f} km, {result['total_time_minutes']:.1f} min")
else:
print(f" {mode}: FAILED - {result.get('reason', 'unknown')}")
if not result_trails["success"]:
print(f"WITH TRAILS: FAILED - {result_trails.get('reason', 'unknown')}")
if not result_no_trails["success"]:
print(f"WITHOUT TRAILS: FAILED - {result_no_trails.get('reason', 'unknown')}")
print(f"\n" + "-" * 80)
print(f"Total wall time: {t_total - t0:.1f}s")
print(f"Closed cells in bbox: {closed_cells:,}")
# Cleanup
dem_reader.close()
friction_reader.close()
barrier_reader.close()
trail_reader.close()
print("\nPrototype completed.")

174
lib/offroute/trails.py Normal file
View file

@ -0,0 +1,174 @@
"""
Trail corridor reader for OFFROUTE.
Provides access to the OSM-derived trail raster for pathfinding.
Trail values replace WorldCover friction where trails exist.
Raster values:
0 = no trail (use WorldCover friction)
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for trails layer support")
# Default path to the trails raster
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
# Trail value to friction multiplier mapping
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
class TrailReader:
"""Reader for OSM-derived trail corridor raster."""
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
self.trails_path = trails_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.trails_path.exists():
raise FileNotFoundError(
f"Trails raster not found at {self.trails_path}. "
f"Run the Phase B rasterization script first."
)
self._dataset = rasterio.open(self.trails_path)
return self._dataset
def get_trails_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get trail values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 trail values:
0 = no trail
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor to preserve discrete values
trails = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return trails
def sample_point(self, lat: float, lon: float) -> int:
"""Sample trail value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = no trail
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert trail values to friction multipliers.
Args:
trails: uint8 array of trail values (0, 5, 15, or 25)
Returns:
Tuple of:
- friction: float32 array of friction multipliers
- has_trail: bool array indicating where trails exist
"""
friction = np.ones_like(trails, dtype=np.float32)
has_trail = trails > 0
# Apply friction values where trails exist
friction[trails == 5] = 0.1 # road
friction[trails == 15] = 0.3 # track
friction[trails == 25] = 0.5 # foot trail
return friction, has_trail
if __name__ == "__main__":
print("Testing TrailReader...")
if not DEFAULT_TRAILS_PATH.exists():
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
print("Run Phase B rasterization first.")
exit(1)
reader = TrailReader()
# Test point sampling - Twin Falls downtown (should have roads)
test_lat, test_lon = 42.563, -114.461
trail_value = reader.sample_point(test_lat, test_lon)
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
print(f" Type: {label}")
# Test grid read for test bbox
trails = reader.get_trails_grid(
south=42.21, north=42.60, west=-114.76, east=-113.79,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {trails.shape}")
unique, counts = np.unique(trails, return_counts=True)
print("Value distribution:")
for v, c in zip(unique, counts):
pct = 100 * c / trails.size
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
# Test conversion to friction
friction, has_trail = trails_to_friction(trails)
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
reader.close()
print("\nTrailReader test complete.")