mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
v0.10.5: dashboard Re-send recent events button with time-window selector (operator-controlled republish across all streams) (#91)
Closes #91 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7fa4f36e46
commit
93f403a656
8 changed files with 718 additions and 0 deletions
|
|
@ -31,6 +31,16 @@ def get_js() -> JetStreamContext | None:
|
|||
return _js
|
||||
|
||||
|
||||
def get_nc() -> nats.NATS | None:
|
||||
"""Get the raw NATS connection. Returns None if not connected.
|
||||
|
||||
Used by v0.10.5 ``central.gui.resend`` for non-JetStream meta-event
|
||||
publishes (audit log on ``central.meta.action.resend``). Stream-bound
|
||||
publishes still go via :func:`get_js`.
|
||||
"""
|
||||
return _nc
|
||||
|
||||
|
||||
async def close_nats() -> None:
|
||||
"""Close the NATS connection."""
|
||||
global _nc, _js
|
||||
|
|
|
|||
270
src/central/gui/resend.py
Normal file
270
src/central/gui/resend.py
Normal file
|
|
@ -0,0 +1,270 @@
|
|||
"""v0.10.5 — operator-controlled re-publish of recent events.
|
||||
|
||||
The dashboard's "Re-send recent events" card lets an operator pick a time
|
||||
window (5 minutes → 24 hours), preview the count of messages that would be
|
||||
re-sent across every event-bearing JetStream stream, then confirm to
|
||||
re-publish them.
|
||||
|
||||
Each replayed message keeps its original subject and raw byte payload
|
||||
(CloudEvents envelope unchanged) but receives a new ``Nats-Msg-Id`` of the
|
||||
form ``{original}:resend:{ts_epoch_ms}`` so JetStream's per-stream
|
||||
deduplication window doesn't silently drop the replay. Consumers with
|
||||
``deliver_policy=new`` see the messages as fresh; the archive UPSERTs on
|
||||
``(id, time)`` so the events table doesn't grow.
|
||||
|
||||
The supervisor's publish-time monitoring-area bbox filter (v0.10.2) is NOT
|
||||
applied here -- the operator is intentionally replaying messages that
|
||||
already passed through it on their original publish.
|
||||
|
||||
Stream set is derived from ``central.streams.STREAMS`` -- only the
|
||||
``event_bearing=True`` entries are touched; ``CENTRAL_META`` is excluded
|
||||
deliberately so audit/status messages aren't re-broadcast.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import nats
|
||||
from nats.errors import TimeoutError as NatsTimeoutError
|
||||
from nats.js import JetStreamContext
|
||||
from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy
|
||||
from nats.js.errors import NotFoundError
|
||||
|
||||
from central.streams import STREAMS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Pull-fetch tuning. The ephemeral consumer's inactive_threshold guarantees
|
||||
# JetStream auto-cleans the temp consumer if anything kills our iterator.
|
||||
_FETCH_BATCH = 200
|
||||
_FETCH_TIMEOUT_S = 2.0
|
||||
_INACTIVE_THRESHOLD_NS = int(30e9)
|
||||
|
||||
# Hard cap per stream per operation. 24h * worst-case CENTRAL_TRAFFIC_FLOW
|
||||
# volume is still well under this; bump if a legitimate operator action
|
||||
# ever hits it.
|
||||
_MAX_MSGS_PER_STREAM = 50_000
|
||||
|
||||
# Audit-log meta subject. CENTRAL_META filter (`central.meta.>`) already
|
||||
# captures it; archive does NOT consume CENTRAL_META.
|
||||
_AUDIT_SUBJECT = "central.meta.action.resend"
|
||||
|
||||
# Operator-facing time-window dropdown. Keys are minutes posted by the GUI;
|
||||
# values are the labels shown to the operator. Adding a window: one tuple.
|
||||
TIME_WINDOWS: list[tuple[int, str]] = [
|
||||
(5, "5 minutes"),
|
||||
(30, "30 minutes"),
|
||||
(60, "1 hour"),
|
||||
(180, "3 hours"),
|
||||
(360, "6 hours"),
|
||||
(720, "12 hours"),
|
||||
(1440, "24 hours"),
|
||||
]
|
||||
|
||||
|
||||
def _event_bearing_streams():
|
||||
"""Replay set = STREAMS minus CENTRAL_META (status-only, never replayed)."""
|
||||
return [s for s in STREAMS if s.event_bearing]
|
||||
|
||||
|
||||
def is_valid_window(minutes: int) -> bool:
|
||||
"""Reject any minute value not in the locked dropdown set."""
|
||||
return any(m == minutes for m, _ in TIME_WINDOWS)
|
||||
|
||||
|
||||
def window_label(minutes: int) -> str:
|
||||
"""Map a minute value back to its operator-facing label."""
|
||||
for m, label in TIME_WINDOWS:
|
||||
if m == minutes:
|
||||
return label
|
||||
return f"{minutes} minutes"
|
||||
|
||||
|
||||
async def _iter_window(
|
||||
js: JetStreamContext,
|
||||
stream_name: str,
|
||||
subject_filter: str,
|
||||
cutoff: datetime,
|
||||
):
|
||||
"""Yield each NATS message in ``stream_name`` since ``cutoff``.
|
||||
|
||||
Uses an ephemeral pull-consumer (``durable=None``, ``ack_policy=NONE``,
|
||||
``inactive_threshold=30s``) with ``DeliverPolicy.BY_START_TIME`` so the
|
||||
JetStream server filters server-side and we never paginate over the full
|
||||
stream history.
|
||||
"""
|
||||
config = ConsumerConfig(
|
||||
deliver_policy=DeliverPolicy.BY_START_TIME,
|
||||
opt_start_time=cutoff.isoformat(),
|
||||
ack_policy=AckPolicy.NONE,
|
||||
inactive_threshold=_INACTIVE_THRESHOLD_NS,
|
||||
filter_subject=subject_filter,
|
||||
)
|
||||
try:
|
||||
sub = await js.pull_subscribe(
|
||||
subject_filter,
|
||||
durable=None,
|
||||
stream=stream_name,
|
||||
config=config,
|
||||
)
|
||||
except NotFoundError:
|
||||
# Stream doesn't exist (fresh dev box) -- treat as empty.
|
||||
return
|
||||
|
||||
yielded = 0
|
||||
try:
|
||||
while yielded < _MAX_MSGS_PER_STREAM:
|
||||
try:
|
||||
msgs = await sub.fetch(batch=_FETCH_BATCH, timeout=_FETCH_TIMEOUT_S)
|
||||
except (NatsTimeoutError, asyncio.TimeoutError, TimeoutError):
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("resend: fetch error", extra={"stream": stream_name})
|
||||
break
|
||||
if not msgs:
|
||||
break
|
||||
for msg in msgs:
|
||||
yielded += 1
|
||||
yield msg
|
||||
if yielded >= _MAX_MSGS_PER_STREAM:
|
||||
break
|
||||
finally:
|
||||
try:
|
||||
await sub.unsubscribe()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def preview_resend(js: JetStreamContext, minutes: int) -> dict[str, Any]:
|
||||
"""Count messages per event-bearing stream within the last ``minutes``.
|
||||
|
||||
Streams that error out are reported with ``None`` in ``by_stream`` and
|
||||
``errors`` incremented; the preview never raises.
|
||||
"""
|
||||
if minutes <= 0 or not is_valid_window(minutes):
|
||||
return {"count": 0, "by_stream": {}, "minutes": minutes,
|
||||
"window_label": window_label(minutes), "errors": 0}
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
|
||||
by_stream: dict[str, int | None] = {}
|
||||
total = 0
|
||||
errors = 0
|
||||
for s in _event_bearing_streams():
|
||||
try:
|
||||
n = 0
|
||||
async for _ in _iter_window(js, s.name, s.subject_filter, cutoff):
|
||||
n += 1
|
||||
by_stream[s.name] = n
|
||||
total += n
|
||||
except Exception:
|
||||
logger.exception("resend preview failed", extra={"stream": s.name})
|
||||
by_stream[s.name] = None
|
||||
errors += 1
|
||||
return {
|
||||
"count": total,
|
||||
"by_stream": by_stream,
|
||||
"minutes": minutes,
|
||||
"window_label": window_label(minutes),
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
|
||||
async def execute_resend(
|
||||
js: JetStreamContext,
|
||||
nc: nats.NATS | None,
|
||||
minutes: int,
|
||||
operator: str,
|
||||
) -> dict[str, Any]:
|
||||
"""Re-publish each message in the last ``minutes`` across event-bearing streams.
|
||||
|
||||
Each republish gets a new ``Nats-Msg-Id = {original}:resend:{ts_ms}`` so
|
||||
JetStream's dedup window doesn't drop it. Emits a meta-event on
|
||||
``central.meta.action.resend`` after the wave completes (success OR
|
||||
partial). Audit-log publish failures are logged but never sink the
|
||||
operator-visible result.
|
||||
"""
|
||||
if minutes <= 0 or not is_valid_window(minutes):
|
||||
return {"published": 0, "errors": 0, "elapsed_s": 0.0, "by_stream": {},
|
||||
"window_label": window_label(minutes)}
|
||||
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
|
||||
started_at = datetime.now(timezone.utc).isoformat()
|
||||
started_mono = time.monotonic()
|
||||
ts_ms = int(time.time() * 1000)
|
||||
|
||||
published = 0
|
||||
errors = 0
|
||||
by_stream: dict[str, dict[str, int]] = {}
|
||||
|
||||
for s in _event_bearing_streams():
|
||||
n_ok = 0
|
||||
n_err = 0
|
||||
try:
|
||||
async for msg in _iter_window(js, s.name, s.subject_filter, cutoff):
|
||||
hdr = msg.headers or {}
|
||||
orig = hdr.get("Nats-Msg-Id") or hdr.get("nats-msg-id")
|
||||
if orig:
|
||||
new_id = f"{orig}:resend:{ts_ms}"
|
||||
else:
|
||||
# Older messages without a dedup header still get a unique
|
||||
# resend id so JetStream doesn't drop them.
|
||||
new_id = f"resend:{ts_ms}:{s.name}:{n_ok}"
|
||||
try:
|
||||
await js.publish(
|
||||
msg.subject, msg.data,
|
||||
headers={"Nats-Msg-Id": new_id},
|
||||
)
|
||||
n_ok += 1
|
||||
except Exception:
|
||||
n_err += 1
|
||||
logger.exception(
|
||||
"resend: republish failed",
|
||||
extra={"subject": msg.subject, "stream": s.name},
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("resend: stream iteration failed",
|
||||
extra={"stream": s.name})
|
||||
n_err += 1
|
||||
by_stream[s.name] = {"published": n_ok, "errors": n_err}
|
||||
published += n_ok
|
||||
errors += n_err
|
||||
|
||||
elapsed = round(time.monotonic() - started_mono, 3)
|
||||
finished_at = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
meta = {
|
||||
"operator": operator,
|
||||
"window_minutes": minutes,
|
||||
"count": published,
|
||||
"errors": errors,
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at,
|
||||
"elapsed_s": elapsed,
|
||||
"by_stream": by_stream,
|
||||
}
|
||||
if nc is not None:
|
||||
try:
|
||||
await nc.publish(_AUDIT_SUBJECT, json.dumps(meta).encode())
|
||||
except Exception:
|
||||
logger.exception("resend: audit-log publish failed")
|
||||
else:
|
||||
logger.warning("resend: no NATS connection for audit-log meta-event")
|
||||
|
||||
logger.info(
|
||||
"resend wave complete",
|
||||
extra={"operator": operator, "window_minutes": minutes,
|
||||
"published": published, "errors": errors, "elapsed_s": elapsed},
|
||||
)
|
||||
|
||||
return {
|
||||
"published": published,
|
||||
"errors": errors,
|
||||
"elapsed_s": elapsed,
|
||||
"by_stream": by_stream,
|
||||
"window_label": window_label(minutes),
|
||||
}
|
||||
|
|
@ -294,6 +294,96 @@ async def dashboard_polls(request: Request) -> HTMLResponse:
|
|||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# v0.10.5 — operator-controlled "Re-send recent events" card
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _resend_card(request: Request, error: str | None = None) -> HTMLResponse:
|
||||
"""Render the initial-state card. Used by /card, /preview, and /resend on
|
||||
any precondition failure (invalid window, NATS down)."""
|
||||
from central.gui.resend import TIME_WINDOWS
|
||||
|
||||
return _get_templates().TemplateResponse(
|
||||
request=request, name="_resend_card.html",
|
||||
context={
|
||||
"windows": TIME_WINDOWS,
|
||||
"csrf_token": getattr(request.state, "csrf_token", ""),
|
||||
"error": error,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/actions/resend/card", response_class=HTMLResponse)
|
||||
async def actions_resend_card(request: Request) -> HTMLResponse:
|
||||
"""Initial state of the 'Re-send recent events' card.
|
||||
|
||||
Served at dashboard load AND on Cancel from the confirmation fragment."""
|
||||
return _resend_card(request)
|
||||
|
||||
|
||||
@router.get("/actions/resend/preview", response_class=HTMLResponse)
|
||||
async def actions_resend_preview(request: Request, minutes: int = 0) -> HTMLResponse:
|
||||
"""Show the count of messages about to be re-sent in the chosen window."""
|
||||
from central.gui.nats import get_js
|
||||
from central.gui.resend import is_valid_window, preview_resend
|
||||
|
||||
if not is_valid_window(minutes):
|
||||
return _resend_card(request, error="Time window not recognised.")
|
||||
js = get_js()
|
||||
if js is None:
|
||||
return _resend_card(request, error="Stream backbone unavailable. Try again in a moment.")
|
||||
|
||||
result = await preview_resend(js, minutes)
|
||||
return _get_templates().TemplateResponse(
|
||||
request=request, name="_resend_confirm.html",
|
||||
context={
|
||||
"csrf_token": getattr(request.state, "csrf_token", ""),
|
||||
"minutes": minutes,
|
||||
"window_label": result["window_label"],
|
||||
"count": result["count"],
|
||||
"by_stream": result["by_stream"],
|
||||
"preview_errors": result["errors"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.post("/actions/resend", response_class=HTMLResponse)
|
||||
async def actions_resend_execute(request: Request, minutes: int = 0) -> HTMLResponse:
|
||||
"""Execute the re-send wave. CSRF-validated via the form field."""
|
||||
from central.gui.nats import get_js, get_nc
|
||||
from central.gui.resend import execute_resend, is_valid_window
|
||||
|
||||
csrf_token = getattr(request.state, "csrf_token", "")
|
||||
form = await request.form()
|
||||
form_csrf = form.get("csrf_token", "")
|
||||
if not form_csrf or form_csrf != csrf_token:
|
||||
raise CsrfValidationError("Invalid CSRF token")
|
||||
if not is_valid_window(minutes):
|
||||
return _resend_card(request, error="Time window not recognised.")
|
||||
js = get_js()
|
||||
if js is None:
|
||||
return _resend_card(request, error="Stream backbone unavailable. Try again in a moment.")
|
||||
|
||||
operator = getattr(request.state, "operator", None)
|
||||
operator_name = (
|
||||
operator.username if operator and hasattr(operator, "username") else "unknown"
|
||||
)
|
||||
result = await execute_resend(js, get_nc(), minutes, operator_name)
|
||||
return _get_templates().TemplateResponse(
|
||||
request=request, name="_resend_success.html",
|
||||
context={
|
||||
"csrf_token": csrf_token,
|
||||
"minutes": minutes,
|
||||
"window_label": result["window_label"],
|
||||
"published": result["published"],
|
||||
"errors": result["errors"],
|
||||
"elapsed_s": result["elapsed_s"],
|
||||
"by_stream": result["by_stream"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Setup Wizard routes
|
||||
# =============================================================================
|
||||
|
|
|
|||
23
src/central/gui/templates/_resend_card.html
Normal file
23
src/central/gui/templates/_resend_card.html
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
<article id="resend-card">
|
||||
<header>Re-send recent events</header>
|
||||
<p>Replay messages from the last few minutes or hours so any consumer that
|
||||
missed them can pick them up. Archived records collapse onto the same row
|
||||
— re-sending does not duplicate history.</p>
|
||||
{% if error %}<p role="alert"><strong>{{ error }}</strong></p>{% endif %}
|
||||
<form>
|
||||
<label for="resend-minutes">Time window
|
||||
<select name="minutes" id="resend-minutes">
|
||||
{% for m, label in windows %}
|
||||
<option value="{{ m }}"{% if m == 60 %} selected{% endif %}>{{ label }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</label>
|
||||
<button type="button"
|
||||
hx-get="/actions/resend/preview"
|
||||
hx-include="#resend-minutes"
|
||||
hx-target="#resend-card"
|
||||
hx-swap="outerHTML">
|
||||
Re-send
|
||||
</button>
|
||||
</form>
|
||||
</article>
|
||||
33
src/central/gui/templates/_resend_confirm.html
Normal file
33
src/central/gui/templates/_resend_confirm.html
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
<article id="resend-card">
|
||||
<header>About to re-send {{ count }} event{{ '' if count == 1 else 's' }} from the last {{ window_label }}</header>
|
||||
{% if preview_errors %}
|
||||
<p role="alert"><strong>Note:</strong> some streams could not be read; the count may be incomplete.</p>
|
||||
{% endif %}
|
||||
<details>
|
||||
<summary>By stream</summary>
|
||||
<ul>
|
||||
{% for stream, n in by_stream.items() %}
|
||||
<li><strong>{{ stream }}</strong>:
|
||||
{% if n is none %}unavailable{% else %}{{ n }}{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</details>
|
||||
<p>Re-send these events?</p>
|
||||
<div role="group">
|
||||
<button type="button"
|
||||
hx-post="/actions/resend?minutes={{ minutes }}"
|
||||
hx-vals='{"csrf_token": "{{ csrf_token }}"}'
|
||||
hx-target="#resend-card"
|
||||
hx-swap="outerHTML">
|
||||
Yes, re-send {{ count }}
|
||||
</button>
|
||||
<button type="button"
|
||||
class="secondary"
|
||||
hx-get="/actions/resend/card"
|
||||
hx-target="#resend-card"
|
||||
hx-swap="outerHTML">
|
||||
Cancel
|
||||
</button>
|
||||
</div>
|
||||
</article>
|
||||
23
src/central/gui/templates/_resend_success.html
Normal file
23
src/central/gui/templates/_resend_success.html
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
<article id="resend-card">
|
||||
<header>Re-sent {{ published }} event{{ '' if published == 1 else 's' }} from the last {{ window_label }}</header>
|
||||
<p>Completed in {{ elapsed_s }} second{{ '' if elapsed_s == 1 else 's' }}{% if errors %}
|
||||
— <strong>{{ errors }} failed</strong> and were not re-sent. See the
|
||||
operator log for details.{% else %}.{% endif %}</p>
|
||||
<details>
|
||||
<summary>By stream</summary>
|
||||
<ul>
|
||||
{% for stream, info in by_stream.items() %}
|
||||
<li><strong>{{ stream }}</strong>:
|
||||
{{ info.published }} sent{% if info.errors %},
|
||||
{{ info.errors }} failed{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</details>
|
||||
<button type="button"
|
||||
hx-get="/actions/resend/card"
|
||||
hx-target="#resend-card"
|
||||
hx-swap="outerHTML">
|
||||
Re-send another window
|
||||
</button>
|
||||
</article>
|
||||
|
|
@ -4,6 +4,9 @@
|
|||
|
||||
{% block content %}
|
||||
<h1>Dashboard</h1>
|
||||
<div id="resend-card" hx-get="/actions/resend/card" hx-trigger="load" hx-swap="outerHTML">
|
||||
Loading...
|
||||
</div>
|
||||
<div class="cols">
|
||||
<article>
|
||||
<header>Events (24h)</header>
|
||||
|
|
|
|||
266
tests/test_resend.py
Normal file
266
tests/test_resend.py
Normal file
|
|
@ -0,0 +1,266 @@
|
|||
"""Tests for v0.10.5 'Re-send recent events' (resend.py + routes).
|
||||
|
||||
Backend: preview_resend counts per stream; execute_resend republishes with
|
||||
the suffix-style Nats-Msg-Id; per-message failures don't sink the batch;
|
||||
audit-log meta-event is emitted on completion.
|
||||
|
||||
Frontend: the dashboard renders the new card; preview returns the
|
||||
confirmation fragment with the count; POST validates CSRF and returns the
|
||||
success fragment.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from central.gui import resend as resend_mod
|
||||
from central.gui.resend import (
|
||||
TIME_WINDOWS,
|
||||
execute_resend,
|
||||
is_valid_window,
|
||||
preview_resend,
|
||||
window_label,
|
||||
)
|
||||
|
||||
|
||||
# --- helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _mk_msg(subject: str, data: bytes = b'{"data":{"x":1}}',
|
||||
headers: dict | None = None):
|
||||
msg = MagicMock()
|
||||
msg.subject = subject
|
||||
msg.data = data
|
||||
msg.headers = headers if headers is not None else {"Nats-Msg-Id": subject}
|
||||
return msg
|
||||
|
||||
|
||||
def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock:
|
||||
"""JS mock whose pull_subscribe yields a per-stream message list.
|
||||
|
||||
The fetch sequence returns the list once, then empty (terminates the
|
||||
iterator). publish is captured for assertions; unsubscribe is no-op.
|
||||
"""
|
||||
js = MagicMock()
|
||||
captured_publishes: list[tuple[str, bytes, dict]] = []
|
||||
|
||||
async def _publish(subject, data, headers=None):
|
||||
captured_publishes.append((subject, data, dict(headers or {})))
|
||||
|
||||
js.publish = AsyncMock(side_effect=_publish)
|
||||
js._captured = captured_publishes
|
||||
|
||||
async def _pull_subscribe(filter_subj, durable=None, stream=None, config=None):
|
||||
sub = MagicMock()
|
||||
msgs = list(per_stream_msgs.get(stream, []))
|
||||
calls = {"n": 0}
|
||||
|
||||
async def _fetch(batch=200, timeout=2.0):
|
||||
if calls["n"] == 0:
|
||||
calls["n"] += 1
|
||||
return msgs
|
||||
return []
|
||||
|
||||
sub.fetch = AsyncMock(side_effect=_fetch)
|
||||
sub.unsubscribe = AsyncMock()
|
||||
return sub
|
||||
|
||||
js.pull_subscribe = AsyncMock(side_effect=_pull_subscribe)
|
||||
return js
|
||||
|
||||
|
||||
# --- pure-config tests -------------------------------------------------------
|
||||
|
||||
|
||||
def test_time_windows_locked_set():
|
||||
"""The dropdown is the operator-facing source of truth -- nothing else
|
||||
should accept arbitrary minute values."""
|
||||
assert is_valid_window(60) is True
|
||||
assert is_valid_window(5) is True
|
||||
assert is_valid_window(1440) is True
|
||||
assert is_valid_window(0) is False
|
||||
assert is_valid_window(-1) is False
|
||||
assert is_valid_window(7) is False # off-list value
|
||||
assert is_valid_window(99999) is False
|
||||
|
||||
|
||||
def test_window_label_round_trips_dropdown():
|
||||
for m, label in TIME_WINDOWS:
|
||||
assert window_label(m) == label
|
||||
# Off-list falls back to the bare minute count (operator never sees this).
|
||||
assert window_label(7) == "7 minutes"
|
||||
|
||||
|
||||
# --- preview -----------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preview_counts_per_stream():
|
||||
js = _mk_js({
|
||||
"CENTRAL_FIRE": [_mk_msg("central.fire.incident.id.cassia") for _ in range(3)],
|
||||
"CENTRAL_TRAFFIC": [_mk_msg("central.traffic.incident.id") for _ in range(7)],
|
||||
})
|
||||
out = await preview_resend(js, minutes=60)
|
||||
assert out["count"] == 10
|
||||
assert out["by_stream"]["CENTRAL_FIRE"] == 3
|
||||
assert out["by_stream"]["CENTRAL_TRAFFIC"] == 7
|
||||
# Every event-bearing stream gets a key (zero when empty).
|
||||
assert out["by_stream"]["CENTRAL_WX"] == 0
|
||||
# CENTRAL_META is intentionally excluded -- never appears in the dict.
|
||||
assert "CENTRAL_META" not in out["by_stream"]
|
||||
assert out["window_label"] == "1 hour"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preview_rejects_invalid_window():
|
||||
js = _mk_js({})
|
||||
out = await preview_resend(js, minutes=7)
|
||||
assert out["count"] == 0
|
||||
assert out["by_stream"] == {}
|
||||
js.pull_subscribe.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preview_per_stream_error_does_not_sink_batch():
|
||||
"""A NATS error on one stream marks its count as None but the rest count."""
|
||||
js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x") for _ in range(2)]})
|
||||
|
||||
original = js.pull_subscribe.side_effect
|
||||
|
||||
async def _maybe_fail(filter_subj, durable=None, stream=None, config=None):
|
||||
if stream == "CENTRAL_TRAFFIC":
|
||||
raise RuntimeError("simulated stream-level NATS error")
|
||||
return await original(filter_subj, durable=durable, stream=stream, config=config)
|
||||
|
||||
js.pull_subscribe = AsyncMock(side_effect=_maybe_fail)
|
||||
out = await preview_resend(js, minutes=60)
|
||||
assert out["by_stream"]["CENTRAL_FIRE"] == 2
|
||||
assert out["by_stream"]["CENTRAL_TRAFFIC"] is None
|
||||
assert out["errors"] == 1
|
||||
# Total only counts streams that succeeded.
|
||||
assert out["count"] == 2
|
||||
|
||||
|
||||
# --- execute -----------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_replays_with_suffix_msg_id():
|
||||
"""Each republish keeps subject + data, gets new {orig}:resend:{ts} msg id."""
|
||||
msgs = [
|
||||
_mk_msg("central.fire.incident.id.cassia",
|
||||
data=b'{"data":{"name":"Summit Creek"}}',
|
||||
headers={"Nats-Msg-Id": "wfigs_incidents:cassia:1"}),
|
||||
_mk_msg("central.fire.incident.id.owyhee",
|
||||
data=b'{"data":{"name":"Blue Ridge"}}',
|
||||
headers={"Nats-Msg-Id": "wfigs_incidents:owyhee:2"}),
|
||||
]
|
||||
js = _mk_js({"CENTRAL_FIRE": msgs})
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
out = await execute_resend(js, nc, minutes=60, operator="matt")
|
||||
|
||||
assert out["published"] == 2
|
||||
assert out["errors"] == 0
|
||||
pubs = js._captured
|
||||
# Subject + data preserved byte-for-byte
|
||||
assert pubs[0][0] == "central.fire.incident.id.cassia"
|
||||
assert pubs[0][1] == b'{"data":{"name":"Summit Creek"}}'
|
||||
# New msg id is {orig}:resend:{ts_ms} -- avoids JetStream dedup window.
|
||||
assert pubs[0][2]["Nats-Msg-Id"].startswith("wfigs_incidents:cassia:1:resend:")
|
||||
assert pubs[1][2]["Nats-Msg-Id"].startswith("wfigs_incidents:owyhee:2:resend:")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_emits_audit_log_meta_event():
|
||||
js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x") for _ in range(3)]})
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
await execute_resend(js, nc, minutes=60, operator="matt")
|
||||
nc.publish.assert_awaited_once()
|
||||
subject, payload = nc.publish.await_args.args
|
||||
assert subject == "central.meta.action.resend"
|
||||
meta = json.loads(payload.decode())
|
||||
assert meta["operator"] == "matt"
|
||||
assert meta["window_minutes"] == 60
|
||||
assert meta["count"] == 3
|
||||
assert meta["errors"] == 0
|
||||
assert "started_at" in meta and "finished_at" in meta
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_per_message_failure_does_not_sink_batch():
|
||||
"""One bad publish counts as an error but the rest still ship."""
|
||||
msgs = [_mk_msg(f"central.fire.x.{i}",
|
||||
headers={"Nats-Msg-Id": f"id-{i}"}) for i in range(3)]
|
||||
js = _mk_js({"CENTRAL_FIRE": msgs})
|
||||
|
||||
calls = {"n": 0}
|
||||
|
||||
async def _flaky_publish(subject, data, headers=None):
|
||||
calls["n"] += 1
|
||||
if calls["n"] == 2:
|
||||
raise RuntimeError("simulated NATS publish error")
|
||||
|
||||
js.publish = AsyncMock(side_effect=_flaky_publish)
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
|
||||
out = await execute_resend(js, nc, minutes=60, operator="matt")
|
||||
assert out["published"] == 2
|
||||
assert out["errors"] == 1
|
||||
assert out["by_stream"]["CENTRAL_FIRE"]["published"] == 2
|
||||
assert out["by_stream"]["CENTRAL_FIRE"]["errors"] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_handles_message_with_no_original_msg_id():
|
||||
"""Older publishes might lack Nats-Msg-Id -- we still mint a unique id."""
|
||||
msg = _mk_msg("central.fire.x", headers={})
|
||||
js = _mk_js({"CENTRAL_FIRE": [msg]})
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
await execute_resend(js, nc, minutes=60, operator="matt")
|
||||
new_id = js._captured[0][2]["Nats-Msg-Id"]
|
||||
assert new_id.startswith("resend:") and "CENTRAL_FIRE" in new_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_audit_log_failure_does_not_sink_result():
|
||||
"""nc.publish failure is logged and swallowed; published count still returns."""
|
||||
js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x")]})
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock(side_effect=RuntimeError("audit publish failed"))
|
||||
out = await execute_resend(js, nc, minutes=60, operator="matt")
|
||||
assert out["published"] == 1
|
||||
assert out["errors"] == 0 # audit failure is logged-only, not counted
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_rejects_invalid_window():
|
||||
js = _mk_js({})
|
||||
nc = MagicMock()
|
||||
nc.publish = AsyncMock()
|
||||
out = await execute_resend(js, nc, minutes=7, operator="matt")
|
||||
assert out["published"] == 0
|
||||
js.pull_subscribe.assert_not_called()
|
||||
nc.publish.assert_not_called()
|
||||
|
||||
|
||||
# --- stream-set safety -------------------------------------------------------
|
||||
|
||||
|
||||
def test_central_meta_excluded_from_replay_set():
|
||||
"""CENTRAL_META is status-only; replaying it would broadcast stale audit
|
||||
records back through archive's consumers."""
|
||||
names = [s.name for s in resend_mod._event_bearing_streams()]
|
||||
assert "CENTRAL_META" not in names
|
||||
# Sanity: the 9 event-bearing streams are present.
|
||||
for expected in ("CENTRAL_FIRE", "CENTRAL_TRAFFIC", "CENTRAL_WX",
|
||||
"CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_DISASTER",
|
||||
"CENTRAL_HYDRO", "CENTRAL_TRAFFIC_FLOW",
|
||||
"CENTRAL_TRAFFIC_CAMERAS"):
|
||||
assert expected in names
|
||||
Loading…
Add table
Add a link
Reference in a new issue