Compare commits

...

21 commits

Author SHA1 Message Date
c1ba1f8dc7 Merge feature/offroute: effort-based wilderness routing, PostGIS entry points, BLM trail filtering 2026-05-09 15:45:27 +00:00
a04c10ad55 offroute: wilderness maneuvers with bearing, elevation, grade
- Segment breaks on: bearing change >30°, grade category change, distance >0.5mi
- Grade categories: flat (0-2°), gentle (2-5°), moderate (5-10°), steep (10-15°), very steep (15°+)
- Distance formatting: feet with commas <1mi, miles with decimal ≥1mi
- Instruction format: Head {cardinal}, gaining/descending X ft ({grade} uphill/downhill) — {dist}

Co-Authored-By: Claude <noreply@anthropic.com>
2026-05-09 05:05:00 +00:00
d8f84ab55a offroute: revert off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:34:37 +00:00
b4e33eb048 offroute: PostGIS entry points with 100m densification and land_status tagging
- Migrate EntryPointIndex from SQLite to PostGIS (padus database)
- Densify highway LineStrings at 100m intervals via Shapely interpolate
- 2.94M entry points from 476k lines (4x more coverage)
- Tag each entry point with land_status via ST_Intersects against padus_sub
  - 1.64M public (56%), 1.30M unknown (44%)
- Add geography GIST index for fast radius queries (~25ms)
- Increase OFF_NETWORK_THRESHOLD_M from 10m to 50m for GPS accuracy
- PBF path and PostGIS DSN configurable via home.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:28:58 +00:00
05c24f95f6 offroute: tighten off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 23:27:06 +00:00
686b35710a api: add auto mode to offroute endpoint validation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 22:37:49 +00:00
cf758476b4 offroute: add auto mode for standard driving routes
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:55:31 +00:00
87a4741b8d offroute: raise bbox limit to 2.0° (~220km coverage)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:19:04 +00:00
58347415bc offroute: bidirectional wilderness routing (all 4 scenarios)
Support all four routing scenarios:
  A: off-network → on-network (wilderness then Valhalla)
  B: off-network → off-network (wilderness, Valhalla, wilderness)
  C: on-network → off-network (Valhalla then wilderness)
  D: on-network → on-network (pure Valhalla passthrough)

Off-network detection via Valhalla /locate endpoint:
  - Snap distance > 500m = off-network

Key implementation details:
  - _locate_on_network() helper for network detection
  - route() dispatches to scenario-specific handlers
  - _pathfind_wilderness() extracted for reuse (runs MCP)
  - _valhalla_route() helper for network segments
  - _build_response() unifies GeoJSON output format

Memory management:
  - Sequential MCP runs for scenario B (not parallel)
  - gc.collect() after each MCP run
  - Bbox centered on wilderness origin, not distant destination

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:11:53 +00:00
ff0721c23e offroute: wilderness always uses foot mode for pathfinding
The wilderness segment now ALWAYS uses foot mode for MCP pathfinding.
The user's selected mode only affects:
1. Entry point selection (MODE_TO_VALID_HIGHWAYS filtering)
2. Valhalla costing for the network segment

This ensures vehicles can navigate through wilderness (on foot) to
reach roads, rather than failing when no vehicle-accessible path exists.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 19:03:31 +00:00
2252905986 feat(offroute): MVUM legal access — pathfinder integration + places panel API + boundary_mode control
MVUM Data Import:
- Downloaded USFS MVUM Roads (150,636 features) and Trails (28,741 features)
- Imported to navi.db as mvum_roads and mvum_trails tables
- Idaho coverage: ~8,994 roads and ~4,504 trails across 7 national forests
- Preserved all vehicle-class fields (ATV, MOTORCYCLE, HIGHCLEARANCEVEHICLE, etc.)
- Preserved seasonal date ranges (*_DATESOPEN fields)

New mvum.py module:
- MVUMReader class for querying MVUM data by bbox and nearest point
- parse_date_range() for seasonal date string parsing (MM/DD-MM/DD format)
- check_access() for determining open/closed status with date checking
- symbol_to_access() fallback when per-vehicle fields are null
- get_mvum_access_grid() for rasterizing MVUM to pathfinder grid

Cost function integration:
- Added mvum parameter to compute_cost_grid()
- MVUM closures respond to boundary_mode:
  * strict = impassable (np.inf)
  * pragmatic = 5x friction penalty
  * emergency = ignored entirely
- Foot mode skips MVUM (motor-vehicle specific)

Router integration:
- Loads MVUM access grid for motorized modes (mtb, atv, vehicle)
- Tracks mvum_closed_crossings in path summary

Places Panel API:
- GET /api/mvum?lat=XX&lon=XX&radius=50
- Returns MVUM feature with access status for all vehicle classes
- Includes seasonal date ranges, maintenance level, forest/district info
- GeoJSON geometry for map display

Validation:
- MVUM places endpoint tested with Sawtooth NF road
- All four modes validated with strict/pragmatic/emergency boundary modes
- Foot mode correctly ignores MVUM restrictions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:26:18 +00:00
bc463188d5 feat(offroute): Phase O4 — multi-mode cost functions (foot/mtb/atv/vehicle)
- Add ModeProfile dataclass for data-driven mode configuration
- Implement three speed functions:
  * Tobler off-path hiking (foot)
  * Herzog wheeled-transport polynomial (mtb/atv)
  * Linear speed degradation (vehicle)
- Add WildernessReader for PAD-US Des_Tp=WA wilderness areas
- Mode-specific terrain friction overrides:
  * Forest impassable for ATV/vehicle, high friction for MTB
  * Wetland/mangrove impassable for all wheeled modes
- Trail access rules:
  * Foot trails (value 25) impassable for ATV/vehicle
- Wilderness blocking for mtb/atv/vehicle modes
- Vehicle mode allows flat grassland/cropland traversal
- Memory optimization: limit entry points, constrain bbox size
- Update router to pass mode and wilderness to cost function
- Add vehicle to API mode validation

Validated all four modes with test route:
- foot: 0.46km off-network, 12.11km network, 89% on trail
- mtb: 0.47km off-network, 13.13km network, 90% on trail
- atv: 0.47km off-network, 12.81km network, 90% on trail
- vehicle: 0.46km off-network, 12.81km network, 89% on trail

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:11:56 +00:00
1a9dfc8f8d feat(offroute): Phase O3b — trail entry index, Valhalla stitching, /api/offroute endpoint
Phase A: Trail Entry Point Index
- Extract highway endpoints from idaho-latest.osm.pbf using osmium + ogr2ogr
- Store 740,430 entry points in /mnt/nav/navi.db (SQLite with spatial index)
- Entry points by class: service (271k), footway (152k), residential (146k),
  track (111k), path (26k), unclassified (16k), tertiary (9k), secondary (4k),
  primary (4k), bridleway (15)

Phase B: Pathfinder → Valhalla Stitching (router.py)
- OffrouteRouter orchestrates wilderness pathfinding + Valhalla on-network routing
- Queries entry points within 50km (expanding to 100km if needed)
- MCP pathfinder routes to nearest reachable entry point
- Calls Valhalla pedestrian/bicycle/auto costing for on-network segment
- Returns GeoJSON FeatureCollection with wilderness + network + combined segments

Phase C: Flask Endpoint
- POST /api/offroute with start/end coordinates, mode, boundary_mode
- Returns GeoJSON route with per-segment metadata and turn-by-turn maneuvers

Validated: 42.35,-114.30 → Twin Falls downtown
- Wilderness: 0.5km, 9min | Network: 36km, 413min | Total: ~421min
- 21 turn-by-turn instructions, segments connect at entry point

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 13:44:34 +00:00
3293cb4238 feat(offroute): Phase O3a — trail burn-in, pathfinder seeks trail corridors
Trail friction REPLACES land cover friction where trails exist:
- Road (value 5): 0.1× friction
- Track (value 15): 0.3× friction
- Foot trail (value 25): 0.5× friction

TrailReader loads /mnt/nav/worldcover/trails.tif rasterized from OSM highways.

Validation shows trail-seeking behavior:
- On-trail travel: 17.3% → 98.7%
- Effort time: 1047 min → 155 min (-85.2%)
- Path travels farther but stays on roads for speed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 07:26:25 +00:00
e0eedcedfd feat(offroute): Phase O2c — PAD-US barriers with three-mode boundary respect
- Add barriers.py: PAD-US raster reader + build_barriers_raster() function
- Rasterize PAD-US Pub_Access=XA (Closed) polygons to CONUS GeoTIFF
- Modify cost.py: boundary_mode parameter (strict/pragmatic/emergency)
  - strict: private land = impassable (np.inf)
  - pragmatic: private land = 5x friction penalty (default)
  - emergency: private land barriers ignored
- Modify prototype.py: three-way comparison output
- Output: padus_barriers.tif at /mnt/nav/worldcover/ (144MB, ~33m resolution)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:56:36 +00:00
26d4bc7478 feat(offroute): Phase O2b — WorldCover friction integration, lake avoidance validated
- New friction.py: reads WorldCover friction VRT, resamples to match
  elevation grid, provides point sampling for validation
- Modified cost.py: accepts optional friction array, multiplies Tobler
  time cost by friction multiplier, inf for water/nodata (255/0)
- Modified prototype.py: loads friction layer, passes to cost function,
  validates path avoids water cells (friction=255)

Validated on Idaho test bbox:
- Path avoids Murtaugh Lake (no water cells on path)
- Friction along path: min=10, max=20, mean=10.2
- Effort increased 3.4% vs Phase O1 due to friction multipliers

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:33:45 +00:00
f2a0f81580 feat(offroute): Phase O1 foundation — PMTiles decoder, Tobler cost, MCP pathfinder prototype
- dem.py: Terrarium-encoded PMTiles tile reader with LRU cache
  - Decodes WebP tiles from planet-dem.pmtiles
  - Stitches tiles into numpy elevation grids for arbitrary bboxes
  - Provides pixel-to-latlon coordinate conversion

- cost.py: Tobler off-path hiking cost function
  - speed = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) km/h
  - Max slope cutoff: 40 degrees → impassable
  - Returns time-to-traverse (seconds/cell) as cost metric

