cleanup: remove /api/geocode + /api/reverse handlers (extraction #6 shadow)

All three routes (/api/geocode, /api/reverse, /api/reverse/<lat>/<lon>) are
edge-shadowed since extraction #6 — navi-geo :8426 serves them via nginx.

- netsyms_api.py: drop geocode_bp + its three handlers, the bundle-private
  helpers, and module state (TTLCache/lock/_TZ_DB_PATH/_DEM). netsyms_bp
  (/api/netsyms/lookup + /health) survives.
- api.py: drop the geocode_bp import + register_blueprint line.
- DELETE lib/geocode.py, lib/nav_tools.py (both orphaned once the handlers go).
- DELETE reverse_bundle_test.py, geocode_test.py, nav_tools_test.py.

Decouples netsyms_api.py from landclass.py and offroute/dem.py — prerequisite
for cleanups #5 and #6.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
malice 2026-05-23 04:04:45 -06:00 committed by GitHub
commit d7292c4cc7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 3 additions and 1610 deletions

View file

@ -59,10 +59,9 @@ class _LargeZimRequest(_FlaskRequest):
return super()._get_file_stream(total_content_length, content_type, filename, content_length)
app.request_class = _LargeZimRequest
# ── Netsyms + Geocode Blueprints ──
from .netsyms_api import netsyms_bp, geocode_bp
# ── Netsyms Blueprint ──
from .netsyms_api import netsyms_bp
app.register_blueprint(netsyms_bp)
app.register_blueprint(geocode_bp)
# ── Wiki-enrich Blueprint (extraction #5 prep — HTTP wrapper over wiki_index) ──
from .wiki_enrich_api import wiki_enrich_bp

View file

@ -1,774 +0,0 @@
"""
RECON geocode structured preprocessing, multi-source retrieval, reranking.
Replaces the naive Photon-only search with:
1. usaddress parsing + intent classification (ADDRESS / POI / LOCALITY / COORD / POSTCODE)
2. Multi-source retrieval: ADDRESS Netsyms + Photon; POI/LOCALITY Photon /api
3. Python reranker with weighted signals
Public entry point: geocode(query, limit) {query, results, count}
"""
import math
import re
import logging
import requests
import usaddress
from rapidfuzz import fuzz
from .utils import setup_logging
logger = setup_logging('recon.geocode')
# ── Trace logger for reranking audit ──
_trace_logger = logging.getLogger('recon.geocode.trace')
_trace_handler = logging.FileHandler('/tmp/geocode_rerank_trace.log')
_trace_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
_trace_logger.addHandler(_trace_handler)
_trace_logger.setLevel(logging.DEBUG)
# ── Config constants ──
PHOTON_URL = "http://localhost:2322"
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# ── Reranker weights ──
# Derived from research analysis of failure modes:
# housenumber_exact is the strongest signal because Photon's soft-boost
# lets wrong-number results bubble up. street_name_fuzz and locality_fuzz
# handle abbreviation/case variation. source_authority gives Netsyms a
# boost for US addresses since it has USPS-verified data.
W_HOUSENUMBER_EXACT = 6.0 # exact housenumber match
W_HOUSENUMBER_MISMATCH = -5.0 # housenumber present but wrong
W_STREET_NAME_FUZZ = 3.0 # fuzzy street name similarity [0..1] * weight
W_TOKEN_COVERAGE = 2.0 # fraction of query tokens found in result
W_STREET_TYPE_MATCH = 1.5 # "st" matches "street", etc.
W_LOCALITY_FUZZ = 2.0 # city/state fuzzy match
W_SOURCE_AUTHORITY = 2.0 # Netsyms for US addresses
W_LAYER_RANK = 1.0 # type-appropriate results ranked higher
W_PHOTON_POSITION_NORM = 1.0 # Photon's native ranking (normalized by position)
W_STATE_EXACT = 1.0 # exact state code match
W_POI_CLASS_BOOST = 3.0 # amenity/shop/etc boost for business-name queries
W_HIGHWAY_CLASS_PENALTY = -4.0 # highway/route penalty for business-name queries
# ── US abbreviation expansions ──
# Applied ONLY to parsed StreetName/StreetNamePostType tokens, NOT to ordinals.
_STREET_TYPE_ABBREVS = {
'st': 'street', 'ave': 'avenue', 'blvd': 'boulevard', 'dr': 'drive',
'rd': 'road', 'ln': 'lane', 'ct': 'court', 'cir': 'circle',
'pl': 'place', 'way': 'way', 'pkwy': 'parkway', 'hwy': 'highway',
'trl': 'trail', 'ter': 'terrace', 'sq': 'square',
}
_DIRECTIONAL_ABBREVS = {
'n': 'north', 's': 'south', 'e': 'east', 'w': 'west',
'ne': 'northeast', 'nw': 'northwest', 'se': 'southeast', 'sw': 'southwest',
}
_ORDINAL_RE = re.compile(r'^\d+(st|nd|rd|th)$', re.IGNORECASE)
# ── Road keywords (for detecting when query is about a road vs a business) ──
_ROAD_KEYWORDS = (
set(_STREET_TYPE_ABBREVS.keys())
| set(_STREET_TYPE_ABBREVS.values())
| {'route', 'rte', 'pass'}
)
# ── US state codes ──
_STATE_CODES = {
'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY', 'DC',
}
# ── Full state name → code (for intent classifier) ──
_STATE_NAME_TO_CODE = {
'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',
'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',
'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI', 'idaho': 'ID',
'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA', 'kansas': 'KS',
'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME', 'maryland': 'MD',
'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN',
'mississippi': 'MS', 'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE',
'nevada': 'NV', 'new hampshire': 'NH', 'new jersey': 'NJ',
'new mexico': 'NM', 'new york': 'NY', 'north carolina': 'NC',
'north dakota': 'ND', 'ohio': 'OH', 'oklahoma': 'OK', 'oregon': 'OR',
'pennsylvania': 'PA', 'rhode island': 'RI', 'south carolina': 'SC',
'south dakota': 'SD', 'tennessee': 'TN', 'texas': 'TX', 'utah': 'UT',
'vermont': 'VT', 'virginia': 'VA', 'washington': 'WA',
'west virginia': 'WV', 'wisconsin': 'WI', 'wyoming': 'WY',
}
# Coordinate regex
_COORD_RE = re.compile(r'^\s*(-?\d+\.?\d*)\s*[,\s]\s*(-?\d+\.?\d*)\s*$')
# ═══════════════════════════════════════════════════════════════════
# STEP 1: PREPROCESSING
# ═══════════════════════════════════════════════════════════════════
def _parse_coords(text):
"""Return (lat, lon) if text looks like coordinates with valid bounds, else None."""
m = _COORD_RE.match(text.strip())
if not m:
return None
lat, lon = float(m.group(1)), float(m.group(2))
if -90 <= lat <= 90 and -180 <= lon <= 180:
return lat, lon
return None
def _classify_and_parse(query):
"""
Parse query with usaddress, classify intent, expand abbreviations.
Returns (intent, parsed_dict) where:
intent: 'ADDRESS' | 'POI' | 'LOCALITY' | 'POSTCODE' | 'COORD' | 'UNKNOWN'
parsed_dict: {number, street, city, state, zipcode, raw_query, expanded_query}
"""
q = query.strip()
parsed = {
'number': None, 'street': None, 'street_raw': None,
'city': None, 'state': None,
'zipcode': None, 'raw_query': q, 'expanded_query': q,
}
# Coordinate check first
if _parse_coords(q):
return 'COORD', parsed
# Try usaddress
try:
tagged, addr_type = usaddress.tag(q)
except usaddress.RepeatedLabelError:
# Ambiguous input — fall back to free-text Photon
return 'UNKNOWN', parsed
# Extract components
number = tagged.get('AddressNumber', '').strip()
street_name = tagged.get('StreetName', '').strip()
street_pre_dir = tagged.get('StreetNamePreDirectional', '').strip()
street_post_type = tagged.get('StreetNamePostType', '').strip()
place = tagged.get('PlaceName', '').strip()
state = tagged.get('StateName', '').strip()
zipcode = tagged.get('ZipCode', '').strip()
# ── Fix usaddress edge case: "214 N St Filer" ──
# usaddress reads single-letter directional + "St" as PreDirectional + empty,
# mashing "St Filer" into StreetName. Detect: PreDirectional is single letter,
# StreetName has 2+ tokens where the first is a street type.
if (street_pre_dir and len(street_pre_dir) <= 2
and not street_name.strip().startswith(street_pre_dir)
and ' ' in street_name):
name_tokens = street_name.split()
first_lower = name_tokens[0].lower()
if first_lower in _STREET_TYPE_ABBREVS or first_lower in _STREET_TYPE_ABBREVS.values():
# "N" is actually the street name, "St" is the post-type
street_name = street_pre_dir
street_post_type = name_tokens[0]
if len(name_tokens) > 1:
place = ' '.join(name_tokens[1:])
street_pre_dir = ''
# ── Expand abbreviations (guard ordinals) ──
expanded_parts = []
if number:
parsed['number'] = number
expanded_parts.append(number)
if street_pre_dir:
exp = _DIRECTIONAL_ABBREVS.get(street_pre_dir.lower(), street_pre_dir)
expanded_parts.append(exp)
if street_name:
# Don't expand ordinals: "21st" stays "21st"
if _ORDINAL_RE.match(street_name):
expanded_parts.append(street_name)
else:
# Expand directional abbreviation if it IS the street name
exp = _DIRECTIONAL_ABBREVS.get(street_name.lower(), street_name)
expanded_parts.append(exp)
parsed['street'] = street_name
if street_post_type:
if _ORDINAL_RE.match(street_post_type):
expanded_parts.append(street_post_type)
else:
exp = _STREET_TYPE_ABBREVS.get(street_post_type.lower(), street_post_type)
expanded_parts.append(exp)
# Build raw street (original abbreviations, for Netsyms) and expanded (for Photon)
raw_street_parts = []
if street_pre_dir:
raw_street_parts.append(street_pre_dir)
if street_name:
raw_street_parts.append(street_name)
if street_post_type:
raw_street_parts.append(street_post_type)
parsed['street_raw'] = ' '.join(raw_street_parts)
# Build the full expanded street
if expanded_parts:
# The street is everything after the number
street_full = ' '.join(expanded_parts[1:] if number else expanded_parts)
parsed['street'] = street_full
if place:
parsed['city'] = place
expanded_parts.append(place)
if state:
parsed['state'] = state.upper()
expanded_parts.append(state)
if zipcode:
parsed['zipcode'] = zipcode
expanded_parts.append(zipcode)
parsed['expanded_query'] = ' '.join(expanded_parts)
# ── Intent classification ──
if addr_type == 'Street Address' and number:
return 'ADDRESS', parsed
elif zipcode and not number and not street_name:
return 'POSTCODE', parsed
elif addr_type == 'Ambiguous':
# Check if it looks like a locality: last token(s) are a state code or name
tokens = q.replace(',', ' ').split()
if len(tokens) >= 2:
last_upper = tokens[-1].upper()
if last_upper in _STATE_CODES:
parsed['city'] = ' '.join(tokens[:-1])
parsed['state'] = last_upper
return 'LOCALITY', parsed
# Check full state names (single-word like "idaho" or two-word like "new york")
last_lower = tokens[-1].lower()
if last_lower in _STATE_NAME_TO_CODE:
parsed['city'] = ' '.join(tokens[:-1])
parsed['state'] = _STATE_NAME_TO_CODE[last_lower]
return 'LOCALITY', parsed
if len(tokens) >= 3:
two_word = f"{tokens[-2].lower()} {last_lower}"
if two_word in _STATE_NAME_TO_CODE:
parsed['city'] = ' '.join(tokens[:-2])
parsed['state'] = _STATE_NAME_TO_CODE[two_word]
return 'LOCALITY', parsed
return 'UNKNOWN', parsed
else:
return 'UNKNOWN', parsed
# ═══════════════════════════════════════════════════════════════════
# STEP 2: RETRIEVAL
# ═══════════════════════════════════════════════════════════════════
def _retrieve_netsyms(parsed, limit=10, lat=None, lon=None):
"""Query Netsyms for structured address lookup. Returns list of candidate dicts."""
try:
from . import netsyms
except Exception:
return []
results = []
number = parsed.get('number', '')
street = parsed.get('street_raw') or parsed.get('street', '')
city = parsed.get('city', '')
state = parsed.get('state', '')
zipcode = parsed.get('zipcode', '')
# When viewport provided, fetch more results to sort from
fetch_limit = 200 if (lat is not None and lon is not None) else limit
if number and street:
rows = netsyms.lookup_by_street(
number, street, city=city, state=state, zipcode=zipcode, limit=fetch_limit
)
elif zipcode:
rows = netsyms.lookup_by_zipcode(zipcode, limit=fetch_limit)
else:
return []
for row in rows:
addr_parts = [row['number'], row['street']]
if row.get('street2'):
addr_parts.append(row['street2'])
addr_parts.extend([row['city'], row['state'], row['zipcode']])
display = ' '.join(p for p in addr_parts if p)
results.append({
'name': display,
'lat': row['lat'],
'lon': row['lon'],
'source': 'netsyms',
'type': 'street_address',
'raw': row,
'_number': row.get('number', ''),
'_street': row.get('street', ''),
'_city': row.get('city', ''),
'_state': row.get('state', ''),
})
# Sort by viewport distance if lat/lon provided, then limit
if lat is not None and lon is not None and results:
results.sort(key=lambda r: (r["lat"] - lat)**2 + (r["lon"] - lon)**2)
results = results[:limit]
return results
def _retrieve_photon_structured(parsed, limit=10):
"""Query Photon /structured endpoint for address lookup."""
params = {'limit': limit, 'countrycode': 'US'}
if parsed.get('street'):
params['street'] = parsed['street']
if parsed.get('number'):
params['housenumber'] = parsed['number']
if parsed.get('city'):
params['city'] = parsed['city']
if parsed.get('state'):
params['state'] = parsed['state']
if 'street' not in params:
return []
try:
resp = requests.get(f"{PHOTON_URL}/structured", params=params, timeout=5)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.debug("Photon /structured failed: %s", e)
return []
return _parse_photon_features(data.get('features', []), 'photon')
def _retrieve_photon_freetext(query, limit=10, lat=None, lon=None, zoom=None):
"""Query Photon /api for free-text search with location bias."""
try:
params = {
'q': query,
'limit': limit,
'lat': lat if lat is not None else GEOCODE_BIAS_LAT,
'lon': lon if lon is not None else GEOCODE_BIAS_LON,
'zoom': int(zoom) if zoom is not None else GEOCODE_BIAS_ZOOM,
}
resp = requests.get(f"{PHOTON_URL}/api", params=params, timeout=5)
resp.raise_for_status()
data = resp.json()
except Exception as e:
return []
return _parse_photon_features(data.get('features', []), 'photon')
def _parse_photon_features(features, source):
"""Convert Photon GeoJSON features to candidate dicts."""
results = []
for i, feature in enumerate(features):
props = feature.get('properties', {})
coords = feature.get('geometry', {}).get('coordinates', [0, 0])
osm_key = props.get('osm_key', '')
osm_value = props.get('osm_value', '')
feat_type = props.get('type', '')
has_hn = bool(props.get('housenumber'))
if osm_key in ('amenity', 'shop', 'tourism', 'leisure', 'office'):
rtype = 'poi'
elif has_hn or osm_value in ('house', 'residential'):
rtype = 'street_address'
elif feat_type in ('city', 'town', 'village', 'hamlet', 'county', 'state', 'country'):
rtype = 'locality'
else:
rtype = 'poi'
# Build display name
parts = []
hn = props.get('housenumber')
street = props.get('street')
name = props.get('name', '')
if hn and street:
parts.append(f"{hn} {street}")
if name and name != street:
parts.append(name)
elif name:
parts.append(name)
elif street:
parts.append(street)
for key in ('city', 'county', 'state', 'country'):
v = props.get(key)
if v and (not parts or v != parts[-1]):
parts.append(v)
display = ', '.join(p for p in parts if p) or 'Unknown'
results.append({
'name': display,
'lat': coords[1],
'lon': coords[0],
'source': source,
'type': rtype,
'raw': props,
'_photon_rank': i,
'_number': props.get('housenumber', ''),
'_street': props.get('street', ''),
# For locality results, the name IS the city (Photon omits 'city' on city-type features)
'_city': props.get('city', '') or (props.get('name', '') if rtype == 'locality' else ''),
'_state': props.get('state', ''),
})
return results
# ═══════════════════════════════════════════════════════════════════
# STEP 3: RERANKER
# ═══════════════════════════════════════════════════════════════════
def _expand_street_type(s):
"""Expand a street type abbreviation for comparison."""
return _STREET_TYPE_ABBREVS.get(s.lower(), s.lower())
def _score_candidate(candidate, parsed, intent):
"""
Score a candidate against the parsed query.
Returns (total_score, signal_breakdown_dict).
"""
signals = {}
total = 0.0
query_number = (parsed.get('number') or '').strip().upper()
query_street = (parsed.get('street') or '').strip().upper()
query_city = (parsed.get('city') or '').strip().upper()
query_state = (parsed.get('state') or '').strip().upper()
cand_number = (candidate.get('_number') or '').strip().upper()
cand_street = (candidate.get('_street') or '').strip().upper()
cand_city = (candidate.get('_city') or '').strip().upper()
cand_state = (candidate.get('_state') or '').strip().upper()
# ── Housenumber ──
if intent == 'ADDRESS' and query_number:
if cand_number == query_number:
signals['housenumber_exact'] = W_HOUSENUMBER_EXACT
total += W_HOUSENUMBER_EXACT
elif cand_number and cand_number != query_number:
signals['housenumber_mismatch'] = W_HOUSENUMBER_MISMATCH
total += W_HOUSENUMBER_MISMATCH
# ── Street name fuzz ──
if query_street and cand_street:
# Expand both for comparison
q_expanded = ' '.join(_expand_street_type(t) for t in query_street.split())
c_expanded = ' '.join(_expand_street_type(t) for t in cand_street.split())
ratio = fuzz.token_sort_ratio(q_expanded, c_expanded) / 100.0
score = ratio * W_STREET_NAME_FUZZ
signals['street_name_fuzz'] = round(score, 2)
total += score
# ── Street type match ──
if query_street and cand_street:
q_tokens = set(_expand_street_type(t) for t in query_street.split())
c_tokens = set(_expand_street_type(t) for t in cand_street.split())
# Check if the street type words overlap
street_types = set(_STREET_TYPE_ABBREVS.values())
q_types = q_tokens & street_types
c_types = c_tokens & street_types
if q_types and q_types & c_types:
signals['street_type_match'] = W_STREET_TYPE_MATCH
total += W_STREET_TYPE_MATCH
# ── Token coverage ──
raw_q = parsed.get('raw_query', '').upper()
q_tokens = set(raw_q.replace(',', ' ').split())
if q_tokens:
cand_text = candidate.get('name', '').upper()
matched = sum(1 for t in q_tokens if t in cand_text)
coverage = matched / len(q_tokens)
score = coverage * W_TOKEN_COVERAGE
signals['token_coverage'] = round(score, 2)
total += score
# ── Locality fuzz ──
if query_city and cand_city:
ratio = fuzz.ratio(query_city, cand_city) / 100.0
score = ratio * W_LOCALITY_FUZZ
signals['locality_fuzz'] = round(score, 2)
total += score
# ── State exact ──
if query_state and cand_state:
if cand_state == query_state:
signals['state_exact'] = W_STATE_EXACT
total += W_STATE_EXACT
# ── Source authority ──
if candidate.get('source') == 'netsyms' and intent == 'ADDRESS':
signals['source_authority'] = W_SOURCE_AUTHORITY
total += W_SOURCE_AUTHORITY
# ── Layer rank (type-appropriate bonus) ──
cand_type = candidate.get('type', '')
if intent == 'ADDRESS' and cand_type == 'street_address':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
elif intent == 'LOCALITY' and cand_type == 'locality':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
elif intent == 'POI' and cand_type == 'poi':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
# ── Photon position normalization ──
photon_rank = candidate.get('_photon_rank')
if photon_rank is not None:
# Top result gets full bonus, decays linearly
score = max(0, (1.0 - photon_rank / 10.0)) * W_PHOTON_POSITION_NORM
signals['photon_position'] = round(score, 2)
total += score
# ── Business intent POI boost ──
# When the query has no road keywords (likely a business/POI search),
# boost amenity/shop/etc results and penalize highway/route results.
# Skipped for LOCALITY, POSTCODE, COORD queries where class is irrelevant.
if intent not in ('LOCALITY', 'POSTCODE', 'COORD'):
q_tokens_lower = set(parsed.get('raw_query', '').lower().replace(',', ' ').split())
if not (q_tokens_lower & _ROAD_KEYWORDS):
osm_key = (candidate.get('raw') or {}).get('osm_key', '')
if osm_key in ('amenity', 'shop', 'tourism', 'leisure', 'office', 'craft'):
signals['poi_class_boost'] = W_POI_CLASS_BOOST
total += W_POI_CLASS_BOOST
elif osm_key in ('highway', 'route'):
signals['highway_class_penalty'] = W_HIGHWAY_CLASS_PENALTY
total += W_HIGHWAY_CLASS_PENALTY
return round(total, 2), signals
def _build_match_code(candidate, parsed, intent):
"""Build a match_code dict indicating match quality for each field."""
mc = {}
if intent == 'ADDRESS':
q_num = (parsed.get('number') or '').strip().upper()
c_num = (candidate.get('_number') or '').strip().upper()
if q_num and c_num == q_num:
mc['housenumber'] = 'matched'
elif q_num and c_num:
mc['housenumber'] = 'unmatched'
elif q_num and not c_num:
mc['housenumber'] = 'inferred'
q_street = (parsed.get('street') or '').strip().upper()
c_street = (candidate.get('_street') or '').strip().upper()
if q_street and c_street:
q_exp = ' '.join(_expand_street_type(t) for t in q_street.split())
c_exp = ' '.join(_expand_street_type(t) for t in c_street.split())
ratio = fuzz.token_sort_ratio(q_exp, c_exp) / 100.0
mc['street'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_street:
mc['street'] = 'inferred'
q_city = (parsed.get('city') or '').strip().upper()
c_city = (candidate.get('_city') or '').strip().upper()
if q_city and c_city:
ratio = fuzz.ratio(q_city, c_city) / 100.0
mc['city'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_city:
mc['city'] = 'inferred'
return mc
def _rerank(candidates, parsed, intent, query, limit):
"""Score, sort, and trim candidates. Trace-log top 3."""
scored = []
for c in candidates:
total, signals = _score_candidate(c, parsed, intent)
c['_score'] = total
c['_signals'] = signals
scored.append(c)
scored.sort(key=lambda c: c['_score'], reverse=True)
# Trace log for audit
_trace_logger.debug("─── Query: %r intent=%s ───", query, intent)
for i, c in enumerate(scored):
osm_key = (c.get('raw') or {}).get('osm_key', '')
osm_val = (c.get('raw') or {}).get('osm_value', '')
_trace_logger.debug(
" #%d score=%.2f src=%s key=%s/%s name=%s",
i, c['_score'], c.get('source', '?'), osm_key, osm_val,
c.get('name', '?')[:60]
)
_trace_logger.debug(" signals=%s", c.get('_signals', {}))
# Clean internal fields and add match_code
result = []
for c in scored[:limit]:
mc = _build_match_code(c, parsed, intent)
# Assign confidence from score
score = c.get('_score', 0)
if score >= 10:
confidence = 'exact'
elif score >= 5:
confidence = 'high'
elif score >= 2:
confidence = 'medium'
else:
confidence = 'low'
entry = {
'name': c['name'],
'lat': c['lat'],
'lon': c['lon'],
'source': c['source'],
'confidence': confidence,
'type': c.get('type', 'poi'),
'raw': c.get('raw'),
}
if mc:
entry['match_code'] = mc
result.append(entry)
return result
# ═══════════════════════════════════════════════════════════════════
# STEP 4: ANNOTATION
# ═══════════════════════════════════════════════════════════════════
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters."""
R = 6_371_000
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _annotate_with_address_book(results):
"""Add labeled_as to results within radius of an address book entry."""
try:
from . import address_book
entries = address_book.load()
except Exception:
return
for result in results:
rlat, rlon = result.get('lat'), result.get('lon')
if rlat is None or rlon is None:
continue
for entry in entries:
elat, elon = entry.get('lat'), entry.get('lon')
if elat is None or elon is None:
continue
if _haversine_m(rlat, rlon, elat, elon) <= ADDRESS_BOOK_ANNOTATION_RADIUS_M:
result['labeled_as'] = entry['name']
break
# ═══════════════════════════════════════════════════════════════════
# PUBLIC API
# ═══════════════════════════════════════════════════════════════════
def geocode(query, limit=10, lat=None, lon=None, zoom=None):
"""
Structured geocoding with multi-source retrieval and reranking.
Returns {query, results: [...], count} always 200-safe.
"""
limit = max(1, min(limit, 20))
q = (query or '').strip()
empty = {'query': q, 'results': [], 'count': 0}
if not q:
return empty
# ── Coordinate detection ──
coords = _parse_coords(q)
if coords:
return {
'query': q,
'results': [{
'name': q,
'lat': coords[0],
'lon': coords[1],
'source': 'coordinates',
'confidence': 'exact',
'type': 'coordinates',
'raw': None,
}],
'count': 1,
}
# ── Address book nickname short-circuit ──
normalized_q = ' '.join(q.lower().replace(',', ' ').split())
is_single_word = ' ' not in normalized_q
try:
from . import address_book
ab_match = address_book.lookup(q)
if (ab_match
and ab_match['confidence'] == 'exact'
and ab_match.get('lat') and ab_match.get('lon')
and is_single_word):
logger.info("geocode: nickname short-circuit %r%s", q, ab_match['name'])
return {
'query': q,
'results': [{
'name': ab_match.get('address') or ab_match['name'],
'lat': ab_match['lat'],
'lon': ab_match['lon'],
'source': 'address_book',
'confidence': 'exact',
'type': 'nickname',
'raw': ab_match,
}],
'count': 1,
}
except Exception as e:
logger.debug("geocode: address_book lookup failed: %s", e)
# ── Classify intent + parse ──
intent, parsed = _classify_and_parse(q)
logger.debug("geocode: intent=%s parsed=%s", intent, parsed)
# ── Retrieve candidates ──
candidates = []
if intent == 'ADDRESS':
# Parallel: Netsyms (structured) + Photon (freetext with expanded query)
netsyms_results = _retrieve_netsyms(parsed, limit=limit, lat=lat, lon=lon)
photon_results = _retrieve_photon_freetext(
parsed.get('expanded_query', q), limit=limit, lat=lat, lon=lon, zoom=zoom
)
# Also try Photon /structured for addresses
photon_struct = _retrieve_photon_structured(parsed, limit=5)
candidates = netsyms_results + photon_results + photon_struct
elif intent == 'POSTCODE':
netsyms_results = _retrieve_netsyms(parsed, limit=limit, lat=lat, lon=lon)
photon_results = _retrieve_photon_freetext(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
candidates = netsyms_results + photon_results
elif intent in ('LOCALITY', 'POI', 'UNKNOWN'):
candidates = _retrieve_photon_freetext(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
# ── Deduplicate by (lat, lon) proximity ──
deduped = []
for c in candidates:
is_dup = False
for existing in deduped:
if (_haversine_m(c['lat'], c['lon'], existing['lat'], existing['lon']) < 50
and c.get('source') == existing.get('source')):
is_dup = True
break
if not is_dup:
deduped.append(c)
candidates = deduped
# ── Rerank ──
results = _rerank(candidates, parsed, intent, q, limit)
# ── Address book annotation ──
_annotate_with_address_book(results)
logger.info("geocode: %r → intent=%s, %d results", q, intent, len(results))
return {'query': q, 'results': results, 'count': len(results)}

View file

@ -1,157 +0,0 @@
#!/usr/bin/env python3
"""Tests for RECON Photon-first geocode chain."""
import sys
import os
import json
import urllib.request
import urllib.parse
BASE = "http://localhost:8420"
TESTS = [
{
"name": "home → nickname short-circuit",
"query": "home",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "address_book"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "nickname"
),
},
{
"name": "214 north st filer → netsyms exact match (multi-word, not nickname)",
"query": "214 north st filer",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "netsyms"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "street_address"
),
},
{
"name": "214 North St, Filer, ID → netsyms (case/punctuation)",
"query": "214 North St, Filer, ID",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "netsyms",
},
{
"name": "214 NORTH ST FILER ID → netsyms (uppercase)",
"query": "214 NORTH ST FILER ID",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "netsyms",
},
{
"name": "1600 Pennsylvania Ave Washington DC → White House",
"query": "1600 Pennsylvania Ave Washington DC",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "photon"
),
},
{
"name": "1600 pennsylvania ave washington dc → lowercase",
"query": "1600 pennsylvania ave washington dc",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "starbucks filer → POI result",
"query": "starbucks filer",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "filer idaho → locality",
"query": "filer idaho",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "photon"
and r["results"][0]["type"] == "locality"
),
},
{
"name": "filer → partial query, at least 1 result",
"query": "filer",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "42.5736, -114.6066 → coordinates (with space)",
"query": "42.5736, -114.6066",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "coordinates"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "coordinates"
),
},
{
"name": "42.5736,-114.6066 → coordinates (no space)",
"query": "42.5736,-114.6066",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "coordinates"
and r["results"][0]["confidence"] == "exact"
),
},
{
"name": "boise → at least 1 result",
"query": "boise",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "toronto → CA canary",
"query": "toronto",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "asdfghjklqwerty → empty results, 200 OK",
"query": "asdfghjklqwerty",
"check": lambda r: r["count"] == 0 and r["results"] == [],
},
{
"name": "empty query → empty results",
"query": "",
"check": lambda r: r["count"] == 0 and r["results"] == [],
},
]
passed = 0
failed = 0
for t in TESTS:
q = urllib.parse.urlencode({"q": t["query"]}) if t["query"] else "q="
url = f"{BASE}/api/geocode?{q}"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as resp:
status = resp.status
body = json.loads(resp.read())
except urllib.error.HTTPError as e:
status = e.code
try:
body = json.loads(e.read())
except Exception:
body = {}
except Exception as e:
status = 0
body = {}
print(f" [FAIL] {t['name']}")
print(f" EXCEPTION: {e}")
failed += 1
continue
ok = status == 200 and t["check"](body)
tag = "PASS" if ok else "FAIL"
if ok:
passed += 1
else:
failed += 1
top = body.get("results", [{}])[0] if body.get("results") else {}
top_summary = f"source={top.get('source','')} type={top.get('type','')} conf={top.get('confidence','')} name={top.get('name','')[:50]}"
print(f" [{tag}] {t['name']}")
if not ok:
print(f" HTTP {status}, count={body.get('count','?')}, top: {top_summary}")
else:
labeled = f" labeled_as={top.get('labeled_as')}" if top.get('labeled_as') else ""
print(f"{top_summary}{labeled}")
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View file

@ -1,168 +0,0 @@
"""Navigation tools: geocoding via Photon and routing via Valhalla."""
import math
import re
import requests
from .utils import setup_logging
logger = setup_logging('recon.nav_tools')
PHOTON_URL = "http://localhost:2322"
VALHALLA_URL = "http://localhost:8002"
# Regional bias for Photon searches (Idaho-centric for Matt's use case).
# Adjustable — Photon uses these to rank nearby results higher.
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
# Distance threshold (meters) for annotating Photon results with address
# book labels. 75m covers GPS jitter + geocoder imprecision.
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# Coordinate regex — handles comma-separated and space-separated forms.
_COORD_RE = re.compile(
r'^\s*(-?\d+\.\d+)\s*[,\s]\s*(-?\d+\.\d+)\s*$'
)
VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"}
def _parse_coords(text: str):
"""Return (lat, lon) if text looks like coordinates with valid bounds, else None."""
m = _COORD_RE.match(text.strip())
if not m:
return None
lat, lon = float(m.group(1)), float(m.group(2))
if -90 <= lat <= 90 and -180 <= lon <= 180:
return lat, lon
return None
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters between two (lat, lon) points."""
R = 6_371_000 # Earth radius in meters
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def geocode(query: str, limit: int = 10, lat=None, lon=None, zoom=None):
"""Delegate to the structured geocode module. See lib/geocode.py."""
from . import geocode as geocode_mod
return geocode_mod.geocode(query, limit=limit, lat=lat, lon=lon, zoom=zoom)
def _geocode(query: str):
"""Internal: returns (lat, lon, display_name) tuple for route()."""
result = geocode(query, limit=1)
results = result.get('results', [])
if not results:
raise ValueError(f"Could not find location: {query}")
top = results[0]
return top['lat'], top['lon'], top['name']
def reverse_geocode(lat: float, lon: float) -> str:
"""Reverse geocode coordinates via Photon. Returns formatted address string."""
try:
resp = requests.get(
f"{PHOTON_URL}/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
data = resp.json()
features = data.get("features", [])
if not features:
return f"{lat}, {lon}"
props = features[0]["properties"]
parts = []
for key in ("name", "housenumber", "street", "city", "state", "country", "postcode"):
v = props.get(key)
if v:
parts.append(v)
return ", ".join(parts) if parts else f"{lat}, {lon}"
def route(origin: str, destination: str, mode: str = "auto") -> dict:
"""
Get a route between two locations.
Args:
origin: Starting location address, place name, or "lat,lon"
destination: Destination address, place name, or "lat,lon"
mode: Travel mode auto, pedestrian, bicycle, truck
Returns:
dict with summary, maneuvers, origin/destination info, and raw shape
"""
if mode not in VALID_MODES:
mode = "auto"
# Geocode both endpoints
orig_lat, orig_lon, orig_name = _geocode(origin)
dest_lat, dest_lon, dest_name = _geocode(destination)
# Query Valhalla
valhalla_req = {
"locations": [
{"lat": orig_lat, "lon": orig_lon},
{"lat": dest_lat, "lon": dest_lon},
],
"costing": mode,
"directions_options": {"units": "miles"},
}
try:
resp = requests.post(
f"{VALHALLA_URL}/route",
json=valhalla_req,
timeout=30,
)
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
if resp.status_code != 200:
try:
err = resp.json()
msg = err.get("error", "Unknown routing error")
except Exception:
msg = f"Routing error (HTTP {resp.status_code})"
raise RuntimeError(f"No route found between locations: {msg}")
data = resp.json()
trip = data["trip"]
summary = trip["summary"]
leg = trip["legs"][0]
# Build maneuver list
maneuvers = []
for m in leg["maneuvers"]:
streets = m.get("street_names", [])
maneuvers.append({
"instruction": m["instruction"],
"distance_miles": round(m.get("length", 0), 2),
"street_name": streets[0] if streets else "",
"type": m.get("type", 0),
"verbal_succinct": m.get("verbal_succinct_transition_instruction", ""),
})
return {
"origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon},
"destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon},
"summary": {
"distance_miles": round(summary["length"], 1),
"time_minutes": round(summary["time"] / 60, 1),
"mode": mode,
},
"maneuvers": maneuvers,
"shape": leg.get("shape", ""),
}

View file

@ -1,77 +0,0 @@
"""Tests for nav_tools — run against live Photon + Valhalla services."""
import sys
import json
from nav_tools import route, reverse_geocode
def test_route_named():
"""route("Buhl Idaho", "Boise Idaho", "auto") returns maneuvers."""
print("TEST 1: route('Buhl Idaho', 'Boise Idaho', 'auto')")
r = route("Buhl Idaho", "Boise Idaho", "auto")
assert r["summary"]["distance_miles"] > 50, f"Expected >50 mi, got {r['summary']['distance_miles']}"
assert r["summary"]["time_minutes"] > 60, f"Expected >60 min, got {r['summary']['time_minutes']}"
assert len(r["maneuvers"]) > 5, f"Expected >5 maneuvers, got {len(r['maneuvers'])}"
assert r["shape"], "Missing polyline shape"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min, {len(r['maneuvers'])} maneuvers")
print(f" Origin: {r['origin']['name']}")
print(f" Destination: {r['destination']['name']}")
print(f" First maneuver: {r['maneuvers'][0]['instruction']}")
def test_route_coords():
"""route with raw lat,lon coordinates."""
print("\nTEST 2: route('42.5991,-114.7636', '43.615,-116.2023', 'auto')")
r = route("42.5991,-114.7636", "43.615,-116.2023", "auto")
assert r["summary"]["distance_miles"] > 100, f"Expected >100 mi, got {r['summary']['distance_miles']}"
assert len(r["maneuvers"]) > 3, f"Expected >3 maneuvers"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min")
def test_route_pedestrian():
"""route with pedestrian mode."""
print("\nTEST 3: route('Buhl Idaho', 'Boise Idaho', 'pedestrian')")
r = route("Buhl Idaho", "Boise Idaho", "pedestrian")
assert r["summary"]["mode"] == "pedestrian"
assert r["summary"]["time_minutes"] > r["summary"]["distance_miles"], "Walking should take more min than miles"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min (pedestrian)")
def test_reverse_geocode():
"""reverse_geocode near Buhl, Idaho."""
print("\nTEST 4: reverse_geocode(42.5991, -114.7636)")
result = reverse_geocode(42.5991, -114.7636)
assert "Buhl" in result or "Twin Falls" in result or "Idaho" in result, f"Expected Buhl/Idaho, got: {result}"
print(f" OK — {result}")
def test_route_bad_origin():
"""route with nonexistent place returns clean error."""
print("\nTEST 5: route('nonexistent place xyz123abc', 'Boise Idaho')")
try:
r = route("nonexistent place xyz123abc", "Boise Idaho")
print(f" FAIL — expected error, got result: {r['summary']}")
return False
except ValueError as e:
print(f" OK — clean error: {e}")
except RuntimeError as e:
print(f" OK — runtime error: {e}")
if __name__ == "__main__":
passed = 0
failed = 0
tests = [test_route_named, test_route_coords, test_route_pedestrian, test_reverse_geocode, test_route_bad_origin]
for test in tests:
try:
test()
passed += 1
except Exception as e:
print(f" FAIL — {e}")
failed += 1
print(f"\n{'='*40}")
print(f"Results: {passed} passed, {failed} failed out of {len(tests)}")
sys.exit(1 if failed else 0)

View file

@ -1,29 +1,19 @@
"""
RECON Netsyms API + Geocode Flask Blueprints.
RECON Netsyms API Flask Blueprint.
GET /api/netsyms/lookup?q=<free text>&country=<optional>
GET /api/netsyms/health
GET /api/geocode?q=<query>&limit=<N> (Photon-first search with ranked results)
GET /api/reverse/<lat>/<lon> (localhost-sourced enrichment bundle for Central)
"""
import sqlite3
import threading
from cachetools import TTLCache
from flask import Blueprint, request, jsonify
from . import netsyms
from . import address_book
from . import nav_tools
from .geocode import PHOTON_URL
from .offroute.dem import DEMReader
from .utils import setup_logging
logger = setup_logging('recon.netsyms_api')
netsyms_bp = Blueprint('netsyms', __name__)
geocode_bp = Blueprint('geocode', __name__)
@netsyms_bp.route('/api/netsyms/lookup')
@ -40,252 +30,3 @@ def api_netsyms_lookup():
@netsyms_bp.route('/api/netsyms/health')
def api_netsyms_health():
return jsonify(netsyms.health())
def _safe_float(val, lo, hi):
"""Parse val as float; return None if missing, non-numeric, or out of [lo, hi]."""
if val is None:
return None
try:
f = float(val)
if lo <= f <= hi:
return f
except (ValueError, TypeError):
pass
return None
@geocode_bp.route('/api/geocode')
def api_geocode():
"""
Photon-first geocoding with ranked candidates.
GET /api/geocode?q=<query>&limit=<N>
Always returns 200 OK with:
{query, results: [{name, lat, lon, source, confidence, type, raw, ...}], count}
- source: "address_book" | "coordinates" | "photon"
- confidence: "exact" | "high" | "medium" | "low"
- type: "nickname" | "coordinates" | "street_address" | "poi" | "locality"
- labeled_as: present when result is within 75m of an address book entry
- Empty results array is valid (no match). No 404s.
"""
q = request.args.get('q', '').strip()
limit = request.args.get('limit', '10')
try:
limit = max(1, min(int(limit), 20))
except (ValueError, TypeError):
limit = 10
# Viewport bias parameters (optional)
lat = _safe_float(request.args.get("lat"), -90, 90)
lon = _safe_float(request.args.get("lon"), -180, 180)
zoom = _safe_float(request.args.get("zoom"), 0, 22)
result = nav_tools.geocode(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
return jsonify(result)
@geocode_bp.route('/api/reverse')
def api_reverse():
"""
Reverse geocode coordinates via Photon.
GET /api/reverse?lat=X&lon=Y
Returns same shape as /api/geocode:
{query: "lat,lon", results: [{name, lat, lon, source, type, raw, ...}], count}
Returns 200 OK with empty results on no match. 400 on invalid coords.
"""
try:
lat = float(request.args.get('lat', ''))
lon = float(request.args.get('lon', ''))
except (ValueError, TypeError):
return jsonify({'error': 'Missing or invalid lat/lon parameters'}), 400
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return jsonify({'error': 'Coordinates out of range'}), 400
query_str = f"{lat},{lon}"
try:
import requests as http_requests
resp = http_requests.get(
"http://localhost:2322/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
features = data.get("features", [])
except Exception:
logger.warning("Photon reverse geocode failed for %s", query_str)
return jsonify({'query': query_str, 'results': [], 'count': 0})
if not features:
return jsonify({'query': query_str, 'results': [], 'count': 0})
from .geocode import _parse_photon_features
results = _parse_photon_features(features, source='photon_reverse')
return jsonify({'query': query_str, 'results': results, 'count': len(results)})
# ─────────────────────────────────────────────────────────────────────────
# /api/reverse/<lat>/<lon> — localhost-sourced enrichment bundle (Central)
#
# Sibling to the query-string /api/reverse above; that route is unchanged.
# Every component is sourced from localhost only (Photon, timezones.sqlite,
# in-process landclass/PostGIS, planet-DEM PMTiles). Each lookup is
# independent: a component failure logs a warning and yields null — never 5xx.
# ─────────────────────────────────────────────────────────────────────────
_TZ_DB_PATH = "/mnt/nav/sources/timezones.sqlite"
# Full bundle cache: key=(round(lat,4), round(lon,4)) -> dict. ~10k entries, 24h TTL.
_REVERSE_BUNDLE_CACHE = TTLCache(maxsize=10_000, ttl=86_400)
_REVERSE_BUNDLE_LOCK = threading.Lock()
_BUNDLE_KEYS = ('name', 'city', 'county', 'state', 'country',
'postal_code', 'timezone', 'landclass', 'elevation_m')
# planet-DEM elevation source (single PMTiles, replaces Valhalla /height).
# Instantiated once at import; the underlying mmap is lazy. None if unavailable.
try:
_DEM = DEMReader()
except Exception as e: # pragma: no cover - depends on PMTiles availability
logger.warning("DEMReader unavailable, elevation will be null: %s", e)
_DEM = None
def _spatialite_blob_to_wkb(blob):
"""Recover standard WKB from a SpatiaLite geometry BLOB.
Layout: [00][endian][srid:4][mbr:32][7C][WKB body][FE]. The body omits the
leading byte-order marker, so we re-prepend it and drop the trailing 0xFE.
"""
return bytes([blob[1]]) + blob[39:-1]
def _reverse_photon(lat, lon):
"""Nearest-feature admin fields from local Photon. Returns the six address
fields (any value may be None). Mirrors the existing /api/reverse call."""
import requests as http_requests
resp = http_requests.get(
f"{PHOTON_URL}/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
features = resp.json().get("features", [])
if not features:
return {}
props = features[0].get("properties", {})
return {
"name": props.get("name"),
"city": props.get("city"),
"county": props.get("county"),
"state": props.get("state"),
"country": props.get("country"),
"postal_code": props.get("postcode"),
}
def _reverse_timezone(lat, lon):
"""IANA tzid for the point from local timezones.sqlite (SpatiaLite tz_world).
Uses the table's R-tree index for an MBR prefilter, then shapely
point-in-polygon on the few candidates. Returns None if unresolved.
"""
from shapely import wkb
from shapely.geometry import Point
con = sqlite3.connect(f"file:{_TZ_DB_PATH}?mode=ro", uri=True)
try:
cur = con.cursor()
cur.execute(
"SELECT pkid FROM idx_tz_world_geom "
"WHERE xmin<=? AND xmax>=? AND ymin<=? AND ymax>=?",
(lon, lon, lat, lat),
)
candidates = [r[0] for r in cur.fetchall()]
if not candidates:
return None
pt = Point(lon, lat)
for pk in candidates:
row = cur.execute(
"SELECT tzid, geom FROM tz_world WHERE pk_uid=?", (pk,)
).fetchone()
if row and wkb.loads(_spatialite_blob_to_wkb(row[1])).contains(pt):
return row[0]
return None
finally:
con.close()
def _reverse_landclass(lat, lon):
"""Most-specific PAD-US land class for the point, looked up in-process.
Returns None when there is no coverage or landclass is unavailable."""
from .landclass import lookup_landclass, format_summary
return format_summary(lookup_landclass(lat, lon))
def _reverse_elevation(lat, lon):
"""Elevation in metres from the planet-DEM PMTiles — the single elevation
source per OFFROUTE-ARCHITECTURE.md §9. None on failure, on untiled points
(e.g. true ocean), or if DEMReader could not be initialized at startup."""
if _DEM is None:
return None
return _DEM.sample_point(lat, lon)
@geocode_bp.route('/api/reverse/<lat>/<lon>')
def api_reverse_bundle(lat, lon):
"""Localhost-sourced reverse-geocode enrichment bundle for Central.
GET /api/reverse/<lat>/<lon>
Always returns 200 with EXACTLY these keys (any may be null):
name, city, county, state, country, postal_code, timezone, landclass, elevation_m
lat/lon are parsed manually (not via Flask's <float:> converter, which
rejects negative and integer coordinates) so out-of-range or unparseable
input yields 400 per contract; 503 is reserved for catastrophic failure.
"""
try:
lat = float(lat)
lon = float(lon)
except (ValueError, TypeError):
return jsonify({'error': 'lat and lon must be numbers'}), 400
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return jsonify({'error': 'lat must be -90..90, lon must be -180..180'}), 400
key = (round(lat, 4), round(lon, 4))
with _REVERSE_BUNDLE_LOCK:
cached = _REVERSE_BUNDLE_CACHE.get(key)
if cached is not None:
return jsonify(cached)
bundle = {k: None for k in _BUNDLE_KEYS}
try:
bundle.update(_reverse_photon(lat, lon))
except Exception:
logger.warning("reverse-bundle: Photon lookup failed for %s,%s", lat, lon)
try:
bundle['timezone'] = _reverse_timezone(lat, lon)
except Exception:
logger.warning("reverse-bundle: timezone lookup failed for %s,%s", lat, lon)
try:
bundle['landclass'] = _reverse_landclass(lat, lon)
except Exception:
logger.warning("reverse-bundle: landclass lookup failed for %s,%s", lat, lon)
try:
bundle['elevation_m'] = _reverse_elevation(lat, lon)
except Exception:
logger.warning("reverse-bundle: elevation lookup failed for %s,%s", lat, lon)
with _REVERSE_BUNDLE_LOCK:
_REVERSE_BUNDLE_CACHE[key] = bundle
return jsonify(bundle)

View file

@ -1,171 +0,0 @@
#!/usr/bin/env python3
"""Tests for the /api/reverse/<lat>/<lon> enrichment bundle (lib.netsyms_api).
Photon/DEM/landclass are mocked so the suite runs without live services;
one timezone test exercises the real SpatiaLite DB when it is present. Plain
asserts + a __main__ runner, matching the rest of lib/*_test.py.
"""
import os
import sys
from unittest import mock
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from flask import Flask
from lib import netsyms_api
EXPECTED_KEYS = set(netsyms_api._BUNDLE_KEYS)
def _client():
app = Flask(__name__)
app.register_blueprint(netsyms_api.geocode_bp)
return app.test_client()
def _clear_cache():
netsyms_api._REVERSE_BUNDLE_CACHE.clear()
def test_happy_path():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={
'name': 'Where you are', 'city': 'Boise', 'county': 'Ada',
'state': 'Idaho', 'country': 'United States', 'postal_code': '83701'}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='America/Boise'), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value='Boise National Forest'), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=824):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS, data.keys()
assert data['city'] == 'Boise' and data['timezone'] == 'America/Boise'
assert data['landclass'] == 'Boise National Forest' and data['elevation_m'] == 824
print(" PASS: happy path — all 9 fields populated, exact key set")
def test_negative_and_integer_coords_parse():
# Regression: Flask's <float:> converter would 404 these; manual parse must not.
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=None):
for path in ('/api/reverse/43.6/-116.2', '/api/reverse/43/-116'):
resp = _client().get(path)
assert resp.status_code == 200, f"{path} -> {resp.status_code}"
assert set(resp.get_json().keys()) == EXPECTED_KEYS
print(" PASS: negative and integer coordinates parse (200, not 404)")
def test_partial_failure_returns_200_with_nulls():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon',
side_effect=RuntimeError('photon down')), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='America/Boise'), \
mock.patch.object(netsyms_api, '_reverse_landclass',
side_effect=RuntimeError('postgis down')), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=824):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['name'] is None and data['city'] is None # photon failed -> nulls
assert data['landclass'] is None # landclass failed -> null
assert data['timezone'] == 'America/Boise' and data['elevation_m'] == 824
print(" PASS: per-component failure -> 200 with nulls, no 5xx")
def test_ocean_point_mostly_null():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value='Etc/GMT+2'), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=0):
resp = _client().get('/api/reverse/0.0/-30.0')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['city'] is None and data['country'] is None and data['landclass'] is None
print(" PASS: ocean point -> 200, mostly null")
def test_invalid_input_400():
_clear_cache()
client = _client()
for path in ('/api/reverse/9999/0', '/api/reverse/0/9999', '/api/reverse/abc/0'):
resp = client.get(path)
assert resp.status_code == 400, f"{path} -> {resp.status_code}"
print(" PASS: out-of-range / unparseable input -> 400")
def test_cache_hit_serves_without_recompute():
_clear_cache()
with mock.patch.object(netsyms_api, '_reverse_photon',
return_value={'name': 'X'}) as m_photon, \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_elevation', return_value=None):
client = _client()
client.get('/api/reverse/12.3456/-65.4321')
client.get('/api/reverse/12.3456/-65.4321') # same key (rounded) -> cached
assert m_photon.call_count == 1, f"expected 1 compute, got {m_photon.call_count}"
print(" PASS: second identical request served from cache (no recompute)")
def test_real_timezone_db():
if not os.path.exists(netsyms_api._TZ_DB_PATH):
print(" SKIP: real timezone test (timezones.sqlite not present)")
return
assert netsyms_api._reverse_timezone(43.6150, -116.2023) == 'America/Boise'
assert netsyms_api._reverse_timezone(40.7128, -74.0060) == 'America/New_York'
print(" PASS: real timezones.sqlite point-in-polygon")
def test_elevation_from_dem_reader_mock():
# elevation_m comes from DEMReader.sample_point (not Valhalla); other
# components stubbed to null so the bundle is hermetic.
_clear_cache()
fake_dem = mock.Mock()
fake_dem.sample_point.return_value = 824
with mock.patch.object(netsyms_api, '_DEM', fake_dem), \
mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['elevation_m'] == 824, data['elevation_m']
fake_dem.sample_point.assert_called_once()
print(" PASS: elevation_m sourced from DEMReader.sample_point")
def test_elevation_dem_unavailable():
# DEMReader failed to init at startup (_DEM is None) -> elevation_m null, 200.
_clear_cache()
with mock.patch.object(netsyms_api, '_DEM', None), \
mock.patch.object(netsyms_api, '_reverse_photon', return_value={}), \
mock.patch.object(netsyms_api, '_reverse_timezone', return_value=None), \
mock.patch.object(netsyms_api, '_reverse_landclass', return_value=None):
resp = _client().get('/api/reverse/43.6150/-116.2023')
assert resp.status_code == 200, resp.status_code
data = resp.get_json()
assert set(data.keys()) == EXPECTED_KEYS
assert data['elevation_m'] is None
print(" PASS: DEMReader unavailable -> elevation_m null, still 200")
if __name__ == '__main__':
print("Running reverse-bundle tests...")
test_happy_path()
test_negative_and_integer_coords_parse()
test_partial_failure_returns_200_with_nulls()
test_ocean_point_mostly_null()
test_invalid_input_400()
test_cache_hit_serves_without_recompute()
test_real_timezone_db()
test_elevation_from_dem_reader_mock()
test_elevation_dem_unavailable()
print("All tests passed.")