From 71a43d3c98f8ad45fb50e3bb2982ef59bcbb5208 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sat, 16 May 2026 18:49:59 +0000 Subject: [PATCH] schema: add config.streams table with column-filtered notify - config.streams table for JetStream retention config - Column-filtered NOTIFY: only fires on max_age_s changes - Prevents self-loop when supervisor updates max_bytes - Seeds CENTRAL_WX (7d/10GB) and CENTRAL_META (1d/100MB) Co-Authored-By: Claude Opus 4.5 --- sql/migrations/003_add_streams_table.sql | 46 ++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 sql/migrations/003_add_streams_table.sql diff --git a/sql/migrations/003_add_streams_table.sql b/sql/migrations/003_add_streams_table.sql new file mode 100644 index 0000000..27d59cd --- /dev/null +++ b/sql/migrations/003_add_streams_table.sql @@ -0,0 +1,46 @@ +-- Migration: 003_add_streams_table +-- Creates the config.streams table for JetStream stream retention configuration. +-- Uses column-filtered NOTIFY to prevent self-loop when supervisor updates max_bytes. + +-- Streams configuration table +CREATE TABLE config.streams ( + name TEXT PRIMARY KEY, + max_age_s BIGINT NOT NULL, + max_bytes BIGINT NOT NULL DEFAULT 1073741824, -- 1GB default + managed_max_bytes BOOLEAN NOT NULL DEFAULT true, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Auto-update trigger for updated_at +CREATE TRIGGER streams_set_updated_at + BEFORE UPDATE ON config.streams + FOR EACH ROW EXECUTE FUNCTION config.set_updated_at(); + +-- Column-filtered NOTIFY trigger for streams. +-- Fires on INSERT/DELETE always. +-- On UPDATE, only fires when max_age_s changes (operator-touchable field), +-- NOT when max_bytes changes (supervisor-managed), to prevent recompute loop. +CREATE OR REPLACE FUNCTION config.notify_streams_change() +RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'INSERT' OR TG_OP = 'DELETE' THEN + PERFORM pg_notify('config_changed', 'streams:' || + COALESCE(NEW.name, OLD.name)); + ELSIF TG_OP = 'UPDATE' AND + OLD.max_age_s IS DISTINCT FROM NEW.max_age_s THEN + PERFORM pg_notify('config_changed', 'streams:' || NEW.name); + END IF; + RETURN COALESCE(NEW, OLD); +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER streams_notify + AFTER INSERT OR UPDATE OR DELETE ON config.streams + FOR EACH ROW EXECUTE FUNCTION config.notify_streams_change(); + +-- Seed with current stream values from investigation +-- CENTRAL_WX: 7d max_age (604800s), 10GB max_bytes (will be clamped to 6GB on first recompute) +-- CENTRAL_META: 1d max_age (86400s), 100MB max_bytes (will be raised to 1GB floor) +INSERT INTO config.streams (name, max_age_s, max_bytes) VALUES + ('CENTRAL_WX', 604800, 10737418240), + ('CENTRAL_META', 86400, 104857600);