# central/tests/test_resend.py

"""Tests for v0.10.5 'Re-send recent events' (resend.py + routes).
Backend: preview_resend counts per stream; execute_resend republishes with
the suffix-style Nats-Msg-Id; per-message failures don't sink the batch;
audit-log meta-event is emitted on completion.
Frontend: the dashboard renders the new card; preview returns the
confirmation fragment with the count; POST validates CSRF and returns the
success fragment.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.gui import resend as resend_mod
from central.gui.resend import (
TIME_WINDOWS,
execute_resend,
is_valid_window,
preview_resend,
window_label,
)
# --- helpers -----------------------------------------------------------------
def _mk_msg(subject: str, data: bytes = b'{"data":{"x":1}}',
            headers: dict | None = None, stream_seq: int = 0):
    """Build a MagicMock standing in for a nats-py JetStream ``Msg``.

    ``subject`` and ``data`` are copied onto the mock verbatim. When
    ``headers`` is omitted the mock carries a single ``Nats-Msg-Id`` header
    equal to the subject; pass ``{}`` explicitly to simulate a message that
    was published without one.

    ``stream_seq`` populates ``metadata.sequence.stream`` (mirroring nats-py's
    ``Msg.metadata.sequence.stream``) so the v0.10.5.2 snapshot filter has a
    concrete int to compare against. The default of 0 sits far below the
    snapshot ``_mk_js`` reports, so ordinary tests are never filtered out.
    """
    fake = MagicMock()
    fake.subject, fake.data = subject, data
    if headers is None:
        headers = {"Nats-Msg-Id": subject}
    fake.headers = headers
    fake.metadata.sequence.stream = stream_seq
    return fake
def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock:
    """Return a JetStream-context mock backed by canned per-stream messages.

    * ``pull_subscribe(..., stream=NAME)`` hands back a subscription whose
      ``fetch`` returns a copy of ``per_stream_msgs[NAME]`` (an empty list for
      unknown streams) on the first call and ``[]`` on every later call, which
      terminates the caller's fetch loop. ``unsubscribe`` is a no-op mock.
    * ``publish`` records ``(subject, data, headers-dict)`` tuples on
      ``js._captured`` for later assertions.
    * ``stream_info`` reports ``state.last_seq = 10**12``, deliberately far
      above any test message's ``stream_seq``, so the v0.10.5.2 snapshot
      boundary never filters anything here. Only the dedicated regression
      tests exercise that filter.
    """
    sent: list[tuple[str, bytes, dict]] = []

    async def _publish(subject, data, headers=None):
        sent.append((subject, data, dict(headers or {})))

    async def _stream_info(name):
        info = MagicMock()
        info.state.last_seq = 10**12
        return info

    async def _pull_subscribe(filter_subj, durable=None, stream=None, config=None):
        # One-shot queue: the first fetch drains it, later fetches see [].
        pending = [list(per_stream_msgs.get(stream, []))]

        async def _fetch(batch=200, timeout=2.0):
            return pending.pop() if pending else []

        sub = MagicMock()
        sub.fetch = AsyncMock(side_effect=_fetch)
        sub.unsubscribe = AsyncMock()
        return sub

    js = MagicMock()
    js.publish = AsyncMock(side_effect=_publish)
    js._captured = sent
    js.stream_info = AsyncMock(side_effect=_stream_info)
    js.pull_subscribe = AsyncMock(side_effect=_pull_subscribe)
    return js
# --- pure-config tests -------------------------------------------------------
def test_time_windows_locked_set():
    """Only the dropdown's minute values are accepted as windows.

    The operator-facing dropdown is the single source of truth. Arbitrary
    minute counts (zero, negative, off-list, absurdly large) must be refused.
    """
    for accepted in (60, 5, 1440):
        assert is_valid_window(accepted) is True
    # 7 is an off-list value; the others are out of any sane range.
    for rejected in (0, -1, 7, 99999):
        assert is_valid_window(rejected) is False
def test_window_label_round_trips_dropdown():
    """window_label() echoes the label of every dropdown entry."""
    for minutes, expected_label in TIME_WINDOWS:
        assert window_label(minutes) == expected_label
    # Values outside the dropdown fall back to a bare minute count. The
    # operator never reaches this path because the dropdown is locked.
    assert window_label(7) == "7 minutes"
# --- preview -----------------------------------------------------------------
@pytest.mark.asyncio
async def test_preview_counts_per_stream():
    """preview_resend() totals the window and breaks the count down by stream."""
    fire = [_mk_msg("central.fire.incident.id.cassia") for _ in range(3)]
    traffic = [_mk_msg("central.traffic.incident.id") for _ in range(7)]
    js = _mk_js({"CENTRAL_FIRE": fire, "CENTRAL_TRAFFIC": traffic})

    out = await preview_resend(js, minutes=60)
    by_stream = out["by_stream"]

    assert out["count"] == 10
    assert by_stream["CENTRAL_FIRE"] == 3
    assert by_stream["CENTRAL_TRAFFIC"] == 7
    # Event-bearing streams with nothing in the window still get a 0 entry.
    assert by_stream["CENTRAL_WX"] == 0
    # CENTRAL_META is deliberately left out of the replay set altogether.
    assert "CENTRAL_META" not in by_stream
    assert out["window_label"] == "1 hour"
@pytest.mark.asyncio
async def test_preview_rejects_invalid_window():
    """An off-list window yields an empty preview without subscribing to NATS."""
    js = _mk_js({})

    result = await preview_resend(js, minutes=7)

    assert result["count"] == 0
    assert result["by_stream"] == {}
    # Validation must short-circuit before any consumer is created.
    js.pull_subscribe.assert_not_called()
@pytest.mark.asyncio
async def test_preview_per_stream_error_does_not_sink_batch():
    """A failing stream reports a None count while the others still count."""
    js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x") for _ in range(2)]})
    healthy_subscribe = js.pull_subscribe.side_effect

    async def _maybe_fail(filter_subj, durable=None, stream=None, config=None):
        # Only CENTRAL_TRAFFIC blows up; every other stream behaves normally.
        if stream != "CENTRAL_TRAFFIC":
            return await healthy_subscribe(filter_subj, durable=durable,
                                           stream=stream, config=config)
        raise RuntimeError("simulated stream-level NATS error")

    js.pull_subscribe = AsyncMock(side_effect=_maybe_fail)

    out = await preview_resend(js, minutes=60)

    assert out["by_stream"]["CENTRAL_FIRE"] == 2
    assert out["by_stream"]["CENTRAL_TRAFFIC"] is None
    assert out["errors"] == 1
    # The headline total only includes streams whose count succeeded.
    assert out["count"] == 2
# --- execute -----------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_replays_with_suffix_msg_id():
    """Each republish keeps subject + data, gets new {orig}:resend:{ts} msg id."""
    msgs = [
        _mk_msg("central.fire.incident.id.cassia",
                data=b'{"data":{"name":"Summit Creek"}}',
                headers={"Nats-Msg-Id": "wfigs_incidents:cassia:1"}),
        _mk_msg("central.fire.incident.id.owyhee",
                data=b'{"data":{"name":"Blue Ridge"}}',
                headers={"Nats-Msg-Id": "wfigs_incidents:owyhee:2"}),
    ]
    js = _mk_js({"CENTRAL_FIRE": msgs})
    nc = MagicMock()
    nc.publish = AsyncMock()

    out = await execute_resend(js, nc, minutes=60, operator="matt")

    assert out["published"] == 2
    assert out["errors"] == 0
    pubs = js._captured
    # Pin the count before indexing so a dropped republish fails with a clear
    # length mismatch instead of an IndexError further down.
    assert len(pubs) == 2
    # Subject + data preserved byte-for-byte for every message, in order.
    assert pubs[0][0] == "central.fire.incident.id.cassia"
    assert pubs[0][1] == b'{"data":{"name":"Summit Creek"}}'
    assert pubs[1][0] == "central.fire.incident.id.owyhee"
    assert pubs[1][1] == b'{"data":{"name":"Blue Ridge"}}'
    # New msg id is {orig}:resend:{ts_ms} -- avoids JetStream dedup window.
    assert pubs[0][2]["Nats-Msg-Id"].startswith("wfigs_incidents:cassia:1:resend:")
    assert pubs[1][2]["Nats-Msg-Id"].startswith("wfigs_incidents:owyhee:2:resend:")
@pytest.mark.asyncio
async def test_execute_emits_audit_log_meta_event():
    """A completed resend publishes one audit record on central.meta.action.resend."""
    js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x") for _ in range(3)]})
    nc = MagicMock()
    nc.publish = AsyncMock()

    await execute_resend(js, nc, minutes=60, operator="matt")

    # The audit record goes over core NATS (nc), not JetStream (js).
    nc.publish.assert_awaited_once()
    audit_subject, raw = nc.publish.await_args.args
    assert audit_subject == "central.meta.action.resend"
    meta = json.loads(raw.decode())
    assert meta["operator"] == "matt"
    assert meta["window_minutes"] == 60
    assert meta["count"] == 3
    assert meta["errors"] == 0
    assert meta.keys() >= {"started_at", "finished_at"}
@pytest.mark.asyncio
async def test_execute_per_message_failure_does_not_sink_batch():
    """One bad publish is counted as an error while the rest still ship."""
    msgs = [
        _mk_msg(f"central.fire.x.{i}", headers={"Nats-Msg-Id": f"id-{i}"})
        for i in range(3)
    ]
    js = _mk_js({"CENTRAL_FIRE": msgs})
    attempt = 0

    async def _flaky_publish(subject, data, headers=None):
        # The second publish attempt fails; the first and third succeed.
        nonlocal attempt
        attempt += 1
        if attempt == 2:
            raise RuntimeError("simulated NATS publish error")

    js.publish = AsyncMock(side_effect=_flaky_publish)
    nc = MagicMock()
    nc.publish = AsyncMock()

    out = await execute_resend(js, nc, minutes=60, operator="matt")

    assert out["published"] == 2
    assert out["errors"] == 1
    fire_stats = out["by_stream"]["CENTRAL_FIRE"]
    assert fire_stats["published"] == 2
    assert fire_stats["errors"] == 1
@pytest.mark.asyncio
async def test_execute_handles_message_with_no_original_msg_id():
    """Older publishes might lack Nats-Msg-Id -- we still mint a unique id."""
    msg = _mk_msg("central.fire.x", headers={})
    js = _mk_js({"CENTRAL_FIRE": [msg]})
    nc = MagicMock()
    nc.publish = AsyncMock()

    out = await execute_resend(js, nc, minutes=60, operator="matt")

    # The header-less message must be republished, not skipped. Pin the count
    # so a skip fails with a clear message instead of an IndexError below.
    assert out["published"] == 1
    assert len(js._captured) == 1
    new_id = js._captured[0][2]["Nats-Msg-Id"]
    assert new_id.startswith("resend:") and "CENTRAL_FIRE" in new_id
@pytest.mark.asyncio
async def test_execute_audit_log_failure_does_not_sink_result():
    """nc.publish failure is logged and swallowed; published count still returns."""
    js = _mk_js({"CENTRAL_FIRE": [_mk_msg("central.fire.x")]})
    nc = MagicMock()
    nc.publish = AsyncMock(side_effect=RuntimeError("audit publish failed"))

    out = await execute_resend(js, nc, minutes=60, operator="matt")

    # Prove the failure path was actually exercised. Without this check, a
    # regression that silently skipped the audit publish would still pass.
    nc.publish.assert_awaited()
    assert out["published"] == 1
    assert out["errors"] == 0  # audit failure is logged-only, not counted
@pytest.mark.asyncio
async def test_execute_rejects_invalid_window():
    """An off-list window republishes nothing and emits no audit record."""
    js = _mk_js({})
    nc = MagicMock()
    nc.publish = AsyncMock()

    result = await execute_resend(js, nc, minutes=7, operator="matt")

    assert result["published"] == 0
    # Neither JetStream nor the audit channel may be touched.
    js.pull_subscribe.assert_not_called()
    nc.publish.assert_not_called()
# --- ConsumerConfig regression guard (v0.10.5.1) -----------------------------
@pytest.mark.asyncio
async def test_pull_subscribe_inactive_threshold_within_nats_range():
    """v0.10.5.1 regression guard: ``inactive_threshold`` on the ephemeral
    consumer must be a number nats-py can serialise as a Go ``time.Duration``.

    v0.10.5 passed ``int(30e9)`` thinking it was nanoseconds. nats-py treats
    the value as float SECONDS and multiplies by 1e9 internally, so the
    server received 30e18 -- out of int64 range. NATS rejected the consumer
    with ``err_code=10025``; preview_resend caught the exception per-stream
    and returned 0 events across the board (silent verification failure).

    Assert the captured config has:
      - ``inactive_threshold`` in [1, 3600] seconds (operator sanity range)
      - the nanosecond-equivalent (value * 1e9) fits within int64

    The ``int64`` ceiling is 9_223_372_036_854_775_807 -- anything above that
    triggers the same JSON unmarshal error that broke v0.10.5.
    """
    captured_configs: list = []

    async def _capture_config(filter_subj, durable=None, stream=None, config=None):
        captured_configs.append(config)
        sub = MagicMock()
        sub.fetch = AsyncMock(return_value=[])
        sub.unsubscribe = AsyncMock()
        return sub

    # Reuse the shared JS mock so stream_info (needed since v0.10.5.2, when
    # preview_resend began snapshotting last_seq) matches every other test,
    # then swap in a pull_subscribe that records each ConsumerConfig.
    js = _mk_js({})
    js.pull_subscribe = AsyncMock(side_effect=_capture_config)

    await preview_resend(js, minutes=60)

    # Largest value a Go time.Duration (int64 nanoseconds) can hold.
    INT64_MAX_NS = 9_223_372_036_854_775_807
    assert captured_configs, "expected at least one pull_subscribe call"
    for cfg in captured_configs:
        threshold = cfg.inactive_threshold
        assert threshold is not None, "inactive_threshold must be set"
        assert 1 <= threshold <= 3600, (
            f"inactive_threshold={threshold!r}s outside [1, 3600] sanity range"
        )
        ns = threshold * 1_000_000_000
        # INT64_MAX itself is representable, so the bound is inclusive.
        assert ns <= INT64_MAX_NS, (
            f"inactive_threshold={threshold!r} would produce ns={ns}, "
            f"overflowing int64 ({INT64_MAX_NS}). This is the exact v0.10.5 "
            f"bug -- a unit confusion that produced 30e18 and triggered "
            f"err_code=10025 from NATS."
        )
# --- BY_START_TIME feedback-loop guard (v0.10.5.2) ---------------------------
def _mk_batched_sub(batches: list[list]) -> MagicMock:
    """Return a pull-subscription mock that replays ``batches`` one per fetch.

    Call *k* of ``fetch`` returns ``batches[k]``. Once every staged batch has
    been handed out, ``fetch`` returns ``[]``. This lets a test stage messages
    that "arrive" AFTER the stream snapshot was taken. ``unsubscribe`` is an
    ``AsyncMock`` so tests can assert that cleanup ran.
    """
    served = 0

    async def _fetch(batch=200, timeout=2.0):
        nonlocal served
        if served >= len(batches):
            return []
        staged = batches[served]
        served += 1
        return staged

    sub = MagicMock()
    sub.fetch = AsyncMock(side_effect=_fetch)
    sub.unsubscribe = AsyncMock()
    return sub
@pytest.mark.asyncio
async def test_iter_window_stops_at_snapshot_last_seq_boundary():
    """v0.10.5.2 regression guard: the feedback loop that took down central.

    v0.10.5 used DeliverPolicy.BY_START_TIME with no upper bound on the
    consumer. Each message execute_resend republished matched the time filter
    again, so the same consumer fetched and republished it in an unbounded
    loop until the per-stream cap tripped. That meant potentially 450k
    spurious republishes across 9 streams, which timed out central-gui's POST
    and brought the host down.

    Fix under test: preview/execute snapshot the stream's ``last_seq`` up
    front and pass it to ``_iter_window`` as ``max_stream_seq``. Any message
    whose ``metadata.sequence.stream`` exceeds the snapshot was published
    after the resend started. It is either an unrelated adapter ingest or our
    own republish, and it must never be touched.

    Simulation: batch 1 holds 100 legitimate pre-snapshot messages
    (stream_seqs 100..199, snapshot=199). Batch 2 holds 100 post-snapshot
    "echoes" (stream_seqs 200..299) that v0.10.5 would have looped on. The
    iterator must yield exactly the batch-1 messages and none of batch 2.
    """
    snapshot_seq = 199
    pre_snapshot = [_mk_msg(f"central.fire.x.{seq}", stream_seq=seq)
                    for seq in range(100, snapshot_seq + 1)]
    post_snapshot = [_mk_msg(f"central.fire.x.{seq}", stream_seq=seq)
                     for seq in range(snapshot_seq + 1, 300)]
    sub = _mk_batched_sub([pre_snapshot, post_snapshot])
    js = MagicMock()
    js.pull_subscribe = AsyncMock(return_value=sub)

    window = resend_mod._iter_window(
        js, "CENTRAL_FIRE", "central.fire.>",
        cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc),
        max_stream_seq=snapshot_seq,
    )
    yielded = [msg.metadata.sequence.stream async for msg in window]

    assert len(yielded) == 100
    assert yielded == list(range(100, 200))
    # Cleanup must still run even though iteration stopped early.
    sub.unsubscribe.assert_awaited_once()
@pytest.mark.asyncio
async def test_iter_window_enforces_per_stream_cap_with_warning(caplog):
    """v0.10.5.2 regression guard: the per-stream cap dropped from 50_000 to 5_000.

    A legitimate operator window should never reach the cap. If one does, the
    operator must hear about it through a warning log. Silent truncation is
    exactly the behavior that hid the v0.10.5 feedback loop until it took the
    host down.

    Simulation: the snapshot (10**6) is high enough that it never limits
    iteration, so the 5000 cap is the only stopper. Messages arrive in
    batches of 200 covering seqs 1..6200, so the cap trips well before the
    source runs dry.
    """
    batch_size = 200
    batches = [
        [_mk_msg(f"central.x.{seq}", stream_seq=seq)
         for seq in range(first, first + batch_size)]
        for first in range(1, 6201, batch_size)
    ]
    sub = _mk_batched_sub(batches)
    js = MagicMock()
    js.pull_subscribe = AsyncMock(return_value=sub)

    yielded = 0
    with caplog.at_level("WARNING", logger="central.gui.resend"):
        async for _ in resend_mod._iter_window(
            js, "CENTRAL_FIRE", "central.fire.>",
            cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc),
            max_stream_seq=10**6,
        ):
            yielded += 1

    assert yielded == resend_mod._MAX_MSGS_PER_STREAM == 5_000
    cap_warnings = [
        rec for rec in caplog.records
        if rec.levelname == "WARNING" and "cap reached" in rec.getMessage()
    ]
    assert len(cap_warnings) == 1
    # The stream name travels as a structured ``extra`` field on the record.
    assert cap_warnings[0].stream == "CENTRAL_FIRE"
    sub.unsubscribe.assert_awaited_once()
# --- stream-set safety -------------------------------------------------------
def test_central_meta_excluded_from_replay_set():
    """CENTRAL_META is status-only; replaying it would broadcast stale audit
    records back through archive's consumers."""
    names = {stream.name for stream in resend_mod._event_bearing_streams()}
    assert "CENTRAL_META" not in names
    # Sanity: the 9 event-bearing streams are present. Compare as sets so a
    # failure lists every missing stream at once, not just the first.
    expected = {
        "CENTRAL_FIRE", "CENTRAL_TRAFFIC", "CENTRAL_WX",
        "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_DISASTER",
        "CENTRAL_HYDRO", "CENTRAL_TRAFFIC_FLOW",
        "CENTRAL_TRAFFIC_CAMERAS",
    }
    missing = sorted(expected - names)
    assert not missing, f"replay set is missing event-bearing streams: {missing}"