central/tests/test_dedup_mixin.py
Matt Johnson 9cd2183cc3 fix(wzdx): drop 'unknown' direction from subject + extract dedup mixin (v0.9.1)
Two v0.9.0 fast-follows. Production code; central-supervisor + central-gui
restart (adapter base change + template change). No migration, no new stream.

(a) Work-zone subject + detail no longer leak vehicle direction "unknown"
(common in AZ mcdot etc.) -- gated on direction not in (None, "unknown") in both
wzdx partials. Was "Work zone on MORELAND ST unknown".

(b) is_published/mark_published/sweep_old_ids extracted from per-adapter inline
copies onto the SourceAdapter base (beside bump_last_seen); a dedup_sweep_days
class attr parameterizes the retention window (NWIS=30, default=14). Inline
copies deleted from inciweb/nwis/wzdx; the other 10 adapters keep theirs as a
future cleanup. Net dedup code down ~52 lines.

Full suite: 744 passed, 1 skipped (central and unprivileged zvx, 3x each).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 21:18:21 +00:00

92 lines
2.8 KiB
Python

"""Tests for the shared dedup mixin on SourceAdapter (v0.9.1 extraction).
is_published / mark_published / sweep_old_ids moved from per-adapter inline
copies onto the base; dedup_sweep_days parameterizes the retention window
(NWIS keeps 30 days, the default is 14).
"""
import sqlite3
from central.adapter import SourceAdapter
from central.adapters.inciweb import InciWebAdapter
from central.adapters.nwis import NWISAdapter
from central.adapters.wzdx import WZDxAdapter
# DDL for the scratch ``published_ids`` table each test database is created
# with: one row per (adapter, event_id) pair, where both timestamp columns
# default to CURRENT_TIMESTAMP when an INSERT does not supply them.
_DDL = (
    "CREATE TABLE published_ids ("
    "adapter TEXT, "
    "event_id TEXT, "
    "first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
    "last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
    "PRIMARY KEY (adapter, event_id)"
    ")"
)
class _StubAdapter(SourceAdapter):
    """Smallest concrete ``SourceAdapter`` needed to reach the inherited dedup helpers.

    The class attributes and the three methods below are placeholders; the
    tests in this module only call ``is_published``, ``mark_published`` and
    ``sweep_old_ids``, none of which are defined here.
    """

    # Placeholder adapter metadata. The rows seeded by the sweep test use the
    # adapter value 'stub' -- presumably matched against ``name`` by the base
    # class; confirm against SourceAdapter.
    name = "stub"
    display_name = "Stub"
    description = "test"

    # Placeholder scheduling/configuration attributes.
    default_cadence_s = 600
    settings_schema = None

    async def poll(self):  # pragma: no cover - not exercised
        # The unreachable ``yield`` after a bare ``return`` makes this an
        # async generator that finishes without producing any item.
        return
        yield

    async def apply_config(self, new_config):  # pragma: no cover
        # Nothing to reconfigure on the stub.
        return None

    def subject_for(self, event):  # pragma: no cover
        # Fixed dummy subject; the dedup tests never publish anything.
        return "x"
def _adapter(tmp_path, dbname="d.db", sweep_days=None):
    """Build a ``_StubAdapter`` wired to a fresh SQLite file under *tmp_path*.

    Args:
        tmp_path: Directory in which the database file is created.
        dbname: File name of the database inside *tmp_path*.
        sweep_days: When not ``None``, assigned to the instance's
            ``dedup_sweep_days``; otherwise the attribute is left untouched.

    Returns:
        The stub adapter, with ``_db`` set to an open connection on which
        the ``published_ids`` table has been created and committed.
    """
    stub = _StubAdapter()
    connection = sqlite3.connect(tmp_path / dbname)
    stub._db = connection

    # Create the dedup table up front so the helpers have something to query.
    connection.execute(_DDL)
    connection.commit()

    if sweep_days is not None:
        # Instance-level assignment, so the class attribute is not modified.
        stub.dedup_sweep_days = sweep_days
    return stub
def test_dedup_roundtrip(tmp_path):
    """An id is reported unpublished until ``mark_published`` records it."""
    adapter = _adapter(tmp_path)
    event_id = "e1"

    # Fresh database: the id has not been seen yet.
    seen_before = adapter.is_published(event_id)
    assert seen_before is False

    adapter.mark_published(event_id)

    # The same id is now reported as published.
    seen_after = adapter.is_published(event_id)
    assert seen_after is True
def test_no_db_is_safe():
    """With ``_db`` set to ``None`` the dedup helpers neither raise nor report hits."""
    adapter = _StubAdapter()
    adapter._db = None

    lookup = adapter.is_published("e")
    assert lookup is False

    # Must complete without raising even though there is no database.
    adapter.mark_published("e")  # no raise

    swept = adapter.sweep_old_ids()
    assert swept == 0
def test_sweep_respects_dedup_sweep_days(tmp_path):
    """``sweep_old_ids`` removes a 20-day-old row only when the window is shorter than that."""
    # Seeds a single 'stub' row whose last_seen lies 20 days in the past.
    seed_sql = (
        "INSERT INTO published_ids (adapter, event_id, last_seen) "
        "VALUES ('stub', 'old', datetime('now','-20 days'))"
    )

    # (database file, dedup_sweep_days, expected sweep_old_ids() result)
    scenarios = (
        ("a.db", 14, 1),  # 20d > 14d -> swept
        ("b.db", 30, 0),  # 20d < 30d -> kept
    )

    for dbname, window_days, expected_swept in scenarios:
        # Each scenario gets its own database file so the rows stay independent.
        adapter = _adapter(tmp_path, dbname, sweep_days=window_days)
        adapter._db.execute(seed_sql)
        adapter._db.commit()
        assert adapter.sweep_old_ids() == expected_swept
def test_named_adapters_inherit_base():
    """The three migrated adapters use the base dedup methods and the expected windows."""
    migrated = (InciWebAdapter, NWISAdapter, WZDxAdapter)
    dedup_methods = ("is_published", "mark_published", "sweep_old_ids")

    for adapter_cls in migrated:
        own_namespace = vars(adapter_cls)
        for method in dedup_methods:
            # The class body itself must not define the method any more ...
            assert method not in own_namespace, (
                f"{adapter_cls.__name__} still overrides {method}"
            )
            # ... and attribute lookup must resolve to the very function
            # object that lives on SourceAdapter.
            inherited = getattr(adapter_cls, method)
            assert inherited is getattr(SourceAdapter, method)

    # Retention window per adapter, in days.
    expected_windows = (
        (NWISAdapter, 30),
        (WZDxAdapter, 14),
        (InciWebAdapter, 14),
    )
    for adapter_cls, days in expected_windows:
        assert adapter_cls.dedup_sweep_days == days