- prototype.py: Standalone validation on Idaho test bbox
  - 43km × 80km bbox (~17M cells at 14m resolution)
  - scikit-image MCP_Geometric Dijkstra pathfinder
  - Outputs GeoJSON LineString with path metadata
  - Validated: 61.6km path, 21.3 hours effort time

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-07 23:43:56 +00:00
227affca9d Merge fix/pdf-extraction-quality into master 2026-05-07 01:51:05 +00:00
fa456fecb1 Merge fix/zim-table-extraction into master 2026-05-07 01:51:05 +00:00
83a21854c3 fix: PDF extraction quality — word-boundary checks and layout mode
Adds _text_quality_ok() gate that replaces the bare 50-char length
check at each stage of the extraction fallback chain. Checks:
- Word-boundary ratio (≥60% of tokens must be real words)
- Concatenation ratio (lc→UC transitions must be <10% of word count)

When PyPDF2 default extraction fails quality check, retries with
space_width=100 for tighter word-boundary detection. This fixes
Haynes/workshop manuals where tight kerning produces concatenated
words like 'byMike' and 'oftheGuild'.

Also adds -layout flag to pdftotext subprocess calls for better
spatial awareness in the poppler fallback stage.

Note: PyPDF2 3.0.1 does not support layout=True parameter.
The space_width parameter serves the same purpose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:36:23 +00:00
b741e217f6 fix: ZIM table extraction — pipe-delimited cells instead of concatenation
Pre-processes HTML tree before lxml .text_content() to prevent
element concatenation:
- <table> cells joined with ' | ' delimiter, rows with newlines
- <br> tags produce newlines
- <li> items get '- ' prefix and newline separation
- <dt>/<dd> definition list items get newline separation

Fixes ~868 mangled Qdrant points where table content was jammed
together (e.g. 'Freq51Primary1A==' instead of 'Freq51 | Primary | 1A==').

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:32:25 +00:00
13 changed files with 4507 additions and 13 deletions

View file

@ -6,13 +6,13 @@ profile: home
region_name: "North America"
tileset:
url: "/tiles/na.pmtiles"
url: "/tiles/planet/current.pmtiles"
bounds: [-168, 14, -52, 72]
max_zoom: 15
attribution: "Protomaps © OSM"
tileset_hillshade:
url: "/tiles/hillshade-na.pmtiles"
url: "/tiles/planet-dem.pmtiles"
encoding: "terrarium"
max_zoom: 12
@ -33,14 +33,14 @@ services:
features:
has_nominatim_details: true
has_kiwix_wiki: false
has_kiwix_wiki: true
has_hillshade: true
has_3d_terrain: false
has_traffic_overlay: true
has_landclass: true
has_public_lands_layer: true
has_contours: true
has_contours_test: true
has_contours_test: false
has_contours_test_10ft: false
has_address_book_write: false
has_overture_enrichment: true
@ -48,7 +48,16 @@ features:
has_contacts: true
has_wiki_rewriting: true
has_wiki_discovery: false
has_usfs_trails: true
has_blm_trails: true
defaults:
center: [42.5736, -114.6066]
zoom: 10
# Offroute wilderness routing
offroute:
osm_pbf_path: "/mnt/nav/sources/idaho-latest.osm.pbf"
densify_interval_m: 100
postgis_dsn: "dbname=padus"

View file

@ -2722,3 +2722,214 @@ def api_auth_whoami():
'authenticated': False,
'username': None,
})
# ── OFFROUTE API ──
@app.route("/api/offroute", methods=["POST"])
def api_offroute():
"""
Off-network routing from wilderness to destination.
Request body:
{
"start": [lat, lon],
"end": [lat, lon],
"mode": "foot" | "mtb" | "atv", (default: "foot")
"boundary_mode": "strict" | "pragmatic" | "emergency" (default: "pragmatic")
}
Response:
{
"status": "ok",
"route": { GeoJSON FeatureCollection with wilderness + network segments },
"summary": { total_distance_km, total_effort_minutes, ... }
}
"""
try:
data = request.get_json()
if not data:
return jsonify({"status": "error", "message": "No JSON body provided"}), 400
# Parse coordinates
start = data.get("start")
end = data.get("end")
if not start or not end:
return jsonify({"status": "error", "message": "Missing start or end coordinates"}), 400
if not isinstance(start, (list, tuple)) or len(start) != 2:
return jsonify({"status": "error", "message": "start must be [lat, lon]"}), 400
if not isinstance(end, (list, tuple)) or len(end) != 2:
return jsonify({"status": "error", "message": "end must be [lat, lon]"}), 400
start_lat, start_lon = float(start[0]), float(start[1])
end_lat, end_lon = float(end[0]), float(end[1])
# Parse options
mode = data.get("mode", "foot")
if mode not in ("auto", "foot", "mtb", "atv", "vehicle"):
return jsonify({"status": "error", "message": "mode must be auto, foot, mtb, atv, or vehicle"}), 400
boundary_mode = data.get("boundary_mode", "pragmatic")
if boundary_mode not in ("strict", "pragmatic", "emergency"):
return jsonify({"status": "error", "message": "boundary_mode must be strict, pragmatic, or emergency"}), 400
# Import and run router
from .offroute.router import OffrouteRouter
router = OffrouteRouter()
try:
result = router.route(
start_lat=start_lat,
start_lon=start_lon,
end_lat=end_lat,
end_lon=end_lon,
mode=mode,
boundary_mode=boundary_mode
)
finally:
router.close()
if result.get("status") == "error":
return jsonify(result), 400
return jsonify(result)
except Exception as e:
logger.exception("Offroute error")
return jsonify({"status": "error", "message": str(e)}), 500
# ── MVUM Places Panel API ──
@app.route("/api/mvum", methods=["GET"])
def api_mvum():
"""
Query MVUM (Motor Vehicle Use Map) features near a point.
Used by the Navi frontend places panel when a user taps near a road/trail.
Query params:
lat: Latitude
lon: Longitude
radius: Search radius in meters (default: 50)
Response:
{
"status": "ok",
"feature": {
"id": "FR 123",
"name": "Some Forest Road",
"forest": "Sawtooth National Forest",
"district": "Ketchum Ranger District",
"surface": "NAT",
"maintenance_level": 2,
"seasonal": "Seasonal",
"symbol": 2,
"access": {
"passenger_vehicle": { "status": "Open", "dates": "06/15-10/15" },
"high_clearance": { "status": "Open", "dates": "06/15-10/15" },
"atv": { "status": "Open", "dates": "06/15-10/15" },
...
},
"geometry": { GeoJSON LineString }
}
}
If no MVUM feature within radius:
{ "status": "ok", "feature": null }
"""
try:
lat = request.args.get("lat", type=float)
lon = request.args.get("lon", type=float)
radius = request.args.get("radius", 50, type=float)
if lat is None or lon is None:
return jsonify({"status": "error", "message": "lat and lon required"}), 400
from .offroute.mvum import MVUMReader
reader = MVUMReader()
try:
# Try roads first, then trails
feature = reader.query_nearest(lat, lon, radius, "mvum_roads")
if feature is None:
feature = reader.query_nearest(lat, lon, radius, "mvum_trails")
if feature is None:
return jsonify({"status": "ok", "feature": None})
# Format access info
access = {
"passenger_vehicle": {
"status": feature.get("passengervehicle"),
"dates": feature.get("passengervehicle_datesopen")
},
"high_clearance": {
"status": feature.get("highclearancevehicle"),
"dates": feature.get("highclearancevehicle_datesopen")
},
"atv": {
"status": feature.get("atv"),
"dates": feature.get("atv_datesopen")
},
"motorcycle": {
"status": feature.get("motorcycle"),
"dates": feature.get("motorcycle_datesopen")
},
"4wd_gt50": {
"status": feature.get("fourwd_gt50inches"),
"dates": feature.get("fourwd_gt50_datesopen")
},
"2wd_gt50": {
"status": feature.get("twowd_gt50inches"),
"dates": feature.get("twowd_gt50_datesopen")
},
"e_bike_class1": {
"status": feature.get("e_bike_class1"),
"dates": feature.get("e_bike_class1_dur")
},
"e_bike_class2": {
"status": feature.get("e_bike_class2"),
"dates": feature.get("e_bike_class2_dur")
},
"e_bike_class3": {
"status": feature.get("e_bike_class3"),
"dates": feature.get("e_bike_class3_dur")
},
}
# Parse maintenance level
maint_level = feature.get("operationalmaintlevel", "")
maint_num = None
if maint_level:
# Extract first digit: "2 - HIGH CLEARANCE VEHICLES" -> 2
import re
match = re.match(r"(\d+)", maint_level)
if match:
maint_num = int(match.group(1))
result = {
"id": feature.get("id"),
"name": feature.get("name"),
"forest": feature.get("forestname"),
"district": feature.get("districtname"),
"surface": feature.get("surfacetype"),
"maintenance_level": maint_num,
"seasonal": feature.get("seasonal"),
"symbol": feature.get("symbol"),
"trail_class": feature.get("trailclass"),
"trail_system": feature.get("trailsystem"),
"access": access,
"geometry": feature.get("geojson")
}
return jsonify({"status": "ok", "feature": result})
finally:
reader.close()
except Exception as e:
logger.exception("MVUM query error")
return jsonify({"status": "error", "message": str(e)}), 500

View file

