From 81a2c30fdb690ddf7980fcde1cd5a9139810f64f Mon Sep 17 00:00:00 2001 From: malice Date: Mon, 8 Jun 2026 01:12:22 -0600 Subject: [PATCH] v0.10.8: discriminate Nats-Msg-Id by event.category to prevent incident+perimeter dedup collision (#96) JetStream's 2-min per-stream duplicate window keys on Nats-Msg-Id alone -- the subject is NOT part of the dedup key. So when wfigs_incidents and wfigs_perimeters published envelopes for the same Summit Creek IRWIN {6B1C6EB1-30F7-4613-9C58-4801DC8FD822} within seconds of each other on 2026-06-07, the second publish was silently dropped: supervisor logged 'Published event' for both, but CENTRAL_FIRE's last_seq advanced by 25 instead of 26. Root cause: cloudevents_wire.wrap_event returned event.id verbatim as msg_id, and both wfigs adapters use the bare IRWIN GUID as event.id. Fix: synthesize msg_id as f'{event.id}:{event.category}' in wrap_event. Categories already differ ('fire.incident.X' vs 'fire.perimeter.X') so this is a natural and load-bearing discriminator. envelope['id'] stays the bare event.id per CloudEvents spec, so subscribers that key off the payload id field are unaffected. Adapter-side event.id construction unchanged. Backward compatibility: this is a one-time msg_id-shape change. The JetStream 2-min dedup window is per-(event-id, category) starting now instead of per-event-id. For events still in the dedup window at the moment of the deploy under the OLD shape, their first post-deploy publish under the NEW shape is a fresh msg_id and lands normally -- no collision risk. Quake and other single-class adapters get a no-op shape change (their category is constant per adapter, so dedup still catches genuine same-id duplicates). Tests: - test_required_fields_present updated for the new assertion. - New test_msgid_disambiguates_incident_vs_perimeter_same_guid regression guard with the real Summit Creek IRWIN -- assert the two msg_ids differ and match the expected shape exactly. - New parametrize test_msgid_shape_is_id_colon_category over 5 categories covering multi-class (fire.incident, fire.perimeter) and single-class (quake, wx.alert, hydro) adapters. Full sweep: 1028 passed, 0 failures (+6 from this PR), ruff clean. Deploy: NOT an output-shape change to envelope payload, no published_ids flush needed. Squash-merge -> tag v0.10.8 -> pull on central -> restart central-supervisor. Verify by forcing a Summit Creek republish and confirming both .incident.id.cassia AND .perimeter.id.cassia land post-deploy with the same IRWIN. Co-authored-by: Claude Opus 4.7 (1M context) --- src/central/cloudevents_wire.py | 15 ++++-- tests/test_models.py | 83 +++++++++++++++++++++++++++++++-- 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/src/central/cloudevents_wire.py b/src/central/cloudevents_wire.py index f8e5630..1d9d776 100644 --- a/src/central/cloudevents_wire.py +++ b/src/central/cloudevents_wire.py @@ -22,8 +22,17 @@ def wrap_event( or None to use defaults. Returns: - A tuple of (envelope_dict, msg_id) where msg_id is the - CloudEvent id for use as Nats-Msg-Id header. + A tuple of (envelope_dict, msg_id). ``msg_id`` is what we hand to + JetStream as the ``Nats-Msg-Id`` header. As of v0.10.8 it has the + category-discriminated shape ``"{event.id}:{event.category}"`` so + that an incident envelope and a perimeter envelope sharing the + same source GUID (e.g. wfigs_incidents and wfigs_perimeters both + emitting a Summit Creek IRWIN within JetStream's 2-min dedup + window on CENTRAL_FIRE) do NOT collide -- the subject is not part + of the dedup key, so the category suffix is what makes them + distinct. ``envelope["id"]`` itself stays the bare ``event.id`` + per the CloudEvents spec, so subscribers that key off the + payload id are unaffected. """ # Resolve CloudEventsConfig from various input types if config is None: @@ -67,4 +76,4 @@ def wrap_event( envelope: dict[str, Any] = dict(ce.get_attributes()) envelope["data"] = ce.data - return envelope, event.id + return envelope, f"{event.id}:{event.category}" diff --git a/tests/test_models.py b/tests/test_models.py index 7ed5c54..ff09e21 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -64,11 +64,17 @@ class TestCloudEventsWire: def test_required_fields_present( self, sample_event: Event, sample_config: Config ) -> None: - """Required CloudEvents fields are present.""" + """Required CloudEvents fields are present. + + v0.10.8: msg_id is now category-discriminated. The CloudEvents + envelope ``id`` stays the bare ``event.id`` per CE spec; only the + ``Nats-Msg-Id`` header (= the second return value of wrap_event) + carries the ``"{event.id}:{event.category}"`` shape. + """ envelope, msg_id = wrap_event(sample_event, sample_config) - assert msg_id == sample_event.id - assert envelope["id"] == sample_event.id + assert msg_id == f"{sample_event.id}:{sample_event.category}" + assert envelope["id"] == sample_event.id # CE spec: bare event.id assert envelope["source"] == sample_config.cloudevents.source assert envelope["type"] == "central.wx.alert.severe_thunderstorm_warning.v1" assert envelope["specversion"] == "1.0" @@ -76,6 +82,77 @@ class TestCloudEventsWire: assert envelope["datacontenttype"] == "application/json" assert "data" in envelope + def test_msgid_disambiguates_incident_vs_perimeter_same_guid( + self, sample_geo: Geo, sample_config: Config + ) -> None: + """v0.10.8 regression guard: Summit Creek dedup collision. + + On 2026-06-07 a wfigs poll yielded both an incident envelope and + a perimeter envelope for the same IRWIN ``{6B1C6EB1-...}`` within + JetStream's 2-min dedup window on CENTRAL_FIRE. Both publishes + carried the bare IRWIN as ``Nats-Msg-Id`` (JetStream's dedup key + is per-stream, NOT per-subject), so the second publish was + silently dropped -- supervisor logs showed 'Published event' for + both but only one landed in the stream. The fix adds an + ``:{event.category}`` suffix to the msg_id so the two are + distinct without changing ``envelope["id"]`` (CE spec) or the + adapter-side event.id contract. + """ + irwin = "{6B1C6EB1-30F7-4613-9C58-4801DC8FD822}" # Summit Creek + incident = Event( + id=irwin, + adapter="wfigs_incidents", + category="fire.incident.wildfire", + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=2, + geo=sample_geo, + data={}, + ) + perimeter = Event( + id=irwin, + adapter="wfigs_perimeters", + category="fire.perimeter.wildfire", + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=2, + geo=sample_geo, + data={}, + ) + + _, incident_msgid = wrap_event(incident, sample_config) + _, perimeter_msgid = wrap_event(perimeter, sample_config) + + assert incident_msgid != perimeter_msgid + assert incident_msgid == f"{irwin}:fire.incident.wildfire" + assert perimeter_msgid == f"{irwin}:fire.perimeter.wildfire" + + @pytest.mark.parametrize("category", [ + "fire.incident.wildfire", + "fire.perimeter.wildfire", + "wx.alert.severe_thunderstorm_warning", + "quake.usgs_quake", + "hydro.00060.usgs.13162225", + ]) + def test_msgid_shape_is_id_colon_category( + self, sample_geo: Geo, sample_config: Config, category: str + ) -> None: + """Every wrap_event call returns msg_id == ``f'{id}:{category}'``. + + Covers both multi-class adapters (where the discriminator is + load-bearing) and single-class adapters (where it's a no-op for + dedup but still consistent shape across the codebase). + """ + event = Event( + id=f"test:{category}:42", + adapter="test", + category=category, + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=1, + geo=sample_geo, + data={}, + ) + _, msg_id = wrap_event(event, sample_config) + assert msg_id == f"{event.id}:{category}" + def test_extension_attributes_lowercase( self, sample_event: Event, sample_config: Config ) -> None: