recon/lib/nav_tools.py
Matt dfab388769 feat(navi): add netsyms tier-2 geocoding + geocode API
Add Netsyms AddressDatabase2025 (159M US+CA addresses) as tier-2
in the geocode chain: address_book → netsyms → photon.

- lib/netsyms.py: SQLite lookup module (lazy, read-only, thread-safe)
- lib/netsyms_api.py: Flask blueprints for /api/netsyms/* and /api/geocode
- lib/netsyms_test.py: 7 test cases (street, free-text, zipcode, health)
- lib/nav_tools.py: new geocode() with consistent {name,lat,lon,source,raw}
- lib/api.py: register netsyms_bp and geocode_bp

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-20 07:24:09 +00:00

286 lines
9.6 KiB
Python

"""Navigation tools: geocoding via Photon and routing via Valhalla."""
import re
import requests
from .utils import setup_logging
logger = setup_logging('recon.nav_tools')
PHOTON_URL = "http://localhost:2322"
VALHALLA_URL = "http://localhost:8002"
_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$')
VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"}
def _parse_coords(text: str):
"""Return (lat, lon) if text looks like coordinates, else None."""
m = _COORD_RE.match(text.strip())
if m:
return float(m.group(1)), float(m.group(2))
return None
def _geocode(query: str):
"""Geocode a place name via address book then Photon. Returns (lat, lon, display_name) or raises."""
coords = _parse_coords(query)
if coords:
return coords[0], coords[1], query
# ── Address book lookup (before Photon) ──
try:
from . import address_book
match = address_book.lookup(query)
if match and match['confidence'] == 'exact' and match.get('lat') and match.get('lon'):
logger.info("Address book exact match: %r%s (%s, %s)",
query, match['name'], match['lat'], match['lon'])
return match['lat'], match['lon'], match.get('address') or match['name']
elif match and match['confidence'] == 'partial':
logger.info("Address book partial match: %r%s (falling through to Photon)",
query, match['name'])
except Exception as e:
logger.debug("Address book lookup failed: %s", e)
# ── Photon geocoding ──
try:
resp = requests.get(
f"{PHOTON_URL}/api",
params={"q": query, "limit": 1},
timeout=10,
)
resp.raise_for_status()
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
data = resp.json()
features = data.get("features", [])
if not features:
raise ValueError(f"Could not find location: {query}")
props = features[0]["properties"]
coords = features[0]["geometry"]["coordinates"] # [lon, lat]
parts = [props.get("name", "")]
for key in ("city", "county", "state", "country"):
v = props.get(key)
if v and v != parts[-1]:
parts.append(v)
display = ", ".join(p for p in parts if p)
return coords[1], coords[0], display # lat, lon
def geocode(query: str):
"""
Three-tier geocode chain returning a consistent shape.
Chain: address_book (exact) → netsyms → photon.
Returns dict with {name, lat, lon, source, raw} or None.
"""
coords = _parse_coords(query)
if coords:
return {
'name': query,
'lat': coords[0],
'lon': coords[1],
'source': 'coordinates',
'raw': None,
}
# ── Tier 1: Address book (exact match only) ──
ab_partial = None
try:
from . import address_book
match = address_book.lookup(query)
if match and match['confidence'] == 'exact' and match.get('lat') and match.get('lon'):
logger.info("geocode: address_book exact match: %r%s", query, match['name'])
return {
'name': match.get('address') or match['name'],
'lat': match['lat'],
'lon': match['lon'],
'source': 'address_book',
'raw': match,
}
elif match and match['confidence'] == 'partial':
logger.info("geocode: address_book partial match: %r%s (continuing chain)",
query, match['name'])
ab_partial = match
except Exception as e:
logger.debug("geocode: address_book lookup failed: %s", e)
# ── Tier 2: Netsyms (159M US+CA addresses) ──
netsyms_result = None
try:
from . import netsyms
results = netsyms.lookup_free_text(query)
if results:
# Prefer results with plus4 (more precise)
best = results[0]
for r in results:
if r.get('plus4') and not best.get('plus4'):
best = r
break
addr_parts = [best['number'], best['street']]
if best.get('street2'):
addr_parts.append(best['street2'])
addr_parts.extend([best['city'], best['state'], best['zipcode']])
display = ' '.join(p for p in addr_parts if p)
netsyms_result = {
'name': display,
'lat': best['lat'],
'lon': best['lon'],
'source': 'netsyms',
'raw': best,
}
logger.info("geocode: netsyms match: %r%s", query, display)
return netsyms_result
except Exception as e:
logger.debug("geocode: netsyms lookup failed: %s", e)
# ── Tier 3: Photon (global geocoding) ──
try:
resp = requests.get(
f"{PHOTON_URL}/api",
params={"q": query, "limit": 1},
timeout=2,
)
resp.raise_for_status()
data = resp.json()
features = data.get("features", [])
if features:
props = features[0]["properties"]
coords = features[0]["geometry"]["coordinates"] # [lon, lat]
parts = [props.get("name", "")]
for key in ("city", "county", "state", "country"):
v = props.get(key)
if v and v != parts[-1]:
parts.append(v)
display = ", ".join(p for p in parts if p)
logger.info("geocode: photon match: %r%s", query, display)
return {
'name': display,
'lat': coords[1],
'lon': coords[0],
'source': 'photon',
'raw': props,
}
except Exception as e:
logger.debug("geocode: photon lookup failed: %s", e)
# ── Fallback: address book partial match ──
if ab_partial and ab_partial.get('lat') and ab_partial.get('lon'):
logger.info("geocode: falling back to address_book partial: %r%s",
query, ab_partial['name'])
return {
'name': ab_partial.get('address') or ab_partial['name'],
'lat': ab_partial['lat'],
'lon': ab_partial['lon'],
'source': 'address_book',
'raw': ab_partial,
}
logger.info("geocode: no match for %r across all tiers", query)
return None
def reverse_geocode(lat: float, lon: float) -> str:
"""Reverse geocode coordinates via Photon. Returns formatted address string."""
try:
resp = requests.get(
f"{PHOTON_URL}/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
data = resp.json()
features = data.get("features", [])
if not features:
return f"{lat}, {lon}"
props = features[0]["properties"]
parts = []
for key in ("name", "housenumber", "street", "city", "state", "country", "postcode"):
v = props.get(key)
if v:
parts.append(v)
return ", ".join(parts) if parts else f"{lat}, {lon}"
def route(origin: str, destination: str, mode: str = "auto") -> dict:
"""
Get a route between two locations.
Args:
origin: Starting location — address, place name, or "lat,lon"
destination: Destination — address, place name, or "lat,lon"
mode: Travel mode — auto, pedestrian, bicycle, truck
Returns:
dict with summary, maneuvers, origin/destination info, and raw shape
"""
if mode not in VALID_MODES:
mode = "auto"
# Geocode both endpoints
orig_lat, orig_lon, orig_name = _geocode(origin)
dest_lat, dest_lon, dest_name = _geocode(destination)
# Query Valhalla
valhalla_req = {
"locations": [
{"lat": orig_lat, "lon": orig_lon},
{"lat": dest_lat, "lon": dest_lon},
],
"costing": mode,
"directions_options": {"units": "miles"},
}
try:
resp = requests.post(
f"{VALHALLA_URL}/route",
json=valhalla_req,
timeout=30,
)
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
if resp.status_code != 200:
try:
err = resp.json()
msg = err.get("error", "Unknown routing error")
except Exception:
msg = f"Routing error (HTTP {resp.status_code})"
raise RuntimeError(f"No route found between locations: {msg}")
data = resp.json()
trip = data["trip"]
summary = trip["summary"]
leg = trip["legs"][0]
# Build maneuver list
maneuvers = []
for m in leg["maneuvers"]:
streets = m.get("street_names", [])
maneuvers.append({
"instruction": m["instruction"],
"distance_miles": round(m.get("length", 0), 2),
"street_name": streets[0] if streets else "",
"type": m.get("type", 0),
"verbal_succinct": m.get("verbal_succinct_transition_instruction", ""),
})
return {
"origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon},
"destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon},
"summary": {
"distance_miles": round(summary["length"], 1),
"time_minutes": round(summary["time"] / 60, 1),
"mode": mode,
},
"maneuvers": maneuvers,
"shape": leg.get("shape", ""),
}