refactor(adapters): extract inline dedup onto SourceAdapter base (v0.9.19)

Finishes the v0.9.1 dedup-mixin extraction (9cd2183) for the remaining 9
adapters. is_published/mark_published were byte-identical to the base;
sweep_old_ids windows preserved via dedup_sweep_days class attr where != 14
(usgs_quake=7, eonet=30, gdacs=30, firms=2 == old 48h). Pure behavior-
preserving refactor; suite holds 900/1.

nws left fully inline (unscoped global sweep) with a TODO(v0.9.19.1) marker
-- its fix carries a real behavior change, shipped separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-27 07:12:54 +00:00
commit 417c7abad6
10 changed files with 5 additions and 336 deletions

View file

@ -147,6 +147,7 @@ class EONETAdapter(SourceAdapter):
api_key_field = None
wizard_order = None
default_cadence_s = 1800
dedup_sweep_days = 30
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
enrichment_locations = [("latitude", "longitude")]
@ -220,42 +221,6 @@ class EONETAdapter(SourceAdapter):
},
)
def is_published(self, dedup_key: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, dedup_key),
)
return cur.fetchone() is not None
def mark_published(self, dedup_key: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, dedup_key),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("EONET swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
# event.category is "disaster.eonet.<subject_category>[.removed]"
parts = event.category.split(".")

View file

@ -69,6 +69,7 @@ class FIRMSAdapter(SourceAdapter):
api_key_field = "api_key_alias"
wizard_order = 2
default_cadence_s = 300
dedup_sweep_days = 2 # 48h dedup window (FIRMS hotspots churn fast)
# Enrichment pilot (PR J): FIRMS rows carry top-level latitude/longitude.
enrichment_locations = [("latitude", "longitude")]
@ -203,45 +204,6 @@ class FIRMSAdapter(SourceAdapter):
self._db = None
logger.info("FIRMS adapter shut down")
def is_published(self, stable_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, stable_id),
)
return cur.fetchone() is not None
def mark_published(self, stable_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, stable_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 48 hours. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-48 hours')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("FIRMS swept old dedup entries", extra={"count": count})
return count
def _build_stable_id(
self, satellite: str, acq_date: str, acq_time: str, lat: float, lon: float
) -> str:

View file

@ -149,6 +149,7 @@ class GDACSAdapter(SourceAdapter):
api_key_field = None
wizard_order = None
default_cadence_s = 600
dedup_sweep_days = 30
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
enrichment_locations = [("latitude", "longitude")]
@ -204,42 +205,6 @@ class GDACSAdapter(SourceAdapter):
)
logger.info("GDACS config updated", extra={"event_types": self.event_types})
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("GDACS swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
parts = event.category.split(".")
country_subj = subject_for_country(event.data.get("country"))

View file

@ -418,6 +418,7 @@ class NWSAdapter(SourceAdapter):
)
self._db.commit()
# TODO(v0.9.19.1): unscoped global DELETE -- clobbers other adapters' dedup rows; scope to adapter + move to base.
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 8 days. Returns count deleted."""
if not self._db:

View file

@ -88,42 +88,6 @@ class SWPCAlertsAdapter(SourceAdapter):
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC alerts config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC alerts swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
product_id = event.data.get("product_id") or "unknown"
return f"central.space.alert.{product_id.lower()}"

View file

@ -88,42 +88,6 @@ class SWPCKindexAdapter(SourceAdapter):
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC kindex config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC kindex swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
return "central.space.kindex"

View file

@ -87,42 +87,6 @@ class SWPCProtonsAdapter(SourceAdapter):
async def apply_config(self, new_config: AdapterConfig) -> None:
logger.info("SWPC protons config updated")
def is_published(self, event_id: str) -> bool:
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("SWPC protons swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
return "central.space.proton_flux"

View file

@ -78,6 +78,7 @@ class USGSQuakeAdapter(SourceAdapter):
requires_api_key = None
wizard_order = 3
default_cadence_s = 60
dedup_sweep_days = 7
# Epicenter lat/lon are top-level keys in event.data (see _build_event).
enrichment_locations = [("latitude", "longitude")]
@ -200,45 +201,6 @@ class USGSQuakeAdapter(SourceAdapter):
self._db = None
logger.info("USGS quake adapter shut down")
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 7 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-7 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("USGS quake swept old dedup entries", extra={"count": count})
return count
def _build_url(self) -> str:
"""Build USGS GeoJSON feed URL."""
return f"{USGS_FEED_BASE}/{self._feed}.geojson"

View file

@ -133,45 +133,6 @@ class WFIGSIncidentsAdapter(SourceAdapter):
extra={"region": self.region.model_dump() if self.region else None},
)
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 14 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("WFIGS incidents swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for an event."""
# Removal events have a different subject pattern

View file

@ -147,45 +147,6 @@ class WFIGSPerimetersAdapter(SourceAdapter):
extra={"region": self.region.model_dump() if self.region else None},
)
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 14 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for an event."""
# Removal events have a different subject pattern