Merge pull request #74 from zvx-echo6/v0_9_13_events_retention

v0.9.13: per-stream archived-events retention sweep
This commit is contained in:
malice 2026-05-26 20:36:44 -06:00 committed by GitHub
commit 05b89df3a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 274 additions and 0 deletions

View file

@ -202,6 +202,62 @@ class ConfigStore:
) )
return [StreamConfig(**dict(row)) for row in rows] return [StreamConfig(**dict(row)) for row in rows]
# -------------------------------------------------------------------------
# Archived-events retention (v0.9.13) -- DML on public.events, keyed by the
# per-stream max_age_s above. Lives here because this is the supervisor's
# Postgres gateway; the events table shares the database.
# -------------------------------------------------------------------------
async def delete_events_older_than(
self, category_domains: list[str], max_age_s: int, batch: int = 10000
) -> int:
"""Delete archived events whose category first-token is in
``category_domains`` and whose ``time`` is older than ``max_age_s`` seconds.
Deletes in batches (default 10k) so the initial bulk reclaim can't take a
long table lock. Returns the total number of rows deleted."""
if not category_domains:
return 0
total = 0
async with self._pool.acquire() as conn:
while True:
# NOTE: events is a TimescaleDB hypertable -- ctid is only unique
# WITHIN a chunk, so batching on ctid would delete same-ctid rows in
# other chunks. Batch on the composite primary key (id, time), which
# is globally unique.
result = await conn.execute(
"""
DELETE FROM events WHERE (id, time) IN (
SELECT id, time FROM events
WHERE split_part(category, '.', 1) = ANY($1::text[])
AND time < now() - ($2 * interval '1 second')
LIMIT $3
)
""",
category_domains, max_age_s, batch,
)
n = int(result.split()[-1])
total += n
if n < batch:
break
return total
async def unmapped_event_domains(self, mapped: list[str]) -> list[str]:
"""Return distinct ``events.category`` first-token domains NOT in ``mapped``.
Used to surface category domains that no stream's retention map covers, so
a new adapter's events can't silently dodge retention."""
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT DISTINCT split_part(category, '.', 1) AS domain
FROM events
WHERE split_part(category, '.', 1) <> ALL($1::text[])
""",
mapped,
)
return [r["domain"] for r in rows]
async def upsert_stream(self, name: str, max_age_s: int) -> None: async def upsert_stream(self, name: str, max_age_s: int) -> None:
"""Insert or update a stream's max_age_s (operator-facing).""" """Insert or update a stream's max_age_s (operator-facing)."""
async with self._pool.acquire() as conn: async with self._pool.acquire() as conn:

View file

@ -4,6 +4,9 @@
{% block content %} {% block content %}
<h1>Streams</h1> <h1>Streams</h1>
<p class="muted">Max age is the source-of-truth retention horizon: it bounds both how
long messages live in NATS and how long archived events are kept in the events table
(swept hourly by the supervisor).</p>
<div class="cols"> <div class="cols">
{% for stream in streams %} {% for stream in streams %}

View file

@ -113,6 +113,33 @@ STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY}
# Recompute interval for stream max_bytes (1 hour) # Recompute interval for stream max_bytes (1 hour)
STREAM_RECOMPUTE_INTERVAL_S = 3600 STREAM_RECOMPUTE_INTERVAL_S = 3600
# How often to sweep expired archived events out of the events table (1 hour).
EVENTS_RETENTION_INTERVAL_S = 3600
# Maps each event-bearing stream to the events.category first-token domains it
# owns. Category domains do NOT equal stream subject domains for the traffic
# family (flow/incident/closure/work_zone/camera all live under central.traffic*),
# so the mapping is explicit. The events-retention sweep uses each stream's
# config.streams.max_age_s (the /streams 1/7/14/30/365d buttons) as the
# source-of-truth horizon. Keep in sync with the StreamEntry registry: a category
# domain present in events but absent here is logged by _sweep_events_retention so
# a new adapter's events can't silently evade retention.
STREAM_CATEGORY_DOMAINS: dict[str, tuple[str, ...]] = {
"CENTRAL_WX": ("wx",),
"CENTRAL_FIRE": ("fire",),
"CENTRAL_QUAKE": ("quake",),
"CENTRAL_SPACE": ("space",),
"CENTRAL_DISASTER": ("disaster",),
"CENTRAL_HYDRO": ("hydro",),
"CENTRAL_TRAFFIC": ("incident", "closure", "work_zone"),
"CENTRAL_TRAFFIC_FLOW": ("flow",),
"CENTRAL_TRAFFIC_CAMERAS": ("camera",),
}
def _all_mapped_domains() -> set[str]:
"""Every category domain covered by STREAM_CATEGORY_DOMAINS."""
return {d for domains in STREAM_CATEGORY_DOMAINS.values() for d in domains}
class JsonFormatter(logging.Formatter): class JsonFormatter(logging.Formatter):
@ -646,6 +673,69 @@ class Supervisor:
extra={"stream": stream_name, "error": str(e)}, extra={"stream": stream_name, "error": str(e)},
) )
async def _sweep_events_retention(self) -> None:
"""Delete archived events older than their stream's max_age_s (the
per-stream retention from /streams). Fail-safe: a stream missing from
config.streams or with max_age_s <= 0 is skipped (never deleted)."""
streams = {s.name: s for s in await self._config_store.list_streams()}
total = 0
for stream_name, domains in STREAM_CATEGORY_DOMAINS.items():
cfg = streams.get(stream_name)
if cfg is None or cfg.max_age_s is None or cfg.max_age_s <= 0:
logger.warning(
"Skipping events retention: no/invalid max_age_s",
extra={"stream": stream_name},
)
continue
try:
deleted = await self._config_store.delete_events_older_than(
list(domains), cfg.max_age_s
)
except Exception as e:
logger.error(
"Events retention sweep failed for stream",
extra={"stream": stream_name, "error": str(e)},
)
continue
total += deleted
if deleted:
logger.info(
"Swept expired events",
extra={"stream": stream_name, "deleted": deleted,
"max_age_s": cfg.max_age_s},
)
# Surface any category domain no stream's map covers (potential leak).
try:
unmapped = await self._config_store.unmapped_event_domains(
list(_all_mapped_domains())
)
if unmapped:
logger.warning(
"events categories not covered by retention map",
extra={"unmapped_domains": sorted(unmapped)},
)
except Exception as e:
logger.error("Failed checking unmapped event domains",
extra={"error": str(e)})
logger.info("Events retention sweep complete",
extra={"total_deleted": total})
async def _events_retention_loop(self) -> None:
"""Sweep expired archived events immediately on startup, then hourly."""
while not self._shutdown_event.is_set():
try:
await self._sweep_events_retention()
except Exception as e:
logger.error("Events retention loop error", extra={"error": str(e)})
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=EVENTS_RETENTION_INTERVAL_S,
)
break # shutdown requested
except asyncio.TimeoutError:
pass
async def _stream_retention_recompute_loop(self) -> None: async def _stream_retention_recompute_loop(self) -> None:
"""Periodically recompute max_bytes for all streams.""" """Periodically recompute max_bytes for all streams."""
while not self._shutdown_event.is_set(): while not self._shutdown_event.is_set():
@ -885,6 +975,9 @@ class Supervisor:
# Start stream retention recompute loop # Start stream retention recompute loop
self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop())) self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop()))
# Start archived-events retention sweep loop (v0.9.13)
self._tasks.append(asyncio.create_task(self._events_retention_loop()))
logger.info( logger.info(
"Supervisor started", "Supervisor started",
extra={"adapters": list(self._adapter_states.keys())}, extra={"adapters": list(self._adapter_states.keys())},

View file

@ -0,0 +1,122 @@
"""Archived-events retention sweep (v0.9.13).
The supervisor deletes events older than their stream's max_age_s (the per-stream
/streams retention). Mapping events.category -> stream is explicit because category
domains don't equal stream subject domains for the traffic family. Fail-safe:
streams with missing/<=0 max_age_s are skipped (never deleted), and any category
domain the map doesn't cover is surfaced as a warning.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock
from central.supervisor import (
STREAM_CATEGORY_DOMAINS,
EVENTS_RETENTION_INTERVAL_S,
Supervisor,
_all_mapped_domains,
)
from central.streams import STREAMS as STREAM_REGISTRY
# Every category first-token domain observed in the events table (the coverage
# guard: a new adapter domain must be added to STREAM_CATEGORY_DOMAINS or this fails).
KNOWN_EVENT_DOMAINS = {
"wx", "fire", "quake", "space", "disaster", "hydro",
"incident", "closure", "work_zone", "flow", "camera",
}
class TestRetentionMap:
def test_map_covers_all_known_domains(self):
assert KNOWN_EVENT_DOMAINS <= _all_mapped_domains()
def test_map_keys_are_event_bearing_streams(self):
event_streams = {s.name for s in STREAM_REGISTRY if s.event_bearing}
assert set(STREAM_CATEGORY_DOMAINS) <= event_streams
def test_central_meta_not_in_map(self):
# CENTRAL_META is status-only (not event-bearing) -> no events rows to sweep.
assert "CENTRAL_META" not in STREAM_CATEGORY_DOMAINS
def test_no_domain_mapped_to_two_streams(self):
seen, dupes = set(), set()
for domains in STREAM_CATEGORY_DOMAINS.values():
for d in domains:
(dupes if d in seen else seen).add(d)
assert not dupes
def test_interval_is_hourly(self):
assert EVENTS_RETENTION_INTERVAL_S == 3600
def _stream(name, max_age_s):
s = MagicMock()
s.name = name
s.max_age_s = max_age_s
return s
def _supervisor_with_store(store):
sup = Supervisor.__new__(Supervisor) # bypass __init__/connections
sup._config_store = store
return sup
@pytest.mark.asyncio
async def test_sweep_deletes_per_event_bearing_stream():
store = AsyncMock()
store.list_streams.return_value = [
_stream(name, 604800) for name in STREAM_CATEGORY_DOMAINS
]
store.delete_events_older_than.return_value = 5
store.unmapped_event_domains.return_value = []
sup = _supervisor_with_store(store)
await sup._sweep_events_retention()
assert store.delete_events_older_than.await_count == len(STREAM_CATEGORY_DOMAINS)
# CENTRAL_TRAFFIC carries the multi-domain traffic family.
calls = {c.args[0][0]: c.args for c in store.delete_events_older_than.await_args_list}
assert "incident" in [d for c in store.delete_events_older_than.await_args_list for d in c.args[0]]
@pytest.mark.asyncio
async def test_sweep_skips_missing_and_nonpositive_max_age():
store = AsyncMock()
# WX present/valid; FIRE max_age_s=0 (skip); others absent (skip).
store.list_streams.return_value = [_stream("CENTRAL_WX", 86400), _stream("CENTRAL_FIRE", 0)]
store.delete_events_older_than.return_value = 0
store.unmapped_event_domains.return_value = []
sup = _supervisor_with_store(store)
await sup._sweep_events_retention()
swept_domains = [c.args[0] for c in store.delete_events_older_than.await_args_list]
assert ["wx"] in swept_domains # WX swept
assert ["fire"] not in swept_domains # FIRE (max_age 0) skipped
assert store.delete_events_older_than.await_count == 1
@pytest.mark.asyncio
async def test_sweep_passes_max_age_to_delete():
store = AsyncMock()
store.list_streams.return_value = [_stream("CENTRAL_WX", 123456)]
store.delete_events_older_than.return_value = 0
store.unmapped_event_domains.return_value = []
sup = _supervisor_with_store(store)
await sup._sweep_events_retention()
args = store.delete_events_older_than.await_args
assert args.args == (["wx"], 123456)
@pytest.mark.asyncio
async def test_sweep_warns_on_unmapped_domains_but_does_not_raise():
store = AsyncMock()
store.list_streams.return_value = [_stream("CENTRAL_WX", 86400)]
store.delete_events_older_than.return_value = 0
store.unmapped_event_domains.return_value = ["mystery_domain"]
sup = _supervisor_with_store(store)
# Should complete without raising even though an unmapped domain exists.
await sup._sweep_events_retention()
store.unmapped_event_domains.assert_awaited_once()