v0.10.5.2: fix BY_START_TIME feedback loop in Re-send (snapshot last_seq boundary) (#93)

The v0.10.5 ephemeral pull-consumer used DeliverPolicy.BY_START_TIME with
no upper bound, so every republish satisfied the same time filter and
the consumer fetched its own output back -- an unbounded loop bounded
only by the per-stream cap. Operator-triggered 5-minute resend on
2026-06-07 ran the loop long enough to time out central-gui's POST and
the host went down with it.

Fix: snapshot each event-bearing stream's last_seq up front via a new
_snapshot_last_seqs() helper, pass it to _iter_window as max_stream_seq,
and exit the generator the first time msg.metadata.sequence.stream
exceeds the snapshot. Pull-consumer delivery is stream-seq ascending so
one boundary check suffices.

Also drop _MAX_MSGS_PER_STREAM 50_000 -> 5_000 and add a WARNING log
when the cap is hit -- a legitimate operator window should never reach
it, and silent truncation hid the v0.10.5 loop until the host fell over.

Two regression tests cover the new behavior: one stages pre/post-snapshot
batches and asserts the post-snapshot batch is never yielded; one
overwhelms the cap and asserts the warning fires.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
malice 2026-06-06 22:36:04 -06:00 committed by GitHub
commit b17d8bcd54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 218 additions and 10 deletions

View file

@ -52,10 +52,11 @@ _FETCH_BATCH = 200
_FETCH_TIMEOUT_S = 2.0
_INACTIVE_THRESHOLD_S = 30.0
# Hard cap per stream per operation. 24h * worst-case CENTRAL_TRAFFIC_FLOW
# volume is still well under this; bump if a legitimate operator action
# ever hits it.
_MAX_MSGS_PER_STREAM = 50_000
# Hard cap per stream per operation. v0.10.5.2 dropped this from 50_000 to
# 5_000 after the BY_START_TIME feedback loop ran wild: a legitimate
# operator window should never exceed this, so hitting the cap is now a
# warning condition the operator should hear about.
_MAX_MSGS_PER_STREAM = 5_000
# Audit-log meta subject. CENTRAL_META filter (`central.meta.>`) already
# captures it; archive does NOT consume CENTRAL_META.
@ -97,13 +98,22 @@ async def _iter_window(
stream_name: str,
subject_filter: str,
cutoff: datetime,
max_stream_seq: int,
):
"""Yield each NATS message in ``stream_name`` since ``cutoff``.
"""Yield each NATS message in ``stream_name`` since ``cutoff`` up to ``max_stream_seq``.
Uses an ephemeral pull-consumer (``durable=None``, ``ack_policy=NONE``,
``inactive_threshold=30s``) with ``DeliverPolicy.BY_START_TIME`` so the
JetStream server filters server-side and we never paginate over the full
stream history.
``max_stream_seq`` is the snapshot of the stream's ``last_seq`` taken
immediately before iteration began. Any message with
``msg.metadata.sequence.stream > max_stream_seq`` arrived AFTER the
snapshot -- either an unrelated adapter publish or, critically, a
republish from the very wave we're currently emitting. Iteration stops
cleanly at that boundary, which kills the v0.10.5 feedback loop where
BY_START_TIME alone kept matching our own republished messages.
"""
config = ConsumerConfig(
deliver_policy=DeliverPolicy.BY_START_TIME,
@ -136,10 +146,21 @@ async def _iter_window(
if not msgs:
break
for msg in msgs:
# Pull-consumer delivery order is stream-seq ascending, so the
# first message past the snapshot means every remaining message
# also is -- exit the generator cleanly (finally still runs).
if msg.metadata.sequence.stream > max_stream_seq:
return
yielded += 1
yield msg
if yielded >= _MAX_MSGS_PER_STREAM:
break
logger.warning(
"resend: per-stream message cap reached, "
"remaining matches in window were not processed",
extra={"stream": stream_name,
"cap": _MAX_MSGS_PER_STREAM},
)
return
finally:
try:
await sub.unsubscribe()
@ -147,23 +168,62 @@ async def _iter_window(
pass
async def _snapshot_last_seqs(js: JetStreamContext) -> tuple[dict[str, int], set[str]]:
"""Capture each event-bearing stream's ``last_seq`` as the resend boundary.
Returns ``(snapshot, errored)``. Streams that don't exist (fresh dev box)
are omitted from ``snapshot`` and NOT marked errored -- they're simply
empty. Streams whose ``stream_info`` call raises any other exception are
added to ``errored`` so the caller can report them without iterating.
Taken all at once at the top of preview/execute so every stream sees a
point-in-time boundary that pre-dates any republish we're about to do.
"""
snapshot: dict[str, int] = {}
errored: set[str] = set()
for s in _event_bearing_streams():
try:
info = await js.stream_info(s.name)
snapshot[s.name] = info.state.last_seq
except NotFoundError:
# Empty/absent stream: skip silently, no error.
pass
except Exception:
logger.exception("resend: snapshot failed", extra={"stream": s.name})
errored.add(s.name)
return snapshot, errored
async def preview_resend(js: JetStreamContext, minutes: int) -> dict[str, Any]:
"""Count messages per event-bearing stream within the last ``minutes``.
Streams that error out are reported with ``None`` in ``by_stream`` and
``errors`` incremented; the preview never raises.
``errors`` incremented; the preview never raises. The per-stream message
count is bounded by the snapshot of ``last_seq`` captured at the top of
the call so a preview taken immediately after a resend doesn't include
the messages we just republished.
"""
if minutes <= 0 or not is_valid_window(minutes):
return {"count": 0, "by_stream": {}, "minutes": minutes,
"window_label": window_label(minutes), "errors": 0}
cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
snapshot, errored = await _snapshot_last_seqs(js)
by_stream: dict[str, int | None] = {}
total = 0
errors = 0
for s in _event_bearing_streams():
if s.name in errored:
by_stream[s.name] = None
errors += 1
continue
if s.name not in snapshot:
by_stream[s.name] = 0
continue
try:
n = 0
async for _ in _iter_window(js, s.name, s.subject_filter, cutoff):
async for _ in _iter_window(
js, s.name, s.subject_filter, cutoff, snapshot[s.name],
):
n += 1
by_stream[s.name] = n
total += n
@ -203,6 +263,8 @@ async def execute_resend(
started_mono = time.monotonic()
ts_ms = int(time.time() * 1000)
snapshot, errored = await _snapshot_last_seqs(js)
published = 0
errors = 0
by_stream: dict[str, dict[str, int]] = {}
@ -210,8 +272,17 @@ async def execute_resend(
for s in _event_bearing_streams():
n_ok = 0
n_err = 0
if s.name in errored:
by_stream[s.name] = {"published": 0, "errors": 1}
errors += 1
continue
if s.name not in snapshot:
by_stream[s.name] = {"published": 0, "errors": 0}
continue
try:
async for msg in _iter_window(js, s.name, s.subject_filter, cutoff):
async for msg in _iter_window(
js, s.name, s.subject_filter, cutoff, snapshot[s.name],
):
hdr = msg.headers or {}
orig = hdr.get("Nats-Msg-Id") or hdr.get("nats-msg-id")
if orig:

View file

@ -12,6 +12,7 @@ success fragment.
from __future__ import annotations
import json
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
@ -30,11 +31,16 @@ from central.gui.resend import (
def _mk_msg(subject: str, data: bytes = b'{"data":{"x":1}}',
headers: dict | None = None):
headers: dict | None = None, stream_seq: int = 0):
msg = MagicMock()
msg.subject = subject
msg.data = data
msg.headers = headers if headers is not None else {"Nats-Msg-Id": subject}
# Mirror nats-py's Msg.metadata.sequence.stream so the v0.10.5.2 snapshot
# filter has a concrete int to compare against (default 0 stays well
# below the high default snapshot in _mk_js so no existing test gets
# filtered out).
msg.metadata.sequence.stream = stream_seq
return msg
@ -43,6 +49,10 @@ def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock:
The fetch sequence returns the list once, then empty (terminates the
iterator). publish is captured for assertions; unsubscribe is no-op.
stream_info returns a snapshot that's intentionally far above any test
message's stream_seq so the v0.10.5.2 boundary check is a no-op for
every existing test -- only the dedicated regression tests below
exercise the filter explicitly.
"""
js = MagicMock()
captured_publishes: list[tuple[str, bytes, dict]] = []
@ -53,6 +63,13 @@ def _mk_js(per_stream_msgs: dict[str, list]) -> MagicMock:
js.publish = AsyncMock(side_effect=_publish)
js._captured = captured_publishes
async def _stream_info(name):
info = MagicMock()
info.state.last_seq = 10**12
return info
js.stream_info = AsyncMock(side_effect=_stream_info)
async def _pull_subscribe(filter_subj, durable=None, stream=None, config=None):
sub = MagicMock()
msgs = list(per_stream_msgs.get(stream, []))
@ -281,6 +298,13 @@ async def test_pull_subscribe_inactive_threshold_within_nats_range():
js = MagicMock()
js.pull_subscribe = AsyncMock(side_effect=_capture_config)
# v0.10.5.2: preview_resend now snapshots last_seq before iterating; give
# the bare mock a stream_info AsyncMock so we still reach pull_subscribe.
async def _stream_info(name):
info = MagicMock()
info.state.last_seq = 10**12
return info
js.stream_info = AsyncMock(side_effect=_stream_info)
await preview_resend(js, minutes=60)
INT64_MAX_NS = 9_223_372_036_854_775_807
@ -300,6 +324,119 @@ async def test_pull_subscribe_inactive_threshold_within_nats_range():
)
# --- BY_START_TIME feedback-loop guard (v0.10.5.2) ---------------------------
def _mk_batched_sub(batches: list[list]) -> MagicMock:
"""Pull-sub mock whose fetch returns one entry from ``batches`` per call,
then empty -- lets a test stage messages that arrive AFTER the snapshot.
"""
sub = MagicMock()
calls = {"n": 0}
async def _fetch(batch=200, timeout=2.0):
if calls["n"] < len(batches):
out = batches[calls["n"]]
calls["n"] += 1
return out
return []
sub.fetch = AsyncMock(side_effect=_fetch)
sub.unsubscribe = AsyncMock()
return sub
@pytest.mark.asyncio
async def test_iter_window_stops_at_snapshot_last_seq_boundary():
"""v0.10.5.2 regression guard: the feedback loop that took down central.
v0.10.5 used DeliverPolicy.BY_START_TIME with no upper bound on the
consumer. As soon as execute_resend republished a message, the new copy
matched the time filter and the same consumer fetched it again,
republishing in an unbounded loop until the per-stream cap tripped --
potentially 450k spurious republishes across 9 streams, which is what
timed out central-gui's POST and brought the host down.
Fix: snapshot the stream's ``last_seq`` at the top of preview/execute
and pass it as ``max_stream_seq`` to ``_iter_window``. Any message
whose ``metadata.sequence.stream`` exceeds the snapshot was published
AFTER we started -- either an unrelated adapter ingest or the very
republish we just emitted -- and must not be touched.
Simulation: batch 1 holds the 100 legit pre-snapshot messages
(stream_seqs 100..199, snapshot=199). Batch 2 holds 100 post-snapshot
messages (stream_seqs 200..299) -- the "echoes" that v0.10.5 would
have looped on. Assert iter yields exactly 100 (all from batch 1) and
NEVER yields a batch-2 message.
"""
pre_snapshot = [
_mk_msg(f"central.fire.x.{seq}", stream_seq=seq)
for seq in range(100, 200)
]
post_snapshot = [
_mk_msg(f"central.fire.x.{seq}", stream_seq=seq)
for seq in range(200, 300)
]
sub = _mk_batched_sub([pre_snapshot, post_snapshot])
js = MagicMock()
js.pull_subscribe = AsyncMock(return_value=sub)
yielded = []
async for msg in resend_mod._iter_window(
js, "CENTRAL_FIRE", "central.fire.>",
cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc),
max_stream_seq=199,
):
yielded.append(msg.metadata.sequence.stream)
assert len(yielded) == 100
assert yielded == list(range(100, 200))
# Cleanup still runs after early return.
sub.unsubscribe.assert_awaited_once()
@pytest.mark.asyncio
async def test_iter_window_enforces_per_stream_cap_with_warning(caplog):
"""v0.10.5.2 regression guard: cap dropped from 50_000 to 5_000.
A legit operator window should never exceed this. If it does, we want
the operator to hear about it via a warning log -- silent truncation
is exactly the kind of behavior that hid the v0.10.5 feedback loop
from the operator until it took the host down.
Simulation: snapshot is set high enough (10**6) that it never limits
iteration; cap (5000) is the only stopper. Batches of 200 messages
each, seqs 1..6200, so the cap trips well before the source runs out.
"""
batches = [
[_mk_msg(f"central.x.{seq}", stream_seq=seq)
for seq in range(start, start + 200)]
for start in range(1, 6201, 200)
]
sub = _mk_batched_sub(batches)
js = MagicMock()
js.pull_subscribe = AsyncMock(return_value=sub)
yielded = 0
with caplog.at_level("WARNING", logger="central.gui.resend"):
async for _ in resend_mod._iter_window(
js, "CENTRAL_FIRE", "central.fire.>",
cutoff=datetime(2026, 6, 7, 0, 0, 0, tzinfo=timezone.utc),
max_stream_seq=10**6,
):
yielded += 1
assert yielded == resend_mod._MAX_MSGS_PER_STREAM == 5_000
cap_warnings = [r for r in caplog.records
if r.levelname == "WARNING"
and "cap reached" in r.getMessage()]
assert len(cap_warnings) == 1
assert cap_warnings[0].stream == "CENTRAL_FIRE"
sub.unsubscribe.assert_awaited_once()
# --- stream-set safety -------------------------------------------------------