Merge pull request #61 from zvx-echo6/feat/wzdx-direction-and-dedup-mixin

fix(wzdx): drop 'unknown' direction from subject + extract dedup mixin (v0.9.1)
This commit is contained in:
malice 2026-05-25 15:19:47 -06:00 committed by GitHub
commit efb2a5799d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 169 additions and 106 deletions

View file

@ -270,6 +270,19 @@ class implements the standard `published_ids` bump; adapters using that table
inherit it and need not override. It is a safe no-op when the adapter has no
`_db` handle.
**`def is_published(self, event_id: str) -> bool`** — Optional. Returns whether
`event_id` is already in the `published_ids` dedup table. The base class
implements the standard query; adapters using that table inherit it. Safe
(returns `False`) when the adapter has no `_db` handle.
**`def mark_published(self, event_id: str) -> None`** — Optional. Records
`event_id` as published (refreshing `last_seen` on re-publish). Base-class
default operates on `published_ids`; safe no-op without `_db`.
**`def sweep_old_ids(self) -> int`** — Optional. Purges dedup rows older than
`dedup_sweep_days` (default 14; NWIS overrides to 30) and returns the count
deleted. Base-class default; safe no-op (returns 0) without `_db`.
**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** —
Optional. The settings-page preview hook. The default returns `None` (no
preview). See [§11](#11-settings-preview-hook) for the contract.

View file

@ -1,5 +1,6 @@
"""Base adapter interface for event sources."""
import logging
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Literal
@ -11,6 +12,8 @@ if TYPE_CHECKING:
from central.models import Event
logger = logging.getLogger(__name__)
class SourceAdapter(ABC):
"""
@ -92,6 +95,56 @@ class SourceAdapter(ABC):
"""Optional lifecycle hook called on graceful shutdown."""
pass
dedup_sweep_days: int = 14
"""Retention window (days) for the ``published_ids`` dedup table; the
inherited ``sweep_old_ids`` deletes rows older than this. Override per
adapter (NWIS uses 30; most use the 14-day default)."""
def is_published(self, event_id: str) -> bool:
"""True if ``event_id`` is already recorded. No-op-safe without ``_db``."""
db = getattr(self, "_db", None)
if db is None:
return False
cur = db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Record ``event_id`` as published; refresh ``last_seen`` on re-publish."""
db = getattr(self, "_db", None)
if db is None:
return
db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
db.commit()
def sweep_old_ids(self) -> int:
"""Purge dedup rows older than ``dedup_sweep_days``; returns count deleted."""
db = getattr(self, "_db", None)
if db is None:
return 0
cur = db.execute(
"DELETE FROM published_ids WHERE adapter = ? "
"AND last_seen < datetime('now', ?)",
(self.name, f"-{self.dedup_sweep_days} days"),
)
db.commit()
count = cur.rowcount
if count > 0:
logger.info(
"Swept old dedup entries",
extra={"adapter": self.name, "count": count},
)
return count
def bump_last_seen(self, event_id: str) -> None:
"""Refresh the dedup ``last_seen`` for an already-published event.

View file

@ -246,45 +246,6 @@ class InciWebAdapter(SourceAdapter):
extra={"region": self.region.model_dump() if self.region else None},
)
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 14 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("InciWeb swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for an event."""
state = event.geo.primary_region

View file

@ -142,6 +142,7 @@ class NWISAdapter(SourceAdapter):
# Continuous high-volume water-gauge feed -> the /telemetry tab, not /events.
data_class = "telemetry"
dedup_sweep_days = 30 # telemetry keeps dedup ids longer than the 14-day default
def __init__(
self,
@ -218,42 +219,6 @@ class NWISAdapter(SourceAdapter):
},
)
def is_published(self, dedup_key: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, dedup_key),
)
return cur.fetchone() is not None
def mark_published(self, dedup_key: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, dedup_key),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("NWIS swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
# event.category is "hydro.<parameter_code>.<agency>.<bare_site_no>"
parts = event.category.split(".")

View file

@ -170,35 +170,6 @@ class WZDxAdapter(SourceAdapter):
self._states = self._read_states(new_config)
logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None})
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) "
"ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
return cur.rowcount
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),

View file

