fix: swpc geomag dedup — suppress swpc_alerts/kindex double-broadcast within 10-min window per G-scale

In-memory dict keyed on scale_code (G1-G5) with 600s suppression window.
Only geomag events are deduplicated — flare and proton sub-types have no
cross-adapter overlap. Suppressed events still persist to swpc_events and
event_log (handled=0) for audit trail.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-09 14:48:35 +00:00
commit 3ff819eed9

View file

@ -54,6 +54,14 @@ _S_SCALE_THRESHOLDS = [
]
# Geomag cross-sub-adapter dedup: swpc_alerts and swpc_kindex can both
# fire for the same G-storm. Suppress the second broadcast for the same
# G-scale within this window. In-memory dict keyed on scale_code;
# cleared on process restart (acceptable — worst case one dup on restart).
GEOMAG_DEDUP_WINDOW_SECONDS = 600
_geomag_recent: dict[str, float] = {} # scale_code -> broadcast_ts
def _trunc(s: str, limit: int = 120) -> str:
"""Truncate *s* at the last word boundary at or before *limit* chars."""
if len(s) <= limit:
@ -296,6 +304,25 @@ def handle_swpc(envelope: dict, subject: str,
table_name="swpc_events", table_pk=event_id)
return None
# Geomag cross-sub-adapter coalescing guard.
if event_kind == "geomag" and scale_code:
prev_ts = _geomag_recent.get(scale_code)
if prev_ts is not None and (now - prev_ts) < GEOMAG_DEDUP_WINDOW_SECONDS:
logger.debug(
"swpc_handler: geomag dedup — suppressing %s from %s "
"(already broadcast %.0fs ago)",
scale_code, adapter, now - prev_ts,
)
# Still persist + log, but no broadcast.
_upsert_swpc(conn, event_id=event_id, adapter=adapter,
payload_json=payload_json, occurred_at=occurred_at or now,
first_seen_at=now, set_last_broadcast=False)
_log_event(conn, now=now, source="swpc", category=category_raw,
severity_word=severity_word, event_id_external=event_id,
subject=subject, handled=0,
table_name="swpc_events", table_pk=event_id)
return None
# Broadcast-worthy. Per-event dedup + commit pattern.
log_id = _log_event_returning_id(
conn, now=now, source="swpc", category=category_raw,
@ -324,12 +351,16 @@ def handle_swpc(envelope: dict, subject: str,
first_seen_at=now, set_last_broadcast=False)
wire = _render(event_kind, scale_code, label, scalar_str,
is_update=False, detail=_detail, time_tag=_time_tag)
if event_kind == "geomag" and scale_code:
_geomag_recent[scale_code] = now
_attach_commit(data, event_id=event_id, event_log_row_id=log_id)
return wire
if row["last_broadcast_at"] is None:
wire = _render(event_kind, scale_code, label, scalar_str,
is_update=False, detail=_detail, time_tag=_time_tag)
if event_kind == "geomag" and scale_code:
_geomag_recent[scale_code] = now
_attach_commit(data, event_id=event_id, event_log_row_id=log_id)
return wire