diff --git a/src/central/gui/resend.py b/src/central/gui/resend.py index 2fe3ebe..effe80d 100644 --- a/src/central/gui/resend.py +++ b/src/central/gui/resend.py @@ -52,10 +52,11 @@ _FETCH_BATCH = 200 _FETCH_TIMEOUT_S = 2.0 _INACTIVE_THRESHOLD_S = 30.0 -# Hard cap per stream per operation. 24h * worst-case CENTRAL_TRAFFIC_FLOW -# volume is still well under this; bump if a legitimate operator action -# ever hits it. -_MAX_MSGS_PER_STREAM = 50_000 +# Hard cap per stream per operation. v0.10.5.2 dropped this from 50_000 to +# 5_000 after the BY_START_TIME feedback loop ran wild: a legitimate +# operator window should never exceed this, so hitting the cap is now a +# warning condition the operator should hear about. +_MAX_MSGS_PER_STREAM = 5_000 # Audit-log meta subject. CENTRAL_META filter (`central.meta.>`) already # captures it; archive does NOT consume CENTRAL_META. @@ -97,13 +98,22 @@ async def _iter_window( stream_name: str, subject_filter: str, cutoff: datetime, + max_stream_seq: int, ): - """Yield each NATS message in ``stream_name`` since ``cutoff``. + """Yield each NATS message in ``stream_name`` since ``cutoff`` up to ``max_stream_seq``. Uses an ephemeral pull-consumer (``durable=None``, ``ack_policy=NONE``, ``inactive_threshold=30s``) with ``DeliverPolicy.BY_START_TIME`` so the JetStream server filters server-side and we never paginate over the full stream history. + + ``max_stream_seq`` is the snapshot of the stream's ``last_seq`` taken + immediately before iteration began. Any message with + ``msg.metadata.sequence.stream > max_stream_seq`` arrived AFTER the + snapshot -- either an unrelated adapter publish or, critically, a + republish from the very wave we're currently emitting. Iteration stops + cleanly at that boundary, which kills the v0.10.5 feedback loop where + BY_START_TIME alone kept matching our own republished messages. """ config = ConsumerConfig( deliver_policy=DeliverPolicy.BY_START_TIME, @@ -136,10 +146,21 @@ async def _iter_window( if not msgs: break for msg in msgs: + # Pull-consumer delivery order is stream-seq ascending, so the + # first message past the snapshot means every remaining message + # also is -- exit the generator cleanly (finally still runs). + if msg.metadata.sequence.stream > max_stream_seq: + return yielded += 1 yield msg if yielded >= _MAX_MSGS_PER_STREAM: - break + logger.warning( + "resend: per-stream message cap reached, " + "remaining matches in window were not processed", + extra={"stream": stream_name, + "cap": _MAX_MSGS_PER_STREAM}, + ) + return finally: try: await sub.unsubscribe() @@ -147,23 +168,62 @@ async def _iter_window( pass +async def _snapshot_last_seqs(js: JetStreamContext) -> tuple[dict[str, int], set[str]]: + """Capture each event-bearing stream's ``last_seq`` as the resend boundary. + + Returns ``(snapshot, errored)``. Streams that don't exist (fresh dev box) + are omitted from ``snapshot`` and NOT marked errored -- they're simply + empty. Streams whose ``stream_info`` call raises any other exception are + added to ``errored`` so the caller can report them without iterating. + + Taken all at once at the top of preview/execute so every stream sees a + point-in-time boundary that pre-dates any republish we're about to do. + """ + snapshot: dict[str, int] = {} + errored: set[str] = set() + for s in _event_bearing_streams(): + try: + info = await js.stream_info(s.name) + snapshot[s.name] = info.state.last_seq + except NotFoundError: + # Empty/absent stream: skip silently, no error. + pass + except Exception: + logger.exception("resend: snapshot failed", extra={"stream": s.name}) + errored.add(s.name) + return snapshot, errored + + async def preview_resend(js: JetStreamContext, minutes: int) -> dict[str, Any]: """Count messages per event-bearing stream within the last ``minutes``. Streams that error out are reported with ``None`` in ``by_stream`` and - ``errors`` incremented; the preview never raises. + ``errors`` incremented; the preview never raises. The per-stream message + count is bounded by the snapshot of ``last_seq`` captured at the top of + the call so a preview taken immediately after a resend doesn't include + the messages we just republished. """ if minutes <= 0 or not is_valid_window(minutes): return {"count": 0, "by_stream": {}, "minutes": minutes, "window_label": window_label(minutes), "errors": 0} cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes) + snapshot, errored = await _snapshot_last_seqs(js) by_stream: dict[str, int | None] = {} total = 0 errors = 0 for s in _event_bearing_streams(): + if s.name in errored: + by_stream[s.name] = None + errors += 1 + continue + if s.name not in snapshot: + by_stream[s.name] = 0 + continue try: n = 0 - async for _ in _iter_window(js, s.name, s.subject_filter, cutoff): + async for _ in _iter_window( + js, s.name, s.subject_filter, cutoff, snapshot[s.name], + ): n += 1 by_stream[s.name] = n total += n @@ -203,6 +263,8 @@ async def execute_resend( started_mono = time.monotonic() ts_ms = int(time.time() * 1000) + snapshot, errored = await _snapshot_last_seqs(js) + published = 0 errors = 0 by_stream: dict[str, dict[str, int]] = {} @@ -210,8 +272,17 @@ async def execute_resend( for s in _event_bearing_streams(): n_ok = 0 n_err = 0 + if s.name in errored: + by_stream[s.name] = {"published": 0, "errors": 1} + errors += 1 + continue + if s.name not in snapshot: + by_stream[s.name] = {"published": 0, "errors": 0} + continue try: - async for msg in _iter_window(js, s.name, s.subject_filter, cutoff): + async for msg in _iter_window( + js, s.name, s.subject_filter, cutoff, snapshot[s.name], + ): hdr = msg.headers or {} orig = hdr.get("Nats-Msg-Id") or hdr.get("nats-msg-id") if orig: diff --git a/tests/test_resend.py b/tests/test_resend.py index a4a781e..e0c025c 100644 --- a/tests/test_resend.py +++ b/tests/test_resend.py @@ -12,6 +12,7 @@ success fragment. from __future__ import annotations import json +from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock import pytest @@ -30,11 +31,16 @@ from central.gui.resend import ( def _mk_msg(subject: str, data: bytes = b'{"data":{"x":1}}', - headers: dict | None = None): + headers: dict | None = None, stream_seq: int = 0): msg = MagicMock() msg.subject = subject msg.data = data msg.headers = headers if headers is not None else {"Nats-Msg-Id": subject} + # Mirror nats-py's Msg.metadata.sequence.stream so the v0.10.5.2 snapshot + # filter has a concrete int to compare against (default 0 stays well + # below the high default snapshot in _mk_js so no existing test gets + # filtered out). + msg.metadata.sequence.stream = stream_seq return msg @@ -43,6 +49,10 @@ def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock: The fetch sequence returns the list once, then empty (terminates the iterator). publish is captured for assertions; unsubscribe is no-op. + stream_info returns a snapshot that's intentionally far above any test + message's stream_seq so the v0.10.5.2 boundary check is a no-op for + every existing test -- only the dedicated regression tests below + exercise the filter explicitly. """ js = MagicMock() captured_publishes: list[tuple[str, bytes, dict]] = [] @@ -53,6 +63,13 @@ def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock: js.publish = AsyncMock(side_effect=_publish) js._captured = captured_publishes + async def _stream_info(name): + info = MagicMock() + info.state.last_seq = 10**12 + return info + + js.stream_info = AsyncMock(side_effect=_stream_info) + async def _pull_subscribe(filter_subj, durable=None, stream=None, config=None): sub = MagicMock() msgs = list(per_stream_msgs.get(stream, [])) @@ -281,6 +298,13 @@ async def test_pull_subscribe_inactive_threshold_within_nats_range(): js = MagicMock() js.pull_subscribe = AsyncMock(side_effect=_capture_config) + # v0.10.5.2: preview_resend now snapshots last_seq before iterating; give + # the bare mock a stream_info AsyncMock so we still reach pull_subscribe. + async def _stream_info(name): + info = MagicMock() + info.state.last_seq = 10**12 + return info + js.stream_info = AsyncMock(side_effect=_stream_info) await preview_resend(js, minutes=60) INT64_MAX_NS = 9_223_372_036_854_775_807 @@ -300,6 +324,119 @@ async def test_pull_subscribe_inactive_threshold_within_nats_range(): ) +# --- BY_START_TIME feedback-loop guard (v0.10.5.2) --------------------------- + + +def _mk_batched_sub(batches: list[list]) -> MagicMock: + """Pull-sub mock whose fetch returns one entry from ``batches`` per call, + then empty -- lets a test stage messages that arrive AFTER the snapshot. + """ + sub = MagicMock() + calls = {"n": 0} + + async def _fetch(batch=200, timeout=2.0): + if calls["n"] < len(batches): + out = batches[calls["n"]] + calls["n"] += 1 + return out + return [] + + sub.fetch = AsyncMock(side_effect=_fetch) + sub.unsubscribe = AsyncMock() + return sub + + +@pytest.mark.asyncio +async def test_iter_window_stops_at_snapshot_last_seq_boundary(): + """v0.10.5.2 regression guard: the feedback loop that took down central. + + v0.10.5 used DeliverPolicy.BY_START_TIME with no upper bound on the + consumer. As soon as execute_resend republished a message, the new copy + matched the time filter and the same consumer fetched it again, + republishing in an unbounded loop until the per-stream cap tripped -- + potentially 450k spurious republishes across 9 streams, which is what + timed out central-gui's POST and brought the host down. + + Fix: snapshot the stream's ``last_seq`` at the top of preview/execute + and pass it as ``max_stream_seq`` to ``_iter_window``. Any message + whose ``metadata.sequence.stream`` exceeds the snapshot was published + AFTER we started -- either an unrelated adapter ingest or the very + republish we just emitted -- and must not be touched. + + Simulation: batch 1 holds the 100 legit pre-snapshot messages + (stream_seqs 100..199, snapshot=199). Batch 2 holds 100 post-snapshot + messages (stream_seqs 200..299) -- the "echoes" that v0.10.5 would + have looped on. Assert iter yields exactly 100 (all from batch 1) and + NEVER yields a batch-2 message. + """ + pre_snapshot = [ + _mk_msg(f"central.fire.x.{seq}", stream_seq=seq) + for seq in range(100, 200) + ] + post_snapshot = [ + _mk_msg(f"central.fire.x.{seq}", stream_seq=seq) + for seq in range(200, 300) + ] + sub = _mk_batched_sub([pre_snapshot, post_snapshot]) + + js = MagicMock() + js.pull_subscribe = AsyncMock(return_value=sub) + + yielded = [] + async for msg in resend_mod._iter_window( + js, "CENTRAL_FIRE", "central.fire.>", + cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc), + max_stream_seq=199, + ): + yielded.append(msg.metadata.sequence.stream) + + assert len(yielded) == 100 + assert yielded == list(range(100, 200)) + # Cleanup still runs after early return. + sub.unsubscribe.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_iter_window_enforces_per_stream_cap_with_warning(caplog): + """v0.10.5.2 regression guard: cap dropped from 50_000 to 5_000. + + A legit operator window should never exceed this. If it does, we want + the operator to hear about it via a warning log -- silent truncation + is exactly the kind of behavior that hid the v0.10.5 feedback loop + from the operator until it took the host down. + + Simulation: snapshot is set high enough (10**6) that it never limits + iteration; cap (5000) is the only stopper. Batches of 200 messages + each, seqs 1..6200, so the cap trips well before the source runs out. + """ + batches = [ + [_mk_msg(f"central.x.{seq}", stream_seq=seq) + for seq in range(start, start + 200)] + for start in range(1, 6201, 200) + ] + sub = _mk_batched_sub(batches) + + js = MagicMock() + js.pull_subscribe = AsyncMock(return_value=sub) + + yielded = 0 + with caplog.at_level("WARNING", logger="central.gui.resend"): + async for _ in resend_mod._iter_window( + js, "CENTRAL_FIRE", "central.fire.>", + cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc), + max_stream_seq=10**6, + ): + yielded += 1 + + assert yielded == resend_mod._MAX_MSGS_PER_STREAM == 5_000 + cap_warnings = [r for r in caplog.records + if r.levelname == "WARNING" + and "cap reached" in r.getMessage()] + assert len(cap_warnings) == 1 + assert cap_warnings[0].stream == "CENTRAL_FIRE" + sub.unsubscribe.assert_awaited_once() + + # --- stream-set safety -------------------------------------------------------