diff --git a/lib/offroute/cost.py b/lib/offroute/cost.py index f31b8f5..5f3618c 100644 --- a/lib/offroute/cost.py +++ b/lib/offroute/cost.py @@ -3,7 +3,7 @@ Tobler off-path hiking cost function for OFFROUTE. Computes travel time cost based on terrain slope using Tobler's hiking function with off-trail penalty. Optionally applies friction -multipliers from land cover data and barrier grids from PAD-US. +multipliers from land cover data, trail corridors, and barrier grids. """ import math import numpy as np @@ -19,6 +19,14 @@ TOBLER_OFF_TRAIL_MULT = 0.6 # Pragmatic mode friction multiplier for private land PRAGMATIC_BARRIER_MULTIPLIER = 5.0 +# Trail value to friction multiplier mapping +# Trail friction REPLACES land cover friction (a road through forest is still easy) +TRAIL_FRICTION_MAP = { + 5: 0.1, # road + 15: 0.3, # track + 25: 0.5, # foot trail +} + def tobler_speed(grade: float) -> float: """ @@ -37,6 +45,7 @@ def compute_cost_grid( cell_size_lat_m: float = None, cell_size_lon_m: float = None, friction: Optional[np.ndarray] = None, + trails: Optional[np.ndarray] = None, barriers: Optional[np.ndarray] = None, boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic" ) -> np.ndarray: @@ -51,10 +60,17 @@ def compute_cost_grid( cell_size_m: Average cell size in meters cell_size_lat_m: Cell size in latitude direction (optional) cell_size_lon_m: Cell size in longitude direction (optional) - friction: Optional 2D array of friction multipliers. + friction: Optional 2D array of friction multipliers (WorldCover). Values should be float (1.0 = baseline, 2.0 = 2x slower). np.inf marks impassable cells. If None, no friction is applied (backward compatible). + trails: Optional 2D array of trail values (uint8). + 0 = no trail (use friction) + 5 = road (0.1× friction, replaces WorldCover) + 15 = track (0.3× friction, replaces WorldCover) + 25 = foot trail (0.5× friction, replaces WorldCover) + Trail friction REPLACES land cover friction where trails exist. + If None, no trail burn-in is applied. barriers: Optional 2D array of barrier values (uint8). 255 = closed/restricted area (from PAD-US Pub_Access = XA). 0 = accessible. @@ -111,14 +127,30 @@ def compute_cost_grid( # Handle NaN elevations (no data) cost[np.isnan(elevation)] = np.inf - # Apply friction multipliers if provided + # Build effective friction array + # Start with WorldCover friction if provided, else 1.0 if friction is not None: if friction.shape != elevation.shape: raise ValueError( f"Friction shape {friction.shape} does not match elevation shape {elevation.shape}" ) - # Multiply cost by friction (inf * anything = inf, which is correct) - cost = cost * friction + effective_friction = friction.copy() + else: + effective_friction = np.ones(elevation.shape, dtype=np.float32) + + # Apply trail burn-in: trails REPLACE land cover friction + if trails is not None: + if trails.shape != elevation.shape: + raise ValueError( + f"Trails shape {trails.shape} does not match elevation shape {elevation.shape}" + ) + # Replace friction where trails exist + for trail_value, trail_friction in TRAIL_FRICTION_MAP.items(): + trail_mask = trails == trail_value + effective_friction[trail_mask] = trail_friction + + # Apply friction to cost + cost = cost * effective_friction # Apply barriers based on boundary_mode if barriers is not None and boundary_mode != "emergency": @@ -145,7 +177,7 @@ if __name__ == "__main__": speed = tobler_speed(grade) print(f" Grade {grade:+.2f}: {speed:.2f} km/h") - print("\nTesting cost grid computation (no friction, no barriers):") + print("\nTesting cost grid computation (no friction, no trails):") elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10 cost = compute_cost_grid(elev, cell_size_m=30.0) print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m") @@ -155,21 +187,25 @@ if __name__ == "__main__": else: print(f" All cells impassable (test data too steep)") - print("\nTesting cost grid with friction:") + print("\nTesting cost grid with friction and trails:") elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain - friction = np.ones((10, 10), dtype=np.float32) * 1.5 # 1.5x friction - friction[5, 5] = np.inf # one impassable cell - cost = compute_cost_grid(elev, cell_size_m=30.0, friction=friction) - print(f" Base cost (flat, 30m cell): {30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05)):.1f} s") - print(f" With 1.5x friction: {cost[0, 0]:.1f} s") - print(f" Impassable cells: {np.sum(np.isinf(cost))}") + friction = np.ones((10, 10), dtype=np.float32) * 2.0 # 2.0x friction (forest) + trails = np.zeros((10, 10), dtype=np.uint8) + trails[5, :] = 5 # road across middle row - print("\nTesting cost grid with barriers (three modes):") - elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain - barriers = np.zeros((10, 10), dtype=np.uint8) - barriers[3:7, 3:7] = 255 # 4x4 closed area in center + cost_no_trail = compute_cost_grid(elev, cell_size_m=30.0, friction=friction) + cost_with_trail = compute_cost_grid(elev, cell_size_m=30.0, friction=friction, trails=trails) base_cost = 30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05)) + print(f" Base cost (flat, 30m cell): {base_cost:.1f} s") + print(f" Forest cell (2.0x friction): {cost_no_trail[0, 0]:.1f} s") + print(f" Road cell (0.1x friction, replaces forest): {cost_with_trail[5, 0]:.1f} s") + print(f" Road friction advantage: {cost_no_trail[0, 0] / cost_with_trail[5, 0]:.1f}x faster") + + print("\nTesting cost grid with barriers (three modes):") + elev = np.ones((10, 10), dtype=np.float32) * 1000 + barriers = np.zeros((10, 10), dtype=np.uint8) + barriers[3:7, 3:7] = 255 for mode in ["strict", "pragmatic", "emergency"]: cost = compute_cost_grid(elev, cell_size_m=30.0, barriers=barriers, boundary_mode=mode) diff --git a/lib/offroute/prototype.py b/lib/offroute/prototype.py index b5caf86..c9b78f0 100755 --- a/lib/offroute/prototype.py +++ b/lib/offroute/prototype.py @@ -1,16 +1,12 @@ #!/usr/bin/env python3 """ -OFFROUTE Phase O2c Prototype +OFFROUTE Phase O3a Prototype -Validates the PMTiles decoder, Tobler cost function, WorldCover friction, -PAD-US barriers integration, and MCP pathfinder on a real Idaho bounding box. +Validates trail burn-in integration with the MCP pathfinder. +The path should actively seek out trails and roads when nearby. -Runs THREE pathfinding passes with different boundary modes: - 1. boundary_mode="strict" - private land is impassable - 2. boundary_mode="pragmatic" - private land has 5x friction penalty - 3. boundary_mode="emergency" - private land barriers ignored - -Outputs comparison showing impact of boundary mode on routing. +Compares paths with and without trail burn-in to show the benefit +of trail-seeking behavior. """ import json import time @@ -27,8 +23,9 @@ from lib.offroute.dem import DEMReader from lib.offroute.cost import compute_cost_grid from lib.offroute.friction import FrictionReader, friction_to_multiplier from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH +from lib.offroute.trails import TrailReader, DEFAULT_TRAILS_PATH -# Test bounding box - Idaho area known to have mixed public/private land +# Test bounding box - Idaho area BBOX = { "south": 42.21, "north": 42.60, @@ -36,26 +33,17 @@ BBOX = { "east": -113.79, } -# Start point: wilderness area south of Twin Falls -START_LAT = 42.36 -START_LON = -114.55 +# Start point: wilderness area away from roads +START_LAT = 42.35 +START_LON = -114.60 -# End point: near Burley, ID (on road network) +# End point: near Twin Falls (has roads/trails) END_LAT = 42.55 -END_LON = -114.25 +END_LON = -114.20 # Output files -OUTPUT_PATHS = { - "strict": Path("/opt/recon/data/offroute-test-strict.geojson"), - "pragmatic": Path("/opt/recon/data/offroute-test-pragmatic.geojson"), - "emergency": Path("/opt/recon/data/offroute-test-emergency.geojson"), -} - -# Old files to delete -OLD_FILES = [ - Path("/opt/recon/data/offroute-test-barriers-on.geojson"), - Path("/opt/recon/data/offroute-test-barriers-off.geojson"), -] +OUTPUT_PATH_WITH_TRAILS = Path("/opt/recon/data/offroute-test-trails.geojson") +OUTPUT_PATH_NO_TRAILS = Path("/opt/recon/data/offroute-test-no-trails.geojson") # Memory limit in GB MEMORY_LIMIT_GB = 12 @@ -79,32 +67,26 @@ def run_pathfinder( elevation: np.ndarray, meta: dict, friction_mult: np.ndarray, + trails: np.ndarray, barriers: np.ndarray, - boundary_mode: str, + use_trails: bool, start_row: int, start_col: int, end_row: int, end_col: int, dem_reader: DEMReader, ) -> dict: - """ - Run the MCP pathfinder with given parameters. - - Returns dict with path info and stats. - """ + """Run the MCP pathfinder with given parameters.""" # Compute cost grid cost = compute_cost_grid( elevation, cell_size_m=meta["cell_size_m"], friction=friction_mult, + trails=trails if use_trails else None, barriers=barriers, - boundary_mode=boundary_mode, + boundary_mode="pragmatic", ) - # Count impassable cells - impassable_count = np.sum(np.isinf(cost)) - barrier_count = np.sum(barriers == 255) if barriers is not None else 0 - # Run MCP mcp = MCP_Geometric(cost, fully_connected=True) cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)]) @@ -115,25 +97,23 @@ def run_pathfinder( return { "success": False, "reason": "No path found (blocked by impassable terrain)", - "impassable_cells": int(impassable_count), - "barrier_cells": int(barrier_count), } # Traceback path path_indices = mcp.traceback((end_row, end_col)) - # Convert to coordinates + # Convert to coordinates and collect stats coordinates = [] elevations = [] - barrier_values = [] + trail_values = [] for row, col in path_indices: lat, lon = dem_reader.pixel_to_latlon(row, col, meta) elev = elevation[row, col] - barr = barriers[row, col] if barriers is not None else 0 + trail_val = trails[row, col] if trails is not None else 0 coordinates.append([lon, lat]) elevations.append(elev) - barrier_values.append(barr) + trail_values.append(trail_val) # Compute distance total_distance_m = 0 @@ -153,9 +133,14 @@ def run_pathfinder( elev_gain = np.sum(elev_diff[elev_diff > 0]) elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0])) - # Barrier crossings on path - barr_arr = np.array(barrier_values) - barrier_crossings = np.sum(barr_arr == 255) + # Trail stats + trail_arr = np.array(trail_values) + road_cells = np.sum(trail_arr == 5) + track_cells = np.sum(trail_arr == 15) + trail_cells = np.sum(trail_arr == 25) + off_trail_cells = np.sum(trail_arr == 0) + on_trail_cells = road_cells + track_cells + trail_cells + total_cells = len(trail_arr) return { "success": True, @@ -168,30 +153,28 @@ def run_pathfinder( "elevation_loss_m": float(elev_loss), "min_elevation_m": float(np.min(elev_arr)), "max_elevation_m": float(np.max(elev_arr)), - "cell_count": len(path_indices), - "impassable_cells": int(impassable_count), - "barrier_cells": int(barrier_count), - "barrier_crossings": int(barrier_crossings), + "cell_count": total_cells, + "road_cells": int(road_cells), + "track_cells": int(track_cells), + "trail_cells": int(trail_cells), + "off_trail_cells": int(off_trail_cells), + "on_trail_pct": float(100 * on_trail_cells / total_cells) if total_cells > 0 else 0, } def main(): print("=" * 80) - print("OFFROUTE Phase O2c Prototype (Three-Mode Boundary Respect)") + print("OFFROUTE Phase O3a Prototype (Trail Burn-In)") print("=" * 80) t0 = time.time() - # Delete old output files - for old_file in OLD_FILES: - if old_file.exists(): - old_file.unlink() - print(f"Deleted old file: {old_file}") - - # Check if barrier raster exists + # Check for required rasters if not DEFAULT_BARRIERS_PATH.exists(): print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}") - print(f"Run first: python /opt/recon/lib/offroute/barriers.py build") + sys.exit(1) + if not DEFAULT_TRAILS_PATH.exists(): + print(f"\nERROR: Trails raster not found at {DEFAULT_TRAILS_PATH}") sys.exit(1) # Step 1: Load elevation data @@ -243,25 +226,42 @@ def main(): closed_cells = np.sum(barriers == 255) print(f" Barrier grid shape: {barriers.shape}") - print(f" Closed/restricted cells: {closed_cells:,} ({100*closed_cells/barriers.size:.2f}%)") + print(f" Closed/restricted cells: {closed_cells:,}") - if closed_cells == 0: - print("\n WARNING: No closed/restricted areas in this bbox.") - print(" The test may not show meaningful differences between modes.") + # Step 4: Load trails data + print(f"\n[4] Loading OSM trails layer...") + trail_reader = TrailReader() + + trails = trail_reader.get_trails_grid( + south=BBOX["south"], + north=BBOX["north"], + west=BBOX["west"], + east=BBOX["east"], + target_shape=elevation.shape + ) + + road_cells = np.sum(trails == 5) + track_cells = np.sum(trails == 15) + trail_cells = np.sum(trails == 25) + print(f" Trails grid shape: {trails.shape}") + print(f" Road cells: {road_cells:,}") + print(f" Track cells: {track_cells:,}") + print(f" Trail cells: {trail_cells:,}") + print(f" Total trail coverage: {100*(road_cells+track_cells+trail_cells)/trails.size:.2f}%") mem = check_memory_usage() if mem > 0: print(f" Memory usage: {mem:.1f} GB") - # Step 4: Convert start/end to pixel coordinates - print(f"\n[4] Converting coordinates...") + # Step 5: Convert start/end to pixel coordinates + print(f"\n[5] Converting coordinates...") start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta) end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta) print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})") print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})") - # Validate coordinates are within bounds + # Validate coordinates rows, cols = elevation.shape if not (0 <= start_row < rows and 0 <= start_col < cols): print(f"ERROR: Start point outside grid bounds") @@ -270,64 +270,86 @@ def main(): print(f"ERROR: End point outside grid bounds") sys.exit(1) - # Step 5: Run pathfinder THREE times - results = {} - modes = ["strict", "pragmatic", "emergency"] + # Step 6: Run pathfinder WITH trails + print(f"\n[6] Running pathfinder WITH trail burn-in...") + t6a = time.time() + result_trails = run_pathfinder( + elevation, meta, friction_mult, trails, barriers, + use_trails=True, + start_row=start_row, start_col=start_col, + end_row=end_row, end_col=end_col, + dem_reader=dem_reader, + ) + t6b = time.time() + print(f" Completed in {t6b - t6a:.1f}s") - for i, mode in enumerate(modes, start=5): - print(f"\n[{i}] Running pathfinder (boundary_mode=\"{mode}\")...") - t_start = time.time() - results[mode] = run_pathfinder( - elevation, meta, friction_mult, barriers, - boundary_mode=mode, - start_row=start_row, start_col=start_col, - end_row=end_row, end_col=end_col, - dem_reader=dem_reader, - ) - t_end = time.time() - print(f" Completed in {t_end - t_start:.1f}s") + # Step 7: Run pathfinder WITHOUT trails + print(f"\n[7] Running pathfinder WITHOUT trail burn-in...") + t7a = time.time() + result_no_trails = run_pathfinder( + elevation, meta, friction_mult, trails, barriers, + use_trails=False, + start_row=start_row, start_col=start_col, + end_row=end_row, end_col=end_col, + dem_reader=dem_reader, + ) + t7b = time.time() + print(f" Completed in {t7b - t7a:.1f}s") - # Step 6: Save GeoJSON outputs + # Step 8: Save GeoJSON outputs print(f"\n[8] Saving GeoJSON outputs...") - OUTPUT_PATHS["strict"].parent.mkdir(parents=True, exist_ok=True) + OUTPUT_PATH_WITH_TRAILS.parent.mkdir(parents=True, exist_ok=True) - for mode, result in results.items(): - output_path = OUTPUT_PATHS[mode] - if result["success"]: - geojson = { - "type": "Feature", - "properties": { - "type": f"offroute_{mode}", - "phase": "O2c", - "boundary_mode": mode, - "start": {"lat": START_LAT, "lon": START_LON}, - "end": {"lat": END_LAT, "lon": END_LON}, - **{k: v for k, v in result.items() if k not in ["success", "coordinates"]}, - }, - "geometry": { - "type": "LineString", - "coordinates": result["coordinates"], - } + if result_trails["success"]: + geojson = { + "type": "Feature", + "properties": { + "type": "offroute_with_trails", + "phase": "O3a", + "trail_burn_in": True, + "start": {"lat": START_LAT, "lon": START_LON}, + "end": {"lat": END_LAT, "lon": END_LON}, + **{k: v for k, v in result_trails.items() if k not in ["success", "coordinates"]}, + }, + "geometry": { + "type": "LineString", + "coordinates": result_trails["coordinates"], } - with open(output_path, "w") as f: - json.dump(geojson, f, indent=2) - print(f" Saved: {output_path}") - else: - print(f" SKIPPED ({mode}): {result['reason']}") + } + with open(OUTPUT_PATH_WITH_TRAILS, "w") as f: + json.dump(geojson, f, indent=2) + print(f" Saved: {OUTPUT_PATH_WITH_TRAILS}") + + if result_no_trails["success"]: + geojson = { + "type": "Feature", + "properties": { + "type": "offroute_no_trails", + "phase": "O3a", + "trail_burn_in": False, + "start": {"lat": START_LAT, "lon": START_LON}, + "end": {"lat": END_LAT, "lon": END_LON}, + **{k: v for k, v in result_no_trails.items() if k not in ["success", "coordinates"]}, + }, + "geometry": { + "type": "LineString", + "coordinates": result_no_trails["coordinates"], + } + } + with open(OUTPUT_PATH_NO_TRAILS, "w") as f: + json.dump(geojson, f, indent=2) + print(f" Saved: {OUTPUT_PATH_NO_TRAILS}") t_total = time.time() - # Final report - three-way comparison + # Final report print(f"\n" + "=" * 80) - print("THREE-WAY COMPARISON") + print("SIDE-BY-SIDE COMPARISON: Trail Burn-In Effect") print("=" * 80) - # Check how many succeeded - success_count = sum(1 for r in results.values() if r["success"]) - - if success_count == 3: - print(f"{'Metric':<22} {'STRICT':<18} {'PRAGMATIC':<18} {'EMERGENCY':<18}") + if result_trails["success"] and result_no_trails["success"]: + print(f"{'Metric':<25} {'WITH TRAILS':<20} {'WITHOUT TRAILS':<20} {'Delta':<15}") print("-" * 80) metrics = [ @@ -335,55 +357,55 @@ def main(): ("Effort time (min)", "total_time_minutes", ".1f"), ("Cell count", "cell_count", "d"), ("Elevation gain (m)", "elevation_gain_m", ".0f"), - ("Elevation loss (m)", "elevation_loss_m", ".0f"), - ("Barrier crossings", "barrier_crossings", "d"), - ("Impassable cells", "impassable_cells", ",d"), + ("On-trail %", "on_trail_pct", ".1f"), + ("Road cells", "road_cells", "d"), + ("Track cells", "track_cells", "d"), + ("Trail cells", "trail_cells", "d"), ] for label, key, fmt in metrics: - vals = [results[m][key] for m in modes] - print(f"{label:<22} {vals[0]:<18{fmt}} {vals[1]:<18{fmt}} {vals[2]:<18{fmt}}") + val_with = result_trails[key] + val_without = result_no_trails[key] + if isinstance(val_with, int): + delta = val_with - val_without + delta_str = f"{delta:+d}" + else: + delta = val_with - val_without + delta_str = f"{delta:+.2f}" + print(f"{label:<25} {val_with:<20{fmt}} {val_without:<20{fmt}} {delta_str:<15}") # Analysis print(f"\n" + "-" * 80) print("ANALYSIS") print("-" * 80) - strict_crossings = results["strict"]["barrier_crossings"] - pragmatic_crossings = results["pragmatic"]["barrier_crossings"] - emergency_crossings = results["emergency"]["barrier_crossings"] + time_saved = result_no_trails["total_time_minutes"] - result_trails["total_time_minutes"] + if time_saved > 0: + print(f"Trail burn-in saves {time_saved:.1f} minutes ({100*time_saved/result_no_trails['total_time_minutes']:.1f}% faster)") + elif time_saved < 0: + print(f"Trail burn-in adds {-time_saved:.1f} minutes (path seeks trails even if longer)") - print(f"Barrier crossings: strict={strict_crossings}, pragmatic={pragmatic_crossings}, emergency={emergency_crossings}") - - if strict_crossings == 0 and pragmatic_crossings == 0 and emergency_crossings == 0: - print("No path crosses private land - terrain naturally avoids barriers.") + on_trail_with = result_trails["on_trail_pct"] + on_trail_without = result_no_trails["on_trail_pct"] + if on_trail_with > on_trail_without: + print(f"Trail burn-in increases on-trail travel: {on_trail_without:.1f}% → {on_trail_with:.1f}%") else: - if emergency_crossings > pragmatic_crossings: - print(f"Pragmatic mode reduces barrier crossings vs emergency: {emergency_crossings} -> {pragmatic_crossings}") - if pragmatic_crossings > 0 and strict_crossings == 0: - print(f"Strict mode completely avoids private land (pragmatic crosses {pragmatic_crossings} cells)") - - # Time/distance comparison - if results["strict"]["total_time_minutes"] > results["emergency"]["total_time_minutes"]: - time_penalty = results["strict"]["total_time_minutes"] - results["emergency"]["total_time_minutes"] - print(f"Time cost of strict boundary respect: +{time_penalty:.1f} min") + print(f"Both paths have similar on-trail percentage") else: - print(f"Only {success_count}/3 modes found a path:") - for mode, result in results.items(): - if result["success"]: - print(f" {mode}: {result['total_distance_km']:.2f} km, {result['total_time_minutes']:.1f} min") - else: - print(f" {mode}: FAILED - {result.get('reason', 'unknown')}") + if not result_trails["success"]: + print(f"WITH TRAILS: FAILED - {result_trails.get('reason', 'unknown')}") + if not result_no_trails["success"]: + print(f"WITHOUT TRAILS: FAILED - {result_no_trails.get('reason', 'unknown')}") print(f"\n" + "-" * 80) print(f"Total wall time: {t_total - t0:.1f}s") - print(f"Closed cells in bbox: {closed_cells:,}") # Cleanup dem_reader.close() friction_reader.close() barrier_reader.close() + trail_reader.close() print("\nPrototype completed.") diff --git a/lib/offroute/trails.py b/lib/offroute/trails.py new file mode 100644 index 0000000..9d9185e --- /dev/null +++ b/lib/offroute/trails.py @@ -0,0 +1,174 @@ +""" +Trail corridor reader for OFFROUTE. + +Provides access to the OSM-derived trail raster for pathfinding. +Trail values replace WorldCover friction where trails exist. + +Raster values: + 0 = no trail (use WorldCover friction) + 5 = road (0.1× friction) + 15 = track (0.3× friction) + 25 = foot trail (0.5× friction) +""" +import numpy as np +from pathlib import Path +from typing import Tuple, Optional + +try: + import rasterio + from rasterio.windows import from_bounds + from rasterio.enums import Resampling +except ImportError: + raise ImportError("rasterio is required for trails layer support") + +# Default path to the trails raster +DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif") + +# Trail value to friction multiplier mapping +TRAIL_FRICTION_MAP = { + 5: 0.1, # road + 15: 0.3, # track + 25: 0.5, # foot trail +} + + +class TrailReader: + """Reader for OSM-derived trail corridor raster.""" + + def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH): + self.trails_path = trails_path + self._dataset = None + + def _open(self): + """Lazy open the dataset.""" + if self._dataset is None: + if not self.trails_path.exists(): + raise FileNotFoundError( + f"Trails raster not found at {self.trails_path}. " + f"Run the Phase B rasterization script first." + ) + self._dataset = rasterio.open(self.trails_path) + return self._dataset + + def get_trails_grid( + self, + south: float, + north: float, + west: float, + east: float, + target_shape: Tuple[int, int] + ) -> np.ndarray: + """ + Get trail values for a bounding box, resampled to target shape. + + Args: + south, north, west, east: Bounding box coordinates (WGS84) + target_shape: (rows, cols) to resample to (matches elevation grid) + + Returns: + np.ndarray of uint8 trail values: + 0 = no trail + 5 = road (0.1× friction) + 15 = track (0.3× friction) + 25 = foot trail (0.5× friction) + """ + ds = self._open() + + # Create a window from the bounding box + window = from_bounds(west, south, east, north, ds.transform) + + # Read with resampling to target shape + # Use nearest neighbor to preserve discrete values + trails = ds.read( + 1, + window=window, + out_shape=target_shape, + resampling=Resampling.nearest + ) + + return trails + + def sample_point(self, lat: float, lon: float) -> int: + """Sample trail value at a single point.""" + ds = self._open() + + # Get pixel coordinates + row, col = ds.index(lon, lat) + + # Check bounds + if row < 0 or row >= ds.height or col < 0 or col >= ds.width: + return 0 # Out of bounds = no trail + + # Read single pixel + window = rasterio.windows.Window(col, row, 1, 1) + value = ds.read(1, window=window) + return int(value[0, 0]) + + def close(self): + """Close the dataset.""" + if self._dataset is not None: + self._dataset.close() + self._dataset = None + + +def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Convert trail values to friction multipliers. + + Args: + trails: uint8 array of trail values (0, 5, 15, or 25) + + Returns: + Tuple of: + - friction: float32 array of friction multipliers + - has_trail: bool array indicating where trails exist + """ + friction = np.ones_like(trails, dtype=np.float32) + has_trail = trails > 0 + + # Apply friction values where trails exist + friction[trails == 5] = 0.1 # road + friction[trails == 15] = 0.3 # track + friction[trails == 25] = 0.5 # foot trail + + return friction, has_trail + + +if __name__ == "__main__": + print("Testing TrailReader...") + + if not DEFAULT_TRAILS_PATH.exists(): + print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}") + print("Run Phase B rasterization first.") + exit(1) + + reader = TrailReader() + + # Test point sampling - Twin Falls downtown (should have roads) + test_lat, test_lon = 42.563, -114.461 + trail_value = reader.sample_point(test_lat, test_lon) + print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}") + label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown") + print(f" Type: {label}") + + # Test grid read for test bbox + trails = reader.get_trails_grid( + south=42.21, north=42.60, west=-114.76, east=-113.79, + target_shape=(400, 1000) + ) + print(f"\nGrid test shape: {trails.shape}") + + unique, counts = np.unique(trails, return_counts=True) + print("Value distribution:") + for v, c in zip(unique, counts): + pct = 100 * c / trails.size + label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})") + print(f" {label}: {c:,} pixels ({pct:.2f}%)") + + # Test conversion to friction + friction, has_trail = trails_to_friction(trails) + print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%") + print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}") + + reader.close() + print("\nTrailReader test complete.")