mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 11:54:37 +02:00
fix(nws): scope sweep_old_ids to its own adapter (v0.9.19.1)
NWSAdapter.sweep_old_ids ran an unscoped global DELETE FROM published_ids (no adapter = ? filter), so each per-adapter sweep cycle purged EVERY adapter's dedup rows older than 8 days -- capping dedup memory at ~8d for the 8 adapters whose window exceeds it (eonet/gdacs/nwis 30d; swpc_kindex/protons/alerts + wfigs_incidents/perimeters 14d). Events that went silent for more than 8d and were then re-listed were re-published as downstream duplicates. Extract the 3 dedup methods onto the SourceAdapter base (finishing v0.9.19; is_published/mark_published were byte-identical) and preserve the 8-day window via dedup_sweep_days = 8, so the inherited sweep scopes to adapter = ?. Effect: each adapter retains dedup to its configured window -> fewer downstream duplicates. Retention-window scoping, NOT a dedup-forward-only change (no event-shape change, no migration/backfill). Adds a regression guard asserting a foreign adapter's row survives an nws sweep. Suite 900->901/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
195a544c36
commit
5c6d77381b
2 changed files with 32 additions and 36 deletions
|
|
@ -211,6 +211,7 @@ class NWSAdapter(SourceAdapter):
|
||||||
requires_api_key = None
|
requires_api_key = None
|
||||||
wizard_order = 1
|
wizard_order = 1
|
||||||
default_cadence_s = 60
|
default_cadence_s = 60
|
||||||
|
dedup_sweep_days = 8
|
||||||
|
|
||||||
# Alerts cover forecast zones/counties (polygons), not a single point.
|
# Alerts cover forecast zones/counties (polygons), not a single point.
|
||||||
enrichment_locations = []
|
enrichment_locations = []
|
||||||
|
|
@ -393,42 +394,6 @@ class NWSAdapter(SourceAdapter):
|
||||||
)
|
)
|
||||||
self._db.commit()
|
self._db.commit()
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id)
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id)
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
# TODO(v0.9.19.1): unscoped global DELETE -- clobbers other adapters' dedup rows; scope to adapter + move to base.
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 8 days. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE last_seen < datetime('now', '-8 days')"
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
return cur.rowcount
|
|
||||||
|
|
||||||
@retry(
|
@retry(
|
||||||
stop=stop_after_attempt(5),
|
stop=stop_after_attempt(5),
|
||||||
wait=wait_exponential_jitter(initial=1, max=60),
|
wait=wait_exponential_jitter(initial=1, max=60),
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import sqlite3
|
||||||
from unittest.mock import MagicMock
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
@ -353,6 +354,36 @@ class TestDeduplication:
|
||||||
assert event2 is not None
|
assert event2 is not None
|
||||||
assert event1.id == event2.id
|
assert event1.id == event2.id
|
||||||
|
|
||||||
|
def test_sweep_only_deletes_own_adapter_rows(
|
||||||
|
self, adapter: NWSAdapter, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
"""Regression (v0.9.19.1): sweep_old_ids must be adapter-scoped.
|
||||||
|
|
||||||
|
NWS previously ran an unscoped global DELETE that purged *every*
|
||||||
|
adapter's published_ids older than 8 days; the inherited base method
|
||||||
|
scopes the delete to ``adapter = ?``.
|
||||||
|
"""
|
||||||
|
adapter._db = sqlite3.connect(tmp_path / "dedup.db")
|
||||||
|
adapter._db.execute(
|
||||||
|
"CREATE TABLE published_ids (adapter TEXT, event_id TEXT, "
|
||||||
|
"first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
|
||||||
|
"last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
|
||||||
|
"PRIMARY KEY (adapter, event_id))"
|
||||||
|
)
|
||||||
|
for adp in ("nws", "eonet"):
|
||||||
|
adapter._db.execute(
|
||||||
|
"INSERT INTO published_ids (adapter, event_id, last_seen) "
|
||||||
|
"VALUES (?, 'old', datetime('now', '-9 days'))",
|
||||||
|
(adp,),
|
||||||
|
)
|
||||||
|
adapter._db.commit()
|
||||||
|
assert adapter.dedup_sweep_days == 8
|
||||||
|
assert adapter.sweep_old_ids() == 1 # only the nws row
|
||||||
|
survivors = {
|
||||||
|
r[0] for r in adapter._db.execute("SELECT adapter FROM published_ids")
|
||||||
|
}
|
||||||
|
assert survivors == {"eonet"} # foreign adapter's row survives
|
||||||
|
|
||||||
|
|
||||||
class TestGeometry:
|
class TestGeometry:
|
||||||
"""Tests for geometry computation."""
|
"""Tests for geometry computation."""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue