meshai/meshai/dashboard/api/curation_routes.py
Matt Johnson (via Claude) e3bf53ade4 feat(v0.6-4): gauge_sites + town_anchors curation tables + GUI CRUD
Closes Section A.5 (gauge_sites) and A.12 (town_anchors) of the audit
doc by lifting both Python-dict curation lists into editable SQLite
tables. Operators can add/edit/disable rows from the dashboard without
a deploy; runtime reads go through cached accessors that invalidate
when the REST API mutates state.

Schema:
  v8.sql adds gauge_sites(site_id PK, gauge_name, lat, lon, action_ft,
    flood_minor_ft, flood_moderate_ft, flood_major_ft, enabled, updated_at).
  v9.sql adds town_anchors(anchor_id AUTOINC PK, name UNIQUE, lat, lon,
    state, enabled, updated_at).
  SCHEMA_VERSION 7 -> 9.

Seed (meshai/persistence/curation.py):
  _GAUGE_SITES_SEED carries the original 9 Idaho rows from
  IDAHO_CURATED_SITES verbatim.
  _TOWN_ANCHORS_SEED carries the 29 Idaho-and-neighbor towns from
  _TOWN_COORDS verbatim.
  seed_gauge_sites() / seed_town_anchors() INSERT OR IGNORE -- safe to
  re-run; never overwrites user edits.

Handler integration:
  - meshai/central/idaho_gauge_sites.py: IDAHO_CURATED_SITES dict deleted.
    lookup_site() now calls meshai.persistence.curation.lookup_gauge_site()
    which reads the table. THRESHOLD_RANK, normalize_site_id, and
    compute_threshold_state remain in this module (CODE per Matt s rule).
  - meshai/central/nwis_handler.py drops IDAHO_CURATED_SITES from its
    import list; the table-backed lookup_site() is API-compatible.
  - meshai/central_normalizer.py: _TOWN_COORDS dict deleted.
    _compute_distance_bearing() now calls
    meshai.persistence.curation.lookup_town_anchor() with the same
    lowercased-name semantics it always used.

REST API (meshai/dashboard/api/curation_routes.py):
  /api/gauge-sites  GET list, GET one, POST add, PUT update, DELETE
  /api/town-anchors GET list, GET one, POST add, PUT update, DELETE
  Every mutation calls invalidate_curation_cache() so handler reads see
  the new state on the next call -- no container restart.

Dashboard (dashboard-frontend/src/pages/):
  - GaugeSites.tsx: table view with Add row / Edit row inline / Delete
    confirm + per-row enabled toggle. 8 columns mirror the schema.
  - TownAnchors.tsx: same pattern, 5 columns. Name is lowercased on
    save to match the lookup key.
  - Left-nav entries "Gauge Sites" (Droplets icon) and "Town Anchors"
    (MapPin icon) added to Layout.tsx; routes added to App.tsx.

Tests (tests/test_curation.py, 18 cases):
  - v8/v9 tables exist
  - Seed lands every row from both dicts
  - Seed idempotent; never overwrites user edits
  - lookup_gauge_site hits/miss, disabled rows are invisible
  - lookup_town_anchor case-insensitive
  - REST API: GET list, GET one, GET 404, POST add, PUT update, DELETE,
    POST missing-field 400; both gauge_sites + town_anchors
  - Accessor reflects API mutations after invalidate_curation_cache()

tests/test_nwis_handler.py back-compat: IDAHO_CURATED_SITES dict alias
points at _GAUGE_SITES_SEED so the existing assertion suite still passes.
tests/test_adapter_config_foundation.py schema_meta v7 -> v9 bump.

Test count: 797 -> 819 (+18 curation cases + 4 maintenance updates).
2026-06-05 20:19:13 +00:00

280 lines
9.5 KiB
Python

