mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-11 12:24:37 +02:00
feat(gui): add streams view (1b-6) (#21)
* feat(gui): add streams view (1b-6)
Add streams list and edit routes with live JetStream data:
- GET /streams: list all streams with live size/messages
- POST /streams/{name}: update max_age_s with validation
Features:
- Live data from JetStream (bytes, messages, timestamps)
- Graceful degradation when NATS unavailable
- Preset chip buttons (1d, 7d, 14d, 30d, 365d)
- Custom days input with Save button
- Current selection highlighted
- Managed by supervisor badge
- Audit logging with before/after max_age_s
Files:
- src/central/gui/audit.py: add STREAM_UPDATE constant
- src/central/gui/routes.py: add streams_list and streams_update handlers
- src/central/gui/templates/base.html: add Streams nav link
- src/central/gui/templates/streams_list.html: new template
- tests/test_streams.py: 9 tests covering all requirements
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(gui): use get_msg().time for stream timestamps, fix badge layout
- nats-py StreamState doesn't expose first_ts/last_ts
- Fetch timestamps via js.get_msg(stream, seq=N).time instead
- Handle edge cases: empty streams, single-message streams, get_msg failures
- Fix badge overlap using flex layout instead of float:right
- Change label from "Max bytes (config)" to "Max bytes (current)"
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---------
Co-authored-by: Matt Johnson <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
c7d15afcba
commit
e097b504af
5 changed files with 915 additions and 0 deletions
|
|
@ -1,9 +1,12 @@
|
|||
"""Route handlers for Central GUI."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger("central.gui.routes")
|
||||
|
||||
from fastapi import APIRouter, Depends, Form, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
||||
from fastapi_csrf_protect import CsrfProtect
|
||||
|
|
@ -22,6 +25,7 @@ from central.gui.audit import (
|
|||
AUTH_LOGOUT,
|
||||
AUTH_PASSWORD_CHANGE,
|
||||
OPERATOR_CREATE,
|
||||
STREAM_UPDATE,
|
||||
write_audit,
|
||||
)
|
||||
from central.gui.db import get_pool
|
||||
|
|
@ -872,3 +876,231 @@ async def adapters_edit_submit(
|
|||
)
|
||||
|
||||
return RedirectResponse(url="/adapters", status_code=302)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Streams routes
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@router.get("/streams", response_class=HTMLResponse)
|
||||
async def streams_list(
|
||||
request: Request,
|
||||
csrf_protect: CsrfProtect = Depends(),
|
||||
) -> HTMLResponse:
|
||||
"""List all streams with live data."""
|
||||
from central.gui.nats import get_js
|
||||
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
operator = request.state.operator
|
||||
js = get_js()
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
|
||||
FROM config.streams
|
||||
ORDER BY name
|
||||
"""
|
||||
)
|
||||
|
||||
streams = []
|
||||
for row in rows:
|
||||
stream_data = {
|
||||
"name": row["name"],
|
||||
"max_age_s": row["max_age_s"],
|
||||
"max_bytes_cfg": row["max_bytes"],
|
||||
"managed_max_bytes": row["managed_max_bytes"],
|
||||
"live_bytes": None,
|
||||
"live_messages": None,
|
||||
"live_first_seq": None,
|
||||
"live_last_seq": None,
|
||||
"live_first_ts": None,
|
||||
"live_last_ts": None,
|
||||
"first_ts_error": None,
|
||||
"last_ts_error": None,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
# Fetch live data from JetStream
|
||||
if js is not None:
|
||||
try:
|
||||
info = await js.stream_info(row["name"])
|
||||
stream_data["live_bytes"] = info.state.bytes
|
||||
stream_data["live_messages"] = info.state.messages
|
||||
stream_data["live_first_seq"] = info.state.first_seq
|
||||
stream_data["live_last_seq"] = info.state.last_seq
|
||||
|
||||
# Fetch first / last message timestamps via get_msg
|
||||
# RawStreamMsg has .time attribute (not .metadata.timestamp)
|
||||
if info.state.first_seq > 0:
|
||||
try:
|
||||
first_msg = await js.get_msg(row["name"], seq=info.state.first_seq)
|
||||
stream_data["live_first_ts"] = first_msg.time
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"get_msg first failed",
|
||||
extra={"stream": row["name"], "err": type(e).__name__},
|
||||
)
|
||||
stream_data["live_first_ts"] = None
|
||||
stream_data["first_ts_error"] = type(e).__name__
|
||||
|
||||
if info.state.last_seq > 0 and info.state.last_seq != info.state.first_seq:
|
||||
try:
|
||||
last_msg = await js.get_msg(row["name"], seq=info.state.last_seq)
|
||||
stream_data["live_last_ts"] = last_msg.time
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"get_msg last failed",
|
||||
extra={"stream": row["name"], "err": type(e).__name__},
|
||||
)
|
||||
stream_data["live_last_ts"] = None
|
||||
stream_data["last_ts_error"] = type(e).__name__
|
||||
elif info.state.last_seq == info.state.first_seq and info.state.first_seq > 0:
|
||||
# Single message in stream
|
||||
stream_data["live_last_ts"] = stream_data.get("live_first_ts")
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Stream info failed", extra={"stream": row["name"]})
|
||||
stream_data["error"] = f"unavailable: {type(e).__name__}"
|
||||
else:
|
||||
stream_data["error"] = "NATS unavailable"
|
||||
|
||||
streams.append(stream_data)
|
||||
|
||||
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
name="streams_list.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": csrf_token,
|
||||
"streams": streams,
|
||||
},
|
||||
)
|
||||
csrf_protect.set_csrf_cookie(signed_token, response)
|
||||
return response
|
||||
|
||||
|
||||
@router.post("/streams/{name}", response_class=HTMLResponse)
|
||||
async def streams_update(
|
||||
request: Request,
|
||||
name: str,
|
||||
csrf_protect: CsrfProtect = Depends(),
|
||||
) -> Response:
|
||||
"""Update stream max_age_s."""
|
||||
from central.gui.nats import get_js
|
||||
|
||||
templates = _get_templates()
|
||||
pool = get_pool()
|
||||
operator = request.state.operator
|
||||
|
||||
# Validate CSRF
|
||||
await csrf_protect.validate_csrf(request)
|
||||
|
||||
form = await request.form()
|
||||
max_age_s_str = form.get("max_age_s", "").strip()
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
# Parse max_age_s
|
||||
try:
|
||||
max_age_s = int(max_age_s_str)
|
||||
except (ValueError, TypeError):
|
||||
max_age_s = 0
|
||||
errors[name] = "max_age_s must be a valid integer"
|
||||
|
||||
# Validate range: 1 hour to 5 years
|
||||
MIN_AGE = 3600 # 1 hour
|
||||
MAX_AGE = 5 * 365 * 24 * 3600 # 5 years (157680000)
|
||||
if not errors and (max_age_s < MIN_AGE or max_age_s > MAX_AGE):
|
||||
errors[name] = f"max_age_s must be between {MIN_AGE} (1 hour) and {MAX_AGE} (5 years)"
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
# Check stream exists
|
||||
row = await conn.fetchrow(
|
||||
"SELECT name, max_age_s FROM config.streams WHERE name = $1",
|
||||
name,
|
||||
)
|
||||
|
||||
if row is None:
|
||||
return Response(status_code=404, content="Stream not found")
|
||||
|
||||
if errors:
|
||||
# Re-render with errors
|
||||
js = get_js()
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
|
||||
FROM config.streams
|
||||
ORDER BY name
|
||||
"""
|
||||
)
|
||||
|
||||
streams = []
|
||||
for r in rows:
|
||||
stream_data = {
|
||||
"name": r["name"],
|
||||
"max_age_s": r["max_age_s"],
|
||||
"max_bytes_cfg": r["max_bytes"],
|
||||
"managed_max_bytes": r["managed_max_bytes"],
|
||||
"live_bytes": None,
|
||||
"live_messages": None,
|
||||
"live_first_ts": None,
|
||||
"live_last_ts": None,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
if js is not None:
|
||||
try:
|
||||
info = await js.stream_info(r["name"])
|
||||
stream_data["live_bytes"] = info.state.bytes
|
||||
stream_data["live_messages"] = info.state.messages
|
||||
stream_data["live_first_ts"] = info.state.first_ts
|
||||
stream_data["live_last_ts"] = info.state.last_ts
|
||||
except Exception:
|
||||
stream_data["error"] = "unavailable"
|
||||
else:
|
||||
stream_data["error"] = "NATS unavailable"
|
||||
|
||||
streams.append(stream_data)
|
||||
|
||||
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
|
||||
response = templates.TemplateResponse(
|
||||
request=request,
|
||||
name="streams_list.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": csrf_token,
|
||||
"streams": streams,
|
||||
"errors": errors,
|
||||
},
|
||||
)
|
||||
csrf_protect.set_csrf_cookie(signed_token, response)
|
||||
return response
|
||||
|
||||
old_max_age_s = row["max_age_s"]
|
||||
|
||||
# Update stream
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE config.streams
|
||||
SET max_age_s = $1, updated_at = now()
|
||||
WHERE name = $2
|
||||
""",
|
||||
max_age_s,
|
||||
name,
|
||||
)
|
||||
|
||||
# Write audit log
|
||||
await write_audit(
|
||||
conn,
|
||||
STREAM_UPDATE,
|
||||
operator_id=operator.id,
|
||||
target=name,
|
||||
before={"max_age_s": old_max_age_s},
|
||||
after={"max_age_s": max_age_s},
|
||||
)
|
||||
|
||||
return RedirectResponse(url="/streams", status_code=302)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue