refactor: move CloudEvents config to code constants

CloudEvents envelope format is protocol-level (not operator config).
When using DB config source without TOML, wrap_event() now uses
DEFAULT_CLOUDEVENTS_CONFIG from cloudevents_constants.py.

Changes:
- Add cloudevents_constants.py with DEFAULT_CLOUDEVENTS_CONFIG
- Update wrap_event() to accept Config, CloudEventsConfig, or None
- Simplify supervisor: always use wrap_event (has defaults)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-16 02:38:11 +00:00
commit b16151abf1
2 changed files with 41 additions and 6 deletions

View file

@ -0,0 +1,17 @@
"""CloudEvents configuration constants.
These defaults are used when no TOML configuration file is available
(e.g., when using database-backed configuration source).
These values are baked into the code because CloudEvents envelope format
is part of the Central protocol contract, not operator-configurable.
"""
from central.config import CloudEventsConfig
# Default CloudEvents configuration - used when TOML is unavailable
DEFAULT_CLOUDEVENTS_CONFIG = CloudEventsConfig(
type_prefix="central",
source="central.echo6.co",
schema_version="1.0",
)

View file

@ -1,30 +1,48 @@
"""CloudEvents wire format helpers."""
from typing import Any
from typing import Any, Union
from cloudevents.v1.http import CloudEvent
from central.config import Config
from central.config import Config, CloudEventsConfig
from central.cloudevents_constants import DEFAULT_CLOUDEVENTS_CONFIG
from central.models import Event
def wrap_event(event: Event, config: Config) -> tuple[dict[str, Any], str]:
def wrap_event(
event: Event,
config: Union[Config, CloudEventsConfig, None] = None,
) -> tuple[dict[str, Any], str]:
"""
Wrap an Event into a CNCF CloudEvents v1.0 JSON envelope.
Args:
event: The event to wrap
config: Either a full Config object, a CloudEventsConfig object,
or None to use defaults.
Returns:
A tuple of (envelope_dict, msg_id) where msg_id is the
CloudEvent id for use as Nats-Msg-Id header.
"""
# Resolve CloudEventsConfig from various input types
if config is None:
ce_config = DEFAULT_CLOUDEVENTS_CONFIG
elif isinstance(config, CloudEventsConfig):
ce_config = config
else:
# It's a full Config object
ce_config = config.cloudevents
# Build CE type: {prefix}.{category}.v1
ce_type = f"{config.cloudevents.type_prefix}.{event.category}.v1"
ce_type = f"{ce_config.type_prefix}.{event.category}.v1"
# Serialize event data
event_data = event.model_dump(mode="json")
# Build extension attributes - lowercase, no underscores per CE spec
extensions: dict[str, Any] = {
"centralschemaversion": config.cloudevents.schema_version,
"centralschemaversion": ce_config.schema_version,
"centralcategory": event.category,
}
@ -36,7 +54,7 @@ def wrap_event(event: Event, config: Config) -> tuple[dict[str, Any], str]:
ce = CloudEvent(
attributes={
"id": event.id,
"source": config.cloudevents.source,
"source": ce_config.source,
"type": ce_type,
"time": event.time.isoformat(),
"datacontenttype": "application/json",