mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
v0.9.13: per-stream archived-events retention sweep
Hourly supervisor sweep deletes archived events older than their stream's config.streams.max_age_s (the /streams 1/7/14/30/365d buttons -- the per-stream source-of-truth). Explicit STREAM_CATEGORY_DOMAINS map (category domains do not equal stream subject domains for the traffic family); fail-safe skip on missing/<=0 max_age_s; unmapped-domain warning so a new adapter cannot silently dodge retention. TimescaleDB hypertable gotcha: events.ctid is chunk-local, NOT globally unique -- DELETE batching keys on the composite PK (id, time). See the inline NOTE in config_store.delete_events_older_than and the PR body for the incident write-up. - config_store: delete_events_older_than (batched on (id,time)) + unmapped_event_domains - supervisor: STREAM_CATEGORY_DOMAINS, _sweep_events_retention, _events_retention_loop - streams_list.html: max-age also governs archived-events retention - 9 tests Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
3ff6f1ebc0
commit
9f67b4c1f2
4 changed files with 274 additions and 0 deletions
|
|
@ -202,6 +202,62 @@ class ConfigStore:
|
|||
)
|
||||
return [StreamConfig(**dict(row)) for row in rows]
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Archived-events retention (v0.9.13) -- DML on public.events, keyed by the
|
||||
# per-stream max_age_s above. Lives here because this is the supervisor's
|
||||
# Postgres gateway; the events table shares the database.
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def delete_events_older_than(
    self, category_domains: list[str], max_age_s: int, batch: int = 10000
) -> int:
    """Delete archived events older than ``max_age_s`` for the given domains.

    A row matches when the first dot-separated token of its ``category`` is
    in ``category_domains`` and its ``time`` lies more than ``max_age_s``
    seconds before the database's ``now()``. Rows are removed in batches
    (default 10k), one DELETE statement per batch, so the initial bulk
    reclaim can't take a long table lock.

    Args:
        category_domains: Category first-token domains to sweep. An empty
            list is a no-op.
        max_age_s: Retention horizon in seconds; must be positive.
        batch: Maximum rows deleted per statement; must be positive.

    Returns:
        The total number of rows deleted across all batches.

    Raises:
        ValueError: If ``max_age_s`` or ``batch`` is not positive.
    """
    if not category_domains:
        return 0
    if max_age_s <= 0:
        # Fail-safe: "time < now() - 0s" (or a negative age) would match every
        # archived row, so refuse rather than wipe the table.
        raise ValueError(f"max_age_s must be positive, got {max_age_s}")
    if batch <= 0:
        # LIMIT 0 deletes nothing and "n < batch" would never become true,
        # so the loop below would spin forever.
        raise ValueError(f"batch must be positive, got {batch}")

    total = 0
    async with self._pool.acquire() as conn:
        while True:
            # NOTE: events is a TimescaleDB hypertable -- ctid is only unique
            # WITHIN a chunk, so batching on ctid would delete same-ctid rows in
            # other chunks. Batch on the composite primary key (id, time), which
            # is globally unique.
            result = await conn.execute(
                """
                DELETE FROM events WHERE (id, time) IN (
                    SELECT id, time FROM events
                    WHERE split_part(category, '.', 1) = ANY($1::text[])
                      AND time < now() - ($2 * interval '1 second')
                    LIMIT $3
                )
                """,
                category_domains, max_age_s, batch,
            )
            # The last whitespace-separated token of the returned status is
            # parsed as the number of rows this statement removed.
            n = int(result.split()[-1])
            total += n
            if n < batch:
                # A short batch means nothing older than the horizon remains.
                break
    return total
|
||||
|
||||
async def unmapped_event_domains(self, mapped: list[str]) -> list[str]:
    """List the category domains present in ``events`` that ``mapped`` omits.

    A domain is the first dot-separated token of ``events.category``. The
    result lets the caller surface domains that no stream's retention map
    covers, so a new adapter's events can't silently dodge retention.
    """
    query = """
        SELECT DISTINCT split_part(category, '.', 1) AS domain
        FROM events
        WHERE split_part(category, '.', 1) <> ALL($1::text[])
        """
    async with self._pool.acquire() as conn:
        records = await conn.fetch(query, mapped)
    return [record["domain"] for record in records]
