diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index bf65657..4cd1768 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -211,6 +211,7 @@ Optional class attributes (default to `None` / not-in-wizard): | `requires_api_key` | `str \| None` | Key alias if an API key is required, else `None`. | | `api_key_field` | `str \| None` | Names the `settings_schema` field that holds the api_key alias reference. The GUI renders this as a select populated from `config.api_keys`; the wizard validates it against the staged `api_keys` state. | | `wizard_order` | `int \| None` | Position in the setup wizard. `None` excludes the adapter from the wizard. | +| `data_class` | `Literal["event", "telemetry"]` | GUI classification. `"event"` (default) = discrete events, shown on `/events`. `"telemetry"` = continuous/high-volume feeds (e.g. NWIS) shown on `/telemetry` instead, so they don't drown discrete-event signal. GUI-only — does not affect publishing or the `events.json` contract. | ### 4.2 Required methods diff --git a/src/central/adapter.py b/src/central/adapter.py index b53665f..7500925 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from pydantic import BaseModel @@ -48,6 +48,12 @@ class SourceAdapter(ABC): latitude and longitude; the supervisor extracts them, runs registered enrichers, and attaches results under Event.data["_enriched"].""" + data_class: Literal["event", "telemetry"] = "event" + """How the GUI classifies this source. "event" (default) = discrete events, + shown on /events. "telemetry" = continuous/high-volume feeds (e.g. NWIS water + gauges) that would drown discrete-event signal; shown on /telemetry instead. + GUI-only — does not affect publishing or the events.json contract.""" + @abstractmethod async def poll(self) -> AsyncIterator[Event]: """ diff --git a/src/central/adapters/nwis.py b/src/central/adapters/nwis.py index 6948a69..5a7db62 100644 --- a/src/central/adapters/nwis.py +++ b/src/central/adapters/nwis.py @@ -127,6 +127,9 @@ class NWISAdapter(SourceAdapter): # Site lat/lon mirrored from Geo.centroid into event.data (see _build_event). enrichment_locations = [("latitude", "longitude")] + # Continuous high-volume water-gauge feed -> the /telemetry tab, not /events. + data_class = "telemetry" + def __init__( self, config: AdapterConfig, diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 8e1347f..cb56fa5 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2703,17 +2703,30 @@ def _resolve_time(token: str | None, now: datetime): return None, None, False, f"Unknown time preset: {token}" -def _adapter_filter_options(): +def _class_adapter_names(data_class: str) -> list[str]: + """Registry-derived adapter names for a data_class ('event' | 'telemetry').""" + return sorted( + name for name, cls in discover_adapters().items() + if getattr(cls, "data_class", "event") == data_class + ) + + +def _adapter_filter_options(data_class: str | None = None): """Registry-derived (flat, grouped) adapter options for the chip-picker. - flat: [{name, display_name, color}] sorted by name (color by sorted index, - matching the map legend). grouped: [(group_label, [{value,label,color}])]. + flat: [{name, display_name, color}] sorted by name. grouped: + [(group_label, [{value,label,color}])]. Colors are keyed to the FULL sorted + registry (stable per-adapter across the /events and /telemetry tabs). When + data_class is given, only that class's adapters are returned. """ reg = discover_adapters() ordered = sorted(reg.values(), key=lambda c: c.name) color_by_name = { cls.name: EVENTS_PALETTE[i % len(EVENTS_PALETTE)] for i, cls in enumerate(ordered) } + if data_class is not None: + ordered = [c for c in ordered if getattr(c, "data_class", "event") == data_class] + shown = {c.name for c in ordered} flat = [ {"name": cls.name, "display_name": cls.display_name, "color": color_by_name[cls.name]} for cls in ordered @@ -2722,7 +2735,7 @@ def _adapter_filter_options(): for group_label, names in ADAPTER_GROUPS.items(): items = [ {"value": n, "label": reg[n].display_name, "color": color_by_name[n]} - for n in names if n in reg + for n in names if n in reg and n in shown ] if items: grouped.append((group_label, items)) @@ -2992,6 +3005,7 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult: limit = parsed_params["limit"] q = parsed_params.get("q") adapters = parsed_params.get("adapters") or [] + class_adapters = parsed_params.get("class_adapters") # data_class split (GUI tabs) categories = parsed_params.get("categories") or [] event_types = parsed_params.get("event_types") or [] severities = parsed_params.get("severities") or [] @@ -3022,6 +3036,12 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult: query_params.append(adapters) param_idx += 1 + if class_adapters is not None: + # data_class split (/events vs /telemetry); registry-derived name list. + conditions.append(f"adapter = ANY(${param_idx})") + query_params.append(class_adapters) + param_idx += 1 + if categories: conditions.append(f"category = ANY(${param_idx})") query_params.append(categories) @@ -3250,59 +3270,33 @@ async def events_json(request: Request): # --- Events feed frontend routes --- -@router.get("/events", response_class=HTMLResponse) -async def events_list(request: Request) -> HTMLResponse: - """Events feed page with filter form, table, and map.""" - templates = _get_templates() - operator = getattr(request.state, "operator", None) - csrf_token = getattr(request.state, "csrf_token", "") +async def _events_query(request: Request, data_class: str): + """Shared parse + class-scoped fetch for the /events and /telemetry tabs. + Returns (parsed, error, events, next_cursor, total, class_adapters). + The data_class split is registry-derived and injected as `class_adapters`, + which _fetch_events applies as an `adapter = ANY(...)` condition. + """ params = request.query_params - - # Parse parameters (GUI defaults to Last 24h when no time filter is given). parsed, error = _parse_events_params(params, default_time=DEFAULT_TIME, default_offset=0) + class_adapters = _class_adapter_names(data_class) + if parsed is not None: + parsed["class_adapters"] = class_adapters - # Get system settings for map tiles + DISTINCT filter-option lists. - pool = get_pool() - all_categories: list[str] = [] - all_event_types: list[str] = [] - async with pool.acquire() as conn: - system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system") - try: - cat_rows = await conn.fetch("SELECT DISTINCT category FROM events ORDER BY 1") - all_categories = [r["category"] for r in cat_rows] - et_rows = await conn.fetch( - "SELECT DISTINCT split_part(category, '.', 1) AS et FROM events ORDER BY 1" - ) - all_event_types = [r["et"] for r in et_rows] - except Exception: - logger.warning("Failed to load filter options", exc_info=True) - - tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png" - tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap" - - events = [] - next_cursor = None - total = 0 - if error: - # Validation error - show error banner but don't fail the page - pass - else: + events, next_cursor, total = [], None, 0 + if not error: result = await _fetch_events(parsed) if result.error: error = result.error else: - events = result.events - next_cursor = result.next_cursor - total = result.total or 0 - + events, next_cursor, total = result.events, result.next_cursor, result.total or 0 _decorate_table_events(events) + return parsed, error, events, next_cursor, total, class_adapters - pagination = _build_pagination(total, (parsed or {}).get("offset") or 0, - (parsed or {}).get("limit") or 50) - adapters_flat, adapters_grouped = _adapter_filter_options() + +def _events_filter_state(parsed: dict | None, params) -> dict: pstate = parsed or {} - filter_state = { + return { "q": pstate.get("q") or "", "adapters": pstate.get("adapters") or [], "categories": pstate.get("categories") or [], @@ -3316,8 +3310,46 @@ async def events_list(request: Request) -> HTMLResponse: "map_filter": pstate.get("map_filter", False), "limit": str(pstate.get("limit", 50)), } - active_pills = _build_active_pills(pstate, len(adapters_flat)) if parsed else [] - # Paginator links append offset; keep cursor + offset out of the carried qs. + + +async def _events_page(request: Request, data_class: str, base_path: str) -> HTMLResponse: + """Full events/telemetry page (shared by /events and /telemetry).""" + templates = _get_templates() + operator = getattr(request.state, "operator", None) + csrf_token = getattr(request.state, "csrf_token", "") + params = request.query_params + + parsed, error, events, next_cursor, total, class_adapters = \ + await _events_query(request, data_class) + + # System map tiles + DISTINCT filter-option lists, scoped to this data_class. + pool = get_pool() + all_categories: list[str] = [] + all_event_types: list[str] = [] + async with pool.acquire() as conn: + system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system") + try: + cat_rows = await conn.fetch( + "SELECT DISTINCT category FROM events WHERE adapter = ANY($1) ORDER BY 1", + class_adapters, + ) + all_categories = [r["category"] for r in cat_rows] + et_rows = await conn.fetch( + "SELECT DISTINCT split_part(category, '.', 1) AS et FROM events " + "WHERE adapter = ANY($1) ORDER BY 1", + class_adapters, + ) + all_event_types = [r["et"] for r in et_rows] + except Exception: + logger.warning("Failed to load filter options", exc_info=True) + + tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png" + tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap" + + pagination = _build_pagination(total, (parsed or {}).get("offset") or 0, + (parsed or {}).get("limit") or 50) + adapters_flat, adapters_grouped = _adapter_filter_options(data_class) + active_pills = _build_active_pills(parsed or {}, len(adapters_flat)) if parsed else [] query_string = urlencode([(k, v) for k, v in _query_items(params) if k not in ("cursor", "offset")]) @@ -3327,6 +3359,7 @@ async def events_list(request: Request) -> HTMLResponse: context={ "operator": operator, "csrf_token": csrf_token, + "base_path": base_path, "events": events, "next_cursor": next_cursor, "pagination": pagination, @@ -3340,42 +3373,23 @@ async def events_list(request: Request) -> HTMLResponse: "severity_order": SEVERITY_ORDER, "time_presets": [(t, TIME_PRESET_LABELS[t]) for t in ("last_15m", "last_1h", "last_6h", "last_24h", "last_7d", "active", "all")], - "filter_state": filter_state, + "filter_state": _events_filter_state(parsed, params), "active_pills": active_pills, "query_string": query_string, }, ) -@router.get("/events/rows", response_class=HTMLResponse) -async def events_rows(request: Request) -> HTMLResponse: - """HTMX fragment: events table rows only (no page chrome).""" +async def _events_rows_fragment(request: Request, data_class: str, base_path: str) -> HTMLResponse: + """HTMX rows fragment (shared by /events/rows and /telemetry/rows).""" templates = _get_templates() - params = request.query_params - # Parse parameters (same GUI default as the page). - parsed, error = _parse_events_params(params, default_time=DEFAULT_TIME, default_offset=0) - - events = [] - next_cursor = None - total = 0 - if error: - pass - else: - result = await _fetch_events(parsed) - if result.error: - error = result.error - else: - events = result.events - next_cursor = result.next_cursor - total = result.total or 0 - - _decorate_table_events(events) + parsed, error, events, next_cursor, total, _ = await _events_query(request, data_class) pagination = _build_pagination(total, (parsed or {}).get("offset") or 0, (parsed or {}).get("limit") or 50) - adapters_flat, _ = _adapter_filter_options() + adapters_flat, _ = _adapter_filter_options(data_class) active_pills = _build_active_pills(parsed or {}, len(adapters_flat)) if parsed else [] query_string = urlencode([(k, v) for k, v in _query_items(params) if k not in ("cursor", "offset")]) @@ -3384,6 +3398,7 @@ async def events_rows(request: Request) -> HTMLResponse: request=request, name="_events_rows.html", context={ + "base_path": base_path, "events": events, "next_cursor": next_cursor, "pagination": pagination, @@ -3393,7 +3408,28 @@ async def events_rows(request: Request) -> HTMLResponse: "oob_pills": True, # emit an out-of-band #active-pills update on swap }, ) - # Push the bookmarkable full-page URL (not the /events/rows fragment path), - # so back/forward + bookmarking land on the same filtered view. - response.headers["HX-Push-Url"] = "/events?" + str(request.url.query) + # Push the bookmarkable full-page URL (not the fragment path). + response.headers["HX-Push-Url"] = base_path + "?" + str(request.url.query) return response + + +@router.get("/events", response_class=HTMLResponse) +async def events_list(request: Request) -> HTMLResponse: + """Events feed page (data_class=event): filter form, table, and map.""" + return await _events_page(request, "event", "/events") + + +@router.get("/events/rows", response_class=HTMLResponse) +async def events_rows(request: Request) -> HTMLResponse: + return await _events_rows_fragment(request, "event", "/events") + + +@router.get("/telemetry", response_class=HTMLResponse) +async def telemetry_list(request: Request) -> HTMLResponse: + """Telemetry feed page (data_class=telemetry; e.g. NWIS). Same shape as /events.""" + return await _events_page(request, "telemetry", "/telemetry") + + +@router.get("/telemetry/rows", response_class=HTMLResponse) +async def telemetry_rows(request: Request) -> HTMLResponse: + return await _events_rows_fragment(request, "telemetry", "/telemetry") diff --git a/src/central/gui/templates/_events_rows.html b/src/central/gui/templates/_events_rows.html index 8038b8f..f5edeea 100644 --- a/src/central/gui/templates/_events_rows.html +++ b/src/central/gui/templates/_events_rows.html @@ -72,20 +72,21 @@ {# Real offset paginator (v0.7.3). Each link carries offset + the filter query_string (which excludes cursor/offset); limit persists via query_string. #} -{% macro page_link(off, label, cls, extra) %} +{# base_path passed into the macro (macros don't see render-context vars). #} +{% macro page_link(base, off, label, cls) %} {% set qs = "offset=" ~ off ~ ("&" ~ query_string if query_string else "") %} -{{ label }} +{{ label }} {% endmacro %}