From 3ff819eed945d3907d0bb9ab89b0bfbc0055173c Mon Sep 17 00:00:00 2001 From: "Matt Johnson (via Claude)" Date: Tue, 9 Jun 2026 14:48:35 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20swpc=20geomag=20dedup=20=E2=80=94=20supp?= =?UTF-8?q?ress=20swpc=5Falerts/kindex=20double-broadcast=20within=2010-mi?= =?UTF-8?q?n=20window=20per=20G-scale?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-memory dict keyed on scale_code (G1-G5) with 600s suppression window. Only geomag events are deduplicated — flare and proton sub-types have no cross-adapter overlap. Suppressed events still persist to swpc_events and event_log (handled=0) for audit trail. Co-Authored-By: Claude Opus 4.6 --- meshai/central/swpc_handler.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/meshai/central/swpc_handler.py b/meshai/central/swpc_handler.py index 5cb94ac..c28bb28 100644 --- a/meshai/central/swpc_handler.py +++ b/meshai/central/swpc_handler.py @@ -54,6 +54,14 @@ _S_SCALE_THRESHOLDS = [ ] +# Geomag cross-sub-adapter dedup: swpc_alerts and swpc_kindex can both +# fire for the same G-storm. Suppress the second broadcast for the same +# G-scale within this window. In-memory dict keyed on scale_code; +# cleared on process restart (acceptable — worst case one dup on restart). +GEOMAG_DEDUP_WINDOW_SECONDS = 600 +_geomag_recent: dict[str, float] = {} # scale_code -> broadcast_ts + + def _trunc(s: str, limit: int = 120) -> str: """Truncate *s* at the last word boundary at or before *limit* chars.""" if len(s) <= limit: @@ -296,6 +304,25 @@ def handle_swpc(envelope: dict, subject: str, table_name="swpc_events", table_pk=event_id) return None + # Geomag cross-sub-adapter coalescing guard. + if event_kind == "geomag" and scale_code: + prev_ts = _geomag_recent.get(scale_code) + if prev_ts is not None and (now - prev_ts) < GEOMAG_DEDUP_WINDOW_SECONDS: + logger.debug( + "swpc_handler: geomag dedup — suppressing %s from %s " + "(already broadcast %.0fs ago)", + scale_code, adapter, now - prev_ts, + ) + # Still persist + log, but no broadcast. + _upsert_swpc(conn, event_id=event_id, adapter=adapter, + payload_json=payload_json, occurred_at=occurred_at or now, + first_seen_at=now, set_last_broadcast=False) + _log_event(conn, now=now, source="swpc", category=category_raw, + severity_word=severity_word, event_id_external=event_id, + subject=subject, handled=0, + table_name="swpc_events", table_pk=event_id) + return None + # Broadcast-worthy. Per-event dedup + commit pattern. log_id = _log_event_returning_id( conn, now=now, source="swpc", category=category_raw, @@ -324,12 +351,16 @@ def handle_swpc(envelope: dict, subject: str, first_seen_at=now, set_last_broadcast=False) wire = _render(event_kind, scale_code, label, scalar_str, is_update=False, detail=_detail, time_tag=_time_tag) + if event_kind == "geomag" and scale_code: + _geomag_recent[scale_code] = now _attach_commit(data, event_id=event_id, event_log_row_id=log_id) return wire if row["last_broadcast_at"] is None: wire = _render(event_kind, scale_code, label, scalar_str, is_update=False, detail=_detail, time_tag=_time_tag) + if event_kind == "geomag" and scale_code: + _geomag_recent[scale_code] = now _attach_commit(data, event_id=event_id, event_log_row_id=log_id) return wire