|
||||
|
||||
async def upsert_stream(self, name: str, max_age_s: int) -> None:
|
||||
"""Insert or update a stream's max_age_s (operator-facing)."""
|
||||
async with self._pool.acquire() as conn:
|
||||
|
|
|
|||
|
|
@ -4,6 +4,9 @@
|
|||
|
||||
{% block content %}
|
||||
<h1>Streams</h1>
|
||||
<p class="muted">Max age is the source-of-truth retention horizon: it bounds both how
|
||||
long messages live in NATS and how long archived events are kept in the events table
|
||||
(swept hourly by the supervisor).</p>
|
||||
|
||||
<div class="cols">
|
||||
{% for stream in streams %}
|
||||
|
|
|
|||
|
|
@ -113,6 +113,33 @@ STREAM_SUBJECTS = {s.name: [s.subject_filter] for s in STREAM_REGISTRY}
|
|||
|
||||
# Seconds between recomputations of stream max_bytes (1 hour).
STREAM_RECOMPUTE_INTERVAL_S = 60 * 60

# Seconds between sweeps of expired archived events from the events table (1 hour).
EVENTS_RETENTION_INTERVAL_S = 60 * 60

# Explicit stream -> events.category first-token domain ownership.
#
# The mapping cannot be derived from stream subjects: for the traffic family
# (flow/incident/closure/work_zone/camera all live under central.traffic*) the
# category domains do NOT equal the stream subject domains.
#
# The events-retention sweep takes each stream's config.streams.max_age_s (the
# /streams 1/7/14/30/365d buttons) as the source-of-truth horizon for the
# domains listed here. Keep this in sync with the StreamEntry registry: any
# category domain present in events but absent here is logged by
# _sweep_events_retention so a new adapter's events can't silently evade
# retention.
STREAM_CATEGORY_DOMAINS: dict[str, tuple[str, ...]] = {
    "CENTRAL_WX": ("wx",),
    "CENTRAL_FIRE": ("fire",),
    "CENTRAL_QUAKE": ("quake",),
    "CENTRAL_SPACE": ("space",),
    "CENTRAL_DISASTER": ("disaster",),
    "CENTRAL_HYDRO": ("hydro",),
    "CENTRAL_TRAFFIC": (
        "incident",
        "closure",
        "work_zone",
    ),
    "CENTRAL_TRAFFIC_FLOW": ("flow",),
    "CENTRAL_TRAFFIC_CAMERAS": ("camera",),
}
|
||||
|
||||
|
||||
def _all_mapped_domains() -> set[str]:
    """Return the union of every domain tuple in STREAM_CATEGORY_DOMAINS."""
    return set().union(*STREAM_CATEGORY_DOMAINS.values())
|
||||
|
||||
|
||||
class JsonFormatter(logging.Formatter):
|
||||
|
|
@ -646,6 +673,69 @@ class Supervisor:
|
|||
extra={"stream": stream_name, "error": str(e)},
|
||||
)
|
||||
|
||||
async def _sweep_events_retention(self) -> None:
    """Run one retention pass over the archived events table.

    For each stream in STREAM_CATEGORY_DOMAINS, events in that stream's
    category domains older than the stream's max_age_s (the per-stream
    retention from /streams) are deleted. Fail-safe: a stream that is missing
    from config.streams, or whose max_age_s is unset or <= 0, is skipped with
    a warning and never deleted from. A delete failure for one stream is
    logged and the pass moves on to the next stream. Finally, category
    domains found in events that no stream maps are logged as a warning.
    """
    configured = {}
    for stream in await self._config_store.list_streams():
        configured[stream.name] = stream

    swept_total = 0
    for stream_name, domains in STREAM_CATEGORY_DOMAINS.items():
        stream_cfg = configured.get(stream_name)
        if (
            stream_cfg is None
            or stream_cfg.max_age_s is None
            or stream_cfg.max_age_s <= 0
        ):
            logger.warning(
                "Skipping events retention: no/invalid max_age_s",
                extra={"stream": stream_name},
            )
            continue

        try:
            removed = await self._config_store.delete_events_older_than(
                list(domains), stream_cfg.max_age_s
            )
        except Exception as exc:
            logger.error(
                "Events retention sweep failed for stream",
                extra={"stream": stream_name, "error": str(exc)},
            )
        else:
            swept_total += removed
            if removed:
                logger.info(
                    "Swept expired events",
                    extra={
                        "stream": stream_name,
                        "deleted": removed,
                        "max_age_s": stream_cfg.max_age_s,
                    },
                )

    # Coverage check: report any category domain that no stream's map owns
    # (its events would otherwise never be swept).
    try:
        uncovered = await self._config_store.unmapped_event_domains(
            list(_all_mapped_domains())
        )
        if uncovered:
            logger.warning(
                "events categories not covered by retention map",
                extra={"unmapped_domains": sorted(uncovered)},
            )
    except Exception as exc:
        logger.error(
            "Failed checking unmapped event domains",
            extra={"error": str(exc)},
        )

    logger.info(
        "Events retention sweep complete",
        extra={"total_deleted": swept_total},
    )
|
||||
|
||||
async def _events_retention_loop(self) -> None:
    """Sweep expired archived events as soon as the loop starts, then again
    every EVENTS_RETENTION_INTERVAL_S seconds until shutdown is requested."""
    while not self._shutdown_event.is_set():
        try:
            await self._sweep_events_retention()
        except Exception as exc:
            # A failed sweep must not end the loop; try again next interval.
            logger.error("Events retention loop error", extra={"error": str(exc)})

        # Sleep for one interval, waking early if shutdown is signalled.
        try:
            await asyncio.wait_for(
                self._shutdown_event.wait(),
                timeout=EVENTS_RETENTION_INTERVAL_S,
            )
        except asyncio.TimeoutError:
            continue  # interval elapsed without shutdown -> next sweep
        return  # shutdown requested
|
||||
|
||||
async def _stream_retention_recompute_loop(self) -> None:
|
||||
"""Periodically recompute max_bytes for all streams."""
|
||||
while not self._shutdown_event.is_set():
|
||||
|
|
@ -885,6 +975,9 @@ class Supervisor:
|
|||
# Start stream retention recompute loop
|
||||
self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop()))
|
||||
|
||||
# Start archived-events retention sweep loop (v0.9.13)
|
||||
self._tasks.append(asyncio.create_task(self._events_retention_loop()))
|
||||
|
||||
logger.info(
|
||||
"Supervisor started",
|
||||
extra={"adapters": list(self._adapter_states.keys())},
|
||||
|
|
|
|||
122
tests/test_events_retention.py
Normal file
122
tests/test_events_retention.py
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
"""Archived-events retention sweep (v0.9.13).
|
||||
|
||||
The supervisor deletes events older than their stream's max_age_s (the per-stream
|
||||
/streams retention). Mapping events.category -> stream is explicit because category
|
||||
domains don't equal stream subject domains for the traffic family. Fail-safe:
|
||||
streams with missing/<=0 max_age_s are skipped (never deleted), and any category
|
||||
domain the map doesn't cover is surfaced as a warning.
|
||||
"""
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from central.supervisor import (
|
||||
STREAM_CATEGORY_DOMAINS,
|
||||
EVENTS_RETENTION_INTERVAL_S,
|
||||
Supervisor,
|
||||
_all_mapped_domains,
|
||||
)
|
||||
from central.streams import STREAMS as STREAM_REGISTRY
|
||||
|
||||
# Coverage guard: every category first-token domain observed in the events
# table. A new adapter domain must be added to STREAM_CATEGORY_DOMAINS (and
# listed here), or test_map_covers_all_known_domains fails.
KNOWN_EVENT_DOMAINS = {
    "wx",
    "fire",
    "quake",
    "space",
    "disaster",
    "hydro",
    "incident",
    "closure",
    "work_zone",
    "flow",
    "camera",
}
|
||||
|
||||
|
||||
class TestRetentionMap:
    """Static checks on STREAM_CATEGORY_DOMAINS and the sweep interval."""

    def test_map_covers_all_known_domains(self):
        # Every known events domain must be owned by some stream.
        mapped = _all_mapped_domains()
        assert KNOWN_EVENT_DOMAINS.issubset(mapped)

    def test_map_keys_are_event_bearing_streams(self):
        # Only streams flagged event_bearing in the registry may appear as keys.
        event_streams = set()
        for entry in STREAM_REGISTRY:
            if entry.event_bearing:
                event_streams.add(entry.name)
        assert set(STREAM_CATEGORY_DOMAINS).issubset(event_streams)

    def test_central_meta_not_in_map(self):
        # CENTRAL_META is status-only (not event-bearing) -> no events rows to sweep.
        assert "CENTRAL_META" not in STREAM_CATEGORY_DOMAINS

    def test_no_domain_mapped_to_two_streams(self):
        # A domain owned by two streams would get two competing horizons.
        seen = set()
        dupes = set()
        for domains in STREAM_CATEGORY_DOMAINS.values():
            for domain in domains:
                if domain in seen:
                    dupes.add(domain)
                else:
                    seen.add(domain)
        assert not dupes

    def test_interval_is_hourly(self):
        assert EVENTS_RETENTION_INTERVAL_S == 3600
|
||||
|
||||
|
||||
def _stream(name, max_age_s):
|
||||
s = MagicMock()
|
||||
s.name = name
|
||||
s.max_age_s = max_age_s
|
||||
return s
|
||||
|
||||
|
||||
def _supervisor_with_store(store):
    """Return a Supervisor that skipped ``__init__`` (so no connections are
    made) and has only ``_config_store`` set to ``store``."""
    supervisor = Supervisor.__new__(Supervisor)
    supervisor._config_store = store
    return supervisor
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_sweep_deletes_per_event_bearing_stream():
    """With every mapped stream configured, the sweep issues exactly one
    delete per stream, each carrying that stream's full domain list."""
    store = AsyncMock()
    store.list_streams.return_value = [
        _stream(name, 604800) for name in STREAM_CATEGORY_DOMAINS
    ]
    store.delete_events_older_than.return_value = 5
    store.unmapped_event_domains.return_value = []
    sup = _supervisor_with_store(store)

    await sup._sweep_events_retention()

    assert store.delete_events_older_than.await_count == len(STREAM_CATEGORY_DOMAINS)
    swept = [c.args[0] for c in store.delete_events_older_than.await_args_list]
    # CENTRAL_TRAFFIC carries the multi-domain traffic family: all of its
    # domains must travel together in a single delete call.
    assert list(STREAM_CATEGORY_DOMAINS["CENTRAL_TRAFFIC"]) in swept
    assert "incident" in [d for domains in swept for d in domains]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_sweep_skips_missing_and_nonpositive_max_age():
    """Only a stream with a positive max_age_s is swept; streams with
    max_age_s == 0 or no config row are left untouched."""
    # WX present/valid; FIRE max_age_s=0 (skip); others absent (skip).
    configured = [_stream("CENTRAL_WX", 86400), _stream("CENTRAL_FIRE", 0)]
    store = AsyncMock()
    store.list_streams.return_value = configured
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = []

    await _supervisor_with_store(store)._sweep_events_retention()

    delete = store.delete_events_older_than
    swept_domains = [call.args[0] for call in delete.await_args_list]
    assert ["wx"] in swept_domains  # WX swept
    assert ["fire"] not in swept_domains  # FIRE (max_age 0) skipped
    assert delete.await_count == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_sweep_passes_max_age_to_delete():
    """The stream's configured max_age_s is forwarded verbatim to the delete."""
    horizon_s = 123456
    store = AsyncMock()
    store.list_streams.return_value = [_stream("CENTRAL_WX", horizon_s)]
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = []

    supervisor = _supervisor_with_store(store)
    await supervisor._sweep_events_retention()

    last_await = store.delete_events_older_than.await_args
    assert last_await.args == (["wx"], horizon_s)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_sweep_warns_on_unmapped_domains_but_does_not_raise():
    """An unmapped category domain is reported via a warning, not an error:
    the sweep completes and logs the coverage warning exactly once."""
    # Local import: this module's top-level import brings in only
    # AsyncMock and MagicMock.
    from unittest.mock import patch

    store = AsyncMock()
    store.list_streams.return_value = [_stream("CENTRAL_WX", 86400)]
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = ["mystery_domain"]
    sup = _supervisor_with_store(store)

    # Should complete without raising even though an unmapped domain exists.
    # NOTE(review): assumes central.supervisor exposes its logger as the
    # module-level name ``logger`` -- confirm against the module.
    with patch("central.supervisor.logger") as log:
        await sup._sweep_events_retention()

    store.unmapped_event_domains.assert_awaited_once()
    # Skipped streams also emit warnings, so select the coverage warning by
    # its message instead of counting every warning call.
    coverage_warnings = [
        call
        for call in log.warning.call_args_list
        if call.args
        and call.args[0] == "events categories not covered by retention map"
    ]
    assert len(coverage_warnings) == 1
    assert coverage_warnings[0].kwargs["extra"] == {
        "unmapped_domains": ["mystery_domain"]
    }
|
||||
Loading…
Add table
Add a link
Reference in a new issue