nws: rewrite _render() to 4-line format with SAME-branched hazard and motion

New wire format:
  L1: emoji + event type (no office name)
  L2: NWSheadline (title-cased, 80 chars) or "{event} for {area}" fallback
  L3: SAME-code-branched hazard + certainty/threat:
      TOR: on-ground/radar + damage threat
      SVR: wind/hail + radar confirmed/indicated
      FFW/FLW: hazard sentence + inferred flood cause
      Others: hazard sentence + certainty if Observed/Likely
  L4: motion (compass + mph from eventMotionDescription) + locations

Drop expires, area/county, and impact lines. Add _parse_motion() helper
for eventMotionDescription (knots -> mph conversion). Add "locations"
pattern to _parse_nws_description(). Update test to remove expires check.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-08 07:05:22 +00:00
commit a4ecd05c60
2 changed files with 92 additions and 33 deletions

View file

@ -96,6 +96,7 @@ def _parse_nws_description(description: str) -> dict:
"impact": r"IMPACT\.\.\.(.*?)(?=\n\n|\nLocations|$)",
"tornado": r"TORNADO\.\.\.(.*?)(?=\n\n|\n[A-Z]+\.\.\.|$)",
"tornado_threat": r"TORNADO DAMAGE THREAT\.\.\.(.*?)(?=\n\n|\n[A-Z]+\.\.\.|$)",
"locations": r"Locations impacted include[.…]*\s*(.*?)(?=\n\n|$)",
}
for key, pattern in patterns.items():
m = re.search(pattern, description or "", re.DOTALL | re.IGNORECASE)
@ -106,6 +107,26 @@ def _parse_nws_description(description: str) -> dict:
return result
def _parse_motion(params: dict) -> tuple:
"""Parse eventMotionDescription into (compass, speed_mph).
Format: '...DEG...KT' e.g. '254DEG...35KT'
Returns (compass_str, speed_mph_int) or (None, None)."""
raw = (params.get("eventMotionDescription") or [""])[0]
if not raw:
return None, None
m = re.search(r"(\d+)DEG\.+(\d+)KT", raw)
if not m:
return None, None
deg = int(m.group(1))
knots = int(m.group(2))
mph = round(knots * 1.15)
# Convert bearing (direction storm is coming FROM) to heading (direction moving TOWARD)
heading = (deg + 180) % 360
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
compass = dirs[round(heading / 45) % 8]
return compass, mph
def _now() -> int: return int(time.time())
@ -284,49 +305,88 @@ def _render(*, event_type, area_desc, geocoder_city, county, state,
expires_epoch, lat, lon, now, prefix: str = "", d: dict = None) -> str:
d = d or {}
params = d.get("parameters") or {}
# Emoji: prefer SAME event code, fall back to event_type substring match
same_code = ((d.get("eventCode") or {}).get("SAME") or [""])[0]
emoji = _SAME_EMOJI.get(same_code) or _emoji_for_event(event_type)
# Office
office = _nws_office(params)
office_seg = f" — NWS {office}" if office else ""
prefix_seg = f"{prefix}: " if prefix else ""
# Line 1
line1 = f"{emoji} {prefix_seg}{event_type or 'Weather Alert'}{office_seg}"
# Line 2: first areaDesc segment only, max 60 chars
area = area_desc or county or ""
area = area.split(";")[0].strip()
if len(area) > 60:
area = area[:57] + "..."
line2 = area
# Parse description
desc = _parse_nws_description(d.get("description") or "")
# Line 3: hazard
hazard = desc.get("tornado") or desc.get("hazard") or ""
if not hazard:
# SAME code drives emoji and line-3 branching
same_code = ((d.get("eventCode") or {}).get("SAME") or [""])[0]
emoji = _SAME_EMOJI.get(same_code) or _emoji_for_event(event_type)
prefix_seg = f"{prefix}: " if prefix else ""
# Line 1: emoji + event type (no office)
line1 = f"{emoji} {prefix_seg}{event_type or 'Weather Alert'}"
# Line 2: NWSheadline (title-cased, 80 chars) or fallback
nws_hl = (params.get("NWSheadline") or [""])[0].strip()
if nws_hl:
nws_hl = nws_hl.title()
if len(nws_hl) > 80:
nws_hl = nws_hl[:77] + "..."
line2 = nws_hl
else:
area_first = (area_desc or "").split(";")[0].strip()
line2 = f"{event_type or 'Weather Alert'} for {area_first}" if area_first else ""
# Line 3: hazard + certainty/threat (SAME-code branched)
certainty = (d.get("certainty") or "").strip()
line3 = ""
if same_code == "TOR":
detection = (params.get("tornadoDetection") or [""])[0]
status = "On ground" if detection == "OBSERVED" else "Radar indicated"
threat = (params.get("tornadoDamageThreat") or [""])[0]
threat_seg = f" | {threat.title()} damage threat" if threat else ""
line3 = f"{status}{threat_seg}"
elif same_code == "SVR":
wind = (params.get("maxWindGust") or [""])[0]
hail = (params.get("maxHailSize") or [""])[0]
bits = []
if wind and wind not in ("0 MPH", ""): bits.append(f"{wind} winds")
if hail and hail not in ("0.00", "0", ""): bits.append(f"{hail} in hail")
hazard = ". ".join(bits)
line3 = hazard
hazard = ", ".join(bits)
confirm = "Radar confirmed" if certainty == "Observed" else "Radar indicated"
line3 = f"{hazard} | {confirm}" if hazard else confirm
elif same_code in ("FFW", "FLW"):
hazard_text = desc.get("hazard") or ""
# First sentence only
if ". " in hazard_text:
hazard_text = hazard_text.split(". ")[0]
# Infer flood cause from description
desc_lower = (d.get("description") or "").lower()
flood_cause = ""
for keyword, label in [("thunderstorm", "Thunderstorms"),
("dam", "Dam failure"),
("snowmelt", "Snowmelt"),
("ice jam", "Ice jam")]:
if keyword in desc_lower:
flood_cause = label
break
cause_seg = f" | {flood_cause}" if flood_cause else ""
line3 = f"{hazard_text}{cause_seg}" if hazard_text else flood_cause
else:
# SPS, WSW, etc.: first hazard sentence + certainty if Observed/Likely
hazard_text = desc.get("hazard") or ""
if ". " in hazard_text:
hazard_text = hazard_text.split(". ")[0]
cert_seg = ""
if certainty in ("Observed", "Likely"):
cert_seg = f" | {certainty}"
line3 = f"{hazard_text}{cert_seg}" if hazard_text else ""
# Line 4: impact
impact = desc.get("impact") or ""
line4 = impact
# Line 4: motion + locations
compass, speed_mph = _parse_motion(params)
motion = f"Moving {compass} {speed_mph} mph" if compass and speed_mph else ""
locations = desc.get("locations") or ""
if len(locations) > 40:
locations = locations[:37] + "..."
if motion and locations:
line4 = f"{motion}{locations}"
elif motion:
line4 = motion
elif locations:
line4 = locations
else:
line4 = ""
# Line 5: expires
expires_seg = _format_expires_short(expires_epoch, now=now) if expires_epoch else ""
line5 = f"Expires: {expires_seg}" if expires_seg else ""
lines = [l for l in (line1, line2, line3, line4, line5) if l]
lines = [l for l in (line1, line2, line3, line4) if l]
return "\n".join(lines)

View file

@ -194,9 +194,8 @@ def test_commit_callback_updates_last_broadcast(mem_db):
assert el["handled"] == 1
def test_wire_includes_event_area_and_expires(mem_db):
def test_wire_includes_event_and_headline(mem_db):
env = _nws_env(severity_str="Severe", lat=42.500, lon=-114.460)
wire = handle_nws(env, env["subject"], data={}, now=1_000_000)
assert "Severe Thunderstorm Warning" in wire
assert "Twin Falls County" in wire
assert "until" in wire.lower()