feat(models): add fire event subject routing

Update subject_for_event to handle fire.* category events:
- Fire events: central.<category> (e.g., central.fire.hotspot.viirs_snpp.high)
- Weather events: existing geo-based subject logic

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-16 19:58:37 +00:00
commit a007418e0a

View file

@ -1,68 +1,79 @@
"""Data models for Central event processing.""" """Data models for Central event processing."""
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
class Geo(BaseModel): class Geo(BaseModel):
"""Geographic context for an event.""" """Geographic context for an event."""
model_config = ConfigDict(extra="forbid", frozen=True) model_config = ConfigDict(extra="forbid", frozen=True)
centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order centroid: tuple[float, float] | None = None # (lon, lat) GeoJSON order
bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat) bbox: tuple[float, float, float, float] | None = None # (minLon, minLat, maxLon, maxLat)
regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...] regions: list[str] = [] # ["US-ID-Ada", "US-ID-Z033", ...]
primary_region: str | None = None # alphabetically first region, used for subject primary_region: str | None = None # alphabetically first region, used for subject
class Event(BaseModel): class Event(BaseModel):
"""Canonical event representation for all adapters.""" """Canonical event representation for all adapters."""
model_config = ConfigDict(extra="forbid", frozen=True) model_config = ConfigDict(extra="forbid", frozen=True)
id: str # unique, stable across republish id: str # unique, stable across republish
source: str # adapter identity, e.g. "central/adapters/nws" source: str # adapter identity, e.g. "central/adapters/nws"
category: str # e.g. "wx.alert.severe_thunderstorm_warning" category: str # e.g. "wx.alert.severe_thunderstorm_warning" or "fire.hotspot.viirs_snpp.high"
time: datetime # event-time UTC, not processing-time time: datetime # event-time UTC, not processing-time
expires: datetime | None = None expires: datetime | None = None
severity: int | None = None # 0..4 or None for "Unknown" severity: int | None = None # 0..4 or None for "Unknown"
geo: Geo geo: Geo
data: dict[str, Any] # adapter-specific payload data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event, prefix: str = "central.wx") -> str: def subject_for_event(ev: Event) -> str:
""" """
Compute the NATS subject for an alert-style event. Compute the NATS subject for an event based on its category.
For weather alerts the subject is: Dispatch by category prefix:
central.wx.alert.us.<state_lower>.county.<county_lower> - fire.*: returns central.<category> directly
or - wx.*: uses weather alert subject logic
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone. Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower>
If primary_region is None or unparseable, returns: or
central.wx.alert.us.unknown central.wx.alert.us.<state_lower>.zone.<zone_lower>
""" based on whether the primary_region encodes a county or a zone.
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown" Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence>
region = ev.geo.primary_region """
# Fire events: subject is just central.<category>
# Parse US-<STATE>-<CODE> format if ev.category.startswith("fire."):
# County codes are like "Ada", "Canyon" (names) return f"central.{ev.category}"
# Zone codes start with "Z" like "Z033"
parts = region.split("-") # Weather events: use geo-based subject logic
if len(parts) < 3 or parts[0] != "US": prefix = "central.wx"
return f"{prefix}.alert.us.unknown"
if ev.geo.primary_region is None:
state = parts[1].lower() return f"{prefix}.alert.us.unknown"
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
region = ev.geo.primary_region
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033 # Parse US-<STATE>-<CODE> format
return f"{prefix}.alert.us.{state}.zone.{code.lower()}" # County codes are like "Ada", "Canyon" (names)
else: # Zone codes start with "Z" like "Z033"
# County name parts = region.split("-")
return f"{prefix}.alert.us.{state}.county.{code.lower()}" if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"