From a9510b5ed9d234fb94b7d6d58be0e9687fe2fad9 Mon Sep 17 00:00:00 2001 From: Matt Date: Sun, 19 Apr 2026 22:14:26 +0000 Subject: [PATCH] feat(navi): add nav_tools with route() and reverse_geocode() - Phase H2 - nav_tools.py: route() geocodes via Photon, routes via Valhalla, returns summary/maneuvers/polyline. reverse_geocode() for coordinate lookups. Supports auto/pedestrian/bicycle/truck modes. - nav_tools_test.py: 5 live tests against local Photon (2322) and Valhalla (8002) - aurora_nav_tool.py: Open WebUI Tool exposing get_directions to Aurora LLM Co-Authored-By: Claude Opus 4.6 --- lib/aurora_nav_tool.py | 122 ++++++++++++++++++++++++++++++++ lib/nav_tools.py | 153 +++++++++++++++++++++++++++++++++++++++++ lib/nav_tools_test.py | 77 +++++++++++++++++++++ 3 files changed, 352 insertions(+) create mode 100644 lib/aurora_nav_tool.py create mode 100644 lib/nav_tools.py create mode 100644 lib/nav_tools_test.py diff --git a/lib/aurora_nav_tool.py b/lib/aurora_nav_tool.py new file mode 100644 index 0000000..ef4b604 --- /dev/null +++ b/lib/aurora_nav_tool.py @@ -0,0 +1,122 @@ +""" +title: Navigation +author: Echo6 +version: 1.0.0 +description: Turn-by-turn directions and geocoding via Photon + Valhalla on recon-vm. Supports driving, walking, cycling, and truck routing with worldwide coverage (281M places). +""" + +import re +import json +import requests +from pydantic import BaseModel, Field + +_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') + + +class Tools: + class Valves(BaseModel): + photon_url: str = Field( + default="http://100.64.0.24:2322", + description="Photon geocoding service URL (recon-vm)", + ) + valhalla_url: str = Field( + default="http://100.64.0.24:8002", + description="Valhalla routing service URL (recon-vm)", + ) + + def __init__(self): + self.valves = self.Valves() + + def _geocode(self, query: str): + m = _COORD_RE.match(query.strip()) + if m: + lat, lon = float(m.group(1)), float(m.group(2)) + return lat, lon, query + resp = requests.get( + f"{self.valves.photon_url}/api", + params={"q": query, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + features = resp.json().get("features", []) + if not features: + return None, None, None + props = features[0]["properties"] + coords = features[0]["geometry"]["coordinates"] + parts = [props.get("name", "")] + for key in ("city", "state", "country"): + v = props.get(key) + if v and v != parts[-1]: + parts.append(v) + return coords[1], coords[0], ", ".join(p for p in parts if p) + + def get_directions( + self, + origin: str, + destination: str, + mode: str = "auto", + ) -> str: + """ + Get turn-by-turn driving, walking, or cycling directions between two locations. + Use this when someone asks how to get somewhere, asks for directions, or wants to know distance/time between places. + + :param origin: Starting location — address, place name, or lat,lon coordinates + :param destination: Destination — address, place name, or lat,lon coordinates + :param mode: Travel mode: auto, pedestrian, bicycle, or truck (default: auto) + :return: Directions with distance, time, and turn-by-turn maneuvers + """ + if mode not in ("auto", "pedestrian", "bicycle", "truck"): + mode = "auto" + + # Geocode origin + orig_lat, orig_lon, orig_name = self._geocode(origin) + if orig_lat is None: + return json.dumps({"error": f"Could not find location: {origin}"}) + + # Geocode destination + dest_lat, dest_lon, dest_name = self._geocode(destination) + if dest_lat is None: + return json.dumps({"error": f"Could not find location: {destination}"}) + + # Route via Valhalla + try: + resp = requests.post( + f"{self.valves.valhalla_url}/route", + json={ + "locations": [ + {"lat": orig_lat, "lon": orig_lon}, + {"lat": dest_lat, "lon": dest_lon}, + ], + "costing": mode, + "directions_options": {"units": "miles"}, + }, + timeout=30, + ) + except requests.RequestException: + return json.dumps({"error": "Navigation service unavailable"}) + + if resp.status_code != 200: + return json.dumps({"error": "No route found between locations"}) + + trip = resp.json()["trip"] + summary = trip["summary"] + maneuvers = [] + for m in trip["legs"][0]["maneuvers"]: + streets = m.get("street_names", []) + entry = { + "instruction": m["instruction"], + "distance_miles": round(m.get("length", 0), 2), + } + if streets: + entry["street"] = streets[0] + maneuvers.append(entry) + + result = { + "origin": orig_name, + "destination": dest_name, + "distance_miles": round(summary["length"], 1), + "time_minutes": round(summary["time"] / 60, 1), + "mode": mode, + "maneuvers": maneuvers, + } + return json.dumps(result) diff --git a/lib/nav_tools.py b/lib/nav_tools.py new file mode 100644 index 0000000..f6db5e6 --- /dev/null +++ b/lib/nav_tools.py @@ -0,0 +1,153 @@ +"""Navigation tools: geocoding via Photon and routing via Valhalla.""" + +import re +import requests + +PHOTON_URL = "http://localhost:2322" +VALHALLA_URL = "http://localhost:8002" + +_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$') + +VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"} + + +def _parse_coords(text: str): + """Return (lat, lon) if text looks like coordinates, else None.""" + m = _COORD_RE.match(text.strip()) + if m: + return float(m.group(1)), float(m.group(2)) + return None + + +def _geocode(query: str): + """Geocode a place name via Photon. Returns (lat, lon, display_name) or raises.""" + coords = _parse_coords(query) + if coords: + return coords[0], coords[1], query + + try: + resp = requests.get( + f"{PHOTON_URL}/api", + params={"q": query, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + except requests.RequestException: + raise RuntimeError("Navigation service unavailable") + + data = resp.json() + features = data.get("features", []) + if not features: + raise ValueError(f"Could not find location: {query}") + + props = features[0]["properties"] + coords = features[0]["geometry"]["coordinates"] # [lon, lat] + parts = [props.get("name", "")] + for key in ("city", "county", "state", "country"): + v = props.get(key) + if v and v != parts[-1]: + parts.append(v) + display = ", ".join(p for p in parts if p) + return coords[1], coords[0], display # lat, lon + + +def reverse_geocode(lat: float, lon: float) -> str: + """Reverse geocode coordinates via Photon. Returns formatted address string.""" + try: + resp = requests.get( + f"{PHOTON_URL}/reverse", + params={"lat": lat, "lon": lon, "limit": 1}, + timeout=10, + ) + resp.raise_for_status() + except requests.RequestException: + raise RuntimeError("Navigation service unavailable") + + data = resp.json() + features = data.get("features", []) + if not features: + return f"{lat}, {lon}" + + props = features[0]["properties"] + parts = [] + for key in ("name", "housenumber", "street", "city", "state", "country", "postcode"): + v = props.get(key) + if v: + parts.append(v) + return ", ".join(parts) if parts else f"{lat}, {lon}" + + +def route(origin: str, destination: str, mode: str = "auto") -> dict: + """ + Get a route between two locations. + + Args: + origin: Starting location — address, place name, or "lat,lon" + destination: Destination — address, place name, or "lat,lon" + mode: Travel mode — auto, pedestrian, bicycle, truck + + Returns: + dict with summary, maneuvers, origin/destination info, and raw shape + """ + if mode not in VALID_MODES: + mode = "auto" + + # Geocode both endpoints + orig_lat, orig_lon, orig_name = _geocode(origin) + dest_lat, dest_lon, dest_name = _geocode(destination) + + # Query Valhalla + valhalla_req = { + "locations": [ + {"lat": orig_lat, "lon": orig_lon}, + {"lat": dest_lat, "lon": dest_lon}, + ], + "costing": mode, + "directions_options": {"units": "miles"}, + } + + try: + resp = requests.post( + f"{VALHALLA_URL}/route", + json=valhalla_req, + timeout=30, + ) + except requests.RequestException: + raise RuntimeError("Navigation service unavailable") + + if resp.status_code != 200: + try: + err = resp.json() + msg = err.get("error", "Unknown routing error") + except Exception: + msg = f"Routing error (HTTP {resp.status_code})" + raise RuntimeError(f"No route found between locations: {msg}") + + data = resp.json() + trip = data["trip"] + summary = trip["summary"] + leg = trip["legs"][0] + + # Build maneuver list + maneuvers = [] + for m in leg["maneuvers"]: + streets = m.get("street_names", []) + maneuvers.append({ + "instruction": m["instruction"], + "distance_miles": round(m.get("length", 0), 2), + "street_name": streets[0] if streets else "", + "type": m.get("type", 0), + "verbal_succinct": m.get("verbal_succinct_transition_instruction", ""), + }) + + return { + "origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon}, + "destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon}, + "summary": { + "distance_miles": round(summary["length"], 1), + "time_minutes": round(summary["time"] / 60, 1), + "mode": mode, + }, + "maneuvers": maneuvers, + "shape": leg.get("shape", ""), + } diff --git a/lib/nav_tools_test.py b/lib/nav_tools_test.py new file mode 100644 index 0000000..b987293 --- /dev/null +++ b/lib/nav_tools_test.py @@ -0,0 +1,77 @@ +"""Tests for nav_tools — run against live Photon + Valhalla services.""" + +import sys +import json + +from nav_tools import route, reverse_geocode + + +def test_route_named(): + """route("Buhl Idaho", "Boise Idaho", "auto") returns maneuvers.""" + print("TEST 1: route('Buhl Idaho', 'Boise Idaho', 'auto')") + r = route("Buhl Idaho", "Boise Idaho", "auto") + assert r["summary"]["distance_miles"] > 50, f"Expected >50 mi, got {r['summary']['distance_miles']}" + assert r["summary"]["time_minutes"] > 60, f"Expected >60 min, got {r['summary']['time_minutes']}" + assert len(r["maneuvers"]) > 5, f"Expected >5 maneuvers, got {len(r['maneuvers'])}" + assert r["shape"], "Missing polyline shape" + print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min, {len(r['maneuvers'])} maneuvers") + print(f" Origin: {r['origin']['name']}") + print(f" Destination: {r['destination']['name']}") + print(f" First maneuver: {r['maneuvers'][0]['instruction']}") + + +def test_route_coords(): + """route with raw lat,lon coordinates.""" + print("\nTEST 2: route('42.5991,-114.7636', '43.615,-116.2023', 'auto')") + r = route("42.5991,-114.7636", "43.615,-116.2023", "auto") + assert r["summary"]["distance_miles"] > 100, f"Expected >100 mi, got {r['summary']['distance_miles']}" + assert len(r["maneuvers"]) > 3, f"Expected >3 maneuvers" + print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min") + + +def test_route_pedestrian(): + """route with pedestrian mode.""" + print("\nTEST 3: route('Buhl Idaho', 'Boise Idaho', 'pedestrian')") + r = route("Buhl Idaho", "Boise Idaho", "pedestrian") + assert r["summary"]["mode"] == "pedestrian" + assert r["summary"]["time_minutes"] > r["summary"]["distance_miles"], "Walking should take more min than miles" + print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min (pedestrian)") + + +def test_reverse_geocode(): + """reverse_geocode near Buhl, Idaho.""" + print("\nTEST 4: reverse_geocode(42.5991, -114.7636)") + result = reverse_geocode(42.5991, -114.7636) + assert "Buhl" in result or "Twin Falls" in result or "Idaho" in result, f"Expected Buhl/Idaho, got: {result}" + print(f" OK — {result}") + + +def test_route_bad_origin(): + """route with nonexistent place returns clean error.""" + print("\nTEST 5: route('nonexistent place xyz123abc', 'Boise Idaho')") + try: + r = route("nonexistent place xyz123abc", "Boise Idaho") + print(f" FAIL — expected error, got result: {r['summary']}") + return False + except ValueError as e: + print(f" OK — clean error: {e}") + except RuntimeError as e: + print(f" OK — runtime error: {e}") + + +if __name__ == "__main__": + passed = 0 + failed = 0 + tests = [test_route_named, test_route_coords, test_route_pedestrian, test_reverse_geocode, test_route_bad_origin] + + for test in tests: + try: + test() + passed += 1 + except Exception as e: + print(f" FAIL — {e}") + failed += 1 + + print(f"\n{'='*40}") + print(f"Results: {passed} passed, {failed} failed out of {len(tests)}") + sys.exit(1 if failed else 0)