diff --git a/src/central/gui/resend.py b/src/central/gui/resend.py
index 33f953d..2fe3ebe 100644
--- a/src/central/gui/resend.py
+++ b/src/central/gui/resend.py
@@ -42,9 +42,15 @@ logger = logging.getLogger(__name__)
 
 # Pull-fetch tuning. The ephemeral consumer's inactive_threshold guarantees
 # JetStream auto-cleans the temp consumer if anything kills our iterator.
+# v0.10.5.1 fix: ``inactive_threshold`` is expected as float SECONDS by
+# nats-py (which then multiplies by 1e9 internally to form the nanosecond
+# value sent to the server). v0.10.5 passed ``int(30e9)`` thinking it was
+# already in ns, which got re-multiplied to 30e18 -- out of int64 range,
+# rejected by the server with err_code=10025. Use the documented float-
+# seconds API and let the library handle the unit conversion.
 _FETCH_BATCH = 200
 _FETCH_TIMEOUT_S = 2.0
-_INACTIVE_THRESHOLD_NS = int(30e9)
+_INACTIVE_THRESHOLD_S = 30.0
 
 # Hard cap per stream per operation. 24h * worst-case CENTRAL_TRAFFIC_FLOW
 # volume is still well under this; bump if a legitimate operator action
@@ -103,7 +109,7 @@ async def _iter_window(
         deliver_policy=DeliverPolicy.BY_START_TIME,
         opt_start_time=cutoff.isoformat(),
         ack_policy=AckPolicy.NONE,
-        inactive_threshold=_INACTIVE_THRESHOLD_NS,
+        inactive_threshold=_INACTIVE_THRESHOLD_S,
         filter_subject=subject_filter,
     )
     try:
diff --git a/tests/test_resend.py b/tests/test_resend.py
index 3a9ccc5..a4a781e 100644
--- a/tests/test_resend.py
+++ b/tests/test_resend.py
@@ -250,6 +250,56 @@ async def test_execute_rejects_invalid_window():
     nc.publish.assert_not_called()
 
 
+# --- ConsumerConfig regression guard (v0.10.5.1) -----------------------------
+
+
+@pytest.mark.asyncio
+async def test_pull_subscribe_inactive_threshold_within_nats_range():
+    """v0.10.5.1 regression guard: ``inactive_threshold`` on the ephemeral
+    consumer must be a number nats-py can serialise as a Go ``time.Duration``.
+
+    v0.10.5 passed ``int(30e9)`` thinking it was nanoseconds. nats-py treats
+    the value as float SECONDS and multiplies by 1e9 internally, so the
+    server received 30e18 -- out of int64 range. NATS rejected the consumer
+    with ``err_code=10025``; preview_resend caught the exception per-stream
+    and returned 0 events across the board (silent verification failure).
+
+    Assert the captured config has:
+      - ``inactive_threshold`` in [1, 3600] seconds (operator sanity range)
+      - the nanosecond-equivalent (value * 1e9) fits within int64
+    The ``int64`` ceiling is 9_223_372_036_854_775_807 -- anything above that
+    triggers the same JSON unmarshal error that broke v0.10.5.
+    """
+    captured_configs: list = []
+
+    async def _capture_config(filter_subj, durable=None, stream=None, config=None):
+        captured_configs.append(config)
+        sub = MagicMock()
+        sub.fetch = AsyncMock(return_value=[])
+        sub.unsubscribe = AsyncMock()
+        return sub
+
+    js = MagicMock()
+    js.pull_subscribe = AsyncMock(side_effect=_capture_config)
+    await preview_resend(js, minutes=60)
+
+    INT64_MAX_NS = 9_223_372_036_854_775_807
+    assert captured_configs, "expected at least one pull_subscribe call"
+    for cfg in captured_configs:
+        threshold = cfg.inactive_threshold
+        assert threshold is not None, "inactive_threshold must be set"
+        assert 1 <= threshold <= 3600, (
+            f"inactive_threshold={threshold!r}s outside [1, 3600] sanity range"
+        )
+        ns = threshold * 1_000_000_000
+        assert ns < INT64_MAX_NS, (
+            f"inactive_threshold={threshold!r} would produce ns={ns}, "
+            f"overflowing int64 ({INT64_MAX_NS}). This is the exact v0.10.5 "
+            f"bug -- a unit confusion that produced 30e18 and triggered "
+            f"err_code=10025 from NATS."
+        )
+
+
 # --- stream-set safety -------------------------------------------------------
 
 