@ -21,6 +21,7 @@ Config: processing.extract_workers, processing.max_pdf_size_mb,
processing.extract_timeout, processing.page_timeout
"""
import base64
import re
import json
import os
import random
@ -99,6 +100,40 @@ def _is_transient(error_str):
return any(sig in s for sig in transient_signals)
def _text_quality_ok(text, min_length=50):
"""Check if extracted text meets quality thresholds.
Beyond the basic length check, validates:
- Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars)
- Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count
Returns True if text passes all checks.
"""
text = text.strip()
if len(text) < min_length:
return False
words = text.split()
if not words:
return False
# Word-like ratio: tokens with 2+ alphabetic characters
word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2)
word_ratio = word_like / len(words)
if word_ratio < 0.60:
return False
# Concatenation detector: lowercase immediately followed by uppercase
# Filter out common camelCase patterns in code (short tokens)
concat_hits = len(re.findall(r'[a-z][A-Z]', text))
concat_ratio = concat_hits / len(words) if words else 0
if concat_ratio > 0.10:
return False
return True
def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30):
"""Render a single PDF page to PNG bytes using pdftoppm.
@ -224,7 +259,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# Method 1: pdftotext (poppler)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num_0indexed + 1),
['pdftotext', '-layout', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
@ -233,7 +268,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 2: pdftoppm + Tesseract OCR
@ -258,7 +293,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 3: Gemini Vision (last resort)
@ -276,8 +311,26 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# ── Core extraction functions ──
def _pypdf2_extract(reader, page_num):
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout."""
return reader.pages[page_num].extract_text() or ''
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout.
Tries default extraction first (space_width=200). If quality check fails,
retries with space_width=100 which better detects word boundaries in
tightly-kerned PDFs (common in Haynes/workshop manuals).
Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter
controls word-boundary detection tolerance. Lower values = more aggressive
space insertion between characters.
"""
text = reader.pages[page_num].extract_text() or ''
if _text_quality_ok(text):
return text
# Retry with tighter word-boundary detection
text_tight = reader.pages[page_num].extract_text(space_width=100.0) or ''
if len(text_tight.strip()) >= len(text.strip()):
return text_tight
return text
def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
@ -302,13 +355,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
text = ''
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pypdf2'
# Method 2: pdftotext via subprocess (inherently timeout-safe)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()):
@ -316,7 +369,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 3: pdftoppm + Tesseract OCR
@ -340,7 +393,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs)

1
lib/offroute/__init__.py Normal file
View file

@ -0,0 +1 @@
"""OFFROUTE: Off-network effort-based routing module."""

440
lib/offroute/barriers.py Normal file
View file

