foundation: models, adapter ABC, config, CE wire, schema

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-15 21:06:02 +00:00 committed by Ubuntu
commit 714971fe99
9 changed files with 1051 additions and 0 deletions

35
src/central/adapter.py Normal file
View file

@ -0,0 +1,35 @@
"""Base adapter interface for event sources."""
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from central.models import Event
class SourceAdapter(ABC):
"""
Abstract base class for event source adapters.
Adapters yield Events. The supervisor handles scheduling,
CloudEvents wrapping, publish, and metadata heartbeats.
"""
name: str # short identifier, e.g. "nws"
cadence_s: int # seconds between poll() calls
@abstractmethod
async def poll(self) -> AsyncIterator[Event]:
"""
Poll the source for new events.
Yields Event objects for each new/updated event found.
"""
...
async def startup(self) -> None:
"""Optional lifecycle hook called before first poll."""
pass
async def shutdown(self) -> None:
"""Optional lifecycle hook called on graceful shutdown."""
pass

View file

@ -0,0 +1,52 @@
"""CloudEvents wire format helpers."""
from typing import Any
from cloudevents.v1.http import CloudEvent
from central.config import Config
from central.models import Event
def wrap_event(event: Event, config: Config) -> tuple[dict[str, Any], str]:
"""
Wrap an Event into a CNCF CloudEvents v1.0 JSON envelope.
Returns:
A tuple of (envelope_dict, msg_id) where msg_id is the
CloudEvent id for use as Nats-Msg-Id header.
"""
# Build CE type: {prefix}.{category}.v1
ce_type = f"{config.cloudevents.type_prefix}.{event.category}.v1"
# Serialize event data
event_data = event.model_dump(mode="json")
# Build extension attributes - lowercase, no underscores per CE spec
extensions: dict[str, Any] = {
"hubschemaversion": config.cloudevents.schema_version,
"hubcategory": event.category,
}
# Only include hubseverity if severity is present
if event.severity is not None:
extensions["hubseverity"] = event.severity
# Create CloudEvent
ce = CloudEvent(
attributes={
"id": event.id,
"source": config.cloudevents.source,
"type": ce_type,
"time": event.time.isoformat(),
"datacontenttype": "application/json",
**extensions,
},
data=event_data,
)
# Build envelope dict from CloudEvent
envelope: dict[str, Any] = dict(ce.get_attributes())
envelope["data"] = ce.data
return envelope, event.id

111
src/central/config.py Normal file
View file

@ -0,0 +1,111 @@
"""Configuration loader for Central."""
import tomllib
from pathlib import Path
from typing import Any
from pydantic import BaseModel, ConfigDict, field_validator
class NWSAdapterConfig(BaseModel):
"""NWS adapter configuration."""
model_config = ConfigDict(extra="forbid")
enabled: bool = True
cadence_s: int = 60
states: list[str] = []
contact_email: str
@field_validator("contact_email")
@classmethod
def validate_contact_email(cls, v: str) -> str:
if "CHANGE_ME" in v or v.endswith("example.invalid"):
raise ValueError(
f"adapters.nws.contact_email is still a placeholder: {v!r}"
)
return v
class CloudEventsConfig(BaseModel):
"""CloudEvents envelope configuration."""
model_config = ConfigDict(extra="forbid")
type_prefix: str = "central"
source: str = "central"
schema_version: str = "1.0"
class NATSConfig(BaseModel):
"""NATS connection configuration."""
model_config = ConfigDict(extra="forbid")
url: str = "nats://localhost:4222"
class PostgresConfig(BaseModel):
"""PostgreSQL connection configuration."""
model_config = ConfigDict(extra="forbid")
dsn: str
@field_validator("dsn")
@classmethod
def validate_dsn(cls, v: str) -> str:
if "CHANGE_ME" in v:
raise ValueError(f"postgres.dsn contains placeholder: {v!r}")
return v
class Config(BaseModel):
"""Root configuration model."""
model_config = ConfigDict(extra="forbid")
adapters: dict[str, NWSAdapterConfig]
cloudevents: CloudEventsConfig
nats: NATSConfig
postgres: PostgresConfig
def _check_placeholders(data: dict[str, Any], path: str = "") -> None:
"""Recursively check for CHANGE_ME placeholders in config data."""
for key, value in data.items():
current_path = f"{path}.{key}" if path else key
if isinstance(value, str) and "CHANGE_ME" in value:
raise ValueError(
f"Configuration field {current_path!r} contains placeholder: {value!r}"
)
elif isinstance(value, dict):
_check_placeholders(value, current_path)
def load_config(path: str = "/etc/central/central.toml") -> Config:
"""
Load and validate configuration from TOML file.
Raises ValueError if any field contains CHANGE_ME placeholder.
"""
config_path = Path(path)
with config_path.open("rb") as f:
data = tomllib.load(f)
# Check for any remaining placeholders before Pydantic validation
_check_placeholders(data)
# Parse adapters section
adapters_raw = data.get("adapters", {})
adapters = {}
for name, adapter_data in adapters_raw.items():
adapters[name] = NWSAdapterConfig(**adapter_data)
return Config(
adapters=adapters,
cloudevents=CloudEventsConfig(**data.get("cloudevents", {})),
nats=NATSConfig(**data.get("nats", {})),
postgres=PostgresConfig(**data.get("postgres", {})),
)

68
src/central/models.py Normal file
View file

@ -0,0 +1,68 @@
"""Data models for Central event processing."""
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict
class Geo(BaseModel):
"""Geographic context for an event."""
model_config = ConfigDict(extra="forbid", frozen=True)
centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order
bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat)
regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...]
primary_region: str | None = None # alphabetically first region, used for subject
class Event(BaseModel):
"""Canonical event representation for all adapters."""
model_config = ConfigDict(extra="forbid", frozen=True)
id: str # unique, stable across republish
source: str # adapter identity, e.g. "central/adapters/nws"
category: str # e.g. "wx.alert.severe_thunderstorm_warning"
time: datetime # event-time UTC, not processing-time
expires: datetime | None = None
severity: int | None = None # 0..4 or None for "Unknown"
geo: Geo
data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event, prefix: str = "central.wx") -> str:
"""
Compute the NATS subject for an alert-style event.
For weather alerts the subject is:
central.wx.alert.us.<state_lower>.county.<county_lower>
or
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
If primary_region is None or unparseable, returns:
central.wx.alert.us.unknown
"""
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
# Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
# Zone codes start with "Z" like "Z033"
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"