mirror of
https://github.com/zvx-echo6/recon.git
synced 2026-05-20 14:44:54 +02:00
feat(navi): add nav_tools with route() and reverse_geocode() - Phase H2
- nav_tools.py: route() geocodes via Photon, routes via Valhalla, returns summary/maneuvers/polyline. reverse_geocode() for coordinate lookups. Supports auto/pedestrian/bicycle/truck modes. - nav_tools_test.py: 5 live tests against local Photon (2322) and Valhalla (8002) - aurora_nav_tool.py: Open WebUI Tool exposing get_directions to Aurora LLM Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
491a4350fc
commit
a9510b5ed9
3 changed files with 352 additions and 0 deletions
153
lib/nav_tools.py
Normal file
153
lib/nav_tools.py
Normal file
|
|
@ -0,0 +1,153 @@
|
|||
"""Navigation tools: geocoding via Photon and routing via Valhalla."""
|
||||
|
||||
import re
|
||||
import requests
|
||||
|
||||
PHOTON_URL = "http://localhost:2322"
|
||||
VALHALLA_URL = "http://localhost:8002"
|
||||
|
||||
_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$')
|
||||
|
||||
VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"}
|
||||
|
||||
|
||||
def _parse_coords(text: str):
|
||||
"""Return (lat, lon) if text looks like coordinates, else None."""
|
||||
m = _COORD_RE.match(text.strip())
|
||||
if m:
|
||||
return float(m.group(1)), float(m.group(2))
|
||||
return None
|
||||
|
||||
|
||||
def _geocode(query: str):
|
||||
"""Geocode a place name via Photon. Returns (lat, lon, display_name) or raises."""
|
||||
coords = _parse_coords(query)
|
||||
if coords:
|
||||
return coords[0], coords[1], query
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{PHOTON_URL}/api",
|
||||
params={"q": query, "limit": 1},
|
||||
timeout=10,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
except requests.RequestException:
|
||||
raise RuntimeError("Navigation service unavailable")
|
||||
|
||||
data = resp.json()
|
||||
features = data.get("features", [])
|
||||
if not features:
|
||||
raise ValueError(f"Could not find location: {query}")
|
||||
|
||||
props = features[0]["properties"]
|
||||
coords = features[0]["geometry"]["coordinates"] # [lon, lat]
|
||||
parts = [props.get("name", "")]
|
||||
for key in ("city", "county", "state", "country"):
|
||||
v = props.get(key)
|
||||
if v and v != parts[-1]:
|
||||
parts.append(v)
|
||||
display = ", ".join(p for p in parts if p)
|
||||
return coords[1], coords[0], display # lat, lon
|
||||
|
||||
|
||||
def reverse_geocode(lat: float, lon: float) -> str:
|
||||
"""Reverse geocode coordinates via Photon. Returns formatted address string."""
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{PHOTON_URL}/reverse",
|
||||
params={"lat": lat, "lon": lon, "limit": 1},
|
||||
timeout=10,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
except requests.RequestException:
|
||||
raise RuntimeError("Navigation service unavailable")
|
||||
|
||||
data = resp.json()
|
||||
features = data.get("features", [])
|
||||
if not features:
|
||||
return f"{lat}, {lon}"
|
||||
|
||||
props = features[0]["properties"]
|
||||
parts = []
|
||||
for key in ("name", "housenumber", "street", "city", "state", "country", "postcode"):
|
||||
v = props.get(key)
|
||||
if v:
|
||||
parts.append(v)
|
||||
return ", ".join(parts) if parts else f"{lat}, {lon}"
|
||||
|
||||
|
||||
def route(origin: str, destination: str, mode: str = "auto") -> dict:
|
||||
"""
|
||||
Get a route between two locations.
|
||||
|
||||
Args:
|
||||
origin: Starting location — address, place name, or "lat,lon"
|
||||
destination: Destination — address, place name, or "lat,lon"
|
||||
mode: Travel mode — auto, pedestrian, bicycle, truck
|
||||
|
||||
Returns:
|
||||
dict with summary, maneuvers, origin/destination info, and raw shape
|
||||
"""
|
||||
if mode not in VALID_MODES:
|
||||
mode = "auto"
|
||||
|
||||
# Geocode both endpoints
|
||||
orig_lat, orig_lon, orig_name = _geocode(origin)
|
||||
dest_lat, dest_lon, dest_name = _geocode(destination)
|
||||
|
||||
# Query Valhalla
|
||||
valhalla_req = {
|
||||
"locations": [
|
||||
{"lat": orig_lat, "lon": orig_lon},
|
||||
{"lat": dest_lat, "lon": dest_lon},
|
||||
],
|
||||
"costing": mode,
|
||||
"directions_options": {"units": "miles"},
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.post(
|
||||
f"{VALHALLA_URL}/route",
|
||||
json=valhalla_req,
|
||||
timeout=30,
|
||||
)
|
||||
except requests.RequestException:
|
||||
raise RuntimeError("Navigation service unavailable")
|
||||
|
||||
if resp.status_code != 200:
|
||||
try:
|
||||
err = resp.json()
|
||||
msg = err.get("error", "Unknown routing error")
|
||||
except Exception:
|
||||
msg = f"Routing error (HTTP {resp.status_code})"
|
||||
raise RuntimeError(f"No route found between locations: {msg}")
|
||||
|
||||
data = resp.json()
|
||||
trip = data["trip"]
|
||||
summary = trip["summary"]
|
||||
leg = trip["legs"][0]
|
||||
|
||||
# Build maneuver list
|
||||
maneuvers = []
|
||||
for m in leg["maneuvers"]:
|
||||
streets = m.get("street_names", [])
|
||||
maneuvers.append({
|
||||
"instruction": m["instruction"],
|
||||
"distance_miles": round(m.get("length", 0), 2),
|
||||
"street_name": streets[0] if streets else "",
|
||||
"type": m.get("type", 0),
|
||||
"verbal_succinct": m.get("verbal_succinct_transition_instruction", ""),
|
||||
})
|
||||
|
||||
return {
|
||||
"origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon},
|
||||
"destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon},
|
||||
"summary": {
|
||||
"distance_miles": round(summary["length"], 1),
|
||||
"time_minutes": round(summary["time"] / 60, 1),
|
||||
"mode": mode,
|
||||
},
|
||||
"maneuvers": maneuvers,
|
||||
"shape": leg.get("shape", ""),
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue