# Reconstructed from a diff of src/central/config_store.py (index
# 0d51658..bd0b199), hunk "@@ -202,6 +202,62 @@ class ConfigStore:". The hunk adds
# the two methods below between the end of the preceding method (context line
# `return [StreamConfig(**dict(row)) for row in rows]`) and the existing
# `upsert_stream(self, name: str, max_age_s: int) -> None` ("Insert or update a
# stream's max_age_s (operator-facing)."); that context is unchanged by the diff.
class ConfigStore:  # excerpt: only the methods added by this hunk
    # -------------------------------------------------------------------------
    # Archived-events retention (v0.9.13) -- DML on public.events, keyed by the
    # per-stream max_age_s above. Lives here because this is the supervisor's
    # Postgres gateway; the events table shares the database.
    # -------------------------------------------------------------------------

    async def delete_events_older_than(
        self, category_domains: list[str], max_age_s: int, batch: int = 10000
    ) -> int:
        """Delete archived events older than ``max_age_s`` seconds.

        Only rows whose ``category`` first token (the text before the first
        ``.``) is in ``category_domains`` are considered. Rows are deleted in
        batches of ``batch`` (default 10k) so the initial bulk reclaim can't
        take a long table lock.

        Args:
            category_domains: Category first-token domains to sweep. An empty
                list is a no-op.
            max_age_s: Retention horizon in seconds; must be positive.
            batch: Maximum rows deleted per statement; must be at least 1.

        Returns:
            The total number of rows deleted across all batches.

        Raises:
            ValueError: If ``batch < 1`` or ``max_age_s <= 0``.
        """
        if not category_domains:
            return 0
        # LIMIT 0 deletes nothing and ``0 < 0`` is false, so a zero batch would
        # spin forever in the loop below; reject it up front.
        if batch < 1:
            raise ValueError(f"batch must be >= 1, got {batch}")
        # A non-positive horizon would match every row in these domains. The
        # retention sweep treats such values as "skip, never delete", so refuse
        # to run the destructive statement rather than wipe the archive.
        if max_age_s <= 0:
            raise ValueError(f"max_age_s must be > 0, got {max_age_s}")

        total = 0
        async with self._pool.acquire() as conn:
            while True:
                # NOTE: events is a TimescaleDB hypertable -- ctid is only unique
                # WITHIN a chunk, so batching on ctid would delete same-ctid rows in
                # other chunks. Batch on the composite primary key (id, time), which
                # is globally unique.
                result = await conn.execute(
                    """
                    DELETE FROM events WHERE (id, time) IN (
                        SELECT id, time FROM events
                        WHERE split_part(category, '.', 1) = ANY($1::text[])
                          AND time < now() - ($2 * interval '1 second')
                        LIMIT $3
                    )
                    """,
                    category_domains, max_age_s, batch,
                )
                # The command status looks like "DELETE <count>".
                n = int(result.split()[-1])
                total += n
                # A short batch means nothing older than the horizon is left.
                if n < batch:
                    break
        return total

    async def unmapped_event_domains(self, mapped: list[str]) -> list[str]:
        """Return distinct ``events.category`` first-token domains NOT in ``mapped``.

        Used to surface category domains that no stream's retention map covers, so
        a new adapter's events can't silently dodge retention.

        Args:
            mapped: Category first-token domains that are already covered.

        Returns:
            The uncovered domains, in whatever order the database returns them.
        """
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT DISTINCT split_part(category, '.', 1) AS domain
                FROM events
                WHERE split_part(category, '.', 1) <> ALL($1::text[])
                """,
                mapped,
            )
        return [r["domain"] for r in rows]


# Next file in the diff: src/central/gui/templates/streams_list.html (index
# 7ebf5ea..f9d55d0), hunk "@@ -4,6 +4,9 @@" following `{% block content %}`.

Streams

+

Max age is the source-of-truth retention horizon: it bounds both how +long messages live in NATS and how long archived events are kept in the events table +(swept hourly by the supervisor).

# End of the streams_list.html hunk context: `{% for stream in streams %}`.
#
# Reconstructed from a diff of src/central/supervisor.py (index
# 9af2b39..b54cf1b):
#   * hunk "@@ -113,6 +113,33 @@" adds the module-level retention constants and
#     _all_mapped_domains() after STREAM_RECOMPUTE_INTERVAL_S and before
#     `class JsonFormatter(logging.Formatter):`;
#   * hunk "@@ -646,6 +673,69 @@ class Supervisor:" adds the two Supervisor
#     methods below, just before _stream_retention_recompute_loop.

# Recompute interval for stream max_bytes (1 hour)
STREAM_RECOMPUTE_INTERVAL_S = 3600

# How often to sweep expired archived events out of the events table (1 hour).
EVENTS_RETENTION_INTERVAL_S = 3600

# Maps each event-bearing stream to the events.category first-token domains it
# owns. Category domains do NOT equal stream subject domains for the traffic
# family (flow/incident/closure/work_zone/camera all live under central.traffic*),
# so the mapping is explicit. The events-retention sweep uses each stream's
# config.streams.max_age_s (the /streams 1/7/14/30/365d buttons) as the
# source-of-truth horizon. Keep in sync with the StreamEntry registry: a category
# domain present in events but absent here is logged by _sweep_events_retention so
# a new adapter's events can't silently evade retention.
STREAM_CATEGORY_DOMAINS: dict[str, tuple[str, ...]] = {
    "CENTRAL_WX": ("wx",),
    "CENTRAL_FIRE": ("fire",),
    "CENTRAL_QUAKE": ("quake",),
    "CENTRAL_SPACE": ("space",),
    "CENTRAL_DISASTER": ("disaster",),
    "CENTRAL_HYDRO": ("hydro",),
    "CENTRAL_TRAFFIC": ("incident", "closure", "work_zone"),
    "CENTRAL_TRAFFIC_FLOW": ("flow",),
    "CENTRAL_TRAFFIC_CAMERAS": ("camera",),
}


def _all_mapped_domains() -> set[str]:
    """Return every category domain covered by STREAM_CATEGORY_DOMAINS."""
    return {d for domains in STREAM_CATEGORY_DOMAINS.values() for d in domains}


class Supervisor:  # excerpt: only the methods added by this hunk
    async def _sweep_events_retention(self) -> None:
        """Delete archived events older than their stream's max_age_s.

        The horizon for each stream in STREAM_CATEGORY_DOMAINS comes from the
        config store's stream list (the per-stream retention from /streams).

        Fail-safe: a stream missing from the stream list, or whose max_age_s is
        None or <= 0, is skipped (never deleted). A failure while sweeping one
        stream is logged and does not stop the remaining streams. Afterwards
        any category domain not covered by the map is logged as a warning.
        """
        streams = {s.name: s for s in await self._config_store.list_streams()}
        total = 0
        for stream_name, domains in STREAM_CATEGORY_DOMAINS.items():
            cfg = streams.get(stream_name)
            if cfg is None or cfg.max_age_s is None or cfg.max_age_s <= 0:
                logger.warning(
                    "Skipping events retention: no/invalid max_age_s",
                    extra={"stream": stream_name},
                )
                continue
            try:
                deleted = await self._config_store.delete_events_older_than(
                    list(domains), cfg.max_age_s
                )
            except Exception as e:
                # str(e) is empty for bare timeouts (asyncio.TimeoutError()), so
                # attach the traceback to keep the failure diagnosable.
                logger.error(
                    "Events retention sweep failed for stream",
                    extra={"stream": stream_name, "error": str(e)},
                    exc_info=True,
                )
                continue
            total += deleted
            if deleted:
                logger.info(
                    "Swept expired events",
                    extra={"stream": stream_name, "deleted": deleted,
                           "max_age_s": cfg.max_age_s},
                )
        # Surface any category domain no stream's map covers (potential leak).
        try:
            # Sorted so the query parameter is deterministic across runs.
            unmapped = await self._config_store.unmapped_event_domains(
                sorted(_all_mapped_domains())
            )
            if unmapped:
                logger.warning(
                    "events categories not covered by retention map",
                    extra={"unmapped_domains": sorted(unmapped)},
                )
        except Exception as e:
            logger.error("Failed checking unmapped event domains",
                         extra={"error": str(e)}, exc_info=True)
        logger.info("Events retention sweep complete",
                    extra={"total_deleted": total})

    async def _events_retention_loop(self) -> None:
        """Sweep expired archived events immediately on startup, then hourly.

        Runs until ``self._shutdown_event`` is set. A failing sweep is logged
        and retried on the next interval rather than ending the loop.
        """
        while not self._shutdown_event.is_set():
            try:
                await self._sweep_events_retention()
            except Exception as e:
                logger.error("Events retention loop error",
                             extra={"error": str(e)}, exc_info=True)
            try:
                # Sleep for one interval, waking early if shutdown is requested.
                await asyncio.wait_for(
                    self._shutdown_event.wait(),
                    timeout=EVENTS_RETENTION_INTERVAL_S,
                )
                break  # shutdown requested
            except asyncio.TimeoutError:
                pass

    # Unchanged diff context follows: _stream_retention_recompute_loop
    # ("Periodically recompute max_bytes for all streams.").
not self._shutdown_event.is_set(): @@ -885,6 +975,9 @@ class Supervisor: # Start stream retention recompute loop self._tasks.append(asyncio.create_task(self._stream_retention_recompute_loop())) + # Start archived-events retention sweep loop (v0.9.13) + self._tasks.append(asyncio.create_task(self._events_retention_loop())) + logger.info( "Supervisor started", extra={"adapters": list(self._adapter_states.keys())}, diff --git a/tests/test_events_retention.py b/tests/test_events_retention.py new file mode 100644 index 0000000..893c7c2 --- /dev/null +++ b/tests/test_events_retention.py @@ -0,0 +1,122 @@ +"""Archived-events retention sweep (v0.9.13). + +The supervisor deletes events older than their stream's max_age_s (the per-stream +/streams retention). Mapping events.category -> stream is explicit because category +domains don't equal stream subject domains for the traffic family. Fail-safe: +streams with missing/<=0 max_age_s are skipped (never deleted), and any category +domain the map doesn't cover is surfaced as a warning. +""" +import pytest +from unittest.mock import AsyncMock, MagicMock + +from central.supervisor import ( + STREAM_CATEGORY_DOMAINS, + EVENTS_RETENTION_INTERVAL_S, + Supervisor, + _all_mapped_domains, +) +from central.streams import STREAMS as STREAM_REGISTRY + +# Every category first-token domain observed in the events table (the coverage +# guard: a new adapter domain must be added to STREAM_CATEGORY_DOMAINS or this fails). 
# Category first-token domains the retention map is expected to cover.
KNOWN_EVENT_DOMAINS = {
    "wx", "fire", "quake", "space", "disaster", "hydro",
    "incident", "closure", "work_zone", "flow", "camera",
}


class TestRetentionMap:
    """Static checks on STREAM_CATEGORY_DOMAINS and the sweep interval."""

    def test_map_covers_all_known_domains(self):
        assert KNOWN_EVENT_DOMAINS <= _all_mapped_domains()

    def test_map_keys_are_event_bearing_streams(self):
        event_streams = {s.name for s in STREAM_REGISTRY if s.event_bearing}
        assert set(STREAM_CATEGORY_DOMAINS) <= event_streams

    def test_central_meta_not_in_map(self):
        # CENTRAL_META is status-only (not event-bearing) -> no events rows to sweep.
        assert "CENTRAL_META" not in STREAM_CATEGORY_DOMAINS

    def test_no_domain_mapped_to_two_streams(self):
        seen, dupes = set(), set()
        for domains in STREAM_CATEGORY_DOMAINS.values():
            for d in domains:
                (dupes if d in seen else seen).add(d)
        assert not dupes

    def test_interval_is_hourly(self):
        assert EVENTS_RETENTION_INTERVAL_S == 3600


def _stream(name, max_age_s):
    """Build a stand-in stream config exposing only name and max_age_s."""
    s = MagicMock()
    s.name = name
    s.max_age_s = max_age_s
    return s


def _supervisor_with_store(store):
    """Build a Supervisor wired to ``store`` without running __init__."""
    sup = Supervisor.__new__(Supervisor)  # bypass __init__/connections
    sup._config_store = store
    return sup


