central/src/central/models.py
Ubuntu 8601a19f60 feat(schema): add adapter column to events, drop source
Replaces module-path-based source column (e.g. "central/adapters/nws")
with stable adapter identifier (e.g. "nws") that foreign-keys to
config.adapters.name.

Migration 011:
- ADD COLUMN adapter TEXT
- Backfill via REPLACE(source, 'central/adapters/', '')
- SET NOT NULL + FK RESTRICT
- CREATE INDEX (adapter, received DESC) for dashboard queries
- DROP COLUMN source

Code changes:
- Event model: source field renamed to adapter
- All adapters: use adapter="name" instead of source="central/adapters/name"
- Archive: write adapter column instead of source

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-17 16:09:59 +00:00

79 lines
2.6 KiB
Python

"""Data models for Central event processing."""
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict
class Geo(BaseModel):
"""Geographic context for an event."""
model_config = ConfigDict(extra="forbid", frozen=True)
centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order
bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat)
regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...]
primary_region: str | None = None # alphabetically first region, used for subject
class Event(BaseModel):
"""Canonical event representation for all adapters."""
model_config = ConfigDict(extra="forbid", frozen=True)
id: str # unique, stable across republish
adapter: str # adapter identity, e.g. "nws"
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
time: datetime # event-time UTC, not processing-time
expires: datetime | None = None
severity: int | None = None # 0..4 or None for "Unknown"
geo: Geo
data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event) -> str:
"""
Compute the NATS subject for an event based on its category.
Dispatch by category prefix:
- fire.*: returns central.<category> directly
- wx.*: uses weather alert subject logic
Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower>
or
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence>
"""
# Fire events: subject is just central.<category>
if ev.category.startswith("fire."):
return f"central.{ev.category}"
# Weather events: use geo-based subject logic
prefix = "central.wx"
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
# Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
# Zone codes start with "Z" like "Z033"
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"