From 9cd2183cc3650dc0610b3b79e0d87ba829fba351 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Mon, 25 May 2026 21:18:07 +0000 Subject: [PATCH] fix(wzdx): drop 'unknown' direction from subject + extract dedup mixin (v0.9.1) Two v0.9.0 fast-follows. Production code; central-supervisor + central-gui restart (adapter base change + template change). No migration, no new stream. (a) Work-zone subject + detail no longer leak vehicle direction "unknown" (common in AZ mcdot etc.) -- gated on direction not in (None, "unknown") in both wzdx partials. Was "Work zone on MORELAND ST unknown". (b) is_published/mark_published/sweep_old_ids extracted from per-adapter inline copies onto the SourceAdapter base (beside bump_last_seen); a dedup_sweep_days class attr parameterizes the retention window (NWIS=30, default=14). Inline copies deleted from inciweb/nwis/wzdx; the other 10 adapters keep theirs as a future cleanup. Net dedup code down ~52 lines. Full suite: 744 passed, 1 skipped (central and unprivileged zvx, 3x each). Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/PRODUCER-INTEGRATION.md | 13 +++ src/central/adapter.py | 53 +++++++++++ src/central/adapters/inciweb.py | 39 -------- src/central/adapters/nwis.py | 37 +------- src/central/adapters/wzdx.py | 29 ------ .../gui/templates/_event_rows/wzdx.html | 2 +- .../gui/templates/_event_summaries/wzdx.html | 2 +- tests/test_dedup_mixin.py | 92 +++++++++++++++++++ tests/test_wzdx.py | 8 ++ 9 files changed, 169 insertions(+), 106 deletions(-) create mode 100644 tests/test_dedup_mixin.py diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 71c8dfe..026bc82 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -270,6 +270,19 @@ class implements the standard `published_ids` bump; adapters using that table inherit it and need not override. It is a safe no-op when the adapter has no `_db` handle. +**`def is_published(self, event_id: str) -> bool`** — Optional. Returns whether +`event_id` is already in the `published_ids` dedup table. The base class +implements the standard query; adapters using that table inherit it. Safe +(returns `False`) when the adapter has no `_db` handle. + +**`def mark_published(self, event_id: str) -> None`** — Optional. Records +`event_id` as published (refreshing `last_seen` on re-publish). Base-class +default operates on `published_ids`; safe no-op without `_db`. + +**`def sweep_old_ids(self) -> int`** — Optional. Purges dedup rows older than +`dedup_sweep_days` (default 14; NWIS overrides to 30) and returns the count +deleted. Base-class default; safe no-op (returns 0) without `_db`. + **`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** — Optional. The settings-page preview hook. The default returns `None` (no preview). See [§11](#11-settings-preview-hook) for the contract. diff --git a/src/central/adapter.py b/src/central/adapter.py index 7500925..2b64034 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -1,5 +1,6 @@ """Base adapter interface for event sources.""" +import logging from abc import ABC, abstractmethod from collections.abc import AsyncIterator from typing import TYPE_CHECKING, Literal @@ -11,6 +12,8 @@ if TYPE_CHECKING: from central.models import Event +logger = logging.getLogger(__name__) + class SourceAdapter(ABC): """ @@ -92,6 +95,56 @@ class SourceAdapter(ABC): """Optional lifecycle hook called on graceful shutdown.""" pass + dedup_sweep_days: int = 14 + """Retention window (days) for the ``published_ids`` dedup table; the + inherited ``sweep_old_ids`` deletes rows older than this. Override per + adapter (NWIS uses 30; most use the 14-day default).""" + + def is_published(self, event_id: str) -> bool: + """True if ``event_id`` is already recorded. No-op-safe without ``_db``.""" + db = getattr(self, "_db", None) + if db is None: + return False + cur = db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Record ``event_id`` as published; refresh ``last_seen`` on re-publish.""" + db = getattr(self, "_db", None) + if db is None: + return + db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + db.commit() + + def sweep_old_ids(self) -> int: + """Purge dedup rows older than ``dedup_sweep_days``; returns count deleted.""" + db = getattr(self, "_db", None) + if db is None: + return 0 + cur = db.execute( + "DELETE FROM published_ids WHERE adapter = ? " + "AND last_seen < datetime('now', ?)", + (self.name, f"-{self.dedup_sweep_days} days"), + ) + db.commit() + count = cur.rowcount + if count > 0: + logger.info( + "Swept old dedup entries", + extra={"adapter": self.name, "count": count}, + ) + return count + def bump_last_seen(self, event_id: str) -> None: """Refresh the dedup ``last_seen`` for an already-published event. diff --git a/src/central/adapters/inciweb.py b/src/central/adapters/inciweb.py index af6bf74..8a35449 100644 --- a/src/central/adapters/inciweb.py +++ b/src/central/adapters/inciweb.py @@ -246,45 +246,6 @@ class InciWebAdapter(SourceAdapter): extra={"region": self.region.model_dump() if self.region else None}, ) - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("InciWeb swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: """Compute NATS subject for an event.""" state = event.geo.primary_region diff --git a/src/central/adapters/nwis.py b/src/central/adapters/nwis.py index 431c4bb..435a6e2 100644 --- a/src/central/adapters/nwis.py +++ b/src/central/adapters/nwis.py @@ -142,6 +142,7 @@ class NWISAdapter(SourceAdapter): # Continuous high-volume water-gauge feed -> the /telemetry tab, not /events. data_class = "telemetry" + dedup_sweep_days = 30 # telemetry keeps dedup ids longer than the 14-day default def __init__( self, @@ -218,42 +219,6 @@ class NWISAdapter(SourceAdapter): }, ) - def is_published(self, dedup_key: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, dedup_key), - ) - return cur.fetchone() is not None - - def mark_published(self, dedup_key: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, dedup_key), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("NWIS swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: # event.category is "hydro..." parts = event.category.split(".") diff --git a/src/central/adapters/wzdx.py b/src/central/adapters/wzdx.py index 260875f..36b9e37 100644 --- a/src/central/adapters/wzdx.py +++ b/src/central/adapters/wzdx.py @@ -170,35 +170,6 @@ class WZDxAdapter(SourceAdapter): self._states = self._read_states(new_config) logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None}) - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - "INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) " - "ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP", - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - return cur.rowcount - @retry( stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=30), diff --git a/src/central/gui/templates/_event_rows/wzdx.html b/src/central/gui/templates/_event_rows/wzdx.html index f0fb7a8..90e6a59 100644 --- a/src/central/gui/templates/_event_rows/wzdx.html +++ b/src/central/gui/templates/_event_rows/wzdx.html @@ -3,7 +3,7 @@ shape (event_status, no lanes) both render without error. #} {% set d = (event.data.get('data') or {}).get('data') or {} %} {% set roads = d.get('road_names') or [] %} -{% if roads %}
Road
{{ roads | join(', ') }}{% if d.get('direction') %} ({{ d.direction }}){% endif %}
{% endif %} +{% if roads %}
Road
{{ roads | join(', ') }}{% if d.get('direction') and d.direction != 'unknown' %} ({{ d.direction }}){% endif %}
{% endif %} {% if d.get('vehicle_impact') is not none %}
Vehicle impact
{{ d.vehicle_impact }}
{% endif %} {% if d.get('event_status') is not none %}
Status
{{ d.event_status }}
{% endif %} {% if d.get('start_date') is not none %}
Starts
{{ d.start_date }}
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/wzdx.html b/src/central/gui/templates/_event_summaries/wzdx.html index 35bd6b8..486df7b 100644 --- a/src/central/gui/templates/_event_summaries/wzdx.html +++ b/src/central/gui/templates/_event_summaries/wzdx.html @@ -4,4 +4,4 @@ {% set d = (event.data.get('data') or {}).get('data') or {} %} {%- set roads = d.get('road_names') or [] -%} {%- set road = roads[0] if roads else None -%} -Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') %} {{ d.direction }}{% endif %} +Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') and d.direction != 'unknown' %} {{ d.direction }}{% endif %} diff --git a/tests/test_dedup_mixin.py b/tests/test_dedup_mixin.py new file mode 100644 index 0000000..5a4ac9f --- /dev/null +++ b/tests/test_dedup_mixin.py @@ -0,0 +1,92 @@ +"""Tests for the shared dedup mixin on SourceAdapter (v0.9.1 extraction). + +is_published / mark_published / sweep_old_ids moved from per-adapter inline +copies onto the base; dedup_sweep_days parameterizes the retention window +(NWIS keeps 30 days, the default is 14). +""" + +import sqlite3 + +from central.adapter import SourceAdapter +from central.adapters.inciweb import InciWebAdapter +from central.adapters.nwis import NWISAdapter +from central.adapters.wzdx import WZDxAdapter + +_DDL = ( + "CREATE TABLE published_ids (adapter TEXT, event_id TEXT, " + "first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (adapter, event_id))" +) + + +class _StubAdapter(SourceAdapter): + """Minimal concrete adapter exercising only the inherited dedup mixin.""" + + name = "stub" + display_name = "Stub" + description = "test" + settings_schema = None + default_cadence_s = 600 + + async def poll(self): # pragma: no cover - not exercised + return + yield + + async def apply_config(self, new_config): # pragma: no cover + ... + + def subject_for(self, event): # pragma: no cover + return "x" + + +def _adapter(tmp_path, dbname="d.db", sweep_days=None): + a = _StubAdapter() + a._db = sqlite3.connect(tmp_path / dbname) + a._db.execute(_DDL) + a._db.commit() + if sweep_days is not None: + a.dedup_sweep_days = sweep_days + return a + + +def test_dedup_roundtrip(tmp_path): + a = _adapter(tmp_path) + assert a.is_published("e1") is False + a.mark_published("e1") + assert a.is_published("e1") is True + + +def test_no_db_is_safe(): + a = _StubAdapter() + a._db = None + assert a.is_published("e") is False + a.mark_published("e") # no raise + assert a.sweep_old_ids() == 0 + + +def test_sweep_respects_dedup_sweep_days(tmp_path): + a = _adapter(tmp_path, "a.db", sweep_days=14) + a._db.execute( + "INSERT INTO published_ids (adapter, event_id, last_seen) " + "VALUES ('stub', 'old', datetime('now','-20 days'))" + ) + a._db.commit() + assert a.sweep_old_ids() == 1 # 20d > 14d -> swept + + b = _adapter(tmp_path, "b.db", sweep_days=30) + b._db.execute( + "INSERT INTO published_ids (adapter, event_id, last_seen) " + "VALUES ('stub', 'old', datetime('now','-20 days'))" + ) + b._db.commit() + assert b.sweep_old_ids() == 0 # 20d < 30d -> kept + + +def test_named_adapters_inherit_base(): + for cls in (InciWebAdapter, NWISAdapter, WZDxAdapter): + for m in ("is_published", "mark_published", "sweep_old_ids"): + assert m not in cls.__dict__, f"{cls.__name__} still overrides {m}" + assert getattr(cls, m) is getattr(SourceAdapter, m) + assert NWISAdapter.dedup_sweep_days == 30 + assert WZDxAdapter.dedup_sweep_days == 14 + assert InciWebAdapter.dedup_sweep_days == 14 diff --git a/tests/test_wzdx.py b/tests/test_wzdx.py index 2c045ed..dfa5e0b 100644 --- a/tests/test_wzdx.py +++ b/tests/test_wzdx.py @@ -150,3 +150,11 @@ def test_summary_partial_renders_subject(): flat = {"road_names": ["I-80"], "direction": "eastbound"} row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} assert _derive_subject(row) == "Work zone on I-80 eastbound" + + +def test_summary_omits_unknown_direction(): + # direction "unknown" (common in e.g. AZ mcdot) must not leak into the subject. + from central.gui.routes import _derive_subject + flat = {"road_names": ["I-80"], "direction": "unknown"} + row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} + assert _derive_subject(row) == "Work zone on I-80"