From 26d4bc74784c7586ce14318a1d2cbf8d00f13b8e Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 8 May 2026 06:33:45 +0000 Subject: [PATCH] =?UTF-8?q?feat(offroute):=20Phase=20O2b=20=E2=80=94=20Wor?= =?UTF-8?q?ldCover=20friction=20integration,=20lake=20avoidance=20validate?= =?UTF-8?q?d?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New friction.py: reads WorldCover friction VRT, resamples to match elevation grid, provides point sampling for validation - Modified cost.py: accepts optional friction array, multiplies Tobler time cost by friction multiplier, inf for water/nodata (255/0) - Modified prototype.py: loads friction layer, passes to cost function, validates path avoids water cells (friction=255) Validated on Idaho test bbox: - Path avoids Murtaugh Lake (no water cells on path) - Friction along path: min=10, max=20, mean=10.2 - Effort increased 3.4% vs Phase O1 due to friction multipliers Co-Authored-By: Claude Opus 4.5 --- lib/offroute/cost.py | 226 ++++++++++++++++++++++---------------- lib/offroute/friction.py | 137 +++++++++++++++++++++++ lib/offroute/prototype.py | 190 +++++++++++++++++++++++++------- 3 files changed, 420 insertions(+), 133 deletions(-) create mode 100644 lib/offroute/friction.py diff --git a/lib/offroute/cost.py b/lib/offroute/cost.py index f460ab9..3607de6 100644 --- a/lib/offroute/cost.py +++ b/lib/offroute/cost.py @@ -1,94 +1,132 @@ -""" -Tobler off-path hiking cost function for OFFROUTE. - -Computes travel time cost based on terrain slope using Tobler's -hiking function with off-trail penalty. -""" -import math -import numpy as np -from typing import Tuple - -# Maximum passable slope in degrees -MAX_SLOPE_DEG = 40.0 - -# Tobler off-path parameters -TOBLER_BASE_SPEED = 6.0 -TOBLER_OFF_TRAIL_MULT = 0.6 - - -def tobler_speed(grade: float) -> float: - """ - Calculate hiking speed using Tobler's off-path function. - - speed_kmh = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) - - Peak speed is ~3.6 km/h at grade = -0.05 (slight downhill). - """ - return TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * math.exp(-3.5 * abs(grade + 0.05)) - - -def compute_cost_grid( - elevation: np.ndarray, - cell_size_m: float, - cell_size_lat_m: float = None, - cell_size_lon_m: float = None -) -> np.ndarray: - """ - Compute isotropic travel cost grid from elevation data. - - Each cell's cost represents the time (in seconds) to traverse that cell, - based on the average slope from neighboring cells. - """ - if cell_size_lat_m is None: - cell_size_lat_m = cell_size_m - if cell_size_lon_m is None: - cell_size_lon_m = cell_size_m - - rows, cols = elevation.shape - - # Compute gradients in both directions - dy = np.zeros_like(elevation) - dx = np.zeros_like(elevation) - - # Central differences for interior, forward/backward at edges - dy[1:-1, :] = (elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m) - dy[0, :] = (elevation[0, :] - elevation[1, :]) / cell_size_lat_m - dy[-1, :] = (elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m - - dx[:, 1:-1] = (elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m) - dx[:, 0] = (elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m - dx[:, -1] = (elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m - - # Compute slope magnitude (grade = rise/run) - grade_magnitude = np.sqrt(dx**2 + dy**2) - - # Convert to slope angle in degrees - slope_deg = np.degrees(np.arctan(grade_magnitude)) - - # Compute speed for each cell using Tobler function - speed_kmh = TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * np.exp(-3.5 * np.abs(grade_magnitude + 0.05)) - - # Convert speed to time cost (seconds to traverse one cell) - avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2 - cost = avg_cell_size * 3.6 / speed_kmh - - # Set impassable cells (slope > MAX_SLOPE_DEG) to infinity - cost[slope_deg > MAX_SLOPE_DEG] = np.inf - - # Handle NaN elevations (no data) - cost[np.isnan(elevation)] = np.inf - - return cost - - -if __name__ == "__main__": - print("Testing Tobler speed function:") - for grade in [-0.3, -0.1, -0.05, 0.0, 0.05, 0.1, 0.3]: - speed = tobler_speed(grade) - print(f" Grade {grade:+.2f}: {speed:.2f} km/h") - - print("\nTesting cost grid computation:") - elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10 - cost = compute_cost_grid(elev, cell_size_m=30.0) - print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m") - print(f" Cost range: {cost[~np.isinf(cost)].min():.1f} - {cost[~np.isinf(cost)].max():.1f} s") +""" +Tobler off-path hiking cost function for OFFROUTE. + +Computes travel time cost based on terrain slope using Tobler's +hiking function with off-trail penalty. Optionally applies friction +multipliers from land cover data. +""" +import math +import numpy as np +from typing import Optional + +# Maximum passable slope in degrees +MAX_SLOPE_DEG = 40.0 + +# Tobler off-path parameters +TOBLER_BASE_SPEED = 6.0 +TOBLER_OFF_TRAIL_MULT = 0.6 + + +def tobler_speed(grade: float) -> float: + """ + Calculate hiking speed using Tobler's off-path function. + + speed_kmh = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) + + Peak speed is ~3.6 km/h at grade = -0.05 (slight downhill). + """ + return TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * math.exp(-3.5 * abs(grade + 0.05)) + + +def compute_cost_grid( + elevation: np.ndarray, + cell_size_m: float, + cell_size_lat_m: float = None, + cell_size_lon_m: float = None, + friction: Optional[np.ndarray] = None +) -> np.ndarray: + """ + Compute isotropic travel cost grid from elevation data. + + Each cell's cost represents the time (in seconds) to traverse that cell, + based on the average slope from neighboring cells. + + Args: + elevation: 2D array of elevation values in meters + cell_size_m: Average cell size in meters + cell_size_lat_m: Cell size in latitude direction (optional) + cell_size_lon_m: Cell size in longitude direction (optional) + friction: Optional 2D array of friction multipliers. + Values should be float (1.0 = baseline, 2.0 = 2x slower). + np.inf marks impassable cells. + If None, no friction is applied (backward compatible). + + Returns: + 2D array of travel cost in seconds per cell. + np.inf for impassable cells. + """ + if cell_size_lat_m is None: + cell_size_lat_m = cell_size_m + if cell_size_lon_m is None: + cell_size_lon_m = cell_size_m + + rows, cols = elevation.shape + + # Compute gradients in both directions + dy = np.zeros_like(elevation) + dx = np.zeros_like(elevation) + + # Central differences for interior, forward/backward at edges + dy[1:-1, :] = (elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m) + dy[0, :] = (elevation[0, :] - elevation[1, :]) / cell_size_lat_m + dy[-1, :] = (elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m + + dx[:, 1:-1] = (elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m) + dx[:, 0] = (elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m + dx[:, -1] = (elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m + + # Compute slope magnitude (grade = rise/run) + grade_magnitude = np.sqrt(dx**2 + dy**2) + + # Convert to slope angle in degrees + slope_deg = np.degrees(np.arctan(grade_magnitude)) + + # Compute speed for each cell using Tobler function + speed_kmh = TOBLER_OFF_TRAIL_MULT * TOBLER_BASE_SPEED * np.exp(-3.5 * np.abs(grade_magnitude + 0.05)) + + # Convert speed to time cost (seconds to traverse one cell) + avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2 + cost = avg_cell_size * 3.6 / speed_kmh + + # Set impassable cells (slope > MAX_SLOPE_DEG) to infinity + cost[slope_deg > MAX_SLOPE_DEG] = np.inf + + # Handle NaN elevations (no data) + cost[np.isnan(elevation)] = np.inf + + # Apply friction multipliers if provided + if friction is not None: + if friction.shape != elevation.shape: + raise ValueError( + f"Friction shape {friction.shape} does not match elevation shape {elevation.shape}" + ) + # Multiply cost by friction (inf * anything = inf, which is correct) + cost = cost * friction + + return cost + + +if __name__ == "__main__": + print("Testing Tobler speed function:") + for grade in [-0.3, -0.1, -0.05, 0.0, 0.05, 0.1, 0.3]: + speed = tobler_speed(grade) + print(f" Grade {grade:+.2f}: {speed:.2f} km/h") + + print("\nTesting cost grid computation (no friction):") + elev = np.arange(100).reshape(10, 10).astype(np.float32) * 10 + cost = compute_cost_grid(elev, cell_size_m=30.0) + print(f" Elevation range: {elev.min():.0f} - {elev.max():.0f} m") + finite = cost[~np.isinf(cost)] + if len(finite) > 0: + print(f" Cost range: {finite.min():.1f} - {finite.max():.1f} s") + else: + print(f" All cells impassable (test data too steep)") + + print("\nTesting cost grid with friction:") + elev = np.ones((10, 10), dtype=np.float32) * 1000 # flat terrain + friction = np.ones((10, 10), dtype=np.float32) * 1.5 # 1.5x friction + friction[5, 5] = np.inf # one impassable cell + cost = compute_cost_grid(elev, cell_size_m=30.0, friction=friction) + print(f" Base cost (flat, 30m cell): {30 * 3.6 / (0.6 * 6.0 * np.exp(-3.5 * 0.05)):.1f} s") + print(f" With 1.5x friction: {cost[0, 0]:.1f} s") + print(f" Impassable cells: {np.sum(np.isinf(cost))}") diff --git a/lib/offroute/friction.py b/lib/offroute/friction.py new file mode 100644 index 0000000..32df0c0 --- /dev/null +++ b/lib/offroute/friction.py @@ -0,0 +1,137 @@ +""" +Friction layer reader for OFFROUTE. + +Reads friction values from the WorldCover friction VRT and resamples +to match the elevation grid dimensions. +""" +import numpy as np +from pathlib import Path +from typing import Tuple, Optional + +try: + import rasterio + from rasterio.windows import from_bounds + from rasterio.enums import Resampling +except ImportError: + raise ImportError("rasterio is required for friction layer support") + +# Default path to the friction VRT +DEFAULT_FRICTION_PATH = Path("/mnt/nav/worldcover/friction/friction_conus.vrt") + + +class FrictionReader: + """Reader for WorldCover friction raster.""" + + def __init__(self, friction_path: Path = DEFAULT_FRICTION_PATH): + self.friction_path = friction_path + self._dataset = None + + def _open(self): + """Lazy open the dataset.""" + if self._dataset is None: + self._dataset = rasterio.open(self.friction_path) + return self._dataset + + def get_friction_grid( + self, + south: float, + north: float, + west: float, + east: float, + target_shape: Tuple[int, int] + ) -> np.ndarray: + """ + Get friction values for a bounding box, resampled to target shape. + + Args: + south, north, west, east: Bounding box coordinates + target_shape: (rows, cols) to resample to (matches elevation grid) + + Returns: + np.ndarray of uint8 friction values, same shape as target_shape. + Values: 10-40 = friction multiplier (divide by 10) + 255 = impassable + 0 = nodata (treat as impassable) + """ + ds = self._open() + + # Create a window from the bounding box + window = from_bounds(west, south, east, north, ds.transform) + + # Read with resampling to target shape + # Use nearest neighbor for categorical data + friction = ds.read( + 1, + window=window, + out_shape=target_shape, + resampling=Resampling.nearest + ) + + return friction + + def sample_point(self, lat: float, lon: float) -> int: + """Sample friction value at a single point.""" + ds = self._open() + + # Get pixel coordinates + row, col = ds.index(lon, lat) + + # Check bounds + if row < 0 or row >= ds.height or col < 0 or col >= ds.width: + return 0 # Out of bounds = nodata + + # Read single pixel + window = rasterio.windows.Window(col, row, 1, 1) + value = ds.read(1, window=window) + return int(value[0, 0]) + + def close(self): + """Close the dataset.""" + if self._dataset is not None: + self._dataset.close() + self._dataset = None + + +def friction_to_multiplier(friction: np.ndarray) -> np.ndarray: + """ + Convert friction values to cost multipliers. + + Args: + friction: uint8 array of friction values + + Returns: + float32 array of multipliers. + Values 10-40 become 1.0-4.0 (divide by 10). + Values 0 or 255 become np.inf (impassable). + """ + multiplier = friction.astype(np.float32) / 10.0 + + # Mark impassable cells + multiplier[friction == 0] = np.inf # nodata + multiplier[friction == 255] = np.inf # water/impassable + + return multiplier + + +if __name__ == "__main__": + print("Testing FrictionReader...") + + reader = FrictionReader() + + # Test point sampling - Murtaugh Lake (should be water = 255) + lake_lat, lake_lon = 42.47, -114.15 + lake_friction = reader.sample_point(lake_lat, lake_lon) + print(f"Murtaugh Lake ({lake_lat}, {lake_lon}): friction = {lake_friction}") + print(f" Expected: 255 (water/impassable)") + + # Test grid read for small bbox + friction = reader.get_friction_grid( + south=42.4, north=42.5, west=-114.2, east=-114.1, + target_shape=(100, 100) + ) + print(f"\nGrid test shape: {friction.shape}") + print(f"Unique values: {np.unique(friction)}") + print(f"Water cells (255): {np.sum(friction == 255)}") + + reader.close() + print("\nFrictionReader test complete.") diff --git a/lib/offroute/prototype.py b/lib/offroute/prototype.py index 0790a32..9822021 100755 --- a/lib/offroute/prototype.py +++ b/lib/offroute/prototype.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 """ -OFFROUTE Phase O1 Prototype +OFFROUTE Phase O2b Prototype -Validates the PMTiles decoder, Tobler cost function, and MCP pathfinder -on a real Idaho bounding box. +Validates the PMTiles decoder, Tobler cost function, WorldCover friction +integration, and MCP pathfinder on a real Idaho bounding box. + +Now includes friction layer to avoid water bodies like Murtaugh Lake. Test bbox (four Idaho towns as corners): SW: Rogerson, ID (~42.21, -114.60) @@ -25,6 +27,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from lib.offroute.dem import DEMReader from lib.offroute.cost import compute_cost_grid +from lib.offroute.friction import FrictionReader, friction_to_multiplier # Test bounding box BBOX = { @@ -43,8 +46,18 @@ START_LON = -114.50 END_LAT = 42.52 END_LON = -113.85 -# Output file -OUTPUT_PATH = Path("/opt/recon/data/offroute-test.geojson") +# Murtaugh Lake - actual water extent from WorldCover +LAKE_BOUNDS = { + "south": 42.44, + "north": 42.50, + "west": -114.20, + "east": -114.10, +} +LAKE_CENTER = (42.465, -114.155) # Verified water in WorldCover + +# Output files +OUTPUT_PATH_O1 = Path("/opt/recon/data/offroute-test.geojson") +OUTPUT_PATH_FRICTION = Path("/opt/recon/data/offroute-test-friction.geojson") # Memory limit in GB MEMORY_LIMIT_GB = 12 @@ -64,19 +77,28 @@ def check_memory_usage(): return 0 +def path_crosses_lake(coordinates, lake_bounds): + """Check if any path coordinates fall within the lake bounding box.""" + for lon, lat in coordinates: + if (lake_bounds["south"] <= lat <= lake_bounds["north"] and + lake_bounds["west"] <= lon <= lake_bounds["east"]): + return True, (lat, lon) + return False, None + + def main(): print("=" * 60) - print("OFFROUTE Phase O1 Prototype") + print("OFFROUTE Phase O2b Prototype (with Friction)") print("=" * 60) t0 = time.time() # Step 1: Load elevation data print(f"\n[1] Loading DEM for bbox: {BBOX}") - reader = DEMReader() + dem_reader = DEMReader() t1 = time.time() - elevation, meta = reader.get_elevation_grid( + elevation, meta = dem_reader.get_elevation_grid( south=BBOX["south"], north=BBOX["north"], west=BBOX["west"], @@ -94,25 +116,67 @@ def main(): if mem > 0: print(f" Memory usage: {mem:.1f} GB") - # Step 2: Compute cost grid - print(f"\n[2] Computing Tobler cost grid...") + # Step 2: Load friction data + print(f"\n[2] Loading WorldCover friction layer...") + t2a = time.time() + + friction_reader = FrictionReader() + + # Validate lake is marked as impassable + lake_friction = friction_reader.sample_point(LAKE_CENTER[0], LAKE_CENTER[1]) + print(f" Murtaugh Lake center ({LAKE_CENTER[0]}, {LAKE_CENTER[1]}): friction = {lake_friction}") + if lake_friction != 255: + print(f" WARNING: Lake not marked as water (expected 255, got {lake_friction})") + else: + print(f" Lake correctly marked as impassable (255)") + + # Load friction grid matching elevation shape + friction_raw = friction_reader.get_friction_grid( + south=BBOX["south"], + north=BBOX["north"], + west=BBOX["west"], + east=BBOX["east"], + target_shape=elevation.shape + ) + t2b = time.time() + + # Convert to multipliers + friction_mult = friction_to_multiplier(friction_raw) + + impassable_count = np.sum(np.isinf(friction_mult)) + print(f" Friction grid shape: {friction_raw.shape}") + print(f" Unique friction values: {np.unique(friction_raw[friction_raw > 0])}") + print(f" Impassable cells (water/nodata): {impassable_count:,} ({100*impassable_count/friction_raw.size:.1f}%)") + print(f" Load time: {t2b - t2a:.1f}s") + + mem = check_memory_usage() + if mem > 0: + print(f" Memory usage: {mem:.1f} GB") + + # Step 3: Compute cost grid with friction + print(f"\n[3] Computing Tobler cost grid with friction...") t3 = time.time() - cost = compute_cost_grid(elevation, cell_size_m=meta["cell_size_m"]) + cost = compute_cost_grid( + elevation, + cell_size_m=meta["cell_size_m"], + friction=friction_mult + ) t4 = time.time() finite_cost = cost[~np.isinf(cost)] + total_impassable = np.sum(np.isinf(cost)) print(f" Cost range: {finite_cost.min():.1f} - {finite_cost.max():.1f} s/cell") - print(f" Impassable cells: {np.sum(np.isinf(cost)):,} ({100*np.sum(np.isinf(cost))/cost.size:.1f}%)") + print(f" Total impassable cells: {total_impassable:,} ({100*total_impassable/cost.size:.1f}%)") print(f" Compute time: {t4 - t3:.1f}s") mem = check_memory_usage() if mem > 0: print(f" Memory usage: {mem:.1f} GB") - # Step 3: Convert start/end to pixel coordinates - print(f"\n[3] Converting coordinates...") - start_row, start_col = reader.latlon_to_pixel(START_LAT, START_LON, meta) - end_row, end_col = reader.latlon_to_pixel(END_LAT, END_LON, meta) + # Step 4: Convert start/end to pixel coordinates + print(f"\n[4] Converting coordinates...") + start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta) + end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta) print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})") print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})") @@ -131,21 +195,16 @@ def main(): print(f" Start elevation: {start_elev:.0f} m") print(f" End elevation: {end_elev:.0f} m") - # Step 4: Run MCP pathfinder - print(f"\n[4] Running MCP_Geometric pathfinder...") + # Step 5: Run MCP pathfinder + print(f"\n[5] Running MCP_Geometric pathfinder...") t5 = time.time() - # MCP_Geometric finds minimum cost path - # It uses Dijkstra's algorithm internally mcp = MCP_Geometric(cost, fully_connected=True) - - # Find costs from start to all reachable cells cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)]) t6 = time.time() print(f" Dijkstra completed in {t6 - t5:.1f}s") - # Get cost to reach end point end_cost = cumulative_costs[end_row, end_col] print(f" Total cost to endpoint: {end_cost:.0f} seconds ({end_cost/60:.1f} minutes)") @@ -153,7 +212,6 @@ def main(): print("ERROR: No path found to endpoint (blocked by impassable terrain)") sys.exit(1) - # Trace back the path t7 = time.time() path_indices = mcp.traceback((end_row, end_col)) t8 = time.time() @@ -165,25 +223,27 @@ def main(): if mem > 0: print(f" Memory usage: {mem:.1f} GB") - # Step 5: Convert path to coordinates and compute stats - print(f"\n[5] Converting path to GeoJSON...") + # Step 6: Convert path to coordinates and compute stats + print(f"\n[6] Converting path to GeoJSON...") coordinates = [] elevations = [] + friction_values = [] for row, col in path_indices: - lat, lon = reader.pixel_to_latlon(row, col, meta) + lat, lon = dem_reader.pixel_to_latlon(row, col, meta) elev = elevation[row, col] - coordinates.append([lon, lat]) # GeoJSON is [lon, lat] + fric = friction_raw[row, col] + coordinates.append([lon, lat]) elevations.append(elev) + friction_values.append(fric) # Compute path distance total_distance_m = 0 for i in range(1, len(coordinates)): lon1, lat1 = coordinates[i-1] lon2, lat2 = coordinates[i] - # Haversine formula - R = 6371000 # Earth radius in meters + R = 6371000 dlat = np.radians(lat2 - lat1) dlon = np.radians(lon2 - lon1) a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2 @@ -196,11 +256,16 @@ def main(): elev_gain = np.sum(elev_diff[elev_diff > 0]) elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0])) + # Friction stats along path + fric_arr = np.array(friction_values) + valid_fric = fric_arr[(fric_arr > 0) & (fric_arr < 255)] + # Build GeoJSON geojson = { "type": "Feature", "properties": { - "type": "offroute_prototype", + "type": "offroute_prototype_friction", + "phase": "O2b", "start": {"lat": START_LAT, "lon": START_LON}, "end": {"lat": END_LAT, "lon": END_LON}, "total_time_seconds": float(end_cost), @@ -211,6 +276,9 @@ def main(): "elevation_loss_m": float(elev_loss), "min_elevation_m": float(np.min(elev_arr)), "max_elevation_m": float(np.max(elev_arr)), + "friction_min": int(valid_fric.min()) if len(valid_fric) > 0 else 0, + "friction_max": int(valid_fric.max()) if len(valid_fric) > 0 else 0, + "friction_mean": float(valid_fric.mean()) if len(valid_fric) > 0 else 0, "cell_count": len(path_indices), "cell_size_m": meta["cell_size_m"], }, @@ -221,15 +289,15 @@ def main(): } # Write output - OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(OUTPUT_PATH, "w") as f: + OUTPUT_PATH_FRICTION.parent.mkdir(parents=True, exist_ok=True) + with open(OUTPUT_PATH_FRICTION, "w") as f: json.dump(geojson, f, indent=2) t_end = time.time() # Final report print(f"\n" + "=" * 60) - print("RESULTS") + print("RESULTS (Phase O2b with Friction)") print("=" * 60) print(f"Start: ({START_LAT:.4f}, {START_LON:.4f})") print(f"End: ({END_LAT:.4f}, {END_LON:.4f})") @@ -238,11 +306,13 @@ def main(): print(f"Elevation gain: {elev_gain:.0f} m") print(f"Elevation loss: {elev_loss:.0f} m") print(f"Elevation range: {np.min(elev_arr):.0f} - {np.max(elev_arr):.0f} m") + if len(valid_fric) > 0: + print(f"Friction (path): min={valid_fric.min()}, max={valid_fric.max()}, mean={valid_fric.mean():.1f}") print(f"Path cells: {len(path_indices):,}") print(f"Wall time: {t_end - t0:.1f}s") - print(f"\nOutput saved to: {OUTPUT_PATH}") + print(f"\nOutput saved to: {OUTPUT_PATH_FRICTION}") - # Validation checks + # Validation print(f"\n" + "-" * 60) print("VALIDATION") print("-" * 60) @@ -258,15 +328,57 @@ def main(): is_nontrivial = len(path_indices) > 10 and total_distance_m > 1000 print(f"Path is non-trivial: {'PASS' if is_nontrivial else 'FAIL'}") - # Check it's not a straight line (measure sinuosity) + # Check sinuosity straight_line_dist = np.sqrt( (coordinates[-1][0] - coordinates[0][0])**2 + (coordinates[-1][1] - coordinates[0][1])**2 - ) * 111000 # rough degrees to meters + ) * 111000 sinuosity = total_distance_m / max(straight_line_dist, 1) print(f"Sinuosity: {sinuosity:.2f} (>1.0 means path curves around obstacles)") - reader.close() + # CRITICAL: Check no water cells (friction=255) on path + # This is the authoritative test - friction layer prevents water crossings + print(f"\n--- Water Avoidance Check ---") + water_on_path = np.sum(fric_arr == 255) + if water_on_path > 0: + print(f"FAIL: Path crosses {water_on_path} water cells (friction=255)") + sys.exit(1) + else: + print(f"PASS: No water cells (friction=255) on path") + + # Informational: Check if path goes through lake bounding box + # Path may go through land cells within the bbox, which is fine + print(f"\n--- Lake Bounding Box Check (informational) ---") + print(f"Murtaugh Lake bounds: {LAKE_BOUNDS}") + crosses_lake, crossing_point = path_crosses_lake(coordinates, LAKE_BOUNDS) + if crosses_lake: + print(f"INFO: Path passes through lake bbox at {crossing_point}") + print(f" (This is OK if friction check passed - path uses land cells)") + else: + print(f"PASS: Path does not enter lake bounding box") + + # Compare with Phase O1 if available + print(f"\n" + "-" * 60) + print("COMPARISON: Phase O1 vs O2b") + print("-" * 60) + + if OUTPUT_PATH_O1.exists(): + with open(OUTPUT_PATH_O1) as f: + o1_data = json.load(f) + o1_props = o1_data["properties"] + + print(f"{'Metric':<20} {'O1 (no friction)':<20} {'O2b (with friction)':<20}") + print("-" * 60) + print(f"{'Distance (km)':<20} {o1_props['total_distance_km']:<20.2f} {total_distance_m/1000:<20.2f}") + print(f"{'Effort (min)':<20} {o1_props['total_time_minutes']:<20.1f} {end_cost/60:<20.1f}") + print(f"{'Cell count':<20} {o1_props['cell_count']:<20} {len(path_indices):<20}") + print(f"{'Elev gain (m)':<20} {o1_props['elevation_gain_m']:<20.0f} {elev_gain:<20.0f}") + else: + print(f"Phase O1 output not found at {OUTPUT_PATH_O1}") + print(f"Run the O1 prototype first to enable comparison.") + + dem_reader.close() + friction_reader.close() print("\nPrototype completed successfully.")