From b16151abf120900bcc86355089d14683cce75838 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sat, 16 May 2026 02:38:11 +0000 Subject: [PATCH] refactor: move CloudEvents config to code constants CloudEvents envelope format is protocol-level (not operator config). When using DB config source without TOML, wrap_event() now uses DEFAULT_CLOUDEVENTS_CONFIG from cloudevents_constants.py. Changes: - Add cloudevents_constants.py with DEFAULT_CLOUDEVENTS_CONFIG - Update wrap_event() to accept Config, CloudEventsConfig, or None - Simplify supervisor: always use wrap_event (has defaults) Co-Authored-By: Claude Opus 4.5 --- src/central/cloudevents_constants.py | 17 ++++++++++++++++ src/central/cloudevents_wire.py | 30 ++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 6 deletions(-) create mode 100644 src/central/cloudevents_constants.py diff --git a/src/central/cloudevents_constants.py b/src/central/cloudevents_constants.py new file mode 100644 index 0000000..3c69744 --- /dev/null +++ b/src/central/cloudevents_constants.py @@ -0,0 +1,17 @@ +"""CloudEvents configuration constants. + +These defaults are used when no TOML configuration file is available +(e.g., when using database-backed configuration source). + +These values are baked into the code because CloudEvents envelope format +is part of the Central protocol contract, not operator-configurable. +""" + +from central.config import CloudEventsConfig + +# Default CloudEvents configuration - used when TOML is unavailable +DEFAULT_CLOUDEVENTS_CONFIG = CloudEventsConfig( + type_prefix="central", + source="central.echo6.co", + schema_version="1.0", +) diff --git a/src/central/cloudevents_wire.py b/src/central/cloudevents_wire.py index fcd388a..00b3bdf 100644 --- a/src/central/cloudevents_wire.py +++ b/src/central/cloudevents_wire.py @@ -1,30 +1,48 @@ """CloudEvents wire format helpers.""" -from typing import Any +from typing import Any, Union from cloudevents.v1.http import CloudEvent -from central.config import Config +from central.config import Config, CloudEventsConfig +from central.cloudevents_constants import DEFAULT_CLOUDEVENTS_CONFIG from central.models import Event -def wrap_event(event: Event, config: Config) -> tuple[dict[str, Any], str]: +def wrap_event( + event: Event, + config: Union[Config, CloudEventsConfig, None] = None, +) -> tuple[dict[str, Any], str]: """ Wrap an Event into a CNCF CloudEvents v1.0 JSON envelope. + Args: + event: The event to wrap + config: Either a full Config object, a CloudEventsConfig object, + or None to use defaults. + Returns: A tuple of (envelope_dict, msg_id) where msg_id is the CloudEvent id for use as Nats-Msg-Id header. """ + # Resolve CloudEventsConfig from various input types + if config is None: + ce_config = DEFAULT_CLOUDEVENTS_CONFIG + elif isinstance(config, CloudEventsConfig): + ce_config = config + else: + # It's a full Config object + ce_config = config.cloudevents + # Build CE type: {prefix}.{category}.v1 - ce_type = f"{config.cloudevents.type_prefix}.{event.category}.v1" + ce_type = f"{ce_config.type_prefix}.{event.category}.v1" # Serialize event data event_data = event.model_dump(mode="json") # Build extension attributes - lowercase, no underscores per CE spec extensions: dict[str, Any] = { - "centralschemaversion": config.cloudevents.schema_version, + "centralschemaversion": ce_config.schema_version, "centralcategory": event.category, } @@ -36,7 +54,7 @@ def wrap_event(event: Event, config: Config) -> tuple[dict[str, Any], str]: ce = CloudEvent( attributes={ "id": event.id, - "source": config.cloudevents.source, + "source": ce_config.source, "type": ce_type, "time": event.time.isoformat(), "datacontenttype": "application/json",