mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
Merge pull request #81 from zvx-echo6/v0_9_19_dedup_extraction
v0.9.19 - extract 9 adapters inline dedup onto SourceAdapter base
This commit is contained in:
commit
195a544c36
10 changed files with 5 additions and 336 deletions
|
|
@ -147,6 +147,7 @@ class EONETAdapter(SourceAdapter):
|
||||||
api_key_field = None
|
api_key_field = None
|
||||||
wizard_order = None
|
wizard_order = None
|
||||||
default_cadence_s = 1800
|
default_cadence_s = 1800
|
||||||
|
dedup_sweep_days = 30
|
||||||
|
|
||||||
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
|
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
|
||||||
enrichment_locations = [("latitude", "longitude")]
|
enrichment_locations = [("latitude", "longitude")]
|
||||||
|
|
@ -220,42 +221,6 @@ class EONETAdapter(SourceAdapter):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_published(self, dedup_key: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, dedup_key),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, dedup_key: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, dedup_key),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("EONET swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
# event.category is "disaster.eonet.<subject_category>[.removed]"
|
# event.category is "disaster.eonet.<subject_category>[.removed]"
|
||||||
parts = event.category.split(".")
|
parts = event.category.split(".")
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,7 @@ class FIRMSAdapter(SourceAdapter):
|
||||||
api_key_field = "api_key_alias"
|
api_key_field = "api_key_alias"
|
||||||
wizard_order = 2
|
wizard_order = 2
|
||||||
default_cadence_s = 300
|
default_cadence_s = 300
|
||||||
|
dedup_sweep_days = 2 # 48h dedup window (FIRMS hotspots churn fast)
|
||||||
|
|
||||||
# Enrichment pilot (PR J): FIRMS rows carry top-level latitude/longitude.
|
# Enrichment pilot (PR J): FIRMS rows carry top-level latitude/longitude.
|
||||||
enrichment_locations = [("latitude", "longitude")]
|
enrichment_locations = [("latitude", "longitude")]
|
||||||
|
|
@ -203,45 +204,6 @@ class FIRMSAdapter(SourceAdapter):
|
||||||
self._db = None
|
self._db = None
|
||||||
logger.info("FIRMS adapter shut down")
|
logger.info("FIRMS adapter shut down")
|
||||||
|
|
||||||
def is_published(self, stable_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, stable_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, stable_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, stable_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 48 hours. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-48 hours')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("FIRMS swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def _build_stable_id(
|
def _build_stable_id(
|
||||||
self, satellite: str, acq_date: str, acq_time: str, lat: float, lon: float
|
self, satellite: str, acq_date: str, acq_time: str, lat: float, lon: float
|
||||||
) -> str:
|
) -> str:
|
||||||
|
|
|
||||||
|
|
@ -149,6 +149,7 @@ class GDACSAdapter(SourceAdapter):
|
||||||
api_key_field = None
|
api_key_field = None
|
||||||
wizard_order = None
|
wizard_order = None
|
||||||
default_cadence_s = 600
|
default_cadence_s = 600
|
||||||
|
dedup_sweep_days = 30
|
||||||
|
|
||||||
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
|
# Event lat/lon mirrored from Geo.centroid into event.data (see poll()).
|
||||||
enrichment_locations = [("latitude", "longitude")]
|
enrichment_locations = [("latitude", "longitude")]
|
||||||
|
|
@ -204,42 +205,6 @@ class GDACSAdapter(SourceAdapter):
|
||||||
)
|
)
|
||||||
logger.info("GDACS config updated", extra={"event_types": self.event_types})
|
logger.info("GDACS config updated", extra={"event_types": self.event_types})
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-30 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("GDACS swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
parts = event.category.split(".")
|
parts = event.category.split(".")
|
||||||
country_subj = subject_for_country(event.data.get("country"))
|
country_subj = subject_for_country(event.data.get("country"))
|
||||||
|
|
|
||||||
|
|
@ -418,6 +418,7 @@ class NWSAdapter(SourceAdapter):
|
||||||
)
|
)
|
||||||
self._db.commit()
|
self._db.commit()
|
||||||
|
|
||||||
|
# TODO(v0.9.19.1): unscoped global DELETE -- clobbers other adapters' dedup rows; scope to adapter + move to base.
|
||||||
def sweep_old_ids(self) -> int:
|
def sweep_old_ids(self) -> int:
|
||||||
"""Remove published_ids older than 8 days. Returns count deleted."""
|
"""Remove published_ids older than 8 days. Returns count deleted."""
|
||||||
if not self._db:
|
if not self._db:
|
||||||
|
|
|
||||||
|
|
@ -88,42 +88,6 @@ class SWPCAlertsAdapter(SourceAdapter):
|
||||||
async def apply_config(self, new_config: AdapterConfig) -> None:
|
async def apply_config(self, new_config: AdapterConfig) -> None:
|
||||||
logger.info("SWPC alerts config updated")
|
logger.info("SWPC alerts config updated")
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("SWPC alerts swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
product_id = event.data.get("product_id") or "unknown"
|
product_id = event.data.get("product_id") or "unknown"
|
||||||
return f"central.space.alert.{product_id.lower()}"
|
return f"central.space.alert.{product_id.lower()}"
|
||||||
|
|
|
||||||
|
|
@ -88,42 +88,6 @@ class SWPCKindexAdapter(SourceAdapter):
|
||||||
async def apply_config(self, new_config: AdapterConfig) -> None:
|
async def apply_config(self, new_config: AdapterConfig) -> None:
|
||||||
logger.info("SWPC kindex config updated")
|
logger.info("SWPC kindex config updated")
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("SWPC kindex swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
return "central.space.kindex"
|
return "central.space.kindex"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,42 +87,6 @@ class SWPCProtonsAdapter(SourceAdapter):
|
||||||
async def apply_config(self, new_config: AdapterConfig) -> None:
|
async def apply_config(self, new_config: AdapterConfig) -> None:
|
||||||
logger.info("SWPC protons config updated")
|
logger.info("SWPC protons config updated")
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("SWPC protons swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
return "central.space.proton_flux"
|
return "central.space.proton_flux"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,6 +78,7 @@ class USGSQuakeAdapter(SourceAdapter):
|
||||||
requires_api_key = None
|
requires_api_key = None
|
||||||
wizard_order = 3
|
wizard_order = 3
|
||||||
default_cadence_s = 60
|
default_cadence_s = 60
|
||||||
|
dedup_sweep_days = 7
|
||||||
|
|
||||||
# Epicenter lat/lon are top-level keys in event.data (see _build_event).
|
# Epicenter lat/lon are top-level keys in event.data (see _build_event).
|
||||||
enrichment_locations = [("latitude", "longitude")]
|
enrichment_locations = [("latitude", "longitude")]
|
||||||
|
|
@ -200,45 +201,6 @@ class USGSQuakeAdapter(SourceAdapter):
|
||||||
self._db = None
|
self._db = None
|
||||||
logger.info("USGS quake adapter shut down")
|
logger.info("USGS quake adapter shut down")
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 7 days. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-7 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("USGS quake swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def _build_url(self) -> str:
|
def _build_url(self) -> str:
|
||||||
"""Build USGS GeoJSON feed URL."""
|
"""Build USGS GeoJSON feed URL."""
|
||||||
return f"{USGS_FEED_BASE}/{self._feed}.geojson"
|
return f"{USGS_FEED_BASE}/{self._feed}.geojson"
|
||||||
|
|
|
||||||
|
|
@ -133,45 +133,6 @@ class WFIGSIncidentsAdapter(SourceAdapter):
|
||||||
extra={"region": self.region.model_dump() if self.region else None},
|
extra={"region": self.region.model_dump() if self.region else None},
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 14 days. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("WFIGS incidents swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
"""Compute NATS subject for an event."""
|
"""Compute NATS subject for an event."""
|
||||||
# Removal events have a different subject pattern
|
# Removal events have a different subject pattern
|
||||||
|
|
|
||||||
|
|
@ -147,45 +147,6 @@ class WFIGSPerimetersAdapter(SourceAdapter):
|
||||||
extra={"region": self.region.model_dump() if self.region else None},
|
extra={"region": self.region.model_dump() if self.region else None},
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_published(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has already been published."""
|
|
||||||
if not self._db:
|
|
||||||
return False
|
|
||||||
cur = self._db.execute(
|
|
||||||
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
return cur.fetchone() is not None
|
|
||||||
|
|
||||||
def mark_published(self, event_id: str) -> None:
|
|
||||||
"""Mark an event as published."""
|
|
||||||
if not self._db:
|
|
||||||
return
|
|
||||||
self._db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
|
|
||||||
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
||||||
ON CONFLICT (adapter, event_id) DO UPDATE SET
|
|
||||||
last_seen = CURRENT_TIMESTAMP
|
|
||||||
""",
|
|
||||||
(self.name, event_id),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
|
|
||||||
def sweep_old_ids(self) -> int:
|
|
||||||
"""Remove published_ids older than 14 days. Returns count deleted."""
|
|
||||||
if not self._db:
|
|
||||||
return 0
|
|
||||||
cur = self._db.execute(
|
|
||||||
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
|
|
||||||
(self.name,),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
count = cur.rowcount
|
|
||||||
if count > 0:
|
|
||||||
logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count})
|
|
||||||
return count
|
|
||||||
|
|
||||||
def subject_for(self, event: Event) -> str:
|
def subject_for(self, event: Event) -> str:
|
||||||
"""Compute NATS subject for an event."""
|
"""Compute NATS subject for an event."""
|
||||||
# Removal events have a different subject pattern
|
# Removal events have a different subject pattern
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue