mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
fix(wzdx): drop 'unknown' direction from subject + extract dedup mixin (v0.9.1)
Two v0.9.0 fast-follows. Production code; central-supervisor + central-gui restart (adapter base change + template change). No migration, no new stream. (a) Work-zone subject + detail no longer leak vehicle direction "unknown" (common in AZ mcdot etc.) -- gated on direction not in (None, "unknown") in both wzdx partials. Was "Work zone on MORELAND ST unknown". (b) is_published/mark_published/sweep_old_ids extracted from per-adapter inline copies onto the SourceAdapter base (beside bump_last_seen); a dedup_sweep_days class attr parameterizes the retention window (NWIS=30, default=14). Inline copies deleted from inciweb/nwis/wzdx; the other 10 adapters keep theirs as a future cleanup. Net dedup code down ~52 lines. Full suite: 744 passed, 1 skipped (central and unprivileged zvx, 3x each). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c6c5367ccf
commit
9cd2183cc3
9 changed files with 169 additions and 106 deletions
|
|
@ -270,6 +270,19 @@ class implements the standard `published_ids` bump; adapters using that table
|
||||||
inherit it and need not override. It is a safe no-op when the adapter has no
|
inherit it and need not override. It is a safe no-op when the adapter has no
|
||||||
`_db` handle.
|
`_db` handle.
|
||||||
|
|
||||||
|
**`def is_published(self, event_id: str) -> bool`** — Optional. Returns whether
|
||||||
|
`event_id` is already in the `published_ids` dedup table. The base class
|
||||||
|
implements the standard query; adapters using that table inherit it. Safe
|
||||||
|
(returns `False`) when the adapter has no `_db` handle.
|
||||||
|
|
||||||
|
**`def mark_published(self, event_id: str) -> None`** — Optional. Records
|
||||||
|
`event_id` as published (refreshing `last_seen` on re-publish). Base-class
|
||||||
|
default operates on `published_ids`; safe no-op without `_db`.
|
||||||
|
|
||||||
|
**`def sweep_old_ids(self) -> int`** — Optional. Purges dedup rows older than
|
||||||
|
`dedup_sweep_days` (default 14; NWIS overrides to 30) and returns the count
|
||||||
|
deleted. Base-class default; safe no-op (returns 0) without `_db`.
|
||||||
|
|
||||||
**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** —
|
**`async def preview_for_settings(self, settings: BaseModel) -> list[dict] | None`** —
|
||||||
Optional. The settings-page preview hook. The default returns `None` (no
|
Optional. The settings-page preview hook. The default returns `None` (no
|
||||||
preview). See [§11](#11-settings-preview-hook) for the contract.
|
preview). See [§11](#11-settings-preview-hook) for the contract.
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
"""Base adapter interface for event sources."""
|
"""Base adapter interface for event sources."""
|
||||||
|
|
||||||
|
import logging
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from collections.abc import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
from typing import TYPE_CHECKING, Literal
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
@ -11,6 +12,8 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
from central.models import Event
|
from central.models import Event
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class SourceAdapter(ABC):
|
class SourceAdapter(ABC):
|
||||||
"""
|
"""
|
||||||
|
|
@ -92,6 +95,56 @@ class SourceAdapter(ABC):
|
||||||
"""Optional lifecycle hook called on graceful shutdown."""
|
"""Optional lifecycle hook called on graceful shutdown."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
dedup_sweep_days: int = 14
|
||||||
|
"""Retention window (days) for the ``published_ids`` dedup table; the
|
||||||
|
inherited ``sweep_old_ids`` deletes rows older than this. Override per
|
||||||
|
adapter (NWIS uses 30; most use the 14-day default)."""
|
||||||
|
|
||||||
|
def is_published(self, event_id: str) -> bool:
|
||||||
|
"""True if ``event_id`` is already recorded. No-op-safe without ``_db``."""
|
||||||
|
db = getattr(self, "_db", None)
|
||||||
|
if db is None:
|
||||||
|
return False
|
||||||
|
cur = db.execute(
|
||||||
|
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
||||||
|
(self.name, event_id),
|
||||||
|
)
|
||||||
|
return cur.fetchone() is not None
|
||||||
|
|
||||||
|
def mark_published(self, event_id: str) -> None:
|
||||||
|
"""Record ``event_id`` as published; refresh ``last_seen`` on re-publish."""
|
||||||
|
db = getattr(self, "_db", None)
|
||||||
|
if db is None:
|
||||||
|
return
|
||||||
|
db.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
||||||
|
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||||
|
ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP
|
||||||
|
""",
|
||||||
|
(self.name, event_id),
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
def sweep_old_ids(self) -> int:
|
||||||
|
"""Purge dedup rows older than ``dedup_sweep_days``; returns count deleted."""
|
||||||
|
db = getattr(self, "_db", None)
|
||||||
|
if db is None:
|
||||||
|
return 0
|
||||||
|
cur = db.execute(
|
||||||
|
"DELETE FROM published_ids WHERE adapter = ? "
|
||||||
|
"AND last_seen < datetime('now', ?)",
|
||||||
|
(self.name, f"-{self.dedup_sweep_days} days"),
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
count = cur.rowcount
|
||||||
|
if count > 0:
|
||||||
|
logger.info(
|
||||||
|
"Swept old dedup entries",
|
||||||
|
extra={"adapter": self.name, "count": count},
|
||||||
|
)
|
||||||
|
return count
|
||||||
|
|
||||||
def bump_last_seen(self, event_id: str) -> None:
|
def bump_last_seen(self, event_id: str) -> None:
|
||||||
"""Refresh the dedup ``last_seen`` for an already-published event.
|
"""Refresh the dedup ``last_seen`` for an already-published event.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -246,45 +246,6 @@ class InciWebAdapter(SourceAdapter):
|
||||||
extra={"region": self.region.model_dump() if self.region else None},
|
extra={"region": self.region.model_dump() if self.region else None},
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 14 days. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("InciWeb swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
"""Compute NATS subject for an event."""
|
"""Compute NATS subject for an event."""
|
||||||
state = event.geo.primary_region
|
state = event.geo.primary_region
|
||||||
|
|
|
||||||
|
|
@ -142,6 +142,7 @@ class NWISAdapter(SourceAdapter):
|
||||||
|
|
||||||
# Continuous high-volume water-gauge feed -> the /telemetry tab, not /events.
|
# Continuous high-volume water-gauge feed -> the /telemetry tab, not /events.
|
||||||
data_class = "telemetry"
|
data_class = "telemetry"
|
||||||
|
dedup_sweep_days = 30 # telemetry keeps dedup ids longer than the 14-day default
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
@ -218,42 +219,6 @@ class NWISAdapter(SourceAdapter):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_published(self, dedup_key: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, dedup_key),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, dedup_key: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, dedup_key),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("NWIS swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
# event.category is "hydro.<parameter_code>.<agency>.<bare_site_no>"
|
# event.category is "hydro.<parameter_code>.<agency>.<bare_site_no>"
|
||||||
parts = event.category.split(".")
|
parts = event.category.split(".")
|
||||||
|
|
|
||||||
|
|
@ -170,35 +170,6 @@ class WZDxAdapter(SourceAdapter):
|
||||||
self._states = self._read_states(new_config)
|
self._states = self._read_states(new_config)
|
||||||
logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None})
|
logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None})
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) "
|
|
||||||
"ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
return cur.rowcount
|
|
||||||
|
|
||||||
@retry(
|
@retry(
|
||||||
stop=stop_after_attempt(3),
|
stop=stop_after_attempt(3),
|
||||||
wait=wait_exponential_jitter(initial=1, max=30),
|
wait=wait_exponential_jitter(initial=1, max=30),
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
shape (event_status, no lanes) both render without error. #}
|
shape (event_status, no lanes) both render without error. #}
|
||||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||||
{% set roads = d.get('road_names') or [] %}
|
{% set roads = d.get('road_names') or [] %}
|
||||||
{% if roads %}<dt>Road</dt><dd>{{ roads | join(', ') }}{% if d.get('direction') %} ({{ d.direction }}){% endif %}</dd>{% endif %}
|
{% if roads %}<dt>Road</dt><dd>{{ roads | join(', ') }}{% if d.get('direction') and d.direction != 'unknown' %} ({{ d.direction }}){% endif %}</dd>{% endif %}
|
||||||
{% if d.get('vehicle_impact') is not none %}<dt>Vehicle impact</dt><dd>{{ d.vehicle_impact }}</dd>{% endif %}
|
{% if d.get('vehicle_impact') is not none %}<dt>Vehicle impact</dt><dd>{{ d.vehicle_impact }}</dd>{% endif %}
|
||||||
{% if d.get('event_status') is not none %}<dt>Status</dt><dd>{{ d.event_status }}</dd>{% endif %}
|
{% if d.get('event_status') is not none %}<dt>Status</dt><dd>{{ d.event_status }}</dd>{% endif %}
|
||||||
{% if d.get('start_date') is not none %}<dt>Starts</dt><dd>{{ d.start_date }}</dd>{% endif %}
|
{% if d.get('start_date') is not none %}<dt>Starts</dt><dd>{{ d.start_date }}</dd>{% endif %}
|
||||||
|
|
|
||||||
|
|
@ -4,4 +4,4 @@
|
||||||
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
{% set d = (event.data.get('data') or {}).get('data') or {} %}
|
||||||
{%- set roads = d.get('road_names') or [] -%}
|
{%- set roads = d.get('road_names') or [] -%}
|
||||||
{%- set road = roads[0] if roads else None -%}
|
{%- set road = roads[0] if roads else None -%}
|
||||||
Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') %} {{ d.direction }}{% endif %}
|
Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') and d.direction != 'unknown' %} {{ d.direction }}{% endif %}
|
||||||
|
|
|
||||||
92
tests/test_dedup_mixin.py
Normal file
92
tests/test_dedup_mixin.py
Normal file
|
|
@ -0,0 +1,92 @@
|
||||||
|
"""Tests for the shared dedup mixin on SourceAdapter (v0.9.1 extraction).
|
||||||
|
|
||||||
|
is_published / mark_published / sweep_old_ids moved from per-adapter inline
|
||||||
|
copies onto the base; dedup_sweep_days parameterizes the retention window
|
||||||
|
(NWIS keeps 30 days, the default is 14).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
from central.adapter import SourceAdapter
|
||||||
|
from central.adapters.inciweb import InciWebAdapter
|
||||||
|
from central.adapters.nwis import NWISAdapter
|
||||||
|
from central.adapters.wzdx import WZDxAdapter
|
||||||
|
|
||||||
|
_DDL = (
|
||||||
|
"CREATE TABLE published_ids (adapter TEXT, event_id TEXT, "
|
||||||
|
"first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
|
||||||
|
"last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (adapter, event_id))"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _StubAdapter(SourceAdapter):
|
||||||
|
"""Minimal concrete adapter exercising only the inherited dedup mixin."""
|
||||||
|
|
||||||
|
name = "stub"
|
||||||
|
display_name = "Stub"
|
||||||
|
description = "test"
|
||||||
|
settings_schema = None
|
||||||
|
default_cadence_s = 600
|
||||||
|
|
||||||
|
async def poll(self): # pragma: no cover - not exercised
|
||||||
|
return
|
||||||
|
yield
|
||||||
|
|
||||||
|
async def apply_config(self, new_config): # pragma: no cover
|
||||||
|
...
|
||||||
|
|
||||||
|
def subject_for(self, event): # pragma: no cover
|
||||||
|
return "x"
|
||||||
|
|
||||||
|
|
||||||
|
def _adapter(tmp_path, dbname="d.db", sweep_days=None):
|
||||||
|
a = _StubAdapter()
|
||||||
|
a._db = sqlite3.connect(tmp_path / dbname)
|
||||||
|
a._db.execute(_DDL)
|
||||||
|
a._db.commit()
|
||||||
|
if sweep_days is not None:
|
||||||
|
a.dedup_sweep_days = sweep_days
|
||||||
|
return a
|
||||||
|
|
||||||
|
|
||||||
|
def test_dedup_roundtrip(tmp_path):
|
||||||
|
a = _adapter(tmp_path)
|
||||||
|
assert a.is_published("e1") is False
|
||||||
|
a.mark_published("e1")
|
||||||
|
assert a.is_published("e1") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_db_is_safe():
|
||||||
|
a = _StubAdapter()
|
||||||
|
a._db = None
|
||||||
|
assert a.is_published("e") is False
|
||||||
|
a.mark_published("e") # no raise
|
||||||
|
assert a.sweep_old_ids() == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_sweep_respects_dedup_sweep_days(tmp_path):
|
||||||
|
a = _adapter(tmp_path, "a.db", sweep_days=14)
|
||||||
|
a._db.execute(
|
||||||
|
"INSERT INTO published_ids (adapter, event_id, last_seen) "
|
||||||
|
"VALUES ('stub', 'old', datetime('now','-20 days'))"
|
||||||
|
)
|
||||||
|
a._db.commit()
|
||||||
|
assert a.sweep_old_ids() == 1 # 20d > 14d -> swept
|
||||||
|
|
||||||
|
b = _adapter(tmp_path, "b.db", sweep_days=30)
|
||||||
|
b._db.execute(
|
||||||
|
"INSERT INTO published_ids (adapter, event_id, last_seen) "
|
||||||
|
"VALUES ('stub', 'old', datetime('now','-20 days'))"
|
||||||
|
)
|
||||||
|
b._db.commit()
|
||||||
|
assert b.sweep_old_ids() == 0 # 20d < 30d -> kept
|
||||||
|
|
||||||
|
|
||||||
|
def test_named_adapters_inherit_base():
|
||||||
|
for cls in (InciWebAdapter, NWISAdapter, WZDxAdapter):
|
||||||
|
for m in ("is_published", "mark_published", "sweep_old_ids"):
|
||||||
|
assert m not in cls.__dict__, f"{cls.__name__} still overrides {m}"
|
||||||
|
assert getattr(cls, m) is getattr(SourceAdapter, m)
|
||||||
|
assert NWISAdapter.dedup_sweep_days == 30
|
||||||
|
assert WZDxAdapter.dedup_sweep_days == 14
|
||||||
|
assert InciWebAdapter.dedup_sweep_days == 14
|
||||||
|
|
@ -150,3 +150,11 @@ def test_summary_partial_renders_subject():
|
||||||
flat = {"road_names": ["I-80"], "direction": "eastbound"}
|
flat = {"road_names": ["I-80"], "direction": "eastbound"}
|
||||||
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
|
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
|
||||||
assert _derive_subject(row) == "Work zone on I-80 eastbound"
|
assert _derive_subject(row) == "Work zone on I-80 eastbound"
|
||||||
|
|
||||||
|
|
||||||
|
def test_summary_omits_unknown_direction():
|
||||||
|
# direction "unknown" (common in e.g. AZ mcdot) must not leak into the subject.
|
||||||
|
from central.gui.routes import _derive_subject
|
||||||
|
flat = {"road_names": ["I-80"], "direction": "unknown"}
|
||||||
|
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
|
||||||
|
assert _derive_subject(row) == "Work zone on I-80"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue