Merge feature/navi-integration: Navi backend (address book, Netsyms, geocoding chain, reverse endpoint)

This commit is contained in:
Matt 2026-04-20 22:40:03 +00:00
commit d4c5c371ca
15 changed files with 2163 additions and 0 deletions

18
config/address_book.yaml Normal file
View file

@ -0,0 +1,18 @@
# RECON Address Book — saved locations for navigation shortcuts.
# Entries are matched by name and aliases (case-insensitive).
# Add new entries by appending to the list below.
entries:
- id: home
name: Home
aliases:
- home
- matt's house
- 214 north st
- 214 north street
address: "214 North St, Filer, ID 83328"
lat: 42.5735833
lon: -114.6066389
tags:
- residence
- primary

160
lib/address_book.py Normal file
View file

@ -0,0 +1,160 @@
"""
RECON Address Book — YAML-backed saved-location lookup.
Provides named locations (home, work, etc.) that short-circuit Photon
geocoding when an exact alias match is found.
Config: /opt/recon/config/address_book.yaml
"""
import os
import re
import threading
import yaml
from .utils import setup_logging
# Module logger, created through the project's setup_logging helper.
logger = setup_logging('recon.address_book')
# Location of the YAML file, resolved relative to this module:
# <parent of this module's directory>/config/address_book.yaml
_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    'config', 'address_book.yaml',
)
# Held while the file is re-read so concurrent callers do not reload at once.
_lock = threading.Lock()
# Entries from the most recent load (aliases already lowercased).
_entries: list[dict] = []
# st_mtime of the file at the most recent load; 0.0 means nothing is loaded.
_mtime: float = 0.0
def _reload_if_changed():
    """Reload the YAML address book when its modification time has changed.

    The mtime is compared once without the lock (cheap fast path) and again
    after acquiring it, so only one thread re-reads the file per change.

    Failure handling:
      * missing file     -> the entry list is cleared;
      * unreadable file
        or invalid YAML  -> the previous entries are kept and the new mtime
                            is recorded, so a broken file is reported once
                            rather than re-parsed on every call;
      * malformed items  -> non-mapping entries are skipped, and empty
                            ``entries`` / ``aliases`` / ``tags`` keys are
                            treated as empty lists.
    """
    global _entries, _mtime
    try:
        st = os.stat(_CONFIG_PATH)
    except FileNotFoundError:
        logger.warning("Address book not found: %s", _CONFIG_PATH)
        # Mutate shared state under the lock, like every other writer.
        with _lock:
            _entries = []
            _mtime = 0.0
        return
    if st.st_mtime == _mtime:
        return
    with _lock:
        # Double-check after acquiring lock: another thread may have reloaded.
        try:
            st = os.stat(_CONFIG_PATH)
        except FileNotFoundError:
            _entries = []
            _mtime = 0.0
            return
        if st.st_mtime == _mtime:
            return
        try:
            with open(_CONFIG_PATH, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f) or {}
        except (OSError, yaml.YAMLError) as e:
            logger.error("Address book could not be read (%s); keeping previous entries", e)
            _mtime = st.st_mtime
            return
        # `entries:` with no value parses to None; a non-mapping document has no entries.
        raw = data.get('entries') if isinstance(data, dict) else None
        loaded = []
        for entry in raw or []:
            if not isinstance(entry, dict):
                logger.warning("Address book: skipping malformed entry %r", entry)
                continue
            aliases = entry.get('aliases') or []
            if isinstance(aliases, str):
                # A single alias written as a scalar instead of a list.
                aliases = [aliases]
            loaded.append({
                'id': entry.get('id', ''),
                # lookup() normalises the name as text, so never store None here.
                'name': str(entry.get('name') or ''),
                # YAML may parse an alias such as `83328` as an int; match on text.
                'aliases': [str(a).lower() for a in aliases],
                'address': entry.get('address', ''),
                'lat': entry.get('lat'),
                'lon': entry.get('lon'),
                'tags': entry.get('tags') or [],
            })
        _entries = loaded
        _mtime = st.st_mtime
        logger.info("Address book loaded: %d entries from %s", len(_entries), _CONFIG_PATH)
def load():
    """Return the address book entries, reloading first if the file changed.

    The returned object is the module's own entry list, not a copy, so
    callers should treat it as read-only.
    """
    _reload_if_changed()
    return _entries
def _normalize(text: str) -> str:
    """Canonicalise text for matching.

    Lowercases the text, treats commas as spaces, and collapses every run of
    whitespace (including leading/trailing) to a single space.
    """
    without_commas = text.lower().replace(',', ' ')
    words = without_commas.split()
    return ' '.join(words)
def lookup(query: str):
    """
    Find the address book entry that best matches ``query``.

    The query, each entry name and each alias are compared in normalised
    form (see ``_normalize``). Returns a copy of the matching entry with an
    added ``'confidence'`` key, or ``None`` when nothing matches or the query
    normalises to an empty string.

    Rules, tried per form (the name first, then the aliases in order):
      1. query == form                                  -> "exact", returned at once
      2. query starts with form followed by a space     -> "exact"
      3. form starts with the query and is longer
         (any prefix; no word boundary is required)     -> "partial"
      4. form occurs inside the query as a run of
         whole, consecutive words                       -> "partial"

    If no rule-1 hit occurs, the first entry that satisfied rule 2 wins;
    failing that, the first entry that satisfied rule 3 or 4.
    """
    _reload_if_changed()
    needle = _normalize(query)
    if not needle:
        return None

    prefix_match = None  # first entry seen through rule 2
    loose_match = None   # first entry seen through rule 3 or 4
    for record in _entries:
        forms = [_normalize(record['name'])]
        forms.extend(_normalize(alias) for alias in record.get('aliases', []))
        for form in filter(None, forms):
            if needle == form:
                return {**record, 'confidence': 'exact'}
            if needle.startswith(form + ' '):
                if prefix_match is None:
                    prefix_match = record
            elif form.startswith(needle) and len(needle) < len(form):
                # The user may still be typing the alias.
                if loose_match is None:
                    loose_match = record
            else:
                # Whole-word containment: word1\s+word2... bounded by
                # whitespace or the ends of the query.
                words = r'\s+'.join(re.escape(tok) for tok in form.split())
                if loose_match is None and re.search(r'(?:^|\s)' + words + r'(?:\s|$)', needle):
                    loose_match = record

    if prefix_match is not None:
        return {**prefix_match, 'confidence': 'exact'}
    if loose_match is not None:
        return {**loose_match, 'confidence': 'partial'}
    return None
def list_all():
    """Return all address book entries as a new list.

    The list is a shallow copy: the entry dicts themselves are shared with
    the module's internal state.
    """
    _reload_if_changed()
    return list(_entries)

31
lib/address_book_api.py Normal file
View file

@ -0,0 +1,31 @@
"""
RECON Address Book API — Flask Blueprint.
GET /api/address_book/lookup?q=<query> → best match, 404 if none, 400 if q is missing
GET /api/address_book/list → all entries
"""
from flask import Blueprint, request, jsonify
from . import address_book
# Blueprint that carries the /api/address_book/* routes defined below.
address_book_bp = Blueprint('address_book', __name__)
@address_book_bp.route('/api/address_book/lookup')
def api_address_book_lookup():
    """Return the best address book match for ``?q=`` as JSON.

    Responds 400 with an error object when ``q`` is missing or blank, and
    404 with an empty body when nothing matches.
    """
    query = request.args.get('q', '').strip()
    if query:
        match = address_book.lookup(query)
        if match is not None:
            return jsonify(match)
        return '', 404
    return jsonify({'error': 'Missing q parameter'}), 400
@address_book_bp.route('/api/address_book/list')
def api_address_book_list():
    """Return every address book entry as a JSON array."""
    return jsonify(address_book.list_all())

91
lib/address_book_test.py Normal file
View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""Tests for RECON address book module."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib import address_book
# Each case is a (description, call, check) triple: `call` performs the lookup
# and `check` receives its result and returns True when the case passes.
# The expectations assume the shipped config/address_book.yaml, which holds a
# single entry with id "home".
TESTS = [
    # ── Existing tests ──
    ("lookup('home') → exact",
     lambda: address_book.lookup("home"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('Home') → exact (case-insensitive)",
     lambda: address_book.lookup("Home"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('214 north st') → exact via alias",
     lambda: address_book.lookup("214 north st"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('214 North Street') → exact via alias",
     lambda: address_book.lookup("214 North Street"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('nonexistent place') → None",
     lambda: address_book.lookup("nonexistent place"),
     lambda r: r is None),
    ("list_all() → 1 entry",
     lambda: address_book.list_all(),
     lambda r: isinstance(r, list) and len(r) == 1 and r[0]['id'] == 'home'),
    # ── New prefix+boundary tests ──
    ("lookup('214 north st filer') → exact (query starts with alias)",
     lambda: address_book.lookup("214 north st filer"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('214 North St Filer ID') → exact (case + trailing state)",
     lambda: address_book.lookup("214 North St Filer ID"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('214 north st, filer, id') → exact (commas stripped)",
     lambda: address_book.lookup("214 north st, filer, id"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    ("lookup('home today') → exact (short alias + trailing text)",
     lambda: address_book.lookup("home today"),
     lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
    # A query that is only the start of an alias is a partial match, even when
    # it stops in the middle of a word ("214 n").
    ("lookup('214') → partial (query is prefix of alias)",
     lambda: address_book.lookup("214"),
     lambda r: r is not None and r['confidence'] == 'partial'),
    ("lookup('214 n') → partial (partial prefix of alias)",
     lambda: address_book.lookup("214 n"),
     lambda r: r is not None and r['confidence'] == 'partial'),
    ("lookup('completely unrelated query') → None",
     lambda: address_book.lookup("completely unrelated query"),
     lambda r: r is None),
    # "streets" must not match the alias "214 north st": an alias followed by
    # more letters is not a whole-word match.
    ("lookup('214 north streets of filer') → None (no word boundary after st)",
     lambda: address_book.lookup("214 north streets of filer"),
     lambda r: r is None),
]
def main() -> int:
    """Run every case in TESTS and report the outcome.

    Prints one PASS/FAIL line per case (plus the offending value for a
    failure) and a final tally. An exception raised by a case counts as a
    failure rather than aborting the run.

    Returns the process exit code: 0 when all cases pass, otherwise 1.
    """
    passed = 0
    failed = 0
    for name, fn, check in TESTS:
        try:
            result = fn()
            ok = check(result)
        except Exception as e:
            ok = False
            result = f"EXCEPTION: {e}"
        status = "PASS" if ok else "FAIL"
        if ok:
            passed += 1
        else:
            failed += 1
        print(f" [{status}] {name}")
        if not ok:
            print(f" got: {result}")
    print(f"\n{passed} passed, {failed} failed")
    return 0 if failed == 0 else 1


# Only run (and exit) when executed as a script. Importing this module, for
# example during pytest collection of *_test.py files, must not call
# sys.exit().
if __name__ == "__main__":
    sys.exit(main())

View file

@ -57,6 +57,16 @@ class _LargeZimRequest(_FlaskRequest):
return super()._get_file_stream(total_content_length, content_type, filename, content_length) return super()._get_file_stream(total_content_length, content_type, filename, content_length)
app.request_class = _LargeZimRequest app.request_class = _LargeZimRequest
# ── Address Book Blueprint ──
from .address_book_api import address_book_bp
app.register_blueprint(address_book_bp)
# ── Netsyms + Geocode Blueprints ──
from .netsyms_api import netsyms_bp, geocode_bp
app.register_blueprint(netsyms_bp)
app.register_blueprint(geocode_bp)
# ── Navigation Constants ── # ── Navigation Constants ──

117
lib/aurora_nav_tool.py Normal file
View file

@ -0,0 +1,117 @@
"""
title: Navigation
author: Echo6
version: 1.1.0
description: Turn-by-turn directions and geocoding via Photon + Valhalla on recon-vm. Supports driving, walking, cycling, and truck routing with worldwide coverage (281M places).
"""
import re
import json
import requests
from pydantic import BaseModel, Field
# Matches "lat,lon" input: two decimal numbers (optional leading minus)
# separated by a comma with optional surrounding whitespace. No range check.
_COORD_RE = re.compile(r'^(-?\d+\.?\d*)\s*,\s*(-?\d+\.?\d*)$')
class Tools:
    # Navigation tool: resolves both endpoints to coordinates (directly, or
    # through Photon) and asks Valhalla for a route between them.
    # NOTE(review): this class is documented with comments rather than new
    # docstrings because the hosting tool framework may surface docstrings to
    # the model — confirm before converting them.

    class Valves(BaseModel):
        # Service endpoints; the defaults point at the recon-vm deployment.
        photon_url: str = Field(
            default="http://100.64.0.24:2322",
            description="Photon geocoding service URL (recon-vm)",
        )
        valhalla_url: str = Field(
            default="http://100.64.0.24:8002",
            description="Valhalla routing service URL (recon-vm)",
        )

    def __init__(self):
        self.valves = self.Valves()

    def _geocode(self, query: str):
        # Resolve `query` to (lat, lon, display_name).
        #   * "lat,lon" text is parsed locally and echoed back as the name.
        #   * Anything else is sent to Photon; the first feature wins.
        # Returns (None, None, None) when Photon has no match. Network, HTTP
        # and response-shape errors are raised to the caller.
        m = _COORD_RE.match(query.strip())
        if m:
            lat, lon = float(m.group(1)), float(m.group(2))
            return lat, lon, query
        resp = requests.get(
            f"{self.valves.photon_url}/api",
            params={"q": query, "limit": 1},
            timeout=10,
        )
        resp.raise_for_status()
        features = resp.json().get("features", [])
        if not features:
            return None, None, None
        props = features[0]["properties"]
        # GeoJSON order is [lon, lat].
        coords = features[0]["geometry"]["coordinates"]
        parts = [props.get("name", "")]
        for key in ("city", "state", "country"):
            v = props.get(key)
            # Skip a value that merely repeats the previous part.
            if v and v != parts[-1]:
                parts.append(v)
        return coords[1], coords[0], ", ".join(p for p in parts if p)

    def get_directions(
        self,
        origin: str,
        destination: str,
        mode: str = "auto",
    ) -> str:
        """
        Get turn-by-turn directions between two locations. When this tool returns results, present the directions exactly as returned — do not summarize or rephrase. Include all steps.
        :param origin: Starting location — address, place name, or lat,lon coordinates
        :param destination: Destination — address, place name, or lat,lon coordinates
        :param mode: Travel mode: auto, pedestrian, bicycle, or truck (default: auto)
        :return: Formatted turn-by-turn directions
        """
        # Unknown modes fall back to driving instead of failing the request.
        if mode not in ("auto", "pedestrian", "bicycle", "truck"):
            mode = "auto"

        # A geocoder outage or malformed reply becomes a message for the user
        # rather than an exception escaping the tool.
        try:
            orig_lat, orig_lon, orig_name = self._geocode(origin)
            if orig_lat is None:
                return f"Could not find location: {origin}"
            dest_lat, dest_lon, dest_name = self._geocode(destination)
            if dest_lat is None:
                return f"Could not find location: {destination}"
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
            return "Geocoding service unavailable"

        try:
            resp = requests.post(
                f"{self.valves.valhalla_url}/route",
                json={
                    "locations": [
                        {"lat": orig_lat, "lon": orig_lon},
                        {"lat": dest_lat, "lon": dest_lon},
                    ],
                    "costing": mode,
                    "directions_options": {"units": "miles"},
                },
                timeout=30,
            )
        except requests.RequestException:
            return "Navigation service unavailable"
        if resp.status_code != 200:
            return "No route found between locations"

        # A 200 reply without the expected structure is treated as "no route".
        try:
            trip = resp.json()["trip"]
            summary = trip["summary"]
            # Two locations produce a single leg.
            legs = trip["legs"][0]["maneuvers"]
            miles = round(summary["length"], 1)
            minutes = round(summary["time"] / 60, 1)
        except (ValueError, KeyError, IndexError, TypeError):
            return "No route found between locations"

        lines = [
            # Fall back to the user's own text when the geocoder gave no name.
            f"Directions from {orig_name or origin} to {dest_name or destination} ({mode}):",
            f"Distance: {miles} miles | Time: {minutes} minutes",
            "",
        ]
        for i, m in enumerate(legs, 1):
            inst = m["instruction"]
            dist = m.get("length", 0)
            if dist > 0:
                # Separate the instruction from its distance.
                lines.append(f"{i}. {inst} — {round(dist, 1)} mi")
            else:
                lines.append(f"{i}. {inst}")
        return "\n".join(lines)

708
lib/geocode.py Normal file
View file

@ -0,0 +1,708 @@
"""
RECON geocode — structured preprocessing, multi-source retrieval, reranking.
Replaces the naive Photon-only search with:
1. usaddress parsing + intent classification (ADDRESS / POI / LOCALITY / COORD / POSTCODE)
2. Multi-source retrieval: ADDRESS → Netsyms + Photon; POI/LOCALITY → Photon /api
3. Python reranker with weighted signals
Public entry point: geocode(query, limit) → {query, results, count}
"""
import math
import re
import logging
import requests
import usaddress
from rapidfuzz import fuzz
from .utils import setup_logging
logger = setup_logging('recon.geocode')
# ── Trace logger for reranking audit ──
# Writes the top-ranked candidates and their signal breakdown for each query
# to a separate file so ranking decisions can be audited afterwards.
_trace_logger = logging.getLogger('recon.geocode.trace')
try:
    _trace_handler = logging.FileHandler('/tmp/geocode_rerank_trace.log')
except OSError:
    # The trace file is a debugging aid. If it cannot be opened (for example
    # it belongs to another user, or /tmp does not exist) geocoding must
    # still import and work, so tracing is dropped instead.
    _trace_handler = logging.NullHandler()
_trace_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
_trace_logger.addHandler(_trace_handler)
_trace_logger.setLevel(logging.DEBUG)
# ── Config constants ──
# Base URL of the Photon service; "/api" and "/structured" are appended to it.
PHOTON_URL = "http://localhost:2322"
# Location bias sent as lat / lon / zoom with every free-text Photon query.
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
# A result within this many metres of an address book entry is labelled with
# that entry's name.
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# ── Reranker weights ──
# Derived from research analysis of failure modes:
# housenumber_exact is the strongest signal because Photon's soft-boost
# lets wrong-number results bubble up. street_name_fuzz and locality_fuzz
# handle abbreviation/case variation. source_authority gives Netsyms a
# boost for US addresses since it has USPS-verified data.
# Each weight is added to a candidate's total when its signal fires; the
# fuzzy and coverage weights are first scaled by a ratio in [0..1].
W_HOUSENUMBER_EXACT = 6.0 # candidate housenumber equals the query's (ADDRESS intent only)
W_HOUSENUMBER_MISMATCH = -5.0 # candidate has a housenumber, but a different one (penalty)
W_STREET_NAME_FUZZ = 3.0 # fuzzy street name similarity [0..1] * weight
W_TOKEN_COVERAGE = 2.0 # fraction of query tokens found in the result name * weight
W_STREET_TYPE_MATCH = 1.5 # "st" matches "street", etc.
W_LOCALITY_FUZZ = 2.0 # fuzzy city similarity [0..1] * weight
W_SOURCE_AUTHORITY = 2.0 # Netsyms candidate on an ADDRESS query
W_LAYER_RANK = 1.0 # candidate type matches the query intent
W_PHOTON_POSITION_NORM = 1.0 # Photon's native ranking: full at rank 0, decaying linearly to 0 by rank 10
W_STATE_EXACT = 1.0 # exact state match
# ── US abbreviation expansions ──
# During query parsing these are applied ONLY to the parsed
# StreetName/StreetNamePostType tokens, NOT to ordinals. The reranker also
# uses _STREET_TYPE_ABBREVS to expand every token of a street string before
# comparing query and candidate.
_STREET_TYPE_ABBREVS = {
    'st': 'street', 'ave': 'avenue', 'blvd': 'boulevard', 'dr': 'drive',
    'rd': 'road', 'ln': 'lane', 'ct': 'court', 'cir': 'circle',
    'pl': 'place', 'way': 'way', 'pkwy': 'parkway', 'hwy': 'highway',
    'trl': 'trail', 'ter': 'terrace', 'sq': 'square',
}
_DIRECTIONAL_ABBREVS = {
    'n': 'north', 's': 'south', 'e': 'east', 'w': 'west',
    'ne': 'northeast', 'nw': 'northwest', 'se': 'southeast', 'sw': 'southwest',
}
# Matches ordinals such as "21st" or "3RD", which must never be expanded
# ("21st" is not "21 street").
_ORDINAL_RE = re.compile(r'^\d+(st|nd|rd|th)$', re.IGNORECASE)
# ── US state codes ──
# Two-letter codes for the 50 states plus DC; used to recognise
# "<city> <state>" locality queries.
_STATE_CODES = {
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
    'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
    'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
    'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
    'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY', 'DC',
}
# Coordinate regex: two decimal numbers (optional leading minus) separated by
# a comma or by whitespace, with optional surrounding whitespace.
_COORD_RE = re.compile(r'^\s*(-?\d+\.?\d*)\s*[,\s]\s*(-?\d+\.?\d*)\s*$')
# ═══════════════════════════════════════════════════════════════════
# STEP 1: PREPROCESSING
# ═══════════════════════════════════════════════════════════════════
def _parse_coords(text):
    """Interpret ``text`` as a "lat, lon" pair.

    Returns a ``(lat, lon)`` tuple of floats when the text has the coordinate
    shape and both values are within range (latitude -90..90, longitude
    -180..180); otherwise returns ``None``.
    """
    match = _COORD_RE.match(text.strip())
    if match is None:
        return None
    lat, lon = (float(group) for group in match.groups())
    within_bounds = -90 <= lat <= 90 and -180 <= lon <= 180
    return (lat, lon) if within_bounds else None
def _classify_and_parse(query):
    """
    Parse query with usaddress, classify intent, expand abbreviations.
    Returns (intent, parsed_dict) where:
    intent: 'ADDRESS' | 'LOCALITY' | 'POSTCODE' | 'COORD' | 'UNKNOWN'
    (this function never returns 'POI'; anything unclassified is 'UNKNOWN')
    parsed_dict: {number, street, street_raw, city, state, zipcode,
    raw_query, expanded_query}
    - street: street with abbreviations expanded, without the number
    - street_raw: street as typed (abbreviations kept)
    - expanded_query: number + expanded street + city + state + zipcode
    Fields that were not found stay None; for 'COORD' and for input usaddress
    cannot tag, only raw_query/expanded_query (the stripped query) are set.
    """
    q = query.strip()
    parsed = {
        'number': None, 'street': None, 'street_raw': None,
        'city': None, 'state': None,
        'zipcode': None, 'raw_query': q, 'expanded_query': q,
    }
    # Coordinate check first
    if _parse_coords(q):
        return 'COORD', parsed
    # Try usaddress
    try:
        tagged, addr_type = usaddress.tag(q)
    except usaddress.RepeatedLabelError:
        # Ambiguous input — fall back to free-text Photon
        return 'UNKNOWN', parsed
    # Extract components
    number = tagged.get('AddressNumber', '').strip()
    street_name = tagged.get('StreetName', '').strip()
    street_pre_dir = tagged.get('StreetNamePreDirectional', '').strip()
    street_post_type = tagged.get('StreetNamePostType', '').strip()
    place = tagged.get('PlaceName', '').strip()
    state = tagged.get('StateName', '').strip()
    zipcode = tagged.get('ZipCode', '').strip()
    # ── Fix usaddress edge case: "214 N St Filer" ──
    # usaddress reads a short directional + "St" as PreDirectional + empty,
    # mashing "St Filer" into StreetName. Detect: PreDirectional is one or two
    # characters, StreetName has 2+ tokens where the first is a street type.
    if (street_pre_dir and len(street_pre_dir) <= 2
            and not street_name.strip().startswith(street_pre_dir)
            and ' ' in street_name):
        name_tokens = street_name.split()
        first_lower = name_tokens[0].lower()
        if first_lower in _STREET_TYPE_ABBREVS or first_lower in _STREET_TYPE_ABBREVS.values():
            # "N" is actually the street name, "St" is the post-type
            street_name = street_pre_dir
            street_post_type = name_tokens[0]
            # Always true here: street_name contained a space, so it has 2+ tokens.
            if len(name_tokens) > 1:
                place = ' '.join(name_tokens[1:])
            street_pre_dir = ''
    # ── Expand abbreviations (guard ordinals) ──
    # expanded_parts accumulates, in order: number, directional, street name,
    # street type, then city/state/zipcode further down.
    expanded_parts = []
    if number:
        parsed['number'] = number
        expanded_parts.append(number)
    if street_pre_dir:
        exp = _DIRECTIONAL_ABBREVS.get(street_pre_dir.lower(), street_pre_dir)
        expanded_parts.append(exp)
    if street_name:
        # Don't expand ordinals: "21st" stays "21st"
        if _ORDINAL_RE.match(street_name):
            expanded_parts.append(street_name)
        else:
            # Expand directional abbreviation if it IS the street name
            exp = _DIRECTIONAL_ABBREVS.get(street_name.lower(), street_name)
            expanded_parts.append(exp)
        # Provisional value; replaced by the full expanded street below.
        parsed['street'] = street_name
    if street_post_type:
        if _ORDINAL_RE.match(street_post_type):
            expanded_parts.append(street_post_type)
        else:
            exp = _STREET_TYPE_ABBREVS.get(street_post_type.lower(), street_post_type)
            expanded_parts.append(exp)
    # Build raw street (original abbreviations, for Netsyms) and expanded (for Photon)
    raw_street_parts = []
    if street_pre_dir:
        raw_street_parts.append(street_pre_dir)
    if street_name:
        raw_street_parts.append(street_name)
    if street_post_type:
        raw_street_parts.append(street_post_type)
    parsed['street_raw'] = ' '.join(raw_street_parts)
    # Build the full expanded street
    if expanded_parts:
        # The street is everything after the number
        street_full = ' '.join(expanded_parts[1:] if number else expanded_parts)
        parsed['street'] = street_full
    if place:
        parsed['city'] = place
        expanded_parts.append(place)
    if state:
        # Stored uppercased, but appended to the expanded query as typed.
        parsed['state'] = state.upper()
        expanded_parts.append(state)
    if zipcode:
        parsed['zipcode'] = zipcode
        expanded_parts.append(zipcode)
    parsed['expanded_query'] = ' '.join(expanded_parts)
    # ── Intent classification ──
    # A street address without a house number is not treated as ADDRESS.
    if addr_type == 'Street Address' and number:
        return 'ADDRESS', parsed
    elif zipcode and not number and not street_name:
        return 'POSTCODE', parsed
    elif addr_type == 'Ambiguous':
        # Check if it looks like a locality: 2+ tokens, the last is a state code
        tokens = q.replace(',', ' ').split()
        if len(tokens) >= 2 and tokens[-1].upper() in _STATE_CODES:
            parsed['city'] = ' '.join(tokens[:-1])
            parsed['state'] = tokens[-1].upper()
            return 'LOCALITY', parsed
        return 'UNKNOWN', parsed
    else:
        return 'UNKNOWN', parsed
# ═══════════════════════════════════════════════════════════════════
# STEP 2: RETRIEVAL
# ═══════════════════════════════════════════════════════════════════
def _retrieve_netsyms(parsed, limit=10):
    """Query Netsyms for a structured address lookup.

    Uses a street lookup when the parsed query has both a house number and a
    street (the as-typed ``street_raw`` is preferred over the expanded
    ``street``), else a ZIP lookup when a zipcode is present.

    Args:
        parsed: dict produced by ``_classify_and_parse``.
        limit: maximum number of rows requested from Netsyms.

    Returns:
        A list of candidate dicts (``source`` is ``'netsyms'``, ``type`` is
        ``'street_address'``). The list is empty when the netsyms module is
        unavailable, the query has neither number+street nor a zipcode, or
        the lookup itself fails. This function does not raise for backend
        trouble, matching the Photon retrievers.
    """
    try:
        from . import netsyms
    except Exception:
        # Netsyms is optional: without it only Photon results are used.
        return []
    number = parsed.get('number', '')
    street = parsed.get('street_raw') or parsed.get('street', '')
    city = parsed.get('city', '')
    state = parsed.get('state', '')
    zipcode = parsed.get('zipcode', '')
    # NOTE(review): parsed dicts carry None for absent fields, so these
    # defaults rarely apply and None may be passed through as city/state/
    # zipcode — confirm netsyms accepts that.
    try:
        if number and street:
            rows = netsyms.lookup_by_street(
                number, street, city=city, state=state, zipcode=zipcode, limit=limit
            )
        elif zipcode:
            rows = netsyms.lookup_by_zipcode(zipcode, limit=limit)
        else:
            return []
    except Exception as e:
        # A failing backend must not take the whole geocode request down.
        logger.debug("Netsyms lookup failed: %s", e)
        return []
    results = []
    for row in rows or []:
        # Display name: number, street, optional second line, city, state, zip.
        addr_parts = [row['number'], row['street']]
        if row.get('street2'):
            addr_parts.append(row['street2'])
        addr_parts.extend([row['city'], row['state'], row['zipcode']])
        display = ' '.join(p for p in addr_parts if p)
        results.append({
            'name': display,
            'lat': row['lat'],
            'lon': row['lon'],
            'source': 'netsyms',
            'type': 'street_address',
            'raw': row,
            # Underscore-prefixed fields feed the reranker and are not returned to clients.
            '_number': row.get('number', ''),
            '_street': row.get('street', ''),
            '_city': row.get('city', ''),
            '_state': row.get('state', ''),
        })
    return results
def _retrieve_photon_structured(parsed, limit=10):
    """Look an address up through Photon's /structured endpoint (US only).

    A street is required; without one no request is made and an empty list
    is returned. House number, city and state are sent when present. Any
    request or decoding failure is logged at debug level and yields an empty
    list.
    """
    street = parsed.get('street')
    if not street:
        return []

    params = {'limit': limit, 'countrycode': 'US', 'street': street}
    for source_key, photon_key in (
        ('number', 'housenumber'),
        ('city', 'city'),
        ('state', 'state'),
    ):
        value = parsed.get(source_key)
        if value:
            params[photon_key] = value

    try:
        reply = requests.get(f"{PHOTON_URL}/structured", params=params, timeout=5)
        reply.raise_for_status()
        payload = reply.json()
    except Exception as exc:
        logger.debug("Photon /structured failed: %s", exc)
        return []
    return _parse_photon_features(payload.get('features', []), 'photon')
def _retrieve_photon_freetext(query, limit=10):
    """Run a free-text search against Photon's /api endpoint.

    The configured bias point and zoom are sent with every request. Any
    request or decoding failure is logged at debug level and yields an empty
    list.
    """
    params = {
        'q': query,
        'limit': limit,
        'lat': GEOCODE_BIAS_LAT,
        'lon': GEOCODE_BIAS_LON,
        'zoom': GEOCODE_BIAS_ZOOM,
    }
    try:
        reply = requests.get(f"{PHOTON_URL}/api", params=params, timeout=5)
        reply.raise_for_status()
        payload = reply.json()
    except Exception as exc:
        logger.debug("Photon /api failed: %s", exc)
        return []
    return _parse_photon_features(payload.get('features', []), 'photon')
def _parse_photon_features(features, source):
    """Convert Photon GeoJSON features into candidate dicts.

    Args:
        features: list of GeoJSON feature dicts from a Photon response.
        source: label stored in each candidate's ``source`` field.

    Returns:
        One candidate per feature that has usable coordinates. Features
        without a ``[lon, lat]`` pair are skipped, because a location-less
        result is useless and would otherwise be reported at (0, 0).
        ``_photon_rank`` is the feature's position in the original response,
        so skipped features leave gaps rather than shifting later ranks.
    """
    results = []
    for rank, feature in enumerate(features):
        props = feature.get('properties') or {}
        coords = (feature.get('geometry') or {}).get('coordinates') or []
        if len(coords) < 2:
            continue

        # Classify: anything with a house number is an address; otherwise
        # Photon's place type decides; everything else counts as a POI.
        if props.get('housenumber') or props.get('osm_value', '') in ('house', 'residential'):
            rtype = 'street_address'
        elif props.get('type', '') in (
            'city', 'town', 'village', 'hamlet', 'county', 'state', 'country',
        ):
            rtype = 'locality'
        else:
            rtype = 'poi'

        # Build display name: "<number> <street>" (plus a distinct place
        # name), else the name, else the street, followed by the enclosing
        # areas.
        parts = []
        hn = props.get('housenumber')
        street = props.get('street')
        name = props.get('name', '')
        if hn and street:
            parts.append(f"{hn} {street}")
            if name and name != street:
                parts.append(name)
        elif name:
            parts.append(name)
        elif street:
            parts.append(street)
        for key in ('city', 'county', 'state', 'country'):
            v = props.get(key)
            # Skip a value that merely repeats the previous part.
            if v and (not parts or v != parts[-1]):
                parts.append(v)
        display = ', '.join(p for p in parts if p) or 'Unknown'

        results.append({
            'name': display,
            # GeoJSON order is [lon, lat].
            'lat': coords[1],
            'lon': coords[0],
            'source': source,
            'type': rtype,
            'raw': props,
            # Underscore-prefixed fields feed the reranker and are not returned to clients.
            '_photon_rank': rank,
            '_number': props.get('housenumber', ''),
            '_street': props.get('street', ''),
            '_city': props.get('city', ''),
            '_state': props.get('state', ''),
        })
    return results
# ═══════════════════════════════════════════════════════════════════
# STEP 3: RERANKER
# ═══════════════════════════════════════════════════════════════════
def _expand_street_type(s):
    """Return the lowercase long form of a street-type token.

    Known abbreviations are expanded ("ST" -> "street"); any other token is
    returned lowercased and otherwise unchanged.
    """
    token = s.lower()
    return _STREET_TYPE_ABBREVS.get(token, token)
def _score_candidate(candidate, parsed, intent):
    """
    Score one candidate against the parsed query.

    Every signal that fires adds its weight (scaled by a 0..1 ratio for the
    fuzzy and coverage signals) to the total and is recorded under its name.
    Returns (total rounded to 2 places, {signal name: contribution}).
    """
    def _norm(mapping, key):
        # Missing and None values compare as the empty string.
        return (mapping.get(key) or '').strip().upper()

    def _expanded(street):
        return [_expand_street_type(tok) for tok in street.split()]

    want_number = _norm(parsed, 'number')
    want_street = _norm(parsed, 'street')
    want_city = _norm(parsed, 'city')
    want_state = _norm(parsed, 'state')
    have_number = _norm(candidate, '_number')
    have_street = _norm(candidate, '_street')
    have_city = _norm(candidate, '_city')
    have_state = _norm(candidate, '_state')

    breakdown = {}
    total = 0.0

    # ── Housenumber: reward a match, penalise a different number ──
    if intent == 'ADDRESS' and want_number:
        if have_number == want_number:
            breakdown['housenumber_exact'] = W_HOUSENUMBER_EXACT
            total += W_HOUSENUMBER_EXACT
        elif have_number:
            breakdown['housenumber_mismatch'] = W_HOUSENUMBER_MISMATCH
            total += W_HOUSENUMBER_MISMATCH

    # ── Street: fuzzy name similarity, then shared street type ──
    if want_street and have_street:
        want_tokens = _expanded(want_street)
        have_tokens = _expanded(have_street)
        similarity = fuzz.token_sort_ratio(' '.join(want_tokens), ' '.join(have_tokens)) / 100.0
        street_points = similarity * W_STREET_NAME_FUZZ
        breakdown['street_name_fuzz'] = round(street_points, 2)
        total += street_points

        known_types = set(_STREET_TYPE_ABBREVS.values())
        if set(want_tokens) & set(have_tokens) & known_types:
            breakdown['street_type_match'] = W_STREET_TYPE_MATCH
            total += W_STREET_TYPE_MATCH

    # ── Token coverage: share of query tokens occurring in the display name ──
    # NOTE(review): this is a substring test, so a short token such as "N"
    # counts as found inside any longer word — confirm that is intended.
    query_tokens = set(parsed.get('raw_query', '').upper().replace(',', ' ').split())
    if query_tokens:
        haystack = candidate.get('name', '').upper()
        found = sum(1 for tok in query_tokens if tok in haystack)
        coverage_points = (found / len(query_tokens)) * W_TOKEN_COVERAGE
        breakdown['token_coverage'] = round(coverage_points, 2)
        total += coverage_points

    # ── Locality fuzz ──
    if want_city and have_city:
        city_points = (fuzz.ratio(want_city, have_city) / 100.0) * W_LOCALITY_FUZZ
        breakdown['locality_fuzz'] = round(city_points, 2)
        total += city_points

    # ── State exact ──
    if want_state and have_state and have_state == want_state:
        breakdown['state_exact'] = W_STATE_EXACT
        total += W_STATE_EXACT

    # ── Source authority ──
    if intent == 'ADDRESS' and candidate.get('source') == 'netsyms':
        breakdown['source_authority'] = W_SOURCE_AUTHORITY
        total += W_SOURCE_AUTHORITY

    # ── Layer rank: bonus when the candidate type suits the intent ──
    type_for_intent = (
        ('ADDRESS', 'street_address'),
        ('LOCALITY', 'locality'),
        ('POI', 'poi'),
    )
    if (intent, candidate.get('type', '')) in type_for_intent:
        breakdown['layer_rank'] = W_LAYER_RANK
        total += W_LAYER_RANK

    # ── Photon position: full bonus at rank 0, decaying linearly ──
    rank = candidate.get('_photon_rank')
    if rank is not None:
        position_points = max(0, (1.0 - rank / 10.0)) * W_PHOTON_POSITION_NORM
        breakdown['photon_position'] = round(position_points, 2)
        total += position_points

    return round(total, 2), breakdown
def _build_match_code(candidate, parsed, intent):
"""Build a match_code dict indicating match quality for each field.

Possible keys are 'housenumber', 'street' and 'city'. Each value is:
'matched'   - query and candidate both have the field and agree (equal
house numbers; similarity above 0.8 for street and city);
'unmatched' - both have the field but disagree;
'inferred'  - the query has the field but the candidate does not.
A field that the query lacks is left out of the dict.
"""
# NOTE(review): indentation is missing in this view of the file, so the
# extent of the `if intent == 'ADDRESS':` block below cannot be determined.
# The house-number check belongs to it; confirm against the original whether
# the street and city checks are gated on the intent as well. The code lines
# are left exactly as found.
mc = {}
if intent == 'ADDRESS':
q_num = (parsed.get('number') or '').strip().upper()
c_num = (candidate.get('_number') or '').strip().upper()
if q_num and c_num == q_num:
mc['housenumber'] = 'matched'
elif q_num and c_num:
mc['housenumber'] = 'unmatched'
elif q_num and not c_num:
mc['housenumber'] = 'inferred'
q_street = (parsed.get('street') or '').strip().upper()
c_street = (candidate.get('_street') or '').strip().upper()
if q_street and c_street:
# Expand street-type abbreviations on both sides before comparing.
q_exp = ' '.join(_expand_street_type(t) for t in q_street.split())
c_exp = ' '.join(_expand_street_type(t) for t in c_street.split())
ratio = fuzz.token_sort_ratio(q_exp, c_exp) / 100.0
mc['street'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_street:
mc['street'] = 'inferred'
q_city = (parsed.get('city') or '').strip().upper()
c_city = (candidate.get('_city') or '').strip().upper()
if q_city and c_city:
ratio = fuzz.ratio(q_city, c_city) / 100.0
mc['city'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_city:
mc['city'] = 'inferred'
return mc
def _rerank(candidates, parsed, intent, query, limit):
    """Score the candidates, order them best-first and return the top ``limit``.

    Each candidate dict is annotated in place with ``_score`` and
    ``_signals``. The three best are written to the trace log. The returned
    entries are fresh dicts holding only the public fields, a confidence
    label derived from the score, and ``match_code`` when it is non-empty.
    """
    ranked = []
    for cand in candidates:
        cand['_score'], cand['_signals'] = _score_candidate(cand, parsed, intent)
        ranked.append(cand)
    # The sort is stable, so equal scores keep their retrieval order.
    ranked.sort(key=lambda cand: cand['_score'], reverse=True)

    # Trace log for audit
    _trace_logger.debug("─── Query: %r intent=%s ───", query, intent)
    for position, cand in enumerate(ranked[:3]):
        _trace_logger.debug(
            " #%d score=%.2f src=%s name=%s",
            position, cand['_score'], cand.get('source', '?'), cand.get('name', '?')[:60]
        )
        _trace_logger.debug(" signals=%s", cand.get('_signals', {}))

    # Score thresholds, highest first; anything below the last one is 'low'.
    confidence_floors = ((10, 'exact'), (5, 'high'), (2, 'medium'))

    output = []
    for cand in ranked[:limit]:
        match_code = _build_match_code(cand, parsed, intent)
        score = cand.get('_score', 0)
        for floor, label in confidence_floors:
            if score >= floor:
                confidence = label
                break
        else:
            confidence = 'low'
        public = {
            'name': cand['name'],
            'lat': cand['lat'],
            'lon': cand['lon'],
            'source': cand['source'],
            'confidence': confidence,
            'type': cand.get('type', 'poi'),
            'raw': cand.get('raw'),
        }
        if match_code:
            public['match_code'] = match_code
        output.append(public)
    return output
# ═══════════════════════════════════════════════════════════════════
# STEP 4: ANNOTATION
# ═══════════════════════════════════════════════════════════════════
def _haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two points.

    Coordinates are in decimal degrees. Uses the haversine formula on a
    sphere of radius 6,371,000 m.
    """
    R = 6_371_000
    rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _annotate_with_address_book(results):
    """Label results that lie close to a saved address book location.

    Each result within ADDRESS_BOOK_ANNOTATION_RADIUS_M metres of an entry
    gets ``labeled_as`` set to the name of the first such entry. Results are
    modified in place and nothing is returned. If the address book cannot be
    imported or loaded, the results are left untouched.
    """
    try:
        from . import address_book
        saved_places = address_book.load()
    except Exception:
        return

    for hit in results:
        hit_lat, hit_lon = hit.get('lat'), hit.get('lon')
        if hit_lat is None or hit_lon is None:
            continue
        for place in saved_places:
            place_lat, place_lon = place.get('lat'), place.get('lon')
            if place_lat is None or place_lon is None:
                continue
            distance = _haversine_m(hit_lat, hit_lon, place_lat, place_lon)
            if distance <= ADDRESS_BOOK_ANNOTATION_RADIUS_M:
                hit['labeled_as'] = place['name']
                break
# ═══════════════════════════════════════════════════════════════════
# PUBLIC API
# ═══════════════════════════════════════════════════════════════════
def geocode(query, limit=10):
    """
    Structured geocoding with multi-source retrieval and reranking.

    Resolution order:
      1. A literal "lat, lon" query is returned as a coordinates result.
      2. A single-word query that exactly matches an address book entry
         short-circuits to that saved location.
      3. Otherwise the query is classified, candidates are retrieved from
         Netsyms and/or Photon depending on the intent, de-duplicated,
         reranked, and annotated with address book labels.

    Args:
        query: Free-text query. None or blank yields an empty result set.
        limit: Maximum number of results, clamped to 1..20. A value that
            cannot be converted to int falls back to 10.

    Returns:
        dict ``{query, results: [...], count}``; an empty ``results`` list
        means no match.
    """
    # Tolerate a non-integer limit (e.g. a raw query-string value).
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        limit = 10
    limit = max(1, min(limit, 20))

    q = (query or '').strip()
    empty = {'query': q, 'results': [], 'count': 0}
    if not q:
        return empty

    # ── Coordinate detection ──
    coords = _parse_coords(q)
    if coords:
        return {
            'query': q,
            'results': [{
                'name': q,
                'lat': coords[0],
                'lon': coords[1],
                'source': 'coordinates',
                'confidence': 'exact',
                'type': 'coordinates',
                'raw': None,
            }],
            'count': 1,
        }

    # ── Address book nickname short-circuit ──
    # Only single-word queries ("home") may short-circuit; multi-word input
    # goes through the full chain even if it matches an alias.
    normalized_q = ' '.join(q.lower().replace(',', ' ').split())
    is_single_word = ' ' not in normalized_q
    if is_single_word:
        try:
            from . import address_book
            ab_match = address_book.lookup(q)
            # Compare against None: 0.0 is a valid latitude/longitude.
            if (ab_match
                    and ab_match['confidence'] == 'exact'
                    and ab_match.get('lat') is not None
                    and ab_match.get('lon') is not None):
                logger.info("geocode: nickname short-circuit %r → %s",
                            q, ab_match['name'])
                return {
                    'query': q,
                    'results': [{
                        'name': ab_match.get('address') or ab_match['name'],
                        'lat': ab_match['lat'],
                        'lon': ab_match['lon'],
                        'source': 'address_book',
                        'confidence': 'exact',
                        'type': 'nickname',
                        'raw': ab_match,
                    }],
                    'count': 1,
                }
        except Exception as e:
            logger.debug("geocode: address_book lookup failed: %s", e)

    # ── Classify intent + parse ──
    intent, parsed = _classify_and_parse(q)
    logger.debug("geocode: intent=%s parsed=%s", intent, parsed)

    # ── Retrieve candidates ──
    candidates = []
    if intent == 'ADDRESS':
        # Queried one after another: Netsyms (structured), Photon freetext
        # (with the expanded query), then Photon /structured.
        netsyms_results = _retrieve_netsyms(parsed, limit=limit)
        photon_results = _retrieve_photon_freetext(
            parsed.get('expanded_query', q), limit=limit
        )
        photon_struct = _retrieve_photon_structured(parsed, limit=5)
        candidates = netsyms_results + photon_results + photon_struct
    elif intent == 'POSTCODE':
        netsyms_results = _retrieve_netsyms(parsed, limit=limit)
        photon_results = _retrieve_photon_freetext(q, limit=limit)
        candidates = netsyms_results + photon_results
    elif intent in ('LOCALITY', 'POI', 'UNKNOWN'):
        candidates = _retrieve_photon_freetext(q, limit=limit)

    # ── Deduplicate by (lat, lon) proximity ──
    # Two candidates are duplicates only when they come from the same source
    # and lie within 50 m of each other; the first one seen is kept.
    deduped = []
    for c in candidates:
        is_dup = any(
            c.get('source') == kept.get('source')
            and _haversine_m(c['lat'], c['lon'], kept['lat'], kept['lon']) < 50
            for kept in deduped
        )
        if not is_dup:
            deduped.append(c)
    candidates = deduped

    # ── Rerank ──
    results = _rerank(candidates, parsed, intent, q, limit)

    # ── Address book annotation ──
    _annotate_with_address_book(results)

    logger.info("geocode: %r → intent=%s, %d results", q, intent, len(results))
    return {'query': q, 'results': results, 'count': len(results)}

157
lib/geocode_test.py Normal file
View file

@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""Tests for RECON Photon-first geocode chain."""
import sys
import os
import json
import urllib.request
import urllib.parse
BASE = "http://localhost:8420"


def _top_result_is(exact_count=None, **expected):
    """Build a check for the response count and the top result's fields.

    With ``exact_count`` the response count must equal it; otherwise the
    count must be at least 1. Every keyword is a field of ``results[0]``
    that must equal the given value.
    """
    def check(r):
        if exact_count is not None:
            count_ok = r["count"] == exact_count
        else:
            count_ok = r["count"] >= 1
        if not count_ok:
            return False
        return all(
            r["results"][0][field] == value
            for field, value in expected.items()
        )
    return check


def _no_results(r):
    """Check that the response is an explicit empty result set."""
    return r["count"] == 0 and r["results"] == []


# Each case: display name, query sent to /api/geocode, and a predicate over
# the decoded JSON response.
TESTS = [
    {
        "name": "home → nickname short-circuit",
        "query": "home",
        "check": _top_result_is(
            exact_count=1,
            source="address_book", confidence="exact", type="nickname",
        ),
    },
    {
        "name": "214 north st filer → netsyms exact match (multi-word, not nickname)",
        "query": "214 north st filer",
        "check": _top_result_is(
            source="netsyms", confidence="exact", type="street_address",
        ),
    },
    {
        "name": "214 North St, Filer, ID → netsyms (case/punctuation)",
        "query": "214 North St, Filer, ID",
        "check": _top_result_is(source="netsyms"),
    },
    {
        "name": "214 NORTH ST FILER ID → netsyms (uppercase)",
        "query": "214 NORTH ST FILER ID",
        "check": _top_result_is(source="netsyms"),
    },
    {
        "name": "1600 Pennsylvania Ave Washington DC → White House",
        "query": "1600 Pennsylvania Ave Washington DC",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "1600 pennsylvania ave washington dc → lowercase",
        "query": "1600 pennsylvania ave washington dc",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "starbucks filer → POI result",
        "query": "starbucks filer",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "filer idaho → locality",
        "query": "filer idaho",
        "check": _top_result_is(source="photon", type="locality"),
    },
    {
        "name": "filer → partial query, at least 1 result",
        "query": "filer",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "42.5736, -114.6066 → coordinates (with space)",
        "query": "42.5736, -114.6066",
        "check": _top_result_is(
            exact_count=1,
            source="coordinates", confidence="exact", type="coordinates",
        ),
    },
    {
        "name": "42.5736,-114.6066 → coordinates (no space)",
        "query": "42.5736,-114.6066",
        "check": _top_result_is(
            exact_count=1, source="coordinates", confidence="exact",
        ),
    },
    {
        "name": "boise → at least 1 result",
        "query": "boise",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "toronto → CA canary",
        "query": "toronto",
        "check": _top_result_is(source="photon"),
    },
    {
        "name": "asdfghjklqwerty → empty results, 200 OK",
        "query": "asdfghjklqwerty",
        "check": _no_results,
    },
    {
        "name": "empty query → empty results",
        "query": "",
        "check": _no_results,
    },
]
def main():
    """Run every case in TESTS against the live /api/geocode endpoint.

    Prints one PASS/FAIL line per case followed by a summary.

    Returns:
        The number of failed cases.
    """
    # Imported explicitly; the module header only imports urllib.request and
    # urllib.parse, and HTTPError lives in urllib.error.
    import urllib.error

    passed = 0
    failed = 0
    for t in TESTS:
        # urlencode already yields "q=" for an empty query.
        q = urllib.parse.urlencode({"q": t["query"]})
        url = f"{BASE}/api/geocode?{q}"
        try:
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as resp:
                status = resp.status
                body = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            status = e.code
            try:
                body = json.loads(e.read())
            except Exception:
                body = {}
        except Exception as e:
            print(f" [FAIL] {t['name']}")
            print(f" EXCEPTION: {e}")
            failed += 1
            continue

        if not isinstance(body, dict):
            body = {}
        # A malformed body (missing keys, empty results) fails this one case
        # instead of aborting the whole run with a traceback.
        try:
            ok = status == 200 and bool(t["check"](body))
        except (KeyError, IndexError, TypeError, AttributeError):
            ok = False

        tag = "PASS" if ok else "FAIL"
        if ok:
            passed += 1
        else:
            failed += 1

        results = body.get("results")
        top = results[0] if results else {}
        if not isinstance(top, dict):
            top = {}
        name = str(top.get('name') or '')[:50]
        top_summary = (
            f"source={top.get('source','')} type={top.get('type','')} "
            f"conf={top.get('confidence','')} name={name}"
        )
        print(f" [{tag}] {t['name']}")
        if not ok:
            print(f" HTTP {status}, count={body.get('count','?')}, top: {top_summary}")
        else:
            labeled = f" labeled_as={top.get('labeled_as')}" if top.get('labeled_as') else ""
            print(f"{top_summary}{labeled}")

    print(f"\n{passed} passed, {failed} failed")
    return failed


# Guarded so that importing this module (e.g. pytest collecting *_test.py)
# does not fire network requests or call sys.exit.
if __name__ == "__main__":
    sys.exit(0 if main() == 0 else 1)

168
lib/nav_tools.py Normal file
View file

@ -0,0 +1,168 @@
"""Navigation tools: geocoding via Photon and routing via Valhalla."""
import math
import re
import requests
from .utils import setup_logging
logger = setup_logging('recon.nav_tools')
# Base URLs of the local services this module talks to: reverse_geocode()
# calls Photon's /reverse and route() calls Valhalla's /route.
PHOTON_URL = "http://localhost:2322"
VALHALLA_URL = "http://localhost:8002"
# Regional bias for Photon searches (Idaho-centric for Matt's use case).
# Adjustable — Photon uses these to rank nearby results higher.
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
# Distance threshold (meters) for annotating Photon results with address
# book labels. 75m covers GPS jitter + geocoder imprecision.
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# Coordinate regex — handles comma-separated and space-separated forms.
# Both numbers must contain a decimal point ("42.5, -114.6"); bare integers
# such as "42, -114" do not match. Group 1 is latitude, group 2 longitude.
_COORD_RE = re.compile(
r'^\s*(-?\d+\.\d+)\s*[,\s]\s*(-?\d+\.\d+)\s*$'
)
# Travel modes accepted by route(); any other value falls back to "auto".
VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"}
def _parse_coords(text: str):
    """Parse a "lat, lon" or "lat lon" string into a coordinate pair.

    Returns:
        A (lat, lon) tuple of floats when ``text`` matches _COORD_RE and both
        values are within latitude/longitude bounds; otherwise None.
    """
    match = _COORD_RE.match(text.strip())
    if match is None:
        return None
    latitude = float(match.group(1))
    longitude = float(match.group(2))
    in_bounds = -90 <= latitude <= 90 and -180 <= longitude <= 180
    return (latitude, longitude) if in_bounds else None
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters between two (lat, lon) points."""
R = 6_371_000 # Earth radius in meters
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def geocode(query: str, limit: int = 10):
    """Geocode ``query`` via the structured geocoder in lib/geocode.py.

    The geocode module is imported at call time rather than at module load.
    """
    from .geocode import geocode as _structured_geocode
    return _structured_geocode(query, limit=limit)
def _geocode(query: str):
    """Resolve ``query`` to its top geocoding match, for use by route().

    Returns:
        (lat, lon, display_name) taken from the first geocode() result.

    Raises:
        ValueError: if geocode() returns no results.
    """
    matches = geocode(query, limit=1).get('results', [])
    if matches:
        best = matches[0]
        return best['lat'], best['lon'], best['name']
    raise ValueError(f"Could not find location: {query}")
def reverse_geocode(lat: float, lon: float) -> str:
    """Reverse geocode coordinates via Photon.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        A comma-separated address built from the first feature's name,
        housenumber, street, city, state, country and postcode (whichever
        are present), or "<lat>, <lon>" when Photon has no usable match.

    Raises:
        RuntimeError: if Photon cannot be reached, answers with an HTTP
            error, or returns a body that is not valid JSON.
    """
    fallback = f"{lat}, {lon}"
    try:
        resp = requests.get(
            f"{PHOTON_URL}/reverse",
            params={"lat": lat, "lon": lon, "limit": 1},
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Keep the original failure attached for debugging.
        raise RuntimeError("Navigation service unavailable") from exc

    features = data.get("features") if isinstance(data, dict) else None
    if not features:
        return fallback

    props = features[0].get("properties") or {}
    fields = ("name", "housenumber", "street", "city", "state", "country", "postcode")
    # str(): tolerate non-string property values when joining.
    parts = [str(props[key]) for key in fields if props.get(key)]
    return ", ".join(parts) if parts else fallback
def route(origin: str, destination: str, mode: str = "auto") -> dict:
    """
    Get a route between two locations.

    Args:
        origin: Starting location: address, place name, or "lat,lon".
        destination: Destination: address, place name, or "lat,lon".
        mode: Travel mode: auto, pedestrian, bicycle, truck. Any other
            value falls back to "auto".

    Returns:
        dict with ``origin`` / ``destination`` ({name, lat, lon}),
        ``summary`` ({distance_miles, time_minutes, mode}), ``maneuvers``
        (list of per-step dicts) and the raw encoded ``shape`` of the
        first leg.

    Raises:
        ValueError: if either endpoint cannot be geocoded.
        RuntimeError: if Valhalla is unreachable, reports a routing error,
            or returns a response that cannot be interpreted.
    """
    if mode not in VALID_MODES:
        mode = "auto"

    # Geocode both endpoints
    orig_lat, orig_lon, orig_name = _geocode(origin)
    dest_lat, dest_lon, dest_name = _geocode(destination)

    # Query Valhalla
    valhalla_req = {
        "locations": [
            {"lat": orig_lat, "lon": orig_lon},
            {"lat": dest_lat, "lon": dest_lon},
        ],
        "costing": mode,
        "directions_options": {"units": "miles"},
    }
    try:
        resp = requests.post(
            f"{VALHALLA_URL}/route",
            json=valhalla_req,
            timeout=30,
        )
    except requests.RequestException as exc:
        raise RuntimeError("Navigation service unavailable") from exc

    if resp.status_code != 200:
        try:
            err = resp.json()
            msg = err.get("error", "Unknown routing error")
        except Exception:
            msg = f"Routing error (HTTP {resp.status_code})"
        raise RuntimeError(f"No route found between locations: {msg}")

    # A 200 with an unexpected body must surface as the RuntimeError callers
    # already handle, not as a stray KeyError/IndexError.
    try:
        data = resp.json()
        trip = data["trip"]
        summary = trip["summary"]
        leg = trip["legs"][0]

        # Build maneuver list
        maneuvers = []
        for m in leg["maneuvers"]:
            streets = m.get("street_names", [])
            maneuvers.append({
                "instruction": m["instruction"],
                "distance_miles": round(m.get("length", 0), 2),
                "street_name": streets[0] if streets else "",
                "type": m.get("type", 0),
                "verbal_succinct": m.get("verbal_succinct_transition_instruction", ""),
            })

        distance_miles = round(summary["length"], 1)
        time_minutes = round(summary["time"] / 60, 1)
        shape = leg.get("shape", "")
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as exc:
        raise RuntimeError(
            "Navigation service returned an unexpected response"
        ) from exc

    return {
        "origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon},
        "destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon},
        "summary": {
            "distance_miles": distance_miles,
            "time_minutes": time_minutes,
            "mode": mode,
        },
        "maneuvers": maneuvers,
        "shape": shape,
    }

77
lib/nav_tools_test.py Normal file
View file

@ -0,0 +1,77 @@
"""Tests for nav_tools — run against live Photon + Valhalla services."""
import sys
import os
import json

# nav_tools uses package-relative imports (``from .utils import ...``), so it
# cannot be imported as a top-level module. Put the project root on sys.path
# and import it through the ``lib`` package, as the sibling test files do.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib.nav_tools import route, reverse_geocode
def test_route_named():
    """Driving route between two named Idaho towns returns a full itinerary."""
    print("TEST 1: route('Buhl Idaho', 'Boise Idaho', 'auto')")
    trip = route("Buhl Idaho", "Boise Idaho", "auto")
    stats = trip["summary"]
    steps = trip["maneuvers"]

    assert stats["distance_miles"] > 50, f"Expected >50 mi, got {stats['distance_miles']}"
    assert stats["time_minutes"] > 60, f"Expected >60 min, got {stats['time_minutes']}"
    assert len(steps) > 5, f"Expected >5 maneuvers, got {len(steps)}"
    assert trip["shape"], "Missing polyline shape"

    print(f" OK — {stats['distance_miles']} mi, {stats['time_minutes']} min, {len(steps)} maneuvers")
    print(f" Origin: {trip['origin']['name']}")
    print(f" Destination: {trip['destination']['name']}")
    print(f" First maneuver: {steps[0]['instruction']}")
def test_route_coords():
    """route with raw lat,lon coordinates."""
    print("\nTEST 2: route('42.5991,-114.7636', '43.615,-116.2023', 'auto')")
    r = route("42.5991,-114.7636", "43.615,-116.2023", "auto")
    assert r["summary"]["distance_miles"] > 100, f"Expected >100 mi, got {r['summary']['distance_miles']}"
    # Report the observed count, like the other assertions in this file.
    assert len(r["maneuvers"]) > 3, f"Expected >3 maneuvers, got {len(r['maneuvers'])}"
    print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min")
def test_route_pedestrian():
    """Pedestrian routing echoes the mode and takes more minutes than miles."""
    print("\nTEST 3: route('Buhl Idaho', 'Boise Idaho', 'pedestrian')")
    stats = route("Buhl Idaho", "Boise Idaho", "pedestrian")["summary"]
    assert stats["mode"] == "pedestrian"
    minutes, miles = stats["time_minutes"], stats["distance_miles"]
    assert minutes > miles, "Walking should take more min than miles"
    print(f" OK — {miles} mi, {minutes} min (pedestrian)")
def test_reverse_geocode():
    """Reverse geocoding a point near Buhl, Idaho names the expected area."""
    print("\nTEST 4: reverse_geocode(42.5991, -114.7636)")
    result = reverse_geocode(42.5991, -114.7636)
    expected_places = ("Buhl", "Twin Falls", "Idaho")
    found = any(place in result for place in expected_places)
    assert found, f"Expected Buhl/Idaho, got: {result}"
    print(f" OK — {result}")
def test_route_bad_origin():
    """route with nonexistent place raises a clean error.

    Passes when route() raises ValueError or RuntimeError; fails with an
    AssertionError when it unexpectedly returns a route.
    """
    print("\nTEST 5: route('nonexistent place xyz123abc', 'Boise Idaho')")
    try:
        r = route("nonexistent place xyz123abc", "Boise Idaho")
    except ValueError as e:
        print(f" OK — clean error: {e}")
        return
    except RuntimeError as e:
        print(f" OK — runtime error: {e}")
        return
    # Raise instead of returning False: the runner only counts exceptions as
    # failures, so a plain return was tallied as a pass.
    raise AssertionError(f"expected error, got result: {r['summary']}")
if __name__ == "__main__":
    # Run each test in order; a test fails when it raises any exception.
    tests = [test_route_named, test_route_coords, test_route_pedestrian, test_reverse_geocode, test_route_bad_origin]
    outcomes = []
    for case in tests:
        try:
            case()
        except Exception as exc:
            print(f" FAIL — {exc}")
            outcomes.append(False)
        else:
            outcomes.append(True)
    passed = outcomes.count(True)
    failed = outcomes.count(False)
    print(f"\n{'='*40}")
    print(f"Results: {passed} passed, {failed} failed out of {len(tests)}")
    sys.exit(1 if failed else 0)

228
lib/netsyms.py Normal file
View file

@ -0,0 +1,228 @@
"""
RECON Netsyms AddressDatabase2025: SQLite-backed US+CA address lookup.
Provides 159.78M geocoded addresses as tier-2 between address book
(exact named locations) and Photon (full-text global geocoding).
Database: /mnt/nav/addresses/AddressDatabase2025.sqlite (read-only)
"""
import os
import re
import sqlite3
import threading
from .utils import setup_logging
logger = setup_logging('recon.netsyms')
# Location of the SQLite address database; _get_conn() opens it read-only.
_DB_PATH = '/mnt/nav/addresses/AddressDatabase2025.sqlite'
# Shared connection, created on first use by _get_conn().
_conn = None
# Held while the shared connection is created and around every query on it.
_lock = threading.Lock()
# Result of COUNT(*) on the addresses table, filled in by health().
_cached_row_count = None
# US states + DC + territories, CA provinces, for free-text parsing
_STATE_CODES = {
'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY',
'DC', 'PR', 'VI', 'GU', 'AS', 'MP',
# Canadian provinces
'AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE',
'QC', 'SK', 'YT',
}
# Splits a string that starts with digits into (leading number token, rest).
# NOTE(review): not referenced by the functions visible in this module —
# confirm whether it is still needed.
_NUMBER_RE = re.compile(r'^(\d+[\w-]*)(.*)$')
def _get_conn():
    """Return the shared read-only SQLite connection, opening it on first use.

    The connection is opened with ``mode=ro`` and
    ``check_same_thread=False`` so it can be shared across threads; callers
    hold ``_lock`` around each query. Rows come back as ``sqlite3.Row``.

    Raises:
        sqlite3.Error: if the database cannot be opened.
    """
    global _conn
    if _conn is None:
        with _lock:
            if _conn is None:
                uri = f'file:{_DB_PATH}?mode=ro'
                conn = sqlite3.connect(uri, uri=True, check_same_thread=False)
                conn.row_factory = sqlite3.Row
                # Publish only once fully configured: the unlocked check
                # above must never see a connection without row_factory.
                _conn = conn
                logger.info("Netsyms DB opened: %s", _DB_PATH)
    return _conn
def _row_to_dict(row):
"""Convert a sqlite3.Row to a plain dict with lat/lon keys."""
return {
'zipcode': row['zipcode'],
'number': row['number'],
'street': row['street'],
'street2': row['street2'],
'city': row['city'],
'state': row['state'],
'plus4': row['plus4'],
'country': row['country'],
'lat': float(row['latitude']),
'lon': float(row['longitude']),
'source': row['source'],
}
def lookup_by_street(number, street, city=None, state=None,
                     zipcode=None, country=None, limit=20):
    """Look up addresses by exact house number and street name.

    ``number`` and ``street`` are stripped and upper-cased before matching.
    ``city``, ``state``, ``zipcode`` and ``country`` narrow the match when
    given. Returns a list of address dicts (at most ``limit``), or an empty
    list if the query fails with a SQLite error.
    """
    conn = _get_conn()
    where = ['number = ?', 'street = ?']
    args = [str(number).strip().upper(), street.strip().upper()]

    # (column, value, upper-case it?) — zipcode is matched as typed.
    qualifiers = (
        ('city', city, True),
        ('state', state, True),
        ('zipcode', zipcode, False),
        ('country', country, True),
    )
    for column, value, uppercase in qualifiers:
        if not value:
            continue
        cleaned = value.strip()
        where.append(f'{column} = ?')
        args.append(cleaned.upper() if uppercase else cleaned)

    sql = f"SELECT * FROM addresses WHERE {' AND '.join(where)} LIMIT ?"
    args.append(limit)
    with _lock:
        try:
            rows = conn.execute(sql, args).fetchall()
        except sqlite3.Error as e:
            logger.warning("Netsyms lookup_by_street error: %s", e)
            return []
    results = [_row_to_dict(r) for r in rows]
    logger.debug("lookup_by_street(%s, %s, city=%s, state=%s) → %d results",
                 number, street, city, state, len(results))
    return results
def lookup_free_text(query, country_hint=None):
    """Parse a free-text address and look it up.

    Parsing is positional and deliberately simple:
      * a trailing 5-digit zipcode (optionally with a ZIP+4 suffix) is
        peeled off first;
      * a trailing state/province code is peeled off next;
      * a leading token that starts with a digit is the house number;
      * if a state was found and at least two tokens remain, the last one
        is taken as the city (so multi-word city names are not recognised);
      * whatever is left is the street.

    A street match is tried when a house number is present; otherwise, or
    when it finds nothing, the zipcode (if any) is looked up on its own.

    Args:
        query: Free-text address, e.g. "214 North St, Filer, ID 83328".
        country_hint: Optional country code passed to the street lookup.

    Returns:
        List of address dicts; empty when nothing could be parsed or found.
    """
    q = query.strip()
    if not q:
        return []

    # Strip trailing zipcode if present (tolerating a "-1234" ZIP+4 suffix).
    zipcode = None
    zip_match = re.search(r'\b(\d{5})(?:-\d{4})?\s*$', q)
    if zip_match:
        zipcode = zip_match.group(1)
        q = q[:zip_match.start()].strip().rstrip(',').strip()

    tokens = [t for t in re.split(r'[,\s]+', q) if t]
    if not tokens:
        # The whole query was a zipcode (e.g. "83328"): look it up rather
        # than returning nothing.
        return lookup_by_zipcode(zipcode, limit=20) if zipcode else []

    # Strip trailing state
    state = None
    if len(tokens) >= 2 and tokens[-1].upper() in _STATE_CODES:
        state = tokens[-1].upper()
        tokens = tokens[:-1]

    # Leading digits → number
    number = None
    if tokens and re.match(r'^\d', tokens[0]):
        number = tokens[0]
        tokens = tokens[1:]
    if not tokens:
        # Only a number was left — try zipcode if we have one
        if zipcode:
            return lookup_by_zipcode(zipcode, limit=20)
        return []

    # If state was found and we have 2+ tokens remaining, last token is city
    city = None
    if state and len(tokens) >= 2:
        city = tokens[-1]
        tokens = tokens[:-1]

    street = ' '.join(tokens)
    if number:
        results = lookup_by_street(number, street, city=city, state=state,
                                   zipcode=zipcode, country=country_hint)
        if results:
            logger.debug("lookup_free_text(%r) → %d results via street match",
                         query, len(results))
            return results

    # Fallback: try zipcode only if available
    if zipcode:
        return lookup_by_zipcode(zipcode, limit=20)
    logger.debug("lookup_free_text(%r) → 0 results", query)
    return []
def lookup_by_zipcode(zipcode, limit=100):
    """Return up to ``limit`` addresses whose zipcode equals ``zipcode``.

    The zipcode is stripped of surrounding whitespace before matching.
    Returns an empty list if the query fails with a SQLite error.
    """
    conn = _get_conn()
    statement = "SELECT * FROM addresses WHERE zipcode = ? LIMIT ?"
    bindings = [zipcode.strip(), limit]
    with _lock:
        try:
            cursor = conn.execute(statement, bindings)
            fetched = cursor.fetchall()
        except sqlite3.Error as e:
            logger.warning("Netsyms lookup_by_zipcode error: %s", e)
            return []
    results = []
    for record in fetched:
        results.append(_row_to_dict(record))
    logger.debug("lookup_by_zipcode(%s) → %d results", zipcode, len(results))
    return results
# Sorted list of distinct country codes, filled in by health() on first success.
_cached_countries = None


def health():
    """Report database health.

    Returns:
        dict with ``ok``, ``row_count``, ``file_size_bytes`` and
        ``indexed_countries``. ``ok`` is False when the database file is
        missing or cannot be opened.

    The row count and the country list both require a full scan of the
    addresses table, so each is computed once and cached. The database is
    opened read-only, so this process never invalidates them. A failed query
    is not cached and is retried on the next call.
    """
    global _cached_row_count, _cached_countries
    try:
        file_size = os.path.getsize(_DB_PATH)
    except OSError:
        return {'ok': False, 'row_count': 0, 'file_size_bytes': 0,
                'indexed_countries': []}
    try:
        conn = _get_conn()
    except Exception:
        return {'ok': False, 'row_count': 0, 'file_size_bytes': file_size,
                'indexed_countries': []}

    if _cached_row_count is None or _cached_countries is None:
        with _lock:
            if _cached_row_count is None:
                try:
                    row = conn.execute(
                        "SELECT COUNT(*) AS cnt FROM addresses"
                    ).fetchone()
                    _cached_row_count = row['cnt']
                except sqlite3.Error as e:
                    logger.warning("Netsyms health row count error: %s", e)
            if _cached_countries is None:
                try:
                    rows = conn.execute(
                        "SELECT DISTINCT country FROM addresses"
                    ).fetchall()
                    # Skip NULL countries: None cannot be sorted with str.
                    _cached_countries = sorted(
                        r['country'] for r in rows if r['country'] is not None
                    )
                except sqlite3.Error as e:
                    logger.warning("Netsyms health country list error: %s", e)

    return {
        'ok': True,
        'row_count': _cached_row_count or 0,
        'file_size_bytes': file_size,
        'indexed_countries': list(_cached_countries or []),
    }

108
lib/netsyms_api.py Normal file
View file

@ -0,0 +1,108 @@
"""
RECON Netsyms API + Geocode Flask Blueprints.
GET /api/netsyms/lookup?q=<free text>&country=<optional>
GET /api/netsyms/health
GET /api/geocode?q=<query>&limit=<N> (Photon-first search with ranked results)
GET /api/reverse?lat=<lat>&lon=<lon> (reverse geocoding via Photon)
"""
from flask import Blueprint, request, jsonify
from . import netsyms
from . import address_book
from . import nav_tools
from .utils import setup_logging
logger = setup_logging('recon.netsyms_api')
# Blueprint carrying the /api/netsyms/* routes defined below.
netsyms_bp = Blueprint('netsyms', __name__)
# Blueprint carrying /api/geocode and /api/reverse.
geocode_bp = Blueprint('geocode', __name__)
@netsyms_bp.route('/api/netsyms/lookup')
def api_netsyms_lookup():
    """Free-text Netsyms address lookup.

    Query parameters: ``q`` (required) and ``country`` (optional hint).
    Responds with ``{results, count, query}``, or 400 when ``q`` is missing
    or blank.
    """
    query_text = request.args.get('q', '').strip()
    if query_text == '':
        return jsonify({'error': 'Missing q parameter'}), 400
    country_arg = request.args.get('country', '').strip()
    matches = netsyms.lookup_free_text(
        query_text, country_hint=country_arg or None
    )
    payload = {'results': matches, 'count': len(matches), 'query': query_text}
    return jsonify(payload)
@netsyms_bp.route('/api/netsyms/health')
def api_netsyms_health():
    """Return the Netsyms database health report as JSON."""
    report = netsyms.health()
    return jsonify(report)
@geocode_bp.route('/api/geocode')
def api_geocode():
    """
    Geocode a free-text query and return ranked candidates.

    GET /api/geocode?q=<query>&limit=<N>

    Always returns 200 OK with:
    {query, results: [{name, lat, lon, source, confidence, type, raw, ...}], count}
    - source: "address_book" | "coordinates" | "netsyms" | "photon"
    - confidence: "exact" | "high" | "medium" | "low"
    - type: "nickname" | "coordinates" | "street_address" | "poi" | "locality"
    - labeled_as: present when result is within 75m of an address book entry
    - Empty results array is valid (no match). No 404s.

    ``limit`` defaults to 10 when missing or not an integer, and is clamped
    to 1..20.
    """
    q = request.args.get('q', '').strip()
    # With type=int, Werkzeug returns the default when conversion fails.
    limit = request.args.get('limit', default=10, type=int)
    limit = max(1, min(limit, 20))
    result = nav_tools.geocode(q, limit=limit)
    return jsonify(result)
@geocode_bp.route('/api/reverse')
def api_reverse():
    """
    Reverse geocode coordinates via Photon.

    GET /api/reverse?lat=X&lon=Y

    Returns same shape as /api/geocode:
    {query: "lat,lon", results: [{name, lat, lon, source, type, raw, ...}], count}
    Returns 200 OK with empty results on no match or when Photon cannot be
    queried. 400 on missing, non-numeric or out-of-range coordinates.
    """
    try:
        lat = float(request.args.get('lat', ''))
        lon = float(request.args.get('lon', ''))
    except (ValueError, TypeError):
        return jsonify({'error': 'Missing or invalid lat/lon parameters'}), 400
    if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
        return jsonify({'error': 'Coordinates out of range'}), 400

    query_str = f"{lat},{lon}"
    empty = {'query': query_str, 'results': [], 'count': 0}
    try:
        import requests as http_requests
        resp = http_requests.get(
            # Same Photon instance as nav_tools; avoid a second hard-coded URL.
            f"{nav_tools.PHOTON_URL}/reverse",
            params={"lat": lat, "lon": lon, "limit": 1},
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        features = data.get("features", [])
    except Exception as exc:
        logger.warning("Photon reverse geocode failed for %s: %s", query_str, exc)
        return jsonify(empty)
    if not features:
        return jsonify(empty)

    from .geocode import _parse_photon_features
    results = _parse_photon_features(features, source='photon_reverse')
    return jsonify({'query': query_str, 'results': results, 'count': len(results)})

80
lib/netsyms_test.py Normal file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""Tests for Netsyms address database module."""
import sys
import os
# Ensure the lib directory is importable
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib import netsyms
def test_lookup_by_street_lowercase():
    """Mixed-case street and city input finds 214 North St near the expected point."""
    matches = netsyms.lookup_by_street("214", "North St", city="Filer", state="ID")
    found = len(matches)
    assert found >= 1, f"Expected at least 1 result, got {found}"
    first = matches[0]
    assert abs(first['lat'] - 42.5736) < 0.01, f"Lat mismatch: {first['lat']}"
    assert abs(first['lon'] - (-114.6066)) < 0.01, f"Lon mismatch: {first['lon']}"
    print(" PASS: lookup_by_street (lowercase)")
def test_lookup_by_street_uppercase():
    """Upper-case street and city input finds 214 North St at the expected latitude."""
    matches = netsyms.lookup_by_street("214", "NORTH ST", city="FILER", state="ID")
    found = len(matches)
    assert found >= 1, f"Expected at least 1 result, got {found}"
    first = matches[0]
    assert abs(first['lat'] - 42.5736) < 0.01, f"Lat mismatch: {first['lat']}"
    print(" PASS: lookup_by_street (uppercase)")
def test_lookup_nonexistent():
    """A street address that does not exist yields an empty list."""
    matches = netsyms.lookup_by_street(
        "999999", "Nonexistent Rd", city="Filer", state="ID"
    )
    assert matches == [], f"Expected empty list, got {len(matches)} results"
    print(" PASS: lookup_by_street (nonexistent)")
def test_free_text_with_commas():
    """Comma-separated free text resolves to an address in FILER, ID."""
    matches = netsyms.lookup_free_text("214 North St, Filer, ID")
    found = len(matches)
    assert found >= 1, f"Expected at least 1 result, got {found}"
    first = matches[0]
    assert first['city'] == 'FILER', f"City mismatch: {first['city']}"
    assert first['state'] == 'ID', f"State mismatch: {first['state']}"
    print(" PASS: lookup_free_text (commas)")
def test_free_text_no_commas():
    """Space-separated free text still resolves to an address in ID."""
    matches = netsyms.lookup_free_text("214 North St Filer ID")
    found = len(matches)
    assert found >= 1, f"Expected at least 1 result, got {found}"
    first = matches[0]
    assert first['state'] == 'ID', f"State mismatch: {first['state']}"
    print(" PASS: lookup_free_text (no commas)")
def test_lookup_by_zipcode():
    """A zipcode lookup honours the limit and only returns that zipcode."""
    matches = netsyms.lookup_by_zipcode("83328", limit=5)
    assert len(matches) == 5, f"Expected 5 results, got {len(matches)}"
    # First row (if any) whose zipcode is not the one requested.
    stray = next((m for m in matches if m['zipcode'] != '83328'), None)
    assert stray is None, f"Zipcode mismatch: {stray['zipcode']}"
    print(" PASS: lookup_by_zipcode")
def test_health():
    """health() reports ok, a plausible row count, and both US and CA."""
    report = netsyms.health()
    assert report['ok'] is True, f"Health not OK: {report}"
    assert report['row_count'] >= 159_000_000, f"Row count too low: {report['row_count']}"
    countries = report['indexed_countries']
    assert 'US' in countries, f"US not in countries: {countries}"
    assert 'CA' in countries, f"CA not in countries: {countries}"
    print(" PASS: health")
if __name__ == '__main__':
    # Run the cases in a fixed order; the first failing assertion aborts.
    print("Running Netsyms tests...")
    cases = (
        test_lookup_by_street_lowercase,
        test_lookup_by_street_uppercase,
        test_lookup_nonexistent,
        test_free_text_with_commas,
        test_free_text_no_commas,
        test_lookup_by_zipcode,
        test_health,
    )
    for case in cases:
        case()
    print("All tests passed.")

161
lib/query_router.py Normal file
View file

@ -0,0 +1,161 @@
"""Semantic query router for Aurora.
Classifies user queries into routes (nav_route, nav_reverse_geocode,
direct_answer, rag_search) by comparing query embeddings against
pre-computed route centroids from example queries.
TEI endpoint: http://100.64.0.14:8090/embed (cortex via Tailscale)
"""
import math
import threading
import requests
# ── Route examples ────────────────────────────────────────────────────────────
# Example queries per route. _ensure_centroids() embeds all of them in one
# batch and averages each route's vectors into that route's centroid, so
# adding or editing examples changes how classify() scores queries.
ROUTE_EXAMPLES = {
"nav_route": [
"how do I get to Boise",
"directions to Twin Falls",
"how do I get from Buhl to Boise",
"drive from Jerome to Sun Valley",
"route from Boise to McCall",
"what's the fastest way to Sun Valley",
"how far is it to Twin Falls",
"take me to Shoshone",
"navigate to the airport",
"how do I drive to Salt Lake City",
"walking directions to the park",
"bike route to downtown",
],
"nav_reverse_geocode": [
"what town is at 42.5, -114.7",
"where am I right now",
"what is at coordinates 43.6, -116.2",
"what location is 42.574, -114.607",
"where is this place 44.0, -114.3",
"what city is near 42.7, -114.5",
"reverse geocode 43.0, -115.0",
"what's at this location 42.9, -114.8",
],
"direct_answer": [
"hello",
"hey aurora",
"good morning",
"thanks",
"thank you",
"what's your name",
"who are you",
"tell me a joke",
"how are you",
"hi there",
],
"rag_search": [
"what does the survival manual say about water",
"how to purify water in the field",
"how to treat a gunshot wound",
"what is the ranger handbook chapter on patrolling",
"field manual water purification",
"how to build a shelter in the wilderness",
"tactical combat casualty care procedures",
"what does FM 21-76 say about fire starting",
],
}
# ── Module-level cache ────────────────────────────────────────────────────────
# Route name → centroid vector; None until _ensure_centroids() first succeeds.
_ROUTE_CENTROIDS: dict | None = None
# Held by _ensure_centroids() while the centroids are being computed.
_LOCK = threading.Lock()
def _embed_batch(texts: list[str], tei_url: str) -> list[list[float]]:
    """POST ``texts`` to the TEI endpoint and return the decoded JSON reply.

    Raises ``requests.HTTPError`` when the endpoint answers with an error
    status.
    """
    payload = {"inputs": texts}
    response = requests.post(tei_url, json=payload, timeout=30)
    response.raise_for_status()
    embeddings = response.json()
    return embeddings
def _compute_centroid(vectors: list[list[float]]) -> list[float]:
"""Element-wise mean of vectors."""
n = len(vectors)
dim = len(vectors[0])
centroid = [0.0] * dim
for vec in vectors:
for i in range(dim):
centroid[i] += vec[i]
for i in range(dim):
centroid[i] /= n
return centroid
def _cosine_similarity(a: list[float], b: list[float]) -> float:
"""Cosine similarity between two vectors (pure Python)."""
dot = 0.0
norm_a = 0.0
norm_b = 0.0
for i in range(len(a)):
dot += a[i] * b[i]
norm_a += a[i] * a[i]
norm_b += b[i] * b[i]
denom = math.sqrt(norm_a) * math.sqrt(norm_b)
if denom == 0:
return 0.0
return dot / denom
def _ensure_centroids(tei_url: str) -> dict[str, list[float]]:
    """Return the route centroids, computing and caching them on first use.

    All example queries are embedded in a single batch; each route's
    centroid is the mean of the vectors for that route's examples.
    """
    global _ROUTE_CENTROIDS
    if _ROUTE_CENTROIDS is None:
        with _LOCK:
            # Re-check now that the lock is held.
            if _ROUTE_CENTROIDS is None:
                batch = []
                spans: dict[str, tuple[int, int]] = {}
                for name, samples in ROUTE_EXAMPLES.items():
                    first = len(batch)
                    batch.extend(samples)
                    spans[name] = (first, len(batch))
                embedded = _embed_batch(batch, tei_url)
                _ROUTE_CENTROIDS = {
                    name: _compute_centroid(embedded[lo:hi])
                    for name, (lo, hi) in spans.items()
                }
    return _ROUTE_CENTROIDS
def classify(
    query: str,
    tei_url: str = "http://100.64.0.14:8090/embed",
    threshold: float = 0.45,
) -> tuple[str, float]:
    """Pick the route whose centroid is most similar to ``query``.

    Returns (route_name, confidence). When the best similarity is below
    ``threshold`` the result is ("rag_search", best_score), the safe
    default.
    """
    lookup = _ensure_centroids(tei_url)
    query_vec = _embed_batch([query], tei_url)[0]

    # Start from the default so that non-positive similarities never win.
    winner, winner_score = "rag_search", 0.0
    scored = (
        (name, _cosine_similarity(query_vec, centroid))
        for name, centroid in lookup.items()
    )
    for name, score in scored:
        if score > winner_score:
            winner, winner_score = name, score

    if winner_score < threshold:
        return ("rag_search", winner_score)
    return (winner, winner_score)

49
lib/query_router_test.py Normal file
View file

@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""Test suite for the semantic query router."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib.query_router import classify
# (query, expected route) pairs; main() runs each query through classify()
# and compares the returned route with the expected one.
TEST_QUERIES = [
("how do I get from Buhl to Boise", "nav_route"),
("what does the survival manual say about water", "rag_search"),
("what town is at 42.5, -114.7", "nav_reverse_geocode"),
("hey aurora", "direct_answer"),
("what's the fastest way to Sun Valley", "nav_route"),
("how to purify water in the field", "rag_search"),
("good morning", "direct_answer"),
]
def main():
    """Classify every TEST_QUERIES entry and report PASS/FAIL per query.

    Exits with status 1 when any query is routed to the wrong place.
    """
    rule = "=" * 70
    print("Query Router Test Suite")
    print(rule)
    passed = failed = 0
    for query, expected in TEST_QUERIES:
        route, confidence = classify(query)
        if route == expected:
            status = "PASS"
            passed += 1
        else:
            status = "FAIL"
            failed += 1
        print(f" [{status}] {query!r}")
        print(f"{route} ({confidence:.3f}) expected={expected}")
    print(rule)
    print(f"Results: {passed}/{passed + failed} passed")
    if not failed:
        print(" All tests passed!")
        return
    print(f" {failed} FAILED")
    sys.exit(1)


if __name__ == "__main__":
    main()