@ -0,0 +1,440 @@
"""
PAD-US barrier and wilderness layers for OFFROUTE.
Provides access to:
1. Barrier raster (Pub_Access = 'XA' - closed/restricted areas)
2. Wilderness raster (Des_Tp = 'WA' - designated wilderness areas)
Build functions rasterize PAD-US geodatabase to aligned GeoTIFFs.
Runtime functions read the rasters and resample to match elevation grids.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
import subprocess
import tempfile
import os
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for barriers layer support")
# Paths
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
DEFAULT_WILDERNESS_PATH = Path("/mnt/nav/worldcover/wilderness.tif")
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
# CONUS bounding box in WGS84
CONUS_BOUNDS = {
"west": -125.0,
"east": -66.0,
"south": 24.0,
"north": 50.0,
}
# Resolution in degrees (~30m at mid-latitudes)
PIXEL_SIZE = 0.0003 # ~33m
class BarrierReader:
"""Reader for PAD-US barrier raster (closed/restricted areas)."""
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
self.barrier_path = barrier_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.barrier_path.exists():
raise FileNotFoundError(
f"Barrier raster not found at {self.barrier_path}. "
f"Run build_barriers_raster() first."
)
self._dataset = rasterio.open(self.barrier_path)
return self._dataset
def get_barrier_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get barrier values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 barrier values:
255 = closed/restricted (impassable when respect_boundaries=True)
0 = public/accessible
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
barriers = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return barriers
def sample_point(self, lat: float, lon: float) -> int:
"""Sample barrier value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
class WildernessReader:
"""Reader for PAD-US wilderness raster (designated wilderness areas)."""
def __init__(self, wilderness_path: Path = DEFAULT_WILDERNESS_PATH):
self.wilderness_path = wilderness_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.wilderness_path.exists():
raise FileNotFoundError(
f"Wilderness raster not found at {self.wilderness_path}. "
f"Run build_wilderness_raster() first."
)
self._dataset = rasterio.open(self.wilderness_path)
return self._dataset
def get_wilderness_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get wilderness values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 wilderness values:
255 = designated wilderness area
0 = not wilderness
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
wilderness = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return wilderness
def sample_point(self, lat: float, lon: float) -> int:
"""Sample wilderness value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def build_barriers_raster(
output_path: Path = DEFAULT_BARRIERS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US barriers raster from the source geodatabase.
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US barriers raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(closed_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Pub_Access = 'XA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "closed_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing closed areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "closed_areas",
str(closed_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
closed_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {closed_count} closed cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
def build_wilderness_raster(
output_path: Path = DEFAULT_WILDERNESS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US wilderness raster from the source geodatabase.
Extracts polygons where Des_Tp = 'WA' (Wilderness Area) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US wilderness raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
wilderness_gpkg = Path(tmpdir) / "wilderness_areas.gpkg"
print(f"\n[1/3] Extracting wilderness areas (Des_Tp = 'WA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(wilderness_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Des_Tp = 'WA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "wilderness_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(wilderness_gpkg), "wilderness_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing wilderness areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "wilderness_areas",
str(wilderness_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
wilderness_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {wilderness_count} wilderness cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == "build":
print("=" * 60)
print("PAD-US Barriers Raster Build")
print("=" * 60)
build_barriers_raster()
elif cmd == "build-wilderness":
print("=" * 60)
print("PAD-US Wilderness Raster Build")
print("=" * 60)
build_wilderness_raster()
elif cmd == "build-all":
print("=" * 60)
print("Building all PAD-US rasters")
print("=" * 60)
build_barriers_raster()
print("\n")
build_wilderness_raster()
else:
print(f"Unknown command: {cmd}")
print("Usage:")
print(" python barriers.py build # Build barriers raster")
print(" python barriers.py build-wilderness # Build wilderness raster")
print(" python barriers.py build-all # Build both rasters")
sys.exit(1)
else:
# Test readers
print("Testing BarrierReader...")
if not DEFAULT_BARRIERS_PATH.exists():
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run: python barriers.py build")
sys.exit(1)
reader = BarrierReader()
barriers = reader.get_barrier_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"\nBarrier grid shape: {barriers.shape}")
print(f"Unique values: {np.unique(barriers)}")
closed_cells = np.sum(barriers == 255)
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
reader.close()
print("\nTesting WildernessReader...")
if not DEFAULT_WILDERNESS_PATH.exists():
print(f"Wilderness raster not found at {DEFAULT_WILDERNESS_PATH}")
print(f"Run: python barriers.py build-wilderness")
else:
wilderness_reader = WildernessReader()
wilderness = wilderness_reader.get_wilderness_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"Wilderness grid shape: {wilderness.shape}")
print(f"Unique values: {np.unique(wilderness)}")
wilderness_cells = np.sum(wilderness == 255)
print(f"Wilderness cells: {wilderness_cells} ({100*wilderness_cells/wilderness.size:.2f}%)")
wilderness_reader.close()
print("\nDone.")

494
lib/offroute/cost.py Normal file
View file

@ -0,0 +1,494 @@
"""
Multi-mode travel cost functions for OFFROUTE.
Supports four travel modes: foot, mtb, atv, vehicle.
Each mode has its own speed function, max slope, trail access rules,
and terrain friction overrides.
Mode profiles are data-driven adding a new mode means adding a profile entry.
"""
import math
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Literal, Dict, Callable
# ═══════════════════════════════════════════════════════════════════════════════
# SPEED FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_off_path_speed(grade: np.ndarray, base_speed: float = 6.0) -> np.ndarray:
"""
Tobler off-path hiking function.
W = 0.6 * base_speed * exp(-3.5 * |S + 0.05|)
Peak ~3.6 km/h at grade = -0.05 (slight downhill).
The 0.6 multiplier is the off-trail penalty.
"""
return 0.6 * base_speed * np.exp(-3.5 * np.abs(grade + 0.05))
def herzog_wheeled_speed(grade: np.ndarray, base_speed: float = 12.0) -> np.ndarray:
"""
Herzog wheeled-transport polynomial.
Relative speed factor:
1 / (1337.8·S^6 + 278.19·S^5 517.39·S^4 78.199·S^3 + 93.419·S^2 + 19.825·|S| + 1.64)
Multiply by base_speed to get km/h.
"""
S = grade
S_abs = np.abs(S)
# Herzog polynomial (returns relative speed factor 0-1)
denom = (1337.8 * S**6 + 278.19 * S**5 - 517.39 * S**4
- 78.199 * S**3 + 93.419 * S**2 + 19.825 * S_abs + 1.64)
# Avoid division by zero and negative speeds
denom = np.maximum(denom, 0.1)
rel_speed = 1.0 / denom
# Clamp relative speed to reasonable bounds (0.05 to 1.5)
rel_speed = np.clip(rel_speed, 0.05, 1.5)
return base_speed * rel_speed
def linear_degrade_speed(grade: np.ndarray, base_speed: float = 40.0, max_grade: float = 0.364) -> np.ndarray:
"""
Linear speed degradation with slope.
speed = base_speed * max(0, 1 - |grade| / max_grade)
max_grade = tan(20°) 0.364 for 20° max slope.
"""
speed = base_speed * np.maximum(0, 1.0 - np.abs(grade) / max_grade)
return np.maximum(speed, 0.1) # Minimum crawl speed
# ═══════════════════════════════════════════════════════════════════════════════
# MODE PROFILES (Data-driven configuration)
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class ModeProfile:
"""Configuration for a travel mode."""
name: str
description: str
# Speed function parameters
speed_function: str # "tobler", "herzog", "linear"
base_speed_kmh: float
max_slope_deg: float
# Trail access: trail_value -> friction multiplier (None = impassable)
# Trail values: 5=road, 15=track, 25=foot trail
trail_friction: Dict[int, Optional[float]] = field(default_factory=dict)
# Off-trail terrain friction overrides (by WorldCover class)
# These MULTIPLY the base WorldCover friction
# None = use default, np.inf = impassable
# WorldCover values: 10=tree, 20=shrub, 30=grass, 40=crop, 50=urban,
# 60=bare, 80=water, 90=wetland, 95=mangrove, 100=moss
terrain_friction_override: Dict[int, Optional[float]] = field(default_factory=dict)
# Should wilderness areas be impassable?
wilderness_impassable: bool = False
# For vehicle mode: can traverse off-trail flat terrain?
off_trail_flat_threshold_deg: float = 0.0 # 0 = no off-trail allowed
off_trail_flat_friction: float = np.inf # friction if allowed
# Define all mode profiles
MODE_PROFILES: Dict[str, ModeProfile] = {
"foot": ModeProfile(
name="foot",
description="Hiking on foot (Tobler off-path model)",
speed_function="tobler",
base_speed_kmh=6.0,
max_slope_deg=40.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
},
terrain_friction_override={
# Use default WorldCover friction for foot mode
},
wilderness_impassable=False,
),
"mtb": ModeProfile(
name="mtb",
description="Mountain bike / dirt bike (Herzog wheeled model)",
speed_function="herzog",
base_speed_kmh=12.0,
max_slope_deg=25.0,
trail_friction={
5: 0.1, # road
15: 0.2, # track
25: 0.5, # foot trail (rideable but slow)
},
terrain_friction_override={
30: 2.0, # Grassland: rideable but slow
20: 4.0, # Shrubland: barely rideable
10: 8.0, # Tree cover/forest: effectively impassable
60: 3.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"atv": ModeProfile(
name="atv",
description="ATV / side-by-side (Herzog wheeled model, higher base speed)",
speed_function="herzog",
base_speed_kmh=25.0,
max_slope_deg=30.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: None, # foot trail: impassable (too narrow)
},
terrain_friction_override={
30: 1.5, # Grassland: passable
20: 3.0, # Shrubland: rough
10: np.inf, # Forest: impassable
60: 2.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"vehicle": ModeProfile(
name="vehicle",
description="4x4 truck / jeep (linear speed degradation)",
speed_function="linear",
base_speed_kmh=40.0,
max_slope_deg=20.0,
trail_friction={
5: 0.1, # road
15: 0.5, # track (rough but passable)
25: None, # foot trail: impassable
},
terrain_friction_override={
# All off-trail terrain is impassable by default
10: np.inf, # Forest
20: np.inf, # Shrubland
30: np.inf, # Grassland (except flat - see below)
40: np.inf, # Cropland (except flat - see below)
60: np.inf, # Bare
90: np.inf, # Wetland
95: np.inf, # Mangrove
80: np.inf, # Water
},
wilderness_impassable=True,
off_trail_flat_threshold_deg=5.0, # Can drive on flat fields
off_trail_flat_friction=5.0, # But very slow
),
}
# Pragmatic mode friction multiplier for private land
PRAGMATIC_BARRIER_MULTIPLIER = 5.0
# ═══════════════════════════════════════════════════════════════════════════════
# COST GRID COMPUTATION
# ═══════════════════════════════════════════════════════════════════════════════
def compute_cost_grid(
elevation: np.ndarray,
cell_size_m: float,
cell_size_lat_m: float = None,
cell_size_lon_m: float = None,
friction: Optional[np.ndarray] = None,
friction_raw: Optional[np.ndarray] = None,
trails: Optional[np.ndarray] = None,
barriers: Optional[np.ndarray] = None,
wilderness: Optional[np.ndarray] = None,
mvum: Optional[np.ndarray] = None,
boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic",
mode: Literal["foot", "mtb", "atv", "vehicle"] = "foot"
) -> np.ndarray:
"""
Compute isotropic travel cost grid from elevation data.
Args:
elevation: 2D array of elevation values in meters
cell_size_m: Average cell size in meters
cell_size_lat_m: Cell size in latitude direction (optional)
cell_size_lon_m: Cell size in longitude direction (optional)
friction: Optional 2D array of friction multipliers (WorldCover).
Values should be float (1.0 = baseline, 2.0 = 2x slower).
np.inf marks impassable cells.
friction_raw: Optional 2D array of raw WorldCover class values (uint8).
Used for mode-specific terrain overrides.
Values: 10=tree, 20=shrub, 30=grass, etc.
trails: Optional 2D array of trail values (uint8).
0 = no trail, 5 = road, 15 = track, 25 = foot trail
barriers: Optional 2D array of barrier values (uint8).
255 = closed/restricted area (PAD-US Pub_Access = XA).
wilderness: Optional[np.ndarray] of wilderness values (uint8).
255 = designated wilderness area.
mvum: Optional[np.ndarray] of MVUM access values (uint8).
0 = no MVUM data, 1 = open, 255 = closed to this mode.
MVUM closures respond to boundary_mode (strict/pragmatic/emergency).
Foot mode should pass None (MVUM is motor-vehicle specific).
boundary_mode: How to handle barriers ("strict", "pragmatic", "emergency")
mode: Travel mode ("foot", "mtb", "atv", "vehicle")
Returns:
2D array of travel cost in seconds per cell.
np.inf for impassable cells.
"""
if boundary_mode not in ("strict", "pragmatic", "emergency"):
raise ValueError(f"boundary_mode must be 'strict', 'pragmatic', or 'emergency'")
if mode not in MODE_PROFILES:
raise ValueError(f"mode must be one of {list(MODE_PROFILES.keys())}")
profile = MODE_PROFILES[mode]
if cell_size_lat_m is None:
cell_size_lat_m = cell_size_m
if cell_size_lon_m is None:
cell_size_lon_m = cell_size_m
rows, cols = elevation.shape
# ─── Compute gradients (in-place where possible) ─────────────────────────
# Use float32 to reduce memory footprint
grade = np.zeros(elevation.shape, dtype=np.float32)
# Compute dy contribution to grade squared
dy_contrib = np.zeros(elevation.shape, dtype=np.float32)
dy_contrib[1:-1, :] = ((elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m)) ** 2
dy_contrib[0, :] = ((elevation[0, :] - elevation[1, :]) / cell_size_lat_m) ** 2
dy_contrib[-1, :] = ((elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m) ** 2
# Compute dx contribution and add to dy_contrib in-place
dy_contrib[:, 1:-1] += ((elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m)) ** 2
dy_contrib[:, 0] += ((elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m) ** 2
dy_contrib[:, -1] += ((elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m) ** 2
# grade = sqrt(dx^2 + dy^2)
np.sqrt(dy_contrib, out=grade)
del dy_contrib # Free memory immediately
# ─── Compute speed based on mode ─────────────────────────────────────────
max_grade_val = np.tan(np.radians(profile.max_slope_deg))
if profile.speed_function == "tobler":
speed_kmh = tobler_off_path_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "herzog":
speed_kmh = herzog_wheeled_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "linear":
speed_kmh = linear_degrade_speed(grade, profile.base_speed_kmh, max_grade_val)
else:
raise ValueError(f"Unknown speed function: {profile.speed_function}")
# ─── Base cost (seconds per cell) ─────────────────────────────────────────
avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2
cost = (avg_cell_size * 3.6) / speed_kmh
del speed_kmh
# ─── Max slope limit ──────────────────────────────────────────────────────
cost[grade > max_grade_val] = np.inf
# ─── NaN elevations ──────────────────────────────────────────────────────
cost[np.isnan(elevation)] = np.inf
# ─── Apply friction in-place ─────────────────────────────────────────────
# Instead of creating effective_friction copy, apply directly to cost
# Start with base friction
if friction is not None:
if friction.shape != elevation.shape:
raise ValueError(f"Friction shape mismatch")
np.multiply(cost, friction, out=cost)
# ─── Mode-specific terrain friction overrides (memory-efficient) ─────────
if friction_raw is not None and profile.terrain_friction_override:
if friction_raw.shape != elevation.shape:
raise ValueError(f"Friction_raw shape mismatch")
# Process all overrides without creating large intermediate masks
for wc_class, override in profile.terrain_friction_override.items():
if override is not None:
if override == np.inf:
# Use np.where for in-place-like behavior
np.putmask(cost, friction_raw == wc_class, np.inf)
else:
# Multiply cost where friction_raw matches
# Using a loop with putmask is more memory efficient
mask = friction_raw == wc_class
cost[mask] *= override
del mask
# ─── Vehicle mode: allow flat grassland/cropland ─────────────────────────
if mode == "vehicle" and profile.off_trail_flat_threshold_deg > 0:
if friction_raw is not None:
# Compute slope in degrees for flat terrain check
slope_deg = np.degrees(np.arctan(grade))
# Flat grassland or cropland - recompute cost for these cells
flat_field_mask = (
(slope_deg <= profile.off_trail_flat_threshold_deg) &
((friction_raw == 30) | (friction_raw == 40))
)
del slope_deg
# Recalculate cost for these cells with flat field friction
if np.any(flat_field_mask):
base_time = avg_cell_size * 3.6 / linear_degrade_speed(
grade[flat_field_mask], profile.base_speed_kmh, max_grade_val
)
cost[flat_field_mask] = base_time * profile.off_trail_flat_friction
del base_time
del flat_field_mask
# ─── Trail friction (mode-specific) ──────────────────────────────────────
if trails is not None:
if trails.shape != elevation.shape:
raise ValueError(f"Trails shape mismatch")
for trail_value, trail_friction in profile.trail_friction.items():
if trail_friction is None:
# Impassable for this mode
np.putmask(cost, trails == trail_value, np.inf)
else:
# Trail friction REPLACES terrain friction
# Recalculate cost = base_time * trail_friction
trail_mask = trails == trail_value
if np.any(trail_mask):
# Get base travel time (without friction)
if profile.speed_function == "tobler":
trail_speed = tobler_off_path_speed(grade[trail_mask], profile.base_speed_kmh)
elif profile.speed_function == "herzog":
trail_speed = herzog_wheeled_speed(grade[trail_mask], profile.base_speed_kmh)
else:
trail_speed = linear_degrade_speed(
grade[trail_mask], profile.base_speed_kmh, max_grade_val
)
cost[trail_mask] = (avg_cell_size * 3.6 / trail_speed) * trail_friction
del trail_speed
del trail_mask
# ─── Wilderness areas (mode-specific) ────────────────────────────────────
if wilderness is not None and profile.wilderness_impassable:
if wilderness.shape != elevation.shape:
raise ValueError(f"Wilderness shape mismatch")
np.putmask(cost, wilderness == 255, np.inf)
# ─── Barriers (private land) ─────────────────────────────────────────────
if barriers is not None and boundary_mode != "emergency":
if barriers.shape != elevation.shape:
raise ValueError(f"Barriers shape mismatch")
if boundary_mode == "strict":
np.putmask(cost, barriers == 255, np.inf)
elif boundary_mode == "pragmatic":
barrier_mask = barriers == 255
cost[barrier_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del barrier_mask
# ─── MVUM closures (motor vehicle restrictions) ──────────────────────────
# MVUM only applies to motorized modes, not foot. Foot mode should pass mvum=None.
# MVUM closures respond to the same boundary_mode as PAD-US barriers:
# "strict" = MVUM-closed road/trail is impassable
# "pragmatic" = MVUM-closed road/trail gets 5× friction penalty
# "emergency" = MVUM closures ignored entirely
if mvum is not None and mode != "foot" and boundary_mode != "emergency":
if mvum.shape != elevation.shape:
raise ValueError(f"MVUM shape mismatch")
# Value 255 = road/trail exists but is closed to this mode
mvum_closed_mask = mvum == 255
if boundary_mode == "strict":
np.putmask(cost, mvum_closed_mask, np.inf)
elif boundary_mode == "pragmatic":
cost[mvum_closed_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del mvum_closed_mask
return cost
# ═══════════════════════════════════════════════════════════════════════════════
# LEGACY API (backward compatibility)
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_speed(grade: float) -> float:
"""Legacy single-value Tobler speed function."""
return 0.6 * 6.0 * math.exp(-3.5 * abs(grade + 0.05))
# ═══════════════════════════════════════════════════════════════════════════════
# TESTING
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("=" * 70)
print("OFFROUTE Multi-Mode Cost Function Tests")
print("=" * 70)
print("\n[1] Speed functions at various grades:")
print(f"{'Grade':<10} {'Foot':<12} {'MTB':<12} {'ATV':<12} {'Vehicle':<12}")
print("-" * 60)
for grade_val in [-0.3, -0.1, 0.0, 0.1, 0.2, 0.3]:
grade_arr = np.array([grade_val])
foot = tobler_off_path_speed(grade_arr, 6.0)[0]
mtb = herzog_wheeled_speed(grade_arr, 12.0)[0]
atv = herzog_wheeled_speed(grade_arr, 25.0)[0]
veh = linear_degrade_speed(grade_arr, 40.0, np.tan(np.radians(20)))[0]
print(f"{grade_val:+.2f} {foot:>6.2f} km/h {mtb:>6.2f} km/h {atv:>6.2f} km/h {veh:>6.2f} km/h")
print("\n[2] Mode profiles:")
for name, profile in MODE_PROFILES.items():
print(f"\n {name.upper()}: {profile.description}")
print(f" Max slope: {profile.max_slope_deg}°")
print(f" Trail access: {profile.trail_friction}")
print(f" Wilderness blocked: {profile.wilderness_impassable}")
print("\n[3] Cost grid test (flat terrain, forest):")
elev = np.ones((10, 10), dtype=np.float32) * 1000
friction = np.ones((10, 10), dtype=np.float32) * 2.0 # Forest friction
friction_raw = np.ones((10, 10), dtype=np.uint8) * 10 # Tree cover class
trails = np.zeros((10, 10), dtype=np.uint8)
trails[5, :] = 5 # Road across middle
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
friction=friction,
friction_raw=friction_raw,
trails=trails,
mode=mode_name
)
off_trail_cost = cost[0, 0]
road_cost = cost[5, 0]
impassable = np.sum(np.isinf(cost))
print(f" {mode_name:8s}: off-trail={off_trail_cost:>8.1f}s, road={road_cost:>6.1f}s, impassable={impassable}")
print("\n[4] Wilderness blocking test:")
wilderness = np.zeros((10, 10), dtype=np.uint8)
wilderness[3:7, 3:7] = 255
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
wilderness=wilderness,
mode=mode_name
)
wilderness_impassable = np.sum(np.isinf(cost[3:7, 3:7]))
print(f" {mode_name:8s}: wilderness cells impassable = {wilderness_impassable}/16")
print("\nDone.")

190
lib/offroute/dem.py Normal file
View file

@ -0,0 +1,190 @@
"""
DEM tile reader for OFFROUTE.
Reads elevation tiles from planet-dem.pmtiles (Terrarium-encoded WebP),
decodes them into numpy arrays, and provides a stitched elevation grid
for a given bounding box.
"""
import math
from functools import lru_cache
from io import BytesIO
from pathlib import Path
from typing import Tuple, Optional
import numpy as np
from PIL import Image
from pmtiles.reader import MmapSource, Reader as PMTilesReader
# Default path to the planet DEM PMTiles file
DEFAULT_DEM_PATH = Path("/mnt/nas/nav/planet-dem.pmtiles")
# Tile size in pixels (z12 tiles are 512x512 in this tileset)
TILE_SIZE = 512
# Zoom level to use for elevation data
ZOOM_LEVEL = 12
def terrarium_decode(rgb_array: np.ndarray) -> np.ndarray:
"""
Decode Terrarium-encoded RGB values to elevation in meters.
Formula: elevation = (R * 256 + G + B/256) - 32768
"""
r = rgb_array[:, :, 0].astype(np.float32)
g = rgb_array[:, :, 1].astype(np.float32)
b = rgb_array[:, :, 2].astype(np.float32)
elevation = (r * 256.0 + g + b / 256.0) - 32768.0
return elevation
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
"""Convert lat/lon to tile coordinates at given zoom level."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_rad = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return x, y
def tile_to_lat_lon(x: int, y: int, zoom: int) -> Tuple[float, float, float, float]:
"""Convert tile coordinates to bounding box (north, south, west, east)."""
n = 2 ** zoom
lon_west = x / n * 360.0 - 180.0
lon_east = (x + 1) / n * 360.0 - 180.0
lat_north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
lat_south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return lat_north, lat_south, lon_west, lon_east
class DEMReader:
"""Reader for Terrarium-encoded DEM tiles from PMTiles."""
def __init__(self, pmtiles_path: Path = DEFAULT_DEM_PATH, tile_cache_size: int = 128):
self.pmtiles_path = pmtiles_path
self._source = MmapSource(open(pmtiles_path, "rb"))
self._reader = PMTilesReader(self._source)
self._header = self._reader.header()
self._decode_tile = lru_cache(maxsize=tile_cache_size)(self._decode_tile_impl)
def _decode_tile_impl(self, z: int, x: int, y: int) -> Optional[np.ndarray]:
"""Fetch and decode a single tile."""
tile_data = self._reader.get(z, x, y)
if tile_data is None:
return None
img = Image.open(BytesIO(tile_data))
rgb_array = np.array(img)
if rgb_array.shape[2] == 4:
rgb_array = rgb_array[:, :, :3]
elevation = terrarium_decode(rgb_array)
return elevation
def get_elevation_grid(
self,
south: float,
north: float,
west: float,
east: float,
zoom: int = ZOOM_LEVEL
) -> Tuple[np.ndarray, dict]:
"""Get a stitched elevation grid for the given bounding box."""
x_min, y_max = lat_lon_to_tile(south, west, zoom)
x_max, y_min = lat_lon_to_tile(north, east, zoom)
n = 2 ** zoom
x_min = max(0, x_min)
x_max = min(n - 1, x_max)
y_min = max(0, y_min)
y_max = min(n - 1, y_max)
n_tiles_x = x_max - x_min + 1
n_tiles_y = y_max - y_min + 1
out_height = n_tiles_y * TILE_SIZE
out_width = n_tiles_x * TILE_SIZE
elevation = np.full((out_height, out_width), np.nan, dtype=np.float32)
for ty in range(y_min, y_max + 1):
for tx in range(x_min, x_max + 1):
tile_elev = self._decode_tile(zoom, tx, ty)
if tile_elev is not None:
out_y = (ty - y_min) * TILE_SIZE
out_x = (tx - x_min) * TILE_SIZE
elevation[out_y:out_y + TILE_SIZE, out_x:out_x + TILE_SIZE] = tile_elev
grid_north, _, grid_west, _ = tile_to_lat_lon(x_min, y_min, zoom)
_, grid_south, _, grid_east = tile_to_lat_lon(x_max, y_max, zoom)
pixel_size_lat = (grid_north - grid_south) / out_height
pixel_size_lon = (grid_east - grid_west) / out_width
origin_lat = grid_north - pixel_size_lat / 2
origin_lon = grid_west + pixel_size_lon / 2
center_lat = (south + north) / 2
lat_m = 111320.0
lon_m = 111320.0 * math.cos(math.radians(center_lat))
cell_size_lat_m = abs(pixel_size_lat) * lat_m
cell_size_lon_m = abs(pixel_size_lon) * lon_m
cell_size_m = (cell_size_lat_m + cell_size_lon_m) / 2
row_start = int((grid_north - north) / abs(pixel_size_lat))
row_end = int((grid_north - south) / abs(pixel_size_lat))
col_start = int((west - grid_west) / pixel_size_lon)
col_end = int((east - grid_west) / pixel_size_lon)
row_start = max(0, row_start)
row_end = min(out_height, row_end)
col_start = max(0, col_start)
col_end = min(out_width, col_end)
elevation = elevation[row_start:row_end, col_start:col_end]
origin_lat = grid_north - (row_start + 0.5) * abs(pixel_size_lat)
origin_lon = grid_west + (col_start + 0.5) * pixel_size_lon
metadata = {
"bounds": (south, north, west, east),
"pixel_size_lat": -abs(pixel_size_lat),
"pixel_size_lon": pixel_size_lon,
"origin_lat": origin_lat,
"origin_lon": origin_lon,
"cell_size_m": cell_size_m,
"shape": elevation.shape,
}
return elevation, metadata
def pixel_to_latlon(self, row: int, col: int, metadata: dict) -> Tuple[float, float]:
"""Convert pixel coordinates to lat/lon."""
lat = metadata["origin_lat"] + row * metadata["pixel_size_lat"]
lon = metadata["origin_lon"] + col * metadata["pixel_size_lon"]
return lat, lon
def latlon_to_pixel(self, lat: float, lon: float, metadata: dict) -> Tuple[int, int]:
"""Convert lat/lon to pixel coordinates."""
row = int((metadata["origin_lat"] - lat) / abs(metadata["pixel_size_lat"]))
col = int((lon - metadata["origin_lon"]) / metadata["pixel_size_lon"])
return row, col
def close(self):
"""Close the PMTiles file."""
pass # MmapSource handles cleanup
if __name__ == "__main__":
reader = DEMReader()
elevation, meta = reader.get_elevation_grid(
south=42.4, north=42.6, west=-114.5, east=-114.3
)
print(f"Elevation grid shape: {elevation.shape}")
print(f"Cell size: {meta['cell_size_m']:.1f} m")
print(f"Elevation range: {np.nanmin(elevation):.1f} - {np.nanmax(elevation):.1f} m")
center_row, center_col = elevation.shape[0] // 2, elevation.shape[1] // 2
lat, lon = reader.pixel_to_latlon(center_row, center_col, meta)
print(f"Center pixel lat/lon: {lat:.4f}, {lon:.4f}")
reader.close()

