mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 06:34:40 +02:00
174 lines
5.5 KiB
Python
174 lines
5.5 KiB
Python
|
|
"""
|
|||
|
|
Trail corridor reader for OFFROUTE.
|
|||
|
|
|
|||
|
|
Provides access to the OSM-derived trail raster for pathfinding.
|
|||
|
|
Trail values replace WorldCover friction where trails exist.
|
|||
|
|
|
|||
|
|
Raster values:
|
|||
|
|
0 = no trail (use WorldCover friction)
|
|||
|
|
5 = road (0.1× friction)
|
|||
|
|
15 = track (0.3× friction)
|
|||
|
|
25 = foot trail (0.5× friction)
|
|||
|
|
"""
|
|||
|
|
import numpy as np
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Tuple, Optional
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
import rasterio
|
|||
|
|
from rasterio.windows import from_bounds
|
|||
|
|
from rasterio.enums import Resampling
|
|||
|
|
except ImportError:
|
|||
|
|
raise ImportError("rasterio is required for trails layer support")
|
|||
|
|
|
|||
|
|
# Default path to the trails raster
|
|||
|
|
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
|
|||
|
|
|
|||
|
|
# Trail value to friction multiplier mapping
|
|||
|
|
TRAIL_FRICTION_MAP = {
|
|||
|
|
5: 0.1, # road
|
|||
|
|
15: 0.3, # track
|
|||
|
|
25: 0.5, # foot trail
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TrailReader:
|
|||
|
|
"""Reader for OSM-derived trail corridor raster."""
|
|||
|
|
|
|||
|
|
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
|
|||
|
|
self.trails_path = trails_path
|
|||
|
|
self._dataset = None
|
|||
|
|
|
|||
|
|
def _open(self):
|
|||
|
|
"""Lazy open the dataset."""
|
|||
|
|
if self._dataset is None:
|
|||
|
|
if not self.trails_path.exists():
|
|||
|
|
raise FileNotFoundError(
|
|||
|
|
f"Trails raster not found at {self.trails_path}. "
|
|||
|
|
f"Run the Phase B rasterization script first."
|
|||
|
|
)
|
|||
|
|
self._dataset = rasterio.open(self.trails_path)
|
|||
|
|
return self._dataset
|
|||
|
|
|
|||
|
|
def get_trails_grid(
|
|||
|
|
self,
|
|||
|
|
south: float,
|
|||
|
|
north: float,
|
|||
|
|
west: float,
|
|||
|
|
east: float,
|
|||
|
|
target_shape: Tuple[int, int]
|
|||
|
|
) -> np.ndarray:
|
|||
|
|
"""
|
|||
|
|
Get trail values for a bounding box, resampled to target shape.
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
south, north, west, east: Bounding box coordinates (WGS84)
|
|||
|
|
target_shape: (rows, cols) to resample to (matches elevation grid)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
np.ndarray of uint8 trail values:
|
|||
|
|
0 = no trail
|
|||
|
|
5 = road (0.1× friction)
|
|||
|
|
15 = track (0.3× friction)
|
|||
|
|
25 = foot trail (0.5× friction)
|
|||
|
|
"""
|
|||
|
|
ds = self._open()
|
|||
|
|
|
|||
|
|
# Create a window from the bounding box
|
|||
|
|
window = from_bounds(west, south, east, north, ds.transform)
|
|||
|
|
|
|||
|
|
# Read with resampling to target shape
|
|||
|
|
# Use nearest neighbor to preserve discrete values
|
|||
|
|
trails = ds.read(
|
|||
|
|
1,
|
|||
|
|
window=window,
|
|||
|
|
out_shape=target_shape,
|
|||
|
|
resampling=Resampling.nearest
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return trails
|
|||
|
|
|
|||
|
|
def sample_point(self, lat: float, lon: float) -> int:
|
|||
|
|
"""Sample trail value at a single point."""
|
|||
|
|
ds = self._open()
|
|||
|
|
|
|||
|
|
# Get pixel coordinates
|
|||
|
|
row, col = ds.index(lon, lat)
|
|||
|
|
|
|||
|
|
# Check bounds
|
|||
|
|
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
|
|||
|
|
return 0 # Out of bounds = no trail
|
|||
|
|
|
|||
|
|
# Read single pixel
|
|||
|
|
window = rasterio.windows.Window(col, row, 1, 1)
|
|||
|
|
value = ds.read(1, window=window)
|
|||
|
|
return int(value[0, 0])
|
|||
|
|
|
|||
|
|
def close(self):
|
|||
|
|
"""Close the dataset."""
|
|||
|
|
if self._dataset is not None:
|
|||
|
|
self._dataset.close()
|
|||
|
|
self._dataset = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
|||
|
|
"""
|
|||
|
|
Convert trail values to friction multipliers.
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
trails: uint8 array of trail values (0, 5, 15, or 25)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Tuple of:
|
|||
|
|
- friction: float32 array of friction multipliers
|
|||
|
|
- has_trail: bool array indicating where trails exist
|
|||
|
|
"""
|
|||
|
|
friction = np.ones_like(trails, dtype=np.float32)
|
|||
|
|
has_trail = trails > 0
|
|||
|
|
|
|||
|
|
# Apply friction values where trails exist
|
|||
|
|
friction[trails == 5] = 0.1 # road
|
|||
|
|
friction[trails == 15] = 0.3 # track
|
|||
|
|
friction[trails == 25] = 0.5 # foot trail
|
|||
|
|
|
|||
|
|
return friction, has_trail
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
print("Testing TrailReader...")
|
|||
|
|
|
|||
|
|
if not DEFAULT_TRAILS_PATH.exists():
|
|||
|
|
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
|
|||
|
|
print("Run Phase B rasterization first.")
|
|||
|
|
exit(1)
|
|||
|
|
|
|||
|
|
reader = TrailReader()
|
|||
|
|
|
|||
|
|
# Test point sampling - Twin Falls downtown (should have roads)
|
|||
|
|
test_lat, test_lon = 42.563, -114.461
|
|||
|
|
trail_value = reader.sample_point(test_lat, test_lon)
|
|||
|
|
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
|
|||
|
|
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
|
|||
|
|
print(f" Type: {label}")
|
|||
|
|
|
|||
|
|
# Test grid read for test bbox
|
|||
|
|
trails = reader.get_trails_grid(
|
|||
|
|
south=42.21, north=42.60, west=-114.76, east=-113.79,
|
|||
|
|
target_shape=(400, 1000)
|
|||
|
|
)
|
|||
|
|
print(f"\nGrid test shape: {trails.shape}")
|
|||
|
|
|
|||
|
|
unique, counts = np.unique(trails, return_counts=True)
|
|||
|
|
print("Value distribution:")
|
|||
|
|
for v, c in zip(unique, counts):
|
|||
|
|
pct = 100 * c / trails.size
|
|||
|
|
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
|
|||
|
|
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
|
|||
|
|
|
|||
|
|
# Test conversion to friction
|
|||
|
|
friction, has_trail = trails_to_friction(trails)
|
|||
|
|
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
|
|||
|
|
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
|
|||
|
|
|
|||
|
|
reader.close()
|
|||
|
|
print("\nTrailReader test complete.")
|