feat(wfigs): multi-line renderer with delta/bold logic and new fire fields

Normalizer: add fire_cause, agency, personnel, unique_fire_id from
WFIGS raw payload to the normalized incident dict.

Renderer: replace single-line wire format with structured multi-line
output — header, size/contained with bold deltas on updates, location
anchor, cause/discovered date, and unique fire ID. Update call sites
pass last_bcast_acres and last_bcast_contained for case-(iii) updates
to enable delta calculation and selective bolding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-07 07:36:14 +00:00
commit ebb34b75ef
2 changed files with 75 additions and 24 deletions

View file

@ -214,7 +214,9 @@ def handle_wfigs(normalized: dict, envelope: dict, subject: str,
) )
if (changed_acres or changed_contained) and eight_hours_passed: if (changed_acres or changed_contained) and eight_hours_passed:
wire = _render(normalized, prefix="Update") wire = _render(normalized, prefix="Update",
last_bcast_acres=last_bcast_acres,
last_bcast_contained=last_bcast_contained)
# v0.6-3c: severity override for fire updates # v0.6-3c: severity override for fire updates
if isinstance(data, dict): if isinstance(data, dict):
data["_severity_override"] = "immediate" if (acres and acres > 1000) or contained_pct == 0 else "priority" data["_severity_override"] = "immediate" if (acres and acres > 1000) or contained_pct == 0 else "priority"
@ -318,32 +320,76 @@ def _log_event_returning_id(conn, *, now, source, category, severity_word,
# ---------- renderer ------------------------------------------------------ # ---------- renderer ------------------------------------------------------
def _render(n: dict, *, prefix: str = "") -> str: def _render(n: dict, *, prefix: str = "",
"""MEDIUM-style mesh wire string. See spec in module docstring.""" last_bcast_acres=None, last_bcast_contained=None,
movement=None) -> str:
"""MEDIUM-style mesh wire string with delta/bold logic for updates."""
import datetime as _dt
name = n.get("incident_name") or "(unnamed)" name = n.get("incident_name") or "(unnamed)"
itype = n.get("incident_type") or "incident"
lat = n.get("lat")
lon = n.get("lon")
anchor = _location_anchor(n)
acres = n.get("acres") acres = n.get("acres")
contained = n.get("contained_pct") contained_pct = n.get("contained_pct")
cause = n.get("fire_cause")
unique_fire_id = n.get("unique_fire_id")
declared_at_epoch = n.get("declared_at_epoch")
anchor = _location_anchor(n)
acres_str = "N/A" if acres is None else f"{int(acres):,} ac" lines: list[str] = []
contained_str = (
"containment unknown" if contained is None
else f"{int(contained)}% contained"
)
prefix_str = f"{prefix}: " if prefix else "" # Line 1: header
head = f"🔥 {prefix_str}{name} ({itype}), {anchor}" lines.append(f"🔥 {name} \u2014 {prefix}")
body = f"{acres_str}, {contained_str}"
coords = "" # Line 2: size / contained with delta + bold
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): acres_str = f"{int(acres):,} ac" if acres is not None else "size unknown"
coords = f", @ {lat:.3f},{lon:.3f}" delta_str = ""
if prefix == "Update" and last_bcast_acres is not None and acres is not None and acres > last_bcast_acres:
delta_str = f" (+{int(acres - last_bcast_acres):,})"
contained_str = f"{int(contained_pct)}% contained" if contained_pct is not None else "containment unknown"
return f"{head}: {body}{coords}" acres_changed = (prefix == "Update" and last_bcast_acres is not None
and acres is not None and acres > last_bcast_acres)
contained_changed = (prefix == "Update" and last_bcast_contained is not None
and contained_pct is not None and contained_pct > last_bcast_contained)
if acres_changed and contained_changed:
size_line = f"**{acres_str}{delta_str} | {contained_str}**"
elif acres_changed:
size_line = f"**{acres_str}{delta_str}** | {contained_str}"
elif contained_changed:
size_line = f"{acres_str} | **{contained_str}**"
else:
size_line = f"{acres_str} | {contained_str}"
lines.append(size_line)
# Line 3: movement or plain anchor
if (isinstance(movement, dict)
and movement.get("direction") and movement.get("speed_mph") is not None):
lines.append(f"**Moving {movement['direction']} {movement['speed_mph']:.1f} mi/h | Near: {anchor}**")
else:
lines.append(f"Near: {anchor}")
# Line 4: cause / discovered
cause_part = cause if cause else None
disc_part = None
if declared_at_epoch is not None:
try:
dt = _dt.datetime.fromtimestamp(declared_at_epoch,
tz=_dt.timezone(_dt.timedelta(hours=-6)))
disc_part = dt.strftime("%b %d %-I:%M %p")
except Exception:
pass
if cause_part and disc_part:
lines.append(f"Cause: {cause_part} | Discovered: {disc_part}")
elif cause_part:
lines.append(f"Cause: {cause_part}")
elif disc_part:
lines.append(f"Discovered: {disc_part}")
# Line 5: unique fire ID
if unique_fire_id:
lines.append(f"ID: {unique_fire_id}")
return "\n".join(lines)
def _location_anchor(n: dict) -> str: def _location_anchor(n: dict) -> str:

View file

@ -696,6 +696,7 @@ def _parse_wfigs_incidents(inner_data: dict, geo: dict) -> dict:
# Geocoder-side anchor enrichment for the renderer. # Geocoder-side anchor enrichment for the renderer.
city = geocoder.get("city") city = geocoder.get("city")
raw = inner_data.get("raw") or {}
return { return {
"irwin_id": irwin_id, "irwin_id": irwin_id,
@ -710,6 +711,10 @@ def _parse_wfigs_incidents(inner_data: dict, geo: dict) -> dict:
"landclass": landclass, "landclass": landclass,
"geocoder_city": city, "geocoder_city": city,
"declared_at_epoch": declared_at_epoch, "declared_at_epoch": declared_at_epoch,
"fire_cause": raw.get("FireCause"),
"agency": raw.get("POOJurisdictionalAgency"),
"personnel": raw.get("TotalIncidentPersonnel"),
"unique_fire_id": raw.get("UniqueFireIdentifier"),
} }