137
lib/offroute/friction.py Normal file
View file

@ -0,0 +1,137 @@
"""
Friction layer reader for OFFROUTE.
Reads friction values from the WorldCover friction VRT and resamples
to match the elevation grid dimensions.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for friction layer support")
# Default path to the friction VRT
DEFAULT_FRICTION_PATH = Path("/mnt/nav/worldcover/friction/friction_conus.vrt")
class FrictionReader:
"""Reader for WorldCover friction raster."""
def __init__(self, friction_path: Path = DEFAULT_FRICTION_PATH):
self.friction_path = friction_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
self._dataset = rasterio.open(self.friction_path)
return self._dataset
def get_friction_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get friction values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 friction values, same shape as target_shape.
Values: 10-40 = friction multiplier (divide by 10)
255 = impassable
0 = nodata (treat as impassable)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor for categorical data
friction = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return friction
def sample_point(self, lat: float, lon: float) -> int:
"""Sample friction value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = nodata
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def friction_to_multiplier(friction: np.ndarray) -> np.ndarray:
"""
Convert friction values to cost multipliers.
Args:
friction: uint8 array of friction values
Returns:
float32 array of multipliers.
Values 10-40 become 1.0-4.0 (divide by 10).
Values 0 or 255 become np.inf (impassable).
"""
multiplier = friction.astype(np.float32) / 10.0
# Mark impassable cells
multiplier[friction == 0] = np.inf # nodata
multiplier[friction == 255] = np.inf # water/impassable
return multiplier
if __name__ == "__main__":
print("Testing FrictionReader...")
reader = FrictionReader()
# Test point sampling - Murtaugh Lake (should be water = 255)
lake_lat, lake_lon = 42.47, -114.15
lake_friction = reader.sample_point(lake_lat, lake_lon)
print(f"Murtaugh Lake ({lake_lat}, {lake_lon}): friction = {lake_friction}")
print(f" Expected: 255 (water/impassable)")
# Test grid read for small bbox
friction = reader.get_friction_grid(
south=42.4, north=42.5, west=-114.2, east=-114.1,
target_shape=(100, 100)
)
print(f"\nGrid test shape: {friction.shape}")
print(f"Unique values: {np.unique(friction)}")
print(f"Water cells (255): {np.sum(friction == 255)}")
reader.close()
print("\nFrictionReader test complete.")

