refactor(fire_digest): replace LLM renderer with deterministic output

Remove _build_prompt, _llm_render, _terse_fallback and all LLM backend
references. render_digest() now queries the fires table directly and
builds a structured multi-line wire: header with count, up to 5 fires
with acres/containment/anchor, and a +N more overflow line.

FireDigestScheduler no longer accepts or uses llm_backend. Updated the
pipeline __init__.py call site accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson (via Claude) 2026-06-07 16:03:51 +00:00
commit f320333f5f
2 changed files with 68 additions and 135 deletions

View file

@ -239,7 +239,7 @@ async def start_pipeline(bus: EventBus, config) -> DigestScheduler:
disp = comps.get("dispatcher")
llm_backend = comps.get("llm_backend")
if disp is not None:
fd_sched = FireDigestScheduler(disp, llm_backend)
fd_sched = FireDigestScheduler(disp)
await fd_sched.start()
comps["fire_digest_scheduler"] = fd_sched
bus._pipeline_components = comps

View file

@ -1,15 +1,13 @@
"""v0.7-fire-tracker-4 fire-digest scheduled broadcaster.
Twice-daily (default 06:00 + 18:00 Mountain) summary of active fires +
the last 24 h of growth / spotting events, rendered by the LLM into a
terse mesh wire and broadcast via dispatcher.dispatch_scheduled_broadcast.
Twice-daily (default 06:00 + 18:00 Mountain) deterministic summary of
active fires, broadcast via dispatcher.dispatch_scheduled_broadcast.
Modeled after band_conditions.py (cf. v0.5.11 scheduled broadcaster).
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta, timezone
@ -47,102 +45,83 @@ def _slot_epoch(now_dt: datetime, hh_mm: str, tz_name: str) -> int:
# ===========================================================================
# Data + prompt
# Deterministic renderer
# ===========================================================================
def _gather_fire_context(now: int, *, window_h: int = 24):
"""Build the LLM-facing data block for the digest prompt."""
def _get_anchor(lat, lon) -> str:
"""Get location anchor for a fire using nearest_town from central_normalizer."""
if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
return ""
try:
from meshai.central_normalizer import nearest_town
max_mi = float(adapter_config.wfigs.anchor_max_mi)
nt = nearest_town(lat, lon, max_distance_mi=max_mi)
except Exception:
return ""
if nt and nt.get("name"):
town = nt["name"].title()
d = nt.get("distance_mi")
bearing = nt.get("bearing")
if isinstance(d, (int, float)):
if d < 1:
return f"near {town}"
return f"{int(round(d))} mi {bearing or ''} of {town}".strip()
return f"near {town}"
return ""
async def render_digest(*, now: Optional[int] = None) -> tuple[str, str]:
"""Build the digest wire string deterministically.
Returns (wire, source). source is 'deterministic' on success,
'no_fires' if there are no active fires (wire is empty).
"""
from meshai.persistence import get_db
now = now if now is not None else int(time.time())
conn = get_db()
cutoff = now - window_h * 3600
fires = conn.execute(
"SELECT irwin_id, incident_name, current_acres, "
"current_contained_pct, lat, lon, state, county, last_pass_at "
"SELECT incident_name, current_acres, current_contained_pct, "
"lat, lon, county, state "
"FROM fires "
"WHERE tombstoned_at IS NULL "
"ORDER BY COALESCE(current_acres, 0) DESC LIMIT 20",
).fetchall()
if not fires:
return None
return "", "no_fires"
fire_blocks: list[str] = []
for f in fires:
passes = conn.execute(
"SELECT pass_id, drift_mi_from_prev, drift_direction, "
"drift_mi_per_hour FROM fire_passes "
"WHERE irwin_id=? AND pass_ended_at >= ? "
"ORDER BY pass_ended_at DESC LIMIT 4",
(f["irwin_id"], cutoff),
).fetchall()
growth_summary = "no recent passes"
if passes:
drifts = [
f"{p['drift_mi_from_prev']:.1f}mi {p['drift_direction']}"
for p in passes
if p["drift_mi_from_prev"] is not None
and p["drift_direction"] is not None
]
if drifts:
growth_summary = "drift " + ", ".join(drifts)
else:
growth_summary = f"{len(passes)} pass(es), no drift recorded"
n = len(fires)
lines: list[str] = []
lines.append(f"\U0001f525 Fire Digest \u2014 {n} active wildfire(s) in Idaho")
spot_count = conn.execute(
"SELECT COUNT(*) FROM event_log "
"WHERE source='firms' AND category LIKE 'wildfire_spotting%'",
).fetchone()[0]
for f in fires[:5]:
name = f["incident_name"] or "(unnamed)"
acres = f["current_acres"]
contained = f["current_contained_pct"]
anchor = (f"{f['county']}/{f['state']}"
if f["county"] and f["state"] else "ID")
fire_blocks.append(
f"- {f['incident_name'] or '(unnamed)'} "
f"({f['current_acres'] or 0:.0f} ac, "
f"{f['current_contained_pct'] or 0}% contained, {anchor}); "
f"{growth_summary}"
)
if not fire_blocks:
return None
return "\n".join(fire_blocks)
acres_str = f"{int(acres):,} ac" if acres and acres > 0 else "size unknown"
contained_str = (f"{int(contained)}% contained"
if contained is not None else "containment unknown")
anchor = _get_anchor(f["lat"], f["lon"])
if not anchor:
county = f["county"]
state = f["state"]
if county and state:
anchor = f"{county} Co {state}"
def _build_prompt(context_block: str, *, max_chars: int) -> str:
return (
f"You are a wildfire radio dispatcher writing a single-message "
f"summary for mesh-radio operators in Idaho. You have data on "
f"{context_block.count(chr(10)) + 1} active fires. "
f"Write ONE message of <= {max_chars} characters that names the "
f"top fires, includes any movement direction/speed, and notes "
f"any spotting or possible new fires. Be terse: this is "
f"bandwidth-constrained mesh radio. No markdown, no bullet "
f"points, no greeting, no sign-off. Plain text only.\n\n"
f"DATA:\n{context_block}"
)
parts = [f"{name}: {acres_str}, {contained_str}"]
if anchor:
parts[0] += f", {anchor}"
lines.append(parts[0])
if n > 5:
lines.append(f"+ {n - 5} more")
def _terse_fallback(context_block: str, *, max_chars: int) -> str:
"""Used when the LLM call fails or no LLM is configured."""
lines = context_block.splitlines()
fires_n = len(lines)
head = f"Fires today ({fires_n}): "
body_parts: list[str] = []
for line in lines[:3]:
# line shape: "- <name> (<acres> ac, <pct>% contained, anchor); ..."
if line.startswith("- "):
line = line[2:]
# Trim to "<name> <acres>ac"
head_part = line.split("(", 1)
if len(head_part) == 2:
name = head_part[0].strip()
rest = head_part[1].split(",", 1)[0]
body_parts.append(f"{name} {rest}")
else:
body_parts.append(line[:40])
body = "; ".join(body_parts)
if fires_n > 3:
body += f"; +{fires_n - 3} more"
out = head + body
return out[:max_chars]
return "\n".join(lines), "deterministic"
# ===========================================================================
@ -169,50 +148,6 @@ def _record_slot_attempt(slot_epoch_s: int, *,
return int(cur.lastrowid) if cur.rowcount > 0 else None
async def _llm_render(prompt: str, llm_backend, *, max_chars: int) -> Optional[str]:
"""Call the LLM backend and return the trimmed/cleaned wire string."""
if llm_backend is None:
return None
try:
text = await llm_backend.generate(
messages=[{"role": "user", "content": prompt}],
system_prompt="",
max_tokens=512,
)
except Exception:
logger.exception("fire_digest: LLM call failed")
return None
if not text:
return None
# Strip markdown the LLM may have added even though prompted not to.
try:
from meshai.chunker import strip_markdown
text = strip_markdown(text)
except Exception:
pass
# Replace newlines with spaces (a digest is a single-line wire).
text = " ".join(text.split())
return text[:max_chars]
async def render_digest(*, llm_backend, now: Optional[int] = None,
max_chars: Optional[int] = None) -> tuple[str, str]:
"""Build the digest wire string. Returns (wire, source). source is
'llm' on success, 'fallback_terse' on LLM failure, 'no_fires' if
there are no active fires (wire is empty in that case)."""
now = now if now is not None else int(time.time())
cap = (max_chars if max_chars is not None
else int(adapter_config.fires.digest_max_chars))
ctx = _gather_fire_context(now)
if ctx is None:
return "", "no_fires"
prompt = _build_prompt(ctx, max_chars=cap)
wire = await _llm_render(prompt, llm_backend, max_chars=cap)
if wire:
return wire, "llm"
return _terse_fallback(ctx, max_chars=cap), "fallback_terse"
# ===========================================================================
# Scheduler
# ===========================================================================
@ -221,11 +156,10 @@ async def render_digest(*, llm_backend, now: Optional[int] = None,
class FireDigestScheduler:
"""Fires fire-digest broadcasts at configured local times."""
def __init__(self, dispatcher, llm_backend, *,
def __init__(self, dispatcher, *,
clock: Optional[Callable[[], float]] = None,
sleep: Optional[Callable[[float], Any]] = None):
self._dispatcher = dispatcher
self._llm = llm_backend
self._clock = clock or time.time
self._sleep = sleep or asyncio.sleep
self._task: Optional[asyncio.Task] = None
@ -300,8 +234,7 @@ class FireDigestScheduler:
async def fire_slot(self, slot_epoch_s: int, hh_mm: str) -> bool:
"""Build + broadcast for the given slot. Returns True on broadcast."""
wire, source = await render_digest(llm_backend=self._llm,
now=int(self._clock()))
wire, source = await render_digest(now=int(self._clock()))
if source == "no_fires":
self._logger.info(
"fire-digest: silent skip for %s (no active fires)", hh_mm)