diff --git a/src/central/adapters/eonet.py b/src/central/adapters/eonet.py index 87003c7..d759f69 100644 --- a/src/central/adapters/eonet.py +++ b/src/central/adapters/eonet.py @@ -147,6 +147,7 @@ class EONETAdapter(SourceAdapter): api_key_field = None wizard_order = None default_cadence_s = 1800 + dedup_sweep_days = 30 # Event lat/lon mirrored from Geo.centroid into event.data (see poll()). enrichment_locations = [("latitude", "longitude")] @@ -220,42 +221,6 @@ class EONETAdapter(SourceAdapter): }, ) - def is_published(self, dedup_key: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, dedup_key), - ) - return cur.fetchone() is not None - - def mark_published(self, dedup_key: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, dedup_key), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("EONET swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: # event.category is "disaster.eonet.[.removed]" parts = event.category.split(".") diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 46a0d81..2fda0fb 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -69,6 +69,7 @@ class FIRMSAdapter(SourceAdapter): api_key_field = "api_key_alias" wizard_order = 2 default_cadence_s = 300 + dedup_sweep_days = 2 # 48h dedup window (FIRMS hotspots churn fast) # Enrichment pilot (PR J): FIRMS rows carry top-level latitude/longitude. enrichment_locations = [("latitude", "longitude")] @@ -203,45 +204,6 @@ class FIRMSAdapter(SourceAdapter): self._db = None logger.info("FIRMS adapter shut down") - def is_published(self, stable_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, stable_id), - ) - return cur.fetchone() is not None - - def mark_published(self, stable_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, stable_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 48 hours. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-48 hours')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("FIRMS swept old dedup entries", extra={"count": count}) - return count - def _build_stable_id( self, satellite: str, acq_date: str, acq_time: str, lat: float, lon: float ) -> str: diff --git a/src/central/adapters/gdacs.py b/src/central/adapters/gdacs.py index cff6801..53475ce 100644 --- a/src/central/adapters/gdacs.py +++ b/src/central/adapters/gdacs.py @@ -149,6 +149,7 @@ class GDACSAdapter(SourceAdapter): api_key_field = None wizard_order = None default_cadence_s = 600 + dedup_sweep_days = 30 # Event lat/lon mirrored from Geo.centroid into event.data (see poll()). enrichment_locations = [("latitude", "longitude")] @@ -204,42 +205,6 @@ class GDACSAdapter(SourceAdapter): ) logger.info("GDACS config updated", extra={"event_types": self.event_types}) - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("GDACS swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: parts = event.category.split(".") country_subj = subject_for_country(event.data.get("country")) diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 6c8cf91..4ba2819 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -418,6 +418,7 @@ class NWSAdapter(SourceAdapter): ) self._db.commit() + # TODO(v0.9.19.1): unscoped global DELETE -- clobbers other adapters' dedup rows; scope to adapter + move to base. def sweep_old_ids(self) -> int: """Remove published_ids older than 8 days. Returns count deleted.""" if not self._db: diff --git a/src/central/adapters/swpc_alerts.py b/src/central/adapters/swpc_alerts.py index 8bd3317..9960d1b 100644 --- a/src/central/adapters/swpc_alerts.py +++ b/src/central/adapters/swpc_alerts.py @@ -88,42 +88,6 @@ class SWPCAlertsAdapter(SourceAdapter): async def apply_config(self, new_config: AdapterConfig) -> None: logger.info("SWPC alerts config updated") - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC alerts swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: product_id = event.data.get("product_id") or "unknown" return f"central.space.alert.{product_id.lower()}" diff --git a/src/central/adapters/swpc_kindex.py b/src/central/adapters/swpc_kindex.py index ab5d29f..7b5d8f8 100644 --- a/src/central/adapters/swpc_kindex.py +++ b/src/central/adapters/swpc_kindex.py @@ -88,42 +88,6 @@ class SWPCKindexAdapter(SourceAdapter): async def apply_config(self, new_config: AdapterConfig) -> None: logger.info("SWPC kindex config updated") - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC kindex swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: return "central.space.kindex" diff --git a/src/central/adapters/swpc_protons.py b/src/central/adapters/swpc_protons.py index 7c1d84c..2430997 100644 --- a/src/central/adapters/swpc_protons.py +++ b/src/central/adapters/swpc_protons.py @@ -87,42 +87,6 @@ class SWPCProtonsAdapter(SourceAdapter): async def apply_config(self, new_config: AdapterConfig) -> None: logger.info("SWPC protons config updated") - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC protons swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: return "central.space.proton_flux" diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 4fd8911..28bd973 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -78,6 +78,7 @@ class USGSQuakeAdapter(SourceAdapter): requires_api_key = None wizard_order = 3 default_cadence_s = 60 + dedup_sweep_days = 7 # Epicenter lat/lon are top-level keys in event.data (see _build_event). enrichment_locations = [("latitude", "longitude")] @@ -200,45 +201,6 @@ class USGSQuakeAdapter(SourceAdapter): self._db = None logger.info("USGS quake adapter shut down") - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 7 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-7 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("USGS quake swept old dedup entries", extra={"count": count}) - return count - def _build_url(self) -> str: """Build USGS GeoJSON feed URL.""" return f"{USGS_FEED_BASE}/{self._feed}.geojson" diff --git a/src/central/adapters/wfigs_incidents.py b/src/central/adapters/wfigs_incidents.py index 3c26505..7538fc2 100644 --- a/src/central/adapters/wfigs_incidents.py +++ b/src/central/adapters/wfigs_incidents.py @@ -133,45 +133,6 @@ class WFIGSIncidentsAdapter(SourceAdapter): extra={"region": self.region.model_dump() if self.region else None}, ) - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("WFIGS incidents swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: """Compute NATS subject for an event.""" # Removal events have a different subject pattern diff --git a/src/central/adapters/wfigs_perimeters.py b/src/central/adapters/wfigs_perimeters.py index 246ff77..c083db4 100644 --- a/src/central/adapters/wfigs_perimeters.py +++ b/src/central/adapters/wfigs_perimeters.py @@ -147,45 +147,6 @@ class WFIGSPerimetersAdapter(SourceAdapter): extra={"region": self.region.model_dump() if self.region else None}, ) - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count}) - return count - def subject_for(self, event: Event) -> str: """Compute NATS subject for an event.""" # Removal events have a different subject pattern