623
lib/offroute/mvum.py Normal file
View file

@ -0,0 +1,623 @@
"""
MVUM (Motor Vehicle Use Map) legal access layer for OFFROUTE.
Queries USFS MVUM data from navi.db and provides rasterized access grids
indicating which roads/trails are open or closed to specific vehicle modes.
MVUM is motor-vehicle specific foot mode should skip this layer entirely.
"""
import re
import sqlite3
import warnings
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Literal
import numpy as np
# Path to navi.db
NAVI_DB_PATH = Path("/mnt/nav/navi.db")
def parse_date_range(date_str: str) -> List[Tuple[int, int, int, int]]:
"""
Parse MVUM date range strings like "05/01-11/30" or "06/15-10/15,12/01-03/31".
Returns list of (start_month, start_day, end_month, end_day) tuples.
Returns empty list if unparseable.
"""
if not date_str or date_str.strip() == "":
return []
ranges = []
# Split by comma for multi-period strings
for part in date_str.split(","):
part = part.strip()
# Match MM/DD-MM/DD pattern
match = re.match(r"(\d{1,2})/(\d{1,2})-(\d{1,2})/(\d{1,2})", part)
if match:
try:
sm, sd, em, ed = int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))
if 1 <= sm <= 12 and 1 <= sd <= 31 and 1 <= em <= 12 and 1 <= ed <= 31:
ranges.append((sm, sd, em, ed))
except ValueError:
pass
return ranges
def is_date_in_range(month: int, day: int, ranges: List[Tuple[int, int, int, int]]) -> bool:
"""
Check if a given month/day falls within any of the date ranges.
Handles ranges that wrap around year end (e.g., 12/01-03/31).
"""
if not ranges:
return True # No ranges = assume open
date_num = month * 100 + day # Simple numeric comparison
for sm, sd, em, ed in ranges:
start_num = sm * 100 + sd
end_num = em * 100 + ed
if start_num <= end_num:
# Normal range (e.g., 05/01-11/30)
if start_num <= date_num <= end_num:
return True
else:
# Wrapping range (e.g., 12/01-03/31)
if date_num >= start_num or date_num <= end_num:
return True
return False
def check_access(
status_field: Optional[str],
dates_field: Optional[str],
seasonal: Optional[str],
check_date: Optional[Tuple[int, int]] = None
) -> Optional[bool]:
"""
Determine if a road/trail is open to a vehicle type.
Args:
status_field: Value of vehicle-class field (e.g., "open", null)
dates_field: Value of *_DATESOPEN field (e.g., "05/01-11/30")
seasonal: Value of SEASONAL field ("yearlong", "seasonal")
check_date: Optional (month, day) tuple to check against date ranges
Returns:
True = open
False = closed
None = no data (field not populated, defer to SYMBOL)
"""
if status_field is None or status_field.strip() == "":
return None # No data
status = status_field.strip().lower()
if status != "open":
return False # Explicitly closed or restricted
# Status is "open" - check seasonal restrictions
if check_date is not None:
month, day = check_date
# Parse date ranges
if dates_field:
ranges = parse_date_range(dates_field)
if ranges:
return is_date_in_range(month, day, ranges)
# No date field but seasonal = "yearlong" means always open
if seasonal and seasonal.strip().lower() == "yearlong":
return True
# Seasonal with no dates - assume open (data quality issue)
if seasonal and seasonal.strip().lower() == "seasonal":
warnings.warn(f"Seasonal road/trail with no DATESOPEN, assuming open")
return True
return True # Open with no date check
def get_mode_field(mode: str) -> Tuple[str, str]:
"""
Get the MVUM field names for a given travel mode.
Returns (status_field, dates_field) tuple.
"""
mode_mapping = {
"atv": ("atv", "atv_datesopen"),
"motorcycle": ("motorcycle", "motorcycle_datesopen"),
"mtb": ("e_bike_class1", "e_bike_class1_dur"), # Closest analog for e-bikes
"vehicle": ("highclearancevehicle", "highclearancevehicle_datesopen"),
"passenger": ("passengervehicle", "passengervehicle_datesopen"),
}
return mode_mapping.get(mode, ("highclearancevehicle", "highclearancevehicle_datesopen"))
def symbol_to_access(symbol: str, mode: str, maint_level: Optional[str] = None) -> Optional[bool]:
"""
Fallback: interpret SYMBOL field when per-vehicle-class fields are null.
MVUM SYMBOL meanings (roads):
1 = Open to all vehicles
2 = Open to highway legal vehicles only
3 = Road closed to motorized
4 = Road open seasonally
11 = Administrative use only
12 = Decommissioned
For trails, similar logic applies based on TRAILCLASS.
"""
if symbol is None:
return None
sym = str(symbol).strip()
# Symbol 1: Open to all
if sym == "1":
return True
# Symbol 2: Highway legal only
if sym == "2":
# ATVs/motorcycles typically not highway legal
if mode in ("atv", "motorcycle"):
return False
return True
# Symbol 3: Closed to motorized
if sym == "3":
return False
# Symbol 4: Seasonally open (assume open if no date check)
if sym == "4":
return True
# Symbol 11/12: Administrative/decommissioned = closed
if sym in ("11", "12"):
return False
# Unknown symbol - defer
return None
class MVUMReader:
"""
Reader for MVUM data from navi.db.
Queries roads and trails by bounding box and returns access grids.
"""
def __init__(self, db_path: Path = NAVI_DB_PATH):
self.db_path = db_path
self._conn = None
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
if not self.db_path.exists():
raise FileNotFoundError(f"navi.db not found at {self.db_path}")
self._conn = sqlite3.connect(str(self.db_path))
self._conn.row_factory = sqlite3.Row
# Load Spatialite extension if available
try:
self._conn.enable_load_extension(True)
self._conn.load_extension("mod_spatialite")
except Exception:
pass # Spatialite not available, will use manual bbox queries
return self._conn
def table_exists(self, table_name: str) -> bool:
"""Check if an MVUM table exists."""
conn = self._get_conn()
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
return cur.fetchone() is not None
def query_roads_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM roads within a bounding box.
Returns list of dicts with access info for the given mode.
"""
if not self.table_exists("mvum_roads"):
return []
conn = self._get_conn()
# Query using bbox on geometry
# Since we don't have spatialite, we'll query all and filter in Python
# For production, consider pre-computing bbox columns
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, operationalmaintlevel, seasonal,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_roads
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
# Parse geometry to check bbox intersection
# The shape is stored as WKB blob
shape = row["shape"]
if shape is None:
continue
# Quick bbox check using geometry extent
# Since we don't have Spatialite functions, we'll include all
# and let the rasterization handle it
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
# Fallback to SYMBOL if no per-vehicle data
if access is None:
access = symbol_to_access(row["symbol"], mode, row["operationalmaintlevel"])
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"maint_level": row["operationalmaintlevel"],
"shape": shape,
})
return results
def query_trails_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM trails within a bounding box.
"""
if not self.table_exists("mvum_trails"):
return []
conn = self._get_conn()
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, seasonal, trailclass,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_trails
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
shape = row["shape"]
if shape is None:
continue
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
if access is None:
access = symbol_to_access(row["symbol"], mode)
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"trail_class": row["trailclass"],
"shape": shape,
})
return results
def query_nearest(
self,
lat: float, lon: float,
radius_m: float = 50,
table: str = "mvum_roads"
) -> Optional[Dict]:
"""
Query the nearest MVUM feature to a point.
Used for the places panel API.
"""
if not self.table_exists(table):
return None
conn = self._get_conn()
# Convert radius to degrees (approximate)
radius_deg = radius_m / 111000
# Query features in bbox around point
if table == "mvum_roads":
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
operationalmaintlevel, surfacetype, seasonal, jurisdiction,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_roads
LIMIT 1000
""")
else:
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
seasonal, jurisdiction, trailclass, trailsystem,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_trails
LIMIT 1000
""")
# Find nearest feature
# This is a simplified approach - for production, use spatial index
try:
from shapely import wkb
from shapely.geometry import Point
query_point = Point(lon, lat)
nearest = None
min_dist = float('inf')
for row in cur:
try:
geom = wkb.loads(row["shape"])
dist = query_point.distance(geom)
if dist < min_dist and dist < radius_deg:
min_dist = dist
nearest = dict(row)
nearest["geometry"] = geom
except Exception:
continue
if nearest:
# Convert geometry to GeoJSON
nearest["geojson"] = nearest["geometry"].__geo_interface__
del nearest["geometry"]
del nearest["shape"]
return nearest
except ImportError:
warnings.warn("shapely not available for nearest query")
return None
def close(self):
if self._conn:
self._conn.close()
self._conn = None
def get_mvum_access_grid(
south: float, north: float, west: float, east: float,
target_shape: Tuple[int, int],
mode: Literal["foot", "mtb", "atv", "vehicle"] = "atv",
check_date: Optional[str] = None,
db_path: Path = NAVI_DB_PATH
) -> np.ndarray:
"""
Get MVUM access grid for pathfinding.
Args:
south, north, west, east: Bounding box (WGS84)
target_shape: (rows, cols) to match elevation grid
mode: Travel mode (foot skips MVUM entirely)
check_date: Optional "MM/DD" string for seasonal checking
db_path: Path to navi.db
Returns:
np.ndarray of uint8:
0 = no MVUM data (defer to existing trail/friction logic)
1 = road/trail is OPEN to this vehicle mode
255 = road/trail EXISTS but is CLOSED to this mode
"""
# Foot mode bypasses MVUM entirely
if mode == "foot":
return np.zeros(target_shape, dtype=np.uint8)
# Parse check_date if provided
parsed_date = None
if check_date:
match = re.match(r"(\d{1,2})/(\d{1,2})", check_date)
if match:
parsed_date = (int(match.group(1)), int(match.group(2)))
# Initialize output grid
grid = np.zeros(target_shape, dtype=np.uint8)
rows, cols = target_shape
# Pixel size
pixel_lat = (north - south) / rows
pixel_lon = (east - west) / cols
reader = MVUMReader(db_path)
try:
# Query roads and trails
roads = reader.query_roads_bbox(south, north, west, east, mode, parsed_date)
trails = reader.query_trails_bbox(south, north, west, east, mode, parsed_date)
# Rasterize features
try:
from shapely import wkb
for features in [roads, trails]:
for feat in features:
try:
geom = wkb.loads(feat["shape"])
# Get geometry bounds
minx, miny, maxx, maxy = geom.bounds
# Check if intersects our bbox
if maxx < west or minx > east or maxy < south or miny > north:
continue
# Rasterize line
value = 1 if feat["access"] else 255
# Simple line rasterization
if geom.geom_type in ("LineString", "MultiLineString"):
if geom.geom_type == "MultiLineString":
coords_list = [list(line.coords) for line in geom.geoms]
else:
coords_list = [list(geom.coords)]
for coords in coords_list:
for i in range(len(coords) - 1):
x1, y1 = coords[i]
x2, y2 = coords[i + 1]
# Convert to pixel coordinates
col1 = int((x1 - west) / pixel_lon)
row1 = int((north - y1) / pixel_lat)
col2 = int((x2 - west) / pixel_lon)
row2 = int((north - y2) / pixel_lat)
# Bresenham's line algorithm
_draw_line(grid, row1, col1, row2, col2, value)
except Exception as e:
continue
except ImportError:
warnings.warn("shapely not available, MVUM rasterization skipped")
finally:
reader.close()
return grid
def _draw_line(grid: np.ndarray, r1: int, c1: int, r2: int, c2: int, value: int):
"""Draw a line on the grid using Bresenham's algorithm."""
rows, cols = grid.shape
dr = abs(r2 - r1)
dc = abs(c2 - c1)
sr = 1 if r1 < r2 else -1
sc = 1 if c1 < c2 else -1
err = dr - dc
r, c = r1, c1
while True:
if 0 <= r < rows and 0 <= c < cols:
# Only overwrite if current value is 0 (no data) or we're marking closed
if grid[r, c] == 0 or value == 255:
grid[r, c] = value
if r == r2 and c == c2:
break
e2 = 2 * err
if e2 > -dc:
err -= dc
r += sr
if e2 < dr:
err += dr
c += sc
if __name__ == "__main__":
import sys
print("=" * 60)
print("MVUM Reader Test")
print("=" * 60)
reader = MVUMReader()
if not reader.table_exists("mvum_roads"):
print("ERROR: mvum_roads table not found in navi.db")
sys.exit(1)
# Test bbox query (Sawtooth NF area)
print("\n[1] Testing bbox query (Sawtooth NF area)...")
roads = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv"
)
print(f" Found {len(roads)} roads")
open_count = sum(1 for r in roads if r["access"])
closed_count = sum(1 for r in roads if not r["access"])
print(f" Open to ATV: {open_count}")
print(f" Closed to ATV: {closed_count}")
# Test with seasonal date
print("\n[2] Testing with date check (July 15)...")
roads_summer = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(7, 15)
)
open_summer = sum(1 for r in roads_summer if r["access"])
print(f" Open to ATV on 07/15: {open_summer}")
print("\n[3] Testing with date check (January 15)...")
roads_winter = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(1, 15)
)
open_winter = sum(1 for r in roads_winter if r["access"])
print(f" Open to ATV on 01/15: {open_winter}")
# Test grid generation
print("\n[4] Testing grid generation...")
grid = get_mvum_access_grid(
south=43.5, north=44.0, west=-115.0, east=-114.0,
target_shape=(500, 1000),
mode="atv"
)
print(f" Grid shape: {grid.shape}")
print(f" No data (0): {np.sum(grid == 0)}")
print(f" Open (1): {np.sum(grid == 1)}")
print(f" Closed (255): {np.sum(grid == 255)}")
reader.close()
print("\nDone.")