"""v0.6-4 REST API for gauge_sites + town_anchors curation tables.
Endpoints (mirrors the adapter_config CRUD shape):
gauge_sites:
GET /api/gauge-sites list (enabled and disabled)
GET /api/gauge-sites/{site_id} single row
POST /api/gauge-sites add new row
PUT /api/gauge-sites/{site_id} partial update
DELETE /api/gauge-sites/{site_id} remove row
town_anchors:
GET /api/town-anchors list
GET /api/town-anchors/{anchor_id} single
POST /api/town-anchors add new
PUT /api/town-anchors/{anchor_id} partial update
DELETE /api/town-anchors/{anchor_id} remove
Every mutating call invalidates the in-process curation cache so
handler-side reads (lookup_gauge_site, lookup_town_anchor) see the new
state on the next call -- no container restart.
"""
from __future__ import annotations
import logging
import time
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Request
from meshai.persistence.curation import invalidate_curation_cache
logger = logging.getLogger(__name__)
router = APIRouter(tags=["curation"])
def _get_conn():
from meshai.persistence import get_db
return get_db()
def _gauge_row_to_dict(r) -> dict:
return {
"site_id": r["site_id"],
"gauge_name": r["gauge_name"],
"lat": r["lat"],
"lon": r["lon"],
"action_ft": r["action_ft"],
"flood_minor_ft": r["flood_minor_ft"],
"flood_moderate_ft": r["flood_moderate_ft"],
"flood_major_ft": r["flood_major_ft"],
"enabled": bool(r["enabled"]),
"updated_at": r["updated_at"],
}
def _town_row_to_dict(r) -> dict:
return {
"anchor_id": r["anchor_id"],
"name": r["name"],
"lat": r["lat"],
"lon": r["lon"],
"state": r["state"],
"enabled": bool(r["enabled"]),
"updated_at": r["updated_at"],
}
# ============================================================================
# gauge_sites
# ============================================================================
@router.get("/gauge-sites")
async def list_gauges(request: Request) -> list[dict]:
conn = _get_conn()
return [_gauge_row_to_dict(r) for r in conn.execute(
"SELECT * FROM gauge_sites ORDER BY site_id"
).fetchall()]
@router.get("/gauge-sites/{site_id}")
async def get_gauge(site_id: str, request: Request) -> dict:
conn = _get_conn()
r = conn.execute(
"SELECT * FROM gauge_sites WHERE site_id=?", (site_id,)
).fetchone()
if r is None:
raise HTTPException(404, f"gauge_sites: {site_id} not found")
return _gauge_row_to_dict(r)
@router.post("/gauge-sites")
async def add_gauge(request: Request) -> dict:
body = await request.json()
if not isinstance(body, dict): raise HTTPException(400, "body must be a JSON object")
required = ("site_id", "gauge_name", "lat", "lon")
for f in required:
if f not in body:
raise HTTPException(400, f"missing required field: {f}")
if not isinstance(body["site_id"], str) or not body["site_id"].strip():
raise HTTPException(400, "site_id must be a non-empty string")
try:
lat = float(body["lat"]); lon = float(body["lon"])
except (TypeError, ValueError):
raise HTTPException(400, "lat/lon must be numeric")
conn = _get_conn()
try:
conn.execute(
"INSERT INTO gauge_sites(site_id, gauge_name, lat, lon, action_ft, "
"flood_minor_ft, flood_moderate_ft, flood_major_ft, enabled, updated_at) "
"VALUES (?,?,?,?,?,?,?,?,?,?)",
(body["site_id"].strip(),
str(body["gauge_name"]).strip(),
lat, lon,
body.get("action_ft"),
body.get("flood_minor_ft"),
body.get("flood_moderate_ft"),
body.get("flood_major_ft"),
1 if body.get("enabled", True) else 0,
time.time()),
)
except Exception as e:
raise HTTPException(400, f"insert failed: {e}")
invalidate_curation_cache()
return await get_gauge(body["site_id"].strip(), request)
@router.put("/gauge-sites/{site_id}")
async def update_gauge(site_id: str, request: Request) -> dict:
body = await request.json()
if not isinstance(body, dict): raise HTTPException(400, "body must be a JSON object")
conn = _get_conn()
existing = conn.execute(
"SELECT * FROM gauge_sites WHERE site_id=?", (site_id,)
).fetchone()
if existing is None:
raise HTTPException(404, f"gauge_sites: {site_id} not found")
fields = []
args: list[Any] = []
for col, validator in (
("gauge_name", lambda v: str(v).strip()),
("lat", float), ("lon", float),
("action_ft", lambda v: float(v) if v is not None else None),
("flood_minor_ft", lambda v: float(v) if v is not None else None),
("flood_moderate_ft", lambda v: float(v) if v is not None else None),
("flood_major_ft", lambda v: float(v) if v is not None else None),
("enabled", lambda v: 1 if bool(v) else 0),
):
if col in body:
try: args.append(validator(body[col]))
except (TypeError, ValueError):
raise HTTPException(400, f"invalid value for {col}: {body[col]!r}")
fields.append(f"{col}=?")
if not fields:
raise HTTPException(400, "no editable fields provided")
fields.append("updated_at=?")
args.append(time.time())
args.append(site_id)
conn.execute(
f"UPDATE gauge_sites SET {', '.join(fields)} WHERE site_id=?",
tuple(args),
)
invalidate_curation_cache()
return await get_gauge(site_id, request)
@router.delete("/gauge-sites/{site_id}")
async def delete_gauge(site_id: str, request: Request) -> dict:
conn = _get_conn()
cur = conn.execute("DELETE FROM gauge_sites WHERE site_id=?", (site_id,))
if cur.rowcount == 0:
raise HTTPException(404, f"gauge_sites: {site_id} not found")
invalidate_curation_cache()
return {"deleted": site_id}
# ============================================================================
# town_anchors
# ============================================================================
@router.get("/town-anchors")
async def list_towns(request: Request) -> list[dict]:
conn = _get_conn()
return [_town_row_to_dict(r) for r in conn.execute(
"SELECT * FROM town_anchors ORDER BY name"
).fetchall()]
@router.get("/town-anchors/{anchor_id}")
async def get_town(anchor_id: int, request: Request) -> dict:
conn = _get_conn()
r = conn.execute(
"SELECT * FROM town_anchors WHERE anchor_id=?", (anchor_id,)
).fetchone()
if r is None:
raise HTTPException(404, f"town_anchors: {anchor_id} not found")
return _town_row_to_dict(r)
@router.post("/town-anchors")
async def add_town(request: Request) -> dict:
body = await request.json()
if not isinstance(body, dict): raise HTTPException(400, "body must be a JSON object")
for f in ("name", "lat", "lon"):
if f not in body: raise HTTPException(400, f"missing required field: {f}")
name = str(body["name"]).strip().lower()
if not name: raise HTTPException(400, "name must be non-empty")
try:
lat = float(body["lat"]); lon = float(body["lon"])
except (TypeError, ValueError):
raise HTTPException(400, "lat/lon must be numeric")
conn = _get_conn()
try:
cur = conn.execute(
"INSERT INTO town_anchors(name, lat, lon, state, enabled, updated_at) "
"VALUES (?,?,?,?,?,?)",
(name, lat, lon, body.get("state"),
1 if body.get("enabled", True) else 0, time.time()),
)
except Exception as e:
raise HTTPException(400, f"insert failed: {e}")
invalidate_curation_cache()
return await get_town(cur.lastrowid, request)
@router.put("/town-anchors/{anchor_id}")
async def update_town(anchor_id: int, request: Request) -> dict:
body = await request.json()
if not isinstance(body, dict): raise HTTPException(400, "body must be a JSON object")
conn = _get_conn()
existing = conn.execute(
"SELECT * FROM town_anchors WHERE anchor_id=?", (anchor_id,)
).fetchone()
if existing is None:
raise HTTPException(404, f"town_anchors: {anchor_id} not found")
fields = []
args: list[Any] = []
for col, validator in (
("name", lambda v: str(v).strip().lower()),
("lat", float), ("lon", float),
("state", lambda v: str(v).strip() if v else None),
("enabled", lambda v: 1 if bool(v) else 0),
):
if col in body:
try: args.append(validator(body[col]))
except (TypeError, ValueError):
raise HTTPException(400, f"invalid value for {col}: {body[col]!r}")
fields.append(f"{col}=?")
if not fields:
raise HTTPException(400, "no editable fields provided")
fields.append("updated_at=?")
args.append(time.time())
args.append(anchor_id)
conn.execute(
f"UPDATE town_anchors SET {', '.join(fields)} WHERE anchor_id=?",
tuple(args),
)
invalidate_curation_cache()
return await get_town(anchor_id, request)
@router.delete("/town-anchors/{anchor_id}")
async def delete_town(anchor_id: int, request: Request) -> dict:
conn = _get_conn()
cur = conn.execute("DELETE FROM town_anchors WHERE anchor_id=?", (anchor_id,))
if cur.rowcount == 0:
raise HTTPException(404, f"town_anchors: {anchor_id} not found")
invalidate_curation_cache()
return {"deleted": anchor_id}