feat(models): add quake event subject routing

Update subject_for_event to handle quake.* category events.
Subject format: central.quake.event.<magnitude_tier>

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-16 20:51:41 +00:00
commit 668027b442

View file

@ -1,79 +1,87 @@
"""Data models for Central event processing.""" """Data models for Central event processing."""
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
class Geo(BaseModel): class Geo(BaseModel):
"""Geographic context for an event.""" """Geographic context for an event."""
model_config = ConfigDict(extra="forbid", frozen=True) model_config = ConfigDict(extra="forbid", frozen=True)
centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order
bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat)
regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...]
primary_region: str | None = None # alphabetically first region, used for subject primary_region: str | None = None # alphabetically first region, used for subject
class Event(BaseModel): class Event(BaseModel):
"""Canonical event representation for all adapters.""" """Canonical event representation for all adapters."""
model_config = ConfigDict(extra="forbid", frozen=True) model_config = ConfigDict(extra="forbid", frozen=True)
id: str # unique, stable across republish id: str # unique, stable across republish
source: str # adapter identity, e.g. "central/adapters/nws" source: str # adapter identity, e.g. "central/adapters/nws"
category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high" category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
time: datetime # event-time UTC, not processing-time time: datetime # event-time UTC, not processing-time
expires: datetime | None = None expires: datetime | None = None
severity: int | None = None # 0..4 or None for "Unknown" severity: int | None = None # 0..4 or None for "Unknown"
geo: Geo geo: Geo
data: dict[str, Any] # adapter-specific payload data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event) -> str: def subject_for_event(ev: Event) -> str:
""" """
Compute the NATS subject for an event based on its category. Compute the NATS subject for an event based on its category.
Dispatch by category prefix: Dispatch by category prefix:
- fire.*: returns central.<category> directly - fire.*: returns central.<category> directly
- wx.*: uses weather alert subject logic - quake.*: returns central.<category> directly
- wx.*: uses weather alert subject logic
Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower> Weather alert subjects:
or central.wx.alert.us.<state_lower>.county.<county_lower>
central.wx.alert.us.<state_lower>.zone.<zone_lower> or
based on whether the primary_region encodes a county or a zone. central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence> Fire hotspot subjects:
""" central.fire.hotspot.<satellite>.<confidence>
# Fire events: subject is just central.<category>
if ev.category.startswith("fire."): Quake event subjects:
return f"central.{ev.category}" central.quake.event.<magnitude_tier>
"""
# Weather events: use geo-based subject logic # Fire events: subject is just central.<category>
prefix = "central.wx" if ev.category.startswith("fire."):
return f"central.{ev.category}"
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown" # Quake events: subject is just central.<category>
if ev.category.startswith("quake."):
region = ev.geo.primary_region return f"central.{ev.category}"
# Parse US-<STATE>-<CODE> format # Weather events: use geo-based subject logic
# County codes are like "Ada", "Canyon" (names) prefix = "central.wx"
# Zone codes start with "Z" like "Z033"
parts = region.split("-") if ev.geo.primary_region is None:
if len(parts) < 3 or parts[0] != "US": return f"{prefix}.alert.us.unknown"
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" # Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): # Zone codes start with "Z" like "Z033"
# Zone code like Z033 parts = region.split("-")
return f"{prefix}.alert.us.{state}.zone.{code.lower()}" if len(parts) < 3 or parts[0] != "US":
else: return f"{prefix}.alert.us.unknown"
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}" state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"