mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
feat(telemetry-separation): dedicated /telemetry tab split from /events by adapter data_class (v0.7.4)
PR #5 of the v0.7.x GUI rework arc. Production code; central-gui restart only (supervisor untouched -- data_class is read only by central-gui per request). - SourceAdapter gains a `data_class` class attr (Literal["event","telemetry"], default "event"). NWIS opts in as "telemetry" (continuous high-volume water gauges); every other adapter stays "event". The /events vs /telemetry split is thus registry-derived from class attrs -- no hardcoded adapter-name lists. - routes.py refactor: `_class_adapter_names(data_class)` and a `data_class` arg on `_adapter_filter_options` scope the flat + domain-grouped chip/legend lists to a class (colors stay keyed to the FULL registry, so an adapter keeps one color across tabs). `_fetch_events` accepts `class_adapters` and adds an `adapter = ANY(...)` condition. Shared `_events_query`, `_events_page(data_class, base_path)` and `_events_rows_fragment(...)` back both tabs; `/events`, `/events/rows`, `/telemetry`, `/telemetry/rows` are thin wrappers. - Templates parameterized with a `base_path` context var (form action, hx-get, hx-push-url header, clear-all redirect, JS BASE_PATH const); the `_events_rows` paginator macro takes `base`. Same templates serve both tabs; nav gains a Telemetry link. - /events.json UNCHANGED -- the cursor path sets no `class_adapters`, so the subject + pagination contract is intact (TestEventsJsonSubject still passes). Adds TestTelemetrySeparation (data_class defaults, registry split 11 event / 1 telemetry, class-scoped filter options, color stability, and the `adapter = ANY(...)` SQL shape incl. the no-class events.json path). Updates the events frontend tests for the base_path-parameterized templates. Full suite: 682 passed, 1 skipped (central and unprivileged zvx, 3x each). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
76d519bcd5
commit
8d193d3266
9 changed files with 247 additions and 92 deletions
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
|
@ -48,6 +48,12 @@ class SourceAdapter(ABC):
|
|||
latitude and longitude; the supervisor extracts them, runs registered
|
||||
enrichers, and attaches results under Event.data["_enriched"]."""
|
||||
|
||||
data_class: Literal["event", "telemetry"] = "event"
|
||||
"""How the GUI classifies this source. "event" (default) = discrete events,
|
||||
shown on /events. "telemetry" = continuous/high-volume feeds (e.g. NWIS water
|
||||
gauges) that would drown discrete-event signal; shown on /telemetry instead.
|
||||
GUI-only — does not affect publishing or the events.json contract."""
|
||||
|
||||
@abstractmethod
|
||||
async def poll(self) -> AsyncIterator[Event]:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -127,6 +127,9 @@ class NWISAdapter(SourceAdapter):
|
|||
# Site lat/lon mirrored from Geo.centroid into event.data (see _build_event).
|
||||
enrichment_locations = [("latitude", "longitude")]
|
||||
|
||||
# Continuous high-volume water-gauge feed -> the /telemetry tab, not /events.
|
||||
data_class = "telemetry"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: AdapterConfig,
|
||||
|
|
|
|||
|
|
@ -2703,17 +2703,30 @@ def _resolve_time(token: str | None, now: datetime):
|
|||
return None, None, False, f"Unknown time preset: {token}"
|
||||
|
||||
|
||||
def _adapter_filter_options():
|
||||
def _class_adapter_names(data_class: str) -> list[str]:
|
||||
"""Registry-derived adapter names for a data_class ('event' | 'telemetry')."""
|
||||
return sorted(
|
||||
name for name, cls in discover_adapters().items()
|
||||
if getattr(cls, "data_class", "event") == data_class
|
||||
)
|
||||
|
||||
|
||||
def _adapter_filter_options(data_class: str | None = None):
|
||||
"""Registry-derived (flat, grouped) adapter options for the chip-picker.
|
||||
|
||||
flat: [{name, display_name, color}] sorted by name (color by sorted index,
|
||||
matching the map legend). grouped: [(group_label, [{value,label,color}])].
|
||||
flat: [{name, display_name, color}] sorted by name. grouped:
|
||||
[(group_label, [{value,label,color}])]. Colors are keyed to the FULL sorted
|
||||
registry (stable per-adapter across the /events and /telemetry tabs). When
|
||||
data_class is given, only that class's adapters are returned.
|
||||
"""
|
||||
reg = discover_adapters()
|
||||
ordered = sorted(reg.values(), key=lambda c: c.name)
|
||||
color_by_name = {
|
||||
cls.name: EVENTS_PALETTE[i % len(EVENTS_PALETTE)] for i, cls in enumerate(ordered)
|
||||
}
|
||||
if data_class is not None:
|
||||
ordered = [c for c in ordered if getattr(c, "data_class", "event") == data_class]
|
||||
shown = {c.name for c in ordered}
|
||||
flat = [
|
||||
{"name": cls.name, "display_name": cls.display_name, "color": color_by_name[cls.name]}
|
||||
for cls in ordered
|
||||
|
|
@ -2722,7 +2735,7 @@ def _adapter_filter_options():
|
|||
for group_label, names in ADAPTER_GROUPS.items():
|
||||
items = [
|
||||
{"value": n, "label": reg[n].display_name, "color": color_by_name[n]}
|
||||
for n in names if n in reg
|
||||
for n in names if n in reg and n in shown
|
||||
]
|
||||
if items:
|
||||
grouped.append((group_label, items))
|
||||
|
|
@ -2992,6 +3005,7 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
|
|||
limit = parsed_params["limit"]
|
||||
q = parsed_params.get("q")
|
||||
adapters = parsed_params.get("adapters") or []
|
||||
class_adapters = parsed_params.get("class_adapters") # data_class split (GUI tabs)
|
||||
categories = parsed_params.get("categories") or []
|
||||
event_types = parsed_params.get("event_types") or []
|
||||
severities = parsed_params.get("severities") or []
|
||||
|
|
@ -3022,6 +3036,12 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
|
|||
query_params.append(adapters)
|
||||
param_idx += 1
|
||||
|
||||
if class_adapters is not None:
|
||||
# data_class split (/events vs /telemetry); registry-derived name list.
|
||||
conditions.append(f"adapter = ANY(${param_idx})")
|
||||
query_params.append(class_adapters)
|
||||
param_idx += 1
|
||||
|
||||
if categories:
|
||||
conditions.append(f"category = ANY(${param_idx})")
|
||||
query_params.append(categories)
|
||||
|
|
@ -3250,59 +3270,33 @@ async def events_json(request: Request):
|
|||
|
||||
# --- Events feed frontend routes ---
|
||||
|
||||
@router.get("/events", response_class=HTMLResponse)
|
||||
async def events_list(request: Request) -> HTMLResponse:
|
||||
"""Events feed page with filter form, table, and map."""
|
||||
templates = _get_templates()
|
||||
operator = getattr(request.state, "operator", None)
|
||||
csrf_token = getattr(request.state, "csrf_token", "")
|
||||
async def _events_query(request: Request, data_class: str):
|
||||
"""Shared parse + class-scoped fetch for the /events and /telemetry tabs.
|
||||
|
||||
Returns (parsed, error, events, next_cursor, total, class_adapters).
|
||||
The data_class split is registry-derived and injected as `class_adapters`,
|
||||
which _fetch_events applies as an `adapter = ANY(...)` condition.
|
||||
"""
|
||||
params = request.query_params
|
||||
|
||||
# Parse parameters (GUI defaults to Last 24h when no time filter is given).
|
||||
parsed, error = _parse_events_params(params, default_time=DEFAULT_TIME, default_offset=0)
|
||||
class_adapters = _class_adapter_names(data_class)
|
||||
if parsed is not None:
|
||||
parsed["class_adapters"] = class_adapters
|
||||
|
||||
# Get system settings for map tiles + DISTINCT filter-option lists.
|
||||
pool = get_pool()
|
||||
all_categories: list[str] = []
|
||||
all_event_types: list[str] = []
|
||||
async with pool.acquire() as conn:
|
||||
system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system")
|
||||
try:
|
||||
cat_rows = await conn.fetch("SELECT DISTINCT category FROM events ORDER BY 1")
|
||||
all_categories = [r["category"] for r in cat_rows]
|
||||
et_rows = await conn.fetch(
|
||||
"SELECT DISTINCT split_part(category, '.', 1) AS et FROM events ORDER BY 1"
|
||||
)
|
||||
all_event_types = [r["et"] for r in et_rows]
|
||||
except Exception:
|
||||
logger.warning("Failed to load filter options", exc_info=True)
|
||||
|
||||
tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
|
||||
tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap"
|
||||
|
||||
events = []
|
||||
next_cursor = None
|
||||
total = 0
|
||||
if error:
|
||||
# Validation error - show error banner but don't fail the page
|
||||
pass
|
||||
else:
|
||||
events, next_cursor, total = [], None, 0
|
||||
if not error:
|
||||
result = await _fetch_events(parsed)
|
||||
if result.error:
|
||||
error = result.error
|
||||
else:
|
||||
events = result.events
|
||||
next_cursor = result.next_cursor
|
||||
total = result.total or 0
|
||||
|
||||
events, next_cursor, total = result.events, result.next_cursor, result.total or 0
|
||||
_decorate_table_events(events)
|
||||
return parsed, error, events, next_cursor, total, class_adapters
|
||||
|
||||
pagination = _build_pagination(total, (parsed or {}).get("offset") or 0,
|
||||
(parsed or {}).get("limit") or 50)
|
||||
adapters_flat, adapters_grouped = _adapter_filter_options()
|
||||
|
||||
def _events_filter_state(parsed: dict | None, params) -> dict:
|
||||
pstate = parsed or {}
|
||||
filter_state = {
|
||||
return {
|
||||
"q": pstate.get("q") or "",
|
||||
"adapters": pstate.get("adapters") or [],
|
||||
"categories": pstate.get("categories") or [],
|
||||
|
|
@ -3316,8 +3310,46 @@ async def events_list(request: Request) -> HTMLResponse:
|
|||
"map_filter": pstate.get("map_filter", False),
|
||||
"limit": str(pstate.get("limit", 50)),
|
||||
}
|
||||
active_pills = _build_active_pills(pstate, len(adapters_flat)) if parsed else []
|
||||
# Paginator links append offset; keep cursor + offset out of the carried qs.
|
||||
|
||||
|
||||
async def _events_page(request: Request, data_class: str, base_path: str) -> HTMLResponse:
|
||||
"""Full events/telemetry page (shared by /events and /telemetry)."""
|
||||
templates = _get_templates()
|
||||
operator = getattr(request.state, "operator", None)
|
||||
csrf_token = getattr(request.state, "csrf_token", "")
|
||||
params = request.query_params
|
||||
|
||||
parsed, error, events, next_cursor, total, class_adapters = \
|
||||
await _events_query(request, data_class)
|
||||
|
||||
# System map tiles + DISTINCT filter-option lists, scoped to this data_class.
|
||||
pool = get_pool()
|
||||
all_categories: list[str] = []
|
||||
all_event_types: list[str] = []
|
||||
async with pool.acquire() as conn:
|
||||
system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system")
|
||||
try:
|
||||
cat_rows = await conn.fetch(
|
||||
"SELECT DISTINCT category FROM events WHERE adapter = ANY($1) ORDER BY 1",
|
||||
class_adapters,
|
||||
)
|
||||
all_categories = [r["category"] for r in cat_rows]
|
||||
et_rows = await conn.fetch(
|
||||
"SELECT DISTINCT split_part(category, '.', 1) AS et FROM events "
|
||||
"WHERE adapter = ANY($1) ORDER BY 1",
|
||||
class_adapters,
|
||||
)
|
||||
all_event_types = [r["et"] for r in et_rows]
|
||||
except Exception:
|
||||
logger.warning("Failed to load filter options", exc_info=True)
|
||||
|
||||
tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
|
||||
tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap"
|
||||
|
||||
pagination = _build_pagination(total, (parsed or {}).get("offset") or 0,
|
||||
(parsed or {}).get("limit") or 50)
|
||||
adapters_flat, adapters_grouped = _adapter_filter_options(data_class)
|
||||
active_pills = _build_active_pills(parsed or {}, len(adapters_flat)) if parsed else []
|
||||
query_string = urlencode([(k, v) for k, v in _query_items(params)
|
||||
if k not in ("cursor", "offset")])
|
||||
|
||||
|
|
@ -3327,6 +3359,7 @@ async def events_list(request: Request) -> HTMLResponse:
|
|||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": csrf_token,
|
||||
"base_path": base_path,
|
||||
"events": events,
|
||||
"next_cursor": next_cursor,
|
||||
"pagination": pagination,
|
||||
|
|
@ -3340,42 +3373,23 @@ async def events_list(request: Request) -> HTMLResponse:
|
|||
"severity_order": SEVERITY_ORDER,
|
||||
"time_presets": [(t, TIME_PRESET_LABELS[t]) for t in
|
||||
("last_15m", "last_1h", "last_6h", "last_24h", "last_7d", "active", "all")],
|
||||
"filter_state": filter_state,
|
||||
"filter_state": _events_filter_state(parsed, params),
|
||||
"active_pills": active_pills,
|
||||
"query_string": query_string,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/events/rows", response_class=HTMLResponse)
|
||||
async def events_rows(request: Request) -> HTMLResponse:
|
||||
"""HTMX fragment: events table rows only (no page chrome)."""
|
||||
async def _events_rows_fragment(request: Request, data_class: str, base_path: str) -> HTMLResponse:
|
||||
"""HTMX rows fragment (shared by /events/rows and /telemetry/rows)."""
|
||||
templates = _get_templates()
|
||||
|
||||
params = request.query_params
|
||||
|
||||
# Parse parameters (same GUI default as the page).
|
||||
parsed, error = _parse_events_params(params, default_time=DEFAULT_TIME, default_offset=0)
|
||||
|
||||
events = []
|
||||
next_cursor = None
|
||||
total = 0
|
||||
if error:
|
||||
pass
|
||||
else:
|
||||
result = await _fetch_events(parsed)
|
||||
if result.error:
|
||||
error = result.error
|
||||
else:
|
||||
events = result.events
|
||||
next_cursor = result.next_cursor
|
||||
total = result.total or 0
|
||||
|
||||
_decorate_table_events(events)
|
||||
parsed, error, events, next_cursor, total, _ = await _events_query(request, data_class)
|
||||
|
||||
pagination = _build_pagination(total, (parsed or {}).get("offset") or 0,
|
||||
(parsed or {}).get("limit") or 50)
|
||||
adapters_flat, _ = _adapter_filter_options()
|
||||
adapters_flat, _ = _adapter_filter_options(data_class)
|
||||
active_pills = _build_active_pills(parsed or {}, len(adapters_flat)) if parsed else []
|
||||
query_string = urlencode([(k, v) for k, v in _query_items(params)
|
||||
if k not in ("cursor", "offset")])
|
||||
|
|
@ -3384,6 +3398,7 @@ async def events_rows(request: Request) -> HTMLResponse:
|
|||
request=request,
|
||||
name="_events_rows.html",
|
||||
context={
|
||||
"base_path": base_path,
|
||||
"events": events,
|
||||
"next_cursor": next_cursor,
|
||||
"pagination": pagination,
|
||||
|
|
@ -3393,7 +3408,28 @@ async def events_rows(request: Request) -> HTMLResponse:
|
|||
"oob_pills": True, # emit an out-of-band #active-pills update on swap
|
||||
},
|
||||
)
|
||||
# Push the bookmarkable full-page URL (not the /events/rows fragment path),
|
||||
# so back/forward + bookmarking land on the same filtered view.
|
||||
response.headers["HX-Push-Url"] = "/events?" + str(request.url.query)
|
||||
# Push the bookmarkable full-page URL (not the fragment path).
|
||||
response.headers["HX-Push-Url"] = base_path + "?" + str(request.url.query)
|
||||
return response
|
||||
|
||||
|
||||
@router.get("/events", response_class=HTMLResponse)
|
||||
async def events_list(request: Request) -> HTMLResponse:
|
||||
"""Events feed page (data_class=event): filter form, table, and map."""
|
||||
return await _events_page(request, "event", "/events")
|
||||
|
||||
|
||||
@router.get("/events/rows", response_class=HTMLResponse)
|
||||
async def events_rows(request: Request) -> HTMLResponse:
|
||||
return await _events_rows_fragment(request, "event", "/events")
|
||||
|
||||
|
||||
@router.get("/telemetry", response_class=HTMLResponse)
|
||||
async def telemetry_list(request: Request) -> HTMLResponse:
|
||||
"""Telemetry feed page (data_class=telemetry; e.g. NWIS). Same shape as /events."""
|
||||
return await _events_page(request, "telemetry", "/telemetry")
|
||||
|
||||
|
||||
@router.get("/telemetry/rows", response_class=HTMLResponse)
|
||||
async def telemetry_rows(request: Request) -> HTMLResponse:
|
||||
return await _events_rows_fragment(request, "telemetry", "/telemetry")
|
||||
|
|
|
|||
|
|
@ -72,20 +72,21 @@
|
|||
|
||||
{# Real offset paginator (v0.7.3). Each link carries offset + the filter
|
||||
query_string (which excludes cursor/offset); limit persists via query_string. #}
|
||||
{% macro page_link(off, label, cls, extra) %}
|
||||
{# base_path passed into the macro (macros don't see render-context vars). #}
|
||||
{% macro page_link(base, off, label, cls) %}
|
||||
{% set qs = "offset=" ~ off ~ ("&" ~ query_string if query_string else "") %}
|
||||
<a href="/events?{{ qs }}" role="button" class="page-link {{ cls }}"
|
||||
hx-get="/events/rows?{{ qs }}" hx-target="#events-rows" hx-push-url="true" {{ extra }}>{{ label }}</a>
|
||||
<a href="{{ base }}?{{ qs }}" role="button" class="page-link {{ cls }}"
|
||||
hx-get="{{ base }}/rows?{{ qs }}" hx-target="#events-rows" hx-push-url="true">{{ label }}</a>
|
||||
{% endmacro %}
|
||||
<nav class="paginator" aria-label="Pagination">
|
||||
<div class="paginator-pages">
|
||||
{% if pagination.prev_offset is not none %}{{ page_link(pagination.prev_offset, "‹ Previous", "page-prev", "") }}{% else %}<span class="page-link disabled">‹ Previous</span>{% endif %}
|
||||
{% if pagination.prev_offset is not none %}{{ page_link(base_path, pagination.prev_offset, "‹ Previous", "page-prev") }}{% else %}<span class="page-link disabled">‹ Previous</span>{% endif %}
|
||||
{% for p in pagination.pages %}
|
||||
{% if p.ellipsis %}<span class="page-ellipsis">…</span>
|
||||
{% elif p.current %}<span class="page-link current" aria-current="page">{{ p.page }}</span>
|
||||
{% else %}{{ page_link(p.offset, p.page, "", "") }}{% endif %}
|
||||
{% else %}{{ page_link(base_path, p.offset, p.page, "") }}{% endif %}
|
||||
{% endfor %}
|
||||
{% if pagination.next_offset is not none %}{{ page_link(pagination.next_offset, "Next ›", "page-next", "") }}{% else %}<span class="page-link disabled">Next ›</span>{% endif %}
|
||||
{% if pagination.next_offset is not none %}{{ page_link(base_path, pagination.next_offset, "Next ›", "page-next") }}{% else %}<span class="page-link disabled">Next ›</span>{% endif %}
|
||||
</div>
|
||||
<div class="paginator-meta">
|
||||
{% if pagination.total %}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
<li><a href="/">Dashboard</a></li>
|
||||
<li><a href="/adapters">Adapters</a></li>
|
||||
<li><a href="/events">Events</a></li>
|
||||
<li><a href="/telemetry">Telemetry</a></li>
|
||||
<li><a href="/streams">Streams</a></li>
|
||||
<li><a href="/enrichment">Enrichment</a></li>
|
||||
<li><a href="/api-keys">API Keys</a></li>
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@
|
|||
"#f59e0b", "#dc2626", "#7c3aed", "#2563eb", "#059669", "#db2777",
|
||||
"#0891b2", "#65a30d", "#ea580c", "#4f46e5", "#9333ea", "#0d9488"
|
||||
] %}
|
||||
<h1>Events</h1>
|
||||
<h1>{{ "Telemetry" if base_path == "/telemetry" else "Events" }}</h1>
|
||||
|
||||
{% if filter_error %}
|
||||
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
|
||||
|
|
@ -212,8 +212,8 @@
|
|||
{% endif %}
|
||||
|
||||
{% from "_chip_picker.html" import chip_picker %}
|
||||
<form id="filter-form" class="filter-form" action="/events" method="get"
|
||||
hx-get="/events/rows" hx-target="#events-rows" hx-push-url="true">
|
||||
<form id="filter-form" class="filter-form" action="{{ base_path }}" method="get"
|
||||
hx-get="{{ base_path }}/rows" hx-target="#events-rows" hx-push-url="true">
|
||||
|
||||
{# Full-width search (server-side ILIKE over subject + location). #}
|
||||
<input type="search" id="filter-q" name="q" class="filter-search"
|
||||
|
|
@ -261,7 +261,7 @@
|
|||
|
||||
<div class="filter-actions">
|
||||
<button type="submit" class="filter-apply">Apply</button>
|
||||
<a href="/events" role="button" class="outline" id="filter-clear-all">Clear all</a>
|
||||
<a href="{{ base_path }}" role="button" class="outline" id="filter-clear-all">Clear all</a>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
|
|
@ -749,6 +749,7 @@
|
|||
(function () {
|
||||
var form = document.getElementById("filter-form");
|
||||
if (!form) return;
|
||||
var BASE_PATH = {{ base_path | tojson }}; // "/events" or "/telemetry"
|
||||
|
||||
function submitForm() { if (window.htmx) htmx.trigger(form, "submit"); }
|
||||
|
||||
|
|
@ -859,7 +860,7 @@
|
|||
submitForm();
|
||||
return;
|
||||
}
|
||||
if (e.target.closest("[data-clear-all]")) { window.location.href = "/events"; }
|
||||
if (e.target.closest("[data-clear-all]")) { window.location.href = BASE_PATH; }
|
||||
});
|
||||
|
||||
// Legend: collapse/expand toggle.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue