mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 14:44:54 +02:00
feat(offroute): Phase O2b — WorldCover friction integration, lake avoidance validated
- New friction.py: reads WorldCover friction VRT, resamples to match elevation grid, provides point sampling for validation - Modified cost.py: accepts optional friction array, multiplies Tobler time cost by friction multiplier, inf for water/nodata (255/0) - Modified prototype.py: loads friction layer, passes to cost function, validates path avoids water cells (friction=255) Validated on Idaho test bbox: - Path avoids Murtaugh Lake (no water cells on path) - Friction along path: min=10, max=20, mean=10.2 - Effort increased 3.4% vs Phase O1 due to friction multipliers Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
f2a0f81580
commit
26d4bc7478
3 changed files with 420 additions and 133 deletions
137
lib/offroute/friction.py
Normal file
137
lib/offroute/friction.py
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
"""
|
||||
Friction layer reader for OFFROUTE.
|
||||
|
||||
Reads friction values from the WorldCover friction VRT and resamples
|
||||
to match the elevation grid dimensions.
|
||||
"""
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional
|
||||
|
||||
try:
|
||||
import rasterio
|
||||
from rasterio.windows import from_bounds
|
||||
from rasterio.enums import Resampling
|
||||
except ImportError:
|
||||
raise ImportError("rasterio is required for friction layer support")
|
||||
|
||||
# Default path to the friction VRT
|
||||
DEFAULT_FRICTION_PATH = Path("/mnt/nav/worldcover/friction/friction_conus.vrt")
|
||||
|
||||
|
||||
class FrictionReader:
|
||||
"""Reader for WorldCover friction raster."""
|
||||
|
||||
def __init__(self, friction_path: Path = DEFAULT_FRICTION_PATH):
|
||||
self.friction_path = friction_path
|
||||
self._dataset = None
|
||||
|
||||
def _open(self):
|
||||
"""Lazy open the dataset."""
|
||||
if self._dataset is None:
|
||||
self._dataset = rasterio.open(self.friction_path)
|
||||
return self._dataset
|
||||
|
||||
def get_friction_grid(
|
||||
self,
|
||||
south: float,
|
||||
north: float,
|
||||
west: float,
|
||||
east: float,
|
||||
target_shape: Tuple[int, int]
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Get friction values for a bounding box, resampled to target shape.
|
||||
|
||||
Args:
|
||||
south, north, west, east: Bounding box coordinates
|
||||
target_shape: (rows, cols) to resample to (matches elevation grid)
|
||||
|
||||
Returns:
|
||||
np.ndarray of uint8 friction values, same shape as target_shape.
|
||||
Values: 10-40 = friction multiplier (divide by 10)
|
||||
255 = impassable
|
||||
0 = nodata (treat as impassable)
|
||||
"""
|
||||
ds = self._open()
|
||||
|
||||
# Create a window from the bounding box
|
||||
window = from_bounds(west, south, east, north, ds.transform)
|
||||
|
||||
# Read with resampling to target shape
|
||||
# Use nearest neighbor for categorical data
|
||||
friction = ds.read(
|
||||
1,
|
||||
window=window,
|
||||
out_shape=target_shape,
|
||||
resampling=Resampling.nearest
|
||||
)
|
||||
|
||||
return friction
|
||||
|
||||
def sample_point(self, lat: float, lon: float) -> int:
|
||||
"""Sample friction value at a single point."""
|
||||
ds = self._open()
|
||||
|
||||
# Get pixel coordinates
|
||||
row, col = ds.index(lon, lat)
|
||||
|
||||
# Check bounds
|
||||
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
|
||||
return 0 # Out of bounds = nodata
|
||||
|
||||
# Read single pixel
|
||||
window = rasterio.windows.Window(col, row, 1, 1)
|
||||
value = ds.read(1, window=window)
|
||||
return int(value[0, 0])
|
||||
|
||||
def close(self):
|
||||
"""Close the dataset."""
|
||||
if self._dataset is not None:
|
||||
self._dataset.close()
|
||||
self._dataset = None
|
||||
|
||||
|
||||
def friction_to_multiplier(friction: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Convert friction values to cost multipliers.
|
||||
|
||||
Args:
|
||||
friction: uint8 array of friction values
|
||||
|
||||
Returns:
|
||||
float32 array of multipliers.
|
||||
Values 10-40 become 1.0-4.0 (divide by 10).
|
||||
Values 0 or 255 become np.inf (impassable).
|
||||
"""
|
||||
multiplier = friction.astype(np.float32) / 10.0
|
||||
|
||||
# Mark impassable cells
|
||||
multiplier[friction == 0] = np.inf # nodata
|
||||
multiplier[friction == 255] = np.inf # water/impassable
|
||||
|
||||
return multiplier
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Testing FrictionReader...")
|
||||
|
||||
reader = FrictionReader()
|
||||
|
||||
# Test point sampling - Murtaugh Lake (should be water = 255)
|
||||
lake_lat, lake_lon = 42.47, -114.15
|
||||
lake_friction = reader.sample_point(lake_lat, lake_lon)
|
||||
print(f"Murtaugh Lake ({lake_lat}, {lake_lon}): friction = {lake_friction}")
|
||||
print(f" Expected: 255 (water/impassable)")
|
||||
|
||||
# Test grid read for small bbox
|
||||
friction = reader.get_friction_grid(
|
||||
south=42.4, north=42.5, west=-114.2, east=-114.1,
|
||||
target_shape=(100, 100)
|
||||
)
|
||||
print(f"\nGrid test shape: {friction.shape}")
|
||||
print(f"Unique values: {np.unique(friction)}")
|
||||
print(f"Water cells (255): {np.sum(friction == 255)}")
|
||||
|
||||
reader.close()
|
||||
print("\nFrictionReader test complete.")
|
||||
Loading…
Add table
Add a link
Reference in a new issue