@pytest.mark.asyncio
async def test_sweep_deletes_per_event_bearing_stream():
    store = AsyncMock()
    store.list_streams.return_value = [
        _stream(name, 604800) for name in STREAM_CATEGORY_DOMAINS
    ]
    store.delete_events_older_than.return_value = 5
    store.unmapped_event_domains.return_value = []
    sup = _supervisor_with_store(store)

    await sup._sweep_events_retention()

    assert store.delete_events_older_than.await_count == len(STREAM_CATEGORY_DOMAINS)
    swept = [c.args[0] for c in store.delete_events_older_than.await_args_list]
    # Exactly one delete per mapped stream, each with that stream's own domains.
    assert sorted(swept) == sorted(
        list(domains) for domains in STREAM_CATEGORY_DOMAINS.values()
    )
    # CENTRAL_TRAFFIC carries the multi-domain traffic family in a single call.
    assert list(STREAM_CATEGORY_DOMAINS["CENTRAL_TRAFFIC"]) in swept
    assert ["incident", "closure", "work_zone"] in swept


@pytest.mark.asyncio
async def test_sweep_skips_missing_and_nonpositive_max_age():
    store = AsyncMock()
    # WX present/valid; FIRE max_age_s=0 (skip); others absent (skip).
    store.list_streams.return_value = [_stream("CENTRAL_WX", 86400), _stream("CENTRAL_FIRE", 0)]
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = []
    sup = _supervisor_with_store(store)

    await sup._sweep_events_retention()

    swept_domains = [c.args[0] for c in store.delete_events_older_than.await_args_list]
    assert ["wx"] in swept_domains  # WX swept
    assert ["fire"] not in swept_domains  # FIRE (max_age 0) skipped
    assert store.delete_events_older_than.await_count == 1


@pytest.mark.asyncio
async def test_sweep_passes_max_age_to_delete():
    store = AsyncMock()
    store.list_streams.return_value = [_stream("CENTRAL_WX", 123456)]
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = []
    sup = _supervisor_with_store(store)

    await sup._sweep_events_retention()

    args = store.delete_events_older_than.await_args
    assert args.args == (["wx"], 123456)


@pytest.mark.asyncio
async def test_sweep_warns_on_unmapped_domains_but_does_not_raise():
    store = AsyncMock()
    store.list_streams.return_value = [_stream("CENTRAL_WX", 86400)]
    store.delete_events_older_than.return_value = 0
    store.unmapped_event_domains.return_value = ["mystery_domain"]
    sup = _supervisor_with_store(store)
    # Should complete without raising even though an unmapped domain exists.
    await sup._sweep_events_retention()
    store.unmapped_event_domains.assert_awaited_once()
    # The coverage check must be asked about every mapped domain, no more, no less.
    (mapped,) = store.unmapped_event_domains.await_args.args
    assert set(mapped) == _all_mapped_domains()