@ -3,7 +3,7 @@
shape (event_status, no lanes) both render without error. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %}
{% set roads = d.get('road_names') or [] %}
{% if roads %}<dt>Road</dt><dd>{{ roads | join(', ') }}{% if d.get('direction') %} ({{ d.direction }}){% endif %}</dd>{% endif %}
{% if roads %}<dt>Road</dt><dd>{{ roads | join(', ') }}{% if d.get('direction') and d.direction != 'unknown' %} ({{ d.direction }}){% endif %}</dd>{% endif %}
{% if d.get('vehicle_impact') is not none %}<dt>Vehicle impact</dt><dd>{{ d.vehicle_impact }}</dd>{% endif %}
{% if d.get('event_status') is not none %}<dt>Status</dt><dd>{{ d.event_status }}</dd>{% endif %}
{% if d.get('start_date') is not none %}<dt>Starts</dt><dd>{{ d.start_date }}</dd>{% endif %}

View file

@ -4,4 +4,4 @@
{% set d = (event.data.get('data') or {}).get('data') or {} %}
{%- set roads = d.get('road_names') or [] -%}
{%- set road = roads[0] if roads else None -%}
Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') %} {{ d.direction }}{% endif %}
Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') and d.direction != 'unknown' %} {{ d.direction }}{% endif %}

92
tests/test_dedup_mixin.py Normal file
View file

@ -0,0 +1,92 @@
"""Tests for the shared dedup mixin on SourceAdapter (v0.9.1 extraction).
is_published / mark_published / sweep_old_ids moved from per-adapter inline
copies onto the base; dedup_sweep_days parameterizes the retention window
(NWIS keeps 30 days, the default is 14).
"""
import sqlite3
from central.adapter import SourceAdapter
from central.adapters.inciweb import InciWebAdapter
from central.adapters.nwis import NWISAdapter
from central.adapters.wzdx import WZDxAdapter
_DDL = (
"CREATE TABLE published_ids (adapter TEXT, event_id TEXT, "
"first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
"last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (adapter, event_id))"
)
class _StubAdapter(SourceAdapter):
"""Minimal concrete adapter exercising only the inherited dedup mixin."""
name = "stub"
display_name = "Stub"
description = "test"
settings_schema = None
default_cadence_s = 600
async def poll(self): # pragma: no cover - not exercised
return
yield
async def apply_config(self, new_config): # pragma: no cover
...
def subject_for(self, event): # pragma: no cover
return "x"
def _adapter(tmp_path, dbname="d.db", sweep_days=None):
a = _StubAdapter()
a._db = sqlite3.connect(tmp_path / dbname)
a._db.execute(_DDL)
a._db.commit()
if sweep_days is not None:
a.dedup_sweep_days = sweep_days
return a
def test_dedup_roundtrip(tmp_path):
a = _adapter(tmp_path)
assert a.is_published("e1") is False
a.mark_published("e1")
assert a.is_published("e1") is True
def test_no_db_is_safe():
a = _StubAdapter()
a._db = None
assert a.is_published("e") is False
a.mark_published("e") # no raise
assert a.sweep_old_ids() == 0
def test_sweep_respects_dedup_sweep_days(tmp_path):
a = _adapter(tmp_path, "a.db", sweep_days=14)
a._db.execute(
"INSERT INTO published_ids (adapter, event_id, last_seen) "
"VALUES ('stub', 'old', datetime('now','-20 days'))"
)
a._db.commit()
assert a.sweep_old_ids() == 1 # 20d > 14d -> swept
b = _adapter(tmp_path, "b.db", sweep_days=30)
b._db.execute(
"INSERT INTO published_ids (adapter, event_id, last_seen) "
"VALUES ('stub', 'old', datetime('now','-20 days'))"
)
b._db.commit()
assert b.sweep_old_ids() == 0 # 20d < 30d -> kept
def test_named_adapters_inherit_base():
for cls in (InciWebAdapter, NWISAdapter, WZDxAdapter):
for m in ("is_published", "mark_published", "sweep_old_ids"):
assert m not in cls.__dict__, f"{cls.__name__} still overrides {m}"
assert getattr(cls, m) is getattr(SourceAdapter, m)
assert NWISAdapter.dedup_sweep_days == 30
assert WZDxAdapter.dedup_sweep_days == 14
assert InciWebAdapter.dedup_sweep_days == 14

View file

@ -150,3 +150,11 @@ def test_summary_partial_renders_subject():
flat = {"road_names": ["I-80"], "direction": "eastbound"}
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
assert _derive_subject(row) == "Work zone on I-80 eastbound"
def test_summary_omits_unknown_direction():
# direction "unknown" (common in e.g. AZ mcdot) must not leak into the subject.
from central.gui.routes import _derive_subject
flat = {"road_names": ["I-80"], "direction": "unknown"}
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
assert _derive_subject(row) == "Work zone on I-80"