cleanup: remove /api/contacts + /api/address_book handlers + pull entire /nav-i/* subtree (extraction #3 shadow) (#12)

* cleanup: remove /api/address_book handlers (extraction #3 shadow)

Removes address_book_bp (lib/address_book_api.py: /api/address_book/lookup +
/api/address_book/list) + its registration in lib/api.py. Edge-shadowed since
extraction #3 — navi-contacts (:8423) serves /api/address_book/* on
navi.echo6.co; no recon-side consumer (no template/JS reference).

lib/address_book.py is KEPT — geocode.py (nickname short-circuit + annotation)
and netsyms_api.py import it.

NOT removed this PR: contacts_bp. The recon dashboard at /deleted-contacts
(recon-product, stays) calls /api/contacts/<id>/{restore,restore-as,purge} via
XHR, and recon.echo6.co proxies straight to recon:8420 (verified the Caddy
block — no navi-contacts routing there). Removing contacts_bp would break those
dashboard actions. Flagged for a decision; lib/contacts.py also stays (dashboard
ContactsDB reads). See PR body.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cleanup: deprecate /nav-i + /deleted-contacts; remove contacts_bp + lib/contacts.py

Probe found recon's /deleted-contacts dashboard reads /opt/recon/data/contacts.db
— frozen since extraction #3 moved write ownership to navi-contacts
(/var/lib/navi-backend/contacts.db). The page has been silently rendering ~25-day
stale data, and its restore/restore-as/purge XHRs hit recon's contacts_bp (the
recon.echo6.co Caddy block proxies straight to recon:8420 — no navi-contacts
routing there). Per Matt's decision, deprecate the pages entirely; they'll be
re-surfaced later as a proper admin page consuming navi-contacts via API.

Removed:
- contacts_bp (lib/contacts_api.py, all 10 /api/contacts* routes) + its
  registration in lib/api.py — edge-shadowed by navi-contacts :8423 since #3,
  and now free of recon-product consumers once the dashboard goes.
- /nav-i (navi_landing_page) + /deleted-contacts (deleted_contacts_page) route
  handlers; templates/navi/landing.html + templates/navi/deleted_contacts.html.
- lib/contacts.py (ContactsDB) — the dashboard was its only non-contacts_bp
  consumer; both gone.
- The two dead NAVI_SUBNAV entries (Overview→/nav-i, Deleted Contacts→
  /deleted-contacts).

Kept / adapted:
- /nav-i/api-keys page (recon-product key management) stays. NAVI_SUBNAV reduced
  to just its API Keys entry; the base.html top-nav "Nav-I" link repointed
  /nav-i -> /nav-i/api-keys so the surviving section page stays reachable
  (minimal href change, not a nav restructure — flagged in PR).
- lib/address_book.py — geocode.py + netsyms_api.py still consume it (untouched).

Out-of-band follow-up after merge: delete the stale /opt/recon/data/contacts.db
(frozen 2026-04-28; data, not code).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cleanup: pull the entire /nav-i/* subtree (api-keys page is a weaker dup of /settings/keys)

Completes the contacts cleanup by removing the rest of /nav-i/. The
/nav-i/api-keys page was (a) a weaker duplicate of /settings/keys for Gemini
(it lacked remove + reload-from-.env), and (b) a write-only-to-dead-files
surface for TomTom + Google Places: it wrote /opt/recon/.env, but the live
navi-traffic (:8421) and navi-places (:8425) services read their own
/etc/navi-backend/<svc>.env and have ignored recon's copy since extractions
#1 + #5. End state: no /nav-i/* URLs in recon.

Removed:
- /nav-i/api-keys route + template (templates/navi/api_keys.html)
- all /api/nav-i/api-keys/* endpoints (list/update/test/restart-recon)
- lib/api_keys_admin.py (its only importers were those 4 endpoints; _KEY_DEFS/
  _read_env/_write_env were private to it)
- the now-orphaned NAVI_SUBNAV
- the "Nav-I" top-nav entry in base.html (reverses the /nav-i->/nav-i/api-keys
  repoint from the previous commit, now that the page itself is gone)

Kept (Gemini's real home, recon-product):
- /settings/keys + /api/keys/* + lib/key_manager.py (KeyManager) — they import
  key_manager directly, never api_keys_admin, so untouched.

Note: TOMTOM_API_KEY now has zero recon .py references. GOOGLE_PLACES_API_KEY
still has one (lib/google_places.py), kept in the prior /api/place cleanup as
place_detail's dep; its only caller (_enrich_with_google) is unreachable since
the /api/place handlers were removed — left in place pending /api/wiki-enrich
retirement (out of scope here).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: zvx-echo6 <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
malice 2026-05-23 03:34:22 -06:00 committed by GitHub
commit d56b1d5f92
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 0 additions and 1257 deletions

View file

@ -1,31 +0,0 @@
"""
RECON Address Book API Flask Blueprint.
GET /api/address_book/lookup?q=<query> best match or 404
GET /api/address_book/list all entries
"""
from flask import Blueprint, request, jsonify
from . import address_book
address_book_bp = Blueprint('address_book', __name__)
@address_book_bp.route('/api/address_book/lookup')
def api_address_book_lookup():
q = request.args.get('q', '').strip()
if not q:
return jsonify({'error': 'Missing q parameter'}), 400
result = address_book.lookup(q)
if result is None:
return '', 404
return jsonify(result)
@address_book_bp.route('/api/address_book/list')
def api_address_book_list():
entries = address_book.list_all()
return jsonify(entries)

View file

@ -59,14 +59,6 @@ class _LargeZimRequest(_FlaskRequest):
return super()._get_file_stream(total_content_length, content_type, filename, content_length)
app.request_class = _LargeZimRequest
# ── Address Book Blueprint ──
from .address_book_api import address_book_bp
app.register_blueprint(address_book_bp)
# ── Contacts Blueprint ──
from .contacts_api import contacts_bp
app.register_blueprint(contacts_bp)
# ── Netsyms + Geocode Blueprints ──
from .netsyms_api import netsyms_bp, geocode_bp
app.register_blueprint(netsyms_bp)
@ -109,12 +101,6 @@ SETTINGS_SUBNAV = [
{'href': '/settings/health', 'label': 'Service Health'},
]
NAVI_SUBNAV = [
{'href': '/nav-i', 'label': 'Overview'},
{'href': '/deleted-contacts', 'label': 'Deleted Contacts'},
{'href': '/nav-i/api-keys', 'label': 'API Keys'},
]
def _format_source_citation(payload):
"""Format a human-readable citation from a search result payload."""
@ -341,36 +327,6 @@ def failures_page():
failures=failures)
@app.route("/deleted-contacts")
def deleted_contacts_page():
from .auth import get_user_id
from .contacts import ContactsDB
user_id = get_user_id() or "anonymous"
db = ContactsDB()
contacts = db.list_deleted(user_id)
return render_template("navi/deleted_contacts.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/deleted-contacts",
contacts=contacts)
@app.route("/nav-i")
def navi_landing_page():
from .auth import get_user_id
from .contacts import ContactsDB
user_id = get_user_id() or "anonymous"
db = ContactsDB()
deleted_count = len(db.list_deleted(user_id))
return render_template("navi/landing.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/nav-i",
deleted_count=deleted_count)
@app.route("/nav-i/api-keys")
def navi_api_keys_page():
return render_template("navi/api_keys.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/nav-i/api-keys")
@app.route('/peertube')
def peertube_dashboard():
return render_template('peertube/dashboard.html',
@ -1408,60 +1364,6 @@ def api_keys_reload():
# ── Nav-I API Key Admin ──
@app.route('/api/nav-i/api-keys/list', methods=['GET'])
def navi_api_keys_list():
from .api_keys_admin import list_keys
return jsonify({'keys': list_keys()})
@app.route('/api/nav-i/api-keys/update', methods=['POST'])
def navi_api_keys_update():
from .auth import require_auth
from .api_keys_admin import update_key, update_gemini_key
data = request.get_json(force=True)
name = data.get('name', '')
new_value = data.get('new_value', '')
index = data.get('index') # optional, for Gemini key replacement
if not name or not new_value:
return jsonify({'error': 'name and new_value required'}), 400
if name == 'GEMINI_KEY' and index is not None:
result = update_gemini_key(int(index), new_value)
else:
result = update_key(name, new_value)
if result.get('success'):
return jsonify(result)
return jsonify(result), 400
@app.route('/api/nav-i/api-keys/test', methods=['POST'])
def navi_api_keys_test():
from .api_keys_admin import test_key
data = request.get_json(force=True)
name = data.get('name', '')
index = data.get('index') # optional, for testing specific Gemini key
if not name:
return jsonify({'error': 'name required'}), 400
result = test_key(name, index=int(index) if index is not None else None)
return jsonify(result)
@app.route('/api/nav-i/api-keys/restart-recon', methods=['POST'])
def navi_api_keys_restart():
import subprocess
try:
result = subprocess.run(
['sudo', 'systemctl', 'restart', 'recon'],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
return jsonify({'success': True, 'note': 'RECON service restarted'})
return jsonify({'success': False, 'error': result.stderr.strip()}), 500
except subprocess.TimeoutExpired:
return jsonify({'success': False, 'error': 'Restart timed out'}), 500
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ── YouTube Cookie Management ──

View file

@ -1,358 +0,0 @@
"""
Nav-I API Keys Admin unified view/update/test for third-party API keys.
Manages three provider categories:
- Gemini (multiple keys via KeyManager singleton)
- TomTom (single key in .env)
- Google Places (single key in .env)
All key values are masked in responses. Full values never leave the server
except as user-supplied input on update.
"""
import os
import re
import shutil
import tempfile
import time
import requests as http_requests
from .utils import setup_logging
logger = setup_logging('recon.api_keys_admin')
ENV_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
# Key definitions: env_name → display metadata
_KEY_DEFS = {
'TOMTOM_API_KEY': {
'display_name': 'TomTom',
'provider': 'tomtom',
},
'GOOGLE_PLACES_API_KEY': {
'display_name': 'Google Places',
'provider': 'google_places',
},
}
# ── .env read/write helpers ─────────────────────────────────────────────
def _read_env():
"""Read .env file into a dict of key=value pairs, preserving order."""
entries = [] # list of (key, value, raw_line) — preserves order and comments
if not os.path.exists(ENV_PATH):
return entries
with open(ENV_PATH, 'r') as f:
for line in f:
raw = line.rstrip('\n')
stripped = raw.strip()
if not stripped or stripped.startswith('#'):
entries.append((None, None, raw))
continue
m = re.match(r'^([A-Za-z_][A-Za-z0-9_]*)=(.*)$', stripped)
if m:
entries.append((m.group(1), m.group(2).strip().strip('"').strip("'"), raw))
else:
entries.append((None, None, raw))
return entries
def _write_env(entries):
"""Atomically write .env from entries list. Backs up to .env.bak first."""
# Backup current .env
if os.path.exists(ENV_PATH):
bak_path = ENV_PATH + '.bak'
shutil.copy2(ENV_PATH, bak_path)
# Write to temp file, then rename (atomic on same filesystem)
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(ENV_PATH), prefix='.env.', suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
for key, value, raw in entries:
if key is not None:
f.write(f'{key}={value}\n')
else:
f.write(raw + '\n')
os.rename(tmp_path, ENV_PATH)
except Exception:
# Clean up temp file on failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise
logger.info(f"Wrote .env atomically ({len([e for e in entries if e[0]])} keys)")
def _get_env_value(name):
"""Get a single value from .env by key name."""
for key, value, _ in _read_env():
if key == name:
return value
return None
def _set_env_value(name, new_value):
"""Set a single value in .env. Adds if not present."""
entries = _read_env()
found = False
for i, (key, value, raw) in enumerate(entries):
if key == name:
entries[i] = (name, new_value, f'{name}={new_value}')
found = True
break
if not found:
entries.append((name, new_value, f'{name}={new_value}'))
_write_env(entries)
# ── Masking ─────────────────────────────────────────────────────────────
def _mask_key(value):
"""Mask a key: first 4 chars + '...' + last 4 chars. Never return full value."""
if not value:
return None
if len(value) <= 8:
return '****'
return value[:4] + '...' + value[-4:]
# ── List ────────────────────────────────────────────────────────────────
def list_keys():
"""
Return masked status of all managed API keys.
Returns list of dicts with: name, display_name, provider, masked_value,
is_set, count (for multi-key providers like Gemini).
"""
result = []
env_mtime = None
if os.path.exists(ENV_PATH):
env_mtime = time.strftime('%Y-%m-%dT%H:%M:%SZ',
time.gmtime(os.path.getmtime(ENV_PATH)))
# Gemini keys (via KeyManager)
from .key_manager import get_key_manager
km = get_key_manager()
gemini_keys = km.get_masked_keys()
gemini_count = len(gemini_keys)
# Show a single summary entry for Gemini with count
first_masked = gemini_keys[0]['masked'] if gemini_keys else None
result.append({
'name': 'GEMINI_KEY',
'display_name': 'Gemini',
'provider': 'gemini',
'masked_value': first_masked,
'is_set': gemini_count > 0,
'count': gemini_count,
'last_modified': env_mtime,
'keys': gemini_keys, # full list with per-key stats
})
# Single-value keys
for env_name, meta in _KEY_DEFS.items():
value = _get_env_value(env_name)
result.append({
'name': env_name,
'display_name': meta['display_name'],
'provider': meta['provider'],
'masked_value': _mask_key(value),
'is_set': bool(value),
'count': 1 if value else 0,
'last_modified': env_mtime,
})
return result
# ── Update ──────────────────────────────────────────────────────────────
def update_key(name, new_value):
"""
Update a key value. For Gemini, name should be 'GEMINI_KEY' with an
optional 'index' for replacing a specific key, or use the KeyManager API.
For TomTom/Google Places, writes directly to .env.
Returns dict with success status and masked value.
"""
new_value = new_value.strip()
if not new_value:
return {'success': False, 'error': 'Key value cannot be empty'}
if name == 'GEMINI_KEY':
# Use KeyManager for Gemini
from .key_manager import get_key_manager
km = get_key_manager()
try:
idx = km.add_gemini_key(new_value)
return {
'success': True,
'name': name,
'masked_value': _mask_key(new_value),
'action': 'added',
'index': idx,
}
except ValueError as e:
return {'success': False, 'error': str(e)}
if name in _KEY_DEFS:
_set_env_value(name, new_value)
return {
'success': True,
'name': name,
'masked_value': _mask_key(new_value),
'action': 'updated',
}
return {'success': False, 'error': f'Unknown key: {name}'}
def update_gemini_key(index, new_value):
"""Replace a specific Gemini key by index."""
new_value = new_value.strip()
if not new_value:
return {'success': False, 'error': 'Key value cannot be empty'}
from .key_manager import get_key_manager
km = get_key_manager()
try:
km.replace_gemini_key(index, new_value)
return {
'success': True,
'name': 'GEMINI_KEY',
'index': index,
'masked_value': _mask_key(new_value),
'action': 'replaced',
}
except (ValueError, IndexError) as e:
return {'success': False, 'error': str(e)}
# ── Test ────────────────────────────────────────────────────────────────
def test_key(name, index=None):
"""
Test a key against its provider API using the current .env value.
Returns dict with: success, latency_ms, error, note.
"""
if name == 'GEMINI_KEY':
return _test_gemini(index)
elif name == 'TOMTOM_API_KEY':
return _test_tomtom()
elif name == 'GOOGLE_PLACES_API_KEY':
return _test_google_places()
else:
return {'success': False, 'error': f'Unknown key: {name}', 'latency_ms': 0}
def _test_gemini(index=None):
"""Test Gemini key by listing models."""
from .key_manager import get_key_manager
km = get_key_manager()
if index is not None:
key = km.get_gemini_key(index)
if not key:
return {'success': False, 'error': f'Gemini key index {index} not found', 'latency_ms': 0}
else:
key = km.get_gemini_key(0)
if not key:
return {'success': False, 'error': 'No Gemini keys configured', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.get(
f"https://generativelanguage.googleapis.com/v1beta/models?key={key}",
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200 and 'models' in resp.text:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Models list returned successfully'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Key disabled or quota exhausted'}
elif resp.status_code == 429:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Valid key — currently rate-limited'}
else:
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}
def _test_tomtom():
"""Test TomTom key with a minimal geocode request."""
key = _get_env_value('TOMTOM_API_KEY')
if not key:
return {'success': False, 'error': 'TOMTOM_API_KEY not set', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.get(
f"https://api.tomtom.com/search/2/geocode/Boise.json",
params={'key': key, 'limit': 1},
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200:
data = resp.json()
count = data.get('summary', {}).get('totalResults', 0)
return {'success': True, 'latency_ms': latency, 'error': None,
'note': f'Geocode returned {count} result(s)'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Invalid or expired key'}
else:
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}
def _test_google_places():
"""Test Google Places (New) API key with a minimal searchText request."""
key = _get_env_value('GOOGLE_PLACES_API_KEY')
if not key:
return {'success': False, 'error': 'GOOGLE_PLACES_API_KEY not set', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.post(
"https://places.googleapis.com/v1/places:searchText",
json={'textQuery': 'Boise Idaho', 'maxResultCount': 1},
headers={
'X-Goog-Api-Key': key,
'X-Goog-FieldMask': 'places.displayName',
},
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200:
data = resp.json()
count = len(data.get('places', []))
return {'success': True, 'latency_ms': latency, 'error': None,
'note': f'searchText returned {count} place(s)'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Key not authorized for Places API (New)'}
elif resp.status_code == 429:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Valid key — quota exceeded'}
else:
body = resp.text[:200]
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}: {body}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}

View file

@ -1,230 +0,0 @@
"""
RECON Contacts Database per-user phone book with soft delete and proximity queries.
Separate DB at data/contacts.db. Thread-local connections with WAL mode (StatusDB pattern).
"""
import math
import os
import sqlite3
import threading
from datetime import datetime, timezone
_local = threading.local()
_SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
label TEXT NOT NULL,
name TEXT,
call_sign TEXT,
phone TEXT,
email TEXT,
category TEXT,
notes TEXT,
lat REAL,
lon REAL,
osm_type TEXT,
osm_id INTEGER,
address TEXT,
show_proximity INTEGER DEFAULT 0,
created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
deleted_at TEXT,
deleted_by TEXT
);
CREATE INDEX IF NOT EXISTS idx_contacts_user ON contacts(user_id);
CREATE INDEX IF NOT EXISTS idx_contacts_user_category ON contacts(user_id, category);
CREATE INDEX IF NOT EXISTS idx_contacts_user_deleted ON contacts(user_id, deleted_at);
CREATE INDEX IF NOT EXISTS idx_contacts_geo ON contacts(lat, lon);
CREATE UNIQUE INDEX IF NOT EXISTS idx_contacts_home_work
ON contacts(user_id, label)
WHERE label IN ('Home', 'Work') AND deleted_at IS NULL;
"""
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters."""
R = 6_371_000
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _row_to_dict(row):
"""Convert sqlite3.Row to dict, casting show_proximity to bool."""
d = dict(row)
d['show_proximity'] = bool(d.get('show_proximity', 0))
return d
class ContactsDB:
def __init__(self, db_path=None):
if db_path is None:
db_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'contacts.db')
self.db_path = db_path
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
def _get_conn(self):
if not hasattr(_local, 'contacts_conn') or _local.contacts_conn is None:
_local.contacts_conn = sqlite3.connect(self.db_path, timeout=30)
_local.contacts_conn.row_factory = sqlite3.Row
_local.contacts_conn.execute("PRAGMA journal_mode=WAL")
_local.contacts_conn.execute("PRAGMA busy_timeout=5000")
return _local.contacts_conn
def _init_db(self):
conn = self._get_conn()
conn.executescript(_SCHEMA)
conn.commit()
def list_all(self, user_id, category=None, search=None):
conn = self._get_conn()
sql = "SELECT * FROM contacts WHERE user_id = ? AND deleted_at IS NULL"
params = [user_id]
if category:
sql += " AND category = ?"
params.append(category)
if search:
sql += " AND (label LIKE ? OR name LIKE ? OR call_sign LIKE ? OR phone LIKE ?)"
like = f"%{search}%"
params.extend([like, like, like, like])
sql += " ORDER BY label"
return [_row_to_dict(r) for r in conn.execute(sql, params).fetchall()]
def list_deleted(self, user_id):
conn = self._get_conn()
rows = conn.execute(
"SELECT * FROM contacts WHERE user_id = ? AND deleted_at IS NOT NULL ORDER BY deleted_at DESC",
(user_id,)
).fetchall()
return [_row_to_dict(r) for r in rows]
def get(self, user_id, contact_id, include_deleted=False):
conn = self._get_conn()
sql = "SELECT * FROM contacts WHERE id = ? AND user_id = ?"
if not include_deleted:
sql += " AND deleted_at IS NULL"
row = conn.execute(sql, (contact_id, user_id)).fetchone()
return _row_to_dict(row) if row else None
def create(self, user_id, **fields):
conn = self._get_conn()
fields.pop('id', None)
fields.pop('user_id', None)
fields.pop('created_at', None)
fields.pop('updated_at', None)
fields.pop('deleted_at', None)
fields.pop('deleted_by', None)
if 'show_proximity' in fields:
fields['show_proximity'] = 1 if fields['show_proximity'] else 0
columns = ['user_id'] + list(fields.keys())
placeholders = ', '.join(['?'] * len(columns))
col_str = ', '.join(columns)
values = [user_id] + list(fields.values())
try:
cur = conn.execute(f"INSERT INTO contacts ({col_str}) VALUES ({placeholders})", values)
conn.commit()
return self.get(user_id, cur.lastrowid), None
except sqlite3.IntegrityError:
return None, 'conflict'
def update(self, user_id, contact_id, **fields):
conn = self._get_conn()
fields.pop('id', None)
fields.pop('user_id', None)
fields.pop('created_at', None)
fields.pop('deleted_at', None)
fields.pop('deleted_by', None)
if 'show_proximity' in fields:
fields['show_proximity'] = 1 if fields['show_proximity'] else 0
fields['updated_at'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
sets = ', '.join(f"{k} = ?" for k in fields)
values = list(fields.values()) + [contact_id, user_id]
conn.execute(f"UPDATE contacts SET {sets} WHERE id = ? AND user_id = ? AND deleted_at IS NULL", values)
conn.commit()
return self.get(user_id, contact_id)
def soft_delete(self, user_id, contact_id):
conn = self._get_conn()
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
conn.execute(
"UPDATE contacts SET deleted_at = ?, deleted_by = ? WHERE id = ? AND user_id = ? AND deleted_at IS NULL",
(now, user_id, contact_id, user_id)
)
conn.commit()
return self.get(user_id, contact_id, include_deleted=True)
def restore(self, user_id, contact_id):
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row or not row.get('deleted_at'):
return None, 'not_found'
if row.get('label') in ('Home', 'Work'):
existing = conn.execute(
"SELECT id FROM contacts WHERE user_id = ? AND label = ? AND deleted_at IS NULL AND id != ?",
(user_id, row['label'], contact_id)
).fetchone()
if existing:
return None, 'conflict'
conn.execute(
"UPDATE contacts SET deleted_at = NULL, deleted_by = NULL WHERE id = ? AND user_id = ?",
(contact_id, user_id)
)
conn.commit()
return self.get(user_id, contact_id), None
def restore_as(self, user_id, contact_id, new_label):
"""Restore a soft-deleted contact with a new label (for Home/Work conflict resolution)."""
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row or not row.get('deleted_at'):
return None, 'not_found'
if not new_label or not new_label.strip():
return None, 'invalid_label'
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%fZ')
try:
conn.execute(
"UPDATE contacts SET deleted_at = NULL, deleted_by = NULL, label = ?, updated_at = ? WHERE id = ? AND user_id = ?",
(new_label.strip(), now, contact_id, user_id)
)
conn.commit()
except sqlite3.IntegrityError:
return None, 'conflict'
return self.get(user_id, contact_id), None
def purge(self, user_id, contact_id):
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row:
return False, 'not_found'
if not row.get('deleted_at'):
return False, 'not_deleted'
conn.execute("DELETE FROM contacts WHERE id = ? AND user_id = ?", (contact_id, user_id))
conn.commit()
return True, None
def find_nearby(self, user_id, lat, lon, radius_m=75):
conn = self._get_conn()
# Bounding box pre-filter (~111km per degree lat)
dlat = radius_m / 111_000
dlon = radius_m / (111_000 * math.cos(math.radians(lat)))
rows = conn.execute(
"""SELECT * FROM contacts
WHERE user_id = ? AND deleted_at IS NULL AND show_proximity = 1
AND lat BETWEEN ? AND ? AND lon BETWEEN ? AND ?""",
(user_id, lat - dlat, lat + dlat, lon - dlon, lon + dlon)
).fetchall()
results = []
for r in rows:
dist = _haversine_m(lat, lon, r['lat'], r['lon'])
if dist <= radius_m:
d = _row_to_dict(r)
d['distance_m'] = round(dist, 1)
results.append(d)
results.sort(key=lambda x: x['distance_m'])
return results

View file

@ -1,132 +0,0 @@
"""
RECON Contacts API Flask Blueprint.
Per-user phone book with soft delete, restore, purge, and proximity queries.
All endpoints require Authentik forward-auth (X-Authentik-Username header).
"""
from flask import Blueprint, request, jsonify
from .auth import require_auth
from .contacts import ContactsDB
contacts_bp = Blueprint('contacts', __name__)
_db = None
def _get_db():
global _db
if _db is None:
_db = ContactsDB()
return _db
@contacts_bp.route('/api/contacts', methods=['GET'])
@require_auth
def list_contacts():
db = _get_db()
category = request.args.get('category')
search = request.args.get('search')
return jsonify(db.list_all(request.user_id, category=category, search=search))
@contacts_bp.route('/api/contacts', methods=['POST'])
@require_auth
def create_contact():
db = _get_db()
data = request.get_json(force=True)
contact, err = db.create(request.user_id, **data)
if err == 'conflict':
return jsonify({'error': 'You already have a Home/Work contact'}), 409
return jsonify(contact), 201
@contacts_bp.route('/api/contacts/nearby', methods=['GET'])
@require_auth
def nearby_contacts():
db = _get_db()
lat = request.args.get('lat', type=float)
lon = request.args.get('lon', type=float)
radius_m = request.args.get('radius_m', 75, type=float)
if lat is None or lon is None:
return jsonify({'error': 'lat and lon required'}), 400
return jsonify(db.find_nearby(request.user_id, lat, lon, radius_m))
@contacts_bp.route('/api/contacts/deleted', methods=['GET'])
@require_auth
def list_deleted():
db = _get_db()
return jsonify(db.list_deleted(request.user_id))
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['GET'])
@require_auth
def get_contact(contact_id):
db = _get_db()
contact = db.get(request.user_id, contact_id)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['PATCH'])
@require_auth
def update_contact(contact_id):
db = _get_db()
data = request.get_json(force=True)
contact = db.update(request.user_id, contact_id, **data)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['DELETE'])
@require_auth
def delete_contact(contact_id):
db = _get_db()
contact = db.soft_delete(request.user_id, contact_id)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/restore', methods=['POST'])
@require_auth
def restore_contact(contact_id):
db = _get_db()
contact, err = db.restore(request.user_id, contact_id)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'conflict':
return jsonify({'error': 'You already have a Home/Work contact'}), 409
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/restore-as', methods=['POST'])
@require_auth
def restore_as_contact(contact_id):
db = _get_db()
data = request.get_json(force=True)
new_label = data.get('label', '').strip()
if not new_label:
return jsonify({'error': 'label is required'}), 400
contact, err = db.restore_as(request.user_id, contact_id, new_label)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'invalid_label':
return jsonify({'error': 'Invalid label'}), 400
if err == 'conflict':
return jsonify({'error': 'Label conflict'}), 409
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/purge', methods=['DELETE'])
@require_auth
def purge_contact(contact_id):
db = _get_db()
ok, err = db.purge(request.user_id, contact_id)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'not_deleted':
return jsonify({'error': 'Contact must be deleted before purging'}), 400
return jsonify({'ok': True})