414
lib/offroute/prototype.py Executable file
View file

@ -0,0 +1,414 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O3a Prototype
Validates trail burn-in integration with the MCP pathfinder.
The path should actively seek out trails and roads when nearby.
Compares paths with and without trail burn-in to show the benefit
of trail-seeking behavior.
"""
import json
import time
import sys
from pathlib import Path
import numpy as np
from skimage.graph import MCP_Geometric
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
from lib.offroute.friction import FrictionReader, friction_to_multiplier
from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH
from lib.offroute.trails import TrailReader, DEFAULT_TRAILS_PATH
# Test bounding box - Idaho area
BBOX = {
"south": 42.21,
"north": 42.60,
"west": -114.76,
"east": -113.79,
}
# Start point: wilderness area away from roads
START_LAT = 42.35
START_LON = -114.60
# End point: near Twin Falls (has roads/trails)
END_LAT = 42.55
END_LON = -114.20
# Output files
OUTPUT_PATH_WITH_TRAILS = Path("/opt/recon/data/offroute-test-trails.geojson")
OUTPUT_PATH_NO_TRAILS = Path("/opt/recon/data/offroute-test-no-trails.geojson")
# Memory limit in GB
MEMORY_LIMIT_GB = 12
def check_memory_usage():
"""Check current memory usage and abort if over limit."""
try:
import psutil
process = psutil.Process()
mem_gb = process.memory_info().rss / (1024**3)
if mem_gb > MEMORY_LIMIT_GB:
print(f"ERROR: Memory usage {mem_gb:.1f}GB exceeds {MEMORY_LIMIT_GB}GB limit")
sys.exit(1)
return mem_gb
except ImportError:
return 0
def run_pathfinder(
elevation: np.ndarray,
meta: dict,
friction_mult: np.ndarray,
trails: np.ndarray,
barriers: np.ndarray,
use_trails: bool,
start_row: int,
start_col: int,
end_row: int,
end_col: int,
dem_reader: DEMReader,
) -> dict:
"""Run the MCP pathfinder with given parameters."""
# Compute cost grid
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult,
trails=trails if use_trails else None,
barriers=barriers,
boundary_mode="pragmatic",
)
# Run MCP
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
end_cost = cumulative_costs[end_row, end_col]
if np.isinf(end_cost):
return {
"success": False,
"reason": "No path found (blocked by impassable terrain)",
}
# Traceback path
path_indices = mcp.traceback((end_row, end_col))
# Convert to coordinates and collect stats
coordinates = []
elevations = []
trail_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
trail_val = trails[row, col] if trails is not None else 0
coordinates.append([lon, lat])
elevations.append(elev)
trail_values.append(trail_val)
# Compute distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
R = 6371000
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Elevation stats
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Trail stats
trail_arr = np.array(trail_values)
road_cells = np.sum(trail_arr == 5)
track_cells = np.sum(trail_arr == 15)
trail_cells = np.sum(trail_arr == 25)
off_trail_cells = np.sum(trail_arr == 0)
on_trail_cells = road_cells + track_cells + trail_cells
total_cells = len(trail_arr)
return {
"success": True,
"coordinates": coordinates,
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": total_cells,
"road_cells": int(road_cells),
"track_cells": int(track_cells),
"trail_cells": int(trail_cells),
"off_trail_cells": int(off_trail_cells),
"on_trail_pct": float(100 * on_trail_cells / total_cells) if total_cells > 0 else 0,
}
def main():
print("=" * 80)
print("OFFROUTE Phase O3a Prototype (Trail Burn-In)")
print("=" * 80)
t0 = time.time()
# Check for required rasters
if not DEFAULT_BARRIERS_PATH.exists():
print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
sys.exit(1)
if not DEFAULT_TRAILS_PATH.exists():
print(f"\nERROR: Trails raster not found at {DEFAULT_TRAILS_PATH}")
sys.exit(1)
# Step 1: Load elevation data
print(f"\n[1] Loading DEM for bbox: {BBOX}")
dem_reader = DEMReader()
elevation, meta = dem_reader.get_elevation_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
)
print(f" Elevation grid shape: {elevation.shape}")
print(f" Cell count: {elevation.size:,}")
print(f" Cell size: {meta['cell_size_m']:.1f} m")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 2: Load friction data
print(f"\n[2] Loading WorldCover friction layer...")
friction_reader = FrictionReader()
friction_raw = friction_reader.get_friction_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
friction_mult = friction_to_multiplier(friction_raw)
print(f" Friction grid shape: {friction_raw.shape}")
print(f" Water/impassable cells: {np.sum(np.isinf(friction_mult)):,}")
# Step 3: Load barrier data
print(f"\n[3] Loading PAD-US barrier layer...")
barrier_reader = BarrierReader()
barriers = barrier_reader.get_barrier_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
closed_cells = np.sum(barriers == 255)
print(f" Barrier grid shape: {barriers.shape}")
print(f" Closed/restricted cells: {closed_cells:,}")
# Step 4: Load trails data
print(f"\n[4] Loading OSM trails layer...")
trail_reader = TrailReader()
trails = trail_reader.get_trails_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
road_cells = np.sum(trails == 5)
track_cells = np.sum(trails == 15)
trail_cells = np.sum(trails == 25)
print(f" Trails grid shape: {trails.shape}")
print(f" Road cells: {road_cells:,}")
print(f" Track cells: {track_cells:,}")
print(f" Trail cells: {trail_cells:,}")
print(f" Total trail coverage: {100*(road_cells+track_cells+trail_cells)/trails.size:.2f}%")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 5: Convert start/end to pixel coordinates
print(f"\n[5] Converting coordinates...")
start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta)
end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta)
print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})")
print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})")
# Validate coordinates
rows, cols = elevation.shape
if not (0 <= start_row < rows and 0 <= start_col < cols):
print(f"ERROR: Start point outside grid bounds")
sys.exit(1)
if not (0 <= end_row < rows and 0 <= end_col < cols):
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
# Step 6: Run pathfinder WITH trails
print(f"\n[6] Running pathfinder WITH trail burn-in...")
t6a = time.time()
result_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=True,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t6b = time.time()
print(f" Completed in {t6b - t6a:.1f}s")
# Step 7: Run pathfinder WITHOUT trails
print(f"\n[7] Running pathfinder WITHOUT trail burn-in...")
t7a = time.time()
result_no_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=False,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t7b = time.time()
print(f" Completed in {t7b - t7a:.1f}s")
# Step 8: Save GeoJSON outputs
print(f"\n[8] Saving GeoJSON outputs...")
OUTPUT_PATH_WITH_TRAILS.parent.mkdir(parents=True, exist_ok=True)
if result_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_with_trails",
"phase": "O3a",
"trail_burn_in": True,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_trails["coordinates"],
}
}
with open(OUTPUT_PATH_WITH_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_WITH_TRAILS}")
if result_no_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_no_trails",
"phase": "O3a",
"trail_burn_in": False,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_no_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_no_trails["coordinates"],
}
}
with open(OUTPUT_PATH_NO_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_NO_TRAILS}")
t_total = time.time()
# Final report
print(f"\n" + "=" * 80)
print("SIDE-BY-SIDE COMPARISON: Trail Burn-In Effect")
print("=" * 80)
if result_trails["success"] and result_no_trails["success"]:
print(f"{'Metric':<25} {'WITH TRAILS':<20} {'WITHOUT TRAILS':<20} {'Delta':<15}")
print("-" * 80)
metrics = [
("Distance (km)", "total_distance_km", ".2f"),
("Effort time (min)", "total_time_minutes", ".1f"),
("Cell count", "cell_count", "d"),
("Elevation gain (m)", "elevation_gain_m", ".0f"),
("On-trail %", "on_trail_pct", ".1f"),
("Road cells", "road_cells", "d"),
("Track cells", "track_cells", "d"),
("Trail cells", "trail_cells", "d"),
]
for label, key, fmt in metrics:
val_with = result_trails[key]
val_without = result_no_trails[key]
if isinstance(val_with, int):
delta = val_with - val_without
delta_str = f"{delta:+d}"
else:
delta = val_with - val_without
delta_str = f"{delta:+.2f}"
print(f"{label:<25} {val_with:<20{fmt}} {val_without:<20{fmt}} {delta_str:<15}")
# Analysis
print(f"\n" + "-" * 80)
print("ANALYSIS")
print("-" * 80)
time_saved = result_no_trails["total_time_minutes"] - result_trails["total_time_minutes"]
if time_saved > 0:
print(f"Trail burn-in saves {time_saved:.1f} minutes ({100*time_saved/result_no_trails['total_time_minutes']:.1f}% faster)")
elif time_saved < 0:
print(f"Trail burn-in adds {-time_saved:.1f} minutes (path seeks trails even if longer)")
on_trail_with = result_trails["on_trail_pct"]
on_trail_without = result_no_trails["on_trail_pct"]
if on_trail_with > on_trail_without:
print(f"Trail burn-in increases on-trail travel: {on_trail_without:.1f}% → {on_trail_with:.1f}%")
else:
print(f"Both paths have similar on-trail percentage")
else:
if not result_trails["success"]:
print(f"WITH TRAILS: FAILED - {result_trails.get('reason', 'unknown')}")
if not result_no_trails["success"]:
print(f"WITHOUT TRAILS: FAILED - {result_no_trails.get('reason', 'unknown')}")
print(f"\n" + "-" * 80)
print(f"Total wall time: {t_total - t0:.1f}s")
# Cleanup
dem_reader.close()
friction_reader.close()
barrier_reader.close()
trail_reader.close()
print("\nPrototype completed.")
if __name__ == "__main__":
main()

1682
lib/offroute/router.py Normal file

File diff suppressed because it is too large Load diff

174
lib/offroute/trails.py Normal file
View file

@ -0,0 +1,174 @@
"""
Trail corridor reader for OFFROUTE.
Provides access to the OSM-derived trail raster for pathfinding.
Trail values replace WorldCover friction where trails exist.
Raster values:
0 = no trail (use WorldCover friction)
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for trails layer support")
# Default path to the trails raster
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
# Trail value to friction multiplier mapping
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
class TrailReader:
"""Reader for OSM-derived trail corridor raster."""
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
self.trails_path = trails_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.trails_path.exists():
raise FileNotFoundError(
f"Trails raster not found at {self.trails_path}. "
f"Run the Phase B rasterization script first."
)
self._dataset = rasterio.open(self.trails_path)
return self._dataset
def get_trails_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get trail values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 trail values:
0 = no trail
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor to preserve discrete values
trails = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return trails
def sample_point(self, lat: float, lon: float) -> int:
"""Sample trail value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = no trail
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert trail values to friction multipliers.
Args:
trails: uint8 array of trail values (0, 5, 15, or 25)
Returns:
Tuple of:
- friction: float32 array of friction multipliers
- has_trail: bool array indicating where trails exist
"""
friction = np.ones_like(trails, dtype=np.float32)
has_trail = trails > 0
# Apply friction values where trails exist
friction[trails == 5] = 0.1 # road
friction[trails == 15] = 0.3 # track
friction[trails == 25] = 0.5 # foot trail
return friction, has_trail
if __name__ == "__main__":
print("Testing TrailReader...")
if not DEFAULT_TRAILS_PATH.exists():
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
print("Run Phase B rasterization first.")
exit(1)
reader = TrailReader()
# Test point sampling - Twin Falls downtown (should have roads)
test_lat, test_lon = 42.563, -114.461
trail_value = reader.sample_point(test_lat, test_lon)
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
print(f" Type: {label}")
# Test grid read for test bbox
trails = reader.get_trails_grid(
south=42.21, north=42.60, west=-114.76, east=-113.79,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {trails.shape}")
unique, counts = np.unique(trails, return_counts=True)
print("Value distribution:")
for v, c in zip(unique, counts):
pct = 100 * c / trails.size
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
# Test conversion to friction
friction, has_trail = trails_to_friction(trails)
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
reader.close()
print("\nTrailReader test complete.")

View file

@ -77,10 +77,73 @@ def _text_hash(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _flatten_table(table_el):
"""Convert a <table> element to pipe-delimited text.
Each <tr> becomes a row with cells joined by ' | '.
Returns the formatted table as a string with blank lines around it.
"""
rows = []
for tr in table_el.iter('tr'):
cells = []
for cell in tr:
if cell.tag in ('td', 'th'):
cell_text = (cell.text_content() or '').strip()
# Collapse internal whitespace in each cell
cell_text = re.sub(r'\s+', ' ', cell_text)
if cell_text:
cells.append(cell_text)
if cells:
rows.append(' | '.join(cells))
if not rows:
return ''
return '\n'.join(rows)
def _preprocess_tree(doc):
"""Pre-process HTML tree to add delimiters before text_content() flattens it.
Handles: <table>, <br>, <li>, <dt>, <dd> -- elements that lxml's
text_content() would concatenate without any separators.
"""
from lxml import etree
# 1. Replace <table> elements with their pipe-delimited text
for table in list(doc.iter('table')):
formatted = _flatten_table(table)
if formatted:
replacement = etree.Element('div')
replacement.text = '\n\n' + formatted + '\n\n'
parent = table.getparent()
if parent is not None:
parent.replace(table, replacement)
else:
table.drop_tree()
# 2. <br> -> inject newline
for br in list(doc.iter('br')):
br.tail = '\n' + (br.tail or '')
# 3. <li> -> inject newline + "- " prefix
for li in list(doc.iter('li')):
li.text = '- ' + (li.text or '')
li.tail = '\n' + (li.tail or '')
# 4. <dt> -> inject newline before
for dt in list(doc.iter('dt')):
dt.tail = '\n' + (dt.tail or '')
# 5. <dd> -> inject newline + indent
for dd in list(doc.iter('dd')):
dd.text = ' ' + (dd.text or '')
dd.tail = '\n' + (dd.tail or '')
def _html_to_text(html_bytes):
"""Convert HTML bytes to clean text via lxml.
Strips nav, footer, script, style elements. Decodes entities.
Pre-processes tables, lists, and line breaks for proper delimiters.
Normalizes whitespace.
"""
try:
@ -93,6 +156,9 @@ def _html_to_text(html_bytes):
for el in doc.iter(tag):
el.drop_tree()
# Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes
_preprocess_tree(doc)
# Extract text
text = doc.text_content()