diff --git a/src/central/cloudevents_wire.py b/src/central/cloudevents_wire.py index f8e5630..1d9d776 100644 --- a/src/central/cloudevents_wire.py +++ b/src/central/cloudevents_wire.py @@ -22,8 +22,17 @@ def wrap_event( or None to use defaults. Returns: - A tuple of (envelope_dict, msg_id) where msg_id is the - CloudEvent id for use as Nats-Msg-Id header. + A tuple of (envelope_dict, msg_id). ``msg_id`` is what we hand to + JetStream as the ``Nats-Msg-Id`` header. As of v0.10.8 it has the + category-discriminated shape ``"{event.id}:{event.category}"`` so + that an incident envelope and a perimeter envelope sharing the + same source GUID (e.g. wfigs_incidents and wfigs_perimeters both + emitting a Summit Creek IRWIN within JetStream's 2-min dedup + window on CENTRAL_FIRE) do NOT collide -- the subject is not part + of the dedup key, so the category suffix is what makes them + distinct. ``envelope["id"]`` itself stays the bare ``event.id`` + per the CloudEvents spec, so subscribers that key off the + payload id are unaffected. """ # Resolve CloudEventsConfig from various input types if config is None: @@ -67,4 +76,4 @@ def wrap_event( envelope: dict[str, Any] = dict(ce.get_attributes()) envelope["data"] = ce.data - return envelope, event.id + return envelope, f"{event.id}:{event.category}" diff --git a/tests/test_models.py b/tests/test_models.py index 7ed5c54..ff09e21 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -64,11 +64,17 @@ class TestCloudEventsWire: def test_required_fields_present( self, sample_event: Event, sample_config: Config ) -> None: - """Required CloudEvents fields are present.""" + """Required CloudEvents fields are present. + + v0.10.8: msg_id is now category-discriminated. The CloudEvents + envelope ``id`` stays the bare ``event.id`` per CE spec; only the + ``Nats-Msg-Id`` header (= the second return value of wrap_event) + carries the ``"{event.id}:{event.category}"`` shape. + """ envelope, msg_id = wrap_event(sample_event, sample_config) - assert msg_id == sample_event.id - assert envelope["id"] == sample_event.id + assert msg_id == f"{sample_event.id}:{sample_event.category}" + assert envelope["id"] == sample_event.id # CE spec: bare event.id assert envelope["source"] == sample_config.cloudevents.source assert envelope["type"] == "central.wx.alert.severe_thunderstorm_warning.v1" assert envelope["specversion"] == "1.0" @@ -76,6 +82,77 @@ class TestCloudEventsWire: assert envelope["datacontenttype"] == "application/json" assert "data" in envelope + def test_msgid_disambiguates_incident_vs_perimeter_same_guid( + self, sample_geo: Geo, sample_config: Config + ) -> None: + """v0.10.8 regression guard: Summit Creek dedup collision. + + On 2026-06-07 a wfigs poll yielded both an incident envelope and + a perimeter envelope for the same IRWIN ``{6B1C6EB1-...}`` within + JetStream's 2-min dedup window on CENTRAL_FIRE. Both publishes + carried the bare IRWIN as ``Nats-Msg-Id`` (JetStream's dedup key + is per-stream, NOT per-subject), so the second publish was + silently dropped -- supervisor logs showed 'Published event' for + both but only one landed in the stream. The fix adds an + ``:{event.category}`` suffix to the msg_id so the two are + distinct without changing ``envelope["id"]`` (CE spec) or the + adapter-side event.id contract. + """ + irwin = "{6B1C6EB1-30F7-4613-9C58-4801DC8FD822}" # Summit Creek + incident = Event( + id=irwin, + adapter="wfigs_incidents", + category="fire.incident.wildfire", + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=2, + geo=sample_geo, + data={}, + ) + perimeter = Event( + id=irwin, + adapter="wfigs_perimeters", + category="fire.perimeter.wildfire", + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=2, + geo=sample_geo, + data={}, + ) + + _, incident_msgid = wrap_event(incident, sample_config) + _, perimeter_msgid = wrap_event(perimeter, sample_config) + + assert incident_msgid != perimeter_msgid + assert incident_msgid == f"{irwin}:fire.incident.wildfire" + assert perimeter_msgid == f"{irwin}:fire.perimeter.wildfire" + + @pytest.mark.parametrize("category", [ + "fire.incident.wildfire", + "fire.perimeter.wildfire", + "wx.alert.severe_thunderstorm_warning", + "quake.usgs_quake", + "hydro.00060.usgs.13162225", + ]) + def test_msgid_shape_is_id_colon_category( + self, sample_geo: Geo, sample_config: Config, category: str + ) -> None: + """Every wrap_event call returns msg_id == ``f'{id}:{category}'``. + + Covers both multi-class adapters (where the discriminator is + load-bearing) and single-class adapters (where it's a no-op for + dedup but still consistent shape across the codebase). + """ + event = Event( + id=f"test:{category}:42", + adapter="test", + category=category, + time=datetime(2026, 6, 7, 6, 25, 5, tzinfo=timezone.utc), + severity=1, + geo=sample_geo, + data={}, + ) + _, msg_id = wrap_event(event, sample_config) + assert msg_id == f"{event.id}:{category}" + def test_extension_attributes_lowercase( self, sample_event: Event, sample_config: Config ) -> None: