recon/lib/offroute/trails.py

174 lines
5.5 KiB
Python
Raw Permalink Normal View History

"""
Trail corridor reader for OFFROUTE.
Provides access to the OSM-derived trail raster for pathfinding.
Trail values replace WorldCover friction where trails exist.
Raster values:
0 = no trail (use WorldCover friction)
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for trails layer support")
# Default path to the trails raster
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
# Trail value to friction multiplier mapping
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
class TrailReader:
"""Reader for OSM-derived trail corridor raster."""
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
self.trails_path = trails_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.trails_path.exists():
raise FileNotFoundError(
f"Trails raster not found at {self.trails_path}. "
f"Run the Phase B rasterization script first."
)
self._dataset = rasterio.open(self.trails_path)
return self._dataset
def get_trails_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get trail values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 trail values:
0 = no trail
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor to preserve discrete values
trails = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return trails
def sample_point(self, lat: float, lon: float) -> int:
"""Sample trail value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = no trail
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert trail values to friction multipliers.
Args:
trails: uint8 array of trail values (0, 5, 15, or 25)
Returns:
Tuple of:
- friction: float32 array of friction multipliers
- has_trail: bool array indicating where trails exist
"""
friction = np.ones_like(trails, dtype=np.float32)
has_trail = trails > 0
# Apply friction values where trails exist
friction[trails == 5] = 0.1 # road
friction[trails == 15] = 0.3 # track
friction[trails == 25] = 0.5 # foot trail
return friction, has_trail
if __name__ == "__main__":
print("Testing TrailReader...")
if not DEFAULT_TRAILS_PATH.exists():
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
print("Run Phase B rasterization first.")
exit(1)
reader = TrailReader()
# Test point sampling - Twin Falls downtown (should have roads)
test_lat, test_lon = 42.563, -114.461
trail_value = reader.sample_point(test_lat, test_lon)
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
print(f" Type: {label}")
# Test grid read for test bbox
trails = reader.get_trails_grid(
south=42.21, north=42.60, west=-114.76, east=-113.79,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {trails.shape}")
unique, counts = np.unique(trails, return_counts=True)
print("Value distribution:")
for v, c in zip(unique, counts):
pct = 100 * c / trails.size
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
# Test conversion to friction
friction, has_trail = trails_to_friction(trails)
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
reader.close()
print("\nTrailReader test complete.")