From 5c6d77381b60d1e8d80a0fbbc38da52d897e0dc7 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Wed, 27 May 2026 07:23:05 +0000 Subject: [PATCH] fix(nws): scope sweep_old_ids to its own adapter (v0.9.19.1) NWSAdapter.sweep_old_ids ran an unscoped global DELETE FROM published_ids (no adapter = ? filter), so each per-adapter sweep cycle purged EVERY adapter's dedup rows older than 8 days -- capping dedup memory at ~8d for the 8 adapters whose window exceeds it (eonet/gdacs/nwis 30d; swpc_kindex/protons/alerts + wfigs_incidents/perimeters 14d). Events that went silent past 8d then re-listed were re-published as downstream duplicates. Extract the 3 dedup methods onto the SourceAdapter base (finishing v0.9.19; is_published/mark_published were byte-identical) and preserve the 8-day window via dedup_sweep_days = 8, so the inherited sweep scopes to adapter=?. Effect: each adapter retains dedup to its configured window -> fewer downstream duplicates. Retention-window scoping, NOT a dedup-forward-only change (no event-shape change, no migration/backfill). Adds a regression guard asserting a foreign adapter row survives an nws sweep. Suite 900->901/1. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/adapters/nws.py | 37 +-------------------------------- tests/test_nws_normalization.py | 31 +++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 4ba2819..8fa11c5 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -211,6 +211,7 @@ class NWSAdapter(SourceAdapter): requires_api_key = None wizard_order = 1 default_cadence_s = 60 + dedup_sweep_days = 8 # Alerts cover forecast zones/counties (polygons), not a single point.
enrichment_locations = [] @@ -393,42 +394,6 @@ class NWSAdapter(SourceAdapter): ) self._db.commit() - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id) - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id) - ) - self._db.commit() - - # TODO(v0.9.19.1): unscoped global DELETE -- clobbers other adapters' dedup rows; scope to adapter + move to base. - def sweep_old_ids(self) -> int: - """Remove published_ids older than 8 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE last_seen < datetime('now', '-8 days')" - ) - self._db.commit() - return cur.rowcount - @retry( stop=stop_after_attempt(5), wait=wait_exponential_jitter(initial=1, max=60), diff --git a/tests/test_nws_normalization.py b/tests/test_nws_normalization.py index 706ce1d..701b5be 100644 --- a/tests/test_nws_normalization.py +++ b/tests/test_nws_normalization.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from pathlib import Path +import sqlite3 from unittest.mock import MagicMock import pytest @@ -353,6 +354,36 @@ class TestDeduplication: assert event2 is not None assert event1.id == event2.id + def test_sweep_only_deletes_own_adapter_rows( + self, adapter: NWSAdapter, tmp_path: Path + ) -> None: + """Regression (v0.9.19.1): sweep_old_ids must be adapter-scoped. 
+ + NWS previously ran an unscoped global DELETE that purged *every* + adapter's published_ids older than 8 days; the inherited base method + scopes the delete to ``adapter = ?``. + """ + adapter._db = sqlite3.connect(tmp_path / "dedup.db") + adapter._db.execute( + "CREATE TABLE published_ids (adapter TEXT, event_id TEXT, " + "first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" + ) + for adp in ("nws", "eonet"): + adapter._db.execute( + "INSERT INTO published_ids (adapter, event_id, last_seen) " + "VALUES (?, 'old', datetime('now', '-9 days'))", + (adp,), + ) + adapter._db.commit() + assert adapter.dedup_sweep_days == 8 + assert adapter.sweep_old_ids() == 1 # only the nws row + survivors = { + r[0] for r in adapter._db.execute("SELECT adapter FROM published_ids") + } + assert survivors == {"eonet"} # foreign adapter's row survives + class TestGeometry: """Tests for geometry computation."""